Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,45 @@ jobs:
if: steps.check.outputs.affected == 'true'
run: pnpm nx run edge-worker:e2e

# ─────────────────────────────────────── 2a. EDGE-WORKER INTEGRATION ──────────────────────────────────────
# ─────────────────────────────────────── 2a. EDGE-WORKER PORTABLE RUNTIME E2E ──────────────────────────────────────
edge-worker-portable-runtime-e2e:
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
runs-on: ubuntu-latest
env:
NX_CLOUD_ACCESS_TOKEN: ${{ secrets.NX_CLOUD_ACCESS_TOKEN }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0

- uses: ./.github/actions/setup
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
atlas-cloud-token: ${{ secrets.ATLAS_CLOUD_TOKEN }}

- uses: oven-sh/setup-bun@v2

- name: Check if edge-worker is affected
id: check
run: |
if pnpm nx show projects --affected --base=origin/main --head=HEAD | grep -q "^edge-worker$"; then
echo "affected=true" >> $GITHUB_OUTPUT
echo "edge-worker is affected, running portable runtime e2e tests"
else
echo "affected=false" >> $GITHUB_OUTPUT
echo "edge-worker not affected, skipping portable runtime e2e tests"
fi

- name: Pre-start Supabase
if: steps.check.outputs.affected == 'true'
run: ./scripts/ci-prestart-supabase.sh edge-worker

- name: Run edge-worker portable runtime e2e tests
if: steps.check.outputs.affected == 'true'
run: pnpm nx run edge-worker:e2e:portable-runtimes

# ─────────────────────────────────────── 2b. EDGE-WORKER INTEGRATION ──────────────────────────────────────
edge-worker-integration:
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
Expand Down Expand Up @@ -172,7 +210,7 @@ jobs:
if: steps.check.outputs.affected == 'true'
run: pnpm nx run edge-worker:test:integration

# ─────────────────────────────────────── 2b. EDGE-WORKER BUN SMOKE ──────────────────────────────────────
# ─────────────────────────────────────── 2c. EDGE-WORKER BUN SMOKE ──────────────────────────────────────
edge-worker-bun-smoke:
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
Expand Down Expand Up @@ -206,7 +244,7 @@ jobs:
if: steps.check.outputs.affected == 'true'
run: pnpm nx run edge-worker:smoke:bun

# ─────────────────────────────────────── 2c. CLI E2E ──────────────────────────────────────
# ─────────────────────────────────────── 2d. CLI E2E ──────────────────────────────────────
cli-e2e:
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
Expand Down Expand Up @@ -242,7 +280,7 @@ jobs:
if: steps.check.outputs.affected == 'true'
run: pnpm nx run cli:e2e

# ─────────────────────────────────────── 2d. CLIENT E2E ──────────────────────────────────────
# ─────────────────────────────────────── 2e. CLIENT E2E ──────────────────────────────────────
client-e2e:
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
Expand Down Expand Up @@ -278,7 +316,7 @@ jobs:
if: steps.check.outputs.affected == 'true'
run: pnpm nx run client:e2e

# ─────────────────────────────────────── 2e. CORE PGTAP ──────────────────────────────────────
# ─────────────────────────────────────── 2f. CORE PGTAP ──────────────────────────────────────
core-pgtap:
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
Expand Down Expand Up @@ -321,6 +359,7 @@ jobs:
[
build-and-test,
edge-worker-e2e,
edge-worker-portable-runtime-e2e,
edge-worker-integration,
edge-worker-bun-smoke,
cli-e2e,
Expand Down Expand Up @@ -374,6 +413,7 @@ jobs:
[
build-and-test,
edge-worker-e2e,
edge-worker-portable-runtime-e2e,
edge-worker-integration,
edge-worker-bun-smoke,
cli-e2e,
Expand Down
15 changes: 15 additions & 0 deletions pkgs/edge-worker/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@
"parallel": false
}
},
"e2e:portable-runtimes": {
"executor": "nx:run-commands",
"dependsOn": ["build", "serve:functions:e2e"],
"local": true,
"cache": false,
"inputs": ["default", "^production"],
"options": {
"cwd": "pkgs/edge-worker",
"commands": [
"deno run --allow-net=localhost:50321 \"data:application/typescript,console.log(%22Waiting%20for%20functions%20server%20(port%2050321)...%22)%3B%20const%20deadline%20%3D%20Date.now()%20%2B%20120_000%3B%20while%20(Date.now()%20%3C%20deadline)%20%7B%20try%20%7B%20const%20conn%20%3D%20await%20Deno.connect(%7B%20hostname%3A%20%22localhost%22%2C%20port%3A%2050321%20%7D)%3B%20conn.close()%3B%20console.log(%22%20Ready!%22)%3B%20Deno.exit(0)%3B%20%7D%20catch%20%7B%20await%20new%20Promise((resolve)%20%3D%3E%20setTimeout(resolve%2C%20500))%3B%20%7D%20%7D%20console.error(%22TIMEOUT%3A%20Functions%20server%20not%20available%20on%20port%2050321%20after%20120s%22)%3B%20Deno.exit(1)%3B\"",
"deno test --config deno.test.json --allow-all --env=supabase/functions/.env tests/e2e-portable-runtimes/"
],
"parallel": false
}
},
"test": {
"inputs": ["default", "^production"],
"dependsOn": ["test:types", "test:unit"]
Expand Down
2 changes: 1 addition & 1 deletion pkgs/edge-worker/src/core/Queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class Queries {
RETURNING (w.deprecated_at IS NOT NULL) AS is_deprecated;
`;

return result || { is_deprecated: false };
return result || { is_deprecated: true };
}

async ensureFlowCompiled(
Expand Down
20 changes: 20 additions & 0 deletions pkgs/edge-worker/src/core/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ export class Worker {
return this.lifecycle.edgeFunctionName;
}

get isCreated() {
return this.lifecycle.isCreated;
}

get isStarting() {
return this.lifecycle.isStarting;
}

get isRunning() {
return this.lifecycle.isRunning;
}

get isDeprecated() {
return this.lifecycle.isDeprecated;
}

get isStopped() {
return this.lifecycle.isStopped;
}

/**
* Log deprecation message only once (prevents duplicate logs when deprecation
* is detected in heartbeat and then stop() is called)
Expand Down
4 changes: 4 additions & 0 deletions pkgs/edge-worker/src/core/WorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ export class WorkerLifecycle<IMessage extends Json> implements ILifecycle {
return this.workerState.isCreated;
}

get isStarting() {
return this.workerState.isStarting;
}

get isRunning() {
return this.workerState.isRunning;
}
Expand Down
2 changes: 2 additions & 0 deletions pkgs/edge-worker/src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ export interface ILifecycle {
get edgeFunctionName(): string | undefined;
get queueName(): string;
get isCreated(): boolean;
get isStarting(): boolean;
get isRunning(): boolean;
get isDeprecated(): boolean;
get isStopping(): boolean;
get isStopped(): boolean;

Expand Down
4 changes: 4 additions & 0 deletions pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
return this.workerState.isCreated;
}

get isStarting() {
return this.workerState.isStarting;
}

get isRunning() {
return this.workerState.isRunning;
}
Expand Down
78 changes: 56 additions & 22 deletions pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ interface SupabaseEnv extends Record<string, string | undefined> {
export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResources> {
private edgeFunctionName: string | null = null;
private worker: Worker | null = null;
private workerId: string | null = null;
private workerReplacementPromise: Promise<void> | null = null;
private logger: Logger;
private abortController: AbortController;
private _platformResources: SupabaseResources;
Expand Down Expand Up @@ -183,8 +185,9 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
if (this.worker) {
// Signal death to ensure_workers() cron by setting stopped_at.
// This allows the cron to immediately ping for a replacement worker.
const workerId = this.validatedEnv.SB_EXECUTION_ID;
await this.queries.markWorkerStopped(workerId);
if (this.workerId) {
await this.queries.markWorkerStopped(this.workerId);
}
}

await this.stopWorker();
Expand All @@ -206,7 +209,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
}

private setupStartupHandler(createWorkerFn: CreateWorkerFn): void {
this.deps.serve((req: Request) => {
this.deps.serve(async (req: Request) => {
// Validate auth header in production (skipped in local mode)
const authResult = validateServiceRoleAuth(req, this.validatedEnv);
if (!authResult.valid) {
Expand All @@ -219,35 +222,66 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource

this.logger.debug(`HTTP Request: ${this.edgeFunctionName}`);

const wasStarted = !this.worker;

if (!this.worker) {
this.edgeFunctionName = this.extractFunctionName(req);

const workerId = this.validatedEnv.SB_EXECUTION_ID;

this.loggingFactory.setWorkerId(workerId);
this.loggingFactory.setWorkerName(this.edgeFunctionName);

// Create the worker using the factory function and the logger
this.worker = createWorkerFn(this.loggingFactory.createLogger);
this.worker.startOnlyOnce({
edgeFunctionName: this.edgeFunctionName,
workerId,
startMode: 'http',
});
}
const wasStarted = await this.ensureWorkerStarted(req, createWorkerFn);

return new Response(JSON.stringify({
status: wasStarted ? 'started' : 'running',
workerId: this.validatedEnv.SB_EXECUTION_ID,
workerId: this.workerId,
functionName: this.edgeFunctionName,
}), {
headers: { 'Content-Type': 'application/json' },
});
});
}

private needsWorkerReplacement(): boolean {
return !this.worker || this.worker.isDeprecated || this.worker.isStopped;
}

private async ensureWorkerStarted(req: Request, createWorkerFn: CreateWorkerFn): Promise<boolean> {
while (this.needsWorkerReplacement()) {
if (this.workerReplacementPromise) {
await this.workerReplacementPromise;
continue;
}

this.workerReplacementPromise = this.replaceWorker(req, createWorkerFn);
try {
await this.workerReplacementPromise;
return true;
} finally {
this.workerReplacementPromise = null;
}
}

return false;
}

private async replaceWorker(req: Request, createWorkerFn: CreateWorkerFn): Promise<void> {
if (this.worker) {
await this.worker.stop();
this.abortController = new AbortController();
}

this.edgeFunctionName = this.extractFunctionName(req);

const workerId = this.workerId === null
? this.validatedEnv.SB_EXECUTION_ID
: globalThis.crypto.randomUUID();
this.workerId = workerId;

this.loggingFactory.setWorkerId(workerId);
this.loggingFactory.setWorkerName(this.edgeFunctionName);

// Create the worker using the factory function and the logger
this.worker = createWorkerFn(this.loggingFactory.createLogger);
this.worker.startOnlyOnce({
edgeFunctionName: this.edgeFunctionName,
workerId,
startMode: 'http',
});
}

/**
* Assertion function that validates environment has all required Supabase fields
* @throws Error if any required environment variable is missing
Expand Down
12 changes: 12 additions & 0 deletions pkgs/edge-worker/supabase/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ verify_jwt = false
import_map = "./functions/deno.json"
entrypoint = "./functions/conn_custom_sql/index.ts"

[functions.conn_max_pg_default]
enabled = true
verify_jwt = false
import_map = "./functions/deno.json"
entrypoint = "./functions/conn_max_pg_default/index.ts"

[functions.conn_max_pg_override]
enabled = true
verify_jwt = false
import_map = "./functions/deno.json"
entrypoint = "./functions/conn_max_pg_override/index.ts"

[functions.stopped_at_test]
enabled = true
verify_jwt = false
Expand Down
Loading
Loading