Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions .agents/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,39 @@ The node broker is a thin encrypt/decrypt + local-delivery adapter in front of t
durable mailbox. It is **analysis-aware** (unlike the Hub).

```
send: Container ──REST──▶ Node Broker ──[encrypt, per-message HKDF]──▶ Hub /messages (durable)
notify: Hub ──SSE "messagePending" (payload-free)──▶ Node Broker
send: Container ──REST──▶ Node Broker ──[encrypt per recipient, analysisId-bound HKDF]──▶ Hub /messages (durable)
notify: Hub ──SSE "messagePending" (payload-free)──▶ Node Broker (long-poll pull is the fallback)
pull: Node Broker ──GET /messages (cursor, long-poll)──▶ Hub
deliver: Node Broker ──[decrypt]──▶ webhook POST to the analysis container (or container pull)
deliver: Node Broker ──[decrypt]──▶ webhook POST to the analysis container
```

### Ports (core/)

| Port | Responsibility |
|-----------------------|--------------------------------------------------------------------------|
| `IHubClient` | REST `send`/`pull`/`ack` against the Hub mailbox + SSE wakeup (`onWakeup`)|
| `IDeliveryService` | Webhook-subscription registry + fan-out of decrypted messages |
| `IAnalysisPolicy` | Assert the caller holds `ANALYSIS_SELF_MESSAGE_BROKER_USE` |
| `IParticipantResolver`| Resolve analysis node-client participants via server-core |
| Port | Responsibility |
|--------------------------|--------------------------------------------------------------------------|
| `IHubClient` | REST `send`/`pull`/`ack` against the Hub mailbox + SSE wakeup (`onWakeup`)|
| `ICryptoService` | `seal`/`open` — node-to-node E2E (ECDH + per-message HKDF + AES-256-GCM) |
| `IDeliveryService` | Webhook-subscription registry + fan-out of decrypted messages |
| `IParticipantResolver` | Resolve analysis node-client participants (+ their public keys) via server-core |
| `IAnalysisClientLookup` | Resolve the Authup client that owns an analysis (server-core) |
| `IPermissionCheckGateway`| Check the `ANALYSIS_SELF_MESSAGE_BROKER_USE` capability against Authup over HTTP |

Outbound send/broadcast orchestration lives in `core/messaging`; the inbound pull → decrypt →
deliver → ack loop in `core/inbound`. The analysis-scope rule is the pure function
`assertClientOwnsAnalysis` (`core/analysis`), not a port.

### Authorization

- **Inbound (container → node):** node-local Authup JWT (`KEYCLOAK_TOKEN`), verified by
the standard `@privateaim/server-http-kit` authorization middleware.
- **Analysis policy:** enforced node-side from the analysis client's token claims (or
server-core introspection). The Hub does not know about analyses.
- **Capability:** every analysis-scoped route asserts `ANALYSIS_SELF_MESSAGE_BROKER_USE`
via the request permission checker, evaluated against Authup over HTTP
(`IPermissionCheckGateway` → Authup permission-check endpoint, short-TTL cached) — **not**
from token introspection permissions.
- **Analysis scope:** `assertClientOwnsAnalysis` requires the caller's client to own the
analysis (server-core `analysis → client` lookup). One dedicated Authup client per
analysis, so a client match is exact analysis-level isolation. The Hub knows nothing of
analyses.
- **Outbound (node → Hub):** the node authenticates as its **node client**
(`client_credentials`); the Hub authorizes only "authenticated identity may send/pull".

Expand All @@ -52,6 +64,11 @@ Node-to-node ECDH (P-256) + AES-256-GCM via `@privateaim/kit`'s `crypto/message`
nonce reuse. Each node holds **one** ECDH keypair; the operator keeps the private key
(`NODE_PRIVATE_KEY`), the Hub never sees it. The Hub stores ciphertext only.

`analysisId` is bound into the HKDF `info` on **both** `seal` and `open`, so a
`metadata.analysisId` relabelled in transit by the untrusted Hub fails to decrypt instead of
being mis-routed to another analysis's webhooks. The two call sites are coupled — keep them
in sync.

## Configuration

Environment-based via `envix`, validated with `validup` + Zod, managed by `ConfigModule`.
Expand Down
20 changes: 15 additions & 5 deletions .agents/structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,29 @@ from local copies.
```
apps/node-message-broker/src/
├── core/ # Domain logic — ports only, zero infra imports
│ ├── hub/types.ts # IHubClient — send / pull / ack / onWakeup
│ ├── delivery/types.ts # IDeliveryService — webhook registry + fan-out
│ └── analysis/types.ts # IAnalysisPolicy + IParticipantResolver
│ ├── hub/ # IHubClient — send / pull / ack / onWakeup
│ ├── crypto/ # ICryptoService — seal / open (node-to-node E2E)
│ ├── delivery/ # IDeliveryService — webhook registry + fan-out
│ ├── analysis/ # IParticipantResolver, IAnalysisClientLookup + assertClientOwnsAnalysis (scope policy)
│ ├── authz/ # IPermissionCheckGateway — capability check
│ ├── messaging/ # outbound send / broadcast orchestration
│ └── inbound/ # inbound delivery loop (pull → decrypt → deliver → ack)
├── adapters/ # External system implementations
│ ├── http/controllers/ # Container-facing REST controllers (thin)
│ ├── http/controllers/ # Container-facing REST controllers (thin) + validup/zod validators
│ ├── http/middleware/ # request permission-checker (Authup over HTTP)
│ ├── hub/ # IHubClient impl (messenger-http-kit + SSE wakeup)
│ ├── crypto/ # ICryptoService impl (@privateaim/kit crypto/message)
│ ├── core/ # IParticipantResolver impl (server-core)
│ ├── authz/ # Authup permission gateway + IPermissionProvider
│ └── delivery/ # IDeliveryService impl (webhook fan-out)
├── app/ # Orchestration & DI wiring
│ ├── builder.ts # ServerMessageBrokerApplicationBuilder
│ ├── factory.ts # createApplication()
│ └── modules/
│ ├── config/ # ConfigModule (env, validation, defaults)
│ ├── components/ # ComponentsModule (delivery + Hub link)
│ ├── components/ # ComponentsModule (delivery + crypto + Hub link)
│ ├── core-client/ # CoreClientModule (server-core resolver + lookup)
│ ├── inbound/ # InboundModule (starts the delivery loop)
│ └── http/ # HTTPModule (routup server + controllers)
├── cli/ # CLI entry point (citty)
└── constants.ts
Expand Down
55 changes: 55 additions & 0 deletions apps/node-message-broker/src/app/modules/config/guard.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

import { EnvironmentName } from '@privateaim/server-kit';
import { EnvironmentInputKey } from './constants.ts';
import type { Config } from './types.ts';

/**
* Env vars that must be set explicitly in production. The broker ships development defaults
* for the whole security stack — localhost Authup, `system` / `start123` node-client
* credentials, and no node key — so relying on them in production would silently
* authenticate against the wrong identity provider (or, with an unreachable default Authup,
* degrade `mountAuthorizationMiddleware` to its fake-identity mode) and have no usable
* end-to-end key.
*/
const PRODUCTION_REQUIRED_ENV: EnvironmentInputKey[] = [
EnvironmentInputKey.AUTHUP_URL,
EnvironmentInputKey.CLIENT_ID,
EnvironmentInputKey.CLIENT_SECRET,
EnvironmentInputKey.REALM,
EnvironmentInputKey.NODE_PRIVATE_KEY,
];

/**
* Fail fast when the broker is started in **production** without the security stack
* explicitly configured, rather than silently running with development defaults. Outside
* production (development / test) the defaults are intended, so the guard is a no-op.
*
* Checks raw env presence (not the normalized config) so a value left at its dev default
* is treated as unset.
*/
export function assertProductionConfig(
config: Pick<Config, 'env'>,
env: Record<string, string | undefined> = process.env,
): void {
if (config.env !== EnvironmentName.PRODUCTION) {
return;
}

const missing = PRODUCTION_REQUIRED_ENV.filter((key) => {
const value = env[key];
return value === undefined || value === '';
});

if (missing.length > 0) {
throw new Error(
`Refusing to start in production with an unconfigured security stack: set ${missing.join(', ')}. ` +
'Development defaults must not be relied on in production.',
);
}
}
6 changes: 5 additions & 1 deletion apps/node-message-broker/src/app/modules/config/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import type { IContainer } from 'eldin';
import type { IModule } from 'orkos';
import { ConfigInjectionKey } from './constants.ts';
import { assertProductionConfig } from './guard.ts';
import { normalizeConfig } from './normalize.ts';
import { readConfigFromEnv } from './read.ts';
import type { Config } from './types.ts';
Expand All @@ -30,6 +31,9 @@ export class ConfigModule implements IModule {

private async read(): Promise<Config> {
const raw = readConfigFromEnv();
return normalizeConfig(raw);
const config = await normalizeConfig(raw);
// env-derived startup only: refuse to run in production on development defaults.
assertProductionConfig(config);
return config;
}
}
22 changes: 22 additions & 0 deletions apps/node-message-broker/src/core/inbound/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

/**
* A failure while processing one inbound message. `permanent` distinguishes failures that
* can never succeed on retry (no `analysisId`, unknown sender, decrypt/parse error — drop
* immediately) from transient ones (participant-resolution / webhook outage — retry up to a
* cap before dead-lettering).
*/
export class InboundProcessingError extends Error {
readonly permanent: boolean;

constructor(message: string, options: { permanent: boolean, cause?: unknown }) {
super(message, { cause: options.cause });
this.name = 'InboundProcessingError';
this.permanent = options.permanent;
}
}
1 change: 1 addition & 0 deletions apps/node-message-broker/src/core/inbound/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
*/

export * from './types.ts';
export * from './errors.ts';
export * from './processor.ts';
101 changes: 88 additions & 13 deletions apps/node-message-broker/src/core/inbound/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
*/

import type { Message, MessagePullResponse } from '@privateaim/messenger-kit';
import { InboundProcessingError } from './errors.ts';
import type { InboundDeliveryDeps, InboundProcessorOptions } from './types.ts';

const DEFAULT_PULL_LIMIT = 50;
const DEFAULT_WAIT_MS = 20_000;
const DEFAULT_ERROR_BACKOFF_MS = 1_000;
const DEFAULT_MAX_ATTEMPTS = 5;

/**
* Drives the inbound side of the broker: pull this node's pending ciphertext from the Hub,
Expand All @@ -37,6 +39,14 @@ export class InboundDeliveryProcessor {

protected errorBackoffMs: number;

protected maxAttempts: number;

/** transient-failure attempt counts, keyed by message id; cleared on success or drop */
protected readonly attempts = new Map<string, number>();

/** messages dead-lettered (acked to drop) — exposed for metrics/observability */
protected dropped = 0;

protected running = false;

protected unsubscribe: (() => void) | undefined;
Expand All @@ -56,6 +66,12 @@ export class InboundDeliveryProcessor {
this.pullLimit = options.pullLimit ?? DEFAULT_PULL_LIMIT;
this.waitMs = options.waitMs ?? DEFAULT_WAIT_MS;
this.errorBackoffMs = options.errorBackoffMs ?? DEFAULT_ERROR_BACKOFF_MS;
this.maxAttempts = Math.max(1, options.maxAttempts ?? DEFAULT_MAX_ATTEMPTS);
}

/** Count of messages dead-lettered (dropped via ack) so far — for metrics/tests. */
get droppedCount(): number {
return this.dropped;
}

/** Subscribe to Hub wakeups and start the long-poll fallback loop. Idempotent. */
Expand Down Expand Up @@ -97,19 +113,26 @@ export class InboundDeliveryProcessor {
}

/**
* Decrypt and locally deliver a batch, returning the ids of the messages that were
* delivered (and acked). Per-message failures are isolated; the ack covers only the
* delivered subset so the rest are redelivered on a later pull.
* Decrypt and locally deliver a batch, returning the ids of the messages that were acked.
* The ack covers both successfully delivered messages and **dead-lettered** ones —
* permanent failures (no `analysisId`, unknown sender, decrypt/parse error) are dropped
* on the first attempt, and transient failures (resolution/webhook outage) are retried up
* to `maxAttempts` redeliveries before being dropped. A still-retrying transient failure
* is left unacked so the Hub redelivers it. Dropped messages are logged at `error` and
* counted ({@link droppedCount}); the batch is never aborted by one bad message.
*/
async processBatch(messages: Message[]): Promise<string[]> {
const ackIds: string[] = [];

for (const message of messages) {
try {
await this.deliverMessage(message);
this.attempts.delete(message.id);
ackIds.push(message.id);
} catch (error) {
this.deps.logger?.warn(`Inbound message ${message.id} skipped: ${(error as Error).message}`);
if (this.shouldDeadLetter(message, error)) {
ackIds.push(message.id);
}
}
}

Expand All @@ -124,6 +147,36 @@ export class InboundDeliveryProcessor {
return ackIds;
}

/**
* Record a failed attempt and decide whether to dead-letter (ack-to-drop) the message.
* Returns `true` when the message should be acked away — a permanent failure, or a
* transient one that has exhausted `maxAttempts`. A transient failure with attempts
* remaining returns `false` (left unacked for redelivery).
*/
protected shouldDeadLetter(message: Message, error: unknown): boolean {
const permanent = error instanceof InboundProcessingError && error.permanent;
const reason = error instanceof Error ? error.message : String(error);

if (permanent) {
this.attempts.delete(message.id);
this.dropped += 1;
this.deps.logger?.error(`Dropping inbound message ${message.id} (permanent): ${reason}`);
return true;
}

const attempts = (this.attempts.get(message.id) ?? 0) + 1;
if (attempts >= this.maxAttempts) {
this.attempts.delete(message.id);
this.dropped += 1;
this.deps.logger?.error(`Dropping inbound message ${message.id} after ${attempts} attempts (transient): ${reason}`);
return true;
}

this.attempts.set(message.id, attempts);
this.deps.logger?.warn(`Inbound message ${message.id} attempt ${attempts}/${this.maxAttempts} failed (transient): ${reason}`);
return false;
}

/** Drain the backlog: pull (no wait) and process repeatedly until the mailbox is empty. */
protected async drainBacklog(): Promise<void> {
while (this.running) {
Expand Down Expand Up @@ -192,32 +245,54 @@ export class InboundDeliveryProcessor {
}
}

/** Decrypt one inbound message and deliver it to the analysis's webhooks. */
/**
* Decrypt one inbound message and deliver it to the analysis's webhooks. Throws an
* {@link InboundProcessingError} tagged `permanent` for failures that cannot succeed on
* retry (bad envelope, unknown sender, decrypt/parse error) and `transient` for outages
* (participant resolution, webhook delivery) that warrant a retry.
*/
protected async deliverMessage(message: Message): Promise<void> {
const analysisId = message.metadata?.analysisId;
if (!analysisId) {
throw new Error('message carries no analysisId metadata');
throw new InboundProcessingError('message carries no analysisId metadata', { permanent: true });
}
if (typeof message.data !== 'string') {
throw new Error('message carries no ciphertext payload');
throw new InboundProcessingError('message carries no ciphertext payload', { permanent: true });
}

const senderPublicKey = await this.resolveSenderPublicKey(analysisId, message.sender_id);

// `analysisId` is bound into the key derivation (HKDF info, matching the seal path),
// so a `metadata.analysisId` relabelled in transit by the untrusted Hub fails to
// decrypt here rather than being mis-routed to another analysis's webhooks.
const senderPublicKey = await this.resolveSenderPublicKey(analysisId, message.sender_id);
const plaintext = await this.deps.crypto.open(message.data, senderPublicKey, analysisId);
const payload = JSON.parse(this.decoder.decode(plaintext));
let payload: unknown;
try {
const plaintext = await this.deps.crypto.open(message.data, senderPublicKey, analysisId);
payload = JSON.parse(this.decoder.decode(plaintext));
} catch (error) {
throw new InboundProcessingError('failed to decrypt or parse message payload', { permanent: true, cause: error });
}

await this.deps.delivery.deliver(analysisId, payload);
try {
await this.deps.delivery.deliver(analysisId, payload);
} catch (error) {
throw new InboundProcessingError('failed to deliver message to the analysis webhooks', { permanent: false, cause: error });
}
}

/** Map a sender client id to its node public key via the analysis participant set. */
protected async resolveSenderPublicKey(analysisId: string, senderId: string): Promise<string> {
const participants = await this.deps.resolver.resolve(analysisId);
let participants;
try {
participants = await this.deps.resolver.resolve(analysisId);
} catch (error) {
// a resolver/server-core outage is transient — retry rather than drop
throw new InboundProcessingError('failed to resolve analysis participants', { permanent: false, cause: error });
}

const sender = participants.find((participant) => participant.clientId === senderId);
if (!sender) {
throw new Error(`sender '${senderId}' is not a participant of analysis '${analysisId}'`);
throw new InboundProcessingError(`sender '${senderId}' is not a participant of analysis '${analysisId}'`, { permanent: true });
}

return sender.publicKey;
Expand Down
4 changes: 3 additions & 1 deletion apps/node-message-broker/src/core/inbound/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ export type InboundProcessorOptions = {
/** long-poll budget in ms for the fallback loop's pull (default 20000) */
waitMs?: number,
/** backoff in ms after a failed pull before the fallback loop retries (default 1000) */
errorBackoffMs?: number
errorBackoffMs?: number,
/** transient-failure attempts before a message is dead-lettered (acked to drop) (default 5) */
maxAttempts?: number
};
Loading
Loading