feat(node-message-broker): inbound delivery loop (wakeup → pull → decrypt → deliver)#16
Conversation
…rypt → deliver)
Wire the inbound side of the broker. `InboundDeliveryProcessor` (core) pulls this
node's pending ciphertext from the Hub, resolves each message's sender node key via
the participant resolver, decrypts it (crypto.open), and fans the decrypted JSON out
to the analysis's local webhooks — demuxed by `metadata.analysisId`.
Two triggers feed one pipeline: a payload-free `messagePending` wakeup drives an
immediate, coalesced backlog drain; a long-poll fallback loop (`pull({ wait })`)
catches anything a missed wakeup would leave pending. Delivery is delete-on-ack
at-least-once — only successfully delivered messages are acked, so transient failures
retry on the next pull. A per-message failure (no analysisId, unknown sender,
decrypt/parse error, webhook error) is isolated: logged, skipped, left unacked for
redelivery, never fatal to the batch.
`InboundModule` (deps: components + coreClient) constructs and starts the processor,
skipping the live loop under the test environment. Adds core tests against in-memory
fakes of the Hub, crypto, resolver, and delivery ports.
|
Warning Review limit reached
More reviews will be available in 40 minutes and 19 seconds. Learn how PR review limits work. Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file). ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits. 🚦 How do rate limits work?CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability. For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (7)
📝 WalkthroughWalkthroughAdds an inbound message broker module, wires it into application creation, and implements a processor that pulls queued messages, resolves sender keys, decrypts payloads, delivers webhook bodies, and acknowledges processed ids, with unit tests and fakes covering the flow. ChangesInbound delivery loop
Sequence Diagram(s)sequenceDiagram
participant IHubClient
participant InboundDeliveryProcessor
participant resolver
participant ICryptoService
participant IDeliveryService
IHubClient->>InboundDeliveryProcessor: onWakeup()
InboundDeliveryProcessor->>IHubClient: pull({ limit })
IHubClient-->>InboundDeliveryProcessor: messages
loop each message
InboundDeliveryProcessor->>resolver: resolve(analysisId)
resolver-->>InboundDeliveryProcessor: sender public key
InboundDeliveryProcessor->>ICryptoService: open(ciphertext, senderPublicKey)
ICryptoService-->>InboundDeliveryProcessor: plaintext bytes
InboundDeliveryProcessor->>IDeliveryService: deliver(analysisId, decrypted payload)
end
InboundDeliveryProcessor->>IHubClient: ack(processed ids)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai review |
✅ Action performedReview finished.
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/node-message-broker/src/core/inbound/processor.ts`:
- Around line 197-209: The inbound processor currently trusts
message.metadata.analysisId for sender lookup and delivery routing, which allows
relabeling before decrypt/authentication. Update the Processor flow so the
authenticated context includes the analysisId used at seal time: either pass the
same analysisId through the seal/open path in the matching send code, or verify
an analysisId carried inside the decrypted payload before calling
resolveSenderPublicKey and delivery.deliver. Use the Processor,
resolveSenderPublicKey, crypto.open, and delivery.deliver flow to keep the
authenticated analysis binding intact.
In `@apps/node-message-broker/test/unit/core/inbound/fake-hub.ts`:
- Around line 36-43: The fake hub pull behavior in fake-hub’s pull method should
return any queued messages from pullBatches before honoring a wait-based
long-poll. Update the pull(query) logic so it first drains pullBatches via the
existing response shape, and only parks the resolver in parkedResolvers when
query.wait is set and there are no pending batches. Keep the existing pull and
parkedResolvers symbols so the fake matches the Hub contract and fallback-loop
tests can exercise the queued-message path.
In `@apps/node-message-broker/test/unit/core/inbound/processor.spec.ts`:
- Around line 161-181: The current spec only covers wakeup-driven draining, so
add a test in processor.spec around processor.start() and whenIdle() that seeds
hub.pullBatches before start, does not call hub.emitWakeup, and verifies the
backlog is still delivered and acked via the long-poll fallback in the processor
flow. Reuse the existing setup(), hub, delivery, and processor symbols so the
new case clearly exercises the no-wakeup path and the pull({ wait }) behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dbb15418-6df0-4a8a-92ba-64a630642421
📒 Files selected for processing (11)
apps/node-message-broker/src/app/builder.tsapps/node-message-broker/src/app/factory.tsapps/node-message-broker/src/app/modules/inbound/index.tsapps/node-message-broker/src/app/modules/inbound/module.tsapps/node-message-broker/src/core/inbound/index.tsapps/node-message-broker/src/core/inbound/processor.tsapps/node-message-broker/src/core/inbound/types.tsapps/node-message-broker/test/unit/core/inbound/fake-crypto-service.tsapps/node-message-broker/test/unit/core/inbound/fake-delivery-service.tsapps/node-message-broker/test/unit/core/inbound/fake-hub.tsapps/node-message-broker/test/unit/core/inbound/processor.spec.ts
Pass `analysisId` as the HKDF `info` to both `crypto.seal` (send) and `crypto.open`
(receive). The analysis is now authenticated by the key derivation, so a
`metadata.analysisId` relabelled in transit by the untrusted Hub (or a replay) fails
to decrypt instead of being delivered to the wrong analysis's webhooks.
Also harden the inbound tests per review: the fake Hub now returns queued messages
before parking a long-poll (matching the Hub contract), and the suite gains a
no-wakeup case proving the `pull({ wait })` fallback delivers + acks a backlog. Seal
and open assertions cover the new HKDF binding.
Summary
Wires the inbound delivery loop for the node message-broker (Plan 013 Track B, Phase 4 — issue #9): the node now pulls its pending ciphertext from the Hub, decrypts it node-to-node, and fans the plaintext out to the analysis's local webhooks.
Flow
InboundDeliveryProcessor(core, port-only) runs:sender_id(the sending node's client id) is mapped to its node public key via the participant resolver for the message'smetadata.analysisId.metaenvelope rides inside it untouched) — matching the contract verified for feat(node-message-broker): container-facing message + participant endpoints #15.metadata.analysisId; the Hub stays analysis-agnostic.Triggers
messagePendingSSE signal triggers an immediate backlog drain, coalesced single-flight so a burst of signals collapses into one drain.pull({ wait })to catch anything a missed wakeup would leave pending. Its in-flight pull is released promptly on stop via a stop barrier (teardown never blocks on the long-poll budget).Concurrent triggers can occasionally double-deliver; that's safe under the at-least-once contract — the SDK dedupes by its own
meta.id.Reliability / poison handling
Delete-on-ack, at-least-once: only successfully delivered messages are acked, so a transient failure is retried on the next pull. A per-message failure — no
analysisId, unknown sender, decrypt/parse error, or webhook delivery error — is isolated: logged, skipped, left unacked, never fatal to the batch.Decision for this slice: a permanently undecryptable ("poison") message is redelivered on each cycle rather than dead-lettered. It doesn't block newer messages (each is processed independently), but it does persist. Dead-lettering / max-attempts is a reasonable follow-up if we want bounded retries — happy to file an issue.
Wiring
InboundModule(deps:components+coreClient) constructs the processor from the Hub link, crypto, delivery, and participant-resolver ports and starts it — skipped under the test environment, mirroring how the Hub stream is left unopened there.Tests
83 unit tests pass (+6 inbound) against in-memory fakes of the Hub, crypto, resolver, and delivery ports: sender-key resolution + decrypt + deliver + ack; decrypt-failure isolation; no-ack-on-delivery-failure; skipping messages without analysisId/ciphertext; unknown-sender skip; and the wakeup → drain → ack path with clean unsubscribe on stop. Lint, typecheck, and the tsdown build are clean.
Closes #9.
Summary by CodeRabbit
New Features
Bug Fixes
Tests