Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .agents/conventions.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ Output: ESM (`dist/**/*.mjs`) preserving source structure + `.d.ts`. CLI entry a
- **Naming**: interfaces are `I`-prefixed (`IHubClient`); types are not (`WebhookSubscription`).
- **Types/interfaces** live in `types.ts` in the same directory, never inline in module files.
- Prefer static imports; no dynamic imports for types. No `as any`.
- **No third-party re-exports.** Don't forward symbols from external packages through local
modules (e.g. `import type { X } from '@pkg'; export type { X };`). Import third-party
symbols directly from their source package at each use site. (Local `index.ts` barrels that
re-export sibling modules within the same directory are still fine.)

## Published Dependencies

Expand Down
58 changes: 48 additions & 10 deletions apps/node-message-broker/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
# @privateaim/node-message-broker
<p align="center">
<a href="https://github.com/PrivateAIM/node" target="_blank" rel="noopener noreferrer">
<img src="https://raw.githubusercontent.com/PrivateAIM/node/master/.github/assets/icon.svg" alt="FLAME Node" height="100">
</a>
</p>

> Part of the [FLAME Node](../../README.md) monorepo — one of the node-side (data-station)
> services for the FLAME platform, alongside the central [Hub](https://github.com/PrivateAIM/hub).
<h1 align="center">@privateaim/node-message-broker 💬</h1>

The **node-side message broker** for the FLAME platform. It is the thin TypeScript
service that replaces the legacy Java `node-message-broker`. It owns only:
<p align="center">
<b>The node-side message broker for the FLAME platform.</b><br>
Container-facing REST API, end-to-end crypto, and local delivery — relaying to the Hub durable mailbox.
</p>

<p align="center">
<a href="https://github.com/PrivateAIM/node/actions/workflows/main.yml"><img src="https://github.com/PrivateAIM/node/actions/workflows/main.yml/badge.svg" alt="CI"></a>
<img src="https://img.shields.io/badge/node-%E2%89%A524-3c873a?logo=node.js&logoColor=fff" alt="node >=24">
<a href="https://github.com/PrivateAIM/node/blob/master/LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-blue.svg" alt="license"></a>
</p>

<p align="center">
<a href="https://docs.privateaim.net"><b>Documentation</b></a> &nbsp;·&nbsp;
<a href="https://github.com/PrivateAIM/node">Monorepo</a> &nbsp;·&nbsp;
<a href="https://github.com/PrivateAIM/hub">Hub</a>
</p>

---

Part of the **[FLAME Node](https://github.com/PrivateAIM/node)** monorepo — node-side (data-station)
services for the [PrivateAIM](https://privateaim.net) platform, alongside the central [Hub](https://github.com/PrivateAIM/hub).

A thin TypeScript service — the successor to the legacy Java `node-message-broker` — that owns only:

1. **Container-facing REST API** — the SDK-compatible surface the FLAME `flamesdk`
talks to (auth: node-local Authup JWT, the analysis presents its `KEYCLOAK_TOKEN`).
Expand All @@ -22,9 +46,17 @@ Durability and routing live in the **Hub** broker; this service is an encrypt/de

## Status

Scaffold. The hexagonal skeleton, config, HTTP server, and webhook-subscription CRUD
are in place. The Hub link, crypto, participant resolution, and message send/pull
routes are Phase 4 (marked with `TODO`/stub in the code).
In progress. In place: the hexagonal skeleton, config, HTTP server,
webhook-subscription CRUD + fan-out delivery, the **Hub link** (REST `send` / `pull` /
`ack` via `@privateaim/messenger-http-kit` + a reconnecting SSE wakeup stream, both
authenticated as the node client), and the **end-to-end crypto** adapter (`seal` /
`open` over `@privateaim/kit`'s `crypto/message`).

Still open (Plan 013 Track B, Phase 4): wiring the `onWakeup` → pull → decrypt →
`delivery.deliver()` loop, the container-facing message routes (`POST /:id/messages`,
`…/broadcast`, `GET /:id/participants`, `…/participants/self`, and the additive pull
endpoint), and the analysis policy (`ANALYSIS_SELF_MESSAGE_BROKER_USE`) + participant
resolution — the `core/analysis` ports exist, but no adapter is wired yet.

## Configuration

Expand All @@ -51,7 +83,13 @@ npm run cli -- start

```
src/
├── core/ # ports — hub link, local delivery, analysis policy (no infra imports)
├── adapters/ # implementations — http controllers, hub client, delivery
├── core/ # ports — hub link, crypto, local delivery, analysis policy (no infra imports)
├── adapters/ # implementations — http controllers, hub client + SSE wakeup, crypto, delivery
└── app/ # orchestration — builder, factory, DI modules (config, components, http)
```

## License

Made with 💚

Published under [Apache 2.0](https://github.com/PrivateAIM/node/blob/master/LICENSE).
1 change: 1 addition & 0 deletions apps/node-message-broker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"eldin": "^1.1.0",
"envix": "^1.3.0",
"hapic": "^2.8.2",
"lru-cache": "^11.5.1",
"orkos": "^1.1.1",
"reflect-metadata": "^0.2.2",
"routup": "^6.0.0",
Expand Down
8 changes: 8 additions & 0 deletions apps/node-message-broker/src/adapters/crypto/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

export * from './service.ts';
126 changes: 126 additions & 0 deletions apps/node-message-broker/src/adapters/crypto/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

import {
hexToUTF8,
importAsymmetricPrivateKey,
importAsymmetricPublicKey,
isHex,
openMessage,
sealMessage,
} from '@privateaim/kit';
import type { MessageSealInput } from '@privateaim/kit';
import { LRUCache } from 'lru-cache';
import type { ICryptoService } from '../../core/crypto/index.ts';

const ECDH_PARAMS = { name: 'ECDH', namedCurve: 'P-256' } as const;

const DEFAULT_PUBLIC_KEY_CACHE_MAX = 1024;

type CryptoServiceContext = {
privateKey?: string, // hex-encoded PKCS#8 PEM (config.nodePrivateKey, OPTIONAL)
publicKeyCacheMax?: number, // max imported peer keys to retain (LRU); default 1024
};

/**
* End-to-end crypto adapter backed by `@privateaim/kit`.
*
* Holds the node's single ECDH private key (hex-encoded PKCS#8 PEM, supplied by
* the operator via `config.nodePrivateKey`, lazily imported once and cached).
* Peer ECDH public keys arrive hex-encoded SPKI PEM and are imported with EMPTY
* usages and cached by their hex string. `seal` / `open` delegate to the kit's
* `sealMessage` / `openMessage` (per-message HKDF salt + AES-256-GCM); this
* adapter never touches AES, HKDF or nonces directly.
*/
export class CryptoService implements ICryptoService {
protected privateKey?: string;

private privateKeyPromise?: Promise<CryptoKey>;

// Bounded LRU keyed by the raw hex PEM (distinct keys never collide). The peer
// set is operator-controlled and small, so the cap is only a backstop against
// unbounded growth over a long-lived process.
protected readonly publicKeyCache: LRUCache<string, Promise<CryptoKey>>;

constructor(ctx: CryptoServiceContext) {
this.privateKey = ctx.privateKey;

const max = Math.max(1, Math.floor(ctx.publicKeyCacheMax ?? DEFAULT_PUBLIC_KEY_CACHE_MAX));
this.publicKeyCache = new LRUCache<string, Promise<CryptoKey>>({ max });
}

private getPrivateKey(): Promise<CryptoKey> {
if (this.privateKeyPromise) {
return this.privateKeyPromise;
}

const hex = this.privateKey;
if (!hex) {
throw new Error('Node private key is not configured (NODE_PRIVATE_KEY is missing).');
}

if (!isHex(hex)) {
throw new Error('Node private key is not a valid hex-encoded PEM string.');
}

// Cache the in-flight import, but evict a rejected result so a corrected
// key can be retried rather than failing forever.
const promise = importAsymmetricPrivateKey(hexToUTF8(hex), ECDH_PARAMS);
promise.catch(() => {
if (this.privateKeyPromise === promise) {
this.privateKeyPromise = undefined;
}
});
this.privateKeyPromise = promise;
return promise;
}

private getPublicKey(hex: string): Promise<CryptoKey> {
const cached = this.publicKeyCache.get(hex);
if (cached) {
return cached;
}

if (!isHex(hex)) {
throw new Error('Peer public key is not a valid hex-encoded PEM string.');
}

const promise = importAsymmetricPublicKey(hexToUTF8(hex), ECDH_PARAMS);
// evict a rejected import (`peek` so we don't disturb LRU recency) so a
// corrected key can be retried rather than failing forever.
promise.catch(() => {
if (this.publicKeyCache.peek(hex) === promise) {
this.publicKeyCache.delete(hex);
}
});

this.publicKeyCache.set(hex, promise);
return promise;
}

async seal(data: MessageSealInput, recipientPublicKey: string, info?: MessageSealInput): Promise<string> {
const privateKey = await this.getPrivateKey();
const publicKey = await this.getPublicKey(recipientPublicKey);
return sealMessage({
privateKey,
publicKey,
data,
info,
});
}

async open(payload: string, senderPublicKey: string, info?: MessageSealInput): Promise<Uint8Array> {
const privateKey = await this.getPrivateKey();
const publicKey = await this.getPublicKey(senderPublicKey);
return openMessage({
privateKey,
publicKey,
payload,
info,
});
}
}
8 changes: 8 additions & 0 deletions apps/node-message-broker/src/core/crypto/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

export * from './types.ts';
42 changes: 42 additions & 0 deletions apps/node-message-broker/src/core/crypto/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2026.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

import type { MessageSealInput } from '@privateaim/kit';

/**
* Node-to-node end-to-end crypto port.
*
* Seals outbound messages for a recipient node and opens inbound messages from a
* sender node, using ECDH (P-256) + per-message HKDF + AES-256-GCM via
* `@privateaim/kit`'s `sealMessage` / `openMessage`. The node holds exactly one
* ECDH keypair; the operator supplies the private key (hex-encoded PKCS#8 PEM)
* and peer public keys arrive hex-encoded SPKI PEM. The hub only ever sees the
* opaque base64 frame and never decrypts it.
*
* Implemented by `CryptoService` in `adapters/crypto/service.ts`.
*/
export interface ICryptoService {
/**
* Seal `data` for the recipient node. Returns the base64 frame
* (`salt ‖ iv ‖ ciphertext‖tag`) to relay through the hub.
*
* @param data plaintext (bytes or UTF-8 string)
* @param recipientPublicKey recipient node ECDH public key, hex-encoded SPKI PEM
* @param info optional HKDF context; the opener must pass the identical value
*/
seal(data: MessageSealInput, recipientPublicKey: string, info?: MessageSealInput): Promise<string>;

/**
* Open a base64 frame produced by a sender node. Returns the decrypted
* plaintext bytes; the caller decodes to string if needed.
*
* @param payload the base64 frame from `sealMessage`
* @param senderPublicKey sender node ECDH public key, hex-encoded SPKI PEM
* @param info the identical HKDF context the sealer used, if any
*/
open(payload: string, senderPublicKey: string, info?: MessageSealInput): Promise<Uint8Array>;
}
Loading
Loading