[feat] Add Composio trigger ingress, async dispatch, and worker#4742
Draft
jp-agenta wants to merge 1 commit into
Draft
[feat] Add Composio trigger ingress, async dispatch, and worker#4742jp-agenta wants to merge 1 commit into
jp-agenta wants to merge 1 commit into
Conversation
Inbound dual of webhooks: a global ingress endpoint POST /triggers/composio/events fast-ACKs and enqueues onto the queues:triggers Redis Stream; a dedicated worker_triggers process consumes it and the TriggersDispatcher invokes the bound workflow. - Ingress endpoint with HMAC-SHA256 signature verification against COMPOSIO_WEBHOOK_SECRET; whitelisted in auth middleware as public. - Async pipeline mirroring webhooks: Redis Streams broker + taskiq worker with retry_on_error and TRIGGER_MAX_RETRIES=5; dedup by event_id for idempotency. - Dispatcher attributes the run to the subscription creator (created_by_id) or null; binds the workflow key-agnostically from the references dict via the /retrieve selector shape. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Context
Trigger subscriptions can be created and stored (the lane below this one), but nothing yet turns an inbound provider event into a workflow run. This is the inbound dual of webhooks: when Composio reports that a watched event fired, we need to receive it, decide which subscription it belongs to, and invoke the bound workflow.
What this adds
A single global ingress endpoint and an async dispatch pipeline that mirrors how outbound webhooks already work.
POST /triggers/composio/eventsreceives every Composio event. It verifies the signature, demuxes by the subscription id carried in the payload, fast-ACKs with202 Accepted, and enqueues onto a newqueues:triggersRedis Stream. It does not invoke the workflow inline. A dedicatedworker_triggersprocess consumes the stream and theTriggersDispatcherdoes the actual invocation, so a slow or failing workflow never blocks Composio's delivery.The async shape matches webhooks exactly:
The taskiq task retries with
TRIGGER_MAX_RETRIES=5(matchingWEBHOOK_MAX_RETRIES), anddedup_seen(event_id)makes redelivery idempotent.Signature verification is HMAC-SHA256 over
{id}.{ts}.{body}againstCOMPOSIO_WEBHOOK_SECRET, compared withhmac.compare_digest. If the secret is unset it is a no-op (local dev), and a bad signature is rejected before any processing. The endpoint is whitelisted as public in the auth middleware, mirroring the existingtools/connections/callbackOAuth route.The dispatcher attributes the triggered run to
subscription.created_by_id(the person who set up the subscription) or null if absent, and binds the workflow key-agnostically by iterating the subscription'sreferencesdict in the/retrieveselector shape.Tests / notes
test_triggers_ingress.pycovers the ingress endpoint.ruff formatandruff checkpass clean onapi/.queues:triggers.