Skip to content

feat(node-message-broker): inbound delivery loop (wakeup → pull → decrypt → deliver)#16

Merged
tada5hi merged 2 commits into
masterfrom
feat/inbound-delivery
Jun 26, 2026
Merged

feat(node-message-broker): inbound delivery loop (wakeup → pull → decrypt → deliver)#16
tada5hi merged 2 commits into
masterfrom
feat/inbound-delivery

Conversation

@tada5hi

@tada5hi tada5hi commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

Summary

Wires the inbound delivery loop for the node message-broker (Plan 013 Track B, Phase 4 — issue #9): the node now pulls its pending ciphertext from the Hub, decrypts it node-to-node, and fans the plaintext out to the analysis's local webhooks.

Flow

InboundDeliveryProcessor (core, port-only) runs:

wakeup / long-poll ──▶ pull ──▶ resolve sender node key ──▶ crypto.open ──▶ JSON.parse ──▶ delivery.deliver(analysisId, message) ──▶ ack
  • Sender key: each message's sender_id (the sending node's client id) is mapped to its node public key via the participant resolver for the message's metadata.analysisId.
  • Decrypt + deliver: the decrypted bytes are parsed and delivered as the bare message object (the SDK's own meta envelope rides inside it untouched) — matching the contract verified for feat(node-message-broker): container-facing message + participant endpoints #15.
  • Demux: delivery is keyed by metadata.analysisId; the Hub stays analysis-agnostic.

Triggers

  • Wakeup (primary): a payload-free messagePending SSE signal triggers an immediate backlog drain, coalesced single-flight so a burst of signals collapses into one drain.
  • Long-poll (fallback): a background loop parks on pull({ wait }) to catch anything a missed wakeup would leave pending. Its in-flight pull is released promptly on stop via a stop barrier (teardown never blocks on the long-poll budget).

Concurrent triggers can occasionally double-deliver; that's safe under the at-least-once contract — the SDK dedupes by its own meta.id.

Reliability / poison handling

Delete-on-ack, at-least-once: only successfully delivered messages are acked, so a transient failure is retried on the next pull. A per-message failure — no analysisId, unknown sender, decrypt/parse error, or webhook delivery error — is isolated: logged, skipped, left unacked, never fatal to the batch.

Decision for this slice: a permanently undecryptable ("poison") message is redelivered on each cycle rather than dead-lettered. It doesn't block newer messages (each is processed independently), but it does persist. Dead-lettering / max-attempts is a reasonable follow-up if we want bounded retries — happy to file an issue.

Wiring

InboundModule (deps: components + coreClient) constructs the processor from the Hub link, crypto, delivery, and participant-resolver ports and starts it — skipped under the test environment, mirroring how the Hub stream is left unopened there.

Tests

83 unit tests pass (+6 inbound) against in-memory fakes of the Hub, crypto, resolver, and delivery ports: sender-key resolution + decrypt + deliver + ack; decrypt-failure isolation; no-ack-on-delivery-failure; skipping messages without analysisId/ciphertext; unknown-sender skip; and the wakeup → drain → ack path with clean unsubscribe on stop. Lint, typecheck, and the tsdown build are clean.

Closes #9.

Summary by CodeRabbit

  • New Features

    • Added inbound message processing for the broker, including wakeup handling, backlog draining, decryption, delivery, and acknowledgement of successfully processed messages.
    • Enabled inbound module configuration during application setup.
  • Bug Fixes

    • Messages that can’t be decrypted, lack required metadata, come from unknown senders, or fail delivery are now skipped without being acknowledged, allowing safe redelivery.
  • Tests

    • Added unit coverage for inbound processing behavior and lifecycle handling, including wakeups, batching, and error cases.

…rypt → deliver)

Wire the inbound side of the broker. `InboundDeliveryProcessor` (core) pulls this
node's pending ciphertext from the Hub, resolves each message's sender node key via
the participant resolver, decrypts it (crypto.open), and fans the decrypted JSON out
to the analysis's local webhooks — demuxed by `metadata.analysisId`.

Two triggers feed one pipeline: a payload-free `messagePending` wakeup drives an
immediate, coalesced backlog drain; a long-poll fallback loop (`pull({ wait })`)
catches anything a missed wakeup would leave pending. Delivery is delete-on-ack
at-least-once — only successfully delivered messages are acked, so transient failures
retry on the next pull. A per-message failure (no analysisId, unknown sender,
decrypt/parse error, webhook error) is isolated: logged, skipped, left unacked for
redelivery, never fatal to the batch.

`InboundModule` (deps: components + coreClient) constructs and starts the processor,
skipping the live loop under the test environment. Adds core tests against in-memory
fakes of the Hub, crypto, resolver, and delivery ports.
@coderabbitai

coderabbitai Bot commented Jun 26, 2026

Copy link
Copy Markdown

Review Change Stack

Warning

Review limit reached

@tada5hi, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 40 minutes and 19 seconds. Learn how PR review limits work.

Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file).

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits.

🚦 How do rate limits work?

CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability.

For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 044613c4-cc5d-4ddc-a505-d7d425e39fb6

📥 Commits

Reviewing files that changed from the base of the PR and between 50ef6b1 and 35910b5.

📒 Files selected for processing (7)
  • apps/node-message-broker/src/core/inbound/processor.ts
  • apps/node-message-broker/src/core/messaging/dispatch.ts
  • apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts
  • apps/node-message-broker/test/unit/core/inbound/fake-hub.ts
  • apps/node-message-broker/test/unit/core/inbound/processor.spec.ts
  • apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts
  • apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts
📝 Walkthrough

Walkthrough

Adds an inbound message broker module, wires it into application creation, and implements a processor that pulls queued messages, resolves sender keys, decrypts payloads, delivers webhook bodies, and acknowledges processed ids, with unit tests and fakes covering the flow.

Changes

Inbound delivery loop

Layer / File(s) Summary
Inbound surface and barrels
apps/node-message-broker/src/core/inbound/types.ts, apps/node-message-broker/src/core/inbound/index.ts, apps/node-message-broker/src/app/modules/inbound/index.ts
Adds inbound input and option types plus barrel re-exports for the core and app inbound entrypoints.
Processor lifecycle and polling
apps/node-message-broker/src/core/inbound/processor.ts
Implements processor start/stop state, backlog draining, wakeup coalescing, and long-poll fallback handling.
Per-message delivery path
apps/node-message-broker/src/core/inbound/processor.ts
Validates incoming messages, resolves sender public keys, decrypts payloads, delivers webhook bodies, and records successful acknowledgements.
Module registration
apps/node-message-broker/src/app/builder.ts, apps/node-message-broker/src/app/factory.ts, apps/node-message-broker/src/app/modules/inbound/module.ts
Adds the inbound module class, registers it on the application builder, and enables it during application creation.
Unit fakes and coverage
apps/node-message-broker/test/unit/core/inbound/*
Adds in-memory fakes and processor unit tests for successful delivery, decrypt failures, delivery failures, missing metadata, unknown senders, and wakeup lifecycle behavior.

Sequence Diagram(s)

sequenceDiagram
  participant IHubClient
  participant InboundDeliveryProcessor
  participant resolver
  participant ICryptoService
  participant IDeliveryService

  IHubClient->>InboundDeliveryProcessor: onWakeup()
  InboundDeliveryProcessor->>IHubClient: pull({ limit })
  IHubClient-->>InboundDeliveryProcessor: messages
  loop each message
    InboundDeliveryProcessor->>resolver: resolve(analysisId)
    resolver-->>InboundDeliveryProcessor: sender public key
    InboundDeliveryProcessor->>ICryptoService: open(ciphertext, senderPublicKey)
    ICryptoService-->>InboundDeliveryProcessor: plaintext bytes
    InboundDeliveryProcessor->>IDeliveryService: deliver(analysisId, decrypted payload)
  end
  InboundDeliveryProcessor->>IHubClient: ack(processed ids)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Possibly related PRs

  • PrivateAIM/node#12: Adds the participant-resolver plumbing used by the inbound sender-key resolution path.

Poem

I twitched my nose and hopped in line,
sealed little messages now shine.
Decrypt, deliver, ack with glee —
the hub hums softly back to me.
🐰🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title is concise and accurately describes the main inbound delivery loop change.
Linked Issues check ✅ Passed The PR implements the wakeup/long-poll inbound flow, decrypt/deliver path, ack behavior, and isolation tests required by #9.
Out of Scope Changes check ✅ Passed The added module wiring, processor, types, fakes, and tests all support the inbound loop scope and no unrelated changes stand out.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/inbound-delivery

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@tada5hi

tada5hi commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Jun 26, 2026

Copy link
Copy Markdown
✅ Action performed

Review finished.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@apps/node-message-broker/src/core/inbound/processor.ts`:
- Around line 197-209: The inbound processor currently trusts
message.metadata.analysisId for sender lookup and delivery routing, which allows
relabeling before decrypt/authentication. Update the Processor flow so the
authenticated context includes the analysisId used at seal time: either pass the
same analysisId through the seal/open path in the matching send code, or verify
an analysisId carried inside the decrypted payload before calling
resolveSenderPublicKey and delivery.deliver. Use the Processor,
resolveSenderPublicKey, crypto.open, and delivery.deliver flow to keep the
authenticated analysis binding intact.

In `@apps/node-message-broker/test/unit/core/inbound/fake-hub.ts`:
- Around line 36-43: The fake hub pull behavior in fake-hub’s pull method should
return any queued messages from pullBatches before honoring a wait-based
long-poll. Update the pull(query) logic so it first drains pullBatches via the
existing response shape, and only parks the resolver in parkedResolvers when
query.wait is set and there are no pending batches. Keep the existing pull and
parkedResolvers symbols so the fake matches the Hub contract and fallback-loop
tests can exercise the queued-message path.

In `@apps/node-message-broker/test/unit/core/inbound/processor.spec.ts`:
- Around line 161-181: The current spec only covers wakeup-driven draining, so
add a test in processor.spec around processor.start() and whenIdle() that seeds
hub.pullBatches before start, does not call hub.emitWakeup, and verifies the
backlog is still delivered and acked via the long-poll fallback in the processor
flow. Reuse the existing setup(), hub, delivery, and processor symbols so the
new case clearly exercises the no-wakeup path and the pull({ wait }) behavior.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dbb15418-6df0-4a8a-92ba-64a630642421

📥 Commits

Reviewing files that changed from the base of the PR and between 27fbd3f and 50ef6b1.

📒 Files selected for processing (11)
  • apps/node-message-broker/src/app/builder.ts
  • apps/node-message-broker/src/app/factory.ts
  • apps/node-message-broker/src/app/modules/inbound/index.ts
  • apps/node-message-broker/src/app/modules/inbound/module.ts
  • apps/node-message-broker/src/core/inbound/index.ts
  • apps/node-message-broker/src/core/inbound/processor.ts
  • apps/node-message-broker/src/core/inbound/types.ts
  • apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts
  • apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts
  • apps/node-message-broker/test/unit/core/inbound/fake-hub.ts
  • apps/node-message-broker/test/unit/core/inbound/processor.spec.ts

Comment thread apps/node-message-broker/src/core/inbound/processor.ts
Comment thread apps/node-message-broker/test/unit/core/inbound/fake-hub.ts Outdated
Comment thread apps/node-message-broker/test/unit/core/inbound/processor.spec.ts
Pass `analysisId` as the HKDF `info` to both `crypto.seal` (send) and `crypto.open`
(receive). The analysis is now authenticated by the key derivation, so a
`metadata.analysisId` relabelled in transit by the untrusted Hub (or a replay) fails
to decrypt instead of being delivered to the wrong analysis's webhooks.

Also harden the inbound tests per review: the fake Hub now returns queued messages
before parking a long-poll (matching the Hub contract), and the suite gains a
no-wakeup case proving the `pull({ wait })` fallback delivers + acks a backlog. Seal
and open assertions cover the new HKDF binding.
@tada5hi tada5hi merged commit 90f712a into master Jun 26, 2026
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

S6 — Inbound delivery loop (onWakeup → pull → decrypt → deliver)

1 participant