Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,158 @@
* view the LICENSE file that was distributed with this source code.
*/

import { BadRequestError } from '@privateaim/errors';
import { ForceLoggedInMiddleware } from '@privateaim/server-http-kit';
import { EntityNotFoundError } from '@privateaim/errors';
import { PermissionName } from '@privateaim/kit';
import {
ForceLoggedInMiddleware,
useRequestIdentity,
useRequestPermissionChecker,
} from '@privateaim/server-http-kit';
import {
DBody,
DContext,
DController,
DDelete,
DGet,
DPath,
DPost,
DTags,
} from '@routup/decorators';
import type { IAppEvent } from 'routup';
import { assertClientOwnsAnalysis } from '../../../../core/analysis/index.ts';
import type { IAnalysisClientLookup, IParticipantResolver } from '../../../../core/analysis/index.ts';
import type { IDeliveryService } from '../../../../core/delivery/index.ts';

type AnalysisMessageControllerContext = {
delivery: IDeliveryService
};
import { broadcastAnalysisMessage, dispatchAnalysisMessage } from '../../../../core/messaging/index.ts';
import type { MessageDispatchDeps } from '../../../../core/messaging/index.ts';
import type {
AnalysisMessageControllerContext,
AnalysisMessagePayload,
WebhookSubscriptionPayload,
} from './types.ts';
import {
AnalysisMessageValidator,
MESSAGE_SEND_GROUP,
WebhookSubscriptionValidator,
} from './validators/index.ts';

/**
* Container-facing API (auth: node-local Authup JWT — the analysis presents its
* `KEYCLOAK_TOKEN`). The SDK-compatible surface is intentionally preserved.
* `KEYCLOAK_TOKEN`). The SDK-compatible surface is intentionally preserved: `recipients`
* carry **node ids**, `message` is an opaque JSON payload relayed verbatim, sends answer
* `202` with an empty body, and participant lists are bare arrays of `{ nodeId, nodeType }`.
*
* Implemented: webhook-subscription CRUD (the default delivery transport).
* TODO (Plan 013 Track B, Phase 4): `POST /:id/messages`, `…/broadcast`,
* `GET /:id/participants`, `…/participants/self`, and the additive pull endpoint.
* Request bodies are validated with validup + zod (see `./validators`); a validation
* failure throws a `ValidupError` that the error middleware renders as a `400`. Every
* analysis-scoped route is additionally gated by {@link authorize}: the caller must hold the
* `ANALYSIS_SELF_MESSAGE_BROKER_USE` capability and own the analysis (the Hub stays
* analysis-agnostic). Inbound delivery is webhook-push only (no pull endpoint), managed via
* the subscription CRUD below.
*/
@DTags('messages')
@DController('/analyses')
export class AnalysisMessageController {
protected delivery: IDeliveryService;

protected resolver: IParticipantResolver;

protected analyses: IAnalysisClientLookup;

protected dispatch: MessageDispatchDeps;

protected messageValidator: AnalysisMessageValidator;

protected subscriptionValidator: WebhookSubscriptionValidator;

constructor(ctx: AnalysisMessageControllerContext) {
this.delivery = ctx.delivery;
this.resolver = ctx.resolver;
this.analyses = ctx.analyses;
this.dispatch = {
resolver: ctx.resolver,
crypto: ctx.crypto,
hub: ctx.hub,
};
this.messageValidator = new AnalysisMessageValidator();
this.subscriptionValidator = new WebhookSubscriptionValidator();
}

@DPost('/:id/messages', [ForceLoggedInMiddleware])
async send(
@DPath('id') analysisId: string,
@DBody() body: Partial<AnalysisMessagePayload>,
@DContext() event: IAppEvent,
) {
await this.authorize(event, analysisId);

const { recipients, message } = await this.messageValidator.run(body, { group: MESSAGE_SEND_GROUP });

await dispatchAnalysisMessage(this.dispatch, {
analysisId,
recipientNodeIds: recipients,
data: JSON.stringify(message),
});

event.response.status = 202;
return null;
}

@DPost('/:id/messages/broadcast', [ForceLoggedInMiddleware])
async broadcast(
@DPath('id') analysisId: string,
@DBody() body: Partial<AnalysisMessagePayload>,
@DContext() event: IAppEvent,
) {
await this.authorize(event, analysisId);

const { message } = await this.messageValidator.run(body);

await broadcastAnalysisMessage(this.dispatch, {
analysisId,
data: JSON.stringify(message),
});

event.response.status = 202;
return null;
}

@DGet('/:id/participants', [ForceLoggedInMiddleware])
async listParticipants(
@DPath('id') analysisId: string,
@DContext() event: IAppEvent,
) {
await this.authorize(event, analysisId);

const participants = await this.resolver.resolve(analysisId);
return participants.map((participant) => ({
nodeId: participant.nodeId,
nodeType: participant.nodeType,
}));
}

@DGet('/:id/participants/self', [ForceLoggedInMiddleware])
async getSelfParticipant(
@DPath('id') analysisId: string,
@DContext() event: IAppEvent,
) {
await this.authorize(event, analysisId);

const self = await this.resolver.resolveSelf(analysisId);
if (!self) {
throw new EntityNotFoundError('No self participant exists for this analysis.');
}

return {
nodeId: self.nodeId,
nodeType: self.nodeType,
};
}

@DPost('/:id/messages/subscriptions', [ForceLoggedInMiddleware])
async subscribe(
@DPath('id') analysisId: string,
@DBody() data: { webhookUrl?: string },
@DBody() body: Partial<WebhookSubscriptionPayload>,
) {
const webhookUrl = this.requireWebhookUrl(data);
const { webhookUrl } = await this.subscriptionValidator.run(body);
await this.delivery.register({ analysisId, webhookUrl });
return { analysisId, webhookUrl };
}
Expand All @@ -58,17 +170,25 @@ export class AnalysisMessageController {
@DDelete('/:id/messages/subscriptions', [ForceLoggedInMiddleware])
async unsubscribe(
@DPath('id') analysisId: string,
@DBody() data: { webhookUrl?: string },
@DBody() body: Partial<WebhookSubscriptionPayload>,
) {
const webhookUrl = this.requireWebhookUrl(data);
const { webhookUrl } = await this.subscriptionValidator.run(body);
await this.delivery.unregister(analysisId, webhookUrl);
return { analysisId, webhookUrl };
}

protected requireWebhookUrl(data: { webhookUrl?: string }): string {
if (!data || typeof data.webhookUrl !== 'string' || data.webhookUrl.length === 0) {
throw new BadRequestError('A webhookUrl is required.');
}
return data.webhookUrl;
/**
* Authorize an analysis-scoped request: assert the caller holds
* `ANALYSIS_SELF_MESSAGE_BROKER_USE` (capability, via the request permission checker)
* and that the caller's client owns `analysisId` (node-side analysis scope). Either
* check failing throws and aborts the request.
*/
protected async authorize(event: IAppEvent, analysisId: string): Promise<void> {
await useRequestPermissionChecker(event)
.check({ name: PermissionName.ANALYSIS_SELF_MESSAGE_BROKER_USE });

const identity = useRequestIdentity(event);
const clientId = identity?.type === 'client' ? identity.id : undefined;
await assertClientOwnsAnalysis(this.analyses, analysisId, clientId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

import type { IAnalysisClientLookup, IParticipantResolver } from '../../../../core/analysis/index.ts';
import type { ICryptoService } from '../../../../core/crypto/index.ts';
import type { IDeliveryService } from '../../../../core/delivery/index.ts';
import type { IHubClient } from '../../../../core/hub/index.ts';

/** Dependencies injected into {@link AnalysisMessageController}. */
export type AnalysisMessageControllerContext = {
delivery: IDeliveryService,
resolver: IParticipantResolver,
analyses: IAnalysisClientLookup,
crypto: ICryptoService,
hub: IHubClient
};

/**
* Validated body of `POST /analyses/:id/messages` (and, sans `recipients`, of
* `…/broadcast`). `recipients` are participant **node ids**; `message` is an opaque JSON
* payload the broker seals and relays verbatim — the SDK round-trips its own `meta`
* envelope inside it.
*/
export type AnalysisMessagePayload = {
recipients: string[],
message: unknown
};

/** Validated body of the webhook-subscription endpoints. */
export type WebhookSubscriptionPayload = {
webhookUrl: string
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

export * from './message.ts';
export * from './webhook-subscription.ts';
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

import { createValidator } from '@validup/zod';
import { Container } from 'validup';
import zod from 'zod';
import type { AnalysisMessagePayload } from '../types.ts';

/**
* Validator group that gates the `recipients` attribute to the (non-broadcast) send path.
* Run the validator with this group for a direct send; run it without a group for a
* broadcast, which validates `message` alone.
*/
export const MESSAGE_SEND_GROUP = 'send';

/**
* Validates an outbound analysis message body with validup + zod. `recipients` is a
* non-empty array of participant node ids, required only under {@link MESSAGE_SEND_GROUP}.
* `message` is an opaque, non-null JSON payload (object, array, or scalar) relayed verbatim;
* unknown top-level keys are stripped from the validated result.
*/
export class AnalysisMessageValidator extends Container<AnalysisMessagePayload> {
protected initialize() {
super.initialize();

this.mount(
'recipients',
{ group: MESSAGE_SEND_GROUP },
createValidator(zod.array(zod.string().min(1)).min(1)),
);

this.mount('message', createValidator(zod.union([
zod.looseObject({}),
zod.array(zod.any()),
zod.string(),
zod.number(),
zod.boolean(),
])));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

import { createValidator } from '@validup/zod';
import { Container } from 'validup';
import zod from 'zod';
import type { WebhookSubscriptionPayload } from '../types.ts';

/** Validates a webhook-subscription body — a single required, absolute `webhookUrl`. */
export class WebhookSubscriptionValidator extends Container<WebhookSubscriptionPayload> {
protected initialize() {
super.initialize();

this.mount('webhookUrl', createValidator(zod.url()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
*/

import { TypedToken } from 'eldin';
import type { ICryptoService } from '../../../core/crypto/index.ts';
import type { IDeliveryService } from '../../../core/delivery/index.ts';
import type { IHubClient } from '../../../core/hub/index.ts';

export const ComponentsInjectionKey = {
Delivery: new TypedToken<IDeliveryService>('DeliveryService'),
HubClient: new TypedToken<IHubClient>('HubClient'),
Crypto: new TypedToken<ICryptoService>('CryptoService'),
};
5 changes: 5 additions & 0 deletions apps/node-message-broker/src/app/modules/components/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
createAuthupClientAuthenticationHook,
createAuthupClientTokenCreator,
} from '@privateaim/server-kit';
import { CryptoService } from '../../../adapters/crypto/index.ts';
import { MemoryDeliveryService } from '../../../adapters/delivery/index.ts';
import { HubClient, SseWakeupSource } from '../../../adapters/hub/index.ts';
import { ConfigInjectionKey } from '../config/constants.ts';
Expand Down Expand Up @@ -46,6 +47,10 @@ export class ComponentsModule implements IModule {
const delivery = new MemoryDeliveryService();
container.register(ComponentsInjectionKey.Delivery, { useValue: delivery });

// node-to-node E2E crypto; the operator supplies the single ECDH private key.
const crypto = new CryptoService({ privateKey: config.nodePrivateKey });
container.register(ComponentsInjectionKey.Crypto, { useValue: crypto });

// node-client credentials authenticate every Hub interaction; this creator
// backs the REST auth hook directly and the SSE Authorization header via a
// caching wrapper (below).
Expand Down
13 changes: 12 additions & 1 deletion apps/node-message-broker/src/app/modules/http/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,22 @@
import type { IContainer } from 'eldin';
import { AnalysisMessageController } from '../../../adapters/http/controllers/messages/index.ts';
import { ComponentsInjectionKey } from '../components/constants.ts';
import { CoreClientInjectionKey } from '../core-client/constants.ts';

export function createControllers(container: IContainer): Record<string, any>[] {
const delivery = container.resolve(ComponentsInjectionKey.Delivery);
const crypto = container.resolve(ComponentsInjectionKey.Crypto);
const hub = container.resolve(ComponentsInjectionKey.HubClient);
const resolver = container.resolve(CoreClientInjectionKey.ParticipantResolver);
const analyses = container.resolve(CoreClientInjectionKey.AnalysisClientLookup);

return [
new AnalysisMessageController({ delivery }),
new AnalysisMessageController({
delivery,
resolver,
analyses,
crypto,
hub,
}),
];
}
8 changes: 5 additions & 3 deletions apps/node-message-broker/src/app/modules/http/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import { HTTPInjectionKey } from './constants.ts';
export class HTTPModule implements IModule {
readonly name = 'http';

// `authupClient` is optional so partial builds (e.g. tests) still work; when present
// it must set up before HTTP so the authorization middleware + permission checker run.
readonly dependencies: (string | ModuleDependency)[] = ['config', 'components', { name: 'authupClient', optional: true }];
// `components` + `coreClient` register the ports the controllers resolve (delivery,
// crypto, hub, participant resolver, analysis lookup). `authupClient` is optional so
// partial builds (e.g. tests) still work; when present it must set up before HTTP so
// the authorization middleware + permission checker run.
readonly dependencies: (string | ModuleDependency)[] = ['config', 'components', 'coreClient', { name: 'authupClient', optional: true }];

private instance: HTTPServer | undefined;

Expand Down
Loading
Loading