diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 16d2593de..92f7238e6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -136,7 +136,45 @@ jobs: if: steps.check.outputs.affected == 'true' run: pnpm nx run edge-worker:e2e - # ─────────────────────────────────────── 2a. EDGE-WORKER INTEGRATION ────────────────────────────────────── + # ─────────────────────────────────────── 2a. EDGE-WORKER PORTABLE RUNTIME E2E ────────────────────────────────────── + edge-worker-portable-runtime-e2e: + needs: pre_job + if: needs.pre_job.outputs.should_skip != 'true' + runs-on: ubuntu-latest + env: + NX_CLOUD_ACCESS_TOKEN: ${{ secrets.NX_CLOUD_ACCESS_TOKEN }} + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: ./.github/actions/setup + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + atlas-cloud-token: ${{ secrets.ATLAS_CLOUD_TOKEN }} + + - uses: oven-sh/setup-bun@v2 + + - name: Check if edge-worker is affected + id: check + run: | + if pnpm nx show projects --affected --base=origin/main --head=HEAD | grep -q "^edge-worker$"; then + echo "affected=true" >> $GITHUB_OUTPUT + echo "edge-worker is affected, running portable runtime e2e tests" + else + echo "affected=false" >> $GITHUB_OUTPUT + echo "edge-worker not affected, skipping portable runtime e2e tests" + fi + + - name: Pre-start Supabase + if: steps.check.outputs.affected == 'true' + run: ./scripts/ci-prestart-supabase.sh edge-worker + + - name: Run edge-worker portable runtime e2e tests + if: steps.check.outputs.affected == 'true' + run: pnpm nx run edge-worker:e2e:portable-runtimes + + # ─────────────────────────────────────── 2b. EDGE-WORKER INTEGRATION ────────────────────────────────────── edge-worker-integration: needs: pre_job if: needs.pre_job.outputs.should_skip != 'true' @@ -172,7 +210,7 @@ jobs: if: steps.check.outputs.affected == 'true' run: pnpm nx run edge-worker:test:integration - # ─────────────────────────────────────── 2b. EDGE-WORKER BUN SMOKE ────────────────────────────────────── + # ─────────────────────────────────────── 2c. EDGE-WORKER BUN SMOKE ────────────────────────────────────── edge-worker-bun-smoke: needs: pre_job if: needs.pre_job.outputs.should_skip != 'true' @@ -206,7 +244,7 @@ jobs: if: steps.check.outputs.affected == 'true' run: pnpm nx run edge-worker:smoke:bun - # ─────────────────────────────────────── 2c. CLI E2E ────────────────────────────────────── + # ─────────────────────────────────────── 2d. CLI E2E ────────────────────────────────────── cli-e2e: needs: pre_job if: needs.pre_job.outputs.should_skip != 'true' @@ -242,7 +280,7 @@ jobs: if: steps.check.outputs.affected == 'true' run: pnpm nx run cli:e2e - # ─────────────────────────────────────── 2d. CLIENT E2E ────────────────────────────────────── + # ─────────────────────────────────────── 2e. CLIENT E2E ────────────────────────────────────── client-e2e: needs: pre_job if: needs.pre_job.outputs.should_skip != 'true' @@ -278,7 +316,7 @@ jobs: if: steps.check.outputs.affected == 'true' run: pnpm nx run client:e2e - # ─────────────────────────────────────── 2e. CORE PGTAP ────────────────────────────────────── + # ─────────────────────────────────────── 2f. CORE PGTAP ────────────────────────────────────── core-pgtap: needs: pre_job if: needs.pre_job.outputs.should_skip != 'true' @@ -321,6 +359,7 @@ jobs: [ build-and-test, edge-worker-e2e, + edge-worker-portable-runtime-e2e, edge-worker-integration, edge-worker-bun-smoke, cli-e2e, @@ -374,6 +413,7 @@ jobs: [ build-and-test, edge-worker-e2e, + edge-worker-portable-runtime-e2e, edge-worker-integration, edge-worker-bun-smoke, cli-e2e, diff --git a/pkgs/edge-worker/project.json b/pkgs/edge-worker/project.json index 26b9d64dc..d5304d568 100644 --- a/pkgs/edge-worker/project.json +++ b/pkgs/edge-worker/project.json @@ -199,6 +199,21 @@ "parallel": false } }, + "e2e:portable-runtimes": { + "executor": "nx:run-commands", + "dependsOn": ["build", "serve:functions:e2e"], + "local": true, + "cache": false, + "inputs": ["default", "^production"], + "options": { + "cwd": "pkgs/edge-worker", + "commands": [ + "deno run --allow-net=localhost:50321 \"data:application/typescript,console.log(%22Waiting%20for%20functions%20server%20(port%2050321)...%22)%3B%20const%20deadline%20%3D%20Date.now()%20%2B%20120_000%3B%20while%20(Date.now()%20%3C%20deadline)%20%7B%20try%20%7B%20const%20conn%20%3D%20await%20Deno.connect(%7B%20hostname%3A%20%22localhost%22%2C%20port%3A%2050321%20%7D)%3B%20conn.close()%3B%20console.log(%22%20Ready!%22)%3B%20Deno.exit(0)%3B%20%7D%20catch%20%7B%20await%20new%20Promise((resolve)%20%3D%3E%20setTimeout(resolve%2C%20500))%3B%20%7D%20%7D%20console.error(%22TIMEOUT%3A%20Functions%20server%20not%20available%20on%20port%2050321%20after%20120s%22)%3B%20Deno.exit(1)%3B\"", + "deno test --config deno.test.json --allow-all --env=supabase/functions/.env tests/e2e-portable-runtimes/" + ], + "parallel": false + } + }, "test": { "inputs": ["default", "^production"], "dependsOn": ["test:types", "test:unit"] diff --git a/pkgs/edge-worker/src/core/Queries.ts b/pkgs/edge-worker/src/core/Queries.ts index 58e0dc241..f0463aa17 100644 --- a/pkgs/edge-worker/src/core/Queries.ts +++ b/pkgs/edge-worker/src/core/Queries.ts @@ -49,7 +49,7 @@ export class Queries { RETURNING (w.deprecated_at IS NOT NULL) AS is_deprecated; `; - return result || { is_deprecated: false }; + return result || { is_deprecated: true }; } async ensureFlowCompiled( diff --git a/pkgs/edge-worker/src/core/Worker.ts b/pkgs/edge-worker/src/core/Worker.ts index 40fb05626..bbfec5dd4 100644 --- a/pkgs/edge-worker/src/core/Worker.ts +++ b/pkgs/edge-worker/src/core/Worker.ts @@ -116,6 +116,26 @@ export class Worker { return this.lifecycle.edgeFunctionName; } + get isCreated() { + return this.lifecycle.isCreated; + } + + get isStarting() { + return this.lifecycle.isStarting; + } + + get isRunning() { + return this.lifecycle.isRunning; + } + + get isDeprecated() { + return this.lifecycle.isDeprecated; + } + + get isStopped() { + return this.lifecycle.isStopped; + } + /** * Log deprecation message only once (prevents duplicate logs when deprecation * is detected in heartbeat and then stop() is called) diff --git a/pkgs/edge-worker/src/core/WorkerLifecycle.ts b/pkgs/edge-worker/src/core/WorkerLifecycle.ts index 0eaa3cc92..dbc3e693a 100644 --- a/pkgs/edge-worker/src/core/WorkerLifecycle.ts +++ b/pkgs/edge-worker/src/core/WorkerLifecycle.ts @@ -99,6 +99,10 @@ export class WorkerLifecycle implements ILifecycle { return this.workerState.isCreated; } + get isStarting() { + return this.workerState.isStarting; + } + get isRunning() { return this.workerState.isRunning; } diff --git a/pkgs/edge-worker/src/core/types.ts b/pkgs/edge-worker/src/core/types.ts index dcd825fdf..be30b8058 100644 --- a/pkgs/edge-worker/src/core/types.ts +++ b/pkgs/edge-worker/src/core/types.ts @@ -27,7 +27,9 @@ export interface ILifecycle { get edgeFunctionName(): string | undefined; get queueName(): string; get isCreated(): boolean; + get isStarting(): boolean; get isRunning(): boolean; + get isDeprecated(): boolean; get isStopping(): boolean; get isStopped(): boolean; diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 664578aad..68f6e4084 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -166,6 +166,10 @@ export class FlowWorkerLifecycle implements ILifecycle { return this.workerState.isCreated; } + get isStarting() { + return this.workerState.isStarting; + } + get isRunning() { return this.workerState.isRunning; } diff --git a/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts b/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts index 22386419d..ff9aafc1b 100644 --- a/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts +++ b/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts @@ -42,6 +42,8 @@ interface SupabaseEnv extends Record { export class SupabasePlatformAdapter implements PlatformAdapter { private edgeFunctionName: string | null = null; private worker: Worker | null = null; + private workerId: string | null = null; + private workerReplacementPromise: Promise | null = null; private logger: Logger; private abortController: AbortController; private _platformResources: SupabaseResources; @@ -183,8 +185,9 @@ export class SupabasePlatformAdapter implements PlatformAdapter { + this.deps.serve(async (req: Request) => { // Validate auth header in production (skipped in local mode) const authResult = validateServiceRoleAuth(req, this.validatedEnv); if (!authResult.valid) { @@ -219,28 +222,11 @@ export class SupabasePlatformAdapter implements PlatformAdapter { + while (this.needsWorkerReplacement()) { + if (this.workerReplacementPromise) { + await this.workerReplacementPromise; + continue; + } + + this.workerReplacementPromise = this.replaceWorker(req, createWorkerFn); + try { + await this.workerReplacementPromise; + return true; + } finally { + this.workerReplacementPromise = null; + } + } + + return false; + } + + private async replaceWorker(req: Request, createWorkerFn: CreateWorkerFn): Promise { + if (this.worker) { + await this.worker.stop(); + this.abortController = new AbortController(); + } + + this.edgeFunctionName = this.extractFunctionName(req); + + const workerId = this.workerId === null + ? this.validatedEnv.SB_EXECUTION_ID + : globalThis.crypto.randomUUID(); + this.workerId = workerId; + + this.loggingFactory.setWorkerId(workerId); + this.loggingFactory.setWorkerName(this.edgeFunctionName); + + // Create the worker using the factory function and the logger + this.worker = createWorkerFn(this.loggingFactory.createLogger); + this.worker.startOnlyOnce({ + edgeFunctionName: this.edgeFunctionName, + workerId, + startMode: 'http', + }); + } + /** * Assertion function that validates environment has all required Supabase fields * @throws Error if any required environment variable is missing diff --git a/pkgs/edge-worker/supabase/config.toml b/pkgs/edge-worker/supabase/config.toml index 7e5a77e42..a0ad4ec34 100644 --- a/pkgs/edge-worker/supabase/config.toml +++ b/pkgs/edge-worker/supabase/config.toml @@ -87,6 +87,18 @@ verify_jwt = false import_map = "./functions/deno.json" entrypoint = "./functions/conn_custom_sql/index.ts" +[functions.conn_max_pg_default] +enabled = true +verify_jwt = false +import_map = "./functions/deno.json" +entrypoint = "./functions/conn_max_pg_default/index.ts" + +[functions.conn_max_pg_override] +enabled = true +verify_jwt = false +import_map = "./functions/deno.json" +entrypoint = "./functions/conn_max_pg_override/index.ts" + [functions.stopped_at_test] enabled = true verify_jwt = false diff --git a/pkgs/edge-worker/supabase/functions/_shared/portable_examples.js b/pkgs/edge-worker/supabase/functions/_shared/portable_examples.js new file mode 100644 index 000000000..aff2015c1 --- /dev/null +++ b/pkgs/edge-worker/supabase/functions/_shared/portable_examples.js @@ -0,0 +1,117 @@ +const DEFAULT_DB_URL = 'postgresql://postgres:postgres@127.0.0.1:50322/postgres'; + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function maxPgConnectionsHandler(queueName, expectedMax) { + return async (_payload, { sql }) => { + const actualMax = sql.options.max; + const status = actualMax === expectedMax ? 'success' : 'error'; + const errorMessage = actualMax === expectedMax + ? null + : `Expected max=${expectedMax}, got ${actualMax}`; + + await sql` + INSERT INTO e2e_test_results (queue_name, status, actual, error_message) + VALUES (${queueName}, ${status}, ${sql.json({ max: actualMax })}, ${errorMessage}) + `; + + return { max: actualMax }; + }; +} + +function envVarHandler(expectedConnectionString) { + return async (_payload, { sql, workerConfig }) => { + const connectionString = workerConfig.connectionString; + + if (connectionString !== expectedConnectionString) { + throw new Error( + `Unexpected connectionString: ${connectionString}, expected: ${expectedConnectionString}` + ); + } + + await sql`SELECT nextval('conn_test_seq')`; + return { mode: 'env_var', connectionString }; + }; +} + +export const portableExamples = { + max_concurrency: { + name: 'max_concurrency', + queueName: 'max_concurrency', + sequenceName: 'test_seq', + kind: 'sequence', + messagesToSend: 200, + expectedIncrement: 200, + options: { + queueName: 'max_concurrency', + maxConcurrent: 10, + maxPgConnections: 4, + }, + handler: async (_payload, { sql }) => { + await sleep(50); + await sql`SELECT nextval('test_seq')`; + return { processed: true }; + }, + }, + conn_max_pg_default: { + name: 'conn_max_pg_default', + queueName: 'conn_max_pg_default', + kind: 'result', + expectedMax: 4, + messagesToSend: 1, + options: { + queueName: 'conn_max_pg_default', + }, + handler: maxPgConnectionsHandler('conn_max_pg_default', 4), + }, + conn_max_pg_override: { + name: 'conn_max_pg_override', + queueName: 'conn_max_pg_override', + kind: 'result', + expectedMax: 7, + messagesToSend: 1, + options: { + queueName: 'conn_max_pg_override', + maxPgConnections: 7, + }, + handler: maxPgConnectionsHandler('conn_max_pg_override', 7), + }, + conn_env_var: { + name: 'conn_env_var', + queueName: 'conn_env_var', + sequenceName: 'conn_test_seq', + kind: 'sequence', + messagesToSend: 1, + expectedIncrement: 1, + options: { + queueName: 'conn_env_var', + retry: { strategy: 'fixed', limit: 0, baseDelay: 1 }, + }, + createHandler: ({ expectedConnectionString = DEFAULT_DB_URL }) => + envVarHandler(expectedConnectionString), + }, +}; + +export function getPortableExample(name) { + const example = portableExamples[name]; + + if (!example) { + throw new Error(`Unknown portable example: ${name}`); + } + + return example; +} + +export function startPortableExample(EdgeWorker, name, runtimeOptions = {}) { + const example = getPortableExample(name); + const handler = example.createHandler + ? example.createHandler(runtimeOptions) + : example.handler; + + return EdgeWorker.start(handler, { + ...example.options, + ...runtimeOptions.workerOptions, + }); +} diff --git a/pkgs/edge-worker/supabase/functions/conn_env_var/index.ts b/pkgs/edge-worker/supabase/functions/conn_env_var/index.ts index 8d9927f9b..bdcb3d46d 100644 --- a/pkgs/edge-worker/supabase/functions/conn_env_var/index.ts +++ b/pkgs/edge-worker/supabase/functions/conn_env_var/index.ts @@ -1,20 +1,15 @@ import { EdgeWorker } from '@pgflow/edge-worker'; +import { startPortableExample } from '../_shared/portable_examples.js'; -// Override Deno.env to return a specific EDGE_WORKER_DB_URL -// This tests that the env var is actually used (not falling back to docker pooler) const ENV_KEY = 'EDGE_WORKER_DB_URL'; -// Use the direct Supabase DB URL (not the docker-internal pooler URL) -// Inside Docker, we use the internal hostname 'db' which resolves to the postgres container const EXPECTED_URL = 'postgresql://postgres:postgres@db:5432/postgres'; -// This is the docker pooler URL that would be used as fallback -const DOCKER_POOLER_URL = - 'postgresql://postgres.pooler-dev:postgres@pooler:6543/postgres'; const originalGet = Deno.env.get.bind(Deno.env); Deno.env.get = function (key: string) { if (key === ENV_KEY) { return EXPECTED_URL; } + return originalGet(key); }; @@ -26,31 +21,6 @@ Deno.env.toObject = function () { }; }; -EdgeWorker.start( - async (_payload, { sql, workerConfig }) => { - const connectionString = workerConfig.connectionString; - - // If connectionString is undefined, the docker pooler fallback was used - // If it equals DOCKER_POOLER_URL, the env var was ignored - if (!connectionString || connectionString === DOCKER_POOLER_URL) { - throw new Error( - `EDGE_WORKER_DB_URL was not used! Got connectionString: ${connectionString}` - ); - } - - // Verify the expected URL is being used - if (connectionString !== EXPECTED_URL) { - throw new Error( - `Unexpected connectionString: ${connectionString}, expected: ${EXPECTED_URL}` - ); - } - - // Success - env var is being used correctly - await sql`SELECT nextval('conn_test_seq')`; - return { mode: 'env_var', connectionString }; - }, - { - queueName: 'conn_env_var', - retry: { strategy: 'fixed', limit: 0, baseDelay: 1 }, - } -); +startPortableExample(EdgeWorker, 'conn_env_var', { + expectedConnectionString: EXPECTED_URL, +}); diff --git a/pkgs/edge-worker/supabase/functions/conn_max_pg_default/index.ts b/pkgs/edge-worker/supabase/functions/conn_max_pg_default/index.ts index 71c9abad5..2cbeef53a 100644 --- a/pkgs/edge-worker/supabase/functions/conn_max_pg_default/index.ts +++ b/pkgs/edge-worker/supabase/functions/conn_max_pg_default/index.ts @@ -1,20 +1,4 @@ import { EdgeWorker } from '@pgflow/edge-worker'; +import { startPortableExample } from '../_shared/portable_examples.js'; -// Tests that default maxPgConnections is 4 (not the old hardcoded 10) -// NO maxPgConnections specified - should default to 4 -EdgeWorker.start( - async (_payload, { sql }) => { - const queueName = 'conn_max_pg_default'; - const actualMax = sql.options.max; - const status = actualMax === 4 ? 'success' : 'error'; - const errorMessage = actualMax === 4 ? null : `Expected max=4, got ${actualMax}`; - - await sql` - INSERT INTO e2e_test_results (queue_name, status, actual, error_message) - VALUES (${queueName}, ${status}, ${sql.json({ max: actualMax })}, ${errorMessage}) - `; - - return { max: actualMax }; - }, - { queueName: 'conn_max_pg_default' } -); +startPortableExample(EdgeWorker, 'conn_max_pg_default'); diff --git a/pkgs/edge-worker/supabase/functions/conn_max_pg_override/index.ts b/pkgs/edge-worker/supabase/functions/conn_max_pg_override/index.ts index 8aa788583..364ac64f9 100644 --- a/pkgs/edge-worker/supabase/functions/conn_max_pg_override/index.ts +++ b/pkgs/edge-worker/supabase/functions/conn_max_pg_override/index.ts @@ -1,22 +1,4 @@ import { EdgeWorker } from '@pgflow/edge-worker'; +import { startPortableExample } from '../_shared/portable_examples.js'; -// Tests that explicit maxPgConnections: 7 is respected -EdgeWorker.start( - async (_payload, { sql }) => { - const queueName = 'conn_max_pg_override'; - const actualMax = sql.options.max; - const status = actualMax === 7 ? 'success' : 'error'; - const errorMessage = actualMax === 7 ? null : `Expected max=7, got ${actualMax}`; - - await sql` - INSERT INTO e2e_test_results (queue_name, status, actual, error_message) - VALUES (${queueName}, ${status}, ${sql.json({ max: actualMax })}, ${errorMessage}) - `; - - return { max: actualMax }; - }, - { - queueName: 'conn_max_pg_override', - maxPgConnections: 7, - } -); +startPortableExample(EdgeWorker, 'conn_max_pg_override'); diff --git a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts index 5ee6fb202..adfef1f4c 100644 --- a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts +++ b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts @@ -1,21 +1,4 @@ import { EdgeWorker } from '@pgflow/edge-worker'; -import { sleep, sql } from '../utils.ts'; +import { startPortableExample } from '../_shared/portable_examples.js'; -async function incrementSeq(payload: { debug?: boolean }) { - await sleep(50); - - if (payload.debug) { - console.log( - '[max_concurrency] last_val =', - await sql`SELECT nextval('test_seq')` - ); - } else { - await sql`SELECT nextval('test_seq')`; - } -} - -EdgeWorker.start(incrementSeq, { - queueName: 'max_concurrency', - maxConcurrent: 10, - maxPgConnections: 4, -}); +startPortableExample(EdgeWorker, 'max_concurrency'); diff --git a/pkgs/edge-worker/tests/e2e-portable-runtimes/portable-process-worker.mjs b/pkgs/edge-worker/tests/e2e-portable-runtimes/portable-process-worker.mjs new file mode 100644 index 000000000..c2c0bcf8d --- /dev/null +++ b/pkgs/edge-worker/tests/e2e-portable-runtimes/portable-process-worker.mjs @@ -0,0 +1,23 @@ +import { EdgeWorker } from '../../dist/index.js'; +import { getPortableExample, startPortableExample } from '../../supabase/functions/_shared/portable_examples.js'; +import process from 'node:process'; + +const exampleName = process.env.PORTABLE_EXAMPLE_NAME; + +if (!exampleName) { + throw new Error('PORTABLE_EXAMPLE_NAME is required'); +} + +const example = getPortableExample(exampleName); + +if (!process.env.WORKER_NAME) { + process.env.WORKER_NAME = example.queueName; +} + +const expectedConnectionString = process.env.PORTABLE_EXPECTED_CONNECTION_STRING; + +await startPortableExample(EdgeWorker, exampleName, { + expectedConnectionString, +}); + +console.log(`portable worker started: ${exampleName}`); diff --git a/pkgs/edge-worker/tests/e2e-portable-runtimes/portable-runtimes.test.ts b/pkgs/edge-worker/tests/e2e-portable-runtimes/portable-runtimes.test.ts new file mode 100644 index 000000000..5b826a94c --- /dev/null +++ b/pkgs/edge-worker/tests/e2e-portable-runtimes/portable-runtimes.test.ts @@ -0,0 +1,360 @@ +import { assertEquals, assertExists } from 'jsr:@std/assert'; +import type postgres from 'postgres'; +import { withSql } from '../sql.ts'; +import { e2eConfig } from '../config.ts'; +import { + sendBatch, + seqLastValue, + startWorker, + waitFor, +} from '../e2e/_helpers.ts'; +import { getPortableExample } from '../../supabase/functions/_shared/portable_examples.js'; + +const SERVICE_ROLE_KEY = 'test-service-role-key'; +const PROCESS_FIXTURE = 'tests/e2e-portable-runtimes/portable-process-worker.mjs'; + +const PROCESS_RUNTIMES = [ + { name: 'node', command: 'node' }, + { name: 'bun', command: 'bun' }, +] as const; + +const EXAMPLES = [ + 'max_concurrency', + 'conn_max_pg_default', + 'conn_max_pg_override', + 'conn_env_var', +] as const; + +type ExampleName = typeof EXAMPLES[number]; + +type PortableExample = + | { + name: ExampleName; + queueName: string; + sequenceName: string; + kind: 'sequence'; + messagesToSend: number; + expectedIncrement: number; + } + | { + name: ExampleName; + queueName: string; + kind: 'result'; + expectedMax: number; + messagesToSend: number; + }; + +function portableExample(exampleName: ExampleName): PortableExample { + return getPortableExample(exampleName) as PortableExample; +} + +async function resetQueue(sql: postgres.Sql, queueName: string) { + await sql` + SELECT * FROM pgmq.drop_queue(${queueName}) + WHERE EXISTS ( + SELECT 1 FROM pgmq.list_queues() WHERE queue_name = ${queueName} + ) + `; + await sql`SELECT pgmq.create(${queueName})`; +} + +async function ensureEmptyQueue(sql: postgres.Sql, queueName: string) { + const queues = await sql<{ queue_name: string }[]>` + SELECT queue_name FROM pgmq.list_queues() WHERE queue_name = ${queueName} + `; + + if (queues.length === 0) { + await sql`SELECT pgmq.create(${queueName})`; + return; + } + + await sql`SELECT pgmq.purge_queue(${queueName})`; +} + +async function resetExample( + sql: postgres.Sql, + exampleName: ExampleName, + options: { preserveWorkerMetadata?: boolean } = {} +) { + const example = portableExample(exampleName); + + if (options.preserveWorkerMetadata) { + await ensureEmptyQueue(sql, example.queueName); + } else { + await resetQueue(sql, example.queueName); + } + + if (example.kind === 'sequence') { + await sql`CREATE SEQUENCE IF NOT EXISTS ${sql(example.sequenceName)}`; + await sql`ALTER SEQUENCE ${sql(example.sequenceName)} RESTART WITH 1`; + } + + await sql` + CREATE TABLE IF NOT EXISTS e2e_test_results ( + id SERIAL PRIMARY KEY, + queue_name TEXT NOT NULL, + status TEXT NOT NULL, + actual JSONB, + error_message TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() + ) + `; + await sql`DELETE FROM e2e_test_results WHERE queue_name = ${example.queueName}`; + + if (!options.preserveWorkerMetadata) { + await sql` + DELETE FROM pgflow.workers + WHERE function_name = ${example.queueName} + AND ( + stopped_at IS NOT NULL + OR last_heartbeat_at < NOW() - INTERVAL '6 seconds' + ) + `; + } +} + +async function waitForWorkerFunctionMode( + sql: postgres.Sql, + queueName: string, + startMode: 'http' | 'process' +) { + const row = await waitFor( + async () => { + const rows = await sql<{ start_mode: string }[]>` + SELECT start_mode + FROM pgflow.worker_functions + WHERE function_name = ${queueName} + `; + + const row = rows[0]; + return row?.start_mode === startMode ? row : false; + }, + { description: `${queueName} worker_functions ${startMode} row` } + ); + + assertEquals(row.start_mode, startMode); +} + +async function waitForResult(sql: postgres.Sql, queueName: string, expectedMax: number) { + const row = await waitFor( + async () => { + const rows = await sql<{ + status: string; + actual: { max?: number } | null; + error_message: string | null; + }[]>` + SELECT status, actual, error_message + FROM e2e_test_results + WHERE queue_name = ${queueName} + ORDER BY id DESC + LIMIT 1 + `; + + return rows[0] ?? false; + }, + { description: `${queueName} result row` } + ); + + assertEquals(row.status, 'success', row.error_message ?? undefined); + assertEquals(row.actual?.max, expectedMax); +} + +async function waitForExampleAssertion( + sql: postgres.Sql, + exampleName: ExampleName, + sequenceStartValue?: number +) { + const example = portableExample(exampleName); + + if (example.kind === 'result') { + await waitForResult(sql, example.queueName, example.expectedMax); + return; + } + + assertExists(sequenceStartValue); + + await waitFor( + async () => { + const currentValue = await seqLastValue(example.sequenceName); + return currentValue >= sequenceStartValue + example.expectedIncrement - 1 + ? currentValue + : false; + }, + { + description: `${example.sequenceName} to increment by ${example.expectedIncrement}`, + timeoutMs: example.name === 'max_concurrency' ? 60000 : 10000, + pollIntervalMs: 500, + } + ); +} + +async function waitForFreshWorker(sql: postgres.Sql, queueName: string) { + return await waitFor( + async () => { + const workers = await sql<{ worker_id: string }[]>` + SELECT worker_id + FROM pgflow.workers + WHERE function_name = ${queueName} + AND stopped_at IS NULL + AND last_heartbeat_at >= NOW() - INTERVAL '6 seconds' + ORDER BY started_at DESC + LIMIT 1 + `; + + return workers[0] ?? false; + }, + { description: `${queueName} active worker` } + ); +} + +async function currentWorkerFunctionMode(sql: postgres.Sql, queueName: string) { + const rows = await sql<{ start_mode: string }[]>` + SELECT start_mode + FROM pgflow.worker_functions + WHERE function_name = ${queueName} + `; + + return rows[0]?.start_mode; +} + +async function waitForProcessExit( + child: Deno.ChildProcess, + statusPromise: Promise, + timeoutMs = 15000 +) { + const timeout = new Promise((resolve) => { + setTimeout(() => resolve(false), timeoutMs); + }); + + const status = await Promise.race([statusPromise, timeout]); + + if (status) return status; + + child.kill('SIGKILL'); + return await statusPromise; +} + +function processEnv(exampleName: ExampleName) { + const example = portableExample(exampleName); + const env: Record = { + ...Deno.env.toObject(), + PORTABLE_EXAMPLE_NAME: exampleName, + WORKER_NAME: example.queueName, + SUPABASE_URL: e2eConfig.apiUrl, + SUPABASE_SERVICE_ROLE_KEY: SERVICE_ROLE_KEY, + EDGE_WORKER_LOG_LEVEL: 'warn', + }; + + if (exampleName === 'conn_env_var') { + env.EDGE_WORKER_DB_URL = e2eConfig.dbUrl; + env.PORTABLE_EXPECTED_CONNECTION_STRING = e2eConfig.dbUrl; + delete env.DATABASE_URL; + } else { + env.DATABASE_URL = e2eConfig.dbUrl; + delete env.EDGE_WORKER_DB_URL; + } + + return env; +} + +async function runProcessExample( + sql: postgres.Sql, + runtime: typeof PROCESS_RUNTIMES[number], + exampleName: ExampleName +) { + const example = portableExample(exampleName); + const previousStartMode = await currentWorkerFunctionMode(sql, example.queueName); + await resetExample(sql, exampleName); + + const child = new Deno.Command(runtime.command, { + args: [PROCESS_FIXTURE], + cwd: new URL('../..', import.meta.url).pathname, + env: processEnv(exampleName), + stdout: 'null', + stderr: 'null', + }).spawn(); + const statusPromise = child.status; + let childExited = false; + + try { + await waitForWorkerFunctionMode(sql, example.queueName, 'process'); + const worker = await waitForFreshWorker(sql, example.queueName); + const sequenceStartValue = example.kind === 'sequence' + ? await seqLastValue(example.sequenceName) + : undefined; + await sendBatch(example.messagesToSend, example.queueName); + await waitForExampleAssertion(sql, exampleName, sequenceStartValue); + + child.kill('SIGTERM'); + const status = await waitForProcessExit(child, statusPromise); + childExited = true; + assertEquals(status.code, 0); + + const stoppedWorker = await waitFor( + async () => { + const rows = await sql<{ stopped_at: string | null }[]>` + SELECT stopped_at + FROM pgflow.workers + WHERE worker_id = ${worker.worker_id} + `; + + return rows[0]?.stopped_at ? rows[0] : false; + }, + { description: `${runtime.name} ${example.queueName} stopped_at` } + ); + + assertExists(stoppedWorker.stopped_at); + } finally { + if (!childExited) { + try { + child.kill('SIGTERM'); + } catch { + // Child may already have exited after the assertion path. + } + + await waitForProcessExit(child, statusPromise); + } + + if (previousStartMode === 'http') { + await sql` + UPDATE pgflow.worker_functions + SET start_mode = 'http' + WHERE function_name = ${example.queueName} + `; + } + } +} + +async function runSupabaseExample(sql: postgres.Sql, exampleName: ExampleName) { + const example = portableExample(exampleName); + await resetExample(sql, exampleName, { preserveWorkerMetadata: true }); + await startWorker(example.queueName); + await waitForWorkerFunctionMode(sql, example.queueName, 'http'); + const sequenceStartValue = example.kind === 'sequence' + ? await seqLastValue(example.sequenceName) + : undefined; + await sendBatch(example.messagesToSend, example.queueName); + await waitForExampleAssertion(sql, exampleName, sequenceStartValue); +} + +for (const exampleName of EXAMPLES) { + Deno.test( + { + name: `portable runtimes - ${exampleName} works in supabase`, + sanitizeOps: false, + sanitizeResources: false, + }, + () => withSql((sql) => runSupabaseExample(sql, exampleName)) + ); + + for (const runtime of PROCESS_RUNTIMES) { + Deno.test( + { + name: `portable runtimes - ${exampleName} works in ${runtime.name}`, + sanitizeOps: false, + sanitizeResources: false, + }, + () => withSql((sql) => runProcessExample(sql, runtime, exampleName)) + ); + } +} diff --git a/pkgs/edge-worker/tests/e2e/_helpers.ts b/pkgs/edge-worker/tests/e2e/_helpers.ts index 318550af5..2da0fef76 100644 --- a/pkgs/edge-worker/tests/e2e/_helpers.ts +++ b/pkgs/edge-worker/tests/e2e/_helpers.ts @@ -143,7 +143,10 @@ export async function waitForSeqToIncrementBy( } } -export async function waitForActiveWorker(functionName: string) { +export async function waitForActiveWorker( + functionName: string, + options: WaitForOptions = {} +) { return await waitFor( async () => { const sql = createSql(); @@ -178,6 +181,7 @@ export async function waitForActiveWorker(functionName: string) { }, { pollIntervalMs: 300, + ...options, description: `active worker for '${functionName}'`, } ); @@ -217,9 +221,16 @@ export async function startWorker(workerName: string) { ); if (response.ok) { - await waitForActiveWorker(workerName); - log('worker spawned!'); - return; + try { + await waitForActiveWorker(workerName, { timeoutMs: 5000 }); + log('worker spawned!'); + return; + } catch (err) { + lastError = err as Error; + log(`Retry ${i + 1}/${maxRetries}: worker not active yet`); + await delay(retryDelayMs); + continue; + } } lastError = new Error( @@ -317,16 +328,18 @@ export async function startWorkerWithAuth( JSON.stringify(body).substring(0, 200) ); - // If we expect a specific status, return immediately (don't retry on 401/500) - if (expectStatus !== undefined) { - return { status: response.status, body }; - } - // For success case, wait for worker and return if (response.ok) { - await waitForActiveWorker(workerName); - log('worker spawned!'); - return { status: response.status, body }; + try { + await waitForActiveWorker(workerName, { timeoutMs: 5000 }); + log('worker spawned!'); + return { status: response.status, body }; + } catch (err) { + lastError = err as Error; + log(`Retry ${i + 1}/${maxRetries}: worker not active yet`); + await delay(retryDelayMs); + continue; + } } // Retry on 404 (function not ready yet) or 502/503 (server starting) @@ -336,6 +349,11 @@ export async function startWorkerWithAuth( continue; } + // If we expect a specific status, return non-startup responses immediately. + if (expectStatus !== undefined) { + return { status: response.status, body }; + } + // Return non-retryable error responses return { status: response.status, body }; } catch (err) { diff --git a/pkgs/edge-worker/tests/e2e/authorization.test.ts b/pkgs/edge-worker/tests/e2e/authorization.test.ts index 257edfbb1..9f3b3e969 100644 --- a/pkgs/edge-worker/tests/e2e/authorization.test.ts +++ b/pkgs/edge-worker/tests/e2e/authorization.test.ts @@ -16,17 +16,16 @@ const SEQ_NAME = 'auth_test_seq'; async function setupTest(sql: postgres.Sql) { await sql`CREATE SEQUENCE IF NOT EXISTS ${sql(SEQ_NAME)}`; await sql`ALTER SEQUENCE ${sql(SEQ_NAME)} RESTART WITH 1`; - await sql` - SELECT * FROM pgmq.drop_queue(${QUEUE_NAME}) - WHERE EXISTS ( - SELECT 1 FROM pgmq.list_queues() WHERE queue_name = ${QUEUE_NAME} - ) - `; - await sql`SELECT pgmq.create(${QUEUE_NAME})`; - await sql` - DELETE FROM pgflow.workers - WHERE last_heartbeat_at < NOW() - INTERVAL '6 seconds' - `; + + const queues = await sql`SELECT queue_name FROM pgmq.list_queues() WHERE queue_name = ${QUEUE_NAME}`; + if (queues.length === 0) { + await sql`SELECT pgmq.create(${QUEUE_NAME})`; + } else { + await sql`SELECT pgmq.purge_queue(${QUEUE_NAME})`; + } + + // Preserve worker metadata: local edge functions are long-lived, and removing + // rows can leave an in-memory worker polling without DB visibility. } // ============================================================ diff --git a/pkgs/edge-worker/tests/e2e/stopped_at.test.ts b/pkgs/edge-worker/tests/e2e/stopped_at.test.ts index 38451a399..2894f91e2 100644 --- a/pkgs/edge-worker/tests/e2e/stopped_at.test.ts +++ b/pkgs/edge-worker/tests/e2e/stopped_at.test.ts @@ -1,7 +1,6 @@ import { withSql } from '../sql.ts'; import { assertEquals, assertExists } from 'jsr:@std/assert'; import { - fetchWorkers, log, sendBatch, seqLastValue, @@ -41,10 +40,14 @@ Deno.test( await sql`SELECT pgmq.purge_queue(${WORKER_NAME})`; } - // Clean up old worker records (but workers may still be running) + // Clean up old worker records without orphaning live workers. await sql` DELETE FROM pgflow.workers WHERE function_name = ${WORKER_NAME} + AND ( + stopped_at IS NOT NULL + OR last_heartbeat_at < NOW() - INTERVAL '6 seconds' + ) `; // Start monitoring for debugging @@ -53,21 +56,32 @@ Deno.test( try { // Start the worker await startWorker(WORKER_NAME); - - // Get the initial worker record - const initialWorkers = await fetchWorkers(WORKER_NAME); - assertEquals(initialWorkers.length, 1, 'Should have exactly 1 worker'); - const workerId = initialWorkers[0].worker_id; + const startedWaitingAt = new Date(); + + // Get the active worker records. Long-lived local edge isolates from + // previous runs may still be polling this function's queue. + const activeWorkers = await sql` + SELECT * + FROM pgflow.workers + WHERE function_name = ${WORKER_NAME} + AND stopped_at IS NULL + AND last_heartbeat_at >= NOW() - INTERVAL '6 seconds' + ORDER BY started_at DESC + `; + const [initialWorker] = activeWorkers; + assertExists(initialWorker, 'Should have an active worker'); // Verify stopped_at is NULL initially assertEquals( - initialWorkers[0].stopped_at, + initialWorker.stopped_at, null, 'Worker should have NULL stopped_at initially' ); - // Send CPU-intensive tasks that will cause the worker to die - await sendBatch(MESSAGES_TO_SEND, WORKER_NAME); + // Send enough CPU-intensive tasks that at least one active isolate + // should hit the CPU clock limit even when old isolates share the work. + const messagesToSend = MESSAGES_TO_SEND * activeWorkers.length; + await sendBatch(messagesToSend, WORKER_NAME); // Wait for at least some messages to be processed (worker was running) await waitForSeqToIncrementBy(MIN_MESSAGES_PROCESSED, { @@ -76,27 +90,30 @@ Deno.test( pollIntervalMs: 300, }); - // Wait for the original worker to have stopped_at set + // Wait for a worker from this test run to have stopped_at set. const stoppedWorker = await waitFor( async () => { const [worker] = await sql` SELECT worker_id, stopped_at, last_heartbeat_at FROM pgflow.workers - WHERE worker_id = ${workerId} + WHERE function_name = ${WORKER_NAME} + AND stopped_at IS NOT NULL + AND stopped_at >= ${startedWaitingAt} + ORDER BY stopped_at DESC + LIMIT 1 `; if (!worker) { - log('Worker not found in DB'); + log('No worker from this test run has stopped yet'); return false; } log(`Worker state: stopped_at=${worker.stopped_at}, last_hb=${worker.last_heartbeat_at}`); - // Return the worker once stopped_at is set - return worker.stopped_at !== null ? worker : false; + return worker; }, { - timeoutMs: 30000, + timeoutMs: 120000, pollIntervalMs: 500, description: 'worker to have stopped_at set', } diff --git a/pkgs/edge-worker/tests/unit/Queries.test.ts b/pkgs/edge-worker/tests/unit/Queries.test.ts index c879d4272..97d08953d 100644 --- a/pkgs/edge-worker/tests/unit/Queries.test.ts +++ b/pkgs/edge-worker/tests/unit/Queries.test.ts @@ -1,5 +1,6 @@ import { assertEquals } from '@std/assert'; import { Queries } from '../../src/core/Queries.ts'; +import type { WorkerRow } from '../../src/core/types.ts'; import type { postgres } from '../sql.ts'; // Mock SQL client that captures the SQL template string and values @@ -73,3 +74,20 @@ Deno.test('Queries.markWorkerStopped - handles different UUID formats', async () assertEquals(calls.length, 1); assertEquals(calls[0].values, [workerId]); }); + +Deno.test('Queries.sendHeartbeat treats missing worker row as deprecated', async () => { + const sql = (() => Promise.resolve([])) as unknown as postgres.Sql; + const queries = new Queries(sql); + const workerRow: WorkerRow = { + worker_id: '00000000-0000-0000-0000-000000000001', + function_name: 'test_worker', + queue_name: 'test_worker', + started_at: new Date().toISOString(), + last_heartbeat_at: new Date().toISOString(), + deprecated_at: null, + }; + + const result = await queries.sendHeartbeat(workerRow); + + assertEquals(result.is_deprecated, true); +}); diff --git a/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts b/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts index c77cbc404..9d8c914d7 100644 --- a/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts +++ b/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts @@ -335,8 +335,12 @@ Deno.test({ serveHandler = h; }, }); + const sql = (() => Promise.resolve([{ has_active: true }])) as unknown as { + end: () => Promise; + }; + sql.end = () => Promise.resolve(); - const adapter = new SupabasePlatformAdapter(undefined, deps); + const adapter = new SupabasePlatformAdapter({ sql: sql as never }, deps); await adapter.startWorker(() => ({ startOnlyOnce: (workerBootstrap: unknown) => { bootstrap = workerBootstrap; @@ -357,6 +361,160 @@ Deno.test({ }, }); +Deno.test({ + name: 'HTTP startup uses a fresh worker id when replacing deprecated worker', + sanitizeResources: false, + fn: async () => { + let serveHandler: ((req: Request) => Response | Promise) | null = null; + const workerIds: string[] = []; + let firstWorkerStopped = false; + let requestShutdown: (() => void) | null = null; + const workers: Worker[] = [ + { + startOnlyOnce: (workerBootstrap: { workerId: string }) => { + workerIds.push(workerBootstrap.workerId); + }, + stop: () => { + firstWorkerStopped = true; + requestShutdown?.(); + return Promise.resolve(); + }, + get isDeprecated() { + return true; + }, + get isStopped() { + return false; + }, + } as unknown as Worker, + { + startOnlyOnce: (workerBootstrap: { workerId: string }) => { + workerIds.push(workerBootstrap.workerId); + }, + stop: () => Promise.resolve(), + get isDeprecated() { + return false; + }, + get isStopped() { + return false; + }, + } as unknown as Worker, + ]; + + const deps = createMockDeps({ + serve: (h) => { + serveHandler = h; + }, + }); + const sql = (() => Promise.resolve([{ has_active: true }])) as unknown as { + end: () => Promise; + }; + sql.end = () => Promise.resolve(); + + const adapter = new SupabasePlatformAdapter({ sql: sql as never }, deps); + requestShutdown = () => adapter.requestShutdown(); + await adapter.startWorker(() => workers.shift()!); + + const handler = serveHandler as unknown as (req: Request) => Response | Promise; + await handler(new Request('http://localhost/functions/v1/my-worker', { + headers: { authorization: 'Bearer test-service-key' }, + })); + + const secondResponse = await handler(new Request('http://localhost/functions/v1/my-worker', { + headers: { authorization: 'Bearer test-service-key' }, + })) as Response; + const secondBody = await secondResponse.json(); + + assertEquals(workerIds.length, 2); + assertEquals(workerIds[0], 'test-exec-id'); + assertEquals(workerIds[1] !== workerIds[0], true); + assertEquals(firstWorkerStopped, true); + assertEquals(adapter.shutdownSignal.aborted, false); + assertEquals(secondBody.workerId, workerIds[1]); + }, +}); + +Deno.test({ + name: 'HTTP startup serializes concurrent deprecated worker replacement', + sanitizeResources: false, + fn: async () => { + let serveHandler: ((req: Request) => Response | Promise) | null = null; + const workerIds: string[] = []; + let stopCalls = 0; + let releaseStop = () => {}; + let resolveStopStarted: (() => void) | null = null; + const stopStarted = new Promise((resolve) => { + resolveStopStarted = resolve; + }); + + const deprecatedWorker = { + startOnlyOnce: (workerBootstrap: { workerId: string }) => { + workerIds.push(workerBootstrap.workerId); + }, + stop: () => { + stopCalls++; + resolveStopStarted?.(); + return new Promise((release) => { + releaseStop = release; + }); + }, + get isDeprecated() { + return true; + }, + get isStopped() { + return false; + }, + } as unknown as Worker; + + const replacementWorker = { + startOnlyOnce: (workerBootstrap: { workerId: string }) => { + workerIds.push(workerBootstrap.workerId); + }, + stop: () => Promise.resolve(), + get isDeprecated() { + return false; + }, + get isStopped() { + return false; + }, + } as unknown as Worker; + + const deps = createMockDeps({ + serve: (h) => { + serveHandler = h; + }, + }); + const adapter = new SupabasePlatformAdapter(undefined, deps); + let createCount = 0; + + await adapter.startWorker(() => { + createCount++; + return createCount === 1 ? deprecatedWorker : replacementWorker; + }); + + const handler = serveHandler as unknown as (req: Request) => Response | Promise; + await handler(new Request('http://localhost/functions/v1/my-worker', { + headers: { authorization: 'Bearer test-service-key' }, + })); + + const replacements = Promise.all([ + handler(new Request('http://localhost/functions/v1/my-worker', { + headers: { authorization: 'Bearer test-service-key' }, + })), + handler(new Request('http://localhost/functions/v1/my-worker', { + headers: { authorization: 'Bearer test-service-key' }, + })), + ]); + + await stopStarted; + await new Promise((resolve) => setTimeout(resolve, 0)); + assertEquals(stopCalls, 1); + releaseStop(); + await replacements; + + assertEquals(workerIds.length, 2); + }, +}); + Deno.test({ name: 'stopWorker aborts the shutdown signal', sanitizeResources: false,