From 50ef6b1b826802ffea318e0c85802afe3ae2c671 Mon Sep 17 00:00:00 2001 From: tada5hi Date: Fri, 26 Jun 2026 15:39:48 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(node-message-broker):=20inbound=20deli?= =?UTF-8?q?very=20loop=20(wakeup=20=E2=86=92=20pull=20=E2=86=92=20decrypt?= =?UTF-8?q?=20=E2=86=92=20deliver)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire the inbound side of the broker. `InboundDeliveryProcessor` (core) pulls this node's pending ciphertext from the Hub, resolves each message's sender node key via the participant resolver, decrypts it (crypto.open), and fans the decrypted JSON out to the analysis's local webhooks — demuxed by `metadata.analysisId`. Two triggers feed one pipeline: a payload-free `messagePending` wakeup drives an immediate, coalesced backlog drain; a long-poll fallback loop (`pull({ wait })`) catches anything a missed wakeup would leave pending. Delivery is delete-on-ack at-least-once — only successfully delivered messages are acked, so transient failures retry on the next pull. A per-message failure (no analysisId, unknown sender, decrypt/parse error, webhook error) is isolated: logged, skipped, left unacked for redelivery, never fatal to the batch. `InboundModule` (deps: components + coreClient) constructs and starts the processor, skipping the live loop under the test environment. Adds core tests against in-memory fakes of the Hub, crypto, resolver, and delivery ports. --- apps/node-message-broker/src/app/builder.ts | 5 + apps/node-message-broker/src/app/factory.ts | 1 + .../src/app/modules/inbound/index.ts | 8 + .../src/app/modules/inbound/module.ts | 57 +++++ .../src/core/inbound/index.ts | 9 + .../src/core/inbound/processor.ts | 228 ++++++++++++++++++ .../src/core/inbound/types.ts | 31 +++ .../unit/core/inbound/fake-crypto-service.ts | 35 +++ .../core/inbound/fake-delivery-service.ts | 32 +++ .../test/unit/core/inbound/fake-hub.ts | 80 ++++++ .../test/unit/core/inbound/processor.spec.ts | 182 ++++++++++++++ 11 files changed, 668 insertions(+) create mode 100644 apps/node-message-broker/src/app/modules/inbound/index.ts create mode 100644 apps/node-message-broker/src/app/modules/inbound/module.ts create mode 100644 apps/node-message-broker/src/core/inbound/index.ts create mode 100644 apps/node-message-broker/src/core/inbound/processor.ts create mode 100644 apps/node-message-broker/src/core/inbound/types.ts create mode 100644 apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts create mode 100644 apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts create mode 100644 apps/node-message-broker/test/unit/core/inbound/fake-hub.ts create mode 100644 apps/node-message-broker/test/unit/core/inbound/processor.spec.ts diff --git a/apps/node-message-broker/src/app/builder.ts b/apps/node-message-broker/src/app/builder.ts index a362937..61033c5 100644 --- a/apps/node-message-broker/src/app/builder.ts +++ b/apps/node-message-broker/src/app/builder.ts @@ -11,6 +11,7 @@ import { ConfigModule } from './modules/config/index.ts'; import { ComponentsModule } from './modules/components/index.ts'; import { CoreClientModule } from './modules/core-client/index.ts'; import { HTTPModule } from './modules/http/index.ts'; +import { InboundModule } from './modules/inbound/index.ts'; export class ServerMessageBrokerApplicationBuilder extends BaseApplicationBuilder { withConfig(instance?: ConfigModule | false): this { @@ -25,6 +26,10 @@ export class ServerMessageBrokerApplicationBuilder extends BaseApplicationBuilde return this.addModuleSlot('coreClient', instance, () => new CoreClientModule()); } + withInbound(instance?: InboundModule | false): this { + return this.addModuleSlot('inbound', instance, () => new InboundModule()); + } + withHTTP(instance?: HTTPModule | false): this { return this.addModuleSlot('http', instance, () => new HTTPModule()); } diff --git a/apps/node-message-broker/src/app/factory.ts b/apps/node-message-broker/src/app/factory.ts index bdf22ca..2ef48af 100644 --- a/apps/node-message-broker/src/app/factory.ts +++ b/apps/node-message-broker/src/app/factory.ts @@ -19,6 +19,7 @@ export function createApplication(): Application { }) .withComponents() .withCoreClient() + .withInbound() .withAuthupHook() .withAuthupClient() .withHTTP(); diff --git a/apps/node-message-broker/src/app/modules/inbound/index.ts b/apps/node-message-broker/src/app/modules/inbound/index.ts new file mode 100644 index 0000000..954fdb1 --- /dev/null +++ b/apps/node-message-broker/src/app/modules/inbound/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './module.ts'; diff --git a/apps/node-message-broker/src/app/modules/inbound/module.ts b/apps/node-message-broker/src/app/modules/inbound/module.ts new file mode 100644 index 0000000..a9a2b16 --- /dev/null +++ b/apps/node-message-broker/src/app/modules/inbound/module.ts @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { IContainer } from 'eldin'; +import type { IModule } from 'orkos'; +import { EnvironmentName, LoggerInjectionKey } from '@privateaim/server-kit'; +import { InboundDeliveryProcessor } from '../../../core/inbound/index.ts'; +import { ComponentsInjectionKey } from '../components/constants.ts'; +import { ConfigInjectionKey } from '../config/constants.ts'; +import { CoreClientInjectionKey } from '../core-client/constants.ts'; + +/** + * Starts the inbound delivery loop (Plan 013 Track B, Phase 4): wakeup/long-poll → pull → + * decrypt → fan out to the analysis webhooks. Depends on `components` (Hub link, crypto, + * delivery) and `coreClient` (participant resolver, for the sender's node key). The loop is + * skipped under the test environment, mirroring how the Hub stream is left unopened there. + */ +export class InboundModule implements IModule { + readonly name = 'inbound'; + + readonly dependencies: string[] = ['config', 'components', 'coreClient']; + + private processor: InboundDeliveryProcessor | undefined; + + async setup(container: IContainer): Promise { + const config = container.resolve(ConfigInjectionKey); + + const loggerResult = container.tryResolve(LoggerInjectionKey); + const logger = loggerResult.success ? loggerResult.data : undefined; + + const processor = new InboundDeliveryProcessor({ + hub: container.resolve(ComponentsInjectionKey.HubClient), + crypto: container.resolve(ComponentsInjectionKey.Crypto), + delivery: container.resolve(ComponentsInjectionKey.Delivery), + resolver: container.resolve(CoreClientInjectionKey.ParticipantResolver), + logger, + }); + + // tests don't reach a live Hub; skip pulling and the reconnecting wakeup stream. + if (config.env !== EnvironmentName.TEST) { + processor.start(); + } + + this.processor = processor; + } + + async teardown(): Promise { + if (this.processor) { + await this.processor.stop(); + this.processor = undefined; + } + } +} diff --git a/apps/node-message-broker/src/core/inbound/index.ts b/apps/node-message-broker/src/core/inbound/index.ts new file mode 100644 index 0000000..fe48961 --- /dev/null +++ b/apps/node-message-broker/src/core/inbound/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './types.ts'; +export * from './processor.ts'; diff --git a/apps/node-message-broker/src/core/inbound/processor.ts b/apps/node-message-broker/src/core/inbound/processor.ts new file mode 100644 index 0000000..cedec9f --- /dev/null +++ b/apps/node-message-broker/src/core/inbound/processor.ts @@ -0,0 +1,228 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { Message, MessagePullResponse } from '@privateaim/messenger-kit'; +import type { InboundDeliveryDeps, InboundProcessorOptions } from './types.ts'; + +const DEFAULT_PULL_LIMIT = 50; +const DEFAULT_WAIT_MS = 20_000; +const DEFAULT_ERROR_BACKOFF_MS = 1_000; + +/** + * Drives the inbound side of the broker: pull this node's pending ciphertext from the Hub, + * resolve each message's **sender** node key, decrypt it (S1), and fan it out to the + * analysis's local webhooks (S6). Two pull triggers feed the same pipeline: + * + * - **wakeup** — a payload-free `messagePending` SSE signal triggers an immediate backlog + * drain (coalesced single-flight, so a burst of signals collapses into one drain); + * - **long-poll fallback** — a background loop parks on `pull({ wait })` to catch anything a + * missed wakeup would otherwise leave sitting until the next signal. + * + * Delivery is delete-on-ack at-least-once: only successfully delivered messages are acked, + * so a transient failure is retried on the next pull. A per-message failure (no + * `analysisId`, unknown sender, decrypt/parse error) is isolated — logged and skipped, never + * fatal to the batch — and left unacked for redelivery. The receiving SDK dedupes by its own + * `meta.id`, so an occasional duplicate from concurrent triggers is harmless. + */ +export class InboundDeliveryProcessor { + protected deps: InboundDeliveryDeps; + + protected pullLimit: number; + + protected waitMs: number; + + protected errorBackoffMs: number; + + protected running = false; + + protected unsubscribe: (() => void) | undefined; + + protected fallbackLoop: Promise | undefined; + + protected wakeupInFlight: Promise | undefined; + + protected wakeupPending = false; + + protected resolveStop: (() => void) | undefined; + + private readonly decoder = new TextDecoder(); + + constructor(deps: InboundDeliveryDeps, options: InboundProcessorOptions = {}) { + this.deps = deps; + this.pullLimit = options.pullLimit ?? DEFAULT_PULL_LIMIT; + this.waitMs = options.waitMs ?? DEFAULT_WAIT_MS; + this.errorBackoffMs = options.errorBackoffMs ?? DEFAULT_ERROR_BACKOFF_MS; + } + + /** Subscribe to Hub wakeups and start the long-poll fallback loop. Idempotent. */ + start(): void { + if (this.running) { + return; + } + + this.running = true; + this.unsubscribe = this.deps.hub.onWakeup(() => this.onWakeupSignal()); + this.fallbackLoop = this.runFallbackLoop(); + } + + /** Unsubscribe, unblock the long-poll, and await both triggers settling. Idempotent. */ + async stop(): Promise { + if (!this.running) { + return; + } + + this.running = false; + + this.unsubscribe?.(); + this.unsubscribe = undefined; + + // release the fallback loop if it is parked on a long-poll pull + this.resolveStop?.(); + + const inFlight = this.wakeupInFlight; + const loop = this.fallbackLoop; + this.fallbackLoop = undefined; + + await inFlight?.catch(() => undefined); + await loop?.catch(() => undefined); + } + + /** Resolve once the current wakeup-triggered drain has settled (diagnostic / test aid). */ + async whenIdle(): Promise { + await this.wakeupInFlight?.catch(() => undefined); + } + + /** + * Decrypt and locally deliver a batch, returning the ids of the messages that were + * delivered (and acked). Per-message failures are isolated; the ack covers only the + * delivered subset so the rest are redelivered on a later pull. + */ + async processBatch(messages: Message[]): Promise { + const ackIds: string[] = []; + + for (const message of messages) { + try { + await this.deliverMessage(message); + ackIds.push(message.id); + } catch (error) { + this.deps.logger?.warn(`Inbound message ${message.id} skipped: ${(error as Error).message}`); + } + } + + if (ackIds.length > 0) { + try { + await this.deps.hub.ack({ ids: ackIds }); + } catch (error) { + this.deps.logger?.error(`Failed to ack inbound messages ${ackIds.join(', ')}: ${(error as Error).message}`); + } + } + + return ackIds; + } + + /** Drain the backlog: pull (no wait) and process repeatedly until the mailbox is empty. */ + protected async drainBacklog(): Promise { + while (this.running) { + const { messages } = await this.deps.hub.pull({ limit: this.pullLimit }); + if (messages.length === 0) { + return; + } + + await this.processBatch(messages); + + if (messages.length < this.pullLimit) { + return; + } + } + } + + /** Coalesce wakeup signals into a single in-flight backlog drain. */ + protected onWakeupSignal(): void { + if (this.wakeupInFlight) { + this.wakeupPending = true; + return; + } + + this.wakeupInFlight = this.drainBacklog() + .catch((error) => { + this.deps.logger?.warn(`Inbound wakeup drain failed: ${(error as Error).message}`); + }) + .finally(() => { + this.wakeupInFlight = undefined; + if (this.wakeupPending && this.running) { + this.wakeupPending = false; + this.onWakeupSignal(); + } + }); + } + + /** Long-poll the Hub as a fallback for missed wakeups until {@link stop}. */ + protected async runFallbackLoop(): Promise { + const stopped = new Promise((resolve) => { + this.resolveStop = resolve; + }); + + while (this.running) { + let result: MessagePullResponse | undefined; + try { + result = await Promise.race([ + this.deps.hub.pull({ limit: this.pullLimit, wait: this.waitMs }), + stopped.then(() => undefined), + ]); + } catch (error) { + if (!this.running) { + return; + } + this.deps.logger?.warn(`Inbound long-poll failed: ${(error as Error).message}`); + await Promise.race([this.sleep(this.errorBackoffMs), stopped]); + continue; + } + + if (!this.running || !result) { + return; + } + + if (result.messages.length > 0) { + await this.processBatch(result.messages); + } + } + } + + /** Decrypt one inbound message and deliver it to the analysis's webhooks. */ + protected async deliverMessage(message: Message): Promise { + const analysisId = message.metadata?.analysisId; + if (!analysisId) { + throw new Error('message carries no analysisId metadata'); + } + if (typeof message.data !== 'string') { + throw new Error('message carries no ciphertext payload'); + } + + const senderPublicKey = await this.resolveSenderPublicKey(analysisId, message.sender_id); + const plaintext = await this.deps.crypto.open(message.data, senderPublicKey); + const payload = JSON.parse(this.decoder.decode(plaintext)); + + await this.deps.delivery.deliver(analysisId, payload); + } + + /** Map a sender client id to its node public key via the analysis participant set. */ + protected async resolveSenderPublicKey(analysisId: string, senderId: string): Promise { + const participants = await this.deps.resolver.resolve(analysisId); + const sender = participants.find((participant) => participant.clientId === senderId); + if (!sender) { + throw new Error(`sender '${senderId}' is not a participant of analysis '${analysisId}'`); + } + + return sender.publicKey; + } + + protected sleep(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); + } +} diff --git a/apps/node-message-broker/src/core/inbound/types.ts b/apps/node-message-broker/src/core/inbound/types.ts new file mode 100644 index 0000000..1baccaf --- /dev/null +++ b/apps/node-message-broker/src/core/inbound/types.ts @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { Logger } from '@privateaim/server-kit'; +import type { IParticipantResolver } from '../analysis/index.ts'; +import type { ICryptoService } from '../crypto/index.ts'; +import type { IDeliveryService } from '../delivery/index.ts'; +import type { IHubClient } from '../hub/index.ts'; + +/** The ports the inbound delivery loop fans in from. */ +export type InboundDeliveryDeps = { + hub: IHubClient, + crypto: ICryptoService, + resolver: IParticipantResolver, + delivery: IDeliveryService, + logger?: Logger +}; + +/** Tunables for the inbound delivery loop. */ +export type InboundProcessorOptions = { + /** maximum messages requested per pull (default 50) */ + pullLimit?: number, + /** long-poll budget in ms for the fallback loop's pull (default 20000) */ + waitMs?: number, + /** backoff in ms after a failed pull before the fallback loop retries (default 1000) */ + errorBackoffMs?: number +}; diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts b/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts new file mode 100644 index 0000000..ddf5dfd --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { ICryptoService } from '../../../../src/core/crypto/index.ts'; + +/** + * In-memory `ICryptoService` for inbound tests. `open` records each call and returns the + * plaintext bytes mapped from the ciphertext payload (defaulting to echoing the payload), so + * a test seals nothing real yet controls exactly what each frame decrypts to. Payloads in + * {@link undecryptable} reject, modelling a poison frame. + */ +export class FakeInboundCryptoService implements ICryptoService { + openCalls: { payload: string, senderPublicKey: string }[] = []; + + plaintextByPayload = new Map(); + + undecryptable = new Set(); + + seal = async (): Promise => ''; + + open = async (payload: string, senderPublicKey: string): Promise => { + this.openCalls.push({ payload, senderPublicKey }); + + if (this.undecryptable.has(payload)) { + throw new Error('decryption failed'); + } + + const plaintext = this.plaintextByPayload.get(payload) ?? payload; + return new TextEncoder().encode(plaintext); + }; +} diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts b/apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts new file mode 100644 index 0000000..29dda51 --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { IDeliveryService, WebhookSubscription } from '../../../../src/core/delivery/index.ts'; + +/** + * In-memory `IDeliveryService` recording every decrypted delivery. Analyses in + * {@link failAnalyses} make `deliver` reject, modelling a downstream webhook failure. + */ +export class FakeDeliveryService implements IDeliveryService { + delivered: { analysisId: string, message: unknown }[] = []; + + failAnalyses = new Set(); + + register = async (): Promise => {}; + + unregister = async (): Promise => {}; + + list = async (): Promise => []; + + deliver = async (analysisId: string, message: unknown): Promise => { + if (this.failAnalyses.has(analysisId)) { + throw new Error('delivery failed'); + } + + this.delivered.push({ analysisId, message }); + }; +} diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts b/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts new file mode 100644 index 0000000..2180399 --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { + Message, + MessageAckRequest, + MessageParty, + MessagePullQuery, + MessagePullResponse, +} from '@privateaim/messenger-kit'; +import type { IHubClient } from '../../../../src/core/hub/index.ts'; + +/** + * In-memory `IHubClient` for the inbound loop. A no-wait `pull` (the backlog drain) returns + * the next seeded batch; a long-poll `pull({ wait })` (the fallback loop) parks until + * {@link releaseParked} or the processor's stop barrier unblocks it. Records acks and exposes + * wakeup emission so tests can drive the wakeup → drain path. + */ +export class FakeInboundHubClient implements IHubClient { + /** batches handed out by successive no-wait pulls; empty once exhausted */ + pullBatches: Message[][] = []; + + /** ids passed to every `ack` call */ + acked: string[][] = []; + + private wakeupListeners: ((recipient: MessageParty) => void)[] = []; + + private parkedResolvers: ((value: MessagePullResponse) => void)[] = []; + + send = async (): Promise => []; + + pull = async (query?: MessagePullQuery): Promise => { + if (query?.wait != null) { + return new Promise((resolve) => { + this.parkedResolvers.push(resolve); + }); + } + + return { messages: this.pullBatches.shift() ?? [] }; + }; + + ack = async (input: MessageAckRequest): Promise => { + this.acked.push(input.ids); + }; + + onWakeup = (listener: (recipient: MessageParty) => void): (() => void) => { + this.wakeupListeners.push(listener); + return () => { + this.wakeupListeners = this.wakeupListeners.filter((entry) => entry !== listener); + }; + }; + + start = async (): Promise => {}; + + stop = async (): Promise => {}; + + /** test helper: deliver a wakeup to every current listener */ + emitWakeup(recipient: MessageParty): void { + for (const listener of [...this.wakeupListeners]) { + listener(recipient); + } + } + + get wakeupListenerCount(): number { + return this.wakeupListeners.length; + } + + /** test helper: resolve any parked long-poll pulls with an empty batch */ + releaseParked(): void { + const resolvers = this.parkedResolvers; + this.parkedResolvers = []; + for (const resolve of resolvers) { + resolve({ messages: [] }); + } + } +} diff --git a/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts new file mode 100644 index 0000000..9a2280b --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { Message } from '@privateaim/messenger-kit'; +import { describe, expect, it } from 'vitest'; +import type { AnalysisParticipant } from '../../../../src/core/analysis/index.ts'; +import { InboundDeliveryProcessor } from '../../../../src/core/inbound/index.ts'; +import { FakeParticipantResolver } from '../messaging/fake-participant-resolver.ts'; +import { FakeInboundCryptoService } from './fake-crypto-service.ts'; +import { FakeDeliveryService } from './fake-delivery-service.ts'; +import { FakeInboundHubClient } from './fake-hub.ts'; + +const SELF: AnalysisParticipant = { + nodeId: 'node-self', + nodeType: 'default', + clientId: 'client-self', + publicKey: 'pk-self', +}; +const SENDER: AnalysisParticipant = { + nodeId: 'node-b', + nodeType: 'default', + clientId: 'client-b', + publicKey: 'pk-b', +}; + +/** The decrypted message object delivered to the webhook — the SDK's own envelope rides in `meta`. */ +const MESSAGE_BODY = { + greeting: 'hi', + meta: { + id: 'm-1', + sender: 'node-b', + }, +}; + +function inboundMessage(overrides: Partial = {}): Message { + return { + id: 'msg-1', + sender_type: 'client', + sender_id: 'client-b', + recipient_type: 'client', + recipient_id: 'client-self', + data: 'cipher-1', + metadata: { analysisId: 'a1' }, + created_at: '2026-01-01T00:00:00.000Z', + ...overrides, + }; +} + +function setup() { + const hub = new FakeInboundHubClient(); + const crypto = new FakeInboundCryptoService(); + const resolver = new FakeParticipantResolver(); + const delivery = new FakeDeliveryService(); + + resolver.participantsByAnalysis.set('a1', [SELF, SENDER]); + crypto.plaintextByPayload.set('cipher-1', JSON.stringify(MESSAGE_BODY)); + + const processor = new InboundDeliveryProcessor( + { + hub, + crypto, + resolver, + delivery, + }, + { + waitMs: 50, + errorBackoffMs: 5, + }, + ); + + return { + hub, + crypto, + resolver, + delivery, + processor, + }; +} + +describe('core/inbound/processor', () => { + it('resolves the sender key, decrypts, delivers, and acks a message', async () => { + const { + crypto, + delivery, + hub, + processor, + } = setup(); + + const acked = await processor.processBatch([inboundMessage()]); + + expect(acked).toEqual(['msg-1']); + expect(crypto.openCalls).toEqual([{ payload: 'cipher-1', senderPublicKey: 'pk-b' }]); + expect(delivery.delivered).toEqual([{ analysisId: 'a1', message: MESSAGE_BODY }]); + expect(hub.acked).toEqual([['msg-1']]); + }); + + it('isolates a decrypt failure and still delivers and acks the rest of the batch', async () => { + const { + crypto, + delivery, + hub, + processor, + } = setup(); + crypto.undecryptable.add('cipher-bad'); + crypto.plaintextByPayload.set('cipher-2', JSON.stringify({ ok: true })); + + const acked = await processor.processBatch([ + inboundMessage({ id: 'bad', data: 'cipher-bad' }), + inboundMessage({ id: 'good', data: 'cipher-2' }), + ]); + + expect(acked).toEqual(['good']); + expect(delivery.delivered.map((entry) => entry.message)).toEqual([{ ok: true }]); + expect(hub.acked).toEqual([['good']]); + }); + + it('does not ack a message whose local delivery fails (left for redelivery)', async () => { + const { + delivery, + hub, + processor, + } = setup(); + delivery.failAnalyses.add('a1'); + + const acked = await processor.processBatch([inboundMessage()]); + + expect(acked).toEqual([]); + expect(hub.acked).toEqual([]); + }); + + it('skips messages without an analysisId or ciphertext payload', async () => { + const { + delivery, + hub, + processor, + } = setup(); + + const acked = await processor.processBatch([ + inboundMessage({ id: 'no-analysis', metadata: null }), + inboundMessage({ id: 'no-cipher', data: null }), + ]); + + expect(acked).toEqual([]); + expect(delivery.delivered).toEqual([]); + expect(hub.acked).toEqual([]); + }); + + it('skips a message from an unknown sender', async () => { + const { delivery, processor } = setup(); + + const acked = await processor.processBatch([inboundMessage({ sender_id: 'client-stranger' })]); + + expect(acked).toEqual([]); + expect(delivery.delivered).toEqual([]); + }); + + it('drains the backlog on a wakeup, then unsubscribes on stop', async () => { + const { + hub, + delivery, + processor, + } = setup(); + hub.pullBatches.push([inboundMessage()]); + + processor.start(); + expect(hub.wakeupListenerCount).toBe(1); + + hub.emitWakeup({ type: 'client', id: 'client-self' }); + await processor.whenIdle(); + + expect(delivery.delivered).toEqual([{ analysisId: 'a1', message: MESSAGE_BODY }]); + expect(hub.acked).toEqual([['msg-1']]); + + await processor.stop(); + expect(hub.wakeupListenerCount).toBe(0); + hub.releaseParked(); + }); +}); From 35910b5174fd03ef3ce6e599a8b2ca60b56a8231 Mon Sep 17 00:00:00 2001 From: tada5hi Date: Fri, 26 Jun 2026 16:59:22 +0200 Subject: [PATCH 2/2] fix(node-message-broker): bind analysisId into the message AEAD Pass `analysisId` as the HKDF `info` to both `crypto.seal` (send) and `crypto.open` (receive). The analysis is now authenticated by the key derivation, so a `metadata.analysisId` relabelled in transit by the untrusted Hub (or a replay) fails to decrypt instead of being delivered to the wrong analysis's webhooks. Also harden the inbound tests per review: the fake Hub now returns queued messages before parking a long-poll (matching the Hub contract), and the suite gains a no-wakeup case proving the `pull({ wait })` fallback delivers + acks a backlog. Seal and open assertions cover the new HKDF binding. --- .../src/core/inbound/processor.ts | 5 ++- .../src/core/messaging/dispatch.ts | 6 ++- .../unit/core/inbound/fake-crypto-service.ts | 24 ++++++++---- .../test/unit/core/inbound/fake-hub.ts | 9 ++++- .../test/unit/core/inbound/processor.spec.ts | 38 +++++++++++++++++-- .../test/unit/core/messaging/dispatch.spec.ts | 2 + .../core/messaging/fake-crypto-service.ts | 14 +++++-- 7 files changed, 82 insertions(+), 16 deletions(-) diff --git a/apps/node-message-broker/src/core/inbound/processor.ts b/apps/node-message-broker/src/core/inbound/processor.ts index cedec9f..2ad36e6 100644 --- a/apps/node-message-broker/src/core/inbound/processor.ts +++ b/apps/node-message-broker/src/core/inbound/processor.ts @@ -202,8 +202,11 @@ export class InboundDeliveryProcessor { throw new Error('message carries no ciphertext payload'); } + // `analysisId` is bound into the key derivation (HKDF info, matching the seal path), + // so a `metadata.analysisId` relabelled in transit by the untrusted Hub fails to + // decrypt here rather than being mis-routed to another analysis's webhooks. const senderPublicKey = await this.resolveSenderPublicKey(analysisId, message.sender_id); - const plaintext = await this.deps.crypto.open(message.data, senderPublicKey); + const plaintext = await this.deps.crypto.open(message.data, senderPublicKey, analysisId); const payload = JSON.parse(this.decoder.decode(plaintext)); await this.deps.delivery.deliver(analysisId, payload); diff --git a/apps/node-message-broker/src/core/messaging/dispatch.ts b/apps/node-message-broker/src/core/messaging/dispatch.ts index e8c4bff..b08e65e 100644 --- a/apps/node-message-broker/src/core/messaging/dispatch.ts +++ b/apps/node-message-broker/src/core/messaging/dispatch.ts @@ -56,7 +56,11 @@ async function sendSealed( data: MessageSealInput, ): Promise { const sent = await Promise.all(recipients.map(async (recipient) => { - const sealed = await deps.crypto.seal(data, recipient.publicKey); + // Bind the analysis into the key derivation (HKDF info) so the recipient's + // `open` only succeeds when the relayed `metadata.analysisId` is unchanged — a + // relabel by the untrusted Hub (or a replay) fails to decrypt instead of being + // delivered to the wrong analysis's webhooks. + const sealed = await deps.crypto.seal(data, recipient.publicKey, analysisId); return deps.hub.send({ recipients: [{ type: MessagePartyKind.CLIENT, id: recipient.clientId }], data: sealed, diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts b/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts index ddf5dfd..4bbd33d 100644 --- a/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts +++ b/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts @@ -5,16 +5,22 @@ * view the LICENSE file that was distributed with this source code. */ +import type { MessageSealInput } from '@privateaim/kit'; import type { ICryptoService } from '../../../../src/core/crypto/index.ts'; /** - * In-memory `ICryptoService` for inbound tests. `open` records each call and returns the - * plaintext bytes mapped from the ciphertext payload (defaulting to echoing the payload), so - * a test seals nothing real yet controls exactly what each frame decrypts to. Payloads in - * {@link undecryptable} reject, modelling a poison frame. + * In-memory `ICryptoService` for inbound tests. `open` records each call (incl. the HKDF + * `info` binding) and returns the plaintext bytes mapped from the ciphertext payload + * (defaulting to echoing the payload), so a test seals nothing real yet controls exactly + * what each frame decrypts to. Payloads in {@link undecryptable} reject, modelling a poison + * frame. */ export class FakeInboundCryptoService implements ICryptoService { - openCalls: { payload: string, senderPublicKey: string }[] = []; + openCalls: { + payload: string, + senderPublicKey: string, + info?: MessageSealInput + }[] = []; plaintextByPayload = new Map(); @@ -22,8 +28,12 @@ export class FakeInboundCryptoService implements ICryptoService { seal = async (): Promise => ''; - open = async (payload: string, senderPublicKey: string): Promise => { - this.openCalls.push({ payload, senderPublicKey }); + open = async (payload: string, senderPublicKey: string, info?: MessageSealInput): Promise => { + this.openCalls.push({ + payload, + senderPublicKey, + info, + }); if (this.undecryptable.has(payload)) { throw new Error('decryption failed'); diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts b/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts index 2180399..4471c0a 100644 --- a/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts +++ b/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts @@ -34,13 +34,20 @@ export class FakeInboundHubClient implements IHubClient { send = async (): Promise => []; pull = async (query?: MessagePullQuery): Promise => { + // Hub contract: return pending messages immediately; only a long-poll with an empty + // mailbox parks until a message arrives (here: until released / stopped). + const nextBatch = this.pullBatches.shift(); + if (nextBatch) { + return { messages: nextBatch }; + } + if (query?.wait != null) { return new Promise((resolve) => { this.parkedResolvers.push(resolve); }); } - return { messages: this.pullBatches.shift() ?? [] }; + return { messages: [] }; }; ack = async (input: MessageAckRequest): Promise => { diff --git a/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts index 9a2280b..3f509a1 100644 --- a/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts +++ b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts @@ -6,7 +6,12 @@ */ import type { Message } from '@privateaim/messenger-kit'; -import { describe, expect, it } from 'vitest'; +import { + describe, + expect, + it, + vi, +} from 'vitest'; import type { AnalysisParticipant } from '../../../../src/core/analysis/index.ts'; import { InboundDeliveryProcessor } from '../../../../src/core/inbound/index.ts'; import { FakeParticipantResolver } from '../messaging/fake-participant-resolver.ts'; @@ -93,7 +98,12 @@ describe('core/inbound/processor', () => { const acked = await processor.processBatch([inboundMessage()]); expect(acked).toEqual(['msg-1']); - expect(crypto.openCalls).toEqual([{ payload: 'cipher-1', senderPublicKey: 'pk-b' }]); + // the analysis is bound into the open's key derivation (HKDF info) + expect(crypto.openCalls).toEqual([{ + payload: 'cipher-1', + senderPublicKey: 'pk-b', + info: 'a1', + }]); expect(delivery.delivered).toEqual([{ analysisId: 'a1', message: MESSAGE_BODY }]); expect(hub.acked).toEqual([['msg-1']]); }); @@ -164,11 +174,13 @@ describe('core/inbound/processor', () => { delivery, processor, } = setup(); - hub.pullBatches.push([inboundMessage()]); + // start with an empty mailbox so the fallback loop parks; the wakeup-triggered drain + // is what picks up the message seeded afterwards. processor.start(); expect(hub.wakeupListenerCount).toBe(1); + hub.pullBatches.push([inboundMessage()]); hub.emitWakeup({ type: 'client', id: 'client-self' }); await processor.whenIdle(); @@ -179,4 +191,24 @@ describe('core/inbound/processor', () => { expect(hub.wakeupListenerCount).toBe(0); hub.releaseParked(); }); + + it('delivers and acks a backlog via the long-poll fallback, without any wakeup', async () => { + const { + hub, + delivery, + processor, + } = setup(); + hub.pullBatches.push([inboundMessage()]); + + // no emitWakeup — the fallback loop's pull({ wait }) must catch the pending message + processor.start(); + + await vi.waitFor(() => { + expect(delivery.delivered).toEqual([{ analysisId: 'a1', message: MESSAGE_BODY }]); + }); + expect(hub.acked).toEqual([['msg-1']]); + + await processor.stop(); + hub.releaseParked(); + }); }); diff --git a/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts b/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts index c8bf401..cf954ca 100644 --- a/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts +++ b/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts @@ -74,6 +74,8 @@ describe('core/messaging/dispatch', () => { expect(ids).toHaveLength(2); expect(hub.sends).toHaveLength(2); expect(crypto.sealCalls.map((call) => call.recipientPublicKey).sort()).toEqual(['pk-b', 'pk-c']); + // the analysis is bound into each seal's key derivation (HKDF info) + expect(crypto.sealCalls.every((call) => call.info === 'a1')).toBe(true); const sends = byRecipientClient(hub.sends); expect(sends.get('client-b')).toMatchObject({ diff --git a/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts b/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts index 5262eef..c8c31a2 100644 --- a/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts +++ b/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts @@ -14,10 +14,18 @@ import type { ICryptoService } from '../../../../src/core/crypto/index.ts'; * under the correct recipient key without real crypto. */ export class FakeCryptoService implements ICryptoService { - sealCalls: { data: MessageSealInput, recipientPublicKey: string }[] = []; + sealCalls: { + data: MessageSealInput, + recipientPublicKey: string, + info?: MessageSealInput + }[] = []; - seal = async (data: MessageSealInput, recipientPublicKey: string): Promise => { - this.sealCalls.push({ data, recipientPublicKey }); + seal = async (data: MessageSealInput, recipientPublicKey: string, info?: MessageSealInput): Promise => { + this.sealCalls.push({ + data, + recipientPublicKey, + info, + }); return `sealed:${recipientPublicKey}`; };