diff --git a/composer.lock b/composer.lock
index 02a67df..09b632e 100644
--- a/composer.lock
+++ b/composer.lock
@@ -143,6 +143,70 @@
],
"time": "2025-08-20T19:15:30+00:00"
},
+ {
+ "name": "dragonmantank/cron-expression",
+ "version": "v3.6.0",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/dragonmantank/cron-expression.git",
+ "reference": "d61a8a9604ec1f8c3d150d09db6ce98b32675013"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/dragonmantank/cron-expression/zipball/d61a8a9604ec1f8c3d150d09db6ce98b32675013",
+ "reference": "d61a8a9604ec1f8c3d150d09db6ce98b32675013",
+ "shasum": ""
+ },
+ "require": {
+ "php": "^8.2|^8.3|^8.4|^8.5"
+ },
+ "replace": {
+ "mtdowling/cron-expression": "^1.0"
+ },
+ "require-dev": {
+ "phpstan/extension-installer": "^1.4.3",
+ "phpstan/phpstan": "^1.12.32|^2.1.31",
+ "phpunit/phpunit": "^8.5.48|^9.0"
+ },
+ "type": "library",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "3.x-dev"
+ }
+ },
+ "autoload": {
+ "psr-4": {
+ "Cron\\": "src/Cron/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "MIT"
+ ],
+ "authors": [
+ {
+ "name": "Chris Tankersley",
+ "email": "chris@ctankersley.com",
+ "homepage": "https://github.com/dragonmantank"
+ }
+ ],
+ "description": "CRON for PHP: Calculate the next or previous run date and determine if a CRON expression is due",
+ "keywords": [
+ "cron",
+ "schedule"
+ ],
+ "support": {
+ "issues": "https://github.com/dragonmantank/cron-expression/issues",
+ "source": "https://github.com/dragonmantank/cron-expression/tree/v3.6.0"
+ },
+ "funding": [
+ {
+ "url": "https://github.com/dragonmantank",
+ "type": "github"
+ }
+ ],
+ "time": "2025-10-31T18:51:33+00:00"
+ },
{
"name": "google/protobuf",
"version": "v4.32.0",
diff --git a/docker-compose.yml b/docker-compose.yml
index 6b230a3..b00f81e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,6 +10,7 @@ services:
- swoole
- swoole-amqp
- swoole-redis-cluster
+ - swoole-redis-streams
- workerman
swoole:
@@ -35,6 +36,17 @@ services:
redis-cluster-0:
condition: service_healthy
+ swoole-redis-streams:
+ container_name: swoole-redis-streams
+ build: ./tests/Queue/servers/SwooleRedisStreams/.
+ command: php /usr/src/code/tests/Queue/servers/SwooleRedisStreams/worker.php
+ volumes:
+ - ./vendor:/usr/src/code/vendor
+ - ./src:/usr/src/code/src
+ - ./tests:/usr/src/code/tests
+ depends_on:
+ - redis
+
swoole-amqp:
container_name: swoole-amqp
build: ./tests/Queue/servers/AMQP/.
@@ -61,8 +73,6 @@ services:
redis:
container_name: redis
image: "redis:alpine"
- ports:
- - "6379:6379"
redis-cluster-0:
image: docker.io/bitnamilegacy/redis-cluster:7.4
diff --git a/phpstan.neon b/phpstan.neon
index 6852b4c..4206b0d 100644
--- a/phpstan.neon
+++ b/phpstan.neon
@@ -6,4 +6,5 @@ parameters:
- tests
scanDirectories:
- - vendor/swoole
\ No newline at end of file
+ - vendor/swoole
+ - stubs
\ No newline at end of file
diff --git a/phpunit.xml b/phpunit.xml
index cb11a1f..84b3b4c 100644
--- a/phpunit.xml
+++ b/phpunit.xml
@@ -8,5 +8,8 @@
./tests/Queue/E2E/Adapter
+
+ ./tests/Queue/Unit
+
\ No newline at end of file
diff --git a/pint.json b/pint.json
index dc0de5d..bc15e55 100644
--- a/pint.json
+++ b/pint.json
@@ -1,5 +1,8 @@
{
"preset": "psr12",
+ "exclude": [
+ "stubs"
+ ],
"rules": {
"single_quote": true
}
diff --git a/src/Queue/Broker/RedisStreams.php b/src/Queue/Broker/RedisStreams.php
new file mode 100644
index 0000000..a4da641
--- /dev/null
+++ b/src/Queue/Broker/RedisStreams.php
@@ -0,0 +1,931 @@
+consumerId = 'worker-' . \uniqid();
+ }
+
+ /**
+ * Set the consumer ID for this broker instance.
+ *
+ * @param string $consumerId
+ * @return void
+ */
+ public function setConsumerId(string $consumerId): void
+ {
+ $this->consumerId = $consumerId;
+ }
+
+ /**
+ * Get the consumer ID for this broker instance.
+ *
+ * @return string
+ */
+ public function getConsumerId(): string
+ {
+ return $this->consumerId;
+ }
+
+
+ /**
+ * @inheritDoc
+ */
+ public function enqueue(Queue $queue, array $payload): bool
+ {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+
+ // Ensure consumer group exists
+ $this->ensureConsumerGroup($streamKey, $groupName);
+
+ $messageData = [
+ 'pid' => \uniqid(more_entropy: true),
+ 'queue' => $queue->name,
+ 'timestamp' => \time(),
+ 'payload' => $payload,
+ ];
+
+ $encodedData = \json_encode($messageData);
+ if ($encodedData === false) {
+ throw new \RuntimeException('Failed to encode message data: ' . \json_last_error_msg());
+ }
+
+ $fields = [
+ 'data' => $encodedData,
+ 'retry_count' => '0',
+ ];
+
+ $result = $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+
+ return $result !== false;
+ }
+
+ /**
+ * Enqueue a job to be processed after a delay.
+ *
+ * @param Queue $queue
+ * @param array $payload
+ * @param int $delaySeconds Seconds to delay before processing
+ * @return bool
+ */
+ public function enqueueDelayed(Queue $queue, array $payload, int $delaySeconds): bool
+ {
+ if ($delaySeconds < 0) {
+ throw new \InvalidArgumentException('Delay seconds must be non-negative');
+ }
+
+ $delayedKey = $this->getDelayedKey($queue);
+
+ $messageData = [
+ 'pid' => \uniqid(more_entropy: true),
+ 'queue' => $queue->name,
+ 'timestamp' => \time(),
+ 'payload' => $payload,
+ ];
+
+ $encodedData = \json_encode($messageData);
+ if ($encodedData === false) {
+ throw new \RuntimeException('Failed to encode message data: ' . \json_last_error_msg());
+ }
+
+ $fields = [
+ 'data' => $encodedData,
+ 'retry_count' => '0',
+ ];
+
+ // Score is the timestamp when the job should be processed (in milliseconds)
+ $executeAt = (int)(\microtime(true) * 1000) + ($delaySeconds * 1000);
+
+ $encodedFields = \json_encode($fields);
+ if ($encodedFields === false) {
+ throw new \RuntimeException('Failed to encode field data: ' . \json_last_error_msg());
+ }
+
+ $result = $this->connection->sortedSetAdd($delayedKey, $executeAt, $encodedFields);
+
+ return $result >= 0;
+ }
+
+ /**
+ * Enqueue a job to be processed at a specific time.
+ *
+ * @param Queue $queue
+ * @param array $payload
+ * @param int $timestamp Unix timestamp when the job should be processed
+ * @return bool
+ */
+ public function enqueueAt(Queue $queue, array $payload, int $timestamp): bool
+ {
+ $delaySeconds = \max(0, $timestamp - \time());
+ return $this->enqueueDelayed($queue, $payload, $delaySeconds);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function retry(Queue $queue, ?int $limit = null): void
+ {
+ $dlqKey = $this->getDlqKey($queue);
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+
+ // Ensure group exists
+ $this->ensureConsumerGroup($streamKey, $groupName);
+
+ // Read from DLQ stream
+ $entries = $this->connection->streamRange($dlqKey, '-', '+', $limit ?? 100);
+
+ $processed = 0;
+ $idsToDelete = [];
+
+ foreach ($entries as $entryId => $fields) {
+ if ($limit !== null && $processed >= $limit) {
+ break;
+ }
+
+ // Reset retry count and re-add to main stream
+ $fields['retry_count'] = '0';
+ unset($fields['error'], $fields['failed_at']);
+
+ $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+ $idsToDelete[] = $entryId;
+ $processed++;
+ }
+
+ // Delete retried entries from DLQ
+ if (!empty($idsToDelete)) {
+ $this->connection->streamDel($dlqKey, $idsToDelete);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
+ {
+ if ($failedJobs) {
+ return $this->connection->streamLen($this->getDlqKey($queue));
+ }
+
+ $streamSize = $this->connection->streamLen($this->getStreamKey($queue));
+ $delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue));
+
+ return $streamSize + $delayedSize;
+ }
+
+
+ /**
+ * @inheritDoc
+ */
+ public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
+ {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+ $dlqKey = $this->getDlqKey($queue);
+ $delayedKey = $this->getDelayedKey($queue);
+
+ // Ensure consumer groups exist
+ $this->ensureConsumerGroup($streamKey, $groupName);
+ $this->ensureConsumerGroup($dlqKey, $groupName);
+
+ while (!$this->closed) {
+ try {
+ // 1. Process due scheduled jobs
+ $this->processScheduledJobs($queue);
+
+ // 2. Process due delayed jobs
+ $this->processDelayedJobs($queue, $delayedKey, $streamKey);
+
+ // 3. Claim abandoned messages from crashed consumers
+ $this->claimAbandonedMessages($streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+
+ // 4. Read new messages from stream
+ $entries = $this->connection->streamReadGroup(
+ $groupName,
+ $this->consumerId,
+ [$streamKey],
+ 1,
+ self::BLOCK_TIMEOUT_MS
+ );
+
+ if ($entries === false || empty($entries)) {
+ continue;
+ }
+
+ foreach ($entries[$streamKey] ?? [] as $entryId => $fields) {
+ $this->processEntry($entryId, $fields, $streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+ } catch (\RedisException $e) {
+ if ($this->closed) {
+ break;
+ }
+ throw $e;
+ }
+ }
+ }
+
+ /**
+ * Consume from multiple queues simultaneously.
+ *
+ * @param Queue[] $queues Array of queues to consume from
+ * @param callable $messageCallback Receives (Message $message, Queue $queue)
+ * @param callable $successCallback Receives (Message $message, Queue $queue)
+ * @param callable $errorCallback Receives (Message $message, Queue $queue, \Throwable $error)
+ * @return void
+ */
+ public function consumeMultiple(array $queues, callable $messageCallback, callable $successCallback, callable $errorCallback): void
+ {
+ $streamKeys = [];
+ $queueMap = [];
+
+ foreach ($queues as $queue) {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+
+ // Ensure consumer groups exist
+ $this->ensureConsumerGroup($streamKey, $groupName);
+ $this->ensureConsumerGroup($this->getDlqKey($queue), $groupName);
+
+ $streamKeys[] = $streamKey;
+ $queueMap[$streamKey] = $queue;
+ }
+
+ while (!$this->closed) {
+ try {
+ // Process scheduled and delayed jobs for all queues
+ foreach ($queues as $queue) {
+ $this->processScheduledJobs($queue);
+ $this->processDelayedJobs($queue, $this->getDelayedKey($queue), $this->getStreamKey($queue));
+
+ // Claim abandoned messages
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+ $dlqKey = $this->getDlqKey($queue);
+ $this->claimAbandonedMessages($streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+
+ // Read from each queue individually with its own consumer group
+ // Note: Redis XREADGROUP requires a single consumer group, so we can't
+ // read from multiple streams with different groups in one call
+ foreach ($queues as $queue) {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+ $dlqKey = $this->getDlqKey($queue);
+
+ $entries = $this->connection->streamReadGroup(
+ $groupName,
+ $this->consumerId,
+ [$streamKey],
+ 1,
+ 0 // Non-blocking to check all queues quickly
+ );
+
+ if ($entries === false || empty($entries)) {
+ continue;
+ }
+
+ foreach ($entries[$streamKey] ?? [] as $entryId => $fields) {
+ $this->processEntry($entryId, $fields, $streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+ }
+
+ // Brief sleep to prevent tight loop when all queues are empty
+ \usleep(10000); // 10ms
+ } catch (\RedisException $e) {
+ if ($this->closed) {
+ break;
+ }
+ throw $e;
+ }
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function close(): void
+ {
+ $this->closed = true;
+ }
+
+
+ /**
+ * Register a recurring schedule.
+ *
+ * @param Queue $queue
+ * @param Schedule $schedule
+ * @return bool
+ */
+ public function schedule(Queue $queue, Schedule $schedule): bool
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ // Store schedule definition
+ $encodedSchedule = \json_encode($schedule->toArray());
+ if ($encodedSchedule === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+
+ $stored = $this->connection->hashSet($schedulesKey, $schedule->id, $encodedSchedule);
+
+ if (!$stored) {
+ return false;
+ }
+
+ // Calculate and store next run time
+ $nextRun = $schedule->getNextRunTime();
+ if ($nextRun !== null) {
+ $this->connection->sortedSetAdd($nextKey, (float)($nextRun * 1000), $schedule->id);
+ }
+
+ return true;
+ }
+
+ /**
+ * Remove a recurring schedule.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return bool
+ */
+ public function unschedule(Queue $queue, string $scheduleId): bool
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ $this->connection->hashDel($schedulesKey, $scheduleId);
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+
+ return true;
+ }
+
+ /**
+ * Get a schedule by ID.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return Schedule|null
+ */
+ public function getSchedule(Queue $queue, string $scheduleId): ?Schedule
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $data = $this->connection->hashGet($schedulesKey, $scheduleId);
+
+ if ($data === false) {
+ return null;
+ }
+
+ $decodedData = \json_decode($data, true);
+ if ($decodedData === null && \json_last_error() !== JSON_ERROR_NONE) {
+ throw new \RuntimeException('Failed to decode schedule data: ' . \json_last_error_msg());
+ }
+
+ return Schedule::fromArray($decodedData);
+ }
+
+ /**
+ * Get all schedules for a queue.
+ *
+ * @param Queue $queue
+ * @return Schedule[]
+ */
+ public function getSchedules(Queue $queue): array
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $all = $this->connection->hashGetAll($schedulesKey);
+
+ $schedules = [];
+ foreach ($all as $id => $data) {
+ $decodedData = \json_decode($data, true);
+ if ($decodedData === null && \json_last_error() !== JSON_ERROR_NONE) {
+ // Skip corrupted schedule data rather than failing completely
+ continue;
+ }
+ $schedules[$id] = Schedule::fromArray($decodedData);
+ }
+
+ return $schedules;
+ }
+
+ /**
+ * Pause a schedule.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return bool
+ */
+ public function pauseSchedule(Queue $queue, string $scheduleId): bool
+ {
+ $schedule = $this->getSchedule($queue, $scheduleId);
+ if ($schedule === null) {
+ return false;
+ }
+
+ $paused = $schedule->pause();
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ $encodedSchedule = \json_encode($paused->toArray());
+ if ($encodedSchedule === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+
+ // Update schedule and remove from next execution queue
+ $this->connection->hashSet($schedulesKey, $scheduleId, $encodedSchedule);
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+
+ return true;
+ }
+
+ /**
+ * Resume a paused schedule.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return bool
+ */
+ public function resumeSchedule(Queue $queue, string $scheduleId): bool
+ {
+ $schedule = $this->getSchedule($queue, $scheduleId);
+ if ($schedule === null) {
+ return false;
+ }
+
+ $resumed = $schedule->resume();
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ $encodedSchedule = \json_encode($resumed->toArray());
+ if ($encodedSchedule === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+
+ // Update schedule and add next execution time
+ $this->connection->hashSet($schedulesKey, $scheduleId, $encodedSchedule);
+
+ $nextRun = $resumed->getNextRunTime();
+ if ($nextRun !== null) {
+ $this->connection->sortedSetAdd($nextKey, (float)($nextRun * 1000), $scheduleId);
+ }
+
+ return true;
+ }
+
+
+ /**
+ * Get stream information.
+ *
+ * @param Queue $queue
+ * @return array
+ */
+ public function getStreamInfo(Queue $queue): array
+ {
+ return $this->connection->streamInfo($this->getStreamKey($queue));
+ }
+
+ /**
+ * Get consumer group information.
+ *
+ * @param Queue $queue
+ * @return array
+ */
+ public function getGroupInfo(Queue $queue): array
+ {
+ $groups = $this->connection->streamGroupInfo($this->getStreamKey($queue));
+ $groupName = $this->getGroupName($queue);
+
+ foreach ($groups as $group) {
+ if (($group['name'] ?? '') === $groupName) {
+ return $group;
+ }
+ }
+
+ return [];
+ }
+
+ /**
+ * Get consumers information for the queue's consumer group.
+ *
+ * @param Queue $queue
+ * @return array
+ */
+ public function getConsumersInfo(Queue $queue): array
+ {
+ return $this->connection->streamConsumersInfo(
+ $this->getStreamKey($queue),
+ $this->getGroupName($queue)
+ );
+ }
+
+ /**
+ * Get the consumer lag (messages waiting to be delivered).
+ *
+ * @param Queue $queue
+ * @return int
+ */
+ public function getLag(Queue $queue): int
+ {
+ $groupInfo = $this->getGroupInfo($queue);
+ return $groupInfo['lag'] ?? 0;
+ }
+
+ /**
+ * Get count of delayed jobs.
+ *
+ * @param Queue $queue
+ * @return int
+ */
+ public function getDelayedCount(Queue $queue): int
+ {
+ return $this->connection->sortedSetSize($this->getDelayedKey($queue));
+ }
+
+ /**
+ * Get pending message count (messages being processed).
+ *
+ * @param Queue $queue
+ * @return int
+ */
+ public function getPendingCount(Queue $queue): int
+ {
+ $pending = $this->connection->streamPendingSummary(
+ $this->getStreamKey($queue),
+ $this->getGroupName($queue)
+ );
+
+ return $pending[0] ?? 0;
+ }
+
+ /**
+ * Get messages from stream (for replay/history).
+ *
+ * @param Queue $queue
+ * @param string $start Start ID ('-' for minimum)
+ * @param string $end End ID ('+' for maximum)
+ * @param int|null $count Max messages
+ * @return Message[]
+ */
+ public function getMessages(Queue $queue, string $start = '-', string $end = '+', ?int $count = null): array
+ {
+ $entries = $this->connection->streamRange($this->getStreamKey($queue), $start, $end, $count);
+
+ $messages = [];
+ foreach ($entries as $id => $fields) {
+ $data = \json_decode($fields['data'] ?? '{}', true);
+ if ($data === null && \json_last_error() !== JSON_ERROR_NONE) {
+ // Skip corrupted message data
+ continue;
+ }
+ $data['streamId'] = $id;
+ $messages[] = new Message($data);
+ }
+
+ return $messages;
+ }
+
+ /**
+ * Get a specific message by ID.
+ *
+ * @param Queue $queue
+ * @param string $id Stream entry ID
+ * @return Message|null
+ */
+ public function getMessage(Queue $queue, string $id): ?Message
+ {
+ $entries = $this->connection->streamRange($this->getStreamKey($queue), $id, $id, 1);
+
+ if (empty($entries)) {
+ return null;
+ }
+
+ $fields = \reset($entries);
+ $data = \json_decode($fields['data'] ?? '{}', true);
+ if ($data === null && \json_last_error() !== JSON_ERROR_NONE) {
+ throw new \RuntimeException('Failed to decode message data: ' . \json_last_error_msg());
+ }
+ $data['streamId'] = $id;
+
+ return new Message($data);
+ }
+
+ /**
+ * Manually trim the stream.
+ *
+ * @param Queue $queue
+ * @param int $maxLen Maximum length to keep
+ * @return int Number of entries trimmed
+ */
+ public function trimStream(Queue $queue, int $maxLen): int
+ {
+ // Use exact trimming (not approximate) for manual trim operations
+ return $this->connection->streamTrim($this->getStreamKey($queue), $maxLen, false);
+ }
+
+ /**
+ * Delete a consumer from the consumer group.
+ *
+ * @param Queue $queue
+ * @param string $consumerId
+ * @return int Number of pending messages that were owned by the consumer
+ */
+ public function deleteConsumer(Queue $queue, string $consumerId): int
+ {
+ return $this->connection->streamDeleteConsumer(
+ $this->getStreamKey($queue),
+ $this->getGroupName($queue),
+ $consumerId
+ );
+ }
+
+
+ /**
+ * Process a single stream entry.
+ */
+ private function processEntry(
+ string $entryId,
+ array $fields,
+ string $streamKey,
+ string $groupName,
+ string $dlqKey,
+ Queue $queue,
+ callable $messageCallback,
+ callable $successCallback,
+ callable $errorCallback
+ ): void {
+ $messageData = \json_decode($fields['data'] ?? '{}', true);
+ $messageData['timestamp'] = (int)($messageData['timestamp'] ?? \time());
+ $messageData['streamId'] = $entryId;
+ $retryCount = (int)($fields['retry_count'] ?? 0);
+
+ $message = new Message($messageData);
+
+ // Update stats
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.total");
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.processing");
+
+ try {
+ $messageCallback($message);
+
+ // Acknowledge the message
+ $this->connection->streamAck($streamKey, $groupName, $entryId);
+
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.success");
+ $successCallback($message);
+ } catch (\Throwable $th) {
+ // Acknowledge the failed message to remove from pending
+ $this->connection->streamAck($streamKey, $groupName, $entryId);
+
+ if ($retryCount < $this->maxRetries) {
+ // Re-add to stream with incremented retry count
+ $fields['retry_count'] = (string)($retryCount + 1);
+ $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+ } else {
+ // Move to DLQ
+ $fields['error'] = $th->getMessage();
+ $fields['failed_at'] = (string)\time();
+ $this->connection->streamAdd($dlqKey, $fields, '*', $this->maxStreamLength);
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.failed");
+ }
+
+ $errorCallback($message, $th);
+ } finally {
+ $this->connection->decrement("{$queue->namespace}.stats.{$queue->name}.processing");
+ }
+ }
+
+ /**
+ * Ensure consumer group exists.
+ */
+ private function ensureConsumerGroup(string $streamKey, string $groupName): void
+ {
+ $this->connection->streamCreateGroup($streamKey, $groupName, '0', true);
+ }
+
+ /**
+ * Claim abandoned messages from crashed consumers.
+ */
+ private function claimAbandonedMessages(
+ string $streamKey,
+ string $groupName,
+ string $dlqKey,
+ Queue $queue,
+ callable $messageCallback,
+ callable $successCallback,
+ callable $errorCallback
+ ): void {
+ $result = $this->connection->streamAutoClaim(
+ $streamKey,
+ $groupName,
+ $this->consumerId,
+ $this->claimIdleTimeMs,
+ '0-0',
+ 10
+ );
+
+ if (empty($result) || empty($result[1])) {
+ return;
+ }
+
+ // Process claimed messages
+ foreach ($result[1] as $entryId => $fields) {
+ if (\is_array($fields)) {
+ $this->processEntry($entryId, $fields, $streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+ }
+ }
+
+ /**
+ * Process delayed jobs that are now due.
+ */
+ private function processDelayedJobs(Queue $queue, string $delayedKey, string $streamKey): void
+ {
+ $now = (int)(\microtime(true) * 1000);
+
+ // Only check periodically
+ if ($now - $this->lastDelayedCheck < self::DELAYED_CHECK_INTERVAL_MS) {
+ return;
+ }
+ $this->lastDelayedCheck = $now;
+
+ // Get jobs that are now due (read without removing to prevent job loss on crash)
+ $dueJobs = $this->connection->sortedSetRangeByScore($delayedKey, 0, (float)$now, 100);
+
+ foreach ($dueJobs as $member => $score) {
+ // In zRangeByScore with scores, member is the value and score is the key when WITHSCORES is used
+ // But without WITHSCORES option, we get a simple array of members
+ $jobData = is_string($member) ? $member : $score;
+ $fields = \json_decode($jobData, true);
+
+ if ($fields && \json_last_error() === JSON_ERROR_NONE) {
+ // Add to stream first - if this fails, job stays in delayed set for retry
+ $streamId = $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+
+ // Only remove from delayed set after successful add to prevent job loss
+ if ($streamId !== false) {
+ $this->connection->sortedSetRemove($delayedKey, $jobData);
+ }
+ }
+ }
+ }
+
+ /**
+ * Process scheduled jobs that are now due.
+ */
+ private function processScheduledJobs(Queue $queue): void
+ {
+ $now = (int)(\microtime(true) * 1000);
+
+ // Only check periodically
+ if ($now - $this->lastScheduleCheck < self::SCHEDULE_CHECK_INTERVAL_MS) {
+ return;
+ }
+ $this->lastScheduleCheck = $now;
+
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+ $streamKey = $this->getStreamKey($queue);
+
+ // Get schedules that are due (read without removing to prevent duplicate processing)
+ $dueScheduleIds = $this->connection->sortedSetRangeByScore($nextKey, 0, (float)$now, 100);
+
+ foreach ($dueScheduleIds as $scheduleId) {
+ $scheduleData = $this->connection->hashGet($schedulesKey, $scheduleId);
+ if ($scheduleData === false) {
+ // Schedule was deleted, remove from next run queue
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+ continue;
+ }
+
+ $decodedData = \json_decode($scheduleData, true);
+ if ($decodedData === null && \json_last_error() !== JSON_ERROR_NONE) {
+ // Invalid JSON, remove from queue to prevent infinite loop
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+ continue;
+ }
+
+ $schedule = Schedule::fromArray($decodedData);
+
+ // Remove from next run queue first (atomic operation)
+ $removed = $this->connection->sortedSetRemove($nextKey, $scheduleId);
+
+ // Skip if another consumer already processed this (removed = 0)
+ if ($removed === 0) {
+ continue;
+ }
+
+ // Skip if not active (paused, max runs reached, etc.)
+ if (!$schedule->isActive()) {
+ continue;
+ }
+
+ // Enqueue the job
+ $messageData = [
+ 'pid' => \uniqid(more_entropy: true),
+ 'queue' => $queue->name,
+ 'timestamp' => \time(),
+ 'payload' => $schedule->payload,
+ 'schedule_id' => $schedule->id,
+ ];
+
+ $encodedData = \json_encode($messageData);
+ if ($encodedData === false) {
+ throw new \RuntimeException('Failed to encode message data: ' . \json_last_error_msg());
+ }
+
+ $fields = [
+ 'data' => $encodedData,
+ 'retry_count' => '0',
+ ];
+
+ $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+
+ // Update run count
+ $updated = $schedule->incrementRunCount();
+ $encodedUpdated = \json_encode($updated->toArray());
+ if ($encodedUpdated === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+ $this->connection->hashSet($schedulesKey, $scheduleId, $encodedUpdated);
+
+ // Calculate and store next run time
+ $nextRun = $updated->getNextRunTime(\time());
+ if ($nextRun !== null && $updated->isActive()) {
+ $this->connection->sortedSetAdd($nextKey, (float)($nextRun * 1000), $scheduleId);
+ }
+ }
+ }
+
+
+ private function getStreamKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.stream.{$queue->name}";
+ }
+
+ private function getDlqKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.stream.{$queue->name}.dlq";
+ }
+
+ private function getDelayedKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.delayed.{$queue->name}";
+ }
+
+ private function getGroupName(Queue $queue): string
+ {
+ return "{$queue->namespace}.group.{$queue->name}";
+ }
+
+ private function getSchedulesKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.schedules.{$queue->name}";
+ }
+
+ private function getScheduleNextKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.schedule.next.{$queue->name}";
+ }
+}
diff --git a/src/Queue/Connection/RedisStream.php b/src/Queue/Connection/RedisStream.php
new file mode 100644
index 0000000..acce640
--- /dev/null
+++ b/src/Queue/Connection/RedisStream.php
@@ -0,0 +1,463 @@
+getRedis();
+
+ if ($maxLen !== null) {
+ // Use exact MAXLEN (approximate=false) by default for reliable trimming
+ // Note: approximate trimming with phpredis may not trim immediately
+ return $redis->xAdd($stream, $id, $fields, $maxLen, $approximate);
+ }
+
+ return $redis->xAdd($stream, $id, $fields);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamCreateGroup(string $stream, string $group, string $id = '0', bool $mkstream = true): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('CREATE', $stream, $group, $id, $mkstream);
+ // phpredis may return false instead of throwing on BUSYGROUP
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ if ($error !== null && str_contains($error, 'BUSYGROUP')) {
+ return true;
+ }
+ return false;
+ }
+ return (bool)$result;
+ } catch (\RedisException $e) {
+ // Group already exists - BUSYGROUP error
+ if (str_contains($e->getMessage(), 'BUSYGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDestroyGroup(string $stream, string $group): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('DESTROY', $stream, $group);
+ // phpredis may return false instead of throwing on errors
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ // Stream doesn't exist or group doesn't exist - treat as success (already destroyed)
+ if ($error !== null && (
+ str_contains($error, 'NOGROUP') ||
+ str_contains($error, 'no such key') ||
+ str_contains($error, 'key to exist')
+ )) {
+ return true;
+ }
+ return false;
+ }
+ // Result of 0 means the group didn't exist - that's fine, it's "destroyed"
+ return true;
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDeleteConsumer(string $stream, string $group, string $consumer): int
+ {
+ try {
+ return $this->getRedis()->xGroup('DELCONSUMER', $stream, $group, $consumer);
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return 0;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamReadGroup(
+ string $group,
+ string $consumer,
+ array $streams,
+ int $count = 1,
+ int $block = 0,
+ bool $noack = false
+ ): array|false {
+ $streamIds = [];
+ foreach ($streams as $stream) {
+ $streamIds[$stream] = '>'; // Read only new messages
+ }
+
+ // Build options array for xReadGroup
+ $options = [];
+ if ($noack) {
+ $options['NOACK'] = true;
+ }
+
+ $redis = $this->getRedis();
+
+ // phpredis doesn't support NOACK in xReadGroup directly, so we need to use rawCommand
+ if ($noack) {
+ // Build the command manually for NOACK support
+ $command = ['XREADGROUP', 'GROUP', $group, $consumer];
+ if ($count > 0) {
+ $command[] = 'COUNT';
+ $command[] = (string)$count;
+ }
+ if ($block > 0) {
+ $command[] = 'BLOCK';
+ $command[] = (string)$block;
+ }
+ $command[] = 'NOACK';
+ $command[] = 'STREAMS';
+
+ foreach ($streamIds as $stream => $id) {
+ $command[] = $stream;
+ }
+ foreach ($streamIds as $stream => $id) {
+ $command[] = $id;
+ }
+
+ try {
+ $result = $redis->rawCommand(...$command);
+ return $result ?: false;
+ } catch (\RedisException $e) {
+ return false;
+ }
+ }
+
+ $result = $redis->xReadGroup(
+ $group,
+ $consumer,
+ $streamIds,
+ $count,
+ $block > 0 ? $block : null
+ );
+
+ return $result ?: false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAck(string $stream, string $group, string|array $ids): int
+ {
+ $ids = is_array($ids) ? $ids : [$ids];
+ return $this->getRedis()->xAck($stream, $group, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPendingSummary(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xPending($stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPending(
+ string $stream,
+ string $group,
+ string $start = '-',
+ string $end = '+',
+ int $count = 100,
+ ?string $consumer = null
+ ): array {
+ try {
+ if ($consumer !== null) {
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count, $consumer) ?: [];
+ }
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ array $ids,
+ bool $justId = false
+ ): array {
+ try {
+ $options = $justId ? ['JUSTID'] : [];
+ return $this->getRedis()->xClaim($stream, $group, $consumer, $minIdleTime, $ids, $options) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAutoClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ string $start = '0-0',
+ int $count = 100
+ ): array {
+ try {
+ $result = $this->getRedis()->xAutoClaim($stream, $group, $consumer, $minIdleTime, $start, $count);
+ return $result ?: ['0-0', [], []];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return ['0-0', [], []];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDel(string $stream, array $ids): int
+ {
+ if (empty($ids)) {
+ return 0;
+ }
+ return $this->getRedis()->xDel($stream, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamLen(string $stream): int
+ {
+ return $this->getRedis()->xLen($stream);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamTrim(string $stream, int $maxLen, bool $approximate = true): int
+ {
+ return $this->getRedis()->xTrim($stream, $maxLen, $approximate);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('STREAM', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamGroupInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('GROUPS', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamConsumersInfo(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xInfo('CONSUMERS', $stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRange(string $stream, string $start = '-', string $end = '+', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRange($stream, $start, $end, $count) ?: [];
+ }
+ return $this->getRedis()->xRange($stream, $start, $end) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRevRange(string $stream, string $end = '+', string $start = '-', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRevRange($stream, $end, $start, $count) ?: [];
+ }
+ return $this->getRedis()->xRevRange($stream, $end, $start) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetAdd(string $key, float $score, string $member): int
+ {
+ return $this->getRedis()->zAdd($key, $score, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetPopByScore(string $key, float $min, float $max, int $limit = 100): array
+ {
+ $redis = $this->getRedis();
+
+ // Limit to prevent Lua stack overflow (unpack has ~8000 item limit)
+ if ($limit > 5000) {
+ $limit = 5000;
+ }
+
+ // Use Lua script for atomic pop by score
+ $script = <<<'LUA'
+local members = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3])
+if #members > 0 then
+ redis.call('ZREM', KEYS[1], unpack(members))
+end
+return members
+LUA;
+
+ $result = $redis->eval($script, [$key, (string)$min, (string)$max, (string)$limit], 1);
+ return $result ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRangeByScore(string $key, float $min, float $max, ?int $limit = null): array
+ {
+ $options = [];
+ if ($limit !== null) {
+ $options['limit'] = [0, $limit];
+ }
+ return $this->getRedis()->zRangeByScore($key, (string)$min, (string)$max, $options) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetSize(string $key): int
+ {
+ return $this->getRedis()->zCard($key);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRemove(string $key, string $member): int
+ {
+ return $this->getRedis()->zRem($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetScore(string $key, string $member): float|false
+ {
+ return $this->getRedis()->zScore($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashSet(string $key, string $field, string $value): bool
+ {
+ return $this->getRedis()->hSet($key, $field, $value) !== false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGet(string $key, string $field): string|false
+ {
+ return $this->getRedis()->hGet($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGetAll(string $key): array
+ {
+ return $this->getRedis()->hGetAll($key) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashDel(string $key, string $field): int
+ {
+ return $this->getRedis()->hDel($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashExists(string $key, string $field): bool
+ {
+ return $this->getRedis()->hExists($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashLen(string $key): int
+ {
+ return $this->getRedis()->hLen($key);
+ }
+}
diff --git a/src/Queue/Connection/RedisStreamCluster.php b/src/Queue/Connection/RedisStreamCluster.php
new file mode 100644
index 0000000..d297708
--- /dev/null
+++ b/src/Queue/Connection/RedisStreamCluster.php
@@ -0,0 +1,422 @@
+getRedis();
+
+ if ($maxLen !== null) {
+ // Use exact MAXLEN (approximate=false) by default for reliable trimming
+ return $redis->xAdd($stream, $id, $fields, $maxLen, $approximate);
+ }
+
+ return $redis->xAdd($stream, $id, $fields);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamCreateGroup(string $stream, string $group, string $id = '0', bool $mkstream = true): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('CREATE', $stream, $group, $id, $mkstream);
+ // phpredis may return false instead of throwing on BUSYGROUP
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ if ($error !== null && str_contains($error, 'BUSYGROUP')) {
+ return true;
+ }
+ return false;
+ }
+ return (bool)$result;
+ } catch (\RedisException $e) {
+ // Group already exists - BUSYGROUP error
+ if (str_contains($e->getMessage(), 'BUSYGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDestroyGroup(string $stream, string $group): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('DESTROY', $stream, $group);
+ // phpredis may return false instead of throwing on errors
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ // Stream doesn't exist or group doesn't exist - treat as success (already destroyed)
+ if ($error !== null && (
+ str_contains($error, 'NOGROUP') ||
+ str_contains($error, 'no such key') ||
+ str_contains($error, 'key to exist')
+ )) {
+ return true;
+ }
+ return false;
+ }
+ // Result of 0 means the group didn't exist - that's fine, it's "destroyed"
+ return true;
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDeleteConsumer(string $stream, string $group, string $consumer): int
+ {
+ try {
+ return $this->getRedis()->xGroup('DELCONSUMER', $stream, $group, $consumer);
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return 0;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamReadGroup(
+ string $group,
+ string $consumer,
+ array $streams,
+ int $count = 1,
+ int $block = 0,
+ bool $noack = false
+ ): array|false {
+ $streamIds = [];
+ foreach ($streams as $stream) {
+ $streamIds[$stream] = '>'; // Read only new messages
+ }
+
+ $result = $this->getRedis()->xReadGroup(
+ $group,
+ $consumer,
+ $streamIds,
+ $count,
+ $block > 0 ? $block : null
+ );
+
+ return $result ?: false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAck(string $stream, string $group, string|array $ids): int
+ {
+ $ids = is_array($ids) ? $ids : [$ids];
+ return $this->getRedis()->xAck($stream, $group, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPendingSummary(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xPending($stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPending(
+ string $stream,
+ string $group,
+ string $start = '-',
+ string $end = '+',
+ int $count = 100,
+ ?string $consumer = null
+ ): array {
+ try {
+ if ($consumer !== null) {
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count, $consumer) ?: [];
+ }
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ array $ids,
+ bool $justId = false
+ ): array {
+ try {
+ $options = $justId ? ['JUSTID'] : [];
+ return $this->getRedis()->xClaim($stream, $group, $consumer, $minIdleTime, $ids, $options) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAutoClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ string $start = '0-0',
+ int $count = 100
+ ): array {
+ try {
+ $result = $this->getRedis()->xAutoClaim($stream, $group, $consumer, $minIdleTime, $start, $count);
+ return $result ?: ['0-0', [], []];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return ['0-0', [], []];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDel(string $stream, array $ids): int
+ {
+ if (empty($ids)) {
+ return 0;
+ }
+ return $this->getRedis()->xDel($stream, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamLen(string $stream): int
+ {
+ return $this->getRedis()->xLen($stream);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamTrim(string $stream, int $maxLen, bool $approximate = true): int
+ {
+ return $this->getRedis()->xTrim($stream, $maxLen, $approximate);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('STREAM', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamGroupInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('GROUPS', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamConsumersInfo(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xInfo('CONSUMERS', $stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRange(string $stream, string $start = '-', string $end = '+', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRange($stream, $start, $end, $count) ?: [];
+ }
+ return $this->getRedis()->xRange($stream, $start, $end) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRevRange(string $stream, string $end = '+', string $start = '-', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRevRange($stream, $end, $start, $count) ?: [];
+ }
+ return $this->getRedis()->xRevRange($stream, $end, $start) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetAdd(string $key, float $score, string $member): int
+ {
+ return $this->getRedis()->zAdd($key, $score, $member);
+ }
+
+ /**
+ * @inheritDoc
+ *
+ * Note: In cluster mode, Lua scripts must use keys from the same hash slot.
+ * This works because we use a single key.
+ */
+ public function sortedSetPopByScore(string $key, float $min, float $max, int $limit = 100): array
+ {
+ $redis = $this->getRedis();
+
+ // Use Lua script for atomic pop by score
+ $script = <<<'LUA'
+local members = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3])
+if #members > 0 then
+ redis.call('ZREM', KEYS[1], unpack(members))
+end
+return members
+LUA;
+
+ $result = $redis->eval($script, [$key, (string)$min, (string)$max, (string)$limit], 1);
+ return $result ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRangeByScore(string $key, float $min, float $max, ?int $limit = null): array
+ {
+ $options = [];
+ if ($limit !== null) {
+ $options['limit'] = [0, $limit];
+ }
+ return $this->getRedis()->zRangeByScore($key, (string)$min, (string)$max, $options) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetSize(string $key): int
+ {
+ return $this->getRedis()->zCard($key);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRemove(string $key, string $member): int
+ {
+ return $this->getRedis()->zRem($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetScore(string $key, string $member): float|false
+ {
+ return $this->getRedis()->zScore($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashSet(string $key, string $field, string $value): bool
+ {
+ return $this->getRedis()->hSet($key, $field, $value) !== false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGet(string $key, string $field): string|false
+ {
+ return $this->getRedis()->hGet($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGetAll(string $key): array
+ {
+ return $this->getRedis()->hGetAll($key) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashDel(string $key, string $field): int
+ {
+ return $this->getRedis()->hDel($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashExists(string $key, string $field): bool
+ {
+ return $this->getRedis()->hExists($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashLen(string $key): int
+ {
+ return $this->getRedis()->hLen($key);
+ }
+}
diff --git a/src/Queue/Schedule.php b/src/Queue/Schedule.php
new file mode 100644
index 0000000..0d521f5
--- /dev/null
+++ b/src/Queue/Schedule.php
@@ -0,0 +1,278 @@
+isActive()) {
+ return null;
+ }
+
+ $now = time();
+ $baseTime = $lastRun ?? $now;
+
+ // Apply startAt constraint
+ if ($this->startAt !== null && $baseTime < $this->startAt) {
+ $baseTime = $this->startAt;
+ }
+
+ if ($this->cron !== null) {
+ // Cron-based schedule
+ $cronExpression = new CronExpression($this->cron);
+ $nextRun = $cronExpression->getNextRunDate(\DateTime::createFromFormat('U', (string)$baseTime))->getTimestamp();
+ } else {
+ // Interval-based schedule
+ if ($lastRun === null) {
+ // First run - use startAt or now
+ $nextRun = $this->startAt ?? $now;
+ } else {
+ // Subsequent runs - add interval
+ $nextRun = $lastRun + $this->interval;
+
+ // If we've passed the next run time, schedule for next interval from now
+ if ($nextRun < $now) {
+ $elapsed = $now - $lastRun;
+ $intervals = (int)ceil($elapsed / $this->interval);
+ $nextRun = $lastRun + ($intervals * $this->interval);
+ }
+ }
+ }
+
+ // Check endAt constraint
+ if ($this->endAt !== null && $nextRun > $this->endAt) {
+ return null;
+ }
+
+ return $nextRun;
+ }
+
+ /**
+ * Check if the schedule is still active (not exceeded limits).
+ *
+ * @return bool
+ */
+ public function isActive(): bool
+ {
+ if ($this->paused) {
+ return false;
+ }
+
+ // Check max runs
+ if ($this->maxRuns !== null && $this->runCount >= $this->maxRuns) {
+ return false;
+ }
+
+ // Check end time
+ if ($this->endAt !== null && time() > $this->endAt) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Check if the schedule is paused.
+ *
+ * @return bool
+ */
+ public function isPaused(): bool
+ {
+ return $this->paused;
+ }
+
+ /**
+ * Get the current run count.
+ *
+ * @return int
+ */
+ public function getRunCount(): int
+ {
+ return $this->runCount;
+ }
+
+ /**
+ * Increment the run count and return a new instance.
+ *
+ * @return self
+ */
+ public function incrementRunCount(): self
+ {
+ return new self(
+ $this->id,
+ $this->payload,
+ $this->cron,
+ $this->interval,
+ $this->startAt,
+ $this->endAt,
+ $this->maxRuns,
+ $this->runCount + 1,
+ $this->paused
+ );
+ }
+
+ /**
+ * Create a paused copy of this schedule.
+ *
+ * @return self
+ */
+ public function pause(): self
+ {
+ return new self(
+ $this->id,
+ $this->payload,
+ $this->cron,
+ $this->interval,
+ $this->startAt,
+ $this->endAt,
+ $this->maxRuns,
+ $this->runCount,
+ true
+ );
+ }
+
+ /**
+ * Create a resumed copy of this schedule.
+ *
+ * @return self
+ */
+ public function resume(): self
+ {
+ return new self(
+ $this->id,
+ $this->payload,
+ $this->cron,
+ $this->interval,
+ $this->startAt,
+ $this->endAt,
+ $this->maxRuns,
+ $this->runCount,
+ false
+ );
+ }
+
+ /**
+ * Serialize to array for storage.
+ *
+ * @return array
+ */
+ public function toArray(): array
+ {
+ return [
+ 'id' => $this->id,
+ 'payload' => $this->payload,
+ 'cron' => $this->cron,
+ 'interval' => $this->interval,
+ 'startAt' => $this->startAt,
+ 'endAt' => $this->endAt,
+ 'maxRuns' => $this->maxRuns,
+ 'runCount' => $this->runCount,
+ 'paused' => $this->paused,
+ ];
+ }
+
+ /**
+ * Deserialize from array.
+ *
+ * @param array $data
+ * @return self
+ */
+ public static function fromArray(array $data): self
+ {
+ return new self(
+ $data['id'],
+ $data['payload'],
+ $data['cron'] ?? null,
+ $data['interval'] ?? null,
+ $data['startAt'] ?? null,
+ $data['endAt'] ?? null,
+ $data['maxRuns'] ?? null,
+ $data['runCount'] ?? 0,
+ $data['paused'] ?? false,
+ );
+ }
+
+ /**
+ * Get a human-readable description of the schedule.
+ *
+ * @return string
+ */
+ public function getDescription(): string
+ {
+ if ($this->cron !== null) {
+ return "Cron: {$this->cron}";
+ }
+
+ $interval = $this->interval;
+
+ // Try to find the largest clean unit that divides evenly
+ if ($interval >= 86400 && $interval % 86400 === 0) {
+ $days = (int)($interval / 86400);
+ return "Every {$days} day" . ($days !== 1 ? 's' : '');
+ }
+ if ($interval >= 3600 && $interval % 3600 === 0) {
+ $hours = (int)($interval / 3600);
+ return "Every {$hours} hour" . ($hours !== 1 ? 's' : '');
+ }
+ if ($interval >= 60 && $interval % 60 === 0) {
+ $minutes = (int)($interval / 60);
+ return "Every {$minutes} minute" . ($minutes !== 1 ? 's' : '');
+ }
+
+ return "Every {$interval} second" . ($interval !== 1 ? 's' : '');
+ }
+}
diff --git a/src/Queue/StreamConnection.php b/src/Queue/StreamConnection.php
new file mode 100644
index 0000000..ab8d2e1
--- /dev/null
+++ b/src/Queue/StreamConnection.php
@@ -0,0 +1,332 @@
+= 9.0
+ *
+ * @param string $key The key to delete
+ * @param mixed $value The value to compare against the key's value.
+ * @return RedisCluster|int|false Returns 1 if the key was deleted, 0 if it was not.
+ */
+ public function delifeq(string $key, mixed $value): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::discard()
+ */
+ public function discard(): bool;
+
+ /**
+ * @see \Redis::dump()
+ */
+ public function dump(string $key): RedisCluster|string|false;
+
+ /**
+ * @see \Redis::echo()
+ */
+ public function echo(string|array $key_or_address, string $msg): RedisCluster|string|false;
+
+ /**
+ * @see \Redis::eval()
+ */
+ public function eval(string $script, array $args = [], int $num_keys = 0): mixed;
+
+ /**
+ * @see \Redis::eval_ro()
+ */
+ public function eval_ro(string $script, array $args = [], int $num_keys = 0): mixed;
+
+ /**
+ * @see \Redis::evalsha()
+ */
+ public function evalsha(string $script_sha, array $args = [], int $num_keys = 0): mixed;
+
+ /**
+ * @see \Redis::evalsha_ro()
+ */
+ public function evalsha_ro(string $script_sha, array $args = [], int $num_keys = 0): mixed;
+
+ /**
+ * @see \Redis::exec()
+ */
+ public function exec(): array|false;
+
+ /**
+ * @see \Redis::exists()
+ */
+ public function exists(mixed $key, mixed ...$other_keys): RedisCluster|int|bool;
+
+ /**
+ * @see \Redis::touch()
+ */
+ public function touch(mixed $key, mixed ...$other_keys): RedisCluster|int|bool;
+
+ /**
+ * @see \Redis::expire()
+ */
+ public function expire(string $key, int $timeout, ?string $mode = null): RedisCluster|bool;
+
+ /**
+ * @see \Redis::expireAt()
+ */
+ public function expireat(string $key, int $timestamp, ?string $mode = null): RedisCluster|bool;
+
+ /**
+ * @see \Redis::expiretime()
+ */
+ public function expiretime(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::pexpiretime()
+ */
+ public function pexpiretime(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::flushAll()
+ */
+ public function flushall(string|array $key_or_address, bool $async = false): RedisCluster|bool;
+
+ /**
+ * @see \Redis::flushDB()
+ */
+ public function flushdb(string|array $key_or_address, bool $async = false): RedisCluster|bool;
+
+ /**
+ * @see \Redis::geoadd()
+ */
+ public function geoadd(string $key, float $lng, float $lat, string $member, mixed ...$other_triples_and_options): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::geodist()
+ */
+ public function geodist(string $key, string $src, string $dest, ?string $unit = null): RedisCluster|float|false;
+
+ /**
+ * @see \Redis::geohash()
+ */
+ public function geohash(string $key, string $member, string ...$other_members): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::geopos()
+ */
+ public function geopos(string $key, string $member, string ...$other_members): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::georadius()
+ */
+ public function georadius(string $key, float $lng, float $lat, float $radius, string $unit, array $options = []): mixed;
+
+ /**
+ * @see \Redis::georadius_ro()
+ */
+ public function georadius_ro(string $key, float $lng, float $lat, float $radius, string $unit, array $options = []): mixed;
+
+ /**
+ * @see \Redis::georadiusbymember()
+ */
+ public function georadiusbymember(string $key, string $member, float $radius, string $unit, array $options = []): mixed;
+
+ /**
+ * @see \Redis::georadiusbymember_ro()
+ */
+ public function georadiusbymember_ro(string $key, string $member, float $radius, string $unit, array $options = []): mixed;
+
+ /**
+ * @see https://redis.io/commands/geosearch
+ */
+ public function geosearch(string $key, array|string $position, array|int|float $shape, string $unit, array $options = []): RedisCluster|array;
+
+ /**
+ * @see https://redis.io/commands/geosearchstore
+ */
+ public function geosearchstore(string $dst, string $src, array|string $position, array|int|float $shape, string $unit, array $options = []): RedisCluster|array|int|false;
+
+ /**
+ * @see \Redis::get()
+ */
+ public function get(string $key): mixed;
+
+ /**
+ * @see \Redis::getDel()
+ */
+ public function getdel(string $key): mixed;
+
+ /**
+ * @see \Redis::getWithMeta()
+ */
+ public function getWithMeta(string $key): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::getEx()
+ */
+ public function getex(string $key, array $options = []): RedisCluster|string|false;
+
+ /**
+ * @see \Redis::getBit()
+ */
+ public function getbit(string $key, int $value): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::getLastError()
+ */
+ public function getlasterror(): string|null;
+
+ /**
+ * @see \Redis::getMode()
+ */
+ public function getmode(): int;
+
+ /**
+ * @see \Redis::getOption()
+ */
+ public function getoption(int $option): mixed;
+
+ /**
+ * @see \Redis::getRange()
+ */
+ public function getrange(string $key, int $start, int $end): RedisCluster|string|false;
+
+ /**
+ * @see \Redis::lcs()
+ */
+ public function lcs(string $key1, string $key2, ?array $options = null): RedisCluster|string|array|int|false;
+
+ /**
+ * @see \Redis::getset()
+ */
+ public function getset(string $key, mixed $value): RedisCluster|string|bool;
+
+ /**
+ * @see \Redis::getTransferredBytes()
+ */
+ public function gettransferredbytes(): array|false;
+
+ /**
+ * @see \Redis::clearTransferredBytes()
+ */
+ public function cleartransferredbytes(): void;
+
+ /**
+ * @see \Redis::hDel()
+ */
+ public function hdel(string $key, string $member, string ...$other_members): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::hExists()
+ */
+ public function hexists(string $key, string $member): RedisCluster|bool;
+
+ /**
+ * @see \Redis::hGet()
+ */
+ public function hget(string $key, string $member): mixed;
+
+ /**
+ * @see \Redis::hGetAll()
+ */
+ public function hgetall(string $key): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hGetWithMeta()
+ */
+ public function hgetWithMeta(string $key, string $member): mixed;
+
+ /**
+ * @see \Redis::hIncrBy()
+ */
+ public function hincrby(string $key, string $member, int $value): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::hIncrByFloat()
+ */
+ public function hincrbyfloat(string $key, string $member, float $value): RedisCluster|float|false;
+
+ /**
+ * @see \Redis::hKeys()
+ */
+ public function hkeys(string $key): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hLen()
+ */
+ public function hlen(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::hMget()
+ */
+ public function hmget(string $key, array $keys): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hgetex()
+ */
+ public function hgetex(string $key, array $fields, string|array|null $expiry = null): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hsetex()
+ */
+ public function hsetex(string $key, array $fields, ?array $expiry = null): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::hgetdel()
+ */
+ public function hgetdel(string $key, array $fields): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hMset()
+ */
+ public function hmset(string $key, array $key_values): RedisCluster|bool;
+
+ /**
+ * @see \Redis::hscan()
+ */
+ public function hscan(string $key, null|int|string &$iterator, ?string $pattern = null, int $count = 0): array|bool;
+
+ /**
+ * @see \Redis::expiremember()
+ */
+ public function expiremember(string $key, string $field, int $ttl, ?string $unit = null): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::expirememberat()
+ */
+ public function expirememberat(string $key, string $field, int $timestamp): RedisCluster|int|false;
+
+ /**
+ * @see https://redis.io/commands/hrandfield
+ */
+ public function hrandfield(string $key, ?array $options = null): RedisCluster|string|array;
+
+ /**
+ * @see \Redis::hSet()
+ */
+ public function hset(string $key, string $member, mixed $value): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::hSetNx()
+ */
+ public function hsetnx(string $key, string $member, mixed $value): RedisCluster|bool;
+
+ /**
+ * @see \Redis::hStrLen()
+ */
+ public function hstrlen(string $key, string $field): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::hexpire()
+ */
+ public function hexpire(string $key, int $ttl, array $fields,
+ ?string $mode = NULL): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hpexpire()
+ */
+ public function hpexpire(string $key, int $ttl, array $fields,
+ ?string $mode = NULL): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hexpireat()
+ */
+ public function hexpireat(string $key, int $time, array $fields,
+ ?string $mode = NULL): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hpexpireat()
+ */
+ public function hpexpireat(string $key, int $mstime, array $fields,
+ ?string $mode = NULL): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::httl()
+ */
+ public function httl(string $key, array $fields): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hpttl()
+ */
+ public function hpttl(string $key, array $fields): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hexpiretime()
+ */
+ public function hexpiretime(string $key, array $fields): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hpexpiretime()
+ */
+ public function hpexpiretime(string $key, array $fields): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hpexpiretime()
+ */
+ public function hpersist(string $key, array $fields): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::hVals()
+ */
+ public function hvals(string $key): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::incr()
+ */
+ public function incr(string $key, int $by = 1): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::incrBy()
+ */
+ public function incrby(string $key, int $value): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::incrByFloat()
+ */
+ public function incrbyfloat(string $key, float $value): RedisCluster|float|false;
+
+ /**
+ * Retrieve information about the connected redis-server. If no arguments are passed to
+ * this function, redis will return every info field. Alternatively you may pass a specific
+ * section you want returned (e.g. 'server', or 'memory') to receive only information pertaining
+ * to that section.
+ *
+ * If connected to Redis server >= 7.0.0 you may pass multiple optional sections.
+ *
+ * @see https://redis.io/commands/info/
+ *
+ * @param string|array $key_or_address Either a key name or array with host and port indicating
+ * which cluster node we want to send the command to.
+ * @param string $sections Optional section(s) you wish Redis server to return.
+ *
+ * @return RedisCluster|array|false
+ */
+ public function info(string|array $key_or_address, string ...$sections): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::keys()
+ */
+ public function keys(string $pattern): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::lastSave()
+ */
+ public function lastsave(string|array $key_or_address): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::lget()
+ */
+ public function lget(string $key, int $index): RedisCluster|string|bool;
+
+ /**
+ * @see \Redis::lindex()
+ */
+ public function lindex(string $key, int $index): mixed;
+
+ /**
+ * @see \Redis::lInsert()
+ */
+ public function linsert(string $key, string $pos, mixed $pivot, mixed $value): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::lLen()
+ */
+ public function llen(string $key): RedisCluster|int|bool;
+
+ /**
+ * @see \Redis::lPop()
+ */
+ public function lpop(string $key, int $count = 0): RedisCluster|bool|string|array;
+
+ /**
+ * @see \Redis::lPos()
+ */
+ public function lpos(string $key, mixed $value, ?array $options = null): RedisCluster|null|bool|int|array;
+
+ /**
+ * @see \Redis::lPush()
+ */
+ public function lpush(string $key, mixed $value, mixed ...$other_values): RedisCluster|int|bool;
+
+ /**
+ * @see \Redis::lPushx()
+ */
+ public function lpushx(string $key, mixed $value): RedisCluster|int|bool;
+
+ /**
+ * @see \Redis::lrange()
+ */
+ public function lrange(string $key, int $start, int $end): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::lrem()
+ */
+ public function lrem(string $key, mixed $value, int $count = 0): RedisCluster|int|bool;
+
+ /**
+ * @see \Redis::lSet()
+ */
+ public function lset(string $key, int $index, mixed $value): RedisCluster|bool;
+
+ /**
+ * @see \Redis::ltrim()
+ */
+ public function ltrim(string $key, int $start, int $end): RedisCluster|bool;
+
+ /**
+ * @see \Redis::mget()
+ */
+ public function mget(array $keys): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::mset()
+ */
+ public function mset(array $key_values): RedisCluster|bool;
+
+ /**
+ * @see \Redis::msetnx()
+ */
+ public function msetnx(array $key_values): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::msetex()
+ */
+ public function msetex(array $key_vals, int|float|array|null $expiry = null): Redis|int|false;
+
+ /* We only support Redis::MULTI in RedisCluster but take the argument
+ so we can test MULTI..EXEC with RedisTest.php and in the event
+ we add pipeline support in the future. */
+ public function multi(int $value = Redis::MULTI): RedisCluster|bool;
+
+ /**
+ * @see \Redis::object()
+ */
+ public function object(string $subcommand, string $key): RedisCluster|int|string|false;
+
+ /**
+ * @see \Redis::persist()
+ */
+ public function persist(string $key): RedisCluster|bool;
+
+ /**
+ * @see \Redis::pexpire()
+ */
+ public function pexpire(string $key, int $timeout, ?string $mode = null): RedisCluster|bool;
+
+ /**
+ * @see \Redis::pexpireAt()
+ */
+ public function pexpireat(string $key, int $timestamp, ?string $mode = null): RedisCluster|bool;
+
+
+ /**
+ * @see \Redis::pfadd()
+ */
+ public function pfadd(string $key, array $elements): RedisCluster|bool;
+
+ /**
+ * @see \Redis::pfcount()
+ */
+ public function pfcount(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::pfmerge()
+ */
+ public function pfmerge(string $key, array $keys): RedisCluster|bool;
+
+ /**
+ * PING an instance in the redis cluster.
+ *
+ * @see \Redis::ping()
+ *
+ * @param string|array $key_or_address Either a key name or a two element array with host and
+ * address, informing RedisCluster which node to ping.
+ *
+ * @param string|null $message An optional message to send.
+ *
+ * @return mixed This method always returns `true` if no message was sent, and the message itself
+ * if one was.
+ */
+ public function ping(string|array $key_or_address, ?string $message = null): mixed;
+
+ /**
+ * @see \Redis::psetex()
+ */
+ public function psetex(string $key, int $timeout, string $value): RedisCluster|bool;
+
+ /**
+ * @see \Redis::psubscribe()
+ */
+ public function psubscribe(array $patterns, callable $callback): void;
+
+ /**
+ * @see \Redis::pttl()
+ */
+ public function pttl(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::publish()
+ */
+ public function publish(string $channel, string $message): RedisCluster|bool|int;
+
+ /**
+ * @see \Redis::pubsub()
+ */
+ public function pubsub(string|array $key_or_address, string ...$values): mixed;
+
+ /**
+ * @see \Redis::punsubscribe()
+ */
+ public function punsubscribe(string $pattern, string ...$other_patterns): bool|array;
+
+ /**
+ * @see \Redis::randomKey()
+ */
+ public function randomkey(string|array $key_or_address): RedisCluster|bool|string;
+
+ /**
+ * @see \Redis::rawcommand()
+ */
+ public function rawcommand(string|array $key_or_address, string $command, mixed ...$args): mixed;
+
+ /**
+ * @see \Redis::rename()
+ */
+ public function rename(string $key_src, string $key_dst): RedisCluster|bool;
+
+ /**
+ * @see \Redis::renameNx()
+ */
+ public function renamenx(string $key, string $newkey): RedisCluster|bool;
+
+ /**
+ * @see \Redis::restore()
+ */
+ public function restore(string $key, int $timeout, string $value, ?array $options = null): RedisCluster|bool;
+
+ /**
+ * @see \Redis::role()
+ */
+ public function role(string|array $key_or_address): mixed;
+
+ /**
+ * @see \Redis::rPop()
+ */
+ public function rpop(string $key, int $count = 0): RedisCluster|bool|string|array;
+
+ /**
+ * @see \Redis::rpoplpush()
+ */
+ public function rpoplpush(string $src, string $dst): RedisCluster|bool|string;
+
+ /**
+ * @see \Redis::rPush()
+ */
+ public function rpush(string $key, mixed ...$elements): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::rPushx()
+ */
+ public function rpushx(string $key, string $value): RedisCluster|bool|int;
+
+ /**
+ * @see \Redis::sAdd()
+ */
+ public function sadd(string $key, mixed $value, mixed ...$other_values): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::sAddArray()
+ */
+ public function saddarray(string $key, array $values): RedisCluster|bool|int;
+
+ /**
+ * @see \Redis::save()
+ */
+ public function save(string|array $key_or_address): RedisCluster|bool;
+
+ /**
+ * @see \Redis::scan()
+ */
+ public function scan(null|int|string &$iterator, string|array $key_or_address, ?string $pattern = null, int $count = 0): bool|array;
+
+ /**
+ * @see \Redis::scard()
+ */
+ public function scard(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::script()
+ */
+ public function script(string|array $key_or_address, mixed ...$args): mixed;
+
+ /**
+ * @see \Redis::sDiff()
+ */
+ public function sdiff(string $key, string ...$other_keys): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::sDiffStore()
+ */
+ public function sdiffstore(string $dst, string $key, string ...$other_keys): RedisCluster|int|false;
+
+ /**
+ * @see https://redis.io/commands/set
+ */
+ public function set(string $key, mixed $value, mixed $options = null): RedisCluster|string|bool;
+
+ /**
+ * @see \Redis::setBit()
+ */
+ public function setbit(string $key, int $offset, bool $onoff): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::setex()
+ */
+ public function setex(string $key, int $expire, mixed $value): RedisCluster|bool;
+
+ /**
+ * @see \Redis::setnx()
+ */
+ public function setnx(string $key, mixed $value): RedisCluster|bool;
+
+ /**
+ * @see \Redis::setOption()
+ */
+ public function setoption(int $option, mixed $value): bool;
+
+ /**
+ * @see \Redis::setRange()
+ */
+ public function setrange(string $key, int $offset, string $value): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::sInter()
+ */
+ public function sinter(array|string $key, string ...$other_keys): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::sintercard()
+ */
+ public function sintercard(array $keys, int $limit = -1): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::sInterStore()
+ */
+ public function sinterstore(array|string $key, string ...$other_keys): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::sismember()
+ */
+ public function sismember(string $key, mixed $value): RedisCluster|bool;
+
+ /**
+ * @see \Redis::sMisMember()
+ */
+ public function smismember(string $key, string $member, string ...$other_members): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::slowlog()
+ */
+ public function slowlog(string|array $key_or_address, mixed ...$args): mixed;
+
+ /**
+ * @see \Redis::sMembers()
+ */
+ public function smembers(string $key): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::sMove()
+ */
+ public function smove(string $src, string $dst, string $member): RedisCluster|bool;
+
+ /**
+ * @see \Redis::sort()
+ */
+ public function sort(string $key, ?array $options = null): RedisCluster|array|bool|int|string;
+
+ /**
+ * @see \Redis::sort_ro()
+ */
+ public function sort_ro(string $key, ?array $options = null): RedisCluster|array|bool|int|string;
+
+ /**
+ * @see \Redis::sPop()
+ */
+ public function spop(string $key, int $count = 0): RedisCluster|string|array|false;
+
+ /**
+ * @see \Redis::sRandMember()
+ */
+ public function srandmember(string $key, int $count = 0): RedisCluster|string|array|false;
+
+ /**
+ * @see \Redis::srem()
+ */
+ public function srem(string $key, mixed $value, mixed ...$other_values): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::sscan()
+ */
+ public function sscan(string $key, null|int|string &$iterator, ?string $pattern = null, int $count = 0): array|false;
+
+ /**
+ * @see \Redis::strlen()
+ */
+ public function strlen(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::subscribe()
+ */
+ public function subscribe(array $channels, callable $cb): void;
+
+ /**
+ * @see \Redis::sUnion()
+ */
+ public function sunion(string $key, string ...$other_keys): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::sUnionStore()
+ */
+ public function sunionstore(string $dst, string $key, string ...$other_keys): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::time()
+ */
+ public function time(string|array $key_or_address): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::ttl()
+ */
+ public function ttl(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::type()
+ */
+ public function type(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::unsubscribe()
+ */
+ public function unsubscribe(array $channels): bool|array;
+
+ /**
+ * @see \Redis::unlink()
+ */
+ public function unlink(array|string $key, string ...$other_keys): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::unwatch()
+ */
+ public function unwatch(): bool;
+
+ /**
+ * @see \Redis::watch()
+ */
+ public function watch(string $key, string ...$other_keys): RedisCluster|bool;
+
+ /**
+ * @see \Redis::vadd()
+ */
+ public function vadd(string $key, array $values, mixed $element, array|null $options = null): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::vsim()
+ */
+ public function vsim(string $key, mixed $member, array|null $options = null): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::vcard()
+ */
+ public function vcard(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::vdim()
+ */
+ public function vdim(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::vinfo()
+ */
+ public function vinfo(string $key): RedisCluster|array|false;
+
+ /**
+ * Check if an element is a member of a vectorset
+ *
+ * @param string $key The vector set to query.
+ * @param mixed $member The member to check for.
+ *
+ * @return RedisCluster|bool true if the member exists, false if it does not.
+ */
+ public function vismember(string $key, mixed $member): RedisCluster|bool;
+
+ /**
+ * @see \Redis::vemb()
+ */
+ public function vemb(string $key, mixed $member, bool $raw = false): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::vrandmember()
+ */
+ public function vrandmember(string $key, int $count = 0): RedisCluster|array|string|false;
+
+ /**
+ * Retreive a lexographical range of elements from a vector set
+ *
+ * @param string $key The vector set to query.
+ * @param string $min The minimum element to return.
+ * @param string $max The maximum element to return.
+ * @param int $count An optional maximum number of elements to return.
+ *
+ * @return RedisCluster|array|false An array of elements in the specified range.`
+ */
+ public function vrange(string $key, string $min, string $max, int $count = -1): RedisCluster|array|false;
+
+
+ /**
+ * @see \Redis::vrem()
+ */
+ public function vrem(string $key, mixed $member): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::vlinks()
+ */
+ public function vlinks(string $key, mixed $member, bool $withscores = false): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::vgetattr()
+ */
+ public function vgetattr(string $key, mixed $member, bool $decode = true): RedisCluster|array|string|false;
+
+ /**
+ * @see \Redis::vsetattr()
+ */
+ public function vsetattr(string $key, mixed $member, array|string $attributes): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::gcra()
+ */
+ public function gcra(string $key, int $maxBurst, int $requestsPerPeriod,
+ int $period, int $tokens = 0): RedisCluster|array|false;
+
+
+ /**
+ * @see \Redis::xack()
+ */
+ public function xack(string $key, string $group, array $ids): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::xadd()
+ */
+ public function xadd(string $key, string $id, array $values, int $maxlen = 0, bool $approx = false): RedisCluster|string|false;
+
+ /**
+ * @see \Redis::xclaim()
+ */
+ public function xclaim(string $key, string $group, string $consumer, int $min_iddle, array $ids, array $options): RedisCluster|string|array|false;
+
+ /**
+ * @see \Redis::xdel()
+ */
+ public function xdel(string $key, array $ids): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::xdelex()
+ */
+ public function xdelex(string $key, array $ids, ?string $mode = null): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::xgroup()
+ */
+ public function xgroup(string $operation, ?string $key = null, ?string $group = null, ?string $id_or_consumer = null,
+ bool $mkstream = false, int $entries_read = -2): mixed;
+
+ /**
+ * @see \Redis::xautoclaim()
+ */
+ public function xautoclaim(string $key, string $group, string $consumer, int $min_idle, string $start, int $count = -1, bool $justid = false): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::xinfo()
+ */
+ public function xinfo(string $operation, ?string $arg1 = null, ?string $arg2 = null, int $count = -1): mixed;
+
+ /**
+ * @see \Redis::xlen()
+ */
+ public function xlen(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::xpending()
+ */
+ public function xpending(string $key, string $group, ?string $start = null, ?string $end = null, int $count = -1, ?string $consumer = null): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::xrange()
+ */
+ public function xrange(string $key, string $start, string $end, int $count = -1): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::xread()
+ */
+ public function xread(array $streams, int $count = -1, int $block = -1): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::xreadgroup()
+ */
+ public function xreadgroup(string $group, string $consumer, array $streams, int $count = 1, int $block = 1): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::xrevrange()
+ */
+ public function xrevrange(string $key, string $start, string $end, int $count = -1): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::xtrim()
+ */
+ public function xtrim(string $key, int $maxlen, bool $approx = false, bool $minid = false, int $limit = -1): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zAdd()
+ */
+ public function zadd(string $key, array|float $score_or_options, mixed ...$more_scores_and_mems): RedisCluster|int|float|false;
+
+ /**
+ * @see \Redis::zCard()
+ */
+ public function zcard(string $key): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zCount()
+ */
+ public function zcount(string $key, string $start, string $end): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zIncrBy()
+ */
+ public function zincrby(string $key, float $value, string $member): RedisCluster|float|false;
+
+ /**
+ * @see \Redis::zinterstore()
+ */
+ public function zinterstore(string $dst, array $keys, ?array $weights = null, ?string $aggregate = null): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zintercard()
+ */
+ public function zintercard(array $keys, int $limit = -1): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zLexCount()
+ */
+ public function zlexcount(string $key, string $min, string $max): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zPopMax()
+ */
+ public function zpopmax(string $key, ?int $value = null): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::zPopMin()
+ */
+ public function zpopmin(string $key, ?int $value = null): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::zRange()
+ */
+ public function zrange(string $key, mixed $start, mixed $end, array|bool|null $options = null): RedisCluster|array|bool;
+
+ /**
+ * @see \Redis::zrangestore()
+ */
+ public function zrangestore(string $dstkey, string $srckey, int $start, int $end,
+ array|bool|null $options = null): RedisCluster|int|false;
+
+ /**
+ * @see https://redis.io/commands/zrandmember
+ */
+ public function zrandmember(string $key, ?array $options = null): RedisCluster|string|array;
+
+ /**
+ * @see \Redis::zRangeByLex()
+ */
+ public function zrangebylex(string $key, string $min, string $max, int $offset = -1, int $count = -1): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::zRangeByScore()
+ */
+ public function zrangebyscore(string $key, string $start, string $end, array $options = []): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::zRank()
+ */
+ public function zrank(string $key, mixed $member): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zRem()
+ */
+ public function zrem(string $key, string $value, string ...$other_values): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zRemRangeByLex()
+ */
+ public function zremrangebylex(string $key, string $min, string $max): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zRemRangeByRank()
+ */
+ public function zremrangebyrank(string $key, string $min, string $max): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zRemRangeByScore()
+ */
+ public function zremrangebyscore(string $key, string $min, string $max): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zRevRange()
+ */
+ public function zrevrange(string $key, string $min, string $max, ?array $options = null): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::zRevRangeByLex()
+ */
+ public function zrevrangebylex(string $key, string $min, string $max, ?array $options = null): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::zRevRangeByScore()
+ */
+ public function zrevrangebyscore(string $key, string $min, string $max, ?array $options = null): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::zRevRank()
+ */
+ public function zrevrank(string $key, mixed $member): RedisCluster|int|false;
+
+ /**
+ * @see \Redis::zscan()
+ */
+ public function zscan(string $key, null|int|string &$iterator, ?string $pattern = null, int $count = 0): RedisCluster|bool|array;
+
+ /**
+ * @see \Redis::zScore()
+ */
+ public function zscore(string $key, mixed $member): RedisCluster|float|false;
+
+ /**
+ * @see https://redis.io/commands/zmscore
+ */
+ public function zmscore(string $key, mixed $member, mixed ...$other_members): RedisCluster|array|false;
+
+ /**
+ * @see \Redis::zunionstore()
+ */
+ public function zunionstore(string $dst, array $keys, ?array $weights = null, ?string $aggregate = null): RedisCluster|int|false;
+
+ /**
+ * @see https://redis.io/commands/zinter
+ */
+ public function zinter(array $keys, ?array $weights = null, ?array $options = null): RedisCluster|array|false;
+
+ /**
+ * @see https://redis.io/commands/zdiffstore
+ */
+ public function zdiffstore(string $dst, array $keys): RedisCluster|int|false;
+
+ /**
+ * @see https://redis.io/commands/zunion
+ */
+ public function zunion(array $keys, ?array $weights = null, ?array $options = null): RedisCluster|array|false;
+
+ /**
+ * @see https://redis.io/commands/zdiff
+ */
+ public function zdiff(array $keys, ?array $options = null): RedisCluster|array|false;
+
+ /**
+ * @see https://redis.io/commands/digest
+ */
+ public function digest(string $key): RedisCluster|string|false;
+}
+
+class RedisClusterException extends RuntimeException {}
diff --git a/tests/Queue/E2E/Adapter/SwooleRedisStreamsTest.php b/tests/Queue/E2E/Adapter/SwooleRedisStreamsTest.php
new file mode 100644
index 0000000..a713880
--- /dev/null
+++ b/tests/Queue/E2E/Adapter/SwooleRedisStreamsTest.php
@@ -0,0 +1,303 @@
+getPublisher();
+
+ // Enqueue a delayed job
+ $result = $publisher->enqueueDelayed($this->getQueue(), [
+ 'type' => 'test_string',
+ 'value' => 'delayed job'
+ ], 1);
+
+ $this->assertTrue($result);
+
+ // Check delayed count
+ $delayedCount = $publisher->getDelayedCount($this->getQueue());
+ $this->assertGreaterThanOrEqual(1, $delayedCount);
+
+ // Wait for the job to be processed
+ // Worker's consume loop has a 2s block timeout + 1s delayed check interval
+ sleep(5);
+
+ // Delayed count should be back to 0
+ $delayedCount = $publisher->getDelayedCount($this->getQueue());
+ $this->assertEquals(0, $delayedCount);
+ }
+
+ /**
+ * Test scheduled job enqueueing.
+ */
+ public function testScheduledJobs(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue a job scheduled for 2 seconds from now
+ $result = $publisher->enqueueAt($this->getQueue(), [
+ 'type' => 'test_string',
+ 'value' => 'scheduled job'
+ ], time() + 2);
+
+ $this->assertTrue($result);
+
+ sleep(3);
+ }
+
+ /**
+ * Test stream observability.
+ */
+ public function testObservability(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue a job first
+ $publisher->enqueue($this->getQueue(), [
+ 'type' => 'test_string',
+ 'value' => 'observability test'
+ ]);
+
+ sleep(1);
+
+ // Test getStreamInfo
+ $info = $publisher->getStreamInfo($this->getQueue());
+ $this->assertIsArray($info);
+
+ // Test getGroupInfo
+ $groupInfo = $publisher->getGroupInfo($this->getQueue());
+ $this->assertIsArray($groupInfo);
+
+ // Test getConsumersInfo
+ $consumers = $publisher->getConsumersInfo($this->getQueue());
+ $this->assertIsArray($consumers);
+
+ // Test getQueueSize
+ $size = $publisher->getQueueSize($this->getQueue());
+ $this->assertIsInt($size);
+
+ // Test getLag
+ $lag = $publisher->getLag($this->getQueue());
+ $this->assertIsInt($lag);
+ }
+
+ /**
+ * Test message history/replay functionality.
+ */
+ public function testMessageHistory(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue several jobs
+ for ($i = 0; $i < 5; $i++) {
+ $publisher->enqueue($this->getQueue(), [
+ 'type' => 'test_number',
+ 'value' => $i
+ ]);
+ }
+
+ sleep(1);
+
+ // Get message history
+ $messages = $publisher->getMessages($this->getQueue(), '-', '+', 10);
+ $this->assertIsArray($messages);
+ }
+
+ /**
+ * Test schedule management.
+ */
+ public function testScheduleManagement(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $schedule = new \Utopia\Queue\Schedule(
+ id: 'e2e-test-schedule',
+ payload: ['type' => 'test_string', 'value' => 'scheduled'],
+ interval: 300
+ );
+
+ // Create schedule
+ $result = $publisher->schedule($this->getQueue(), $schedule);
+ $this->assertTrue($result);
+
+ // Retrieve schedule
+ $retrieved = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertNotNull($retrieved);
+ $this->assertEquals('e2e-test-schedule', $retrieved->id);
+
+ // Pause schedule
+ $result = $publisher->pauseSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($result);
+
+ $paused = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($paused->isPaused());
+
+ // Resume schedule
+ $result = $publisher->resumeSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($result);
+
+ $resumed = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertFalse($resumed->isPaused());
+
+ // List schedules
+ $schedules = $publisher->getSchedules($this->getQueue());
+ $this->assertArrayHasKey('e2e-test-schedule', $schedules);
+
+ // Remove schedule
+ $result = $publisher->unschedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($result);
+
+ $deleted = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertNull($deleted);
+ }
+
+ /**
+ * Test cron schedule.
+ */
+ public function testCronSchedule(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $schedule = new \Utopia\Queue\Schedule(
+ id: 'e2e-cron-schedule',
+ payload: ['type' => 'test_string', 'value' => 'cron job'],
+ cron: '*/5 * * * *'
+ );
+
+ $result = $publisher->schedule($this->getQueue(), $schedule);
+ $this->assertTrue($result);
+
+ $retrieved = $publisher->getSchedule($this->getQueue(), 'e2e-cron-schedule');
+ $this->assertEquals('*/5 * * * *', $retrieved->cron);
+
+ // Cleanup
+ $publisher->unschedule($this->getQueue(), 'e2e-cron-schedule');
+ }
+
+ /**
+ * Test stream trimming.
+ */
+ public function testStreamTrimming(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue many messages
+ for ($i = 0; $i < 20; $i++) {
+ $publisher->enqueue($this->getQueue(), [
+ 'type' => 'test_number',
+ 'value' => $i
+ ]);
+ }
+
+ sleep(1);
+
+ // Trim the stream
+ $trimmed = $publisher->trimStream($this->getQueue(), 5);
+ $this->assertGreaterThan(0, $trimmed);
+ }
+
+ /**
+ * Test pending count.
+ */
+ public function testPendingCount(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $pending = $publisher->getPendingCount($this->getQueue());
+ $this->assertIsInt($pending);
+ $this->assertGreaterThanOrEqual(0, $pending);
+ }
+
+ /**
+ * Test queue size with failed jobs.
+ */
+ public function testQueueSizeWithFailedJobs(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $failedSize = $publisher->getQueueSize($this->getQueue(), true);
+ $this->assertIsInt($failedSize);
+ $this->assertGreaterThanOrEqual(0, $failedSize);
+ }
+
+ /**
+ * Test consumer ID management.
+ */
+ public function testConsumerIdManagement(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Get default consumer ID
+ $defaultId = $publisher->getConsumerId();
+ $this->assertStringStartsWith('worker-', $defaultId);
+
+ // Set custom consumer ID
+ $publisher->setConsumerId('e2e-test-consumer');
+ $this->assertEquals('e2e-test-consumer', $publisher->getConsumerId());
+ }
+
+ /**
+ * Test various payload types are preserved.
+ */
+ public function testPayloadTypes(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Test complex nested payload
+ $complexPayload = [
+ 'type' => 'test_assoc',
+ 'value' => [
+ 'string' => 'test',
+ 'number' => 123,
+ 'float' => 1.23,
+ 'bool' => true,
+ 'null' => null,
+ 'array' => [1, 2, 3],
+ 'nested' => [
+ 'deep' => 'value'
+ ]
+ ]
+ ];
+
+ $result = $publisher->enqueue($this->getQueue(), $complexPayload);
+ $this->assertTrue($result);
+
+ sleep(1);
+
+ // Verify messages can be retrieved
+ $messages = $publisher->getMessages($this->getQueue(), '-', '+', 1);
+ $this->assertNotEmpty($messages);
+ }
+}
diff --git a/tests/Queue/Unit/RedisStreamConnectionTest.php b/tests/Queue/Unit/RedisStreamConnectionTest.php
new file mode 100644
index 0000000..a809b56
--- /dev/null
+++ b/tests/Queue/Unit/RedisStreamConnectionTest.php
@@ -0,0 +1,507 @@
+connection = new RedisStream('redis', 6379);
+ $this->testPrefix = 'test-' . uniqid() . '-';
+ }
+
+ protected function tearDown(): void
+ {
+ // Clean up test keys
+ $this->cleanupTestKeys();
+ $this->connection->close();
+ }
+
+ private function cleanupTestKeys(): void
+ {
+ $redis = new \Redis();
+ $redis->connect('redis', 6379);
+
+ $keys = $redis->keys($this->testPrefix . '*');
+ if (!empty($keys)) {
+ $redis->del($keys);
+ }
+
+ $redis->close();
+ }
+
+
+ public function testStreamAdd(): void
+ {
+ $stream = $this->testPrefix . 'stream';
+
+ $id = $this->connection->streamAdd($stream, ['field1' => 'value1', 'field2' => 'value2']);
+
+ $this->assertIsString($id);
+ $this->assertMatchesRegularExpression('/^\d+-\d+$/', $id);
+ }
+
+ public function testStreamAddWithMaxLen(): void
+ {
+ $stream = $this->testPrefix . 'stream-maxlen';
+
+ // Add 10 entries with maxlen of 5 (exact trimming by default)
+ for ($i = 0; $i < 10; $i++) {
+ $this->connection->streamAdd($stream, ['index' => (string)$i], '*', 5);
+ }
+
+ $len = $this->connection->streamLen($stream);
+ // With exact trimming (default), should have exactly 5
+ $this->assertEquals(5, $len);
+ }
+
+ public function testStreamCreateGroup(): void
+ {
+ $stream = $this->testPrefix . 'stream-group';
+ $group = 'test-group';
+
+ // Create group (also creates stream with MKSTREAM)
+ $result = $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->assertTrue($result);
+
+ // Creating same group again should return true (BUSYGROUP handled)
+ $result = $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->assertTrue($result);
+ }
+
+ public function testStreamDestroyGroup(): void
+ {
+ $stream = $this->testPrefix . 'stream-destroy';
+ $group = 'test-group';
+
+ // Create and then destroy
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $result = $this->connection->streamDestroyGroup($stream, $group);
+ $this->assertTrue($result);
+
+ // Destroying non-existent group should return true (NOGROUP handled)
+ $result = $this->connection->streamDestroyGroup($stream, 'non-existent');
+ $this->assertTrue($result);
+ }
+
+ public function testStreamReadGroupAndAck(): void
+ {
+ $stream = $this->testPrefix . 'stream-read';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ // Create group and add message
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $messageId = $this->connection->streamAdd($stream, ['data' => 'test-message']);
+
+ // Read message
+ $result = $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ $this->assertIsArray($result);
+ $this->assertArrayHasKey($stream, $result);
+ $this->assertNotEmpty($result[$stream]);
+
+ // Get the entry ID from result
+ $entryId = array_key_first($result[$stream]);
+
+ // Acknowledge
+ $ackCount = $this->connection->streamAck($stream, $group, $entryId);
+ $this->assertEquals(1, $ackCount);
+ }
+
+ public function testStreamPendingSummary(): void
+ {
+ $stream = $this->testPrefix . 'stream-pending';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'message1']);
+ $this->connection->streamAdd($stream, ['data' => 'message2']);
+
+ // Read without acknowledging
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 2, 100);
+
+ // Check pending
+ $pending = $this->connection->streamPendingSummary($stream, $group);
+
+ $this->assertIsArray($pending);
+ $this->assertEquals(2, $pending[0]); // 2 pending messages
+ }
+
+ public function testStreamPendingDetails(): void
+ {
+ $stream = $this->testPrefix . 'stream-pending-detail';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'message1']);
+
+ // Read without acknowledging
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ // Get pending details
+ $pending = $this->connection->streamPending($stream, $group, '-', '+', 10);
+
+ $this->assertIsArray($pending);
+ $this->assertCount(1, $pending);
+ $this->assertEquals($consumer, $pending[0][1]); // Consumer name
+ }
+
+ public function testStreamClaim(): void
+ {
+ $stream = $this->testPrefix . 'stream-claim';
+ $group = 'test-group';
+ $consumer1 = 'consumer-1';
+ $consumer2 = 'consumer-2';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $messageId = $this->connection->streamAdd($stream, ['data' => 'claim-test']);
+
+ // Consumer 1 reads
+ $this->connection->streamReadGroup($group, $consumer1, [$stream], 1, 100);
+
+ // Consumer 2 claims (with 0 idle time for testing)
+ $claimed = $this->connection->streamClaim($stream, $group, $consumer2, 0, [$messageId]);
+
+ $this->assertIsArray($claimed);
+ $this->assertNotEmpty($claimed);
+ }
+
+ public function testStreamAutoClaim(): void
+ {
+ $stream = $this->testPrefix . 'stream-autoclaim';
+ $group = 'test-group';
+ $consumer1 = 'consumer-1';
+ $consumer2 = 'consumer-2';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'autoclaim-test']);
+
+ // Consumer 1 reads
+ $this->connection->streamReadGroup($group, $consumer1, [$stream], 1, 100);
+
+ // Consumer 2 auto-claims (with 0 idle time for testing)
+ $result = $this->connection->streamAutoClaim($stream, $group, $consumer2, 0, '0-0', 10);
+
+ $this->assertIsArray($result);
+ $this->assertCount(3, $result); // [next_id, claimed_entries, deleted_ids]
+ }
+
+ public function testStreamDel(): void
+ {
+ $stream = $this->testPrefix . 'stream-del';
+
+ $id1 = $this->connection->streamAdd($stream, ['data' => 'message1']);
+ $id2 = $this->connection->streamAdd($stream, ['data' => 'message2']);
+
+ $this->assertEquals(2, $this->connection->streamLen($stream));
+
+ $deleted = $this->connection->streamDel($stream, [$id1]);
+ $this->assertEquals(1, $deleted);
+ $this->assertEquals(1, $this->connection->streamLen($stream));
+ }
+
+ public function testStreamLen(): void
+ {
+ $stream = $this->testPrefix . 'stream-len';
+
+ $this->assertEquals(0, $this->connection->streamLen($stream));
+
+ $this->connection->streamAdd($stream, ['data' => '1']);
+ $this->connection->streamAdd($stream, ['data' => '2']);
+ $this->connection->streamAdd($stream, ['data' => '3']);
+
+ $this->assertEquals(3, $this->connection->streamLen($stream));
+ }
+
+ public function testStreamTrim(): void
+ {
+ $stream = $this->testPrefix . 'stream-trim';
+
+ // Add 10 entries
+ for ($i = 0; $i < 10; $i++) {
+ $this->connection->streamAdd($stream, ['index' => (string)$i]);
+ }
+
+ $this->assertEquals(10, $this->connection->streamLen($stream));
+
+ // Trim to 5
+ $trimmed = $this->connection->streamTrim($stream, 5, false);
+ $this->assertEquals(5, $trimmed);
+ $this->assertEquals(5, $this->connection->streamLen($stream));
+ }
+
+ public function testStreamInfo(): void
+ {
+ $stream = $this->testPrefix . 'stream-info';
+
+ $this->connection->streamAdd($stream, ['data' => 'test']);
+
+ $info = $this->connection->streamInfo($stream);
+
+ $this->assertIsArray($info);
+ $this->assertArrayHasKey('length', $info);
+ $this->assertEquals(1, $info['length']);
+ }
+
+ public function testStreamGroupInfo(): void
+ {
+ $stream = $this->testPrefix . 'stream-group-info';
+ $group = 'test-group';
+
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+
+ $groups = $this->connection->streamGroupInfo($stream);
+
+ $this->assertIsArray($groups);
+ $this->assertCount(1, $groups);
+ $this->assertEquals($group, $groups[0]['name']);
+ }
+
+ public function testStreamConsumersInfo(): void
+ {
+ $stream = $this->testPrefix . 'stream-consumers-info';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'test']);
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ $consumers = $this->connection->streamConsumersInfo($stream, $group);
+
+ $this->assertIsArray($consumers);
+ $this->assertCount(1, $consumers);
+ $this->assertEquals($consumer, $consumers[0]['name']);
+ }
+
+ public function testStreamRange(): void
+ {
+ $stream = $this->testPrefix . 'stream-range';
+
+ $id1 = $this->connection->streamAdd($stream, ['index' => '1']);
+ $id2 = $this->connection->streamAdd($stream, ['index' => '2']);
+ $id3 = $this->connection->streamAdd($stream, ['index' => '3']);
+
+ // Get all
+ $entries = $this->connection->streamRange($stream, '-', '+');
+ $this->assertCount(3, $entries);
+
+ // Get with count
+ $entries = $this->connection->streamRange($stream, '-', '+', 2);
+ $this->assertCount(2, $entries);
+
+ // Get specific range
+ $entries = $this->connection->streamRange($stream, $id2, $id2);
+ $this->assertCount(1, $entries);
+ $this->assertEquals('2', $entries[$id2]['index']);
+ }
+
+ public function testStreamRevRange(): void
+ {
+ $stream = $this->testPrefix . 'stream-revrange';
+
+ $this->connection->streamAdd($stream, ['index' => '1']);
+ $this->connection->streamAdd($stream, ['index' => '2']);
+ $this->connection->streamAdd($stream, ['index' => '3']);
+
+ $entries = $this->connection->streamRevRange($stream, '+', '-', 2);
+
+ $this->assertCount(2, $entries);
+ // First entry should be the latest (index 3)
+ $firstEntry = reset($entries);
+ $this->assertEquals('3', $firstEntry['index']);
+ }
+
+ public function testStreamDeleteConsumer(): void
+ {
+ $stream = $this->testPrefix . 'stream-del-consumer';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'test']);
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ // Delete consumer
+ $pending = $this->connection->streamDeleteConsumer($stream, $group, $consumer);
+ $this->assertIsInt($pending);
+
+ // Consumer should be gone
+ $consumers = $this->connection->streamConsumersInfo($stream, $group);
+ $this->assertEmpty($consumers);
+ }
+
+
+ public function testSortedSetAdd(): void
+ {
+ $key = $this->testPrefix . 'zset-add';
+
+ $result = $this->connection->sortedSetAdd($key, 1.0, 'member1');
+ $this->assertEquals(1, $result);
+
+ // Adding same member updates score, returns 0
+ $result = $this->connection->sortedSetAdd($key, 2.0, 'member1');
+ $this->assertEquals(0, $result);
+ }
+
+ public function testSortedSetPopByScore(): void
+ {
+ $key = $this->testPrefix . 'zset-pop';
+
+ $this->connection->sortedSetAdd($key, 100, 'a');
+ $this->connection->sortedSetAdd($key, 200, 'b');
+ $this->connection->sortedSetAdd($key, 300, 'c');
+ $this->connection->sortedSetAdd($key, 400, 'd');
+
+ // Pop scores 0-250
+ $popped = $this->connection->sortedSetPopByScore($key, 0, 250, 10);
+
+ $this->assertCount(2, $popped);
+ $this->assertContains('a', $popped);
+ $this->assertContains('b', $popped);
+
+ // Verify they're removed
+ $this->assertEquals(2, $this->connection->sortedSetSize($key));
+ }
+
+ public function testSortedSetRangeByScore(): void
+ {
+ $key = $this->testPrefix . 'zset-range';
+
+ $this->connection->sortedSetAdd($key, 100, 'a');
+ $this->connection->sortedSetAdd($key, 200, 'b');
+ $this->connection->sortedSetAdd($key, 300, 'c');
+
+ $members = $this->connection->sortedSetRangeByScore($key, 150, 350);
+
+ $this->assertCount(2, $members);
+ $this->assertContains('b', $members);
+ $this->assertContains('c', $members);
+ }
+
+ public function testSortedSetSize(): void
+ {
+ $key = $this->testPrefix . 'zset-size';
+
+ $this->assertEquals(0, $this->connection->sortedSetSize($key));
+
+ $this->connection->sortedSetAdd($key, 1, 'a');
+ $this->connection->sortedSetAdd($key, 2, 'b');
+
+ $this->assertEquals(2, $this->connection->sortedSetSize($key));
+ }
+
+ public function testSortedSetRemove(): void
+ {
+ $key = $this->testPrefix . 'zset-remove';
+
+ $this->connection->sortedSetAdd($key, 1, 'member');
+
+ $result = $this->connection->sortedSetRemove($key, 'member');
+ $this->assertEquals(1, $result);
+
+ $result = $this->connection->sortedSetRemove($key, 'non-existent');
+ $this->assertEquals(0, $result);
+ }
+
+ public function testSortedSetScore(): void
+ {
+ $key = $this->testPrefix . 'zset-score';
+
+ $this->connection->sortedSetAdd($key, 123.456, 'member');
+
+ $score = $this->connection->sortedSetScore($key, 'member');
+ $this->assertEquals(123.456, $score);
+
+ $score = $this->connection->sortedSetScore($key, 'non-existent');
+ $this->assertFalse($score);
+ }
+
+
+ public function testHashSet(): void
+ {
+ $key = $this->testPrefix . 'hash-set';
+
+ $result = $this->connection->hashSet($key, 'field1', 'value1');
+ $this->assertTrue($result);
+ }
+
+ public function testHashGet(): void
+ {
+ $key = $this->testPrefix . 'hash-get';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+
+ $value = $this->connection->hashGet($key, 'field1');
+ $this->assertEquals('value1', $value);
+
+ $value = $this->connection->hashGet($key, 'non-existent');
+ $this->assertFalse($value);
+ }
+
+ public function testHashGetAll(): void
+ {
+ $key = $this->testPrefix . 'hash-getall';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+ $this->connection->hashSet($key, 'field2', 'value2');
+
+ $all = $this->connection->hashGetAll($key);
+
+ $this->assertEquals(['field1' => 'value1', 'field2' => 'value2'], $all);
+ }
+
+ public function testHashDel(): void
+ {
+ $key = $this->testPrefix . 'hash-del';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+
+ $result = $this->connection->hashDel($key, 'field1');
+ $this->assertEquals(1, $result);
+
+ $result = $this->connection->hashDel($key, 'non-existent');
+ $this->assertEquals(0, $result);
+ }
+
+ public function testHashExists(): void
+ {
+ $key = $this->testPrefix . 'hash-exists';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+
+ $this->assertTrue($this->connection->hashExists($key, 'field1'));
+ $this->assertFalse($this->connection->hashExists($key, 'non-existent'));
+ }
+
+ public function testHashLen(): void
+ {
+ $key = $this->testPrefix . 'hash-len';
+
+ $this->assertEquals(0, $this->connection->hashLen($key));
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+ $this->connection->hashSet($key, 'field2', 'value2');
+
+ $this->assertEquals(2, $this->connection->hashLen($key));
+ }
+}
diff --git a/tests/Queue/Unit/RedisStreamsBrokerTest.php b/tests/Queue/Unit/RedisStreamsBrokerTest.php
new file mode 100644
index 0000000..a16c107
--- /dev/null
+++ b/tests/Queue/Unit/RedisStreamsBrokerTest.php
@@ -0,0 +1,429 @@
+connection = new RedisStream('redis', 6379);
+ $this->broker = new RedisStreams($this->connection, 1000, 3, 1000);
+ $this->testNamespace = 'test-' . uniqid();
+ $this->queue = new Queue('test-queue', $this->testNamespace);
+ }
+
+ protected function tearDown(): void
+ {
+ // Clean up test keys
+ $this->cleanupTestKeys();
+ $this->connection->close();
+ }
+
+ private function cleanupTestKeys(): void
+ {
+ $redis = new \Redis();
+ $redis->connect('redis', 6379);
+
+ $keys = $redis->keys($this->testNamespace . '*');
+ if (!empty($keys)) {
+ $redis->del($keys);
+ }
+
+ $redis->close();
+ }
+
+
+ public function testEnqueue(): void
+ {
+ $result = $this->broker->enqueue($this->queue, ['task' => 'test', 'data' => 123]);
+
+ $this->assertTrue($result);
+ $this->assertGreaterThan(0, $this->broker->getQueueSize($this->queue));
+ }
+
+ public function testEnqueueMultiple(): void
+ {
+ for ($i = 0; $i < 5; $i++) {
+ $this->broker->enqueue($this->queue, ['index' => $i]);
+ }
+
+ $this->assertEquals(5, $this->broker->getQueueSize($this->queue));
+ }
+
+ public function testGetQueueSize(): void
+ {
+ $this->assertEquals(0, $this->broker->getQueueSize($this->queue));
+
+ $this->broker->enqueue($this->queue, ['test' => 1]);
+ $this->broker->enqueue($this->queue, ['test' => 2]);
+
+ $this->assertEquals(2, $this->broker->getQueueSize($this->queue));
+ }
+
+ public function testGetQueueSizeFailedJobs(): void
+ {
+ // Initially no failed jobs
+ $this->assertEquals(0, $this->broker->getQueueSize($this->queue, true));
+ }
+
+ public function testConsumerId(): void
+ {
+ // Default consumer ID
+ $defaultId = $this->broker->getConsumerId();
+ $this->assertStringStartsWith('worker-', $defaultId);
+
+ // Set custom consumer ID
+ $this->broker->setConsumerId('custom-worker-123');
+ $this->assertEquals('custom-worker-123', $this->broker->getConsumerId());
+ }
+
+
+ public function testEnqueueDelayed(): void
+ {
+ $result = $this->broker->enqueueDelayed($this->queue, ['task' => 'delayed'], 60);
+
+ $this->assertTrue($result);
+ $this->assertEquals(1, $this->broker->getDelayedCount($this->queue));
+ }
+
+ public function testEnqueueAt(): void
+ {
+ $futureTime = time() + 3600;
+ $result = $this->broker->enqueueAt($this->queue, ['task' => 'scheduled'], $futureTime);
+
+ $this->assertTrue($result);
+ $this->assertEquals(1, $this->broker->getDelayedCount($this->queue));
+ }
+
+ public function testGetDelayedCount(): void
+ {
+ $this->assertEquals(0, $this->broker->getDelayedCount($this->queue));
+
+ $this->broker->enqueueDelayed($this->queue, ['task' => 1], 60);
+ $this->broker->enqueueDelayed($this->queue, ['task' => 2], 120);
+
+ $this->assertEquals(2, $this->broker->getDelayedCount($this->queue));
+ }
+
+ public function testQueueSizeIncludesDelayed(): void
+ {
+ $this->broker->enqueue($this->queue, ['immediate' => true]);
+ $this->broker->enqueueDelayed($this->queue, ['delayed' => true], 60);
+
+ // Total size should include both immediate and delayed
+ $this->assertEquals(2, $this->broker->getQueueSize($this->queue));
+ }
+
+
+ public function testScheduleCron(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-cron-schedule',
+ payload: ['type' => 'cron-job'],
+ cron: '*/5 * * * *'
+ );
+
+ $result = $this->broker->schedule($this->queue, $schedule);
+ $this->assertTrue($result);
+
+ // Verify schedule was stored
+ $retrieved = $this->broker->getSchedule($this->queue, 'test-cron-schedule');
+ $this->assertNotNull($retrieved);
+ $this->assertEquals('test-cron-schedule', $retrieved->id);
+ $this->assertEquals('*/5 * * * *', $retrieved->cron);
+ }
+
+ public function testScheduleInterval(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-interval-schedule',
+ payload: ['type' => 'interval-job'],
+ interval: 300
+ );
+
+ $result = $this->broker->schedule($this->queue, $schedule);
+ $this->assertTrue($result);
+
+ $retrieved = $this->broker->getSchedule($this->queue, 'test-interval-schedule');
+ $this->assertNotNull($retrieved);
+ $this->assertEquals(300, $retrieved->interval);
+ }
+
+ public function testUnschedule(): void
+ {
+ $schedule = new Schedule(
+ id: 'to-remove',
+ payload: ['remove' => true],
+ interval: 60
+ );
+
+ $this->broker->schedule($this->queue, $schedule);
+ $this->assertNotNull($this->broker->getSchedule($this->queue, 'to-remove'));
+
+ $result = $this->broker->unschedule($this->queue, 'to-remove');
+ $this->assertTrue($result);
+
+ $this->assertNull($this->broker->getSchedule($this->queue, 'to-remove'));
+ }
+
+ public function testGetSchedules(): void
+ {
+ $this->broker->schedule($this->queue, new Schedule('sched-1', ['a' => 1], interval: 60));
+ $this->broker->schedule($this->queue, new Schedule('sched-2', ['b' => 2], interval: 120));
+ $this->broker->schedule($this->queue, new Schedule('sched-3', ['c' => 3], cron: '0 * * * *'));
+
+ $schedules = $this->broker->getSchedules($this->queue);
+
+ $this->assertCount(3, $schedules);
+ $this->assertArrayHasKey('sched-1', $schedules);
+ $this->assertArrayHasKey('sched-2', $schedules);
+ $this->assertArrayHasKey('sched-3', $schedules);
+ }
+
+ public function testPauseSchedule(): void
+ {
+ $schedule = new Schedule(
+ id: 'pausable',
+ payload: ['pause' => 'test'],
+ interval: 60
+ );
+
+ $this->broker->schedule($this->queue, $schedule);
+
+ $result = $this->broker->pauseSchedule($this->queue, 'pausable');
+ $this->assertTrue($result);
+
+ $retrieved = $this->broker->getSchedule($this->queue, 'pausable');
+ $this->assertTrue($retrieved->isPaused());
+ $this->assertFalse($retrieved->isActive());
+ }
+
+ public function testResumeSchedule(): void
+ {
+ $schedule = new Schedule(
+ id: 'resumable',
+ payload: ['resume' => 'test'],
+ interval: 60
+ );
+
+ $this->broker->schedule($this->queue, $schedule);
+ $this->broker->pauseSchedule($this->queue, 'resumable');
+
+ $result = $this->broker->resumeSchedule($this->queue, 'resumable');
+ $this->assertTrue($result);
+
+ $retrieved = $this->broker->getSchedule($this->queue, 'resumable');
+ $this->assertFalse($retrieved->isPaused());
+ $this->assertTrue($retrieved->isActive());
+ }
+
+ public function testPauseNonExistentSchedule(): void
+ {
+ $result = $this->broker->pauseSchedule($this->queue, 'non-existent');
+ $this->assertFalse($result);
+ }
+
+ public function testResumeNonExistentSchedule(): void
+ {
+ $result = $this->broker->resumeSchedule($this->queue, 'non-existent');
+ $this->assertFalse($result);
+ }
+
+
+ public function testGetStreamInfo(): void
+ {
+ // Add some messages first
+ $this->broker->enqueue($this->queue, ['test' => 1]);
+ $this->broker->enqueue($this->queue, ['test' => 2]);
+
+ $info = $this->broker->getStreamInfo($this->queue);
+
+ $this->assertIsArray($info);
+ $this->assertArrayHasKey('length', $info);
+ $this->assertEquals(2, $info['length']);
+ }
+
+ public function testGetStreamInfoEmpty(): void
+ {
+ // Empty stream (doesn't exist yet)
+ $info = $this->broker->getStreamInfo($this->queue);
+ $this->assertIsArray($info);
+ }
+
+ public function testGetGroupInfo(): void
+ {
+ // Need to trigger group creation by enqueueing
+ $this->broker->enqueue($this->queue, ['test' => true]);
+
+ $groupInfo = $this->broker->getGroupInfo($this->queue);
+
+ $this->assertIsArray($groupInfo);
+ if (!empty($groupInfo)) {
+ $this->assertArrayHasKey('name', $groupInfo);
+ }
+ }
+
+ public function testGetConsumersInfo(): void
+ {
+ $this->broker->enqueue($this->queue, ['test' => true]);
+
+ $consumers = $this->broker->getConsumersInfo($this->queue);
+ $this->assertIsArray($consumers);
+ }
+
+ public function testGetLag(): void
+ {
+ $lag = $this->broker->getLag($this->queue);
+ $this->assertIsInt($lag);
+ $this->assertGreaterThanOrEqual(0, $lag);
+ }
+
+ public function testGetPendingCount(): void
+ {
+ $pending = $this->broker->getPendingCount($this->queue);
+ $this->assertIsInt($pending);
+ $this->assertGreaterThanOrEqual(0, $pending);
+ }
+
+ public function testGetMessages(): void
+ {
+ $this->broker->enqueue($this->queue, ['msg' => 1]);
+ $this->broker->enqueue($this->queue, ['msg' => 2]);
+ $this->broker->enqueue($this->queue, ['msg' => 3]);
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 10);
+
+ $this->assertIsArray($messages);
+ $this->assertCount(3, $messages);
+
+ foreach ($messages as $message) {
+ $this->assertInstanceOf(Message::class, $message);
+ }
+ }
+
+ public function testGetMessagesWithLimit(): void
+ {
+ for ($i = 0; $i < 10; $i++) {
+ $this->broker->enqueue($this->queue, ['index' => $i]);
+ }
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 5);
+
+ $this->assertCount(5, $messages);
+ }
+
+ public function testGetMessage(): void
+ {
+ $this->broker->enqueue($this->queue, ['specific' => 'message']);
+
+ // Get all messages to find the ID
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 1);
+ $this->assertNotEmpty($messages);
+
+ $firstMessage = $messages[0];
+ $streamId = $firstMessage->getPayload()['streamId'] ?? null;
+
+ // Skip if streamId is not available in payload
+ if ($streamId) {
+ $retrieved = $this->broker->getMessage($this->queue, $streamId);
+ $this->assertNotNull($retrieved);
+ }
+ }
+
+ public function testGetMessageNonExistent(): void
+ {
+ $message = $this->broker->getMessage($this->queue, '0-0');
+ $this->assertNull($message);
+ }
+
+ public function testTrimStream(): void
+ {
+ // Add 20 messages
+ for ($i = 0; $i < 20; $i++) {
+ $this->broker->enqueue($this->queue, ['index' => $i]);
+ }
+
+ $initialSize = $this->broker->getQueueSize($this->queue) - $this->broker->getDelayedCount($this->queue);
+ $this->assertEquals(20, $initialSize);
+
+ // Trim to 10 (uses exact trimming, not approximate)
+ $trimmed = $this->broker->trimStream($this->queue, 10);
+
+ // Verify stream was trimmed
+ $finalSize = $this->broker->getQueueSize($this->queue) - $this->broker->getDelayedCount($this->queue);
+ $this->assertLessThanOrEqual(10, $finalSize);
+ $this->assertGreaterThanOrEqual(0, $trimmed);
+ }
+
+ public function testDeleteConsumer(): void
+ {
+ $this->broker->enqueue($this->queue, ['test' => true]);
+
+ // This will create the consumer group
+ $pending = $this->broker->deleteConsumer($this->queue, 'non-existent-consumer');
+ $this->assertIsInt($pending);
+ }
+
+
+ public function testMessageFormat(): void
+ {
+ $payload = ['key' => 'value', 'nested' => ['a' => 1, 'b' => 2]];
+ $this->broker->enqueue($this->queue, $payload);
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 1);
+ $this->assertCount(1, $messages);
+
+ $message = $messages[0];
+ $this->assertInstanceOf(Message::class, $message);
+
+ $messagePayload = $message->getPayload();
+ $this->assertEquals('value', $messagePayload['key']);
+ $this->assertEquals(['a' => 1, 'b' => 2], $messagePayload['nested']);
+ }
+
+ public function testMessageTimestamp(): void
+ {
+ $beforeEnqueue = time();
+ $this->broker->enqueue($this->queue, ['test' => true]);
+ $afterEnqueue = time();
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 1);
+ $message = $messages[0];
+
+ $timestamp = $message->getTimestamp();
+ $this->assertGreaterThanOrEqual($beforeEnqueue, $timestamp);
+ $this->assertLessThanOrEqual($afterEnqueue, $timestamp);
+ }
+
+
+ public function testRetryEmptyDLQ(): void
+ {
+ // Should not throw when DLQ is empty
+ $this->broker->retry($this->queue);
+ $this->assertTrue(true);
+ }
+
+ public function testRetryWithLimit(): void
+ {
+ // Should not throw
+ $this->broker->retry($this->queue, 5);
+ $this->assertTrue(true);
+ }
+}
diff --git a/tests/Queue/Unit/ScheduleTest.php b/tests/Queue/Unit/ScheduleTest.php
new file mode 100644
index 0000000..4bfc81b
--- /dev/null
+++ b/tests/Queue/Unit/ScheduleTest.php
@@ -0,0 +1,450 @@
+ 'cleanup'],
+ cron: '*/5 * * * *'
+ );
+
+ $this->assertEquals('test-cron', $schedule->id);
+ $this->assertEquals(['task' => 'cleanup'], $schedule->payload);
+ $this->assertEquals('*/5 * * * *', $schedule->cron);
+ $this->assertNull($schedule->interval);
+ $this->assertTrue($schedule->isActive());
+ }
+
+ public function testIntervalScheduleCreation(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-interval',
+ payload: ['task' => 'sync'],
+ interval: 300
+ );
+
+ $this->assertEquals('test-interval', $schedule->id);
+ $this->assertEquals(300, $schedule->interval);
+ $this->assertNull($schedule->cron);
+ $this->assertTrue($schedule->isActive());
+ }
+
+ public function testInvalidScheduleNoCronOrInterval(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Either cron or interval must be specified');
+
+ new Schedule(
+ id: 'invalid',
+ payload: []
+ );
+ }
+
+ public function testInvalidScheduleBothCronAndInterval(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Cannot specify both cron and interval');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ cron: '* * * * *',
+ interval: 60
+ );
+ }
+
+ public function testInvalidCronExpression(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Invalid cron expression');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ cron: 'not a valid cron'
+ );
+ }
+
+ public function testInvalidIntervalZero(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Interval must be greater than 0');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ interval: 0
+ );
+ }
+
+ public function testIntervalNextRunTime(): void
+ {
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60
+ );
+
+ $now = time();
+ $nextRun = $schedule->getNextRunTime();
+
+ // First run should be now or very close to now
+ $this->assertLessThanOrEqual($now + 1, $nextRun);
+
+ // Second run should be 60 seconds after first
+ $nextRunAfterFirst = $schedule->getNextRunTime($nextRun);
+ $this->assertEquals($nextRun + 60, $nextRunAfterFirst);
+ }
+
+ public function testCronNextRunTime(): void
+ {
+ // Every minute
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ cron: '* * * * *'
+ );
+
+ $now = time();
+ $nextRun = $schedule->getNextRunTime();
+
+ // Should be within the next minute
+ $this->assertGreaterThan($now, $nextRun);
+ $this->assertLessThanOrEqual($now + 60, $nextRun);
+ }
+
+ public function testStartAtConstraint(): void
+ {
+ $futureTime = time() + 3600; // 1 hour from now
+
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60,
+ startAt: $futureTime
+ );
+
+ $nextRun = $schedule->getNextRunTime();
+
+ // Should not run before startAt
+ $this->assertGreaterThanOrEqual($futureTime, $nextRun);
+ }
+
+ public function testEndAtConstraint(): void
+ {
+ $pastTime = time() - 3600; // 1 hour ago
+
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60,
+ endAt: $pastTime
+ );
+
+ // Schedule should not be active since endAt has passed
+ $this->assertFalse($schedule->isActive());
+ $this->assertNull($schedule->getNextRunTime());
+ }
+
+ public function testMaxRunsConstraint(): void
+ {
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60,
+ maxRuns: 3
+ );
+
+ $this->assertTrue($schedule->isActive());
+
+ // Simulate 3 runs
+ $schedule = $schedule->incrementRunCount();
+ $this->assertEquals(1, $schedule->getRunCount());
+
+ $schedule = $schedule->incrementRunCount();
+ $this->assertEquals(2, $schedule->getRunCount());
+
+ $schedule = $schedule->incrementRunCount();
+ $this->assertEquals(3, $schedule->getRunCount());
+
+ // Should no longer be active after max runs
+ $this->assertFalse($schedule->isActive());
+ $this->assertNull($schedule->getNextRunTime());
+ }
+
+ public function testPauseAndResume(): void
+ {
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60
+ );
+
+ $this->assertTrue($schedule->isActive());
+ $this->assertFalse($schedule->isPaused());
+
+ // Pause
+ $paused = $schedule->pause();
+ $this->assertTrue($paused->isPaused());
+ $this->assertFalse($paused->isActive());
+ $this->assertNull($paused->getNextRunTime());
+
+ // Resume
+ $resumed = $paused->resume();
+ $this->assertFalse($resumed->isPaused());
+ $this->assertTrue($resumed->isActive());
+ $this->assertNotNull($resumed->getNextRunTime());
+ }
+
+ public function testToArrayAndFromArray(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-serialization',
+ payload: ['key' => 'value', 'nested' => ['a' => 1]],
+ cron: '0 9 * * *',
+ startAt: 1704067200,
+ endAt: 1735689600,
+ maxRuns: 100
+ );
+
+ $array = $schedule->toArray();
+
+ $this->assertEquals('test-serialization', $array['id']);
+ $this->assertEquals(['key' => 'value', 'nested' => ['a' => 1]], $array['payload']);
+ $this->assertEquals('0 9 * * *', $array['cron']);
+ $this->assertEquals(1704067200, $array['startAt']);
+ $this->assertEquals(1735689600, $array['endAt']);
+ $this->assertEquals(100, $array['maxRuns']);
+
+ // Reconstruct from array
+ $reconstructed = Schedule::fromArray($array);
+
+ $this->assertEquals($schedule->id, $reconstructed->id);
+ $this->assertEquals($schedule->payload, $reconstructed->payload);
+ $this->assertEquals($schedule->cron, $reconstructed->cron);
+ $this->assertEquals($schedule->startAt, $reconstructed->startAt);
+ $this->assertEquals($schedule->endAt, $reconstructed->endAt);
+ $this->assertEquals($schedule->maxRuns, $reconstructed->maxRuns);
+ }
+
+ public function testGetDescription(): void
+ {
+ $cronSchedule = new Schedule(
+ id: 'cron',
+ payload: [],
+ cron: '0 9 * * *'
+ );
+ $this->assertStringContainsString('Cron:', $cronSchedule->getDescription());
+
+ $secondsSchedule = new Schedule(
+ id: 'seconds',
+ payload: [],
+ interval: 30
+ );
+ $this->assertStringContainsString('second', $secondsSchedule->getDescription());
+
+ $minutesSchedule = new Schedule(
+ id: 'minutes',
+ payload: [],
+ interval: 300
+ );
+ $this->assertStringContainsString('minute', $minutesSchedule->getDescription());
+
+ $hoursSchedule = new Schedule(
+ id: 'hours',
+ payload: [],
+ interval: 7200
+ );
+ $this->assertStringContainsString('hour', $hoursSchedule->getDescription());
+
+ $daysSchedule = new Schedule(
+ id: 'days',
+ payload: [],
+ interval: 172800
+ );
+ $this->assertStringContainsString('day', $daysSchedule->getDescription());
+ }
+
+ public function testInvalidIntervalNegative(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Interval must be greater than 0');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ interval: -10
+ );
+ }
+
+ public function testCronVariousExpressions(): void
+ {
+ // Daily at midnight
+ $daily = new Schedule('daily', [], cron: '0 0 * * *');
+ $this->assertEquals('0 0 * * *', $daily->cron);
+
+ // Every hour
+ $hourly = new Schedule('hourly', [], cron: '0 * * * *');
+ $this->assertEquals('0 * * * *', $hourly->cron);
+
+ // Weekdays at 9am
+ $weekdays = new Schedule('weekdays', [], cron: '0 9 * * 1-5');
+ $this->assertEquals('0 9 * * 1-5', $weekdays->cron);
+
+ // Every 15 minutes
+ $quarter = new Schedule('quarter', [], cron: '*/15 * * * *');
+ $this->assertEquals('*/15 * * * *', $quarter->cron);
+ }
+
+ public function testIntervalCatchUp(): void
+ {
+ $schedule = new Schedule(
+ id: 'catch-up',
+ payload: [],
+ interval: 60
+ );
+
+ // Simulate a last run 5 minutes ago
+ $lastRun = time() - 300;
+ $nextRun = $schedule->getNextRunTime($lastRun);
+
+ // Should catch up - next run should be in the future or very close
+ $this->assertGreaterThanOrEqual(time() - 1, $nextRun);
+ }
+
+ public function testImmutableOperations(): void
+ {
+ $original = new Schedule(
+ id: 'immutable-test',
+ payload: ['data' => 'value'],
+ interval: 60
+ );
+
+ // Increment should return new instance
+ $incremented = $original->incrementRunCount();
+ $this->assertEquals(0, $original->getRunCount());
+ $this->assertEquals(1, $incremented->getRunCount());
+
+ // Pause should return new instance
+ $paused = $original->pause();
+ $this->assertFalse($original->isPaused());
+ $this->assertTrue($paused->isPaused());
+
+ // Resume should return new instance
+ $resumed = $paused->resume();
+ $this->assertTrue($paused->isPaused());
+ $this->assertFalse($resumed->isPaused());
+ }
+
+ public function testFromArrayWithDefaults(): void
+ {
+ $minimal = [
+ 'id' => 'minimal',
+ 'payload' => ['test' => true],
+ 'interval' => 60,
+ ];
+
+ $schedule = Schedule::fromArray($minimal);
+
+ $this->assertEquals('minimal', $schedule->id);
+ $this->assertNull($schedule->cron);
+ $this->assertEquals(60, $schedule->interval);
+ $this->assertNull($schedule->startAt);
+ $this->assertNull($schedule->endAt);
+ $this->assertNull($schedule->maxRuns);
+ $this->assertEquals(0, $schedule->getRunCount());
+ $this->assertFalse($schedule->isPaused());
+ }
+
+ public function testFromArrayPreservesState(): void
+ {
+ $data = [
+ 'id' => 'stateful',
+ 'payload' => [],
+ 'interval' => 60,
+ 'runCount' => 5,
+ 'paused' => true,
+ ];
+
+ $schedule = Schedule::fromArray($data);
+
+ $this->assertEquals(5, $schedule->getRunCount());
+ $this->assertTrue($schedule->isPaused());
+ }
+
+ public function testEndAtInFuture(): void
+ {
+ $futureEndAt = time() + 3600; // 1 hour from now
+
+ $schedule = new Schedule(
+ id: 'future-end',
+ payload: [],
+ interval: 60,
+ endAt: $futureEndAt
+ );
+
+ $this->assertTrue($schedule->isActive());
+ $nextRun = $schedule->getNextRunTime();
+ $this->assertNotNull($nextRun);
+ $this->assertLessThan($futureEndAt, $nextRun);
+ }
+
+ public function testNextRunRespectsFutureEndAt(): void
+ {
+ // endAt is 30 seconds from now
+ $endAt = time() + 30;
+
+ $schedule = new Schedule(
+ id: 'end-soon',
+ payload: [],
+ interval: 60, // 60 second interval
+ endAt: $endAt
+ );
+
+ // First run is now
+ $firstRun = $schedule->getNextRunTime();
+ $this->assertNotNull($firstRun);
+
+ // Next run would be 60s after first run, which is after endAt
+ $secondRun = $schedule->getNextRunTime($firstRun);
+ $this->assertNull($secondRun); // Should be null because it would exceed endAt
+ }
+
+ public function testComplexPayload(): void
+ {
+ $complexPayload = [
+ 'string' => 'value',
+ 'number' => 42,
+ 'float' => 3.14,
+ 'bool' => true,
+ 'null' => null,
+ 'array' => [1, 2, 3],
+ 'nested' => [
+ 'deep' => [
+ 'value' => 'found'
+ ]
+ ]
+ ];
+
+ $schedule = new Schedule(
+ id: 'complex',
+ payload: $complexPayload,
+ interval: 60
+ );
+
+ $this->assertEquals($complexPayload, $schedule->payload);
+
+ // Verify serialization preserves structure
+ $array = $schedule->toArray();
+ $reconstructed = Schedule::fromArray($array);
+
+ $this->assertEquals($complexPayload, $reconstructed->payload);
+ }
+}
diff --git a/tests/Queue/servers/SwooleRedisStreams/Dockerfile b/tests/Queue/servers/SwooleRedisStreams/Dockerfile
new file mode 100644
index 0000000..eb30cec
--- /dev/null
+++ b/tests/Queue/servers/SwooleRedisStreams/Dockerfile
@@ -0,0 +1,5 @@
+FROM phpswoole/swoole:php8.3-alpine
+
+RUN apk add autoconf build-base
+
+RUN docker-php-ext-enable redis
diff --git a/tests/Queue/servers/SwooleRedisStreams/worker.php b/tests/Queue/servers/SwooleRedisStreams/worker.php
new file mode 100644
index 0000000..499ed2e
--- /dev/null
+++ b/tests/Queue/servers/SwooleRedisStreams/worker.php
@@ -0,0 +1,33 @@
+job()->inject('message')->action(handleRequest(...));
+
+$server
+ ->error()
+ ->inject('error')
+ ->action(function ($th) {
+ echo $th->getMessage() . PHP_EOL;
+ });
+
+$server->workerStart()->action(function () {
+ echo 'Worker Started (Redis Streams)' . PHP_EOL;
+});
+
+$server->workerStop()->action(function () {
+ echo 'Worker Stopped (Redis Streams)' . PHP_EOL;
+});
+
+$server->start();