Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ architecture as the Hub backend services.

| Service | Description |
|---------|-------------|
| **[node-message-broker](apps/node-message-broker)** 💬 | Node-side message broker — container-facing REST API, end-to-end crypto, local delivery; relays to the Hub durable mailbox. Replaces the legacy Java `node-message-broker`. |
| **[node-message-broker](apps/node-message-broker)** 💬 | Node-side message broker — `flamesdk`-compatible container REST API, node-to-node end-to-end crypto, and local webhook delivery; relays to and pulls from the Hub durable mailbox. Replaces the legacy Java `node-message-broker`. |

_Further node-side services may be added here over time; others may remain in their own repositories._

Expand Down
77 changes: 62 additions & 15 deletions apps/node-message-broker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,59 @@ A thin TypeScript service — the successor to the legacy Java `node-message-bro
only ever sees ciphertext.
3. **Hub link** — REST `send` / `pull` / `ack` against the Hub durable mailbox via
`@privateaim/messenger-http-kit`, plus the SSE wakeup stream that triggers pulls.
4. **Local delivery** — webhook fan-out (default) + optional container pull.
4. **Local delivery** — webhook fan-out to the analysis container.
5. **Node-side analysis policy** — capability check (`ANALYSIS_SELF_MESSAGE_BROKER_USE`)
+ participant resolution via server-core.

Durability and routing live in the **Hub** broker; this service is an encrypt/decrypt
+ local-delivery adapter. See the design authority (Plan 013, Track B).

## Status
## Data flow

In progress. In place: the hexagonal skeleton, config, HTTP server,
webhook-subscription CRUD + fan-out delivery, the **Hub link** (REST `send` / `pull` /
`ack` via `@privateaim/messenger-http-kit` + a reconnecting SSE wakeup stream, both
authenticated as the node client), and the **end-to-end crypto** adapter (`seal` /
`open` over `@privateaim/kit`'s `crypto/message`).
The broker is a thin encrypt/decrypt + local-delivery adapter in front of the Hub's
durable, analysis-agnostic mailbox. `analysisId` rides in message `metadata`; the Hub never
interprets it.

Still open (Plan 013 Track B, Phase 4): wiring the `onWakeup` → pull → decrypt →
`delivery.deliver()` loop, the container-facing message routes (`POST /:id/messages`,
`…/broadcast`, `GET /:id/participants`, `…/participants/self`, and the additive pull
endpoint), and the analysis policy (`ANALYSIS_SELF_MESSAGE_BROKER_USE`) + participant
resolution — the `core/analysis` ports exist, but no adapter is wired yet.
```
send: Container ──REST──▶ Broker ──[seal per recipient, analysisId-bound]──▶ Hub /messages
notify: Hub ──SSE "messagePending" (payload-free)──▶ Broker (long-poll pull is the fallback)
pull: Broker ──GET /messages (cursor, long-poll)──▶ Hub
deliver: Broker ──[open, decrypt]──▶ webhook POST to the analysis container
```

- **Send** — resolve the analysis participants (server-core), seal the payload **once per
recipient** under that node's ECDH public key, and relay one Hub message per recipient.
- **Inbound** — a payload-free wakeup (or the long-poll fallback) triggers a pull; each
message's **sender** node key is resolved, the frame is decrypted, and the plaintext is
fanned out verbatim to the analysis's registered webhooks, then acked (delete-on-ack,
at-least-once).
- **Crypto** — node-to-node ECDH (P-256) + per-message HKDF + AES-256-GCM via
`@privateaim/kit`. `analysisId` is bound into the HKDF `info` on both `seal` and `open`,
so a relabelled `metadata.analysisId` fails to decrypt rather than mis-routing.

## HTTP API

The container-facing surface (auth: node-local Authup JWT — the analysis presents its
`KEYCLOAK_TOKEN`). Every `/analyses/:id/*` route additionally requires the
`ANALYSIS_SELF_MESSAGE_BROKER_USE` capability and that the caller's client owns the analysis.
The surface is kept compatible with the FLAME `flamesdk` (verified against
[`python-sdk`](https://github.com/PrivateAIM/python-sdk)).

| Method & path | Body | Response |
|---|---|---|
| `POST /analyses/:id/messages` | `{ recipients: string[] /* node ids */, message: <JSON> }` | `202`, empty |
| `POST /analyses/:id/messages/broadcast` | `{ message: <JSON> }` | `202`, empty |
| `GET /analyses/:id/participants` | — | `[{ nodeId, nodeType }]` (bare array) |
| `GET /analyses/:id/participants/self` | — | `{ nodeId, nodeType }` (`404` if absent) |
| `POST /analyses/:id/messages/subscriptions` | `{ webhookUrl }` | registered subscription |
| `GET /analyses/:id/messages/subscriptions` | — | `{ data, meta: { total } }` |
| `DELETE /analyses/:id/messages/subscriptions` | `{ webhookUrl }` | unregistered |
| `GET /healthz` | — | liveness (unauthenticated) |

`message` is an opaque JSON payload relayed verbatim — the SDK round-trips its own envelope
(`meta.id`, `sender`, …) inside it; the broker never mints ids or wraps the payload. Inbound
delivery is **webhook-push only** (no pull endpoint). Request bodies are validated with
validup + zod.

## Configuration

Expand All @@ -83,9 +116,23 @@ npm run cli -- start

```
src/
├── core/ # ports — hub link, crypto, local delivery, analysis policy (no infra imports)
├── adapters/ # implementations — http controllers, hub client + SSE wakeup, crypto, delivery
└── app/ # orchestration — builder, factory, DI modules (config, components, http)
├── core/ # ports + domain logic (no infra imports)
│ ├── hub/ # IHubClient — send / pull / ack / onWakeup
│ ├── crypto/ # ICryptoService — seal / open
│ ├── delivery/ # IDeliveryService — webhook registry + fan-out
│ ├── analysis/ # participant resolver + analysis-scope policy
│ ├── authz/ # capability-check gateway port
│ ├── messaging/ # outbound send / broadcast orchestration
│ └── inbound/ # inbound delivery loop (pull → decrypt → deliver → ack)
├── adapters/ # external implementations
│ ├── http/ # routup controllers + permission-checker middleware
│ ├── hub/ # HubClient + reconnecting SSE wakeup source
│ ├── crypto/ # CryptoService over @privateaim/kit
│ ├── core/ # server-core participant resolver
│ ├── authz/ # Authup permission gateway + provider
│ └── delivery/ # in-memory webhook delivery
└── app/ # orchestration — builder, factory, DI modules
└── modules/ # config · components · core-client · inbound · http
```

## License
Expand Down
Loading