diff --git a/Anchor.toml b/Anchor.toml index bb7277c0..4d8d539c 100644 --- a/Anchor.toml +++ b/Anchor.toml @@ -1,5 +1,6 @@ [toolchain] package_manager = "pnpm" +anchor_version = "0.31.1" [features] resolution = true @@ -17,3 +18,4 @@ members = ["modules/payment/program", "modules/migration/program", "modules/stak [scripts] test = "pnpm vitest" + diff --git a/core/protobufs/src/effect.proto b/core/protobufs/src/effect.proto index 02d0932a..a5ddfe9e 100644 --- a/core/protobufs/src/effect.proto +++ b/core/protobufs/src/effect.proto @@ -43,6 +43,46 @@ message RequestToWorkResponse { string peer = 3; } +message WorkerSyncRequest { + uint32 timestamp = 1; + string worker_id = 2; + optional uint64 cursor = 3; + repeated string scopes = 4; + optional uint32 limit = 5; +} + +message WorkerSyncStatus { + string state = 1; + uint32 last_activity = 2; +} + +message WorkerSyncTask { + string task_id = 1; + string status = 2; + uint32 last_event_at = 3; + Task task = 4; +} + +message WorkerSyncPayment { + string payment_id = 1; + string status = 2; + string amount = 3; + optional string task_id = 4; + uint32 created_at = 5; + Payment payment = 6; +} + +message WorkerSyncResponse { + uint32 server_time = 1; + string worker_id = 2; + uint64 cursor = 3; + string manager_peer_id = 4; + optional WorkerSyncStatus status = 5; + repeated string capabilities = 6; + repeated WorkerSyncTask tasks = 7; + repeated WorkerSyncPayment payments = 8; +} + message EffectProtocolMessage { oneof message { Task task = 1; @@ -62,5 +102,7 @@ message EffectProtocolMessage { EffectIdentifyRequest identify_request = 15; EffectIdentifyResponse identify_response = 16; BulkProofRequest bulk_proof_request = 17; + WorkerSyncRequest worker_sync_request = 18; + WorkerSyncResponse worker_sync_response = 19; } } diff --git a/core/protobufs/src/effect.ts b/core/protobufs/src/effect.ts index c949f3f5..1ceca400 100644 --- a/core/protobufs/src/effect.ts +++ b/core/protobufs/src/effect.ts @@ -545,6 +545,554 @@ export namespace RequestToWorkResponse { } } +export interface WorkerSyncRequest { + timestamp: number + workerId: string + cursor?: bigint + scopes: string[] + limit?: number +} + +export namespace WorkerSyncRequest { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.timestamp != null && obj.timestamp !== 0)) { + w.uint32(8) + w.uint32(obj.timestamp) + } + + if ((obj.workerId != null && obj.workerId !== '')) { + w.uint32(18) + w.string(obj.workerId) + } + + if (obj.cursor != null) { + w.uint32(24) + w.uint64(obj.cursor) + } + + if (obj.scopes != null) { + for (const value of obj.scopes) { + w.uint32(34) + w.string(value) + } + } + + if (obj.limit != null) { + w.uint32(40) + w.uint32(obj.limit) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + timestamp: 0, + workerId: '', + scopes: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.timestamp = reader.uint32() + break + } + case 2: { + obj.workerId = reader.string() + break + } + case 3: { + obj.cursor = reader.uint64() + break + } + case 4: { + if (opts.limits?.scopes != null && obj.scopes.length === opts.limits.scopes) { + throw new MaxLengthError('Decode error - map field "scopes" had too many elements') + } + + obj.scopes.push(reader.string()) + break + } + case 5: { + obj.limit = reader.uint32() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncRequest.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncRequest => { + return decodeMessage(buf, WorkerSyncRequest.codec(), opts) + } +} + +export interface WorkerSyncStatus { + state: string + lastActivity: number +} + +export namespace WorkerSyncStatus { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.state != null && obj.state !== '')) { + w.uint32(10) + w.string(obj.state) + } + + if ((obj.lastActivity != null && obj.lastActivity !== 0)) { + w.uint32(16) + w.uint32(obj.lastActivity) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + state: '', + lastActivity: 0 + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.state = reader.string() + break + } + case 2: { + obj.lastActivity = reader.uint32() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncStatus.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncStatus => { + return decodeMessage(buf, WorkerSyncStatus.codec(), opts) + } +} + +export interface WorkerSyncTask { + taskId: string + status: string + lastEventAt: number + task?: Task +} + +export namespace WorkerSyncTask { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.taskId != null && obj.taskId !== '')) { + w.uint32(10) + w.string(obj.taskId) + } + + if ((obj.status != null && obj.status !== '')) { + w.uint32(18) + w.string(obj.status) + } + + if ((obj.lastEventAt != null && obj.lastEventAt !== 0)) { + w.uint32(24) + w.uint32(obj.lastEventAt) + } + + if (obj.task != null) { + w.uint32(34) + Task.codec().encode(obj.task, w) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + taskId: '', + status: '', + lastEventAt: 0 + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.taskId = reader.string() + break + } + case 2: { + obj.status = reader.string() + break + } + case 3: { + obj.lastEventAt = reader.uint32() + break + } + case 4: { + obj.task = Task.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.task + }) + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncTask.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncTask => { + return decodeMessage(buf, WorkerSyncTask.codec(), opts) + } +} + +export interface WorkerSyncPayment { + paymentId: string + status: string + amount: string + taskId?: string + createdAt: number + payment?: Payment +} + +export namespace WorkerSyncPayment { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.paymentId != null && obj.paymentId !== '')) { + w.uint32(10) + w.string(obj.paymentId) + } + + if ((obj.status != null && obj.status !== '')) { + w.uint32(18) + w.string(obj.status) + } + + if ((obj.amount != null && obj.amount !== '')) { + w.uint32(26) + w.string(obj.amount) + } + + if (obj.taskId != null) { + w.uint32(34) + w.string(obj.taskId) + } + + if ((obj.createdAt != null && obj.createdAt !== 0)) { + w.uint32(40) + w.uint32(obj.createdAt) + } + + if (obj.payment != null) { + w.uint32(50) + Payment.codec().encode(obj.payment, w) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + paymentId: '', + status: '', + amount: '', + createdAt: 0 + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.paymentId = reader.string() + break + } + case 2: { + obj.status = reader.string() + break + } + case 3: { + obj.amount = reader.string() + break + } + case 4: { + obj.taskId = reader.string() + break + } + case 5: { + obj.createdAt = reader.uint32() + break + } + case 6: { + obj.payment = Payment.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.payment + }) + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncPayment.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncPayment => { + return decodeMessage(buf, WorkerSyncPayment.codec(), opts) + } +} + +export interface WorkerSyncResponse { + serverTime: number + workerId: string + cursor: bigint + managerPeerId: string + status?: WorkerSyncStatus + capabilities: string[] + tasks: WorkerSyncTask[] + payments: WorkerSyncPayment[] +} + +export namespace WorkerSyncResponse { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.serverTime != null && obj.serverTime !== 0)) { + w.uint32(8) + w.uint32(obj.serverTime) + } + + if ((obj.workerId != null && obj.workerId !== '')) { + w.uint32(18) + w.string(obj.workerId) + } + + if ((obj.cursor != null && obj.cursor !== 0n)) { + w.uint32(24) + w.uint64(obj.cursor) + } + + if ((obj.managerPeerId != null && obj.managerPeerId !== '')) { + w.uint32(34) + w.string(obj.managerPeerId) + } + + if (obj.status != null) { + w.uint32(42) + WorkerSyncStatus.codec().encode(obj.status, w) + } + + if (obj.capabilities != null) { + for (const value of obj.capabilities) { + w.uint32(50) + w.string(value) + } + } + + if (obj.tasks != null) { + for (const value of obj.tasks) { + w.uint32(58) + WorkerSyncTask.codec().encode(value, w) + } + } + + if (obj.payments != null) { + for (const value of obj.payments) { + w.uint32(66) + WorkerSyncPayment.codec().encode(value, w) + } + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + serverTime: 0, + workerId: '', + cursor: 0n, + managerPeerId: '', + capabilities: [], + tasks: [], + payments: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.serverTime = reader.uint32() + break + } + case 2: { + obj.workerId = reader.string() + break + } + case 3: { + obj.cursor = reader.uint64() + break + } + case 4: { + obj.managerPeerId = reader.string() + break + } + case 5: { + obj.status = WorkerSyncStatus.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.status + }) + break + } + case 6: { + if (opts.limits?.capabilities != null && obj.capabilities.length === opts.limits.capabilities) { + throw new MaxLengthError('Decode error - map field "capabilities" had too many elements') + } + + obj.capabilities.push(reader.string()) + break + } + case 7: { + if (opts.limits?.tasks != null && obj.tasks.length === opts.limits.tasks) { + throw new MaxLengthError('Decode error - map field "tasks" had too many elements') + } + + obj.tasks.push(WorkerSyncTask.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.tasks$ + })) + break + } + case 8: { + if (opts.limits?.payments != null && obj.payments.length === opts.limits.payments) { + throw new MaxLengthError('Decode error - map field "payments" had too many elements') + } + + obj.payments.push(WorkerSyncPayment.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.payments$ + })) + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncResponse.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncResponse => { + return decodeMessage(buf, WorkerSyncResponse.codec(), opts) + } +} + export interface EffectProtocolMessage { task?: Task taskAccepted?: TaskAccepted @@ -563,6 +1111,8 @@ export interface EffectProtocolMessage { identifyRequest?: EffectIdentifyRequest identifyResponse?: EffectIdentifyResponse bulkProofRequest?: BulkProofRequest + workerSyncRequest?: WorkerSyncRequest + workerSyncResponse?: WorkerSyncResponse } export namespace EffectProtocolMessage { @@ -577,7 +1127,51 @@ export namespace EffectProtocolMessage { obj = { ...obj } + if (obj.workerSyncResponse != null) { + obj.workerSyncRequest = undefined + obj.bulkProofRequest = undefined + obj.identifyResponse = undefined + obj.identifyRequest = undefined + obj.requestToWorkResponse = undefined + obj.requestToWork = undefined + obj.ack = undefined + obj.error = undefined + obj.templateResponse = undefined + obj.templateRequest = undefined + obj.proofResponse = undefined + obj.proofRequest = undefined + obj.payoutRequest = undefined + obj.payment = undefined + obj.taskCompleted = undefined + obj.taskRejected = undefined + obj.taskAccepted = undefined + obj.task = undefined + } + + if (obj.workerSyncRequest != null) { + obj.workerSyncResponse = undefined + obj.bulkProofRequest = undefined + obj.identifyResponse = undefined + obj.identifyRequest = undefined + obj.requestToWorkResponse = undefined + obj.requestToWork = undefined + obj.ack = undefined + obj.error = undefined + obj.templateResponse = undefined + obj.templateRequest = undefined + obj.proofResponse = undefined + obj.proofRequest = undefined + obj.payoutRequest = undefined + obj.payment = undefined + obj.taskCompleted = undefined + obj.taskRejected = undefined + obj.taskAccepted = undefined + obj.task = undefined + } + if (obj.bulkProofRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined obj.requestToWorkResponse = undefined @@ -597,6 +1191,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyRequest = undefined obj.requestToWorkResponse = undefined @@ -616,6 +1212,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.requestToWorkResponse = undefined @@ -635,6 +1233,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWorkResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -654,6 +1254,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWork != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -673,6 +1275,8 @@ export namespace EffectProtocolMessage { } if (obj.ack != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -692,6 +1296,8 @@ export namespace EffectProtocolMessage { } if (obj.error != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -711,6 +1317,8 @@ export namespace EffectProtocolMessage { } if (obj.templateResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -730,6 +1338,8 @@ export namespace EffectProtocolMessage { } if (obj.templateRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -749,6 +1359,8 @@ export namespace EffectProtocolMessage { } if (obj.proofResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -768,6 +1380,8 @@ export namespace EffectProtocolMessage { } if (obj.proofRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -787,6 +1401,8 @@ export namespace EffectProtocolMessage { } if (obj.payoutRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -806,6 +1422,8 @@ export namespace EffectProtocolMessage { } if (obj.payment != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -825,6 +1443,8 @@ export namespace EffectProtocolMessage { } if (obj.taskCompleted != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -844,6 +1464,8 @@ export namespace EffectProtocolMessage { } if (obj.taskRejected != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -863,6 +1485,8 @@ export namespace EffectProtocolMessage { } if (obj.taskAccepted != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -882,6 +1506,8 @@ export namespace EffectProtocolMessage { } if (obj.task != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -985,6 +1611,16 @@ export namespace EffectProtocolMessage { BulkProofRequest.codec().encode(obj.bulkProofRequest, w) } + if (obj.workerSyncRequest != null) { + w.uint32(146) + WorkerSyncRequest.codec().encode(obj.workerSyncRequest, w) + } + + if (obj.workerSyncResponse != null) { + w.uint32(154) + WorkerSyncResponse.codec().encode(obj.workerSyncResponse, w) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -1099,6 +1735,18 @@ export namespace EffectProtocolMessage { }) break } + case 18: { + obj.workerSyncRequest = WorkerSyncRequest.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.workerSyncRequest + }) + break + } + case 19: { + obj.workerSyncResponse = WorkerSyncResponse.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.workerSyncResponse + }) + break + } default: { reader.skipType(tag & 7) break @@ -1106,7 +1754,51 @@ export namespace EffectProtocolMessage { } } + if (obj.workerSyncResponse != null) { + delete obj.workerSyncRequest + delete obj.bulkProofRequest + delete obj.identifyResponse + delete obj.identifyRequest + delete obj.requestToWorkResponse + delete obj.requestToWork + delete obj.ack + delete obj.error + delete obj.templateResponse + delete obj.templateRequest + delete obj.proofResponse + delete obj.proofRequest + delete obj.payoutRequest + delete obj.payment + delete obj.taskCompleted + delete obj.taskRejected + delete obj.taskAccepted + delete obj.task + } + + if (obj.workerSyncRequest != null) { + delete obj.workerSyncResponse + delete obj.bulkProofRequest + delete obj.identifyResponse + delete obj.identifyRequest + delete obj.requestToWorkResponse + delete obj.requestToWork + delete obj.ack + delete obj.error + delete obj.templateResponse + delete obj.templateRequest + delete obj.proofResponse + delete obj.proofRequest + delete obj.payoutRequest + delete obj.payment + delete obj.taskCompleted + delete obj.taskRejected + delete obj.taskAccepted + delete obj.task + } + if (obj.bulkProofRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.identifyResponse delete obj.identifyRequest delete obj.requestToWorkResponse @@ -1126,6 +1818,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyRequest delete obj.requestToWorkResponse @@ -1145,6 +1839,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.requestToWorkResponse @@ -1164,6 +1860,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWorkResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1183,6 +1881,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWork != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1202,6 +1902,8 @@ export namespace EffectProtocolMessage { } if (obj.ack != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1221,6 +1923,8 @@ export namespace EffectProtocolMessage { } if (obj.error != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1240,6 +1944,8 @@ export namespace EffectProtocolMessage { } if (obj.templateResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1259,6 +1965,8 @@ export namespace EffectProtocolMessage { } if (obj.templateRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1278,6 +1986,8 @@ export namespace EffectProtocolMessage { } if (obj.proofResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1297,6 +2007,8 @@ export namespace EffectProtocolMessage { } if (obj.proofRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1316,6 +2028,8 @@ export namespace EffectProtocolMessage { } if (obj.payoutRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1335,6 +2049,8 @@ export namespace EffectProtocolMessage { } if (obj.payment != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1354,6 +2070,8 @@ export namespace EffectProtocolMessage { } if (obj.taskCompleted != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1373,6 +2091,8 @@ export namespace EffectProtocolMessage { } if (obj.taskRejected != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1392,6 +2112,8 @@ export namespace EffectProtocolMessage { } if (obj.taskAccepted != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1411,6 +2133,8 @@ export namespace EffectProtocolMessage { } if (obj.task != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest diff --git a/core/protocol/src/common/stores/paymentStore.spec.ts b/core/protocol/src/common/stores/paymentStore.spec.ts index 01fad924..6cbc24cb 100644 --- a/core/protocol/src/common/stores/paymentStore.spec.ts +++ b/core/protocol/src/common/stores/paymentStore.spec.ts @@ -6,6 +6,23 @@ import { createDataStore } from "@effectai/test-utils"; import type { Payment } from "@effectai/protobufs"; import { ulid } from "ulid"; +const createMockPayment = (nonce: bigint, amount = 100n): Payment => ({ + id: ulid(), + version: 1, + nonce, + amount, + recipient: "recipient-1", + paymentAccount: "payment-account-1", + publicKey: "manager-public-key-1", + signature: { + R8: { + R8_1: "1", + R8_2: "2", + }, + S: "3", + }, +}); + describe("createPaymentStore", () => { let datastore: Datastore; let paymentStore: ReturnType; @@ -24,11 +41,7 @@ describe("createPaymentStore", () => { describe("create", () => { it("should create a payment record", async () => { - const payment: Payment = { - id: ulid(), - nonce: 1n, - amount: 100n, - }; + const payment: Payment = createMockPayment(1n); const result = await paymentStore.create({ peerId: "peer1", @@ -46,12 +59,7 @@ describe("createPaymentStore", () => { }); it("should store payment under correct key", async () => { - const payment: Payment = { - id: ulid(), - nonce: 2n, - amount: 200n, - // ... other payment fields - }; + const payment: Payment = createMockPayment(2n, 200n); await paymentStore.create({ peerId: "peer1", @@ -64,6 +72,8 @@ describe("createPaymentStore", () => { it("should return undefined when no payments exist", async () => { const highestNonce = await paymentStore.getHighestNonce({ peerId: "peer1", + managerPublicKey: "manager-public-key-1", + recipient: "recipient-1", }); expect(highestNonce).toBe(0n); }); @@ -73,33 +83,35 @@ describe("createPaymentStore", () => { await paymentStore.create({ id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 3n, amount: 100n }, + payment: createMockPayment(3n, 100n), }); await new Promise((resolve) => setTimeout(resolve, 10)); await paymentStore.create({ id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 7n, amount: 300n }, + payment: createMockPayment(7n, 300n), }); await new Promise((resolve) => setTimeout(resolve, 10)); await paymentStore.create({ id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 2n, amount: 200n }, + payment: createMockPayment(2n, 200n), }); await new Promise((resolve) => setTimeout(resolve, 10)); await paymentStore.create({ id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 16n, amount: 200n }, + payment: createMockPayment(16n, 200n), }); await new Promise((resolve) => setTimeout(resolve, 10)); const highestNonce = await paymentStore.getHighestNonce({ peerId: "peer1", + managerPublicKey: "manager-public-key-1", + recipient: "recipient-1", }); expect(highestNonce).toBe(16n); @@ -110,13 +122,15 @@ describe("createPaymentStore", () => { for (let i = 0; i < 25; i += 4) { await paymentStore.create({ peerId: "peer1", - payment: { id: ulid(), nonce: BigInt(i), amount: BigInt(i) }, + payment: createMockPayment(BigInt(i), BigInt(i)), }); await new Promise((resolve) => setTimeout(resolve, 10)); } const payments = await paymentStore.getFrom({ peerId: "peer1", + publicKey: "manager-public-key-1", + recipient: "recipient-1", nonce: 13, }); @@ -130,7 +144,7 @@ describe("createPaymentStore", () => { for (let i = 0; i < n; i++) { await paymentStore.create({ peerId: "peer1", - payment: { id: ulid(), nonce: i, amount: BigInt(i) }, + payment: createMockPayment(BigInt(i), BigInt(i)), }); await new Promise((resolve) => setTimeout(resolve, 10)); diff --git a/modules/manager/src/index.ts b/modules/manager/src/index.ts index 46547ff9..7537eb7d 100644 --- a/modules/manager/src/index.ts +++ b/modules/manager/src/index.ts @@ -1,5 +1,6 @@ export * from "./main.js"; export type { ManagerTaskRecord } from "./stores/managerTaskStore.js"; +export type { ManagerPaymentStore } from "./stores/managerPaymentStore.js"; export { computeTemplateId, computeTaskId, diff --git a/modules/manager/src/main.ts b/modules/manager/src/main.ts index a5531f4f..77f8af5c 100644 --- a/modules/manager/src/main.ts +++ b/modules/manager/src/main.ts @@ -12,6 +12,7 @@ import type { Request, Response } from "express"; import { createPaymentManager } from "./modules/createPaymentManager.js"; import { createTaskManager } from "./modules/createTaskManager.js"; import { createManagerTaskStore } from "./stores/managerTaskStore.js"; +import { createManagerPaymentStore } from "./stores/managerPaymentStore.js"; import { buildEddsa } from "@effectai/payment"; import { createWorkerManager } from "./modules/createWorkerManager.js"; @@ -25,8 +26,8 @@ import { type HttpHandler, Libp2pTransport, type TaskRecord, + peerIdFromString, createEffectEntity, - createPaymentStore, createTemplateStore, } from "@effectai/protocol-core"; import { EffectProtocolMessage, type Payment } from "@effectai/protobufs"; @@ -72,6 +73,8 @@ export type ManagerSettings = { paymentAccount: string | null; withAdmin: boolean; maintenanceMode: boolean; + passivePayouts: boolean; + passivePayoutIntervalSeconds: number; }; export const createManagerEntity = async ({ @@ -79,11 +82,13 @@ export const createManagerEntity = async ({ privateKey, listen, announce, + port, }: { datastore: Datastore; privateKey: PrivateKey; listen: string[]; announce: string[] | undefined; + port: number; }) => { return await createEffectEntity({ protocol: { @@ -92,7 +97,7 @@ export const createManagerEntity = async ({ scheme: EffectProtocolMessage, }, transports: [ - new HttpTransport({ port: 8889 }), + new HttpTransport({ port }), new Libp2pTransport({ autoStart: true, datastore, @@ -125,13 +130,15 @@ export const createManager = async ({ const managerSettings: ManagerSettings = { port: settings.port ?? 19955, autoManage: settings.autoManage ?? true, - listen: settings.listen ?? [`/ip4/0.0.0.0/tcp/${settings.port}/ws`], + listen: settings.listen ?? [`/ip4/0.0.0.0/tcp/${settings.port ?? 19955}/ws`], announce: settings.announce ?? [], paymentBatchSize: settings.paymentBatchSize ?? PAYMENT_BATCH_SIZE, requireAccessCodes: settings.requireAccessCodes ?? true, paymentAccount: settings.paymentAccount ?? null, withAdmin: settings.withAdmin ?? true, maintenanceMode: settings.maintenanceMode ?? false, + passivePayouts: settings.passivePayouts ?? false, + passivePayoutIntervalSeconds: settings.passivePayoutIntervalSeconds ?? 3600, }; if (!managerSettings.paymentAccount) { @@ -159,10 +166,11 @@ export const createManager = async ({ privateKey, listen: managerSettings.listen, announce: managerSettings.announce, + port: managerSettings.port, }); // initialize the stores - const paymentStore = createPaymentStore({ datastore }); + const paymentStore = createManagerPaymentStore({ datastore }); const templateStore = createTemplateStore({ datastore }); const taskStore = createManagerTaskStore({ datastore }); @@ -174,6 +182,7 @@ export const createManager = async ({ datastore, managerSettings, }); + await workerManager.hydrateQueue(); const paymentManager = await createPaymentManager({ logger, @@ -214,7 +223,7 @@ export const createManager = async ({ identifyResponse: { peer: entity.node.peerId.publicKey.raw, pubkey: solanaPublicKey.toBase58(), - batchSize: PAYMENT_BATCH_SIZE, + batchSize: managerSettings.paymentBatchSize, taskTimeout: TASK_ACCEPTANCE_TIME, version: PROTOCOL_VERSION, requiresRegistration: managerSettings.requireAccessCodes, @@ -321,6 +330,94 @@ export const createManager = async ({ payment, }; }) + .onMessage("workerSyncRequest", async (syncRequest, { peerId }) => { + const workerId = syncRequest.workerId || peerId.toString(); + if (workerId !== peerId.toString()) { + throw new Error("Worker ID mismatch"); + } + + const scopes = + syncRequest.scopes && syncRequest.scopes.length > 0 + ? new Set(syncRequest.scopes) + : new Set(["status", "capabilities", "tasks", "payments"]); + + const limit = syncRequest.limit ?? 200; + const cursor = + typeof syncRequest.cursor === "bigint" + ? Number(syncRequest.cursor) + : 0; + + const now = Math.floor(Date.now() / 1000); + + const worker = await workerManager.getWorker(workerId); + + const response = { + workerSyncResponse: { + serverTime: now, + workerId, + cursor: BigInt(now), + managerPeerId: entity.getPeerId().toString(), + status: scopes.has("status") && worker + ? { + state: worker.state.status, + lastActivity: worker.state.lastActivity, + } + : undefined, + capabilities: scopes.has("capabilities") && worker + ? worker.state.capabilities.concat( + worker.state.managerCapabilities || [], + ) + : [], + tasks: [] as any[], + payments: [] as any[], + }, + }; + + if (scopes.has("tasks")) { + const tasks = await taskStore.listByWorker({ workerId, limit }); + for (const taskRecord of tasks) { + const lastEvent = + taskRecord.events[taskRecord.events.length - 1]?.timestamp ?? 0; + if (cursor && lastEvent <= cursor) { + continue; + } + + response.workerSyncResponse.tasks.push({ + taskId: taskRecord.state.id, + status: taskRecord.state.status, + lastEventAt: lastEvent, + task: taskRecord.state.task, + }); + } + } + + if (scopes.has("payments")) { + const payments = await paymentStore.listByWorker({ workerId, limit }); + for (const paymentRecord of payments) { + const lastEvent = + paymentRecord.events[paymentRecord.events.length - 1]?.timestamp ?? + 0; + if (cursor && lastEvent <= cursor) { + continue; + } + + const createdAt = paymentRecord.events.find( + (event) => event.type === "create", + )?.timestamp ?? lastEvent; + + response.workerSyncResponse.payments.push({ + paymentId: paymentRecord.state.payment.id, + status: paymentRecord.state.status, + amount: paymentRecord.state.payment.amount.toString(), + taskId: paymentRecord.state.taskId, + createdAt, + payment: paymentRecord.state.payment, + }); + } + } + + return response; + }) .onMessage("templateRequest", async (template) => { const record = await templateStore.get({ entityId: template.templateId }); @@ -385,7 +482,6 @@ export const createManager = async ({ return await taskManager .getTask({ taskId, - index: "completed", }) .then( (a) => @@ -409,9 +505,9 @@ export const createManager = async ({ res.json({ peerId: entity.getPeerId().toString(), version: PROTOCOL_VERSION, - isStarted, + isStarted: getIsStarted(), startTime, - cycle, + cycle: getCycle(), requireAccessCodes: managerSettings.requireAccessCodes, announcedAddresses, publicKey: solanaPublicKey.toBase58(), @@ -439,7 +535,7 @@ export const createManager = async ({ } }); - const { isStarted, pause, resume, getCycle, start, stop, cycle } = + const { getIsStarted, pause, resume, getCycle, start, stop } = createManagerControls({ events, entity, @@ -448,18 +544,62 @@ export const createManager = async ({ managerSettings, }); - const { tearDown } = await setupManagerDashboard({ - context: { - taskManager, - entity, - workerManager, - getCycle, - pause, - resume, - }, + const tearDown = managerSettings.withAdmin + ? (await setupManagerDashboard({ + context: { + taskManager, + entity, + workerManager, + getCycle, + pause, + resume, + }, + })).tearDown + : async () => {}; + + let passivePayoutInterval: ReturnType | null = null; + const runPassivePayouts = async () => { + if (!managerSettings.paymentAccount) { + return; + } + + const now = Math.floor(Date.now() / 1000); + for (const workerId of workerManager.workerQueue.queue) { + try { + const worker = await workerManager.getWorker(workerId); + if (!worker) continue; + + const secondsSinceLastPayout = now - worker.state.lastPayout; + if (secondsSinceLastPayout < managerSettings.passivePayoutIntervalSeconds) { + continue; + } + + await paymentManager.processPayoutRequest({ + peerId: peerIdFromString(workerId), + }); + } catch (error) { + logger.log.warn( + { workerId, error }, + "Passive payout failed for worker", + ); + } + } + }; + + events.addEventListener("manager:start", async () => { + if (!managerSettings.passivePayouts) return; + if (passivePayoutInterval) return; + + passivePayoutInterval = setInterval(async () => { + await runPassivePayouts(); + }, Math.max(30, managerSettings.passivePayoutIntervalSeconds) * 1000); }); events.addEventListener("manager:stop", async () => { + if (passivePayoutInterval) { + clearInterval(passivePayoutInterval); + passivePayoutInterval = null; + } await tearDown(); }); diff --git a/modules/manager/src/modules/createManagerControls.ts b/modules/manager/src/modules/createManagerControls.ts index cdcf2e66..55398c92 100644 --- a/modules/manager/src/modules/createManagerControls.ts +++ b/modules/manager/src/modules/createManagerControls.ts @@ -19,6 +19,7 @@ export const createManagerControls = ({ let isStarted = false; let isPaused = false; let cycle = 0; + let manageInterval: ReturnType | null = null; const pause = () => { logger.log.info("Pausing manager..."); @@ -30,9 +31,8 @@ export const createManagerControls = ({ isPaused = false; }; - const getCycle = () => { - return cycle; - }; + const getCycle = () => cycle; + const getIsStarted = () => isStarted; const start = async () => { if (isStarted) { @@ -56,8 +56,8 @@ export const createManagerControls = ({ if (managerSettings.autoManage) { let isManaging = false; - setInterval(async () => { - if (isPaused || isManaging) return; + manageInterval = setInterval(async () => { + if (!isStarted || isPaused || isManaging) return; isManaging = true; try { cycle++; @@ -76,11 +76,17 @@ export const createManagerControls = ({ isStarted = false; + if (manageInterval) { + clearInterval(manageInterval); + manageInterval = null; + } + events.safeDispatchEvent("manager:stop"); }; return { getCycle, + getIsStarted, start, stop, pause, diff --git a/modules/manager/src/modules/createPaymentManager.spec.ts b/modules/manager/src/modules/createPaymentManager.spec.ts index 627d6d08..40f917e2 100644 --- a/modules/manager/src/modules/createPaymentManager.spec.ts +++ b/modules/manager/src/modules/createPaymentManager.spec.ts @@ -2,165 +2,63 @@ import { Keypair, PublicKey } from "@solana/web3.js"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { createPaymentManager } from "./createPaymentManager.js"; -import { signPayment } from "../utils.js"; -import { ulid } from "ulid"; +import type { ManagerPaymentStore } from "../stores/managerPaymentStore.js"; describe("createPaymentManager", () => { - let mockPeerStore: any; let mockPrivateKey: any; let mockWorkerManager: any; - let mockPaymentStore: any; - let mockPeerId: PeerId; - let peerData: any; + let mockPaymentStore: ManagerPaymentStore; + let mockPeerId: any; let mockRecipient: PublicKey; - vi.mock(import("../utils.js"), async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - computePaymentId: vi.fn(), - }; - }); - beforeEach(() => { mockPeerId = { toString: () => "12D3KooWMockedPeerID", - } as PeerId; - - mockRecipient = new Keypair().publicKey; - - peerData = { - metadata: new Map(), }; - mockPeerStore = { - get: vi.fn().mockResolvedValue(peerData), - }; + mockRecipient = new Keypair().publicKey; mockPaymentStore = { - all: vi.fn(), - has: vi.fn(), - get: vi.fn(), - put: vi.fn(), - delete: vi.fn(), - create: vi.fn(), - }; + putPayment: vi.fn(), + } as unknown as ManagerPaymentStore; mockWorkerManager = { getWorker: vi.fn().mockResolvedValue({ state: { - peerId: mockPeerId, - nonce: 5n, + recipient: mockRecipient.toBase58(), + nonce: 1n, }, }), - selectWorker: vi.fn(() => mockPeerId), updateWorkerState: vi.fn(), + workerQueue: { queue: [mockPeerId.toString()] }, }; mockPrivateKey = { raw: new Uint8Array(64).fill(1), }; - - vi.mocked(mockWorkerManager.getWorker).mockResolvedValue({ - state: { - recipient: "GGqak36ECpZP5HbZse41bynygPR2ciYsTVPsriocqjWH", - nonce: 1n, - }, - }); }); - it("generates a signed payment object", async () => { + it("generates a signed payment object and stores it", async () => { + const paymentAccount = new PublicKey("11111111111111111111111111111111"); + const paymentManager = await createPaymentManager({ workerManager: mockWorkerManager, privateKey: mockPrivateKey, paymentStore: mockPaymentStore, + logger: { log: { info: vi.fn(), error: vi.fn() } } as any, + publicKey: mockRecipient.toBase58(), + managerSettings: { + paymentAccount: paymentAccount.toBase58(), + } as any, }); - const paymentAccount = new PublicKey("11111111111111111111111111111111"); - const result = await paymentManager.generatePayment({ - id: ulid(), peerId: mockPeerId, amount: 1000n, paymentAccount, }); - // expect(result.amount).toBe(1000n); - // expect(result.paymentAccount).toBe(paymentAccount.toBase58()); - // expect(result.nonce).toBe(5n); + expect(result).toBeDefined(); + expect(mockPaymentStore.putPayment).toHaveBeenCalled(); }); - - it( - "batches proof", - async () => { - const paymentAccount = new PublicKey( - "6XjpcA3N18t2ToVndtySfUXU2pKtDca2NZCFbygh7f56", - ); - - const paymentManager = await createPaymentManager({ - workerManager: mockWorkerManager, - privateKey: mockPrivateKey, - paymentStore: mockPaymentStore, - managerSettings: { - paymentAccount: paymentAccount.toBase58(), - }, - }); - - const proofsToGenerate = 2; - const paymentsPerProof = 10; - - const proofs = []; - //generate 2 proofs - for (let p = 0; p < proofsToGenerate; p++) { - const payments = []; - //generate 10 payments - for (let i = 0; i < paymentsPerProof; i++) { - vi.mocked(mockWorkerManager.getWorker).mockResolvedValue({ - state: { - recipient: "GGqak36ECpZP5HbZse41bynygPR2ciYsTVPsriocqjWH", - nonce: BigInt(paymentsPerProof * p + i), - }, - }); - - const result = await paymentManager.generatePayment({ - peerId: mockPeerId, - amount: 1000n, - paymentAccount, - }); - - payments.push(result); - await new Promise((resolve) => setTimeout(resolve, 100)); - } - - const { proof, publicSignals, pubKey } = - await paymentManager.generatePaymentProof(mockPrivateKey, payments); - - proofs.push({ - pubKey, - proof, - publicSignals, - }); - } - - //batch the proofs - const batchedProof = await paymentManager.bulkPaymentProofs({ - privateKey: mockPrivateKey, - recipient: mockRecipient, - r8_x: proofs[0].pubKey[0], - r8_y: proofs[0].pubKey[1], - proofs, - }); - - expect(batchedProof).toBeDefined(); - expect(batchedProof.proof).toBeDefined(); - - //expect first public signal to be 0 - expect(batchedProof.publicSignals).toBeDefined(); - expect(batchedProof.publicSignals![0]).toBe("0"); - expect(batchedProof.publicSignals![1]).toBe( - (proofsToGenerate * paymentsPerProof - 1).toString(), - ); - }, - { timeout: 240_000 }, - ); }); diff --git a/modules/manager/src/modules/createPaymentManager.ts b/modules/manager/src/modules/createPaymentManager.ts index f312b0aa..058d2dc5 100644 --- a/modules/manager/src/modules/createPaymentManager.ts +++ b/modules/manager/src/modules/createPaymentManager.ts @@ -1,9 +1,7 @@ import { Payment, type ProofRequest } from "@effectai/protobufs"; -import type { PaymentStore } from "@effectai/protocol-core"; - import type { PeerId, PrivateKey } from "@libp2p/interface"; import { PublicKey } from "@solana/web3.js"; -import { ProofToProofResponseMessage, computePaymentId } from "../utils.js"; +import { ProofToProofResponseMessage } from "../utils.js"; import { type Groth16Proof, @@ -19,6 +17,7 @@ import { ulid } from "ulid"; import type { createLogger } from "../logging.js"; import type { ManagerSettings } from "../main.js"; import type { createWorkerManager } from "./createWorkerManager"; +import type { ManagerPaymentStore } from "../stores/managerPaymentStore.js"; export async function createPaymentManager({ logger, @@ -31,7 +30,7 @@ export async function createPaymentManager({ logger: ReturnType; privateKey: PrivateKey; publicKey: string; - paymentStore: PaymentStore; + paymentStore: ManagerPaymentStore; workerManager: ReturnType; managerSettings: ManagerSettings; }) { @@ -68,17 +67,10 @@ export async function createPaymentManager({ })); //insert payment into the store - await paymentStore.put({ - entityId: computePaymentId(payment), - record: { - state: payment, - events: [ - { - type: "payment:created", - timestamp: Date.now(), - }, - ], - }, + await paymentStore.putPayment({ + payment, + workerId: peerId.toString(), + status: "created", }); return payment; @@ -214,12 +206,14 @@ export async function createPaymentManager({ paymentAccount, version = 1, label, + taskId, }: { peerId: PeerId; version?: number; amount: bigint; paymentAccount: PublicKey | string; label?: string; + taskId?: string; }) => { const peer = await workerManager.getWorker(peerId.toString()); @@ -251,17 +245,11 @@ export async function createPaymentManager({ })); //save payment in store. - await paymentStore.put({ - entityId: computePaymentId(payment), - record: { - state: payment, - events: [ - { - type: "payment:created", - timestamp: Math.floor(Date.now() / 1000), - }, - ], - }, + await paymentStore.putPayment({ + payment, + workerId: peerId.toString(), + taskId, + status: "created", }); return payment; diff --git a/modules/manager/src/modules/createTaskManager.spec.ts b/modules/manager/src/modules/createTaskManager.spec.ts index 0ee25916..7e0d74e6 100644 --- a/modules/manager/src/modules/createTaskManager.spec.ts +++ b/modules/manager/src/modules/createTaskManager.spec.ts @@ -8,18 +8,8 @@ const mockEventEmitter = { safeDispatchEvent: vi.fn(), }; -const createMockTaskRecord = (): Task => ({ - id: mockTaskId, - title: "Test Task", - reward: 1000n, - templateId: "template-1", - templateData: '{"key": "value"}', - timeLimitSeconds: 60, -}); - describe("createTaskManager", () => { let manager: any; - let workerQueue: any; let taskStore: any; let paymentManager: any; let workerManager: any; @@ -32,14 +22,22 @@ describe("createTaskManager", () => { workerManager = { selectWorker: vi.fn(() => mockWorkerId), + markTaskAssigned: vi.fn(), + markTaskReleased: vi.fn(), + incrementStateValue: vi.fn(), + setWorkerStatus: vi.fn(), + updateWorkerState: vi.fn(), }; taskStore = { - all: vi.fn(), + listByStatus: vi.fn(() => []), assign: vi.fn(), reject: vi.fn(), payout: vi.fn(), getTask: vi.fn(), + create: vi.fn(), + accept: vi.fn(), + complete: vi.fn(), }; paymentManager = { @@ -55,10 +53,16 @@ describe("createTaskManager", () => { taskStore, paymentManager, events: mockEventEmitter, + templateStore: { get: vi.fn(), create: vi.fn() }, + managerSettings: { + paymentAccount: "account", + } as any, }); vi.clearAllMocks(); }); - it("should create a task manager instance", () => {}); + it("should create a task manager instance", () => { + expect(taskManager).toBeDefined(); + }); }); diff --git a/modules/manager/src/modules/createTaskManager.ts b/modules/manager/src/modules/createTaskManager.ts index 2d7a906d..93b0c41f 100644 --- a/modules/manager/src/modules/createTaskManager.ts +++ b/modules/manager/src/modules/createTaskManager.ts @@ -5,9 +5,7 @@ import type { createPaymentManager } from "./createPaymentManager.js"; import type { ManagerTaskRecord, ManagerTaskStore, - TaskAcceptedEvent, - TaskAssignedEvent, - TaskSubmissionEvent, + TaskStatus, } from "../stores/managerTaskStore.js"; import type { createWorkerManager } from "./createWorkerManager.js"; @@ -53,18 +51,17 @@ export function createTaskManager({ managerSettings: ManagerSettings; }) { - const isExpired = (timestamp: number, value: number) => - timestamp + value < Math.floor(Date.now() / 1000); + const isExpired = (deadlineSeconds?: number) => + typeof deadlineSeconds === "number" && + deadlineSeconds < Math.floor(Date.now() / 1000); const getTask = async ({ taskId, - index = "active", }: { taskId: string; - index?: string; }): Promise => { - const taskRecord = await taskStore.get({ - entityId: `${index}/${taskId}`, + const taskRecord = await taskStore.getTask({ + entityId: taskId, }); if (!taskRecord) { @@ -116,6 +113,7 @@ export function createTaskManager({ }); await workerManager.incrementStateValue(workerPeerIdStr, "tasksAccepted"); + await workerManager.setWorkerStatus(workerPeerIdStr, "accepted"); events.safeDispatchEvent("task:accepted", { detail: taskRecord, @@ -137,7 +135,7 @@ export function createTaskManager({ reason, }); - workerManager.markTaskReleased(workerPeerIdStr, taskId); + await workerManager.markTaskReleased(workerPeerIdStr, taskId); const taskRecord = await taskStore.getTask({ entityId: taskId, @@ -165,7 +163,7 @@ export function createTaskManager({ peerIdStr: workerPeerIdStr, }); - workerManager.markTaskReleased(workerPeerIdStr, taskId); + await workerManager.markTaskReleased(workerPeerIdStr, taskId); await workerManager.incrementStateValue(workerPeerIdStr, "tasksCompleted"); @@ -178,17 +176,8 @@ export function createTaskManager({ await assignTask({ entityId: taskRecord.state.id }); }; - const handleAssignEvent = async ( - taskRecord: ManagerTaskRecord, - lastEvent: TaskAssignedEvent, - ) => { - const { timeLimitSeconds } = taskRecord.state; - const acceptanceWindow = - typeof timeLimitSeconds === "number" && timeLimitSeconds > 0 - ? timeLimitSeconds - : TASK_ACCEPTANCE_TIME; - - if (isExpired(lastEvent.timestamp, acceptanceWindow)) { + const handleAssignEvent = async (taskRecord: ManagerTaskRecord) => { + if (isExpired(taskRecord.state.acceptanceDeadline)) { await rejectAndReassignTask(taskRecord); } }; @@ -197,35 +186,23 @@ export function createTaskManager({ taskRecord: ManagerTaskRecord, reason = "Worker took too long to accept/reject task", ) => { - const latestAssignEvent = taskRecord.events.reduce( - (latest: TaskAssignedEvent | null, current) => { - if (current.type === "assign") { - if (!latest || current.timestamp > latest.timestamp) { - return current; - } - } - return latest; - }, - null, - ); - - if (!latestAssignEvent) { + if (!taskRecord.state.assignedTo) { return; } await taskStore.reject({ entityId: taskRecord.state.id, - peerIdStr: latestAssignEvent.assignedToPeer, + peerIdStr: taskRecord.state.assignedTo, reason, }); - workerManager.markTaskReleased( - latestAssignEvent.assignedToPeer, + await workerManager.markTaskReleased( + taskRecord.state.assignedTo, taskRecord.state.id, ); await workerManager.incrementStateValue( - latestAssignEvent.assignedToPeer, + taskRecord.state.assignedTo, "tasksRejected", ); @@ -233,29 +210,31 @@ export function createTaskManager({ await assignTask({ entityId: taskRecord.state.id }); }; - const handleAcceptEvent = async ( - taskRecord: ManagerTaskRecord, - lastEvent: TaskAcceptedEvent, - ) => { - const { timeLimitSeconds } = taskRecord.state; - if (isExpired(lastEvent.timestamp, timeLimitSeconds)) { - await rejectAndReassignTask(taskRecord, "Worker took too long to submit task"); + const handleAcceptEvent = async (taskRecord: ManagerTaskRecord) => { + if (isExpired(taskRecord.state.completionDeadline)) { + await rejectAndReassignTask( + taskRecord, + "Worker took too long to submit task", + ); } }; - const handleSubmissionEvent = async ( - taskRecord: ManagerTaskRecord, - event: TaskSubmissionEvent, - ) => { + const handleSubmissionEvent = async (taskRecord: ManagerTaskRecord) => { if (!managerSettings.paymentAccount) { throw new Error("Payment account not set, cannot process payout"); } + const submissionPeer = taskRecord.state.submissionBy; + if (!submissionPeer) { + throw new Error("Submission peer not found for task"); + } + const payment = await paymentManager.generatePayment({ - peerId: peerIdFromString(event.submissionByPeer), - amount: taskRecord.state.reward, + peerId: peerIdFromString(submissionPeer), + amount: taskRecord.state.task.reward, paymentAccount: new PublicKey(managerSettings.paymentAccount), label: `Payment for task: ${taskRecord.state.id}`, + taskId: taskRecord.state.id, }); await taskStore.payout({ @@ -265,13 +244,14 @@ export function createTaskManager({ //send the payment. const [ack, error] = await manager.sendMessage( - peerIdFromString(event.submissionByPeer), + peerIdFromString(submissionPeer), { payment }, ); //update state - await workerManager.updateWorkerState(event.submissionByPeer, (state) => ({ - totalEarned: state.totalEarned + BigInt(taskRecord.state.reward), + await workerManager.updateWorkerState(submissionPeer, (state) => ({ + totalEarned: state.totalEarned + BigInt(taskRecord.state.task.reward), + status: "idle", })); //sendout task completed event @@ -289,29 +269,24 @@ export function createTaskManager({ }; const manageTask = async (taskRecord: ManagerTaskRecord) => { - const lastEvent = taskRecord.events[taskRecord.events.length - 1]; - - if (!lastEvent) { - return; - } - - switch (lastEvent.type) { - case "create": + switch (taskRecord.state.status) { + case "created": await handleCreateEvent(taskRecord); break; - case "assign": - await handleAssignEvent(taskRecord, lastEvent); + case "assigned": + await handleAssignEvent(taskRecord); break; - case "accept": - await handleAcceptEvent(taskRecord, lastEvent); + case "accepted": + await handleAcceptEvent(taskRecord); break; - case "reject": + case "rejected": await handleRejectEvent(taskRecord); break; - case "submission": - await handleSubmissionEvent(taskRecord, lastEvent); + case "submitted": + await handleSubmissionEvent(taskRecord); break; - case "payout": + case "payout_pending": + case "completed": // do nothing.. break; default: @@ -327,18 +302,16 @@ export function createTaskManager({ return; } - const lastEvent = taskRecord.events[taskRecord.events.length - 1]; - - if (lastEvent.type === "assign") { + if (taskRecord.state.status === "assigned") { throw new Error("Task is already assigned."); } - const originalWorkerId = taskRecord?.state.templateData - ? JSON.parse(taskRecord.state.templateData)?.submissionByPeer - : undefined; + const originalWorkerId = taskRecord?.state.task.templateData + ? JSON.parse(taskRecord.state.task.templateData)?.submissionByPeer + : undefined; const worker = await workerManager.selectWorker( - taskRecord.state.capability || undefined, + taskRecord.state.task.capability || undefined, originalWorkerId || undefined, ); @@ -351,24 +324,32 @@ export function createTaskManager({ workerPeerIdStr: worker, }); - workerManager.markTaskAssigned(worker, taskRecord.state.id); + await workerManager.markTaskAssigned(worker, taskRecord.state.id); await workerManager.incrementStateValue(worker, "totalTasks"); await manager.sendMessage(peerIdFromString(worker), { - task: taskRecord.state, + task: taskRecord.state.task, }); }; const manageTasks = async () => { try { - const activeTasks = await taskStore.all({ - prefix: "tasks/active", - limit: 50, - }); - - for (const taskRecord of activeTasks) { - await manageTask(taskRecord); + const statuses: TaskStatus[] = [ + "created", + "assigned", + "accepted", + "submitted", + ]; + + for (const status of statuses) { + const tasks = await taskStore.listByStatus({ + status, + limit: 50, + }); + for (const taskRecord of tasks) { + await manageTask(taskRecord); + } } } catch (e) { console.error("System error:", e); @@ -376,9 +357,11 @@ export function createTaskManager({ }; const getActiveTasks = async () => { - const tasks = await taskStore.all({ - prefix: "tasks/active", - }); + const statuses: TaskStatus[] = ["created", "assigned", "accepted", "submitted"]; + const tasks = [] as ManagerTaskRecord[]; + for (const status of statuses) { + tasks.push(...(await taskStore.listByStatus({ status }))); + } return tasks; }; @@ -386,7 +369,7 @@ export function createTaskManager({ const getCompletedTaskCount = async () => { let total = 0; for await (const _ of taskStore.datastore.queryKeys({ - prefix: "/tasks/completed", + prefix: "/tasks/byStatus/completed", })) { total++; } @@ -401,13 +384,12 @@ export function createTaskManager({ offset: number; limit: number; }) => { - const tasks = await taskStore.all({ - offset, - limit, - prefix: "tasks/completed", + const tasks = await taskStore.listByStatus({ + status: "completed", + limit: offset + limit, }); - return tasks; + return tasks.slice(offset, offset + limit); }; const registerTemplate = async ({ diff --git a/modules/manager/src/modules/createWorkerManager.spec.ts b/modules/manager/src/modules/createWorkerManager.spec.ts index 561fb291..be9b9a9b 100644 --- a/modules/manager/src/modules/createWorkerManager.spec.ts +++ b/modules/manager/src/modules/createWorkerManager.spec.ts @@ -1,12 +1,10 @@ -import { PeerId } from "@libp2p/interface"; -import { Keypair, PublicKey } from "@solana/web3.js"; +import { Keypair } from "@solana/web3.js"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { createPaymentManager } from "./createPaymentManager.js"; import { createWorkerManager } from "./createWorkerManager.js"; import { promises } from "node:fs"; -// import { createDataStore } from "@effectai/test-utils"; -import { Key, type Datastore } from "@effectai/protocol-core"; +import { createDataStore } from "@effectai/test-utils"; +import { type Datastore } from "@effectai/protocol-core"; describe("createWorkerManager", () => { let datastore: Datastore; @@ -34,7 +32,7 @@ describe("createWorkerManager", () => { datastore, managerSettings: { requireAccessCodes: true, - }, + } as any, }); }); @@ -62,7 +60,7 @@ describe("createWorkerManager", () => { const [result] = await workerManager.getAccessCodes(); expect(result.events.some((e) => e.type === "redeem")).toBe(true); - expect(result.state.code).to.equal(code); + expect(result.state.code).toBe(code); }); it("should throw an InvalidAccessCode error if given a wrong access code", () => {}); @@ -76,7 +74,7 @@ describe("createWorkerManager", () => { datastore, managerSettings: { requireAccessCodes: false, - }, + } as any, }); }); @@ -103,7 +101,7 @@ describe("createWorkerManager", () => { //make worker 2 busy await workerManager.updateWorkerState("worker2", () => ({ - totalTasks: 3, + assignments: ["task-1", "task-2", "task-3"], })); //expect worker 2 to be busy diff --git a/modules/manager/src/modules/createWorkerManager.ts b/modules/manager/src/modules/createWorkerManager.ts index 1456ae1a..a14a8fb8 100644 --- a/modules/manager/src/modules/createWorkerManager.ts +++ b/modules/manager/src/modules/createWorkerManager.ts @@ -4,6 +4,7 @@ import type { ManagerSettings } from "../main.js"; import { type WorkerRecord, createWorkerStore, + type WorkerStatus, } from "../stores/managerWorkerStore.js"; import { @@ -27,33 +28,6 @@ export const createWorkerManager = ({ const workerStore = createWorkerStore({ datastore, }); - - const workerAssignments = new Map>(); - const assignmentsFor = (workerId: string) => { - let assignments = workerAssignments.get(workerId); - if (!assignments) { - assignments = new Set(); - workerAssignments.set(workerId, assignments); - } - return assignments; - }; - const getAssignmentCount = (workerId: string) => - workerAssignments.get(workerId)?.size ?? 0; - const markTaskAssigned = (workerId: string, taskId: string) => { - assignmentsFor(workerId).add(taskId); - }; - const markTaskReleased = (workerId: string, taskId: string) => { - const assignments = workerAssignments.get(workerId); - if (!assignments) { - return; - } - - assignments.delete(taskId); - - if (assignments.size === 0) { - workerAssignments.delete(workerId); - } - }; const accessCodeStore = createAccessCodeStore({ datastore }); @@ -84,6 +58,7 @@ export const createWorkerManager = ({ await workerStore.updateWorker(peerIdStr, () => ({ lastActivity: Math.floor(Date.now() / 1000), + status: "disconnected", })); workerQueue.removePeer(peerIdStr); @@ -100,7 +75,7 @@ export const createWorkerManager = ({ const currentTime = Math.floor(Date.now() / 1000); // Retrieve existing worker or null if not found - const workerRecord = await workerStore.getSafe({ entityId: peerId }); + const workerRecord = await workerStore.getSafe({ entityId: `state/${peerId}` }); const isNewWorker = !workerRecord; @@ -134,6 +109,7 @@ export const createWorkerManager = ({ nonce, recipient, accessCodeRedeemed: accessCode, + status: managerSettings.maintenanceMode ? "maintenance_blocked" : "connected", }); } @@ -170,17 +146,20 @@ export const createWorkerManager = ({ recipient, lastPayout: currentTime, lastActivity: currentTime, + status: managerSettings.maintenanceMode ? "maintenance_blocked" : "idle", })); // === Add to queue === - workerQueue.addPeer({ peerIdStr: peerId }); + if (!managerSettings.maintenanceMode) { + workerQueue.addPeer({ peerIdStr: peerId }); + } } catch (err) { console.error("connectWorker error:", err); throw err; } }; const getWorker = async (peerId: string): Promise => { - const worker = await workerStore.getSafe({ entityId: peerId }); + const worker = await workerStore.getWorkerState(peerId); if (!worker) { return null; @@ -189,13 +168,13 @@ export const createWorkerManager = ({ return worker; }; - const getCapabilityInfo = (id: string) => - availableCapabilities.find(capability => capability.id === id); + const getCapabilityInfo = (id: string) => + availableCapabilities.find((capability) => capability.id === id); const selectWorker = async ( capability?: string, - originalWorkerId?: string + originalWorkerId?: string, ): Promise => { const queue = workerQueue.getQueue(); @@ -229,14 +208,36 @@ export const createWorkerManager = ({ }; const isBusy = async (workerRecord: WorkerRecord) => { - return getAssignmentCount(workerRecord.state.peerId) >= 3; + return workerRecord.state.assignments.length >= 3; + }; + + const setWorkerStatus = async (peerId: string, status: WorkerStatus) => { + await workerStore.updateWorker(peerId, () => ({ + status, + })); + }; + + const markTaskAssigned = async (workerId: string, taskId: string) => { + await workerStore.updateWorker(workerId, (state) => ({ + assignments: state.assignments.includes(taskId) + ? state.assignments + : [...state.assignments, taskId], + status: "assigned", + })); + }; + + const markTaskReleased = async (workerId: string, taskId: string) => { + await workerStore.updateWorker(workerId, (state) => ({ + assignments: state.assignments.filter((id) => id !== taskId), + status: workerQueue.queue.includes(workerId) ? "idle" : "connected", + })); }; const incrementStateValue = async ( peerId: string, stateKey: keyof WorkerRecord["state"], ) => { - const worker = await workerStore.getSafe({ entityId: peerId }); + const worker = await workerStore.getSafe({ entityId: `state/${peerId}` }); if (!worker) { throw new Error("Worker not found"); @@ -264,12 +265,23 @@ export const createWorkerManager = ({ const getWorkers = async (ids: string[]) => { const workers = workerStore.getMany({ - keys: ids.map((id) => new Key(`/worker/${id}`)), + keys: ids.map((id) => new Key(`/workers/state/${id}`)), }); return workers; }; + const hydrateQueue = async () => { + for await (const key of datastore.queryKeys({ + prefix: "/workers/byStatus/idle", + })) { + const workerId = key.toString().split("/").pop(); + if (workerId) { + workerQueue.addPeer({ peerIdStr: workerId }); + } + } + }; + const all = async () => { return await workerStore.all(); }; @@ -281,10 +293,12 @@ export const createWorkerManager = ({ selectWorker, getWorker, getWorkers, + hydrateQueue, connectWorker, disconnectWorker, markTaskAssigned, markTaskReleased, + setWorkerStatus, generateAccessCode, getAccessCodes, updateWorkerState, @@ -342,4 +356,4 @@ const createWorkerQueue = () => { }; }; -export type WorkerManager = ReturnType; \ No newline at end of file +export type WorkerManager = ReturnType; diff --git a/modules/manager/src/stores/managerPaymentStore.ts b/modules/manager/src/stores/managerPaymentStore.ts new file mode 100644 index 00000000..af6b91f9 --- /dev/null +++ b/modules/manager/src/stores/managerPaymentStore.ts @@ -0,0 +1,161 @@ +import { + type Datastore, + createEntityStore, + stringifyWithBigInt, + parseWithBigInt, + Key, +} from "@effectai/protocol-core"; +import type { Payment } from "@effectai/protobufs"; +import { computePaymentId } from "../utils.js"; + +export type PaymentStatus = "created" | "proofed" | "sent" | "failed"; + +export interface PaymentEvent { + type: "create" | "status"; + timestamp: number; + status?: PaymentStatus; +} + +export interface ManagerPaymentState { + payment: Payment; + workerId: string; + taskId?: string; + status: PaymentStatus; +} + +export interface ManagerPaymentRecord { + events: PaymentEvent[]; + state: ManagerPaymentState; +} + +export const createManagerPaymentStore = ({ + datastore, +}: { + datastore: Datastore; +}) => { + const coreStore = createEntityStore({ + datastore, + defaultPrefix: "payments", + stringify: (record) => stringifyWithBigInt(record), + parse: (data) => parseWithBigInt(data), + }); + + const putPayment = async ({ + payment, + workerId, + taskId, + status = "created", + }: { + payment: Payment; + workerId: string; + taskId?: string; + status?: PaymentStatus; + }) => { + const paymentId = computePaymentId(payment); + const record: ManagerPaymentRecord = { + state: { + payment, + workerId, + taskId, + status, + }, + events: [ + { + type: "create", + timestamp: Math.floor(Date.now() / 1000), + }, + ], + }; + + const batch = datastore.batch(); + + batch.put( + new Key(`/payments/state/${paymentId}`), + Buffer.from(stringifyWithBigInt(record)), + ); + + batch.put(new Key(`/payments/byWorker/${workerId}/${paymentId}`), new Uint8Array()); + + if (taskId) { + batch.put(new Key(`/payments/byTask/${taskId}/${paymentId}`), new Uint8Array()); + } + + batch.put(new Key(`/payments/byStatus/${status}/${paymentId}`), new Uint8Array()); + + await batch.commit(); + + return record; + }; + + const updateStatus = async ({ + paymentId, + status, + }: { + paymentId: string; + status: PaymentStatus; + }) => { + const record = await coreStore.get({ entityId: `state/${paymentId}` }); + + if (!record) { + throw new Error("Payment not found"); + } + + const previousStatus = record.state.status; + record.state.status = status; + record.events.push({ + type: "status", + status, + timestamp: Math.floor(Date.now() / 1000), + }); + + const batch = datastore.batch(); + batch.put( + new Key(`/payments/state/${paymentId}`), + Buffer.from(stringifyWithBigInt(record)), + ); + + if (previousStatus !== status) { + batch.delete(new Key(`/payments/byStatus/${previousStatus}/${paymentId}`)); + batch.put(new Key(`/payments/byStatus/${status}/${paymentId}`), new Uint8Array()); + } + + await batch.commit(); + }; + + const listByWorker = async ({ + workerId, + limit, + }: { + workerId: string; + limit?: number; + }) => { + const payments: ManagerPaymentRecord[] = []; + let count = 0; + + for await (const key of datastore.queryKeys({ + prefix: `/payments/byWorker/${workerId}`, + })) { + if (limit && count >= limit) break; + + const paymentId = key.toString().split("/").pop(); + if (!paymentId) continue; + + const record = await coreStore.get({ entityId: `state/${paymentId}` }); + if (record) { + payments.push(record); + count += 1; + } + } + + return payments; + }; + + return { + ...coreStore, + putPayment, + updateStatus, + listByWorker, + }; +}; + +export type ManagerPaymentStore = ReturnType; diff --git a/modules/manager/src/stores/managerTaskStore.spec.ts b/modules/manager/src/stores/managerTaskStore.spec.ts index 9a3fa558..e6f81354 100644 --- a/modules/manager/src/stores/managerTaskStore.spec.ts +++ b/modules/manager/src/stores/managerTaskStore.spec.ts @@ -1,528 +1,175 @@ -import type { PeerId } from "@libp2p/interface"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { describe, expect, it, beforeEach, vi } from "vitest"; import { createManagerTaskStore } from "./managerTaskStore.js"; import { type Datastore, Key, - Task, TaskValidationError, TaskExpiredError, stringifyWithBigInt, } from "@effectai/protocol-core"; - -const mockDatastore = { - has: vi.fn(), - get: vi.fn(), - put: vi.fn(), - delete: vi.fn(), - query: vi.fn(), - batch: vi.fn(() => { - return { - put: vi.fn(), - delete: vi.fn(), - commit: vi.fn(), - }; - }), +import type { Task } from "@effectai/protobufs"; + +const createInMemoryDatastore = (): Datastore => { + const store = new Map(); + + return { + has: vi.fn(async (key: Key) => store.has(key.toString())), + get: vi.fn(async (key: Key) => { + const value = store.get(key.toString()); + if (!value) throw new Error("not found"); + return value; + }), + put: vi.fn(async (key: Key, value: Uint8Array) => { + store.set(key.toString(), value); + return key; + }), + delete: vi.fn(async (key: Key) => { + store.delete(key.toString()); + }), + query: vi.fn(), + queryKeys: async function* ({ prefix }: { prefix: string }) { + for (const k of store.keys()) { + if (k.startsWith(prefix)) { + yield new Key(k); + } + } + }, + batch: vi.fn(() => { + const puts: Array<[Key, Uint8Array]> = []; + const deletes: Key[] = []; + return { + put: (key: Key, value: Uint8Array) => { + puts.push([key, value]); + }, + delete: (key: Key) => { + deletes.push(key); + }, + commit: async () => { + for (const [key, value] of puts) { + store.set(key.toString(), value); + } + for (const key of deletes) { + store.delete(key.toString()); + } + }, + }; + }), + } as unknown as Datastore; }; -const mockPeerId = { - toString: () => "peerId123", -} as PeerId; - const mockTask: Task = { - id: "task123", + id: "01HZZZ0JXH5X6Y8E9Z3QTK1Q6P", title: "Test Task", reward: 100n, - timeLimitSeconds: 3600, + timeLimitSeconds: 60, templateId: "template123", templateData: "{}", }; describe("ManagerTaskStore", () => { + let datastore: Datastore; let taskStore: ReturnType; beforeEach(() => { - vi.resetAllMocks(); + datastore = createInMemoryDatastore(); taskStore = createManagerTaskStore({ - datastore: mockDatastore as unknown as Datastore, - }); - - mockDatastore.get.mockImplementation(async (key: Key) => { - return Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [], - }), - ); - }); - mockDatastore.put.mockResolvedValue(new Key("/tasks/task123")); - - mockDatastore.batch.mockReturnValue({ - put: vi.fn().mockResolvedValue(new Key("/tasks/task123")), - delete: vi.fn(), - commit: vi.fn(), + datastore, }); }); - describe("create", () => { - it("should create a new task record", async () => { - const result = await taskStore.create({ - task: mockTask, - providerPeerId: mockPeerId, - }); - - expect(result.state).toEqual(mockTask); - expect(result.events).toHaveLength(1); - expect(result.events[0].type).toBe("create"); - expect(mockDatastore.put).toHaveBeenCalled(); + it("creates and indexes a task", async () => { + const record = await taskStore.create({ + task: mockTask, + providerPeerIdStr: "peer-1", }); - it("should include the provider peer in create event", async () => { - const result = await taskStore.create({ - task: mockTask, - providerPeerIdStr: mockPeerId.toString(), - }); + expect(record.state.status).toBe("created"); - expect(result.events[0].providerPeer).toBe("peerId123"); - }); + const stored = await datastore.get(new Key(`/tasks/state/${mockTask.id}`)); + const parsed = JSON.parse(Buffer.from(stored).toString()) as any; + expect(parsed.state.id).toBe(mockTask.id); }); - describe("assign", () => { - it("should assign task to worker when last event is create", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { - type: "create", - timestamp: Math.floor(Date.now() / 1000), - providerPeer: "peerId123", - }, - ], - }), - ), - ); - - await taskStore.assign({ - entityId: "task123", - workerPeerIdStr: "workerPeerId123", - }); - - expect(mockDatastore.batch().put).toHaveBeenCalled(); - const updatedRecord = JSON.parse( - mockDatastore.batch().put.mock.calls[0][1].toString(), - ); - expect(updatedRecord.events[1].type).toBe("assign"); - expect(updatedRecord.events[1].assignedToPeer).toBe("workerPeerId123"); + it("assigns, accepts, completes, and pays out a task", async () => { + await taskStore.create({ + task: mockTask, + providerPeerIdStr: "peer-1", }); - it("should assign task to worker when last event is reject", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "reject", - timestamp: 2000, - reason: "busy", - rejectedByPeer: "worker1", - }, - ], - }), - ), - ); - - await taskStore.assign({ - entityId: "task123", - workerPeerIdStr: "workerPeerId123", - }); - - const updatedRecord = JSON.parse( - mockDatastore.batch().put.mock.calls[0][1].toString(), - ); - expect(updatedRecord.events[2].type).toBe("assign"); - }); - - it("should throw when trying to assign to a non-assignable state", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { type: "assign", timestamp: 2000, assignedToPeer: "worker1" }, - ], - }), - ), - ); - - await expect( - taskStore.assign({ - entityId: "task123", - workerPeerIdStr: "workerPeerId123", - }), - ).rejects.toThrow(TaskValidationError); + await taskStore.assign({ + entityId: mockTask.id, + workerPeerIdStr: "worker-1", }); - }); - - describe("accept", () => { - it("should accept task when properly assigned", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: Math.floor(Date.now() / 1000) - 10, - assignedToPeer: "workerPeerId123", - }, - ], - }), - ), - ); - - await taskStore.accept({ - entityId: "task123", - peerIdStr: "workerPeerId123", - }); - expect(mockDatastore.put).toHaveBeenCalled(); - const updatedRecord = JSON.parse( - new TextDecoder().decode(mockDatastore.put.mock.calls[0][1]), - ); - expect(updatedRecord.events[2].type).toBe("accept"); + const accepted = await taskStore.accept({ + entityId: mockTask.id, + peerIdStr: "worker-1", }); + expect(accepted.state.status).toBe("accepted"); - it("should throw when accepting an expired task", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: Math.floor(Date.now() / 1000) - 3600, - assignedToPeer: "workerPeerId123", - }, - ], - }), - ), - ); - - await expect( - taskStore.accept({ - entityId: "task123", - peerIdStr: "workerPeerId123", - }), - ).rejects.toThrow(TaskExpiredError); + const submitted = await taskStore.complete({ + entityId: mockTask.id, + peerIdStr: "worker-1", + result: "ok", }); + expect(submitted.state.status).toBe("submitted"); - it("should throw when accepting a task not assigned to this worker", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "differentWorker", - }, - ], - }), - ), - ); - - await expect( - taskStore.accept({ - entityId: "task123", - peerIdStr: "workerPeerId123", - }), - ).rejects.toThrow(TaskValidationError); + const payout = await taskStore.payout({ + entityId: mockTask.id, + payment: { + id: "payment-1", + version: 1, + amount: 100n, + recipient: "recipient", + paymentAccount: "account", + publicKey: "pub", + nonce: 1n, + label: "", + }, }); + expect(payout.state.status).toBe("completed"); }); - describe("reject", () => { - it("should reject task when properly assigned", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "workerPeerId123", - }, - ], - }), - ), - ); - - await taskStore.reject({ - entityId: "task123", - peerIdStr: "workerPeerId123", - reason: "Too busy", - }); - - const updatedRecord = JSON.parse( - mockDatastore.batch().put.mock.calls[0][1].toString(), - ); - expect(updatedRecord.events[2].type).toBe("reject"); - expect(updatedRecord.events[2].reason).toBe("Too busy"); + it("rejects invalid transitions", async () => { + await taskStore.create({ + task: mockTask, + providerPeerIdStr: "peer-1", }); - it("should throw when rejecting a task not assigned to this worker", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "differentWorker", - }, - ], - }), - ), - ); - - await expect( - taskStore.reject({ - entityId: "task123", - peerIdStr: "workerPeerId123", - reason: "Too busy", - }), - ).rejects.toThrow(TaskValidationError); - }); + await expect( + taskStore.accept({ + entityId: mockTask.id, + peerIdStr: "worker-1", + }), + ).rejects.toThrow(TaskValidationError); }); - describe("complete", () => { - it("should complete task when properly accepted", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "workerPeerId123", - }, - { - type: "accept", - timestamp: Math.floor(Date.now() / 1000) - 10, // 10 seconds ago - acceptedByPeer: "workerPeerId123", - }, - ], - }), - ), - ); - - await taskStore.complete({ - entityId: "task123", - result: "Task completed successfully", - peerIdStr: "workerPeerId123", - }); - - const updatedRecord = JSON.parse( - mockDatastore.batch().put.mock.calls[0][1].toString(), - ); - expect(updatedRecord.events[3].type).toBe("submission"); - expect(updatedRecord.events[3].result).toBe( - "Task completed successfully", - ); - }); - - it("should throw when completing an expired task", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: { ...mockTask, timeLimitSeconds: 60 }, // 1 minute limit - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "workerPeerId123", - }, - { - type: "accept", - timestamp: Math.floor(Date.now() / 1000) - 120, // 2 minutes ago - acceptedByPeer: "workerPeerId123", - }, - ], - }), - ), - ); - - await expect( - taskStore.complete({ - entityId: "task123", - result: "Task completed", - peerIdStr: "workerPeerId123", - }), - ).rejects.toThrow(TaskExpiredError); - }); - - it("should throw when completing a task not accepted by this worker", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "workerPeerId123", - }, - { - type: "accept", - timestamp: 3000, - acceptedByPeer: "differentWorker", - }, - ], - }), - ), - ); - - await expect( - taskStore.complete({ - entityId: "task123", - result: "Task completed", - peerIdStr: "workerPeerId123", - }), - ).rejects.toThrow(TaskValidationError); - }); - - it("should throw when task is already completed", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "workerPeerId123", - }, - { - type: "accept", - timestamp: 3000, - acceptedByPeer: "workerPeerId123", - }, - { - type: "complete", - timestamp: 4000, - completedByPeer: "workerPeerId123", - result: "done", - }, - ], - }), - ), - ); - - await expect( - taskStore.complete({ - entityId: "task123", - result: "Task completed again", - peerIdStr: "workerPeerId123", - }), - ).rejects.toThrow(TaskValidationError); + it("rejects expired acceptance", async () => { + await taskStore.create({ + task: { ...mockTask, timeLimitSeconds: 1 }, + providerPeerIdStr: "peer-1", }); - }); - - describe("payout", () => { - const mockPayment = { - id: "payment123", - amount: 100, - recipient: "workerPeerId123", - nonce: 1, - timestamp: Math.floor(Date.now() / 1000), - }; - - it("should add payout event when task is completed", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "workerPeerId123", - }, - { - type: "accept", - timestamp: 3000, - acceptedByPeer: "workerPeerId123", - }, - { - type: "submission", - timestamp: 4000, - completedByPeer: "workerPeerId123", - result: "done", - }, - ], - }), - ), - ); - await taskStore.payout({ - entityId: "task123", - payment: mockPayment, - }); - - const updatedRecord = JSON.parse( - mockDatastore.batch().put.mock.calls[0][1].toString(), - ); - expect(updatedRecord.events[4].type).toBe("payout"); - expect(updatedRecord.events[4].payment).toEqual(mockPayment); + await taskStore.assign({ + entityId: mockTask.id, + workerPeerIdStr: "worker-1", }); - it("should throw when paying out a non-completed task", async () => { - mockDatastore.get.mockResolvedValueOnce( - Buffer.from( - stringifyWithBigInt({ - state: mockTask, - events: [ - { type: "create", timestamp: 1000, providerPeer: "peerId123" }, - { - type: "assign", - timestamp: 2000, - assignedToPeer: "workerPeerId123", - }, - { - type: "accept", - timestamp: 3000, - acceptedByPeer: "workerPeerId123", - }, - ], - }), - ), - ); + // force expiration by manipulating stored state + const stored = await datastore.get(new Key(`/tasks/state/${mockTask.id}`)); + const parsed = JSON.parse(Buffer.from(stored).toString()) as any; + parsed.state.acceptanceDeadline = Math.floor(Date.now() / 1000) - 1; + await datastore.put( + new Key(`/tasks/state/${mockTask.id}`), + Buffer.from(stringifyWithBigInt(parsed)), + ); - await expect( - taskStore.payout({ - entityId: "task123", - payment: mockPayment, - }), - ).rejects.toThrow(TaskValidationError); - }); + await expect( + taskStore.accept({ + entityId: mockTask.id, + peerIdStr: "worker-1", + }), + ).rejects.toThrow(TaskExpiredError); }); }); diff --git a/modules/manager/src/stores/managerTaskStore.ts b/modules/manager/src/stores/managerTaskStore.ts index 4959d2ca..e025c022 100644 --- a/modules/manager/src/stores/managerTaskStore.ts +++ b/modules/manager/src/stores/managerTaskStore.ts @@ -2,7 +2,6 @@ import { TASK_ACCEPTANCE_TIME } from "../consts.js"; import { type Datastore, type BaseTaskEvent, - type TaskRecord, Key, TaskValidationError, TaskExpiredError, @@ -13,6 +12,15 @@ import { } from "@effectai/protocol-core"; import type { Task, Payment } from "@effectai/protobufs"; +export type TaskStatus = + | "created" + | "assigned" + | "accepted" + | "submitted" + | "payout_pending" + | "completed" + | "rejected"; + export type ManagerTaskEvent = | TaskCreatedEvent | TaskAssignedEvent @@ -53,13 +61,24 @@ export interface TaskPaymentEvent extends BaseTaskEvent { payment: Payment; } -export interface TaskCompletedEvent extends BaseTaskEvent { - type: "complete"; - result: string; - completedByPeer: string; +export interface ManagerTaskState { + id: string; + task: Task; + status: TaskStatus; + providerPeer: string; + assignedTo?: string; + acceptedBy?: string; + submissionBy?: string; + result?: string; + attempts: number; + acceptanceDeadline?: number; + completionDeadline?: number; } -export type ManagerTaskRecord = TaskRecord; +export interface ManagerTaskRecord { + events: ManagerTaskEvent[]; + state: ManagerTaskState; +} export const createManagerTaskStore = ({ datastore, @@ -73,16 +92,53 @@ export const createManagerTaskStore = ({ parse: (data) => parseWithBigInt(data), }); + const stateKey = (taskId: string) => new Key(`/tasks/state/${taskId}`); + const statusKey = (status: TaskStatus, taskId: string) => + new Key(`/tasks/byStatus/${status}/${taskId}`); + const workerKey = (workerId: string, taskId: string) => + new Key(`/tasks/byWorker/${workerId}/${taskId}`); + + const writeRecord = async ({ + record, + previousStatus, + nextStatus, + previousAssignedTo, + nextAssignedTo, + }: { + record: ManagerTaskRecord; + previousStatus?: TaskStatus; + nextStatus: TaskStatus; + previousAssignedTo?: string; + nextAssignedTo?: string; + }) => { + const batch = datastore.batch(); + + batch.put(stateKey(record.state.id), Buffer.from(stringifyWithBigInt(record))); + + if (previousStatus && previousStatus !== nextStatus) { + batch.delete(statusKey(previousStatus, record.state.id)); + } + batch.put(statusKey(nextStatus, record.state.id), new Uint8Array()); + + if (previousAssignedTo && previousAssignedTo !== nextAssignedTo) { + batch.delete(workerKey(previousAssignedTo, record.state.id)); + } + + if (nextAssignedTo) { + batch.put(workerKey(nextAssignedTo, record.state.id), new Uint8Array()); + } + + await batch.commit(); + }; + const getTask = async ({ entityId, - index = "active", }: { entityId: string; - index?: string; - }): Promise => { + }): Promise => { try { const taskRecord = await coreStore.get({ - entityId: `${index}/${entityId}`, + entityId: `state/${entityId}`, }); if (!taskRecord) { @@ -117,60 +173,64 @@ export const createManagerTaskStore = ({ providerPeer: providerPeerIdStr, }, ], - state: task, + state: { + id: task.id, + task, + status: "created", + providerPeer: providerPeerIdStr, + attempts: 0, + }, }; - await coreStore.put({ entityId: `active/${task.id}`, record }); + await writeRecord({ + record, + nextStatus: "created", + }); return record; }; - const complete = async ({ + const assign = async ({ entityId, - result, - peerIdStr, + workerPeerIdStr, }: { entityId: string; - result: string; - peerIdStr: string; + workerPeerIdStr: string; }): Promise => { const taskRecord = await getTask({ entityId }); - if (!taskRecord) { - throw new TaskValidationError("Task not found"); + const { status } = taskRecord.state; + if (status !== "created" && status !== "rejected") { + throw new TaskValidationError("Task is not in a valid state to assign"); } - if (taskRecord.events.some((e) => e.type === "submission")) { - throw new TaskValidationError("Task is already submitted"); - } + const acceptanceWindow = + typeof taskRecord.state.task.timeLimitSeconds === "number" && + taskRecord.state.task.timeLimitSeconds > 0 + ? taskRecord.state.task.timeLimitSeconds + : TASK_ACCEPTANCE_TIME; - // only allowed to complete if last event is accept - const lastEvent = taskRecord.events[taskRecord.events.length - 1]; - if (lastEvent.type !== "accept" || lastEvent.acceptedByPeer !== peerIdStr) { - throw new TaskValidationError("Task was not accepted by this worker"); - } - - if ( - Date.now() / 1000 - lastEvent.timestamp >= - taskRecord.state.timeLimitSeconds - ) { - throw new TaskExpiredError("Task has expired."); - } + const previousAssignedTo = taskRecord.state.assignedTo; taskRecord.events.push({ timestamp: Math.floor(Date.now() / 1000), - type: "submission", - result, - submissionByPeer: peerIdStr, + type: "assign", + assignedToPeer: workerPeerIdStr, }); - const batch = datastore.batch(); - batch.put( - new Key(`/tasks/active/${taskRecord.state.id}`), - Buffer.from(stringifyWithBigInt(taskRecord)), - ); - batch.delete(new Key(`/tasks/assign/${peerIdStr}/${entityId}`)); - await batch.commit(); + taskRecord.state.status = "assigned"; + taskRecord.state.assignedTo = workerPeerIdStr; + taskRecord.state.acceptanceDeadline = + Math.floor(Date.now() / 1000) + acceptanceWindow; + taskRecord.state.attempts += 1; + + await writeRecord({ + record: taskRecord, + previousStatus: status, + nextStatus: "assigned", + previousAssignedTo, + nextAssignedTo: workerPeerIdStr, + }); return taskRecord; }; @@ -184,28 +244,43 @@ export const createManagerTaskStore = ({ }): Promise => { const taskRecord = await getTask({ entityId }); - if (!taskRecord) { - throw new TaskValidationError("Task not found"); + if (taskRecord.state.status !== "assigned") { + throw new TaskValidationError("Task is not assigned"); } - // only allowed to accept if last event is assign - const lastEvent = taskRecord.events[taskRecord.events.length - 1]; - - if (lastEvent.type !== "assign" || lastEvent.assignedToPeer !== peerIdStr) { + if (taskRecord.state.assignedTo !== peerIdStr) { throw new TaskValidationError("Task was not assigned to this worker"); } - if (Date.now() / 1000 - lastEvent.timestamp >= TASK_ACCEPTANCE_TIME) { + const acceptanceDeadline = taskRecord.state.acceptanceDeadline; + if (acceptanceDeadline && Math.floor(Date.now() / 1000) > acceptanceDeadline) { throw new TaskExpiredError("Task has expired."); } + const completionWindow = + typeof taskRecord.state.task.timeLimitSeconds === "number" && + taskRecord.state.task.timeLimitSeconds > 0 + ? taskRecord.state.task.timeLimitSeconds + : TASK_ACCEPTANCE_TIME; + taskRecord.events.push({ timestamp: Math.floor(Date.now() / 1000), type: "accept", acceptedByPeer: peerIdStr, }); - await coreStore.put({ entityId: `active/${entityId}`, record: taskRecord }); + taskRecord.state.status = "accepted"; + taskRecord.state.acceptedBy = peerIdStr; + taskRecord.state.completionDeadline = + Math.floor(Date.now() / 1000) + completionWindow; + + await writeRecord({ + record: taskRecord, + previousStatus: "assigned", + nextStatus: "accepted", + previousAssignedTo: taskRecord.state.assignedTo, + nextAssignedTo: taskRecord.state.assignedTo, + }); return taskRecord; }; @@ -218,26 +293,20 @@ export const createManagerTaskStore = ({ entityId: string; peerIdStr: string; reason: string; - }): Promise => { + }): Promise => { const taskRecord = await getTask({ entityId }); - if (!taskRecord) { - throw new TaskValidationError("Task not found"); + const status = taskRecord.state.status; + if (status !== "assigned" && status !== "accepted") { + throw new TaskValidationError("Task is not in a valid state to reject"); } - const latestAssignEvent = taskRecord.events.reduce( - (latest: TaskAssignedEvent | null, current) => { - if (current.type === "assign") { - if (!latest || current.timestamp > latest.timestamp) { - return current; - } - } - return latest; - }, - null, - ); + const expectedPeer = + status === "assigned" + ? taskRecord.state.assignedTo + : taskRecord.state.acceptedBy; - if (!latestAssignEvent || latestAssignEvent.assignedToPeer !== peerIdStr) { + if (!expectedPeer || expectedPeer !== peerIdStr) { throw new TaskValidationError("Task was not assigned to this worker"); } @@ -248,98 +317,159 @@ export const createManagerTaskStore = ({ rejectedByPeer: peerIdStr, }); - const batch = datastore.batch(); - - batch.put( - new Key(`/tasks/active/${taskRecord.state.id}`), - Buffer.from(stringifyWithBigInt(taskRecord)), - ); - - batch.delete( - new Key(`/tasks/assign/${latestAssignEvent?.assignedToPeer}/${entityId}`), - ); + const previousAssignedTo = taskRecord.state.assignedTo; + taskRecord.state.status = "rejected"; + taskRecord.state.assignedTo = undefined; + taskRecord.state.acceptedBy = undefined; + taskRecord.state.submissionBy = undefined; + taskRecord.state.result = undefined; + + await writeRecord({ + record: taskRecord, + previousStatus: status, + nextStatus: "rejected", + previousAssignedTo, + nextAssignedTo: undefined, + }); - await batch.commit(); + return taskRecord; }; - const payout = async ({ + const complete = async ({ entityId, - payment, + result, + peerIdStr, }: { entityId: string; - payment: Payment; - }): Promise => { + result: string; + peerIdStr: string; + }): Promise => { const taskRecord = await getTask({ entityId }); - if (!taskRecord) { - throw new TaskValidationError("Task not found"); + if (taskRecord.state.status !== "accepted") { + throw new TaskValidationError("Task is not accepted"); } - // only allowed to payout if last event is complete - const lastEvent = taskRecord.events[taskRecord.events.length - 1]; + if (taskRecord.state.acceptedBy !== peerIdStr) { + throw new TaskValidationError("Task was not accepted by this worker"); + } - if (lastEvent.type !== "submission") { - throw new TaskValidationError("Task is not submitted yet"); + const completionDeadline = taskRecord.state.completionDeadline; + if (completionDeadline && Math.floor(Date.now() / 1000) > completionDeadline) { + throw new TaskExpiredError("Task has expired."); } taskRecord.events.push({ timestamp: Math.floor(Date.now() / 1000), - type: "payout", - payment, + type: "submission", + result, + submissionByPeer: peerIdStr, }); - //delete from active tasks - const batch = datastore.batch(); + taskRecord.state.status = "submitted"; + taskRecord.state.submissionBy = peerIdStr; + taskRecord.state.result = result; - batch.put( - new Key(`/tasks/completed/${taskRecord.state.id}`), - Buffer.from(stringifyWithBigInt(taskRecord)), - ); - batch.delete(new Key(`/tasks/active/${taskRecord.state.id}`)); + await writeRecord({ + record: taskRecord, + previousStatus: "accepted", + nextStatus: "submitted", + previousAssignedTo: taskRecord.state.assignedTo, + nextAssignedTo: taskRecord.state.assignedTo, + }); - await batch.commit(); + return taskRecord; }; - const assign = async ({ + const payout = async ({ entityId, - workerPeerIdStr, + payment, }: { entityId: string; - workerPeerIdStr: string; - }): Promise => { + payment: Payment; + }): Promise => { const taskRecord = await getTask({ entityId }); - if (!taskRecord) { - throw new TaskValidationError("Task not found"); - } - - // only allowed to assign if last event is create or reject. - const lastEvent = taskRecord.events[taskRecord.events.length - 1]; - - if (lastEvent.type !== "create" && lastEvent.type !== "reject") { - throw new TaskValidationError("Task is not in a valid state to assign"); + if (taskRecord.state.status !== "submitted") { + throw new TaskValidationError("Task is not submitted yet"); } taskRecord.events.push({ timestamp: Math.floor(Date.now() / 1000), - type: "assign", - assignedToPeer: workerPeerIdStr, + type: "payout", + payment, }); - const batch = datastore.batch(); + const previousAssignedTo = taskRecord.state.assignedTo; + taskRecord.state.status = "completed"; + taskRecord.state.assignedTo = undefined; + taskRecord.state.acceptedBy = undefined; + + await writeRecord({ + record: taskRecord, + previousStatus: "submitted", + nextStatus: "completed", + previousAssignedTo, + nextAssignedTo: taskRecord.state.submissionBy, + }); - batch.put( - new Key(`/tasks/active/${taskRecord.state.id}`), - Buffer.from(stringifyWithBigInt(taskRecord)), - ); + return taskRecord; + }; - // add index - batch.put( - new Key(`/tasks/assign/${workerPeerIdStr}/${entityId}`), - new Uint8Array(), - ); + const listByWorker = async ({ + workerId, + limit, + }: { + workerId: string; + limit?: number; + }) => { + const tasks: ManagerTaskRecord[] = []; + let count = 0; + + for await (const key of datastore.queryKeys({ + prefix: `/tasks/byWorker/${workerId}`, + })) { + if (limit && count >= limit) break; + + const taskId = key.toString().split("/").pop(); + if (!taskId) continue; + + const record = await coreStore.get({ entityId: `state/${taskId}` }); + if (record) { + tasks.push(record); + count += 1; + } + } - await batch.commit(); + return tasks; + }; + + const listByStatus = async ({ + status, + limit, + }: { + status: TaskStatus; + limit?: number; + }) => { + const tasks: ManagerTaskRecord[] = []; + let count = 0; + + for await (const key of datastore.queryKeys({ + prefix: `/tasks/byStatus/${status}`, + })) { + if (limit && count >= limit) break; + + const taskId = key.toString().split("/").pop(); + if (!taskId) continue; + + const record = await coreStore.get({ entityId: `state/${taskId}` }); + if (record) { + tasks.push(record); + count += 1; + } + } + + return tasks; }; return { @@ -351,6 +481,8 @@ export const createManagerTaskStore = ({ payout, assign, getTask, + listByStatus, + listByWorker, }; }; diff --git a/modules/manager/src/stores/managerWorkerStore.ts b/modules/manager/src/stores/managerWorkerStore.ts index 1d998651..dab70ad4 100644 --- a/modules/manager/src/stores/managerWorkerStore.ts +++ b/modules/manager/src/stores/managerWorkerStore.ts @@ -3,9 +3,20 @@ import { parseWithBigInt, createEntityStore, stringifyWithBigInt, + Key, } from "@effectai/protocol-core"; -export type ManagerWorkerEvent = WorkerCreated | WorkerJoined | WorkerBanned; +export type WorkerStatus = + | "disconnected" + | "connected" + | "idle" + | "assigned" + | "accepted" + | "submitting" + | "banned" + | "maintenance_blocked"; + +export type ManagerWorkerEvent = WorkerCreated | WorkerJoined | WorkerBanned | WorkerStatusChanged; export interface WorkerCreated { timestamp: number; @@ -23,6 +34,12 @@ export interface WorkerBanned { reason: string; } +export interface WorkerStatusChanged { + timestamp: number; + type: "status"; + status: WorkerStatus; +} + export interface WorkerState { peerId: string; recipient: string; @@ -39,6 +56,8 @@ export interface WorkerState { capabilities: string[]; managerCapabilities: string[]; accessCodeRedeemed?: string; + status: WorkerStatus; + assignments: string[]; } export interface ManagerWorkerRecord { @@ -54,16 +73,64 @@ export const createWorkerStore = ({ datastore }: { datastore: Datastore }) => { ManagerWorkerRecord >({ datastore, - defaultPrefix: "worker", + defaultPrefix: "workers", stringify: (record) => stringifyWithBigInt(record), parse: (data) => parseWithBigInt(data), }); + const stateKey = (peerId: string) => new Key(`/workers/state/${peerId}`); + const statusKey = (status: WorkerStatus, peerId: string) => + new Key(`/workers/byStatus/${status}/${peerId}`); + const capabilityKey = (capability: string, peerId: string) => + new Key(`/workers/byCapability/${capability}/${peerId}`); + + const allCapabilities = (state: WorkerState) => + Array.from(new Set([...state.capabilities, ...state.managerCapabilities])); + + const updateIndexes = async ({ + previous, + next, + }: { + previous: WorkerState | null; + next: WorkerState; + }) => { + const batch = datastore.batch(); + + batch.put(stateKey(next.peerId), Buffer.from(stringifyWithBigInt({ + events: [], + state: next, + }))); + + if (!previous || previous.status !== next.status) { + if (previous) { + batch.delete(statusKey(previous.status, next.peerId)); + } + batch.put(statusKey(next.status, next.peerId), new Uint8Array()); + } + + const previousCaps = previous ? allCapabilities(previous) : []; + const nextCaps = allCapabilities(next); + + for (const cap of previousCaps) { + if (!nextCaps.includes(cap)) { + batch.delete(capabilityKey(cap, next.peerId)); + } + } + + for (const cap of nextCaps) { + if (!previousCaps.includes(cap)) { + batch.put(capabilityKey(cap, next.peerId), new Uint8Array()); + } + } + + await batch.commit(); + }; + const createWorker = async ( peerId: string, - state: Partial & Pick, + state: Partial & Pick ) => { - const record = await coreStore.getSafe({ entityId: peerId }); + const record = await coreStore.getSafe({ entityId: `state/${peerId}` }); if (record) { throw new Error("Worker already exists"); @@ -90,12 +157,22 @@ export const createWorkerStore = ({ datastore }: { datastore: Datastore }) => { isAdmin: false, accessCodeRedeemed: state.accessCodeRedeemed, capabilities: state.capabilities || [], - managerCapabilities: [], + managerCapabilities: state.managerCapabilities || [], lastActivity: Math.floor(Date.now() / 1000), + status: state.status || "connected", + assignments: state.assignments || [], }, }; - await coreStore.put({ entityId: peerId, record: newRecord }); + const batch = datastore.batch(); + batch.put(stateKey(peerId), Buffer.from(stringifyWithBigInt(newRecord))); + batch.put(statusKey(newRecord.state.status, peerId), new Uint8Array()); + + for (const cap of allCapabilities(newRecord.state)) { + batch.put(capabilityKey(cap, peerId), new Uint8Array()); + } + + await batch.commit(); return newRecord; }; @@ -104,17 +181,53 @@ export const createWorkerStore = ({ datastore }: { datastore: Datastore }) => { peerId: string, updater: (current: WorkerRecord["state"]) => Partial, ) => { - const record = await coreStore.get({ entityId: peerId }); + const record = await coreStore.get({ entityId: `state/${peerId}` }); + const previous = structuredClone(record.state); const update = updater(record.state); Object.assign(record.state, update); - await coreStore.put({ entityId: peerId, record }); + if (previous.status !== record.state.status) { + record.events.push({ + timestamp: Math.floor(Date.now() / 1000), + type: "status", + status: record.state.status, + }); + } + + const batch = datastore.batch(); + batch.put(stateKey(peerId), Buffer.from(stringifyWithBigInt(record))); + + if (previous.status !== record.state.status) { + batch.delete(statusKey(previous.status, peerId)); + batch.put(statusKey(record.state.status, peerId), new Uint8Array()); + } + + const previousCaps = allCapabilities(previous); + const nextCaps = allCapabilities(record.state); + + for (const cap of previousCaps) { + if (!nextCaps.includes(cap)) { + batch.delete(capabilityKey(cap, peerId)); + } + } + + for (const cap of nextCaps) { + if (!previousCaps.includes(cap)) { + batch.put(capabilityKey(cap, peerId), new Uint8Array()); + } + } + + await batch.commit(); }; + const getWorkerState = async (peerId: string) => + coreStore.getSafe({ entityId: `state/${peerId}` }); + return { ...coreStore, updateWorker, createWorker, + getWorkerState, }; }; diff --git a/modules/payment/tests/circuits.spec.ts b/modules/payment/tests/circuits.spec.ts index 9ef93758..a852a2e5 100644 --- a/modules/payment/tests/circuits.spec.ts +++ b/modules/payment/tests/circuits.spec.ts @@ -6,8 +6,8 @@ import { intStringTo32Bytes, prove, signPayment, -} from "../clients/js"; -import { publicKeyToTruncatedHex } from "../clients/js"; +} from "../clients/js/node"; +import { publicKeyToTruncatedHex } from "../clients/js/node"; import { randomBytes } from "node:crypto"; import { PublicKey } from "@solana/web3.js"; import { buildEddsa } from "circomlibjs"; @@ -15,7 +15,10 @@ import { buildEddsa } from "circomlibjs"; import { setup } from "@effectai/test-utils"; import { generateKeyPairSigner } from "@solana/kit"; -describe("Generate Proof", () => { +const runIntegrationTests = process.env.RUN_INTEGRATION_TESTS === "1"; +const describeIntegration = runIntegrationTests ? describe : describe.skip; + +describeIntegration("Generate Proof", () => { it("should generate and prove a proof", async () => { const privateKeyBytes = randomBytes(32); diff --git a/modules/payment/tests/program.test.ts b/modules/payment/tests/program.test.ts index ebe9648b..c735e3cc 100644 --- a/modules/payment/tests/program.test.ts +++ b/modules/payment/tests/program.test.ts @@ -11,7 +11,7 @@ import { getRecipientManagerDataAccountEncoder, PAYMENT_BATCH_SIZE, signPayment, -} from "../clients/js"; +} from "../clients/js/node"; import { address, @@ -27,23 +27,25 @@ import { } from "@effectai/utils"; import { LiteSVM } from "litesvm"; -describe("Payment Program", async () => { - const eddsa = await buildEddsa(); - const liteSVM = new LiteSVM(); - liteSVM.addProgramFromFile( - EFFECT_PAYMENT_PROGRAM_ADDRESS, - "../../../target/deploy/effect_payment.so", - ); +const runIntegrationTests = process.env.RUN_INTEGRATION_TESTS === "1"; +const describeIntegration = runIntegrationTests ? describe : describe.skip; - const managerPrivateKey = randomBytes(32); - const eddsaPublicKey = eddsa.prv2pub(managerPrivateKey); - const bs58ManagerPublicKey = getAddressDecoder().decode( - eddsa.babyJub.packPoint(eddsaPublicKey), - ); +describeIntegration("Payment Program", async () => { + it("can redeem a proof", async () => { + const eddsa = await buildEddsa(); + const liteSVM = new LiteSVM(); + liteSVM.addProgramFromFile( + EFFECT_PAYMENT_PROGRAM_ADDRESS, + "../../../target/deploy/effect_payment.so", + ); - const provider = await createLocalSolanaProvider(); + const managerPrivateKey = randomBytes(32); + const eddsaPublicKey = eddsa.prv2pub(managerPrivateKey); + const bs58ManagerPublicKey = getAddressDecoder().decode( + eddsa.babyJub.packPoint(eddsaPublicKey), + ); + const provider = await createLocalSolanaProvider(); - it("can redeem a proof", async () => { const { mint, ata, signer } = await setup(); const paymentAccount = await generateKeyPairSigner(); console.log(PAYMENT_BATCH_SIZE, "PAYMENT_BATCH_SIZE"); diff --git a/modules/worker/src/index.ts b/modules/worker/src/index.ts index 1df9014d..6b670df6 100644 --- a/modules/worker/src/index.ts +++ b/modules/worker/src/index.ts @@ -1,2 +1,3 @@ export * from "./main.js"; export * from "./stores/workerTaskStore.js"; +export * from "./stores/workerSyncStateStore.js"; diff --git a/modules/worker/src/main.ts b/modules/worker/src/main.ts index 4e0cff59..893ccc28 100644 --- a/modules/worker/src/main.ts +++ b/modules/worker/src/main.ts @@ -2,6 +2,7 @@ import { webRTC } from "@libp2p/webrtc"; import { createPaymentWorker } from "./modules/createPaymentWorker.js"; import { createTaskWorker } from "./modules/createTaskWorker.js"; import { createWorkerTaskStore } from "./stores/workerTaskStore.js"; +import { createWorkerSyncStateStore } from "./stores/workerSyncStateStore.js"; import { createTemplateWorker } from "./modules/createTemplateWorker.js"; // import type { PingService } from "@libp2p/ping"; @@ -25,9 +26,13 @@ import { PingService } from "@libp2p/ping"; import { EffectProtocolMessage, + type RequestToWorkResponse, type Payment, type Task, + type WorkerSyncResponse, } from "@effectai/protobufs"; +import { applyWorkerSyncResponse } from "./sync/applyWorkerSyncResponse.js"; +import { runConnectFlow } from "./sync/runConnectFlow.js"; export interface WorkerEvents { "task:created": CustomEvent; @@ -70,10 +75,12 @@ export const createWorker = async ({ datastore, privateKey, autoExpire = true, + autoSyncBeforeConnect = true, }: { datastore: Datastore; privateKey: Uint8Array | PrivateKey; autoExpire: boolean; + autoSyncBeforeConnect: boolean; }) => { const ed25519PrivateKey: PrivateKey = privateKey instanceof Uint8Array @@ -90,6 +97,7 @@ export const createWorker = async ({ const taskStore = createWorkerTaskStore({ datastore }); const paymentStore = createPaymentStore({ datastore }); const templateStore = createTemplateStore({ datastore }); + const syncStateStore = createWorkerSyncStateStore({ datastore }); // register worker modules const templateWorker = createTemplateWorker({ entity, templateStore }); @@ -158,6 +166,43 @@ export const createWorker = async ({ return response; }; + const syncWithManager = async ( + manager: Multiaddr, + { + scopes, + cursor, + limit, + }: { + scopes?: string[]; + cursor?: bigint; + limit?: number; + } = {}, + ) => { + const [response, error] = await entity.sendMessage(manager, { + workerSyncRequest: { + timestamp: Math.floor(Date.now() / 1000), + workerId: entity.getPeerId().toString(), + scopes: scopes ?? [], + cursor, + limit, + }, + }); + + if (error || !response) { + throw new Error(`Failed to sync with manager: ${error}`); + } + + const sync: WorkerSyncResponse = response; + await applyWorkerSyncResponse({ + sync, + taskStore, + paymentStore, + syncStateStore, + }); + + return sync; + }; + //connect to an identified manager const connect = async ( multiaddress: Multiaddr, @@ -166,20 +211,36 @@ export const createWorker = async ({ nonce, capabilities, accessCode, + skipSync, }: { recipient: string; nonce: bigint; capabilities: string[]; accessCode?: string; + skipSync?: boolean; }, ) => { - const [response, error] = await entity.sendMessage(multiaddress, { - requestToWork: { - timestamp: Math.floor(Date.now() / 1000), - recipient, - nonce, - capabilities: capabilities.join(","), - accessCode, + const [response, error] = await runConnectFlow({ + autoSyncBeforeConnect, + skipSync, + syncWithManager: async () => + await syncWithManager(multiaddress, { + scopes: ["status", "capabilities", "tasks", "payments"], + }), + requestToWork: async (): Promise< + readonly [RequestToWorkResponse | null, Error | null] + > => { + const [response, error] = await entity.sendMessage(multiaddress, { + requestToWork: { + timestamp: Math.floor(Date.now() / 1000), + recipient, + nonce, + capabilities: capabilities.join(","), + accessCode, + }, + }); + + return [response ?? null, error]; }, }); @@ -209,6 +270,7 @@ export const createWorker = async ({ entity, events, taskStore, + syncStateStore, getTask, getTasks, @@ -227,6 +289,7 @@ export const createWorker = async ({ requestBulkProofs, identify, connect, + syncWithManager, disconnect, start, stop, diff --git a/modules/worker/src/modules/createPaymentWorker.spec.ts b/modules/worker/src/modules/createPaymentWorker.spec.ts index 24110712..cb67e300 100644 --- a/modules/worker/src/modules/createPaymentWorker.spec.ts +++ b/modules/worker/src/modules/createPaymentWorker.spec.ts @@ -1,8 +1,10 @@ -import { beforeEach, describe, it, test, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { createPaymentWorker } from "./createPaymentWorker"; import { createPaymentStore } from "@effectai/protocol-core"; import { ulid } from "ulid"; import { promises } from "node:fs"; +import { createDataStore } from "@effectai/test-utils"; +import type { Payment } from "@effectai/protobufs"; describe("createPaymentWorker", async () => { let paymentWorker: ReturnType; @@ -34,13 +36,25 @@ describe("createPaymentWorker", async () => { it("tests worker payments", async () => { const n = 50; for (let i = 0; i < n; i++) { - console.log(i); + const payment: Payment = { + id: ulid(), + version: 1, + nonce: BigInt(i), + amount: 100n, + recipient: "recipient-test", + paymentAccount: "payment-account-test", + publicKey: "manager-public-key", + signature: { + R8: { + R8_1: "1", + R8_2: "2", + }, + S: "3", + }, + }; await paymentWorker.createPayment({ managerPeerId: "peer-testing-1", - payment: { - id: ulid(), - nonce: BigInt(i), - }, + payment, }); } @@ -49,8 +63,6 @@ describe("createPaymentWorker", async () => { perPage: 10, }); - result.items.map((item) => { - console.log(item.state); - }); + expect(result.items.length).toBeGreaterThan(0); }); }); diff --git a/modules/worker/src/stores/workerSyncStateStore.ts b/modules/worker/src/stores/workerSyncStateStore.ts new file mode 100644 index 00000000..2f23259f --- /dev/null +++ b/modules/worker/src/stores/workerSyncStateStore.ts @@ -0,0 +1,93 @@ +import { + type Datastore, + createEntityStore, + parseWithBigInt, + stringifyWithBigInt, +} from "@effectai/protocol-core"; +import type { WorkerSyncStatus } from "@effectai/protobufs"; + +interface WorkerSyncStateUpdatedEvent { + type: "sync_state_updated"; + timestamp: number; +} + +export interface WorkerSyncStateRecord { + events: WorkerSyncStateUpdatedEvent[]; + state: { + workerId: string; + managerPeerId: string; + cursor: bigint; + serverTime: number; + status?: WorkerSyncStatus; + capabilities: string[]; + updatedAt: number; + }; +} + +export const createWorkerSyncStateStore = ({ + datastore, +}: { + datastore: Datastore; +}) => { + const coreStore = createEntityStore< + WorkerSyncStateUpdatedEvent, + WorkerSyncStateRecord + >({ + datastore, + defaultPrefix: "workerSyncState", + stringify: (record) => stringifyWithBigInt(record), + parse: (data) => parseWithBigInt(data), + }); + + const syncEntityId = "current"; + + const saveFromSync = async ({ + workerId, + managerPeerId, + cursor, + serverTime, + status, + capabilities, + }: Omit) => { + const existing = await coreStore.getSafe({ entityId: syncEntityId }); + const timestamp = Math.floor(Date.now() / 1000); + + const record: WorkerSyncStateRecord = { + events: [ + ...(existing?.events ?? []), + { + type: "sync_state_updated", + timestamp, + }, + ], + state: { + workerId, + managerPeerId, + cursor, + serverTime, + status, + capabilities, + updatedAt: timestamp, + }, + }; + + await coreStore.put({ + entityId: syncEntityId, + record, + }); + + return record; + }; + + const getCurrent = async () => { + return await coreStore.getSafe({ entityId: syncEntityId }); + }; + + return { + ...coreStore, + saveFromSync, + getCurrent, + }; +}; + +export type WorkerSyncStateStore = ReturnType; diff --git a/modules/worker/src/stores/workerTaskStore.ts b/modules/worker/src/stores/workerTaskStore.ts index 25160920..22f325a2 100644 --- a/modules/worker/src/stores/workerTaskStore.ts +++ b/modules/worker/src/stores/workerTaskStore.ts @@ -88,6 +88,32 @@ export const createWorkerTaskStore = ({ await coreStore.put({ entityId: `${index}/${entityId}`, record }); }; + const upsertFromSync = async ({ + task, + status, + managerPeerId, + }: { + task: Task; + status: string; + managerPeerId?: string; + }) => { + const index = status === "completed" ? "completed" : "active"; + const record: WorkerTaskRecord = { + events: managerPeerId + ? [ + { + timestamp: Math.floor(Date.now() / 1000), + type: "create", + managerPeer: managerPeerId, + }, + ] + : [], + state: task, + }; + + await saveTask({ entityId: task.id, record, index }); + }; + const create = async ({ task, managerPeerId, @@ -250,6 +276,8 @@ export const createWorkerTaskStore = ({ accept, reject, expire, + saveTask, + upsertFromSync, }; }; export type WorkerTaskStore = ReturnType; diff --git a/modules/worker/src/sync/applyWorkerSyncResponse.ts b/modules/worker/src/sync/applyWorkerSyncResponse.ts new file mode 100644 index 00000000..7ca8082a --- /dev/null +++ b/modules/worker/src/sync/applyWorkerSyncResponse.ts @@ -0,0 +1,45 @@ +import type { Payment, WorkerSyncResponse } from "@effectai/protobufs"; +import type { WorkerTaskStore } from "../stores/workerTaskStore.js"; +import type { WorkerSyncStateStore } from "../stores/workerSyncStateStore.js"; + +interface PaymentStoreLike { + create: (args: { peerId: string; payment: Payment }) => Promise; +} + +export const applyWorkerSyncResponse = async ({ + sync, + taskStore, + paymentStore, + syncStateStore, +}: { + sync: WorkerSyncResponse; + taskStore: Pick; + paymentStore: PaymentStoreLike; + syncStateStore: Pick; +}) => { + for (const task of sync.tasks ?? []) { + if (!task.task) continue; + await taskStore.upsertFromSync({ + task: task.task, + status: task.status, + managerPeerId: sync.managerPeerId, + }); + } + + for (const payment of sync.payments ?? []) { + if (!payment.payment) continue; + await paymentStore.create({ + peerId: sync.managerPeerId, + payment: payment.payment, + }); + } + + await syncStateStore.saveFromSync({ + workerId: sync.workerId, + managerPeerId: sync.managerPeerId, + cursor: sync.cursor, + serverTime: sync.serverTime, + status: sync.status, + capabilities: sync.capabilities ?? [], + }); +}; diff --git a/modules/worker/src/sync/runConnectFlow.ts b/modules/worker/src/sync/runConnectFlow.ts new file mode 100644 index 00000000..473be2fd --- /dev/null +++ b/modules/worker/src/sync/runConnectFlow.ts @@ -0,0 +1,17 @@ +export const runConnectFlow = async ({ + autoSyncBeforeConnect, + skipSync, + syncWithManager, + requestToWork, +}: { + autoSyncBeforeConnect: boolean; + skipSync?: boolean; + syncWithManager: () => Promise; + requestToWork: () => Promise; +}) => { + if (autoSyncBeforeConnect && !skipSync) { + await syncWithManager(); + } + + return await requestToWork(); +}; diff --git a/modules/worker/src/sync/sync.connect.spec.ts b/modules/worker/src/sync/sync.connect.spec.ts new file mode 100644 index 00000000..efcfaca6 --- /dev/null +++ b/modules/worker/src/sync/sync.connect.spec.ts @@ -0,0 +1,51 @@ +import { describe, expect, it, vi } from "vitest"; +import { runConnectFlow } from "./runConnectFlow"; + +describe("sync connect flow", () => { + it("runs sync before requestToWork by default", async () => { + const calls: string[] = []; + + const syncWithManager = vi.fn(async () => { + calls.push("sync"); + }); + + const requestToWork = vi.fn(async () => { + calls.push("connect"); + return [{}, null] as const; + }); + + await runConnectFlow({ + autoSyncBeforeConnect: true, + syncWithManager, + requestToWork, + }); + + expect(calls).toEqual(["sync", "connect"]); + expect(syncWithManager).toHaveBeenCalledTimes(1); + expect(requestToWork).toHaveBeenCalledTimes(1); + }); + + it("skips sync when skipSync is true", async () => { + const calls: string[] = []; + + const syncWithManager = vi.fn(async () => { + calls.push("sync"); + }); + + const requestToWork = vi.fn(async () => { + calls.push("connect"); + return [{}, null] as const; + }); + + await runConnectFlow({ + autoSyncBeforeConnect: true, + skipSync: true, + syncWithManager, + requestToWork, + }); + + expect(calls).toEqual(["connect"]); + expect(syncWithManager).not.toHaveBeenCalled(); + expect(requestToWork).toHaveBeenCalledTimes(1); + }); +}); diff --git a/modules/worker/src/sync/sync.contract.spec.ts b/modules/worker/src/sync/sync.contract.spec.ts new file mode 100644 index 00000000..2c3a090d --- /dev/null +++ b/modules/worker/src/sync/sync.contract.spec.ts @@ -0,0 +1,68 @@ +import { promises } from "node:fs"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { createDataStore } from "@effectai/test-utils"; +import { createPaymentStore, type Datastore } from "../../../../core/protocol/src/index"; + +import { createWorkerTaskStore } from "../stores/workerTaskStore"; +import { createWorkerSyncStateStore } from "../stores/workerSyncStateStore"; +import { applyWorkerSyncResponse } from "./applyWorkerSyncResponse"; +import { syncContractScenarios } from "./test/syncScenario"; + +describe("sync contract", () => { + let datastore: Datastore; + + beforeEach(async () => { + await promises.rm("/tmp/test/sync-contract", { + recursive: true, + force: true, + }); + + datastore = await createDataStore("/tmp/test/sync-contract"); + }); + + afterEach(async () => { + await datastore.close(); + }); + + it.each(syncContractScenarios)( + "applies manager snapshot: $name", + async (scenario) => { + const taskStore = createWorkerTaskStore({ datastore }); + const paymentStore = createPaymentStore({ datastore }); + const syncStateStore = createWorkerSyncStateStore({ datastore }); + + for (let i = 0; i < scenario.repeatSync; i += 1) { + await applyWorkerSyncResponse({ + sync: scenario.syncResponse, + taskStore, + paymentStore, + syncStateStore, + }); + } + + for (const taskId of scenario.expectedActiveTaskIds) { + const record = await taskStore.get({ entityId: `active/${taskId}` }); + expect(record).toBeDefined(); + expect(record.state.id).toBe(taskId); + } + + for (const taskId of scenario.expectedCompletedTaskIds) { + const record = await taskStore.get({ entityId: `completed/${taskId}` }); + expect(record).toBeDefined(); + expect(record.state.id).toBe(taskId); + } + + const payments = await paymentStore.all({}); + expect(payments).toHaveLength(scenario.expectedPaymentCount); + + const syncState = await syncStateStore.getCurrent(); + expect(syncState).toBeDefined(); + expect(syncState?.state.capabilities).toEqual( + scenario.syncResponse.capabilities, + ); + expect(syncState?.state.status?.state).toBe( + scenario.syncResponse.status?.state, + ); + }, + ); +}); diff --git a/modules/worker/src/sync/test/syncScenario.ts b/modules/worker/src/sync/test/syncScenario.ts new file mode 100644 index 00000000..d1460260 --- /dev/null +++ b/modules/worker/src/sync/test/syncScenario.ts @@ -0,0 +1,95 @@ +import type { Payment, Task, WorkerSyncResponse } from "@effectai/protobufs"; + +export const WORKER_PEER_ID = "12D3KooWR3aZ9bLgTjsyUNqC8oZp5tf3HRmqb9G5wNpEAKiUjVv5"; +export const MANAGER_PEER_ID = "12D3KooWQ7mA7rM8ar1W5NPHm2m5WjQmLxY6V9QJ4T8w2n3X1Y7z"; +const IDs = { + taskActive: "01ARZ3NDEKTSV4RRFFQ69G5FAV", + taskCompleted: "01ARZ3NDEKTSV4RRFFQ69G5FAW", + payment: "01ARZ3NDEKTSV4RRFFQ69G5FAX", +}; + +export const makeTask = (overrides: Partial = {}): Task => ({ + id: IDs.taskActive, + title: "sync task", + reward: 100n, + timeLimitSeconds: 600, + templateId: "template-1", + templateData: JSON.stringify({ prompt: "hello" }), + capability: "model/gpt5", + ...overrides, +}); + +export const makePayment = (overrides: Partial = {}): Payment => ({ + id: IDs.payment, + version: 1, + amount: 25n, + recipient: "recipient-1", + paymentAccount: "payment-account-1", + nonce: 1n, + publicKey: "manager-public-key", + signature: { + R8: { + R8_1: "1", + R8_2: "2", + }, + S: "3", + }, + ...overrides, +}); + +export interface SyncScenario { + name: string; + syncResponse: WorkerSyncResponse; + repeatSync: number; + expectedActiveTaskIds: string[]; + expectedCompletedTaskIds: string[]; + expectedPaymentCount: number; +} + +const activeTask = makeTask({ id: IDs.taskActive }); +const completedTask = makeTask({ id: IDs.taskCompleted }); +const payment = makePayment({ nonce: 3n }); + +export const syncContractScenarios: SyncScenario[] = [ + { + name: "full_sync_first_connect", + syncResponse: { + serverTime: Math.floor(Date.now() / 1000), + workerId: WORKER_PEER_ID, + cursor: 42n, + managerPeerId: MANAGER_PEER_ID, + status: { + state: "idle", + lastActivity: Math.floor(Date.now() / 1000), + }, + capabilities: ["model/gpt5", "vision/ocr"], + tasks: [ + { + taskId: activeTask.id, + status: "assigned", + lastEventAt: Math.floor(Date.now() / 1000), + task: activeTask, + }, + { + taskId: completedTask.id, + status: "completed", + lastEventAt: Math.floor(Date.now() / 1000), + task: completedTask, + }, + ], + payments: [ + { + paymentId: payment.id, + status: "created", + amount: payment.amount.toString(), + createdAt: Math.floor(Date.now() / 1000), + payment, + }, + ], + }, + repeatSync: 2, + expectedActiveTaskIds: [activeTask.id], + expectedCompletedTaskIds: [completedTask.id], + expectedPaymentCount: 1, + }, +]; diff --git a/packages/library/tests/protocol.spec.ts b/packages/library/tests/protocol.spec.ts index b478eb76..b53d0911 100644 --- a/packages/library/tests/protocol.spec.ts +++ b/packages/library/tests/protocol.spec.ts @@ -31,7 +31,10 @@ import { promises } from "node:fs"; import { Keypair } from "@solana/web3.js"; -describe("Complete Task Lifecycle", () => { +const runIntegrationTests = process.env.RUN_INTEGRATION_TESTS === "1"; +const describeIntegration = runIntegrationTests ? describe : describe.skip; + +describeIntegration("Complete Task Lifecycle", () => { let manager: Awaited>; let worker: Awaited>; const providerPeerId = peerIdFromString( @@ -104,12 +107,20 @@ describe("Complete Task Lifecycle", () => { }); afterEach(async () => { - await manager.stop(); - await worker.stop(); + if (manager) { + await manager.stop(); + } + if (worker) { + await worker.stop(); + } //close the datastores - await managerDatastore.close(); - await workerDatastore.close(); + if (managerDatastore) { + await managerDatastore.close(); + } + if (workerDatastore) { + await workerDatastore.close(); + } }); it( diff --git a/vitest.config.ts b/vitest.config.ts new file mode 100644 index 00000000..1acdfc56 --- /dev/null +++ b/vitest.config.ts @@ -0,0 +1,20 @@ +import { defineConfig } from "vitest/config"; +import { fileURLToPath } from "node:url"; + +const rootDir = fileURLToPath(new URL(".", import.meta.url)); + +export default defineConfig({ + resolve: { + alias: { + "@effectai/test-utils": fileURLToPath( + new URL("./packages/test-utils/src/index.ts", import.meta.url), + ), + "@capabilities": fileURLToPath( + new URL("./apps/worker-app/app/constants/capabilities.ts", import.meta.url), + ), + }, + }, + test: { + root: rootDir, + }, +});