Skip to content

Redis Streams#63

Open
abnegate wants to merge 20 commits intomainfrom
feat-redis-streams
Open

Redis Streams#63
abnegate wants to merge 20 commits intomainfrom
feat-redis-streams

Conversation

@abnegate
Copy link
Copy Markdown
Member

No description provided.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jan 10, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment thread phpstan.neon Outdated
Copy link
Copy Markdown

Copilot AI commented Jan 19, 2026

@abnegate I've opened a new pull request, #65, to work on those changes. Once the pull request is ready, I'll request review from you.

Copilot AI and others added 6 commits January 19, 2026 12:01
Co-authored-by: abnegate <5857008+abnegate@users.noreply.github.com>
Agent-Logs-Url: https://github.com/utopia-php/queue/sessions/b5046d7d-ddb8-4519-95a0-4d8206fe607e

Co-authored-by: abnegate <5857008+abnegate@users.noreply.github.com>
Replace PHPStan ignores with official phpredis stub file
@abnegate abnegate marked this pull request as ready for review April 29, 2026 07:31
@abnegate abnegate requested a review from Copilot April 29, 2026 07:31
@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented Apr 29, 2026

Greptile Summary

This PR introduces a Redis Streams-based broker (RedisStreams) with consumer groups, delayed/scheduled jobs, DLQ retry, and both standalone and cluster connection implementations. The Schedule value object and StreamConnection interface are well-designed, but two logic bugs in the broker need attention before merge:

  • Non-atomic delayed-job promotion: processDelayedJobs uses a non-atomic read (sortedSetRangeByScore) followed by a separate add+remove, allowing multiple concurrent workers to double-enqueue the same delayed job. The existing sortedSetPopByScore Lua script should be used instead.
  • Shared interval timer in consumeMultiple: $lastScheduleCheck / $lastDelayedCheck are per-instance, not per-queue, so after the first queue updates the timer, all remaining queues in the same consumeMultiple iteration skip their scheduled/delayed processing for that cycle.

Confidence Score: 3/5

Not safe to merge — two P1 bugs can cause duplicate job processing and silent starvation of scheduled jobs in multi-worker/multi-queue deployments.

Two independent P1 findings: a race condition that duplicates delayed jobs across workers, and a timer-sharing bug that prevents per-queue scheduled job processing in consumeMultiple. Both affect correctness in production multi-worker setups.

src/Queue/Broker/RedisStreams.php — processDelayedJobs and the interval timers in consumeMultiple.

Important Files Changed

Filename Overview
src/Queue/Broker/RedisStreams.php Core broker implementation; two P1 bugs: non-atomic delayed-job promotion causes duplicates with multiple workers, and a shared interval timer breaks per-queue scheduling in consumeMultiple.
src/Queue/Connection/RedisStream.php StreamConnection implementation for standalone Redis; well-structured with proper BUSYGROUP/NOGROUP error handling and a Lua script for atomic sorted-set pop.
src/Queue/Connection/RedisStreamCluster.php Cluster variant of RedisStream; mirrors RedisStream.php but silently drops NOACK support from streamReadGroup (no rawCommand fallback).
src/Queue/Schedule.php New immutable Schedule value object with cron/interval support, pause/resume, run-count limiting, and serialization — clean design.
src/Queue/StreamConnection.php New interface extending Connection; well-documented with full Redis Streams API coverage.
tests/Queue/Unit/RedisStreamsBrokerTest.php Unit tests using mocks for the broker; good coverage of enqueue, retry, DLQ, and schedule operations.
tests/Queue/E2E/Adapter/SwooleRedisStreamsTest.php E2E test for the Swoole+Redis Streams adapter; exercises the full consume loop end-to-end.

Comments Outside Diff (4)

  1. src/Queue/Broker/RedisStreams.php, line 954-970 (link)

    P1 Duplicate delayed jobs with multiple workers

    processDelayedJobs reads due members with sortedSetRangeByScore (non-atomic) and then removes them individually. When two workers run concurrently both read the same set of due jobs, both call streamAdd adding duplicates, and only the second sortedSetRemove returns 0 (but the duplicate is already in the stream). The sortedSetPopByScore Lua script exists on the connection and performs an atomic ZRANGEBYSCORE+ZREM — using it here would prevent double-enqueuing without the job-loss risk.

  2. src/Queue/Broker/RedisStreams.php, line 977-986 (link)

    P1 Shared interval timer breaks per-queue scheduling in consumeMultiple

    $lastScheduleCheck and $lastDelayedCheck are single instance properties. In consumeMultiple, processScheduledJobs and processDelayedJobs are called for each queue in a loop. The first queue updates $this->lastScheduleCheck, so every subsequent queue in the same iteration skips the check until the next interval fires. If only two queues are consumed, the second queue's scheduled jobs can be delayed by up to SCHEDULE_CHECK_INTERVAL_MS (1 s) in the best case or silently starved if the loop runs faster than the interval.

  3. src/Queue/Broker/RedisStreams.php, line 919-937 (link)

    P2 xAutoClaim cursor is discarded, causing a full re-scan on every loop

    streamAutoClaim returns [next_start_id, claimed_entries, deleted_ids]. The first element is a pagination cursor — '0-0' means the full scan is complete, anything else means more entries remain. By always passing '0-0' and ignoring $result[0], every consumer loop re-scans from the beginning of the PEL. While functionally safe, this is O(PEL size) work on every iteration. Storing and advancing the cursor between calls would bound each iteration to a fixed window of entries.

  4. src/Queue/Broker/RedisStreams.php, line 955-961 (link)

    P2 Confusing member/score variable swap in processDelayedJobs

    sortedSetRangeByScore returns a plain indexed array (no scores), so iterating with foreach ($dueJobs as $member => $score) gives an integer $member (array index) and a string $score (the actual JSON member). The workaround $jobData = is_string($member) ? $member : $score; relies on the accidental fact that array indices are ints. This is fragile and the comment above it is misleading. Using a simple foreach ($dueJobs as $jobData) would be clear and correct.

Reviews (1): Last reviewed commit: "Merge branch 'main' into feat-redis-stre..." | Re-trigger Greptile

Comment on lines +319 to +323
$this->consumerId,
[$streamKey],
1,
0 // Non-blocking to check all queues quickly
);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 enqueueAt drops sub-second timing precision

$timestamp - \time() produces an integer number of seconds, then enqueueDelayed multiplies by 1000 to get milliseconds. Any fractional-second offset between $timestamp and the current time is lost. If $timestamp is 0.9 s in the future the job is scheduled for "now" (max(0, …) clamps to 0). Using (int)(($timestamp - \microtime(true)) * 1000) directly as the delay would preserve millisecond precision.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a Redis Streams–backed queue broker with delayed/scheduled job support, plus associated test infrastructure and static-analysis stubs.

Changes:

  • Introduces RedisStreams broker, StreamConnection abstraction, and Redis/RedisCluster stream-capable connections.
  • Adds a Schedule value object and extensive unit/E2E coverage around streams + scheduling APIs.
  • Updates tooling/config (PHPStan stubs scanning, Pint excludes, PHPUnit unit suite) and Docker Compose/services for running the new worker.

Reviewed changes

Copilot reviewed 17 out of 18 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
tests/Queue/servers/SwooleRedisStreams/worker.php Adds a Swoole worker entrypoint wired to the Redis Streams broker for E2E runs.
tests/Queue/servers/SwooleRedisStreams/Dockerfile Adds a container image for the Redis Streams Swoole worker.
tests/Queue/Unit/ScheduleTest.php Adds unit tests for Schedule creation/validation/serialization and next-run calculations.
tests/Queue/Unit/RedisStreamsBrokerTest.php Adds Redis Streams broker tests (enqueue, delayed, schedules, observability APIs).
tests/Queue/Unit/RedisStreamConnectionTest.php Adds Redis Stream connection tests for stream/sorted-set/hash helper methods.
tests/Queue/E2E/Adapter/SwooleRedisStreamsTest.php Adds E2E adapter tests for Redis Streams with Swoole worker.
stubs/RedisCluster.stub.php Adds phpredis RedisCluster stub for PHPStan typing (streams + other APIs).
stubs/README.md Documents why the phpredis stub is vendored and how it’s used.
src/Queue/StreamConnection.php Introduces a stream-oriented connection interface (streams + zset + hash helpers).
src/Queue/Schedule.php Adds Schedule object for cron/interval recurring jobs.
src/Queue/Connection/RedisStreamCluster.php Implements StreamConnection on top of Redis Cluster connections.
src/Queue/Connection/RedisStream.php Implements StreamConnection on top of single-node Redis connections.
src/Queue/Broker/RedisStreams.php Implements Redis Streams broker with consume loop, retries, delayed jobs, schedules, and observability helpers.
pint.json Excludes stubs/ from Pint formatting.
phpunit.xml Adds a Unit test suite pointing at tests/Queue/Unit.
phpstan.neon Adds stubs/ to PHPStan scan directories for RedisCluster typing.
docker-compose.yml Adds a Redis Streams Swoole worker service and adjusts service wiring.
composer.lock Adds cron-expression dependency used by the new Schedule implementation.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/Queue/Schedule.php
Comment on lines +5 to +6
use Cron\CronExpression;

Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schedule depends on Cron\CronExpression, but composer.json doesn’t declare dragonmantank/cron-expression in require (only appears in composer.lock). This will break installs that regenerate the lock or consumers that require this package. Add it to require (or change the implementation to avoid this dependency).

Copilot uses AI. Check for mistakes.
public function testInvalidCronExpression(): void
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('Invalid cron expression');
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schedule throws Invalid cron expression: {$cron}, but this test expects the message to be exactly Invalid cron expression. Update the expected message (or use a regex/substring assertion) so the test matches the actual exception text.

Suggested change
$this->expectExceptionMessage('Invalid cron expression');
$this->expectExceptionMessageMatches('/^Invalid cron expression:/');

Copilot uses AI. Check for mistakes.
Comment on lines +212 to +219
$streamSize = $this->connection->streamLen($this->getStreamKey($queue));
$delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue));

return $streamSize + $delayedSize;
}


/**
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Publisher::getQueueSize() is documented as “pending messages”, but streamLen() (XLEN) counts all entries in the stream, including already-acknowledged/history entries (until trimming). This will over-report queue size as the stream grows. Consider using consumer-group lag/pending (e.g., XINFO GROUPS lag) plus delayed count instead of XLEN for the main queue.

Suggested change
$streamSize = $this->connection->streamLen($this->getStreamKey($queue));
$delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue));
return $streamSize + $delayedSize;
}
/**
$streamKey = $this->getStreamKey($queue);
$groupName = $this->getGroupName($queue);
$queueSize = $this->getConsumerGroupQueueSize($streamKey, $groupName);
$delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue));
return $queueSize + $delayedSize;
}
/**
* Returns the actionable backlog for a consumer-group-backed stream.
*
* This is calculated as:
* - lag: messages not yet delivered to the group
* - pending: messages delivered to the group but not yet acknowledged
*
* If consumer-group info is not available through the current connection
* abstraction, we fall back to the raw stream length as a best effort.
*/
private function getConsumerGroupQueueSize(string $streamKey, string $groupName): int
{
$groups = $this->getConsumerGroupsInfo($streamKey);
foreach ($groups as $group) {
if (($group['name'] ?? null) !== $groupName) {
continue;
}
$lag = (int) ($group['lag'] ?? 0);
$pending = (int) ($group['pending'] ?? 0);
return max(0, $lag) + max(0, $pending);
}
return $this->connection->streamLen($streamKey);
}
/**
* Fetches and normalizes Redis XINFO GROUPS data for a stream.
*
* Supports several possible connection APIs so this class can use group
* backlog metrics without requiring changes outside this file.
*
* @return array<int, array{name?: string, lag?: int, pending?: int}>
*/
private function getConsumerGroupsInfo(string $streamKey): array
{
$groups = [];
if (\method_exists($this->connection, 'streamInfoGroups')) {
$groups = $this->connection->streamInfoGroups($streamKey);
} elseif (\method_exists($this->connection, 'xInfoGroups')) {
$groups = $this->connection->xInfoGroups($streamKey);
} elseif (\method_exists($this->connection, 'xInfo')) {
$groups = $this->connection->xInfo('GROUPS', $streamKey);
} elseif (\method_exists($this->connection, 'rawCommand')) {
$groups = $this->connection->rawCommand('XINFO', 'GROUPS', $streamKey);
}
if (!\is_array($groups) || $groups === []) {
return [];
}
$normalized = [];
foreach ($groups as $group) {
if (!\is_array($group)) {
continue;
}
$normalized[] = [
'name' => $group['name'] ?? $group[0] ?? null,
'pending' => (int) ($group['pending'] ?? $group[2] ?? 0),
'lag' => (int) ($group['lag'] ?? $group[8] ?? 0),
];
}
return $normalized;
}
/**

Copilot uses AI. Check for mistakes.
Comment on lines +149 to +151
try {
$result = $redis->rawCommand(...$command);
return $result ?: false;
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the NOACK branch, rawCommand('XREADGROUP', ...) returns Redis’ raw nested response format, which won’t match the associative array shape returned by xReadGroup() (and expected by callers via $entries[$streamKey][$id]). Parse/normalize the raw response before returning (or avoid rawCommand if NOACK is supported).

Copilot uses AI. Check for mistakes.
Comment on lines +118 to +124
$result = $this->getRedis()->xReadGroup(
$group,
$consumer,
$streamIds,
$count,
$block > 0 ? $block : null
);
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xReadGroup() is invoked with null for $block when $block <= 0 ($block > 0 ? $block : null). The included phpredis stub (stubs/RedisCluster.stub.php) types this argument as int, so this will fail static analysis (and may be a runtime TypeError depending on ext-redis). Consider omitting the argument entirely when not blocking, rather than passing null.

Suggested change
$result = $this->getRedis()->xReadGroup(
$group,
$consumer,
$streamIds,
$count,
$block > 0 ? $block : null
);
if ($block > 0) {
$result = $this->getRedis()->xReadGroup(
$group,
$consumer,
$streamIds,
$count,
$block
);
} else {
$result = $this->getRedis()->xReadGroup(
$group,
$consumer,
$streamIds,
$count
);
}

Copilot uses AI. Check for mistakes.
Comment on lines +328 to +332
local members = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3])
if #members > 0 then
redis.call('ZREM', KEYS[1], unpack(members))
end
return members
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Lua script uses unpack(members), which has practical limits; the non-cluster implementation caps $limit to avoid stack/unpack failures, but this cluster variant doesn’t. Apply the same $limit cap here to keep behavior consistent and prevent runtime failures when callers pass large limits.

Copilot uses AI. Check for mistakes.
@abnegate
Copy link
Copy Markdown
Member Author

@copilot Address the greptile comments and failing codeql check

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants