diff --git a/.agents/conventions.md b/.agents/conventions.md index 1827d1e..679f9c2 100644 --- a/.agents/conventions.md +++ b/.agents/conventions.md @@ -55,6 +55,10 @@ Output: ESM (`dist/**/*.mjs`) preserving source structure + `.d.ts`. CLI entry a - **Naming**: interfaces are `I`-prefixed (`IHubClient`); types are not (`WebhookSubscription`). - **Types/interfaces** live in `types.ts` in the same directory, never inline in module files. - Prefer static imports; no dynamic imports for types. No `as any`. +- **No third-party re-exports.** Don't forward symbols from external packages through local + modules (e.g. `import type { X } from '@pkg'; export type { X };`). Import third-party + symbols directly from their source package at each use site. (Local `index.ts` barrels that + re-export sibling modules within the same directory are still fine.) ## Published Dependencies diff --git a/apps/node-message-broker/README.md b/apps/node-message-broker/README.md index de7397d..1c074c2 100644 --- a/apps/node-message-broker/README.md +++ b/apps/node-message-broker/README.md @@ -1,10 +1,34 @@ -# @privateaim/node-message-broker +

+ + FLAME Node + +

-> Part of the [FLAME Node](../../README.md) monorepo β€” one of the node-side (data-station) -> services for the FLAME platform, alongside the central [Hub](https://github.com/PrivateAIM/hub). +

@privateaim/node-message-broker πŸ’¬

-The **node-side message broker** for the FLAME platform. It is the thin TypeScript -service that replaces the legacy Java `node-message-broker`. It owns only: +

+ The node-side message broker for the FLAME platform.
+ Container-facing REST API, end-to-end crypto, and local delivery β€” relaying to the Hub durable mailbox. +

+ +

+ CI + node >=24 + license +

+ +

+ Documentation  Β·  + Monorepo  Β·  + Hub +

+ +--- + +Part of the **[FLAME Node](https://github.com/PrivateAIM/node)** monorepo β€” node-side (data-station) +services for the [PrivateAIM](https://privateaim.net) platform, alongside the central [Hub](https://github.com/PrivateAIM/hub). + +A thin TypeScript service β€” the successor to the legacy Java `node-message-broker` β€” that owns only: 1. **Container-facing REST API** β€” the SDK-compatible surface the FLAME `flamesdk` talks to (auth: node-local Authup JWT, the analysis presents its `KEYCLOAK_TOKEN`). @@ -22,9 +46,17 @@ Durability and routing live in the **Hub** broker; this service is an encrypt/de ## Status -Scaffold. The hexagonal skeleton, config, HTTP server, and webhook-subscription CRUD -are in place. The Hub link, crypto, participant resolution, and message send/pull -routes are Phase 4 (marked with `TODO`/stub in the code). +In progress. In place: the hexagonal skeleton, config, HTTP server, +webhook-subscription CRUD + fan-out delivery, the **Hub link** (REST `send` / `pull` / +`ack` via `@privateaim/messenger-http-kit` + a reconnecting SSE wakeup stream, both +authenticated as the node client), and the **end-to-end crypto** adapter (`seal` / +`open` over `@privateaim/kit`'s `crypto/message`). + +Still open (Plan 013 Track B, Phase 4): wiring the `onWakeup` β†’ pull β†’ decrypt β†’ +`delivery.deliver()` loop, the container-facing message routes (`POST /:id/messages`, +`…/broadcast`, `GET /:id/participants`, `…/participants/self`, and the additive pull +endpoint), and the analysis policy (`ANALYSIS_SELF_MESSAGE_BROKER_USE`) + participant +resolution β€” the `core/analysis` ports exist, but no adapter is wired yet. ## Configuration @@ -51,7 +83,13 @@ npm run cli -- start ``` src/ -β”œβ”€β”€ core/ # ports β€” hub link, local delivery, analysis policy (no infra imports) -β”œβ”€β”€ adapters/ # implementations β€” http controllers, hub client, delivery +β”œβ”€β”€ core/ # ports β€” hub link, crypto, local delivery, analysis policy (no infra imports) +β”œβ”€β”€ adapters/ # implementations β€” http controllers, hub client + SSE wakeup, crypto, delivery └── app/ # orchestration β€” builder, factory, DI modules (config, components, http) ``` + +## License + +Made with πŸ’š + +Published under [Apache 2.0](https://github.com/PrivateAIM/node/blob/master/LICENSE). diff --git a/apps/node-message-broker/package.json b/apps/node-message-broker/package.json index 1244f66..be270c8 100644 --- a/apps/node-message-broker/package.json +++ b/apps/node-message-broker/package.json @@ -37,6 +37,7 @@ "eldin": "^1.1.0", "envix": "^1.3.0", "hapic": "^2.8.2", + "lru-cache": "^11.5.1", "orkos": "^1.1.1", "reflect-metadata": "^0.2.2", "routup": "^6.0.0", diff --git a/apps/node-message-broker/src/adapters/crypto/index.ts b/apps/node-message-broker/src/adapters/crypto/index.ts new file mode 100644 index 0000000..eb339f0 --- /dev/null +++ b/apps/node-message-broker/src/adapters/crypto/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './service.ts'; diff --git a/apps/node-message-broker/src/adapters/crypto/service.ts b/apps/node-message-broker/src/adapters/crypto/service.ts new file mode 100644 index 0000000..cae4dc4 --- /dev/null +++ b/apps/node-message-broker/src/adapters/crypto/service.ts @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { + hexToUTF8, + importAsymmetricPrivateKey, + importAsymmetricPublicKey, + isHex, + openMessage, + sealMessage, +} from '@privateaim/kit'; +import type { MessageSealInput } from '@privateaim/kit'; +import { LRUCache } from 'lru-cache'; +import type { ICryptoService } from '../../core/crypto/index.ts'; + +const ECDH_PARAMS = { name: 'ECDH', namedCurve: 'P-256' } as const; + +const DEFAULT_PUBLIC_KEY_CACHE_MAX = 1024; + +type CryptoServiceContext = { + privateKey?: string, // hex-encoded PKCS#8 PEM (config.nodePrivateKey, OPTIONAL) + publicKeyCacheMax?: number, // max imported peer keys to retain (LRU); default 1024 +}; + +/** + * End-to-end crypto adapter backed by `@privateaim/kit`. + * + * Holds the node's single ECDH private key (hex-encoded PKCS#8 PEM, supplied by + * the operator via `config.nodePrivateKey`, lazily imported once and cached). + * Peer ECDH public keys arrive hex-encoded SPKI PEM and are imported with EMPTY + * usages and cached by their hex string. `seal` / `open` delegate to the kit's + * `sealMessage` / `openMessage` (per-message HKDF salt + AES-256-GCM); this + * adapter never touches AES, HKDF or nonces directly. + */ +export class CryptoService implements ICryptoService { + protected privateKey?: string; + + private privateKeyPromise?: Promise; + + // Bounded LRU keyed by the raw hex PEM (distinct keys never collide). The peer + // set is operator-controlled and small, so the cap is only a backstop against + // unbounded growth over a long-lived process. + protected readonly publicKeyCache: LRUCache>; + + constructor(ctx: CryptoServiceContext) { + this.privateKey = ctx.privateKey; + + const max = Math.max(1, Math.floor(ctx.publicKeyCacheMax ?? DEFAULT_PUBLIC_KEY_CACHE_MAX)); + this.publicKeyCache = new LRUCache>({ max }); + } + + private getPrivateKey(): Promise { + if (this.privateKeyPromise) { + return this.privateKeyPromise; + } + + const hex = this.privateKey; + if (!hex) { + throw new Error('Node private key is not configured (NODE_PRIVATE_KEY is missing).'); + } + + if (!isHex(hex)) { + throw new Error('Node private key is not a valid hex-encoded PEM string.'); + } + + // Cache the in-flight import, but evict a rejected result so a corrected + // key can be retried rather than failing forever. + const promise = importAsymmetricPrivateKey(hexToUTF8(hex), ECDH_PARAMS); + promise.catch(() => { + if (this.privateKeyPromise === promise) { + this.privateKeyPromise = undefined; + } + }); + this.privateKeyPromise = promise; + return promise; + } + + private getPublicKey(hex: string): Promise { + const cached = this.publicKeyCache.get(hex); + if (cached) { + return cached; + } + + if (!isHex(hex)) { + throw new Error('Peer public key is not a valid hex-encoded PEM string.'); + } + + const promise = importAsymmetricPublicKey(hexToUTF8(hex), ECDH_PARAMS); + // evict a rejected import (`peek` so we don't disturb LRU recency) so a + // corrected key can be retried rather than failing forever. + promise.catch(() => { + if (this.publicKeyCache.peek(hex) === promise) { + this.publicKeyCache.delete(hex); + } + }); + + this.publicKeyCache.set(hex, promise); + return promise; + } + + async seal(data: MessageSealInput, recipientPublicKey: string, info?: MessageSealInput): Promise { + const privateKey = await this.getPrivateKey(); + const publicKey = await this.getPublicKey(recipientPublicKey); + return sealMessage({ + privateKey, + publicKey, + data, + info, + }); + } + + async open(payload: string, senderPublicKey: string, info?: MessageSealInput): Promise { + const privateKey = await this.getPrivateKey(); + const publicKey = await this.getPublicKey(senderPublicKey); + return openMessage({ + privateKey, + publicKey, + payload, + info, + }); + } +} diff --git a/apps/node-message-broker/src/core/crypto/index.ts b/apps/node-message-broker/src/core/crypto/index.ts new file mode 100644 index 0000000..0b5c307 --- /dev/null +++ b/apps/node-message-broker/src/core/crypto/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './types.ts'; diff --git a/apps/node-message-broker/src/core/crypto/types.ts b/apps/node-message-broker/src/core/crypto/types.ts new file mode 100644 index 0000000..f3f3176 --- /dev/null +++ b/apps/node-message-broker/src/core/crypto/types.ts @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { MessageSealInput } from '@privateaim/kit'; + +/** + * Node-to-node end-to-end crypto port. + * + * Seals outbound messages for a recipient node and opens inbound messages from a + * sender node, using ECDH (P-256) + per-message HKDF + AES-256-GCM via + * `@privateaim/kit`'s `sealMessage` / `openMessage`. The node holds exactly one + * ECDH keypair; the operator supplies the private key (hex-encoded PKCS#8 PEM) + * and peer public keys arrive hex-encoded SPKI PEM. The hub only ever sees the + * opaque base64 frame and never decrypts it. + * + * Implemented by `CryptoService` in `adapters/crypto/service.ts`. + */ +export interface ICryptoService { + /** + * Seal `data` for the recipient node. Returns the base64 frame + * (`salt β€– iv β€– ciphertextβ€–tag`) to relay through the hub. + * + * @param data plaintext (bytes or UTF-8 string) + * @param recipientPublicKey recipient node ECDH public key, hex-encoded SPKI PEM + * @param info optional HKDF context; the opener must pass the identical value + */ + seal(data: MessageSealInput, recipientPublicKey: string, info?: MessageSealInput): Promise; + + /** + * Open a base64 frame produced by a sender node. Returns the decrypted + * plaintext bytes; the caller decodes to string if needed. + * + * @param payload the base64 frame from `sealMessage` + * @param senderPublicKey sender node ECDH public key, hex-encoded SPKI PEM + * @param info the identical HKDF context the sealer used, if any + */ + open(payload: string, senderPublicKey: string, info?: MessageSealInput): Promise; +} diff --git a/apps/node-message-broker/test/unit/adapters/crypto/service.spec.ts b/apps/node-message-broker/test/unit/adapters/crypto/service.spec.ts new file mode 100644 index 0000000..09f2cd1 --- /dev/null +++ b/apps/node-message-broker/test/unit/adapters/crypto/service.spec.ts @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { randomUUID } from 'node:crypto'; +import { describe, expect, it } from 'vitest'; +import { + CryptoAsymmetricAlgorithm, + exportAsymmetricPrivateKey, + exportAsymmetricPublicKey, +} from '@privateaim/kit'; +import { CryptoService } from '../../../../src/adapters/crypto/index.ts'; + +/** Exposes the protected cache size so the LRU bound can be asserted. */ +class InspectableCryptoService extends CryptoService { + get cacheSize() { + return this.publicKeyCache.size; + } +} + +const ECDH_PARAMS = { name: 'ECDH', namedCurve: 'P-256' } as const; + +async function generatePair() { + const algo = new CryptoAsymmetricAlgorithm(ECDH_PARAMS); + return algo.generateKeyPair(); +} + +async function privHex(pair: CryptoKeyPair) { + const pem = await exportAsymmetricPrivateKey(pair.privateKey); + return Buffer.from(pem, 'utf8').toString('hex'); +} + +async function pubHex(pair: CryptoKeyPair) { + const pem = await exportAsymmetricPublicKey(pair.publicKey); + return Buffer.from(pem, 'utf8').toString('hex'); +} + +/** Two configured nodes (A, B) plus their hex-encoded keys. */ +async function setup() { + const a = await generatePair(); + const b = await generatePair(); + + const aPrivHex = await privHex(a); + const aPubHex = await pubHex(a); + const bPrivHex = await privHex(b); + const bPubHex = await pubHex(b); + + const serviceA = new CryptoService({ privateKey: aPrivHex }); + const serviceB = new CryptoService({ privateKey: bPrivHex }); + + return { + aPubHex, + bPubHex, + serviceA, + serviceB, + }; +} + +describe('adapters/crypto/service', () => { + it('round-trips A->B and recovers the plaintext bytes', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const plaintext = `hello node-to-node ${randomUUID()}`; + + const frame = await serviceA.seal(plaintext, bPubHex); + const opened = await serviceB.open(frame, aPubHex); + + expect(typeof frame).toBe('string'); + expect(frame.length).toBeGreaterThan(0); + expect(opened).toBeInstanceOf(Uint8Array); + expect(new TextDecoder().decode(opened)).toBe(plaintext); + }); + + it('round-trips with raw bytes input', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const bytes = new TextEncoder().encode(`binary-${randomUUID()}`); + + const frame = await serviceA.seal(bytes, bPubHex); + const opened = await serviceB.open(frame, aPubHex); + + expect(Array.from(opened)).toEqual(Array.from(bytes)); + }); + + it('produces a distinct frame per seal (per-message HKDF salt) yet both open', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const plaintext = `repeat ${randomUUID()}`; + + const frame1 = await serviceA.seal(plaintext, bPubHex); + const frame2 = await serviceA.seal(plaintext, bPubHex); + + expect(frame1).not.toBe(frame2); + expect(new TextDecoder().decode(await serviceB.open(frame1, aPubHex))).toBe(plaintext); + expect(new TextDecoder().decode(await serviceB.open(frame2, aPubHex))).toBe(plaintext); + }); + + it('round-trips with matching info', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const info = `analysis:${randomUUID()}`; + const plaintext = `with-info ${randomUUID()}`; + + const frame = await serviceA.seal(plaintext, bPubHex, info); + const opened = await serviceB.open(frame, aPubHex, info); + + expect(new TextDecoder().decode(opened)).toBe(plaintext); + }); + + it('throws when info mismatches between seal and open', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const frame = await serviceA.seal(`msg ${randomUUID()}`, bPubHex, 'context-A'); + + await expect(serviceB.open(frame, aPubHex, 'context-B')).rejects.toThrow(); + }); + + it('throws when info present on seal but absent on open', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const frame = await serviceA.seal(`msg ${randomUUID()}`, bPubHex, 'some-info'); + + await expect(serviceB.open(frame, aPubHex)).rejects.toThrow(); + }); + + it('fails to open with the wrong recipient key', async () => { + const { + aPubHex, + bPubHex, + serviceA, + } = await setup(); + + const c = await generatePair(); + const serviceC = new CryptoService({ privateKey: await privHex(c) }); + + const frame = await serviceA.seal(`msg ${randomUUID()}`, bPubHex); + + await expect(serviceC.open(frame, aPubHex)).rejects.toThrow(); + }); + + it('throws when the salt/IV region is tampered', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const frame = await serviceA.seal(`msg ${randomUUID()}`, bPubHex); + + // flip a whole byte on the decoded frame so the change can't be lost to + // base64 padding; the first byte lives in the HKDF salt + const raw = Buffer.from(frame, 'base64'); + raw[0] ^= 0xff; + + await expect(serviceB.open(raw.toString('base64'), aPubHex)).rejects.toThrow(); + }); + + it('throws when the ciphertext/tag region is tampered', async () => { + const { + aPubHex, + bPubHex, + serviceA, + serviceB, + } = await setup(); + + const frame = await serviceA.seal(`msg ${randomUUID()}`, bPubHex); + + // the last byte lives in the GCM auth tag + const raw = Buffer.from(frame, 'base64'); + raw[raw.length - 1] ^= 0xff; + + await expect(serviceB.open(raw.toString('base64'), aPubHex)).rejects.toThrow(); + }); + + it('throws the explicit kit error for a malformed/too-short frame', async () => { + const { aPubHex, serviceB } = await setup(); + + await expect(serviceB.open('AAAA', aPubHex)).rejects.toThrow('The sealed message frame is malformed.'); + }); + + it('throws a clear error on seal when the private key is missing', async () => { + const { bPubHex } = await setup(); + const service = new CryptoService({}); + + await expect(service.seal('x', bPubHex)).rejects.toThrow(/NODE_PRIVATE_KEY is missing|not configured/i); + }); + + it('throws a clear error on open when the private key is missing', async () => { + const { + aPubHex, + bPubHex, + serviceA, + } = await setup(); + const frame = await serviceA.seal(`msg ${randomUUID()}`, bPubHex); + + const service = new CryptoService({}); + + await expect(service.open(frame, aPubHex)).rejects.toThrow(/NODE_PRIVATE_KEY is missing|not configured/i); + }); + + it('throws a clear error when the private key is not hex', async () => { + const { bPubHex } = await setup(); + const service = new CryptoService({ privateKey: 'not-hex-zz!!' }); + + await expect(service.seal('x', bPubHex)).rejects.toThrow(/valid hex/i); + }); + + it('rejects seal when the private key is hex but not a valid PEM/DER key', async () => { + const { bPubHex } = await setup(); + const service = new CryptoService({ privateKey: Buffer.from('not a pem', 'utf8').toString('hex') }); + + await expect(service.seal('x', bPubHex)).rejects.toThrow(); + }); + + it('throws a clear error when the recipient public key is not hex', async () => { + const { serviceA } = await setup(); + + await expect(serviceA.seal('x', 'zzzz-not-hex!!')).rejects.toThrow(/Peer public key.*valid hex|valid hex/i); + }); + + it('throws a clear error when the sender public key is not hex on open', async () => { + const { serviceA } = await setup(); + + await expect(serviceA.open('AAAA', 'zzzz-not-hex!!')).rejects.toThrow(/Peer public key.*valid hex|valid hex/i); + }); + + it('bounds the public-key cache by evicting least-recently-used peers', async () => { + const a = await generatePair(); + const service = new InspectableCryptoService({ + privateKey: await privHex(a), + publicKeyCacheMax: 2, + }); + + const peers = [await generatePair(), await generatePair(), await generatePair()]; + for (const peer of peers) { + await service.seal('x', await pubHex(peer)); + } + + // three distinct peers were sealed to, but the cap holds + expect(service.cacheSize).toBeLessThanOrEqual(2); + + // an evicted peer is simply re-imported on next use β€” still seals + const frame = await service.seal('y', await pubHex(peers[0])); + expect(typeof frame).toBe('string'); + expect(service.cacheSize).toBeLessThanOrEqual(2); + }); +}); diff --git a/package-lock.json b/package-lock.json index acba798..b285fd1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,6 +28,9 @@ "typescript": "6.0.3", "typescript-eslint": "^8.60.0", "unplugin-swc": "^1.5.9" + }, + "engines": { + "node": ">=24" } }, "apps/node-message-broker": { @@ -49,6 +52,7 @@ "eldin": "^1.1.0", "envix": "^1.3.0", "hapic": "^2.8.2", + "lru-cache": "^11.5.1", "orkos": "^1.1.1", "reflect-metadata": "^0.2.2", "routup": "^6.0.0", @@ -57,6 +61,9 @@ }, "devDependencies": { "vitest": "^4.1.7" + }, + "engines": { + "node": ">=24" } }, "apps/node-message-broker/node_modules/dotenv": { @@ -6950,7 +6957,6 @@ "version": "11.5.1", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-11.5.1.tgz", "integrity": "sha512-RPimw/7aMdv2oqRrxKwvZXcPfwBrn/JZ2xYcY9Hus/6LaS3VOAKVWKWgNLCFSiOm1ESXinjsDlidVU7JlnCN2A==", - "dev": true, "license": "BlueOak-1.0.0", "engines": { "node": "20 || >=22"