diff --git a/apps/node-message-broker/src/app/builder.ts b/apps/node-message-broker/src/app/builder.ts index a362937..61033c5 100644 --- a/apps/node-message-broker/src/app/builder.ts +++ b/apps/node-message-broker/src/app/builder.ts @@ -11,6 +11,7 @@ import { ConfigModule } from './modules/config/index.ts'; import { ComponentsModule } from './modules/components/index.ts'; import { CoreClientModule } from './modules/core-client/index.ts'; import { HTTPModule } from './modules/http/index.ts'; +import { InboundModule } from './modules/inbound/index.ts'; export class ServerMessageBrokerApplicationBuilder extends BaseApplicationBuilder { withConfig(instance?: ConfigModule | false): this { @@ -25,6 +26,10 @@ export class ServerMessageBrokerApplicationBuilder extends BaseApplicationBuilde return this.addModuleSlot('coreClient', instance, () => new CoreClientModule()); } + withInbound(instance?: InboundModule | false): this { + return this.addModuleSlot('inbound', instance, () => new InboundModule()); + } + withHTTP(instance?: HTTPModule | false): this { return this.addModuleSlot('http', instance, () => new HTTPModule()); } diff --git a/apps/node-message-broker/src/app/factory.ts b/apps/node-message-broker/src/app/factory.ts index bdf22ca..2ef48af 100644 --- a/apps/node-message-broker/src/app/factory.ts +++ b/apps/node-message-broker/src/app/factory.ts @@ -19,6 +19,7 @@ export function createApplication(): Application { }) .withComponents() .withCoreClient() + .withInbound() .withAuthupHook() .withAuthupClient() .withHTTP(); diff --git a/apps/node-message-broker/src/app/modules/inbound/index.ts b/apps/node-message-broker/src/app/modules/inbound/index.ts new file mode 100644 index 0000000..954fdb1 --- /dev/null +++ b/apps/node-message-broker/src/app/modules/inbound/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './module.ts'; diff --git a/apps/node-message-broker/src/app/modules/inbound/module.ts b/apps/node-message-broker/src/app/modules/inbound/module.ts new file mode 100644 index 0000000..a9a2b16 --- /dev/null +++ b/apps/node-message-broker/src/app/modules/inbound/module.ts @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { IContainer } from 'eldin'; +import type { IModule } from 'orkos'; +import { EnvironmentName, LoggerInjectionKey } from '@privateaim/server-kit'; +import { InboundDeliveryProcessor } from '../../../core/inbound/index.ts'; +import { ComponentsInjectionKey } from '../components/constants.ts'; +import { ConfigInjectionKey } from '../config/constants.ts'; +import { CoreClientInjectionKey } from '../core-client/constants.ts'; + +/** + * Starts the inbound delivery loop (Plan 013 Track B, Phase 4): wakeup/long-poll → pull → + * decrypt → fan out to the analysis webhooks. Depends on `components` (Hub link, crypto, + * delivery) and `coreClient` (participant resolver, for the sender's node key). The loop is + * skipped under the test environment, mirroring how the Hub stream is left unopened there. + */ +export class InboundModule implements IModule { + readonly name = 'inbound'; + + readonly dependencies: string[] = ['config', 'components', 'coreClient']; + + private processor: InboundDeliveryProcessor | undefined; + + async setup(container: IContainer): Promise { + const config = container.resolve(ConfigInjectionKey); + + const loggerResult = container.tryResolve(LoggerInjectionKey); + const logger = loggerResult.success ? loggerResult.data : undefined; + + const processor = new InboundDeliveryProcessor({ + hub: container.resolve(ComponentsInjectionKey.HubClient), + crypto: container.resolve(ComponentsInjectionKey.Crypto), + delivery: container.resolve(ComponentsInjectionKey.Delivery), + resolver: container.resolve(CoreClientInjectionKey.ParticipantResolver), + logger, + }); + + // tests don't reach a live Hub; skip pulling and the reconnecting wakeup stream. + if (config.env !== EnvironmentName.TEST) { + processor.start(); + } + + this.processor = processor; + } + + async teardown(): Promise { + if (this.processor) { + await this.processor.stop(); + this.processor = undefined; + } + } +} diff --git a/apps/node-message-broker/src/core/inbound/index.ts b/apps/node-message-broker/src/core/inbound/index.ts new file mode 100644 index 0000000..fe48961 --- /dev/null +++ b/apps/node-message-broker/src/core/inbound/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './types.ts'; +export * from './processor.ts'; diff --git a/apps/node-message-broker/src/core/inbound/processor.ts b/apps/node-message-broker/src/core/inbound/processor.ts new file mode 100644 index 0000000..2ad36e6 --- /dev/null +++ b/apps/node-message-broker/src/core/inbound/processor.ts @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { Message, MessagePullResponse } from '@privateaim/messenger-kit'; +import type { InboundDeliveryDeps, InboundProcessorOptions } from './types.ts'; + +const DEFAULT_PULL_LIMIT = 50; +const DEFAULT_WAIT_MS = 20_000; +const DEFAULT_ERROR_BACKOFF_MS = 1_000; + +/** + * Drives the inbound side of the broker: pull this node's pending ciphertext from the Hub, + * resolve each message's **sender** node key, decrypt it (S1), and fan it out to the + * analysis's local webhooks (S6). Two pull triggers feed the same pipeline: + * + * - **wakeup** — a payload-free `messagePending` SSE signal triggers an immediate backlog + * drain (coalesced single-flight, so a burst of signals collapses into one drain); + * - **long-poll fallback** — a background loop parks on `pull({ wait })` to catch anything a + * missed wakeup would otherwise leave sitting until the next signal. + * + * Delivery is delete-on-ack at-least-once: only successfully delivered messages are acked, + * so a transient failure is retried on the next pull. A per-message failure (no + * `analysisId`, unknown sender, decrypt/parse error) is isolated — logged and skipped, never + * fatal to the batch — and left unacked for redelivery. The receiving SDK dedupes by its own + * `meta.id`, so an occasional duplicate from concurrent triggers is harmless. + */ +export class InboundDeliveryProcessor { + protected deps: InboundDeliveryDeps; + + protected pullLimit: number; + + protected waitMs: number; + + protected errorBackoffMs: number; + + protected running = false; + + protected unsubscribe: (() => void) | undefined; + + protected fallbackLoop: Promise | undefined; + + protected wakeupInFlight: Promise | undefined; + + protected wakeupPending = false; + + protected resolveStop: (() => void) | undefined; + + private readonly decoder = new TextDecoder(); + + constructor(deps: InboundDeliveryDeps, options: InboundProcessorOptions = {}) { + this.deps = deps; + this.pullLimit = options.pullLimit ?? DEFAULT_PULL_LIMIT; + this.waitMs = options.waitMs ?? DEFAULT_WAIT_MS; + this.errorBackoffMs = options.errorBackoffMs ?? DEFAULT_ERROR_BACKOFF_MS; + } + + /** Subscribe to Hub wakeups and start the long-poll fallback loop. Idempotent. */ + start(): void { + if (this.running) { + return; + } + + this.running = true; + this.unsubscribe = this.deps.hub.onWakeup(() => this.onWakeupSignal()); + this.fallbackLoop = this.runFallbackLoop(); + } + + /** Unsubscribe, unblock the long-poll, and await both triggers settling. Idempotent. */ + async stop(): Promise { + if (!this.running) { + return; + } + + this.running = false; + + this.unsubscribe?.(); + this.unsubscribe = undefined; + + // release the fallback loop if it is parked on a long-poll pull + this.resolveStop?.(); + + const inFlight = this.wakeupInFlight; + const loop = this.fallbackLoop; + this.fallbackLoop = undefined; + + await inFlight?.catch(() => undefined); + await loop?.catch(() => undefined); + } + + /** Resolve once the current wakeup-triggered drain has settled (diagnostic / test aid). */ + async whenIdle(): Promise { + await this.wakeupInFlight?.catch(() => undefined); + } + + /** + * Decrypt and locally deliver a batch, returning the ids of the messages that were + * delivered (and acked). Per-message failures are isolated; the ack covers only the + * delivered subset so the rest are redelivered on a later pull. + */ + async processBatch(messages: Message[]): Promise { + const ackIds: string[] = []; + + for (const message of messages) { + try { + await this.deliverMessage(message); + ackIds.push(message.id); + } catch (error) { + this.deps.logger?.warn(`Inbound message ${message.id} skipped: ${(error as Error).message}`); + } + } + + if (ackIds.length > 0) { + try { + await this.deps.hub.ack({ ids: ackIds }); + } catch (error) { + this.deps.logger?.error(`Failed to ack inbound messages ${ackIds.join(', ')}: ${(error as Error).message}`); + } + } + + return ackIds; + } + + /** Drain the backlog: pull (no wait) and process repeatedly until the mailbox is empty. */ + protected async drainBacklog(): Promise { + while (this.running) { + const { messages } = await this.deps.hub.pull({ limit: this.pullLimit }); + if (messages.length === 0) { + return; + } + + await this.processBatch(messages); + + if (messages.length < this.pullLimit) { + return; + } + } + } + + /** Coalesce wakeup signals into a single in-flight backlog drain. */ + protected onWakeupSignal(): void { + if (this.wakeupInFlight) { + this.wakeupPending = true; + return; + } + + this.wakeupInFlight = this.drainBacklog() + .catch((error) => { + this.deps.logger?.warn(`Inbound wakeup drain failed: ${(error as Error).message}`); + }) + .finally(() => { + this.wakeupInFlight = undefined; + if (this.wakeupPending && this.running) { + this.wakeupPending = false; + this.onWakeupSignal(); + } + }); + } + + /** Long-poll the Hub as a fallback for missed wakeups until {@link stop}. */ + protected async runFallbackLoop(): Promise { + const stopped = new Promise((resolve) => { + this.resolveStop = resolve; + }); + + while (this.running) { + let result: MessagePullResponse | undefined; + try { + result = await Promise.race([ + this.deps.hub.pull({ limit: this.pullLimit, wait: this.waitMs }), + stopped.then(() => undefined), + ]); + } catch (error) { + if (!this.running) { + return; + } + this.deps.logger?.warn(`Inbound long-poll failed: ${(error as Error).message}`); + await Promise.race([this.sleep(this.errorBackoffMs), stopped]); + continue; + } + + if (!this.running || !result) { + return; + } + + if (result.messages.length > 0) { + await this.processBatch(result.messages); + } + } + } + + /** Decrypt one inbound message and deliver it to the analysis's webhooks. */ + protected async deliverMessage(message: Message): Promise { + const analysisId = message.metadata?.analysisId; + if (!analysisId) { + throw new Error('message carries no analysisId metadata'); + } + if (typeof message.data !== 'string') { + throw new Error('message carries no ciphertext payload'); + } + + // `analysisId` is bound into the key derivation (HKDF info, matching the seal path), + // so a `metadata.analysisId` relabelled in transit by the untrusted Hub fails to + // decrypt here rather than being mis-routed to another analysis's webhooks. + const senderPublicKey = await this.resolveSenderPublicKey(analysisId, message.sender_id); + const plaintext = await this.deps.crypto.open(message.data, senderPublicKey, analysisId); + const payload = JSON.parse(this.decoder.decode(plaintext)); + + await this.deps.delivery.deliver(analysisId, payload); + } + + /** Map a sender client id to its node public key via the analysis participant set. */ + protected async resolveSenderPublicKey(analysisId: string, senderId: string): Promise { + const participants = await this.deps.resolver.resolve(analysisId); + const sender = participants.find((participant) => participant.clientId === senderId); + if (!sender) { + throw new Error(`sender '${senderId}' is not a participant of analysis '${analysisId}'`); + } + + return sender.publicKey; + } + + protected sleep(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); + } +} diff --git a/apps/node-message-broker/src/core/inbound/types.ts b/apps/node-message-broker/src/core/inbound/types.ts new file mode 100644 index 0000000..1baccaf --- /dev/null +++ b/apps/node-message-broker/src/core/inbound/types.ts @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { Logger } from '@privateaim/server-kit'; +import type { IParticipantResolver } from '../analysis/index.ts'; +import type { ICryptoService } from '../crypto/index.ts'; +import type { IDeliveryService } from '../delivery/index.ts'; +import type { IHubClient } from '../hub/index.ts'; + +/** The ports the inbound delivery loop fans in from. */ +export type InboundDeliveryDeps = { + hub: IHubClient, + crypto: ICryptoService, + resolver: IParticipantResolver, + delivery: IDeliveryService, + logger?: Logger +}; + +/** Tunables for the inbound delivery loop. */ +export type InboundProcessorOptions = { + /** maximum messages requested per pull (default 50) */ + pullLimit?: number, + /** long-poll budget in ms for the fallback loop's pull (default 20000) */ + waitMs?: number, + /** backoff in ms after a failed pull before the fallback loop retries (default 1000) */ + errorBackoffMs?: number +}; diff --git a/apps/node-message-broker/src/core/messaging/dispatch.ts b/apps/node-message-broker/src/core/messaging/dispatch.ts index e8c4bff..b08e65e 100644 --- a/apps/node-message-broker/src/core/messaging/dispatch.ts +++ b/apps/node-message-broker/src/core/messaging/dispatch.ts @@ -56,7 +56,11 @@ async function sendSealed( data: MessageSealInput, ): Promise { const sent = await Promise.all(recipients.map(async (recipient) => { - const sealed = await deps.crypto.seal(data, recipient.publicKey); + // Bind the analysis into the key derivation (HKDF info) so the recipient's + // `open` only succeeds when the relayed `metadata.analysisId` is unchanged — a + // relabel by the untrusted Hub (or a replay) fails to decrypt instead of being + // delivered to the wrong analysis's webhooks. + const sealed = await deps.crypto.seal(data, recipient.publicKey, analysisId); return deps.hub.send({ recipients: [{ type: MessagePartyKind.CLIENT, id: recipient.clientId }], data: sealed, diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts b/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts new file mode 100644 index 0000000..4bbd33d --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/fake-crypto-service.ts @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { MessageSealInput } from '@privateaim/kit'; +import type { ICryptoService } from '../../../../src/core/crypto/index.ts'; + +/** + * In-memory `ICryptoService` for inbound tests. `open` records each call (incl. the HKDF + * `info` binding) and returns the plaintext bytes mapped from the ciphertext payload + * (defaulting to echoing the payload), so a test seals nothing real yet controls exactly + * what each frame decrypts to. Payloads in {@link undecryptable} reject, modelling a poison + * frame. + */ +export class FakeInboundCryptoService implements ICryptoService { + openCalls: { + payload: string, + senderPublicKey: string, + info?: MessageSealInput + }[] = []; + + plaintextByPayload = new Map(); + + undecryptable = new Set(); + + seal = async (): Promise => ''; + + open = async (payload: string, senderPublicKey: string, info?: MessageSealInput): Promise => { + this.openCalls.push({ + payload, + senderPublicKey, + info, + }); + + if (this.undecryptable.has(payload)) { + throw new Error('decryption failed'); + } + + const plaintext = this.plaintextByPayload.get(payload) ?? payload; + return new TextEncoder().encode(plaintext); + }; +} diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts b/apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts new file mode 100644 index 0000000..29dda51 --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/fake-delivery-service.ts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { IDeliveryService, WebhookSubscription } from '../../../../src/core/delivery/index.ts'; + +/** + * In-memory `IDeliveryService` recording every decrypted delivery. Analyses in + * {@link failAnalyses} make `deliver` reject, modelling a downstream webhook failure. + */ +export class FakeDeliveryService implements IDeliveryService { + delivered: { analysisId: string, message: unknown }[] = []; + + failAnalyses = new Set(); + + register = async (): Promise => {}; + + unregister = async (): Promise => {}; + + list = async (): Promise => []; + + deliver = async (analysisId: string, message: unknown): Promise => { + if (this.failAnalyses.has(analysisId)) { + throw new Error('delivery failed'); + } + + this.delivered.push({ analysisId, message }); + }; +} diff --git a/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts b/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts new file mode 100644 index 0000000..4471c0a --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/fake-hub.ts @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { + Message, + MessageAckRequest, + MessageParty, + MessagePullQuery, + MessagePullResponse, +} from '@privateaim/messenger-kit'; +import type { IHubClient } from '../../../../src/core/hub/index.ts'; + +/** + * In-memory `IHubClient` for the inbound loop. A no-wait `pull` (the backlog drain) returns + * the next seeded batch; a long-poll `pull({ wait })` (the fallback loop) parks until + * {@link releaseParked} or the processor's stop barrier unblocks it. Records acks and exposes + * wakeup emission so tests can drive the wakeup → drain path. + */ +export class FakeInboundHubClient implements IHubClient { + /** batches handed out by successive no-wait pulls; empty once exhausted */ + pullBatches: Message[][] = []; + + /** ids passed to every `ack` call */ + acked: string[][] = []; + + private wakeupListeners: ((recipient: MessageParty) => void)[] = []; + + private parkedResolvers: ((value: MessagePullResponse) => void)[] = []; + + send = async (): Promise => []; + + pull = async (query?: MessagePullQuery): Promise => { + // Hub contract: return pending messages immediately; only a long-poll with an empty + // mailbox parks until a message arrives (here: until released / stopped). + const nextBatch = this.pullBatches.shift(); + if (nextBatch) { + return { messages: nextBatch }; + } + + if (query?.wait != null) { + return new Promise((resolve) => { + this.parkedResolvers.push(resolve); + }); + } + + return { messages: [] }; + }; + + ack = async (input: MessageAckRequest): Promise => { + this.acked.push(input.ids); + }; + + onWakeup = (listener: (recipient: MessageParty) => void): (() => void) => { + this.wakeupListeners.push(listener); + return () => { + this.wakeupListeners = this.wakeupListeners.filter((entry) => entry !== listener); + }; + }; + + start = async (): Promise => {}; + + stop = async (): Promise => {}; + + /** test helper: deliver a wakeup to every current listener */ + emitWakeup(recipient: MessageParty): void { + for (const listener of [...this.wakeupListeners]) { + listener(recipient); + } + } + + get wakeupListenerCount(): number { + return this.wakeupListeners.length; + } + + /** test helper: resolve any parked long-poll pulls with an empty batch */ + releaseParked(): void { + const resolvers = this.parkedResolvers; + this.parkedResolvers = []; + for (const resolve of resolvers) { + resolve({ messages: [] }); + } + } +} diff --git a/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts new file mode 100644 index 0000000..3f509a1 --- /dev/null +++ b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { Message } from '@privateaim/messenger-kit'; +import { + describe, + expect, + it, + vi, +} from 'vitest'; +import type { AnalysisParticipant } from '../../../../src/core/analysis/index.ts'; +import { InboundDeliveryProcessor } from '../../../../src/core/inbound/index.ts'; +import { FakeParticipantResolver } from '../messaging/fake-participant-resolver.ts'; +import { FakeInboundCryptoService } from './fake-crypto-service.ts'; +import { FakeDeliveryService } from './fake-delivery-service.ts'; +import { FakeInboundHubClient } from './fake-hub.ts'; + +const SELF: AnalysisParticipant = { + nodeId: 'node-self', + nodeType: 'default', + clientId: 'client-self', + publicKey: 'pk-self', +}; +const SENDER: AnalysisParticipant = { + nodeId: 'node-b', + nodeType: 'default', + clientId: 'client-b', + publicKey: 'pk-b', +}; + +/** The decrypted message object delivered to the webhook — the SDK's own envelope rides in `meta`. */ +const MESSAGE_BODY = { + greeting: 'hi', + meta: { + id: 'm-1', + sender: 'node-b', + }, +}; + +function inboundMessage(overrides: Partial = {}): Message { + return { + id: 'msg-1', + sender_type: 'client', + sender_id: 'client-b', + recipient_type: 'client', + recipient_id: 'client-self', + data: 'cipher-1', + metadata: { analysisId: 'a1' }, + created_at: '2026-01-01T00:00:00.000Z', + ...overrides, + }; +} + +function setup() { + const hub = new FakeInboundHubClient(); + const crypto = new FakeInboundCryptoService(); + const resolver = new FakeParticipantResolver(); + const delivery = new FakeDeliveryService(); + + resolver.participantsByAnalysis.set('a1', [SELF, SENDER]); + crypto.plaintextByPayload.set('cipher-1', JSON.stringify(MESSAGE_BODY)); + + const processor = new InboundDeliveryProcessor( + { + hub, + crypto, + resolver, + delivery, + }, + { + waitMs: 50, + errorBackoffMs: 5, + }, + ); + + return { + hub, + crypto, + resolver, + delivery, + processor, + }; +} + +describe('core/inbound/processor', () => { + it('resolves the sender key, decrypts, delivers, and acks a message', async () => { + const { + crypto, + delivery, + hub, + processor, + } = setup(); + + const acked = await processor.processBatch([inboundMessage()]); + + expect(acked).toEqual(['msg-1']); + // the analysis is bound into the open's key derivation (HKDF info) + expect(crypto.openCalls).toEqual([{ + payload: 'cipher-1', + senderPublicKey: 'pk-b', + info: 'a1', + }]); + expect(delivery.delivered).toEqual([{ analysisId: 'a1', message: MESSAGE_BODY }]); + expect(hub.acked).toEqual([['msg-1']]); + }); + + it('isolates a decrypt failure and still delivers and acks the rest of the batch', async () => { + const { + crypto, + delivery, + hub, + processor, + } = setup(); + crypto.undecryptable.add('cipher-bad'); + crypto.plaintextByPayload.set('cipher-2', JSON.stringify({ ok: true })); + + const acked = await processor.processBatch([ + inboundMessage({ id: 'bad', data: 'cipher-bad' }), + inboundMessage({ id: 'good', data: 'cipher-2' }), + ]); + + expect(acked).toEqual(['good']); + expect(delivery.delivered.map((entry) => entry.message)).toEqual([{ ok: true }]); + expect(hub.acked).toEqual([['good']]); + }); + + it('does not ack a message whose local delivery fails (left for redelivery)', async () => { + const { + delivery, + hub, + processor, + } = setup(); + delivery.failAnalyses.add('a1'); + + const acked = await processor.processBatch([inboundMessage()]); + + expect(acked).toEqual([]); + expect(hub.acked).toEqual([]); + }); + + it('skips messages without an analysisId or ciphertext payload', async () => { + const { + delivery, + hub, + processor, + } = setup(); + + const acked = await processor.processBatch([ + inboundMessage({ id: 'no-analysis', metadata: null }), + inboundMessage({ id: 'no-cipher', data: null }), + ]); + + expect(acked).toEqual([]); + expect(delivery.delivered).toEqual([]); + expect(hub.acked).toEqual([]); + }); + + it('skips a message from an unknown sender', async () => { + const { delivery, processor } = setup(); + + const acked = await processor.processBatch([inboundMessage({ sender_id: 'client-stranger' })]); + + expect(acked).toEqual([]); + expect(delivery.delivered).toEqual([]); + }); + + it('drains the backlog on a wakeup, then unsubscribes on stop', async () => { + const { + hub, + delivery, + processor, + } = setup(); + + // start with an empty mailbox so the fallback loop parks; the wakeup-triggered drain + // is what picks up the message seeded afterwards. + processor.start(); + expect(hub.wakeupListenerCount).toBe(1); + + hub.pullBatches.push([inboundMessage()]); + hub.emitWakeup({ type: 'client', id: 'client-self' }); + await processor.whenIdle(); + + expect(delivery.delivered).toEqual([{ analysisId: 'a1', message: MESSAGE_BODY }]); + expect(hub.acked).toEqual([['msg-1']]); + + await processor.stop(); + expect(hub.wakeupListenerCount).toBe(0); + hub.releaseParked(); + }); + + it('delivers and acks a backlog via the long-poll fallback, without any wakeup', async () => { + const { + hub, + delivery, + processor, + } = setup(); + hub.pullBatches.push([inboundMessage()]); + + // no emitWakeup — the fallback loop's pull({ wait }) must catch the pending message + processor.start(); + + await vi.waitFor(() => { + expect(delivery.delivered).toEqual([{ analysisId: 'a1', message: MESSAGE_BODY }]); + }); + expect(hub.acked).toEqual([['msg-1']]); + + await processor.stop(); + hub.releaseParked(); + }); +}); diff --git a/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts b/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts index c8bf401..cf954ca 100644 --- a/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts +++ b/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts @@ -74,6 +74,8 @@ describe('core/messaging/dispatch', () => { expect(ids).toHaveLength(2); expect(hub.sends).toHaveLength(2); expect(crypto.sealCalls.map((call) => call.recipientPublicKey).sort()).toEqual(['pk-b', 'pk-c']); + // the analysis is bound into each seal's key derivation (HKDF info) + expect(crypto.sealCalls.every((call) => call.info === 'a1')).toBe(true); const sends = byRecipientClient(hub.sends); expect(sends.get('client-b')).toMatchObject({ diff --git a/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts b/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts index 5262eef..c8c31a2 100644 --- a/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts +++ b/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts @@ -14,10 +14,18 @@ import type { ICryptoService } from '../../../../src/core/crypto/index.ts'; * under the correct recipient key without real crypto. */ export class FakeCryptoService implements ICryptoService { - sealCalls: { data: MessageSealInput, recipientPublicKey: string }[] = []; + sealCalls: { + data: MessageSealInput, + recipientPublicKey: string, + info?: MessageSealInput + }[] = []; - seal = async (data: MessageSealInput, recipientPublicKey: string): Promise => { - this.sealCalls.push({ data, recipientPublicKey }); + seal = async (data: MessageSealInput, recipientPublicKey: string, info?: MessageSealInput): Promise => { + this.sealCalls.push({ + data, + recipientPublicKey, + info, + }); return `sealed:${recipientPublicKey}`; };