diff --git a/apps/node-message-broker/src/adapters/http/controllers/messages/module.ts b/apps/node-message-broker/src/adapters/http/controllers/messages/module.ts index 7e02284..3672014 100644 --- a/apps/node-message-broker/src/adapters/http/controllers/messages/module.ts +++ b/apps/node-message-broker/src/adapters/http/controllers/messages/module.ts @@ -5,10 +5,16 @@ * view the LICENSE file that was distributed with this source code. */ -import { BadRequestError } from '@privateaim/errors'; -import { ForceLoggedInMiddleware } from '@privateaim/server-http-kit'; +import { EntityNotFoundError } from '@privateaim/errors'; +import { PermissionName } from '@privateaim/kit'; +import { + ForceLoggedInMiddleware, + useRequestIdentity, + useRequestPermissionChecker, +} from '@privateaim/server-http-kit'; import { DBody, + DContext, DController, DDelete, DGet, @@ -16,35 +22,141 @@ import { DPost, DTags, } from '@routup/decorators'; +import type { IAppEvent } from 'routup'; +import { assertClientOwnsAnalysis } from '../../../../core/analysis/index.ts'; +import type { IAnalysisClientLookup, IParticipantResolver } from '../../../../core/analysis/index.ts'; import type { IDeliveryService } from '../../../../core/delivery/index.ts'; - -type AnalysisMessageControllerContext = { - delivery: IDeliveryService -}; +import { broadcastAnalysisMessage, dispatchAnalysisMessage } from '../../../../core/messaging/index.ts'; +import type { MessageDispatchDeps } from '../../../../core/messaging/index.ts'; +import type { + AnalysisMessageControllerContext, + AnalysisMessagePayload, + WebhookSubscriptionPayload, +} from './types.ts'; +import { + AnalysisMessageValidator, + MESSAGE_SEND_GROUP, + WebhookSubscriptionValidator, +} from './validators/index.ts'; /** * Container-facing API (auth: node-local Authup JWT — the analysis presents its - * `KEYCLOAK_TOKEN`). The SDK-compatible surface is intentionally preserved. + * `KEYCLOAK_TOKEN`). The SDK-compatible surface is intentionally preserved: `recipients` + * carry **node ids**, `message` is an opaque JSON payload relayed verbatim, sends answer + * `202` with an empty body, and participant lists are bare arrays of `{ nodeId, nodeType }`. * - * Implemented: webhook-subscription CRUD (the default delivery transport). - * TODO (Plan 013 Track B, Phase 4): `POST /:id/messages`, `…/broadcast`, - * `GET /:id/participants`, `…/participants/self`, and the additive pull endpoint. + * Request bodies are validated with validup + zod (see `./validators`); a validation + * failure throws a `ValidupError` that the error middleware renders as a `400`. Every + * analysis-scoped route is additionally gated by {@link authorize}: the caller must hold the + * `ANALYSIS_SELF_MESSAGE_BROKER_USE` capability and own the analysis (the Hub stays + * analysis-agnostic). Inbound delivery is webhook-push only (no pull endpoint), managed via + * the subscription CRUD below. */ @DTags('messages') @DController('/analyses') export class AnalysisMessageController { protected delivery: IDeliveryService; + protected resolver: IParticipantResolver; + + protected analyses: IAnalysisClientLookup; + + protected dispatch: MessageDispatchDeps; + + protected messageValidator: AnalysisMessageValidator; + + protected subscriptionValidator: WebhookSubscriptionValidator; + constructor(ctx: AnalysisMessageControllerContext) { this.delivery = ctx.delivery; + this.resolver = ctx.resolver; + this.analyses = ctx.analyses; + this.dispatch = { + resolver: ctx.resolver, + crypto: ctx.crypto, + hub: ctx.hub, + }; + this.messageValidator = new AnalysisMessageValidator(); + this.subscriptionValidator = new WebhookSubscriptionValidator(); + } + + @DPost('/:id/messages', [ForceLoggedInMiddleware]) + async send( + @DPath('id') analysisId: string, + @DBody() body: Partial, + @DContext() event: IAppEvent, + ) { + await this.authorize(event, analysisId); + + const { recipients, message } = await this.messageValidator.run(body, { group: MESSAGE_SEND_GROUP }); + + await dispatchAnalysisMessage(this.dispatch, { + analysisId, + recipientNodeIds: recipients, + data: JSON.stringify(message), + }); + + event.response.status = 202; + return null; + } + + @DPost('/:id/messages/broadcast', [ForceLoggedInMiddleware]) + async broadcast( + @DPath('id') analysisId: string, + @DBody() body: Partial, + @DContext() event: IAppEvent, + ) { + await this.authorize(event, analysisId); + + const { message } = await this.messageValidator.run(body); + + await broadcastAnalysisMessage(this.dispatch, { + analysisId, + data: JSON.stringify(message), + }); + + event.response.status = 202; + return null; + } + + @DGet('/:id/participants', [ForceLoggedInMiddleware]) + async listParticipants( + @DPath('id') analysisId: string, + @DContext() event: IAppEvent, + ) { + await this.authorize(event, analysisId); + + const participants = await this.resolver.resolve(analysisId); + return participants.map((participant) => ({ + nodeId: participant.nodeId, + nodeType: participant.nodeType, + })); + } + + @DGet('/:id/participants/self', [ForceLoggedInMiddleware]) + async getSelfParticipant( + @DPath('id') analysisId: string, + @DContext() event: IAppEvent, + ) { + await this.authorize(event, analysisId); + + const self = await this.resolver.resolveSelf(analysisId); + if (!self) { + throw new EntityNotFoundError('No self participant exists for this analysis.'); + } + + return { + nodeId: self.nodeId, + nodeType: self.nodeType, + }; } @DPost('/:id/messages/subscriptions', [ForceLoggedInMiddleware]) async subscribe( @DPath('id') analysisId: string, - @DBody() data: { webhookUrl?: string }, + @DBody() body: Partial, ) { - const webhookUrl = this.requireWebhookUrl(data); + const { webhookUrl } = await this.subscriptionValidator.run(body); await this.delivery.register({ analysisId, webhookUrl }); return { analysisId, webhookUrl }; } @@ -58,17 +170,25 @@ export class AnalysisMessageController { @DDelete('/:id/messages/subscriptions', [ForceLoggedInMiddleware]) async unsubscribe( @DPath('id') analysisId: string, - @DBody() data: { webhookUrl?: string }, + @DBody() body: Partial, ) { - const webhookUrl = this.requireWebhookUrl(data); + const { webhookUrl } = await this.subscriptionValidator.run(body); await this.delivery.unregister(analysisId, webhookUrl); return { analysisId, webhookUrl }; } - protected requireWebhookUrl(data: { webhookUrl?: string }): string { - if (!data || typeof data.webhookUrl !== 'string' || data.webhookUrl.length === 0) { - throw new BadRequestError('A webhookUrl is required.'); - } - return data.webhookUrl; + /** + * Authorize an analysis-scoped request: assert the caller holds + * `ANALYSIS_SELF_MESSAGE_BROKER_USE` (capability, via the request permission checker) + * and that the caller's client owns `analysisId` (node-side analysis scope). Either + * check failing throws and aborts the request. + */ + protected async authorize(event: IAppEvent, analysisId: string): Promise { + await useRequestPermissionChecker(event) + .check({ name: PermissionName.ANALYSIS_SELF_MESSAGE_BROKER_USE }); + + const identity = useRequestIdentity(event); + const clientId = identity?.type === 'client' ? identity.id : undefined; + await assertClientOwnsAnalysis(this.analyses, analysisId, clientId); } } diff --git a/apps/node-message-broker/src/adapters/http/controllers/messages/types.ts b/apps/node-message-broker/src/adapters/http/controllers/messages/types.ts new file mode 100644 index 0000000..2cf6c21 --- /dev/null +++ b/apps/node-message-broker/src/adapters/http/controllers/messages/types.ts @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { IAnalysisClientLookup, IParticipantResolver } from '../../../../core/analysis/index.ts'; +import type { ICryptoService } from '../../../../core/crypto/index.ts'; +import type { IDeliveryService } from '../../../../core/delivery/index.ts'; +import type { IHubClient } from '../../../../core/hub/index.ts'; + +/** Dependencies injected into {@link AnalysisMessageController}. */ +export type AnalysisMessageControllerContext = { + delivery: IDeliveryService, + resolver: IParticipantResolver, + analyses: IAnalysisClientLookup, + crypto: ICryptoService, + hub: IHubClient +}; + +/** + * Validated body of `POST /analyses/:id/messages` (and, sans `recipients`, of + * `…/broadcast`). `recipients` are participant **node ids**; `message` is an opaque JSON + * payload the broker seals and relays verbatim — the SDK round-trips its own `meta` + * envelope inside it. + */ +export type AnalysisMessagePayload = { + recipients: string[], + message: unknown +}; + +/** Validated body of the webhook-subscription endpoints. */ +export type WebhookSubscriptionPayload = { + webhookUrl: string +}; diff --git a/apps/node-message-broker/src/adapters/http/controllers/messages/validators/index.ts b/apps/node-message-broker/src/adapters/http/controllers/messages/validators/index.ts new file mode 100644 index 0000000..384612a --- /dev/null +++ b/apps/node-message-broker/src/adapters/http/controllers/messages/validators/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './message.ts'; +export * from './webhook-subscription.ts'; diff --git a/apps/node-message-broker/src/adapters/http/controllers/messages/validators/message.ts b/apps/node-message-broker/src/adapters/http/controllers/messages/validators/message.ts new file mode 100644 index 0000000..8fd96d6 --- /dev/null +++ b/apps/node-message-broker/src/adapters/http/controllers/messages/validators/message.ts @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { createValidator } from '@validup/zod'; +import { Container } from 'validup'; +import zod from 'zod'; +import type { AnalysisMessagePayload } from '../types.ts'; + +/** + * Validator group that gates the `recipients` attribute to the (non-broadcast) send path. + * Run the validator with this group for a direct send; run it without a group for a + * broadcast, which validates `message` alone. + */ +export const MESSAGE_SEND_GROUP = 'send'; + +/** + * Validates an outbound analysis message body with validup + zod. `recipients` is a + * non-empty array of participant node ids, required only under {@link MESSAGE_SEND_GROUP}. + * `message` is an opaque, non-null JSON payload (object, array, or scalar) relayed verbatim; + * unknown top-level keys are stripped from the validated result. + */ +export class AnalysisMessageValidator extends Container { + protected initialize() { + super.initialize(); + + this.mount( + 'recipients', + { group: MESSAGE_SEND_GROUP }, + createValidator(zod.array(zod.string().min(1)).min(1)), + ); + + this.mount('message', createValidator(zod.union([ + zod.looseObject({}), + zod.array(zod.any()), + zod.string(), + zod.number(), + zod.boolean(), + ]))); + } +} diff --git a/apps/node-message-broker/src/adapters/http/controllers/messages/validators/webhook-subscription.ts b/apps/node-message-broker/src/adapters/http/controllers/messages/validators/webhook-subscription.ts new file mode 100644 index 0000000..58dea1b --- /dev/null +++ b/apps/node-message-broker/src/adapters/http/controllers/messages/validators/webhook-subscription.ts @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { createValidator } from '@validup/zod'; +import { Container } from 'validup'; +import zod from 'zod'; +import type { WebhookSubscriptionPayload } from '../types.ts'; + +/** Validates a webhook-subscription body — a single required, absolute `webhookUrl`. */ +export class WebhookSubscriptionValidator extends Container { + protected initialize() { + super.initialize(); + + this.mount('webhookUrl', createValidator(zod.url())); + } +} diff --git a/apps/node-message-broker/src/app/modules/components/constants.ts b/apps/node-message-broker/src/app/modules/components/constants.ts index cd6c700..2b76856 100644 --- a/apps/node-message-broker/src/app/modules/components/constants.ts +++ b/apps/node-message-broker/src/app/modules/components/constants.ts @@ -6,10 +6,12 @@ */ import { TypedToken } from 'eldin'; +import type { ICryptoService } from '../../../core/crypto/index.ts'; import type { IDeliveryService } from '../../../core/delivery/index.ts'; import type { IHubClient } from '../../../core/hub/index.ts'; export const ComponentsInjectionKey = { Delivery: new TypedToken('DeliveryService'), HubClient: new TypedToken('HubClient'), + Crypto: new TypedToken('CryptoService'), }; diff --git a/apps/node-message-broker/src/app/modules/components/module.ts b/apps/node-message-broker/src/app/modules/components/module.ts index 88c0d82..d9db3cc 100644 --- a/apps/node-message-broker/src/app/modules/components/module.ts +++ b/apps/node-message-broker/src/app/modules/components/module.ts @@ -14,6 +14,7 @@ import { createAuthupClientAuthenticationHook, createAuthupClientTokenCreator, } from '@privateaim/server-kit'; +import { CryptoService } from '../../../adapters/crypto/index.ts'; import { MemoryDeliveryService } from '../../../adapters/delivery/index.ts'; import { HubClient, SseWakeupSource } from '../../../adapters/hub/index.ts'; import { ConfigInjectionKey } from '../config/constants.ts'; @@ -46,6 +47,10 @@ export class ComponentsModule implements IModule { const delivery = new MemoryDeliveryService(); container.register(ComponentsInjectionKey.Delivery, { useValue: delivery }); + // node-to-node E2E crypto; the operator supplies the single ECDH private key. + const crypto = new CryptoService({ privateKey: config.nodePrivateKey }); + container.register(ComponentsInjectionKey.Crypto, { useValue: crypto }); + // node-client credentials authenticate every Hub interaction; this creator // backs the REST auth hook directly and the SSE Authorization header via a // caching wrapper (below). diff --git a/apps/node-message-broker/src/app/modules/http/controller.ts b/apps/node-message-broker/src/app/modules/http/controller.ts index fa82a5f..35d7b31 100644 --- a/apps/node-message-broker/src/app/modules/http/controller.ts +++ b/apps/node-message-broker/src/app/modules/http/controller.ts @@ -8,11 +8,22 @@ import type { IContainer } from 'eldin'; import { AnalysisMessageController } from '../../../adapters/http/controllers/messages/index.ts'; import { ComponentsInjectionKey } from '../components/constants.ts'; +import { CoreClientInjectionKey } from '../core-client/constants.ts'; export function createControllers(container: IContainer): Record[] { const delivery = container.resolve(ComponentsInjectionKey.Delivery); + const crypto = container.resolve(ComponentsInjectionKey.Crypto); + const hub = container.resolve(ComponentsInjectionKey.HubClient); + const resolver = container.resolve(CoreClientInjectionKey.ParticipantResolver); + const analyses = container.resolve(CoreClientInjectionKey.AnalysisClientLookup); return [ - new AnalysisMessageController({ delivery }), + new AnalysisMessageController({ + delivery, + resolver, + analyses, + crypto, + hub, + }), ]; } diff --git a/apps/node-message-broker/src/app/modules/http/module.ts b/apps/node-message-broker/src/app/modules/http/module.ts index ff852bb..22ece40 100644 --- a/apps/node-message-broker/src/app/modules/http/module.ts +++ b/apps/node-message-broker/src/app/modules/http/module.ts @@ -31,9 +31,11 @@ import { HTTPInjectionKey } from './constants.ts'; export class HTTPModule implements IModule { readonly name = 'http'; - // `authupClient` is optional so partial builds (e.g. tests) still work; when present - // it must set up before HTTP so the authorization middleware + permission checker run. - readonly dependencies: (string | ModuleDependency)[] = ['config', 'components', { name: 'authupClient', optional: true }]; + // `components` + `coreClient` register the ports the controllers resolve (delivery, + // crypto, hub, participant resolver, analysis lookup). `authupClient` is optional so + // partial builds (e.g. tests) still work; when present it must set up before HTTP so + // the authorization middleware + permission checker run. + readonly dependencies: (string | ModuleDependency)[] = ['config', 'components', 'coreClient', { name: 'authupClient', optional: true }]; private instance: HTTPServer | undefined; diff --git a/apps/node-message-broker/src/core/messaging/dispatch.ts b/apps/node-message-broker/src/core/messaging/dispatch.ts new file mode 100644 index 0000000..e8c4bff --- /dev/null +++ b/apps/node-message-broker/src/core/messaging/dispatch.ts @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { BadRequestError } from '@ebec/http'; +import type { MessageSealInput } from '@privateaim/kit'; +import { MessagePartyKind } from '@privateaim/messenger-kit'; +import type { AnalysisParticipant } from '../analysis/index.ts'; +import type { MessageDispatchDeps, OutboundAnalysisMessage } from './types.ts'; + +/** + * Fan out an analysis message to the named participant nodes: seal it under each + * recipient node's public key (distinct ciphertext per recipient) and relay one Hub + * `send` per recipient, tagging `metadata.analysisId`. Resolves with the Hub message + * ids. An unknown recipient node id is rejected with {@link BadRequestError}. + */ +export async function dispatchAnalysisMessage( + deps: MessageDispatchDeps, + input: OutboundAnalysisMessage, +): Promise { + const participants = await deps.resolver.resolve(input.analysisId); + const byNodeId = new Map(participants.map((participant) => [participant.nodeId, participant])); + + const recipients = input.recipientNodeIds.map((nodeId) => { + const participant = byNodeId.get(nodeId); + if (!participant) { + throw new BadRequestError(`'${nodeId}' is not a participant of this analysis.`); + } + return participant; + }); + + return sendSealed(deps, input.analysisId, recipients, input.data); +} + +/** Broadcast an analysis message to every participant node except this one. */ +export async function broadcastAnalysisMessage( + deps: MessageDispatchDeps, + input: { analysisId: string, data: MessageSealInput }, +): Promise { + const [participants, self] = await Promise.all([ + deps.resolver.resolve(input.analysisId), + deps.resolver.resolveSelf(input.analysisId), + ]); + + const recipients = participants.filter((participant) => !self || participant.nodeId !== self.nodeId); + return sendSealed(deps, input.analysisId, recipients, input.data); +} + +async function sendSealed( + deps: MessageDispatchDeps, + analysisId: string, + recipients: AnalysisParticipant[], + data: MessageSealInput, +): Promise { + const sent = await Promise.all(recipients.map(async (recipient) => { + const sealed = await deps.crypto.seal(data, recipient.publicKey); + return deps.hub.send({ + recipients: [{ type: MessagePartyKind.CLIENT, id: recipient.clientId }], + data: sealed, + metadata: { analysisId }, + }); + })); + + return sent.flat(); +} diff --git a/apps/node-message-broker/src/core/messaging/index.ts b/apps/node-message-broker/src/core/messaging/index.ts new file mode 100644 index 0000000..b4e9c3d --- /dev/null +++ b/apps/node-message-broker/src/core/messaging/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './types.ts'; +export * from './dispatch.ts'; diff --git a/apps/node-message-broker/src/core/messaging/types.ts b/apps/node-message-broker/src/core/messaging/types.ts new file mode 100644 index 0000000..cf4917d --- /dev/null +++ b/apps/node-message-broker/src/core/messaging/types.ts @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { MessageSealInput } from '@privateaim/kit'; +import type { IParticipantResolver } from '../analysis/index.ts'; +import type { ICryptoService } from '../crypto/index.ts'; +import type { IHubClient } from '../hub/index.ts'; + +/** The ports the analysis-message dispatch fans out across. */ +export type MessageDispatchDeps = { + resolver: IParticipantResolver, + crypto: ICryptoService, + hub: IHubClient +}; + +/** An analysis message addressed to specific participant nodes (the SDK addresses by node id). */ +export type OutboundAnalysisMessage = { + analysisId: string, + recipientNodeIds: string[], + /** plaintext payload (bytes or string); sealed per recipient before relay */ + data: MessageSealInput +}; diff --git a/apps/node-message-broker/test/unit/adapters/http/messages-controller.spec.ts b/apps/node-message-broker/test/unit/adapters/http/messages-controller.spec.ts new file mode 100644 index 0000000..d1a622b --- /dev/null +++ b/apps/node-message-broker/test/unit/adapters/http/messages-controller.spec.ts @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { ForbiddenError } from '@ebec/http'; +import { PermissionName } from '@privateaim/kit'; +import type { RequestIdentity } from '@privateaim/server-http-kit'; +import type { IAppEvent } from 'routup'; +import { describe, expect, it } from 'vitest'; +import { AnalysisMessageController } from '../../../../src/adapters/http/controllers/messages/index.ts'; +import type { AnalysisParticipant } from '../../../../src/core/analysis/index.ts'; +import type { IDeliveryService } from '../../../../src/core/delivery/index.ts'; +import { FakeAnalysisClientLookup } from '../../core/analysis/fake-analysis-client-lookup.ts'; +import { FakeCryptoService } from '../../core/messaging/fake-crypto-service.ts'; +import { FakeHubClient } from '../../core/messaging/fake-hub-client.ts'; +import { FakeParticipantResolver } from '../../core/messaging/fake-participant-resolver.ts'; + +const SELF: AnalysisParticipant = { + nodeId: 'node-self', + nodeType: 'aggregator', + clientId: 'client-self', + publicKey: 'pk-self', +}; +const NODE_B: AnalysisParticipant = { + nodeId: 'node-b', + nodeType: 'default', + clientId: 'client-b', + publicKey: 'pk-b', +}; +const NODE_C: AnalysisParticipant = { + nodeId: 'node-c', + nodeType: 'default', + clientId: 'client-c', + publicKey: 'pk-c', +}; + +/** The analysis owner — `FakeAnalysisClientLookup` maps analysis `a1` to `client-analysis`. */ +const OWNER: RequestIdentity = { + id: 'client-analysis', + type: 'client', + realmId: 'realm-1', + realmName: 'master', +}; + +const noopDelivery: IDeliveryService = { + register: async () => {}, + unregister: async () => {}, + list: async () => [], + deliver: async () => {}, +}; + +function setup() { + const resolver = new FakeParticipantResolver(); + const crypto = new FakeCryptoService(); + const hub = new FakeHubClient(); + const analyses = new FakeAnalysisClientLookup(); + + resolver.participantsByAnalysis.set('a1', [SELF, NODE_B, NODE_C]); + resolver.selfByAnalysis.set('a1', SELF); + + const controller = new AnalysisMessageController({ + delivery: noopDelivery, + resolver, + analyses, + crypto, + hub, + }); + + return { + controller, + resolver, + crypto, + hub, + analyses, + }; +} + +/** + * Build a minimal request event — the permission checker and identity live on `store` + * (where the `useRequest*` helpers read them), and `response.status` is what the + * controller mutates. `check` defaults to an allow; pass a throwing one to simulate a + * denied capability. + */ +function createEvent(options: { + identity?: RequestIdentity, + check?: (ctx: { name: string | string[] }) => Promise, +} = {}): { event: IAppEvent, checks: { name: string | string[] }[] } { + const checks: { name: string | string[] }[] = []; + const check = options.check ?? (async () => {}); + + const event = { + store: { + identity: options.identity ?? OWNER, + permissionChecker: { + check: async (ctx: { name: string | string[] }) => { + checks.push(ctx); + await check(ctx); + }, + }, + }, + response: { + status: 200, + headers: new Headers(), + }, + } as unknown as IAppEvent; + + return { event, checks }; +} + +describe('adapters/http/controllers/messages', () => { + it('seals per recipient, relays one tagged Hub message each, and answers 202', async () => { + const { + controller, + crypto, + hub, + } = setup(); + const { event, checks } = createEvent(); + const message = { hello: 'world', meta: { id: 'm-1' } }; + + const result = await controller.send('a1', { recipients: ['node-b', 'node-c'], message }, event); + + expect(result).toBeNull(); + expect(event.response.status).toBe(202); + expect(checks).toEqual([{ name: PermissionName.ANALYSIS_SELF_MESSAGE_BROKER_USE }]); + + expect(hub.sends).toHaveLength(2); + expect(crypto.sealCalls.map((call) => call.recipientPublicKey).sort()).toEqual(['pk-b', 'pk-c']); + // the opaque JSON payload is serialized and sealed verbatim for every recipient + for (const call of crypto.sealCalls) { + expect(JSON.parse(call.data as string)).toEqual(message); + } + + const recipientClientIds = hub.sends.flatMap((send) => send.recipients.map((recipient) => recipient.id)); + expect(recipientClientIds.sort()).toEqual(['client-b', 'client-c']); + }); + + it('rejects a send when the capability is denied (and sends nothing)', async () => { + const { controller, hub } = setup(); + const { event } = createEvent({ + check: async () => { + throw new ForbiddenError('denied'); + }, + }); + + await expect(controller.send('a1', { recipients: ['node-b'], message: { a: 1 } }, event)) + .rejects.toBeInstanceOf(ForbiddenError); + expect(hub.sends).toEqual([]); + }); + + it('rejects a send when the caller does not own the analysis', async () => { + const { controller, hub } = setup(); + const { event } = createEvent({ + identity: { + id: 'client-other', + type: 'client', + realmId: 'realm-1', + realmName: 'master', + }, + }); + + await expect(controller.send('a1', { recipients: ['node-b'], message: { a: 1 } }, event)) + .rejects.toBeInstanceOf(ForbiddenError); + expect(hub.sends).toEqual([]); + }); + + it('rejects a send with empty or missing recipients', async () => { + const { controller } = setup(); + + await expect(controller.send('a1', { recipients: [], message: { a: 1 } }, createEvent().event)) + .rejects.toThrow(/recipients/i); + await expect(controller.send('a1', { message: { a: 1 } }, createEvent().event)) + .rejects.toThrow(/recipients/i); + }); + + it('rejects a send without a message payload', async () => { + const { controller } = setup(); + + await expect(controller.send('a1', { recipients: ['node-b'] }, createEvent().event)) + .rejects.toThrow(/message/i); + }); + + it('broadcasts to every participant except self and answers 202', async () => { + const { controller, hub } = setup(); + const { event } = createEvent(); + + const result = await controller.broadcast('a1', { message: { ping: true } }, event); + + expect(result).toBeNull(); + expect(event.response.status).toBe(202); + + const recipientClientIds = hub.sends.flatMap((send) => send.recipients.map((recipient) => recipient.id)); + expect(recipientClientIds.sort()).toEqual(['client-b', 'client-c']); + }); + + it('lists participants as a bare array exposing only nodeId and nodeType', async () => { + const { controller } = setup(); + + const participants = await controller.listParticipants('a1', createEvent().event); + + expect(participants).toEqual([ + { nodeId: 'node-self', nodeType: 'aggregator' }, + { nodeId: 'node-b', nodeType: 'default' }, + { nodeId: 'node-c', nodeType: 'default' }, + ]); + // the internal clientId / publicKey must never leak to containers + expect(participants.every((participant) => !('clientId' in participant) && !('publicKey' in participant))).toBe(true); + }); + + it('returns the self participant', async () => { + const { controller } = setup(); + + const self = await controller.getSelfParticipant('a1', createEvent().event); + + expect(self).toEqual({ nodeId: 'node-self', nodeType: 'aggregator' }); + }); + + it('404s when no self participant exists for the analysis', async () => { + const { controller, resolver } = setup(); + resolver.selfByAnalysis.delete('a1'); + + await expect(controller.getSelfParticipant('a1', createEvent().event)) + .rejects.toThrow(/self participant/i); + }); +}); diff --git a/apps/node-message-broker/test/unit/adapters/http/messages-validators.spec.ts b/apps/node-message-broker/test/unit/adapters/http/messages-validators.spec.ts new file mode 100644 index 0000000..214df8b --- /dev/null +++ b/apps/node-message-broker/test/unit/adapters/http/messages-validators.spec.ts @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { isValidupError } from 'validup'; +import { describe, expect, it } from 'vitest'; +import type { AnalysisMessagePayload } from '../../../../src/adapters/http/controllers/messages/types.ts'; +import { + AnalysisMessageValidator, + MESSAGE_SEND_GROUP, + WebhookSubscriptionValidator, +} from '../../../../src/adapters/http/controllers/messages/validators/index.ts'; + +describe('adapters/http/controllers/messages/validators', () => { + describe('AnalysisMessageValidator (send group)', () => { + const validator = new AnalysisMessageValidator(); + const run = (body: Partial) => validator.run(body, { group: MESSAGE_SEND_GROUP }); + + it('accepts node-id recipients and an opaque JSON object message', async () => { + const message = { hello: 'world', meta: { id: 'm-1' } }; + const result = await run({ recipients: ['node-a', 'node-b'], message }); + expect(result).toEqual({ recipients: ['node-a', 'node-b'], message }); + }); + + it('accepts a string or array message (arbitrary JSON)', async () => { + await expect(run({ recipients: ['node-a'], message: 'plain' })).resolves.toMatchObject({ message: 'plain' }); + await expect(run({ recipients: ['node-a'], message: [1, 2, 3] })).resolves.toMatchObject({ message: [1, 2, 3] }); + }); + + it('strips unknown top-level keys from the result', async () => { + const body = { + recipients: ['node-a'], + message: { a: 1 }, + junk: 'x', + }; + const result = await run(body); + expect(result).not.toHaveProperty('junk'); + }); + + it('rejects empty, missing, or non-string recipients', async () => { + await expect(run({ recipients: [], message: { a: 1 } })).rejects.toThrow(/recipients/i); + await expect(run({ message: { a: 1 } })).rejects.toThrow(/recipients/i); + await expect(run({ recipients: [''], message: { a: 1 } })).rejects.toThrow(/recipients/i); + }); + + it('rejects a missing or null message', async () => { + await expect(run({ recipients: ['node-a'] })).rejects.toThrow(/message/i); + await expect(run({ recipients: ['node-a'], message: null })).rejects.toThrow(/message/i); + }); + + it('throws a ValidupError (so the error middleware renders a 400)', async () => { + const error = await run({}).catch((err) => err); + expect(isValidupError(error)).toBe(true); + }); + }); + + describe('AnalysisMessageValidator (broadcast, no group)', () => { + const validator = new AnalysisMessageValidator(); + + it('validates message alone and ignores recipients', async () => { + const result = await validator.run({ message: { ping: true } }); + expect(result).toEqual({ message: { ping: true } }); + }); + + it('still requires a message', async () => { + await expect(validator.run({})).rejects.toThrow(/message/i); + }); + }); + + describe('WebhookSubscriptionValidator', () => { + const validator = new WebhookSubscriptionValidator(); + + it('accepts an absolute URL', async () => { + await expect(validator.run({ webhookUrl: 'http://nginx/analysis/webhook' })) + .resolves.toEqual({ webhookUrl: 'http://nginx/analysis/webhook' }); + }); + + it('rejects a missing or malformed webhookUrl', async () => { + await expect(validator.run({})).rejects.toThrow(/webhookUrl/i); + await expect(validator.run({ webhookUrl: 'not-a-url' })).rejects.toThrow(/webhookUrl/i); + }); + }); +}); diff --git a/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts b/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts new file mode 100644 index 0000000..c8bf401 --- /dev/null +++ b/apps/node-message-broker/test/unit/core/messaging/dispatch.spec.ts @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { SendMessageRequest } from '@privateaim/messenger-kit'; +import { describe, expect, it } from 'vitest'; +import type { AnalysisParticipant } from '../../../../src/core/analysis/index.ts'; +import { broadcastAnalysisMessage, dispatchAnalysisMessage } from '../../../../src/core/messaging/index.ts'; +import { FakeCryptoService } from './fake-crypto-service.ts'; +import { FakeHubClient } from './fake-hub-client.ts'; +import { FakeParticipantResolver } from './fake-participant-resolver.ts'; + +const SELF: AnalysisParticipant = { + nodeId: 'node-self', + nodeType: 'aggregator', + clientId: 'client-self', + publicKey: 'pk-self', +}; +const NODE_B: AnalysisParticipant = { + nodeId: 'node-b', + nodeType: 'default', + clientId: 'client-b', + publicKey: 'pk-b', +}; +const NODE_C: AnalysisParticipant = { + nodeId: 'node-c', + nodeType: 'default', + clientId: 'client-c', + publicKey: 'pk-c', +}; + +function setup() { + const resolver = new FakeParticipantResolver(); + const crypto = new FakeCryptoService(); + const hub = new FakeHubClient(); + + resolver.participantsByAnalysis.set('a1', [SELF, NODE_B, NODE_C]); + resolver.selfByAnalysis.set('a1', SELF); + + return { + resolver, + crypto, + hub, + deps: { + resolver, + crypto, + hub, + }, + }; +} + +/** Index sends by their single recipient's client id for order-independent assertions. */ +function byRecipientClient(sends: SendMessageRequest[]): Map { + return new Map(sends.map((send) => [send.recipients[0].id, send])); +} + +describe('core/messaging/dispatch', () => { + it('seals per recipient and relays one Hub message each, tagged with the analysis', async () => { + const { + crypto, + hub, + deps, + } = setup(); + + const ids = await dispatchAnalysisMessage(deps, { + analysisId: 'a1', + recipientNodeIds: ['node-b', 'node-c'], + data: 'hello', + }); + + expect(ids).toHaveLength(2); + expect(hub.sends).toHaveLength(2); + expect(crypto.sealCalls.map((call) => call.recipientPublicKey).sort()).toEqual(['pk-b', 'pk-c']); + + const sends = byRecipientClient(hub.sends); + expect(sends.get('client-b')).toMatchObject({ + recipients: [{ type: 'client', id: 'client-b' }], + data: 'sealed:pk-b', + metadata: { analysisId: 'a1' }, + }); + expect(sends.get('client-c')?.data).toBe('sealed:pk-c'); + }); + + it('rejects an unknown recipient node', async () => { + const { hub, deps } = setup(); + + await expect(dispatchAnalysisMessage(deps, { + analysisId: 'a1', + recipientNodeIds: ['node-x'], + data: 'x', + })) + .rejects.toThrow(/not a participant/i); + expect(hub.sends).toEqual([]); + }); + + it('broadcasts to every participant except self', async () => { + const { hub, deps } = setup(); + + const ids = await broadcastAnalysisMessage(deps, { analysisId: 'a1', data: 'hi' }); + + expect(ids).toHaveLength(2); + const recipientClientIds = hub.sends.flatMap((send) => send.recipients.map((recipient) => recipient.id)); + expect(recipientClientIds.sort()).toEqual(['client-b', 'client-c']); + }); + + it('sends nothing when there are no recipients', async () => { + const { hub, deps } = setup(); + + const ids = await dispatchAnalysisMessage(deps, { + analysisId: 'a1', + recipientNodeIds: [], + data: 'x', + }); + + expect(ids).toEqual([]); + expect(hub.sends).toEqual([]); + }); +}); diff --git a/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts b/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts new file mode 100644 index 0000000..5262eef --- /dev/null +++ b/apps/node-message-broker/test/unit/core/messaging/fake-crypto-service.ts @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { MessageSealInput } from '@privateaim/kit'; +import type { ICryptoService } from '../../../../src/core/crypto/index.ts'; + +/** + * In-memory `ICryptoService` that records seal calls and returns a deterministic + * `sealed:` marker, so tests can assert each message was sealed + * under the correct recipient key without real crypto. + */ +export class FakeCryptoService implements ICryptoService { + sealCalls: { data: MessageSealInput, recipientPublicKey: string }[] = []; + + seal = async (data: MessageSealInput, recipientPublicKey: string): Promise => { + this.sealCalls.push({ data, recipientPublicKey }); + return `sealed:${recipientPublicKey}`; + }; + + open = async (): Promise => new Uint8Array(); +} diff --git a/apps/node-message-broker/test/unit/core/messaging/fake-hub-client.ts b/apps/node-message-broker/test/unit/core/messaging/fake-hub-client.ts new file mode 100644 index 0000000..31032bd --- /dev/null +++ b/apps/node-message-broker/test/unit/core/messaging/fake-hub-client.ts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { MessagePullResponse, SendMessageRequest } from '@privateaim/messenger-kit'; +import type { IHubClient } from '../../../../src/core/hub/index.ts'; + +/** + * In-memory `IHubClient` recording every `send` and minting one id per recipient. + * The pull/ack/wakeup/lifecycle members are inert — the dispatch path only sends. + */ +export class FakeHubClient implements IHubClient { + sends: SendMessageRequest[] = []; + + private counter = 0; + + send = async (input: SendMessageRequest): Promise => { + this.sends.push(input); + return input.recipients.map(() => { + this.counter += 1; + return `msg-${this.counter}`; + }); + }; + + pull = async (): Promise => ({ messages: [] }); + + ack = async (): Promise => {}; + + onWakeup = (): (() => void) => () => {}; + + start = async (): Promise => {}; + + stop = async (): Promise => {}; +} diff --git a/apps/node-message-broker/test/unit/core/messaging/fake-participant-resolver.ts b/apps/node-message-broker/test/unit/core/messaging/fake-participant-resolver.ts new file mode 100644 index 0000000..e5dfc52 --- /dev/null +++ b/apps/node-message-broker/test/unit/core/messaging/fake-participant-resolver.ts @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { AnalysisParticipant, IParticipantResolver } from '../../../../src/core/analysis/index.ts'; + +/** In-memory `IParticipantResolver` returning configurable participants per analysis. */ +export class FakeParticipantResolver implements IParticipantResolver { + participantsByAnalysis = new Map(); + + selfByAnalysis = new Map(); + + resolve = async (analysisId: string): Promise => this.participantsByAnalysis.get(analysisId) ?? []; + + resolveSelf = async (analysisId: string): Promise => this.selfByAnalysis.get(analysisId); +}