From 6b37aad21ede5fa4b1c2928035eafbd07730e54a Mon Sep 17 00:00:00 2001 From: tada5hi Date: Thu, 25 Jun 2026 12:49:49 +0200 Subject: [PATCH 1/2] feat(node-message-broker): resolve analysis participants via server-core Slice S2 of the node message-broker build-out (Plan 013 Track B): resolve which node clients participate in an analysis (and their public keys) from server-core, so the send/deliver flows can address and encrypt to peers. - core/analysis: IAnalysisNodeProvider port + CoreNode; AnalysisParticipant now carries the node's ECDH publicKey. - adapters/core: ParticipantResolver maps analysis-node entries to participants, skips ones missing a client id / public key, and caches per analysis behind a short-TTL LRU (off the Hub hot path); resolveSelf matches this node's client id. - app/modules/core-client: CoreClientModule wires a @privateaim/core-http-kit client (node-client credentials, mirroring the Hub link) behind the provider port and registers the resolver. Adds the core-http-kit dependency. - The resolver depends on the narrow IAnalysisNodeProvider (not the whole core client) so it stays unit-testable with a fake; the rapiq getMany query lives at the wiring site. Closes #5 --- apps/node-message-broker/package.json | 1 + .../src/adapters/core/index.ts | 8 + .../src/adapters/core/participant-resolver.ts | 109 +++++++++++ apps/node-message-broker/src/app/builder.ts | 5 + apps/node-message-broker/src/app/factory.ts | 1 + .../src/app/modules/core-client/constants.ts | 11 ++ .../src/app/modules/core-client/index.ts | 9 + .../src/app/modules/core-client/module.ts | 83 ++++++++ .../src/core/analysis/types.ts | 22 ++- .../core/fake-analysis-node-provider.ts | 30 +++ .../core/participant-resolver.spec.ts | 182 ++++++++++++++++++ package-lock.json | 15 ++ 12 files changed, 475 insertions(+), 1 deletion(-) create mode 100644 apps/node-message-broker/src/adapters/core/index.ts create mode 100644 apps/node-message-broker/src/adapters/core/participant-resolver.ts create mode 100644 apps/node-message-broker/src/app/modules/core-client/constants.ts create mode 100644 apps/node-message-broker/src/app/modules/core-client/index.ts create mode 100644 apps/node-message-broker/src/app/modules/core-client/module.ts create mode 100644 apps/node-message-broker/test/unit/adapters/core/fake-analysis-node-provider.ts create mode 100644 apps/node-message-broker/test/unit/adapters/core/participant-resolver.spec.ts diff --git a/apps/node-message-broker/package.json b/apps/node-message-broker/package.json index be270c8..18bb349 100644 --- a/apps/node-message-broker/package.json +++ b/apps/node-message-broker/package.json @@ -23,6 +23,7 @@ "node": ">=24" }, "dependencies": { + "@privateaim/core-http-kit": "^0.11.5", "@privateaim/errors": "^0.8.42", "@privateaim/kit": "^0.11.5", "@privateaim/messenger-http-kit": "^0.11.5", diff --git a/apps/node-message-broker/src/adapters/core/index.ts b/apps/node-message-broker/src/adapters/core/index.ts new file mode 100644 index 0000000..af9e569 --- /dev/null +++ b/apps/node-message-broker/src/adapters/core/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './participant-resolver.ts'; diff --git a/apps/node-message-broker/src/adapters/core/participant-resolver.ts b/apps/node-message-broker/src/adapters/core/participant-resolver.ts new file mode 100644 index 0000000..e3acf3b --- /dev/null +++ b/apps/node-message-broker/src/adapters/core/participant-resolver.ts @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { Logger } from '@privateaim/server-kit'; +import { LRUCache } from 'lru-cache'; +import type { + AnalysisParticipant, + IAnalysisNodeProvider, + IParticipantResolver, +} from '../../core/analysis/index.ts'; + +const DEFAULT_CACHE_MAX = 256; + +const DEFAULT_CACHE_TTL_MS = 30_000; + +type ParticipantResolverContext = { + provider: IAnalysisNodeProvider, + /** this node's own authup client id (config.clientId) — used by {@link resolveSelf} */ + selfClientId: string, + cacheMax?: number, + /** per-analysis cache lifetime; `<= 0` disables caching. */ + cacheTtlMs?: number, + logger?: Logger +}; + +/** + * Resolves analysis participants — their node id / type / client id / public key — + * from server-core's analysis-node API, mapping each included `node` to an + * {@link AnalysisParticipant}. Results are cached per analysis (short TTL) to keep + * resolution off the Hub send/deliver hot path; participants missing a client id + * or public key are skipped, since they can't be addressed or encrypted to. + */ +export class ParticipantResolver implements IParticipantResolver { + protected provider: IAnalysisNodeProvider; + + protected selfClientId: string; + + protected logger: Logger | undefined; + + protected cache: LRUCache> | undefined; + + constructor(ctx: ParticipantResolverContext) { + this.provider = ctx.provider; + this.selfClientId = ctx.selfClientId; + this.logger = ctx.logger; + + const ttl = ctx.cacheTtlMs ?? DEFAULT_CACHE_TTL_MS; + if (ttl > 0) { + this.cache = new LRUCache>({ + max: Math.max(1, Math.floor(ctx.cacheMax ?? DEFAULT_CACHE_MAX)), + ttl, + }); + } + } + + resolve(analysisId: string): Promise { + const cached = this.cache?.get(analysisId); + if (cached) { + return cached; + } + + const promise = this.fetch(analysisId); + + const { cache } = this; + if (cache) { + // evict a rejected fetch so a transient failure isn't cached for the TTL + promise.catch(() => { + if (cache.peek(analysisId) === promise) { + cache.delete(analysisId); + } + }); + cache.set(analysisId, promise); + } + + return promise; + } + + async resolveSelf(analysisId: string): Promise { + const participants = await this.resolve(analysisId); + return participants.find((participant) => participant.clientId === this.selfClientId); + } + + protected async fetch(analysisId: string): Promise { + const nodes = await this.provider.list(analysisId); + + const participants: AnalysisParticipant[] = []; + for (const node of nodes) { + if (!node.client_id || !node.public_key) { + this.logger?.warn( + `Analysis ${analysisId}: skipping participant ${node.id} with a missing client id or public key`, + ); + continue; + } + + participants.push({ + nodeId: node.id, + nodeType: node.type, + clientId: node.client_id, + publicKey: node.public_key, + }); + } + + return participants; + } +} diff --git a/apps/node-message-broker/src/app/builder.ts b/apps/node-message-broker/src/app/builder.ts index cd69681..a362937 100644 --- a/apps/node-message-broker/src/app/builder.ts +++ b/apps/node-message-broker/src/app/builder.ts @@ -9,6 +9,7 @@ import { BaseApplicationBuilder } from '@privateaim/server-kit'; import type { IModule } from 'orkos'; import { ConfigModule } from './modules/config/index.ts'; import { ComponentsModule } from './modules/components/index.ts'; +import { CoreClientModule } from './modules/core-client/index.ts'; import { HTTPModule } from './modules/http/index.ts'; export class ServerMessageBrokerApplicationBuilder extends BaseApplicationBuilder { @@ -20,6 +21,10 @@ export class ServerMessageBrokerApplicationBuilder extends BaseApplicationBuilde return this.addModuleSlot('components', instance, () => new ComponentsModule()); } + withCoreClient(instance?: CoreClientModule | false): this { + return this.addModuleSlot('coreClient', instance, () => new CoreClientModule()); + } + withHTTP(instance?: HTTPModule | false): this { return this.addModuleSlot('http', instance, () => new HTTPModule()); } diff --git a/apps/node-message-broker/src/app/factory.ts b/apps/node-message-broker/src/app/factory.ts index cc6a045..835b316 100644 --- a/apps/node-message-broker/src/app/factory.ts +++ b/apps/node-message-broker/src/app/factory.ts @@ -18,6 +18,7 @@ export function createApplication(): Application { ], }) .withComponents() + .withCoreClient() .withHTTP(); return builder.build(); diff --git a/apps/node-message-broker/src/app/modules/core-client/constants.ts b/apps/node-message-broker/src/app/modules/core-client/constants.ts new file mode 100644 index 0000000..cadb4be --- /dev/null +++ b/apps/node-message-broker/src/app/modules/core-client/constants.ts @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { TypedToken } from 'eldin'; +import type { IParticipantResolver } from '../../../core/analysis/index.ts'; + +export const CoreClientInjectionKey = { ParticipantResolver: new TypedToken('ParticipantResolver') }; diff --git a/apps/node-message-broker/src/app/modules/core-client/index.ts b/apps/node-message-broker/src/app/modules/core-client/index.ts new file mode 100644 index 0000000..3fd551e --- /dev/null +++ b/apps/node-message-broker/src/app/modules/core-client/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +export * from './constants.ts'; +export * from './module.ts'; diff --git a/apps/node-message-broker/src/app/modules/core-client/module.ts b/apps/node-message-broker/src/app/modules/core-client/module.ts new file mode 100644 index 0000000..bfd53cc --- /dev/null +++ b/apps/node-message-broker/src/app/modules/core-client/module.ts @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { IContainer } from 'eldin'; +import type { IModule } from 'orkos'; +import { Client } from '@privateaim/core-http-kit'; +import { + LoggerInjectionKey, + createAuthupClientAuthenticationHook, + createAuthupClientTokenCreator, +} from '@privateaim/server-kit'; +import type { IAnalysisNodeProvider } from '../../../core/analysis/index.ts'; +import { ParticipantResolver } from '../../../adapters/core/index.ts'; +import { ConfigInjectionKey } from '../config/constants.ts'; +import { CoreClientInjectionKey } from './constants.ts'; + +/** + * Wires the server-core link: a `@privateaim/core-http-kit` client (authenticated + * as the node client via client credentials, mirroring the Hub link) backing the + * {@link ParticipantResolver}. Registered for the analysis policy (S3) and the + * send/deliver flows (S5/S6) to consume. + */ +export class CoreClientModule implements IModule { + readonly name = 'coreClient'; + + readonly dependencies: string[] = ['config']; + + private authHook: ReturnType | undefined; + + async setup(container: IContainer): Promise { + const config = container.resolve(ConfigInjectionKey); + + const loggerResult = container.tryResolve(LoggerInjectionKey); + const logger = loggerResult.success ? loggerResult.data : undefined; + + const tokenCreator = createAuthupClientTokenCreator({ + baseURL: config.authupURL, + clientId: config.clientId, + clientSecret: config.clientSecret, + realm: config.realm, + }); + + const client = new Client({ baseURL: config.coreURL }); + const authHook = createAuthupClientAuthenticationHook({ + baseURL: config.authupURL, + tokenCreator, + }); + authHook.attach(client); + this.authHook = authHook; + + // wrap the core client's analysis-node lookup behind the narrow provider port + const provider: IAnalysisNodeProvider = { + list: async (analysisId) => { + const response = await client.analysisNode.getMany({ + filter: { analysis_id: analysisId }, + include: ['node'], + }); + return response.data.map((entry) => entry.node); + }, + }; + + const resolver = new ParticipantResolver({ + provider, + selfClientId: config.clientId, + logger, + }); + container.register(CoreClientInjectionKey.ParticipantResolver, { useValue: resolver }); + } + + async teardown(): Promise { + // The auth hook owns a token-refresh timer; drop it so it can't fire (and + // hit Authup) after shutdown. + if (this.authHook) { + this.authHook.disable(); + this.authHook.clearTimer(); + this.authHook = undefined; + } + } +} diff --git a/apps/node-message-broker/src/core/analysis/types.ts b/apps/node-message-broker/src/core/analysis/types.ts index 096823c..8f60c9c 100644 --- a/apps/node-message-broker/src/core/analysis/types.ts +++ b/apps/node-message-broker/src/core/analysis/types.ts @@ -9,9 +9,29 @@ export type AnalysisParticipant = { nodeId: string, nodeType: string, - clientId: string + clientId: string, + /** the node's ECDH public key (hex-encoded SPKI PEM) for node-to-node E2E crypto */ + publicKey: string }; +/** The slice of a server-core `Node` the broker reads off the analysis-node relation. */ +export type CoreNode = { + id: string, + type: string, + client_id: string | null, + public_key: string | null +}; + +/** + * Lists the nodes participating in an analysis — server-core's analysis-node + * relation with the `node` included. Declared as a narrow port so the resolver can + * be tested with a fake; the live implementation (in `app/modules/core-client`) + * wraps `@privateaim/core-http-kit`'s `analysisNode.getMany`. + */ +export interface IAnalysisNodeProvider { + list(analysisId: string): Promise; +} + /** * Analysis authorization lives node-side (the Hub is analysis-agnostic). Asserts * the calling analysis client holds `ANALYSIS_SELF_MESSAGE_BROKER_USE` — read from diff --git a/apps/node-message-broker/test/unit/adapters/core/fake-analysis-node-provider.ts b/apps/node-message-broker/test/unit/adapters/core/fake-analysis-node-provider.ts new file mode 100644 index 0000000..bf8b08e --- /dev/null +++ b/apps/node-message-broker/test/unit/adapters/core/fake-analysis-node-provider.ts @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import type { CoreNode, IAnalysisNodeProvider } from '../../../../src/core/analysis/index.ts'; + +/** + * In-memory `IAnalysisNodeProvider` that records every lookup and returns + * configurable canned nodes per analysis — stands in for the server-core + * analysis-node lookup so the resolver is testable without a live core client. + */ +export class FakeAnalysisNodeProvider implements IAnalysisNodeProvider { + calls: string[] = []; + + nodesByAnalysis = new Map(); + + /** When set, the next `list` rejects with this error. */ + error: Error | undefined; + + list = async (analysisId: string): Promise => { + this.calls.push(analysisId); + if (this.error) { + throw this.error; + } + return this.nodesByAnalysis.get(analysisId) ?? []; + }; +} diff --git a/apps/node-message-broker/test/unit/adapters/core/participant-resolver.spec.ts b/apps/node-message-broker/test/unit/adapters/core/participant-resolver.spec.ts new file mode 100644 index 0000000..3e11f1b --- /dev/null +++ b/apps/node-message-broker/test/unit/adapters/core/participant-resolver.spec.ts @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2026. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { describe, expect, it } from 'vitest'; +import type { CoreNode } from '../../../../src/core/analysis/index.ts'; +import { ParticipantResolver } from '../../../../src/adapters/core/index.ts'; +import { FakeAnalysisNodeProvider } from './fake-analysis-node-provider.ts'; + +const SELF_CLIENT_ID = 'client-self'; + +function node(over: Partial = {}): CoreNode { + return { + id: 'node-1', + type: 'default', + client_id: 'client-1', + public_key: 'pubkey-1', + ...over, + }; +} + +function setup() { + const provider = new FakeAnalysisNodeProvider(); + const resolver = new ParticipantResolver({ provider, selfClientId: SELF_CLIENT_ID }); + return { + provider, + resolver, + }; +} + +describe('adapters/core/participant-resolver', () => { + it('resolves participants and maps the node fields', async () => { + const { provider, resolver } = setup(); + provider.nodesByAnalysis.set('a1', [ + node({ + id: 'n1', + type: 'default', + client_id: 'c1', + public_key: 'pk1', + }), + node({ + id: 'n2', + type: 'aggregator', + client_id: 'c2', + public_key: 'pk2', + }), + ]); + + const participants = await resolver.resolve('a1'); + + expect(participants).toEqual([ + { + nodeId: 'n1', + nodeType: 'default', + clientId: 'c1', + publicKey: 'pk1', + }, + { + nodeId: 'n2', + nodeType: 'aggregator', + clientId: 'c2', + publicKey: 'pk2', + }, + ]); + expect(provider.calls).toEqual(['a1']); + }); + + it('skips participants missing a client id or public key', async () => { + const { provider, resolver } = setup(); + provider.nodesByAnalysis.set('a1', [ + node({ + id: 'ok', + client_id: 'c-ok', + public_key: 'pk-ok', + }), + node({ + id: 'no-client', + client_id: null, + public_key: 'pk', + }), + node({ + id: 'no-key', + client_id: 'c', + public_key: null, + }), + ]); + + const participants = await resolver.resolve('a1'); + + expect(participants.map((p) => p.nodeId)).toEqual(['ok']); + }); + + it('resolveSelf returns the participant matching this node client id', async () => { + const { provider, resolver } = setup(); + provider.nodesByAnalysis.set('a1', [ + node({ + id: 'other', + client_id: 'c-other', + public_key: 'pk', + }), + node({ + id: 'self', + client_id: SELF_CLIENT_ID, + public_key: 'pk-self', + }), + ]); + + const self = await resolver.resolveSelf('a1'); + + expect(self).toEqual({ + nodeId: 'self', + nodeType: 'default', + clientId: SELF_CLIENT_ID, + publicKey: 'pk-self', + }); + }); + + it('resolveSelf returns undefined when this node is not a participant', async () => { + const { provider, resolver } = setup(); + provider.nodesByAnalysis.set('a1', [node({ + id: 'other', + client_id: 'c-other', + public_key: 'pk', + })]); + + expect(await resolver.resolveSelf('a1')).toBeUndefined(); + }); + + it('returns an empty list for an unknown analysis', async () => { + const { resolver } = setup(); + + expect(await resolver.resolve('unknown')).toEqual([]); + expect(await resolver.resolveSelf('unknown')).toBeUndefined(); + }); + + it('caches resolution per analysis within the ttl', async () => { + const { provider, resolver } = setup(); + provider.nodesByAnalysis.set('a1', [node()]); + + await resolver.resolve('a1'); + await resolver.resolve('a1'); + + expect(provider.calls).toEqual(['a1']); + }); + + it('does not cache a failed resolution', async () => { + const { provider, resolver } = setup(); + provider.error = new Error('core unavailable'); + + await expect(resolver.resolve('a1')).rejects.toThrow('core unavailable'); + + provider.error = undefined; + provider.nodesByAnalysis.set('a1', [node({ + id: 'n1', + client_id: 'c1', + public_key: 'pk1', + })]); + + const participants = await resolver.resolve('a1'); + + expect(participants.map((p) => p.nodeId)).toEqual(['n1']); + expect(provider.calls).toEqual(['a1', 'a1']); + }); + + it('does not cache when caching is disabled', async () => { + const provider = new FakeAnalysisNodeProvider(); + const resolver = new ParticipantResolver({ + provider, + selfClientId: SELF_CLIENT_ID, + cacheTtlMs: 0, + }); + provider.nodesByAnalysis.set('a1', [node()]); + + await resolver.resolve('a1'); + await resolver.resolve('a1'); + + expect(provider.calls).toEqual(['a1', 'a1']); + }); +}); diff --git a/package-lock.json b/package-lock.json index b285fd1..0537da3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -38,6 +38,7 @@ "version": "0.1.0", "license": "Apache-2.0", "dependencies": { + "@privateaim/core-http-kit": "^0.11.5", "@privateaim/errors": "^0.8.42", "@privateaim/kit": "^0.11.5", "@privateaim/messenger-http-kit": "^0.11.5", @@ -1970,6 +1971,20 @@ "url": "https://github.com/sponsors/posva" } }, + "node_modules/@privateaim/core-http-kit": { + "version": "0.11.5", + "resolved": "https://registry.npmjs.org/@privateaim/core-http-kit/-/core-http-kit-0.11.5.tgz", + "integrity": "sha512-UMRakXpUHfRFyt9O6J+39SbAcwlaSJqluClyj8EQ2zBm8kBNAAj5n2APvx0I1JRRugMJY8nj2i24IHN4E3GRHg==", + "license": "Apache-2.0", + "peerDependencies": { + "@authup/core-kit": "^1.0.0-beta.48", + "@authup/kit": "^1.0.0-beta.48", + "@privateaim/core-kit": "^0.11.5", + "@privateaim/telemetry-kit": "^0.11.5", + "hapic": "^2.8.2", + "rapiq": "^0.9.0" + } + }, "node_modules/@privateaim/core-kit": { "version": "0.11.5", "resolved": "https://registry.npmjs.org/@privateaim/core-kit/-/core-kit-0.11.5.tgz", From 7052744a3da87c5ae1eeafdc6b17ee26c3dd6fc2 Mon Sep 17 00:00:00 2001 From: tada5hi Date: Thu, 25 Jun 2026 13:09:25 +0200 Subject: [PATCH 2/2] fix(node-message-broker): drop analysis-node entries without a node relation server-core types AnalysisNode.node as non-null, but may omit the relation when `include` is not honored or data is incomplete; filter those out at the provider boundary so ParticipantResolver never dereferences an undefined node. --- .../src/app/modules/core-client/module.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/node-message-broker/src/app/modules/core-client/module.ts b/apps/node-message-broker/src/app/modules/core-client/module.ts index bfd53cc..ca0be42 100644 --- a/apps/node-message-broker/src/app/modules/core-client/module.ts +++ b/apps/node-message-broker/src/app/modules/core-client/module.ts @@ -59,7 +59,12 @@ export class CoreClientModule implements IModule { filter: { analysis_id: analysisId }, include: ['node'], }); - return response.data.map((entry) => entry.node); + // `node` is typed non-null, but server-core may omit the relation + // (include not honored / incomplete data); drop those so the + // resolver never dereferences an undefined node. + return response.data + .map((entry) => entry.node) + .filter((node): node is NonNullable => !!node); }, };