Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
… feat-redis-streams
… feat-redis-streams
Co-authored-by: abnegate <5857008+abnegate@users.noreply.github.com>
Agent-Logs-Url: https://github.com/utopia-php/queue/sessions/e43e1d67-3d59-4bd0-818d-f5258b4978bd Co-authored-by: abnegate <5857008+abnegate@users.noreply.github.com>
Agent-Logs-Url: https://github.com/utopia-php/queue/sessions/dae9cdca-5610-4a51-8cb9-b70651040d92 Co-authored-by: abnegate <5857008+abnegate@users.noreply.github.com>
Agent-Logs-Url: https://github.com/utopia-php/queue/sessions/b5046d7d-ddb8-4519-95a0-4d8206fe607e Co-authored-by: abnegate <5857008+abnegate@users.noreply.github.com>
Replace PHPStan ignores with official phpredis stub file
Greptile SummaryThis PR introduces a Redis Streams-based broker (
Confidence Score: 3/5Not safe to merge — two P1 bugs can cause duplicate job processing and silent starvation of scheduled jobs in multi-worker/multi-queue deployments. Two independent P1 findings: a race condition that duplicates delayed jobs across workers, and a timer-sharing bug that prevents per-queue scheduled job processing in consumeMultiple. Both affect correctness in production multi-worker setups. src/Queue/Broker/RedisStreams.php — processDelayedJobs and the interval timers in consumeMultiple. Important Files Changed
|
| $this->consumerId, | ||
| [$streamKey], | ||
| 1, | ||
| 0 // Non-blocking to check all queues quickly | ||
| ); |
There was a problem hiding this comment.
enqueueAt drops sub-second timing precision
$timestamp - \time() produces an integer number of seconds, then enqueueDelayed multiplies by 1000 to get milliseconds. Any fractional-second offset between $timestamp and the current time is lost. If $timestamp is 0.9 s in the future the job is scheduled for "now" (max(0, …) clamps to 0). Using (int)(($timestamp - \microtime(true)) * 1000) directly as the delay would preserve millisecond precision.
There was a problem hiding this comment.
Pull request overview
Adds a Redis Streams–backed queue broker with delayed/scheduled job support, plus associated test infrastructure and static-analysis stubs.
Changes:
- Introduces
RedisStreamsbroker,StreamConnectionabstraction, and Redis/RedisCluster stream-capable connections. - Adds a
Schedulevalue object and extensive unit/E2E coverage around streams + scheduling APIs. - Updates tooling/config (PHPStan stubs scanning, Pint excludes, PHPUnit unit suite) and Docker Compose/services for running the new worker.
Reviewed changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/Queue/servers/SwooleRedisStreams/worker.php | Adds a Swoole worker entrypoint wired to the Redis Streams broker for E2E runs. |
| tests/Queue/servers/SwooleRedisStreams/Dockerfile | Adds a container image for the Redis Streams Swoole worker. |
| tests/Queue/Unit/ScheduleTest.php | Adds unit tests for Schedule creation/validation/serialization and next-run calculations. |
| tests/Queue/Unit/RedisStreamsBrokerTest.php | Adds Redis Streams broker tests (enqueue, delayed, schedules, observability APIs). |
| tests/Queue/Unit/RedisStreamConnectionTest.php | Adds Redis Stream connection tests for stream/sorted-set/hash helper methods. |
| tests/Queue/E2E/Adapter/SwooleRedisStreamsTest.php | Adds E2E adapter tests for Redis Streams with Swoole worker. |
| stubs/RedisCluster.stub.php | Adds phpredis RedisCluster stub for PHPStan typing (streams + other APIs). |
| stubs/README.md | Documents why the phpredis stub is vendored and how it’s used. |
| src/Queue/StreamConnection.php | Introduces a stream-oriented connection interface (streams + zset + hash helpers). |
| src/Queue/Schedule.php | Adds Schedule object for cron/interval recurring jobs. |
| src/Queue/Connection/RedisStreamCluster.php | Implements StreamConnection on top of Redis Cluster connections. |
| src/Queue/Connection/RedisStream.php | Implements StreamConnection on top of single-node Redis connections. |
| src/Queue/Broker/RedisStreams.php | Implements Redis Streams broker with consume loop, retries, delayed jobs, schedules, and observability helpers. |
| pint.json | Excludes stubs/ from Pint formatting. |
| phpunit.xml | Adds a Unit test suite pointing at tests/Queue/Unit. |
| phpstan.neon | Adds stubs/ to PHPStan scan directories for RedisCluster typing. |
| docker-compose.yml | Adds a Redis Streams Swoole worker service and adjusts service wiring. |
| composer.lock | Adds cron-expression dependency used by the new Schedule implementation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| use Cron\CronExpression; | ||
|
|
There was a problem hiding this comment.
Schedule depends on Cron\CronExpression, but composer.json doesn’t declare dragonmantank/cron-expression in require (only appears in composer.lock). This will break installs that regenerate the lock or consumers that require this package. Add it to require (or change the implementation to avoid this dependency).
| public function testInvalidCronExpression(): void | ||
| { | ||
| $this->expectException(\InvalidArgumentException::class); | ||
| $this->expectExceptionMessage('Invalid cron expression'); |
There was a problem hiding this comment.
Schedule throws Invalid cron expression: {$cron}, but this test expects the message to be exactly Invalid cron expression. Update the expected message (or use a regex/substring assertion) so the test matches the actual exception text.
| $this->expectExceptionMessage('Invalid cron expression'); | |
| $this->expectExceptionMessageMatches('/^Invalid cron expression:/'); |
| $streamSize = $this->connection->streamLen($this->getStreamKey($queue)); | ||
| $delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue)); | ||
|
|
||
| return $streamSize + $delayedSize; | ||
| } | ||
|
|
||
|
|
||
| /** |
There was a problem hiding this comment.
Publisher::getQueueSize() is documented as “pending messages”, but streamLen() (XLEN) counts all entries in the stream, including already-acknowledged/history entries (until trimming). This will over-report queue size as the stream grows. Consider using consumer-group lag/pending (e.g., XINFO GROUPS lag) plus delayed count instead of XLEN for the main queue.
| $streamSize = $this->connection->streamLen($this->getStreamKey($queue)); | |
| $delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue)); | |
| return $streamSize + $delayedSize; | |
| } | |
| /** | |
| $streamKey = $this->getStreamKey($queue); | |
| $groupName = $this->getGroupName($queue); | |
| $queueSize = $this->getConsumerGroupQueueSize($streamKey, $groupName); | |
| $delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue)); | |
| return $queueSize + $delayedSize; | |
| } | |
| /** | |
| * Returns the actionable backlog for a consumer-group-backed stream. | |
| * | |
| * This is calculated as: | |
| * - lag: messages not yet delivered to the group | |
| * - pending: messages delivered to the group but not yet acknowledged | |
| * | |
| * If consumer-group info is not available through the current connection | |
| * abstraction, we fall back to the raw stream length as a best effort. | |
| */ | |
| private function getConsumerGroupQueueSize(string $streamKey, string $groupName): int | |
| { | |
| $groups = $this->getConsumerGroupsInfo($streamKey); | |
| foreach ($groups as $group) { | |
| if (($group['name'] ?? null) !== $groupName) { | |
| continue; | |
| } | |
| $lag = (int) ($group['lag'] ?? 0); | |
| $pending = (int) ($group['pending'] ?? 0); | |
| return max(0, $lag) + max(0, $pending); | |
| } | |
| return $this->connection->streamLen($streamKey); | |
| } | |
| /** | |
| * Fetches and normalizes Redis XINFO GROUPS data for a stream. | |
| * | |
| * Supports several possible connection APIs so this class can use group | |
| * backlog metrics without requiring changes outside this file. | |
| * | |
| * @return array<int, array{name?: string, lag?: int, pending?: int}> | |
| */ | |
| private function getConsumerGroupsInfo(string $streamKey): array | |
| { | |
| $groups = []; | |
| if (\method_exists($this->connection, 'streamInfoGroups')) { | |
| $groups = $this->connection->streamInfoGroups($streamKey); | |
| } elseif (\method_exists($this->connection, 'xInfoGroups')) { | |
| $groups = $this->connection->xInfoGroups($streamKey); | |
| } elseif (\method_exists($this->connection, 'xInfo')) { | |
| $groups = $this->connection->xInfo('GROUPS', $streamKey); | |
| } elseif (\method_exists($this->connection, 'rawCommand')) { | |
| $groups = $this->connection->rawCommand('XINFO', 'GROUPS', $streamKey); | |
| } | |
| if (!\is_array($groups) || $groups === []) { | |
| return []; | |
| } | |
| $normalized = []; | |
| foreach ($groups as $group) { | |
| if (!\is_array($group)) { | |
| continue; | |
| } | |
| $normalized[] = [ | |
| 'name' => $group['name'] ?? $group[0] ?? null, | |
| 'pending' => (int) ($group['pending'] ?? $group[2] ?? 0), | |
| 'lag' => (int) ($group['lag'] ?? $group[8] ?? 0), | |
| ]; | |
| } | |
| return $normalized; | |
| } | |
| /** |
| try { | ||
| $result = $redis->rawCommand(...$command); | ||
| return $result ?: false; |
There was a problem hiding this comment.
In the NOACK branch, rawCommand('XREADGROUP', ...) returns Redis’ raw nested response format, which won’t match the associative array shape returned by xReadGroup() (and expected by callers via $entries[$streamKey][$id]). Parse/normalize the raw response before returning (or avoid rawCommand if NOACK is supported).
| $result = $this->getRedis()->xReadGroup( | ||
| $group, | ||
| $consumer, | ||
| $streamIds, | ||
| $count, | ||
| $block > 0 ? $block : null | ||
| ); |
There was a problem hiding this comment.
xReadGroup() is invoked with null for $block when $block <= 0 ($block > 0 ? $block : null). The included phpredis stub (stubs/RedisCluster.stub.php) types this argument as int, so this will fail static analysis (and may be a runtime TypeError depending on ext-redis). Consider omitting the argument entirely when not blocking, rather than passing null.
| $result = $this->getRedis()->xReadGroup( | |
| $group, | |
| $consumer, | |
| $streamIds, | |
| $count, | |
| $block > 0 ? $block : null | |
| ); | |
| if ($block > 0) { | |
| $result = $this->getRedis()->xReadGroup( | |
| $group, | |
| $consumer, | |
| $streamIds, | |
| $count, | |
| $block | |
| ); | |
| } else { | |
| $result = $this->getRedis()->xReadGroup( | |
| $group, | |
| $consumer, | |
| $streamIds, | |
| $count | |
| ); | |
| } |
| local members = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) | ||
| if #members > 0 then | ||
| redis.call('ZREM', KEYS[1], unpack(members)) | ||
| end | ||
| return members |
There was a problem hiding this comment.
The Lua script uses unpack(members), which has practical limits; the non-cluster implementation caps $limit to avoid stack/unpack failures, but this cluster variant doesn’t. Apply the same $limit cap here to keep behavior consistent and prevent runtime failures when callers pass large limits.
|
@copilot Address the greptile comments and failing codeql check |
No description provided.