From 80c690ac17158030951089ffd5f4ab9ba0f065e3 Mon Sep 17 00:00:00 2001 From: tada5hi Date: Fri, 26 Jun 2026 17:35:09 +0200 Subject: [PATCH 1/3] docs(agents): sync structure + architecture guides with the shipped broker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These guides are loaded into context every session via CLAUDE.md and had drifted from the code: - structure.md: per-app layout now lists the real core/ (crypto, authz, messaging, inbound) + adapters/ (crypto, core, authz, http/middleware) + app/modules (core-client, inbound) tree. - architecture.md: ports table replaces the collapsed `IAnalysisPolicy` with `ICryptoService`, `IAnalysisClientLookup`, and `IPermissionCheckGateway`, and notes `assertClientOwnsAnalysis` is a pure function. Authorization section reflects the capability check via the request permission checker (not introspection) + scope via server-core ownership. Crypto section documents the analysisId↔HKDF-info binding. --- .agents/architecture.md | 39 ++++++++++++++++++++++++++++----------- .agents/structure.md | 20 +++++++++++++++----- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/.agents/architecture.md b/.agents/architecture.md index 4b8919a..e34bb2b 100644 --- a/.agents/architecture.md +++ b/.agents/architecture.md @@ -21,27 +21,39 @@ The node broker is a thin encrypt/decrypt + local-delivery adapter in front of t durable mailbox. It is **analysis-aware** (unlike the Hub). 
``` -send: Container ──REST──▶ Node Broker ──[encrypt, per-message HKDF]──▶ Hub /messages (durable) -notify: Hub ──SSE "messagePending" (payload-free)──▶ Node Broker +send: Container ──REST──▶ Node Broker ──[encrypt per recipient, analysisId-bound HKDF]──▶ Hub /messages (durable) +notify: Hub ──SSE "messagePending" (payload-free)──▶ Node Broker (long-poll pull is the fallback) pull: Node Broker ──GET /messages (cursor, long-poll)──▶ Hub -deliver: Node Broker ──[decrypt]──▶ webhook POST to the analysis container (or container pull) +deliver: Node Broker ──[decrypt]──▶ webhook POST to the analysis container ``` ### Ports (core/) -| Port | Responsibility | -|-----------------------|--------------------------------------------------------------------------| -| `IHubClient` | REST `send`/`pull`/`ack` against the Hub mailbox + SSE wakeup (`onWakeup`)| -| `IDeliveryService` | Webhook-subscription registry + fan-out of decrypted messages | -| `IAnalysisPolicy` | Assert the caller holds `ANALYSIS_SELF_MESSAGE_BROKER_USE` | -| `IParticipantResolver`| Resolve analysis node-client participants via server-core | +| Port | Responsibility | +|--------------------------|--------------------------------------------------------------------------| +| `IHubClient` | REST `send`/`pull`/`ack` against the Hub mailbox + SSE wakeup (`onWakeup`)| +| `ICryptoService` | `seal`/`open` — node-to-node E2E (ECDH + per-message HKDF + AES-256-GCM) | +| `IDeliveryService` | Webhook-subscription registry + fan-out of decrypted messages | +| `IParticipantResolver` | Resolve analysis node-client participants (+ their public keys) via server-core | +| `IAnalysisClientLookup` | Resolve the Authup client that owns an analysis (server-core) | +| `IPermissionCheckGateway`| Check the `ANALYSIS_SELF_MESSAGE_BROKER_USE` capability against Authup over HTTP | + +Outbound send/broadcast orchestration lives in `core/messaging`; the inbound pull → decrypt → +deliver → ack loop in `core/inbound`. 
The analysis-scope rule is the pure function +`assertClientOwnsAnalysis` (`core/analysis`), not a port. ### Authorization - **Inbound (container → node):** node-local Authup JWT (`KEYCLOAK_TOKEN`), verified by the standard `@privateaim/server-http-kit` authorization middleware. -- **Analysis policy:** enforced node-side from the analysis client's token claims (or - server-core introspection). The Hub does not know about analyses. +- **Capability:** every analysis-scoped route asserts `ANALYSIS_SELF_MESSAGE_BROKER_USE` + via the request permission checker, evaluated against Authup over HTTP + (`IPermissionCheckGateway` → Authup permission-check endpoint, short-TTL cached) — **not** + from token introspection permissions. +- **Analysis scope:** `assertClientOwnsAnalysis` requires the caller's client to own the + analysis (server-core `analysis → client` lookup). One dedicated Authup client per + analysis, so a client match is exact analysis-level isolation. The Hub knows nothing of + analyses. - **Outbound (node → Hub):** the node authenticates as its **node client** (`client_credentials`); the Hub authorizes only "authenticated identity may send/pull". @@ -52,6 +64,11 @@ Node-to-node ECDH (P-256) + AES-256-GCM via `@privateaim/kit`'s `crypto/message` nonce reuse. Each node holds **one** ECDH keypair; the operator keeps the private key (`NODE_PRIVATE_KEY`), the Hub never sees it. The Hub stores ciphertext only. +`analysisId` is bound into the HKDF `info` on **both** `seal` and `open`, so a +`metadata.analysisId` relabelled in transit by the untrusted Hub fails to decrypt instead of +being mis-routed to another analysis's webhooks. The two call sites are coupled — keep them +in sync. + ## Configuration Environment-based via `envix`, validated with `validup` + Zod, managed by `ConfigModule`. diff --git a/.agents/structure.md b/.agents/structure.md index a619601..d307518 100644 --- a/.agents/structure.md +++ b/.agents/structure.md @@ -25,19 +25,29 @@ from local copies. 
``` apps/node-message-broker/src/ ├── core/ # Domain logic — ports only, zero infra imports -│ ├── hub/types.ts # IHubClient — send / pull / ack / onWakeup -│ ├── delivery/types.ts # IDeliveryService — webhook registry + fan-out -│ └── analysis/types.ts # IAnalysisPolicy + IParticipantResolver +│ ├── hub/ # IHubClient — send / pull / ack / onWakeup +│ ├── crypto/ # ICryptoService — seal / open (node-to-node E2E) +│ ├── delivery/ # IDeliveryService — webhook registry + fan-out +│ ├── analysis/ # IParticipantResolver, IAnalysisClientLookup + assertClientOwnsAnalysis (scope policy) +│ ├── authz/ # IPermissionCheckGateway — capability check +│ ├── messaging/ # outbound send / broadcast orchestration +│ └── inbound/ # inbound delivery loop (pull → decrypt → deliver → ack) ├── adapters/ # External system implementations -│ ├── http/controllers/ # Container-facing REST controllers (thin) +│ ├── http/controllers/ # Container-facing REST controllers (thin) + validup/zod validators +│ ├── http/middleware/ # request permission-checker (Authup over HTTP) │ ├── hub/ # IHubClient impl (messenger-http-kit + SSE wakeup) +│ ├── crypto/ # ICryptoService impl (@privateaim/kit crypto/message) +│ ├── core/ # IParticipantResolver impl (server-core) +│ ├── authz/ # Authup permission gateway + IPermissionProvider │ └── delivery/ # IDeliveryService impl (webhook fan-out) ├── app/ # Orchestration & DI wiring │ ├── builder.ts # ServerMessageBrokerApplicationBuilder │ ├── factory.ts # createApplication() │ └── modules/ │ ├── config/ # ConfigModule (env, validation, defaults) -│ ├── components/ # ComponentsModule (delivery + Hub link) +│ ├── components/ # ComponentsModule (delivery + crypto + Hub link) +│ ├── core-client/ # CoreClientModule (server-core resolver + lookup) +│ ├── inbound/ # InboundModule (starts the delivery loop) │ └── http/ # HTTPModule (routup server + controllers) ├── cli/ # CLI entry point (citty) └── constants.ts From e3d6a4c60e62c9b5cef977ef9053f8af29b9f994 Mon Sep 17 
00:00:00 2001 From: tada5hi Date: Fri, 26 Jun 2026 18:06:16 +0200 Subject: [PATCH 2/3] feat(node-message-broker): dead-letter inbound poison messages after bounded retries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously any per-message failure was logged, skipped, and left unacked — so a permanently undecryptable ("poison") message was redelivered by the Hub on every pull cycle forever. Classify failures and bound the retries: - Permanent failures (no analysisId, no ciphertext, unknown sender, decrypt/parse error) are dropped on the first attempt — acked away so the Hub stops redelivering. - Transient failures (participant-resolution / webhook outage) are attempted up to `maxAttempts` (default 5) times in total; the attempt that reaches the cap is dead-lettered, and a still-retrying message is left unacked. The per-id attempt count resets on success. Dropped messages are logged at `error` and counted (`droppedCount`) for observability. Adds an `InboundProcessingError` carrying the permanent/transient classification. Closes #17. --- .../src/core/inbound/errors.ts | 22 ++++ .../src/core/inbound/index.ts | 1 + .../src/core/inbound/processor.ts | 101 +++++++++++++++--- .../src/core/inbound/types.ts | 4 +- .../test/unit/core/inbound/processor.spec.ts | 67 ++++++++---- 5 files changed, 160 insertions(+), 35 deletions(-) create mode 100644 apps/node-message-broker/src/core/inbound/errors.ts diff --git a/apps/node-message-broker/src/core/inbound/errors.ts b/apps/node-message-broker/src/core/inbound/errors.ts new file mode 100644 index 0000000..898ed31 --- /dev/null +++ b/apps/node-message-broker/src/core/inbound/errors.ts @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +/** + * A failure while processing one inbound message.
`permanent` distinguishes failures that + * can never succeed on retry (no `analysisId`, unknown sender, decrypt/parse error — drop + * immediately) from transient ones (participant-resolution / webhook outage — retry up to a + * cap before dead-lettering). + */ +export class InboundProcessingError extends Error { + readonly permanent: boolean; + + constructor(message: string, options: { permanent: boolean, cause?: unknown }) { + super(message, { cause: options.cause }); + this.name = 'InboundProcessingError'; + this.permanent = options.permanent; + } +} diff --git a/apps/node-message-broker/src/core/inbound/index.ts b/apps/node-message-broker/src/core/inbound/index.ts index fe48961..56d9429 100644 --- a/apps/node-message-broker/src/core/inbound/index.ts +++ b/apps/node-message-broker/src/core/inbound/index.ts @@ -6,4 +6,5 @@ */ export * from './types.ts'; +export * from './errors.ts'; export * from './processor.ts'; diff --git a/apps/node-message-broker/src/core/inbound/processor.ts b/apps/node-message-broker/src/core/inbound/processor.ts index 2ad36e6..a7b8bf3 100644 --- a/apps/node-message-broker/src/core/inbound/processor.ts +++ b/apps/node-message-broker/src/core/inbound/processor.ts @@ -6,11 +6,13 @@ */ import type { Message, MessagePullResponse } from '@privateaim/messenger-kit'; +import { InboundProcessingError } from './errors.ts'; import type { InboundDeliveryDeps, InboundProcessorOptions } from './types.ts'; const DEFAULT_PULL_LIMIT = 50; const DEFAULT_WAIT_MS = 20_000; const DEFAULT_ERROR_BACKOFF_MS = 1_000; +const DEFAULT_MAX_ATTEMPTS = 5; /** * Drives the inbound side of the broker: pull this node's pending ciphertext from the Hub, @@ -37,6 +39,14 @@ export class InboundDeliveryProcessor { protected errorBackoffMs: number; + protected maxAttempts: number; + + /** transient-failure attempt counts, keyed by message id; cleared on success or drop */ + protected readonly attempts = new Map(); + + /** messages dead-lettered (acked to drop) — exposed for 
metrics/observability */ + protected dropped = 0; + protected running = false; protected unsubscribe: (() => void) | undefined; @@ -56,6 +66,12 @@ export class InboundDeliveryProcessor { this.pullLimit = options.pullLimit ?? DEFAULT_PULL_LIMIT; this.waitMs = options.waitMs ?? DEFAULT_WAIT_MS; this.errorBackoffMs = options.errorBackoffMs ?? DEFAULT_ERROR_BACKOFF_MS; + this.maxAttempts = Math.max(1, options.maxAttempts ?? DEFAULT_MAX_ATTEMPTS); + } + + /** Count of messages dead-lettered (dropped via ack) so far — for metrics/tests. */ + get droppedCount(): number { + return this.dropped; } /** Subscribe to Hub wakeups and start the long-poll fallback loop. Idempotent. */ @@ -97,9 +113,13 @@ export class InboundDeliveryProcessor { } /** - * Decrypt and locally deliver a batch, returning the ids of the messages that were - * delivered (and acked). Per-message failures are isolated; the ack covers only the - * delivered subset so the rest are redelivered on a later pull. + * Decrypt and locally deliver a batch, returning the ids of the messages that were acked. + * The ack covers both successfully delivered messages and **dead-lettered** ones — + * permanent failures (no `analysisId`, unknown sender, decrypt/parse error) are dropped + * on the first attempt, and transient failures (resolution/webhook outage) are attempted up + * to `maxAttempts` times in total before being dropped. A still-retrying transient failure + * is left unacked so the Hub redelivers it. Dropped messages are logged at `error` and + * counted ({@link droppedCount}); the batch is never aborted by one bad message.
*/ async processBatch(messages: Message[]): Promise { const ackIds: string[] = []; @@ -107,9 +127,12 @@ export class InboundDeliveryProcessor { for (const message of messages) { try { await this.deliverMessage(message); + this.attempts.delete(message.id); ackIds.push(message.id); } catch (error) { - this.deps.logger?.warn(`Inbound message ${message.id} skipped: ${(error as Error).message}`); + if (this.shouldDeadLetter(message, error)) { + ackIds.push(message.id); + } } } @@ -124,6 +147,36 @@ export class InboundDeliveryProcessor { return ackIds; } + /** + * Record a failed attempt and decide whether to dead-letter (ack-to-drop) the message. + * Returns `true` when the message should be acked away — a permanent failure, or a + * transient one that has exhausted `maxAttempts`. A transient failure with attempts + * remaining returns `false` (left unacked for redelivery). + */ + protected shouldDeadLetter(message: Message, error: unknown): boolean { + const permanent = error instanceof InboundProcessingError && error.permanent; + const reason = error instanceof Error ? error.message : String(error); + + if (permanent) { + this.attempts.delete(message.id); + this.dropped += 1; + this.deps.logger?.error(`Dropping inbound message ${message.id} (permanent): ${reason}`); + return true; + } + + const attempts = (this.attempts.get(message.id) ?? 0) + 1; + if (attempts >= this.maxAttempts) { + this.attempts.delete(message.id); + this.dropped += 1; + this.deps.logger?.error(`Dropping inbound message ${message.id} after ${attempts} attempts (transient): ${reason}`); + return true; + } + + this.attempts.set(message.id, attempts); + this.deps.logger?.warn(`Inbound message ${message.id} attempt ${attempts}/${this.maxAttempts} failed (transient): ${reason}`); + return false; + } + /** Drain the backlog: pull (no wait) and process repeatedly until the mailbox is empty. 
*/ protected async drainBacklog(): Promise { while (this.running) { @@ -192,32 +245,54 @@ export class InboundDeliveryProcessor { } } - /** Decrypt one inbound message and deliver it to the analysis's webhooks. */ + /** + * Decrypt one inbound message and deliver it to the analysis's webhooks. Throws an + * {@link InboundProcessingError} tagged `permanent` for failures that cannot succeed on + * retry (bad envelope, unknown sender, decrypt/parse error) and `transient` for outages + * (participant resolution, webhook delivery) that warrant a retry. + */ protected async deliverMessage(message: Message): Promise { const analysisId = message.metadata?.analysisId; if (!analysisId) { - throw new Error('message carries no analysisId metadata'); + throw new InboundProcessingError('message carries no analysisId metadata', { permanent: true }); } if (typeof message.data !== 'string') { - throw new Error('message carries no ciphertext payload'); + throw new InboundProcessingError('message carries no ciphertext payload', { permanent: true }); } + const senderPublicKey = await this.resolveSenderPublicKey(analysisId, message.sender_id); + // `analysisId` is bound into the key derivation (HKDF info, matching the seal path), // so a `metadata.analysisId` relabelled in transit by the untrusted Hub fails to // decrypt here rather than being mis-routed to another analysis's webhooks. 
- const senderPublicKey = await this.resolveSenderPublicKey(analysisId, message.sender_id); - const plaintext = await this.deps.crypto.open(message.data, senderPublicKey, analysisId); - const payload = JSON.parse(this.decoder.decode(plaintext)); + let payload: unknown; + try { + const plaintext = await this.deps.crypto.open(message.data, senderPublicKey, analysisId); + payload = JSON.parse(this.decoder.decode(plaintext)); + } catch (error) { + throw new InboundProcessingError('failed to decrypt or parse message payload', { permanent: true, cause: error }); + } - await this.deps.delivery.deliver(analysisId, payload); + try { + await this.deps.delivery.deliver(analysisId, payload); + } catch (error) { + throw new InboundProcessingError('failed to deliver message to the analysis webhooks', { permanent: false, cause: error }); + } } /** Map a sender client id to its node public key via the analysis participant set. */ protected async resolveSenderPublicKey(analysisId: string, senderId: string): Promise { - const participants = await this.deps.resolver.resolve(analysisId); + let participants; + try { + participants = await this.deps.resolver.resolve(analysisId); + } catch (error) { + // a resolver/server-core outage is transient — retry rather than drop + throw new InboundProcessingError('failed to resolve analysis participants', { permanent: false, cause: error }); + } + const sender = participants.find((participant) => participant.clientId === senderId); if (!sender) { - throw new Error(`sender '${senderId}' is not a participant of analysis '${analysisId}'`); + throw new InboundProcessingError(`sender '${senderId}' is not a participant of analysis '${analysisId}'`, { permanent: true }); } return sender.publicKey; diff --git a/apps/node-message-broker/src/core/inbound/types.ts b/apps/node-message-broker/src/core/inbound/types.ts index 1baccaf..9c95560 100644 --- a/apps/node-message-broker/src/core/inbound/types.ts +++ 
b/apps/node-message-broker/src/core/inbound/types.ts @@ -27,5 +27,7 @@ export type InboundProcessorOptions = { /** long-poll budget in ms for the fallback loop's pull (default 20000) */ waitMs?: number, /** backoff in ms after a failed pull before the fallback loop retries (default 1000) */ - errorBackoffMs?: number + errorBackoffMs?: number, + /** transient-failure attempts before a message is dead-lettered (acked to drop) (default 5) */ + maxAttempts?: number }; diff --git a/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts index 3f509a1..64966ff 100644 --- a/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts +++ b/apps/node-message-broker/test/unit/core/inbound/processor.spec.ts @@ -74,6 +74,7 @@ function setup() { { waitMs: 50, errorBackoffMs: 5, + maxAttempts: 3, }, ); @@ -108,7 +109,7 @@ describe('core/inbound/processor', () => { expect(hub.acked).toEqual([['msg-1']]); }); - it('isolates a decrypt failure and still delivers and acks the rest of the batch', async () => { + it('dead-letters a permanently undecryptable message and still delivers the rest', async () => { const { crypto, delivery, @@ -123,49 +124,73 @@ describe('core/inbound/processor', () => { inboundMessage({ id: 'good', data: 'cipher-2' }), ]); - expect(acked).toEqual(['good']); + // a decrypt failure is permanent → dropped (acked away), not retried forever + expect(acked).toEqual(['bad', 'good']); + expect(processor.droppedCount).toBe(1); expect(delivery.delivered.map((entry) => entry.message)).toEqual([{ ok: true }]); - expect(hub.acked).toEqual([['good']]); + expect(hub.acked).toEqual([['bad', 'good']]); }); - it('does not ack a message whose local delivery fails (left for redelivery)', async () => { + it('drops permanent failures (no analysisId / ciphertext / unknown sender) on the first attempt', async () => { const { delivery, hub, processor, } = setup(); - delivery.failAnalyses.add('a1'); - 
const acked = await processor.processBatch([inboundMessage()]); + const acked = await processor.processBatch([ + inboundMessage({ id: 'no-analysis', metadata: null }), + inboundMessage({ id: 'no-cipher', data: null }), + inboundMessage({ id: 'stranger', sender_id: 'client-stranger' }), + ]); - expect(acked).toEqual([]); - expect(hub.acked).toEqual([]); + expect(acked).toEqual(['no-analysis', 'no-cipher', 'stranger']); + expect(processor.droppedCount).toBe(3); + expect(delivery.delivered).toEqual([]); + expect(hub.acked).toEqual([['no-analysis', 'no-cipher', 'stranger']]); }); - it('skips messages without an analysisId or ciphertext payload', async () => { + it('retries a transient delivery failure and dead-letters it only after maxAttempts', async () => { const { delivery, hub, processor, } = setup(); + delivery.failAnalyses.add('a1'); // webhook outage — transient - const acked = await processor.processBatch([ - inboundMessage({ id: 'no-analysis', metadata: null }), - inboundMessage({ id: 'no-cipher', data: null }), - ]); - - expect(acked).toEqual([]); - expect(delivery.delivered).toEqual([]); + // attempts 1 and 2 (< maxAttempts=3): left unacked for redelivery + expect(await processor.processBatch([inboundMessage()])).toEqual([]); + expect(await processor.processBatch([inboundMessage()])).toEqual([]); + expect(processor.droppedCount).toBe(0); expect(hub.acked).toEqual([]); + + // attempt 3 reaches the cap → dead-lettered (acked to drop) + expect(await processor.processBatch([inboundMessage()])).toEqual(['msg-1']); + expect(processor.droppedCount).toBe(1); + expect(hub.acked).toEqual([['msg-1']]); }); - it('skips a message from an unknown sender', async () => { - const { delivery, processor } = setup(); + it('resets the attempt count after a recovered transient failure', async () => { + const { + delivery, + processor, + } = setup(); + delivery.failAnalyses.add('a1'); - const acked = await processor.processBatch([inboundMessage({ sender_id: 'client-stranger' })]); 
+ await processor.processBatch([inboundMessage()]); // attempt 1, unacked + await processor.processBatch([inboundMessage()]); // attempt 2, unacked - expect(acked).toEqual([]); - expect(delivery.delivered).toEqual([]); + delivery.failAnalyses.delete('a1'); // outage recovers + const acked = await processor.processBatch([inboundMessage()]); + + // delivered on recovery, dropped nothing, and the counter is cleared for the id + expect(acked).toEqual(['msg-1']); + expect(processor.droppedCount).toBe(0); + + // a subsequent transient failure starts counting from one again + delivery.failAnalyses.add('a1'); + expect(await processor.processBatch([inboundMessage()])).toEqual([]); + expect(processor.droppedCount).toBe(0); }); it('drains the backlog on a wakeup, then unsubscribes on stop', async () => { From f633c8ec2fcce5a0898630ef7ba5739912e7ce0e Mon Sep 17 00:00:00 2001 From: tada5hi Date: Fri, 26 Jun 2026 18:06:36 +0200 Subject: [PATCH 3/3] fix(node-message-broker): fail fast on an unconfigured production security stack MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The broker ships development defaults for the whole security stack — localhost Authup, `system` / `start123` node-client credentials, no node key. Relying on them in production would silently authenticate against the wrong identity provider (or degrade the authorization middleware to fake-identity mode) and have no usable end-to-end key. `assertProductionConfig` (run on the env-derived startup path) now refuses to start when `NODE_ENV=production` and any of `AUTHUP_URL`, `CLIENT_ID`, `CLIENT_SECRET`, `REALM`, `NODE_PRIVATE_KEY` is unset, listing the missing vars. Development and test keep the intended defaults. Closes #14. 
--- .../src/app/modules/config/guard.ts | 55 +++++++++++++++++++ .../src/app/modules/config/module.ts | 6 +- .../test/unit/app/config-guard.spec.ts | 53 ++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 apps/node-message-broker/src/app/modules/config/guard.ts create mode 100644 apps/node-message-broker/test/unit/app/config-guard.spec.ts diff --git a/apps/node-message-broker/src/app/modules/config/guard.ts b/apps/node-message-broker/src/app/modules/config/guard.ts new file mode 100644 index 0000000..2a86aae --- /dev/null +++ b/apps/node-message-broker/src/app/modules/config/guard.ts @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { EnvironmentName } from '@privateaim/server-kit'; +import { EnvironmentInputKey } from './constants.ts'; +import type { Config } from './types.ts'; + +/** + * Env vars that must be set explicitly in production. The broker ships development defaults + * for the whole security stack — localhost Authup, `system` / `start123` node-client + * credentials, and no node key — so relying on them in production would silently + * authenticate against the wrong identity provider (or, with an unreachable default Authup, + * degrade `mountAuthorizationMiddleware` to its fake-identity mode) and have no usable + * end-to-end key. + */ +const PRODUCTION_REQUIRED_ENV: EnvironmentInputKey[] = [ + EnvironmentInputKey.AUTHUP_URL, + EnvironmentInputKey.CLIENT_ID, + EnvironmentInputKey.CLIENT_SECRET, + EnvironmentInputKey.REALM, + EnvironmentInputKey.NODE_PRIVATE_KEY, +]; + +/** + * Fail fast when the broker is started in **production** without the security stack + * explicitly configured, rather than silently running with development defaults. Outside + * production (development / test) the defaults are intended, so the guard is a no-op. 
+ * + * Checks raw env presence (not the normalized config): a var that is absent or empty — and + * would fall back to its dev default — counts as unset; set values are not compared to defaults. + */ +export function assertProductionConfig( + config: Pick, + env: Record = process.env, +): void { + if (config.env !== EnvironmentName.PRODUCTION) { + return; + } + + const missing = PRODUCTION_REQUIRED_ENV.filter((key) => { + const value = env[key]; + return value === undefined || value === ''; + }); + + if (missing.length > 0) { + throw new Error( + `Refusing to start in production with an unconfigured security stack: set ${missing.join(', ')}. ` + + 'Development defaults must not be relied on in production.', + ); + } +} diff --git a/apps/node-message-broker/src/app/modules/config/module.ts b/apps/node-message-broker/src/app/modules/config/module.ts index 54b67e8..76da190 100644 --- a/apps/node-message-broker/src/app/modules/config/module.ts +++ b/apps/node-message-broker/src/app/modules/config/module.ts @@ -8,6 +8,7 @@ import type { IContainer } from 'eldin'; import type { IModule } from 'orkos'; import { ConfigInjectionKey } from './constants.ts'; +import { assertProductionConfig } from './guard.ts'; import { normalizeConfig } from './normalize.ts'; import { readConfigFromEnv } from './read.ts'; import type { Config } from './types.ts'; @@ -30,6 +31,9 @@ export class ConfigModule implements IModule { private async read(): Promise { const raw = readConfigFromEnv(); - return normalizeConfig(raw); + const config = await normalizeConfig(raw); + // env-derived startup only: refuse to run in production on development defaults. + assertProductionConfig(config); + return config; } } diff --git a/apps/node-message-broker/test/unit/app/config-guard.spec.ts b/apps/node-message-broker/test/unit/app/config-guard.spec.ts new file mode 100644 index 0000000..3d56720 --- /dev/null +++ b/apps/node-message-broker/test/unit/app/config-guard.spec.ts @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2026.
+ * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { EnvironmentName } from '@privateaim/server-kit'; +import { describe, expect, it } from 'vitest'; +import { assertProductionConfig } from '../../../src/app/modules/config/guard.ts'; + +const FULL_ENV = { + AUTHUP_URL: 'https://authup.example', + CLIENT_ID: 'node-x', + CLIENT_SECRET: 'super-secret', + REALM: 'flame', + NODE_PRIVATE_KEY: 'deadbeef', +}; + +describe('app/config/assertProductionConfig', () => { + it('passes in production when the full security stack is set', () => { + expect(() => assertProductionConfig({ env: EnvironmentName.PRODUCTION }, FULL_ENV)).not.toThrow(); + }); + + it('throws in production listing every missing security var', () => { + const error = (() => { + try { + assertProductionConfig({ env: EnvironmentName.PRODUCTION }, { AUTHUP_URL: 'https://authup.example' }); + return undefined; + } catch (err) { + return err as Error; + } + })(); + + expect(error).toBeInstanceOf(Error); + expect(error?.message).toMatch(/CLIENT_ID/); + expect(error?.message).toMatch(/CLIENT_SECRET/); + expect(error?.message).toMatch(/REALM/); + expect(error?.message).toMatch(/NODE_PRIVATE_KEY/); + // a provided var is not reported + expect(error?.message).not.toMatch(/AUTHUP_URL/); + }); + + it('treats an empty-string value as unset', () => { + expect(() => assertProductionConfig({ env: EnvironmentName.PRODUCTION }, { ...FULL_ENV, REALM: '' })) + .toThrow(/REALM/); + }); + + it('is a no-op outside production (dev defaults are intended)', () => { + expect(() => assertProductionConfig({ env: EnvironmentName.DEVELOPMENT }, {})).not.toThrow(); + expect(() => assertProductionConfig({ env: EnvironmentName.TEST }, {})).not.toThrow(); + }); +});