Skip to content

Commit 635afa0

Browse files
committed
Add the Async Scheduler Hook API: a thin, policy-free concurrency core
The engine gains the ability to activate a concurrent execution mode. A scheduler - a C extension or a PHP library - registers a set of hooks through a single call and takes control of concurrency. The core contains NO implementation: no scheduler, no queues, no event system; it is hook routing plus the minimal mechanism only the engine can perform (context switching, the GC destructor walker, the coroutine lifecycle status). Core ABI (Zend/zend_async_API.h): - zend_coroutine_t: lifecycle status packed into the flags word, modifier flags, object_offset for the single-allocation embed pattern (or a stored object pointer via OBJ_REF), an awaiting_info diagnostics hook. No embedded wait state. - scheduler slots registered once per process via a versioned struct: new_coroutine, enqueue_coroutine, suspend(from_main, is_bailout), resume, cancel, launch, shutdown, get_class_ce, call_on_main_stack, context accessors (userland zval keys + internal numeric keys with a core-owned key registry), intercept_fiber, gc_destructors, defer. - zend_async_microtask_t: a thin one-shot task (handler, dtor, 32-bit ref_count, named flag bits incl. is_cancelled). The queue is provider-owned; the defer slot only routes the pointer. - per-thread globals: state, current/main coroutine, live coroutine count, the in_scheduler_context flag, the PHP bridge hook storage. Engine integration: - the scheduler is not lazy: it launches right before the script code runs (main.c, phpdbg) and receives the after-main handover; after destructors the API deactivates. - fibers: intercept_fiber links a fiber to a coroutine (the hook returns the coroutine to bind, created by the scheduler, or NULL for the legacy low-level path). There is NO switching API: inside scheduler code (in_scheduler_context, set around hook invocations) the plain Fiber API on a bound fiber performs the direct context switch; in application code the same calls park the value and route through the hooks. Deferred starts take an owned deep copy of their arguments. Fiber::suspend() is unchanged. - GC: the destructor phase is an around-interceptor. The engine keeps the walker, the once-per-object guarantee and the phase cursor, and re-runs missed destructors after the hook as a safety net; the hook brackets the run with provider logic (open a completion group, run, await everything the destructors spawned, transitively). Each destructor executes as application code (the scheduler flag is dropped around it). The executor reaches the hook as a Closure over an unregistered internal function - unreachable from application code. PHP bridge (Zend/zend_scheduler_hook.c): final class Async\ SchedulerHook - hook-name constants, register(module, hooks) (throws when a scheduler is already registered), getModule(), defer() (forwards to the DEFER hook). Hooks are stored by numeric index in the per-thread globals; each slot is backed by a C thunk forwarding to the stored callable. Tests (Zend/tests/async, 13): registration semantics, constants, launch-at-registration, managed-fiber lifecycle through a plain-Fiber scheduler loop (start/suspend/resume/finish, throw at the suspension point, uncaught exception propagation, after-main drain, direct- inside/routed-outside context semantics), low-level fibers untouched by a null intercept, microtask forwarding, and a GC cycle whose destructors spawn managed fibers awaited by the hook. Full async, fibers and gc suites pass (194/194, debug build - leak-checked). RFC and integration reference: https://github.com/true-async/php-async-core-rfc
1 parent 041ef84 commit 635afa0

28 files changed

Lines changed: 2707 additions & 7 deletions
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
--TEST--
2+
intercept_fiber returning null keeps the fiber on the low-level path
3+
--FILE--
4+
<?php
5+
Async\SchedulerHook::register('test', [
6+
Async\SchedulerHook::INTERCEPT_FIBER => function (Fiber $fiber): ?object {
7+
echo "intercept: null\n";
8+
return null;
9+
},
10+
Async\SchedulerHook::SUSPEND => function (bool $fromMain, bool $isBailout): bool {
11+
// Never called for the low-level fiber itself; the engine invokes it
12+
// for the after-main handover (script end, then after destructors).
13+
echo "suspend(fromMain: ", var_export($fromMain, true), ")\n";
14+
return true;
15+
},
16+
]);
17+
18+
// A low-level fiber behaves exactly as classic Fiber even with a scheduler.
19+
$fiber = new Fiber(function (int $x): int {
20+
$y = Fiber::suspend($x + 1);
21+
return $y * 10;
22+
});
23+
24+
var_dump($fiber->start(5));
25+
var_dump($fiber->resume(4));
26+
var_dump($fiber->getReturn());
27+
?>
28+
--EXPECT--
29+
intercept: null
30+
int(6)
31+
NULL
32+
int(40)
33+
suspend(fromMain: true)
34+
suspend(fromMain: true)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
--TEST--
2+
After-main handover runs deferred managed coroutines; abandoned ones die cleanly
3+
--FILE--
4+
<?php
5+
$queue = new SplQueue();
6+
7+
Async\SchedulerHook::register('test', [
8+
Async\SchedulerHook::INTERCEPT_FIBER => fn (Fiber $fiber): object
9+
=> new class($fiber) {
10+
public function __construct(public readonly Fiber $fiber) {}
11+
},
12+
Async\SchedulerHook::ENQUEUE => function (object $coroutine) use ($queue): bool {
13+
$queue->enqueue($coroutine);
14+
return true;
15+
},
16+
Async\SchedulerHook::SUSPEND => function (bool $fromMain, bool $isBailout) use ($queue): bool {
17+
// This scheduler defers everything: nothing runs until after main.
18+
if (!$fromMain) {
19+
return true;
20+
}
21+
22+
while (!$queue->isEmpty()) {
23+
$fiber = $queue->dequeue()->fiber;
24+
$fiber->isStarted() ? $fiber->resume() : $fiber->start();
25+
}
26+
27+
return true;
28+
},
29+
]);
30+
31+
$fiber = new Fiber(function (): void {
32+
echo "ran after main\n";
33+
Fiber::suspend();
34+
echo "never printed: nobody resumes\n";
35+
});
36+
37+
// The deferring scheduler does not run the fiber here.
38+
var_dump($fiber->start());
39+
var_dump($fiber->isStarted());
40+
41+
echo "end of script\n";
42+
// After-main handover: the scheduler drains its queue, the fiber runs and
43+
// suspends. Nobody resumes it, so it is destroyed at shutdown without
44+
// completing; the second echo never happens.
45+
?>
46+
--EXPECT--
47+
NULL
48+
bool(false)
49+
end of script
50+
ran after main
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
--TEST--
2+
A fiber adopted by the scheduler runs through the coroutine path
3+
--FILE--
4+
<?php
5+
$queue = new SplQueue();
6+
$log = [];
7+
8+
Async\SchedulerHook::register('test', [
9+
Async\SchedulerHook::INTERCEPT_FIBER => function (Fiber $fiber) use (&$log): object {
10+
$log[] = 'intercept';
11+
12+
// The scheduler defines the coroutine; here it simply remembers
13+
// which fiber the coroutine drives.
14+
return new class($fiber) {
15+
public function __construct(public readonly Fiber $fiber) {}
16+
};
17+
},
18+
Async\SchedulerHook::ENQUEUE => function (object $coroutine) use ($queue, &$log): bool {
19+
$log[] = 'enqueue';
20+
$queue->enqueue($coroutine);
21+
return true;
22+
},
23+
Async\SchedulerHook::RESUME => function (object $coroutine, ?Throwable $error) use ($queue, &$log): bool {
24+
$log[] = 'resume';
25+
$queue->enqueue($coroutine);
26+
return true;
27+
},
28+
Async\SchedulerHook::SUSPEND => function (bool $fromMain, bool $isBailout) use ($queue, &$log): bool {
29+
$log[] = 'suspend';
30+
31+
while (!$queue->isEmpty()) {
32+
$fiber = $queue->dequeue()->fiber;
33+
$fiber->isStarted() ? $fiber->resume() : $fiber->start();
34+
35+
if (!$fromMain) {
36+
return true;
37+
}
38+
}
39+
40+
return true;
41+
},
42+
]);
43+
44+
$fiber = new Fiber(function (int $x): int {
45+
$y = Fiber::suspend($x + 1);
46+
return $y * 10;
47+
});
48+
49+
var_dump($fiber->start(5));
50+
var_dump($fiber->isSuspended());
51+
var_dump($fiber->resume(4));
52+
var_dump($fiber->isTerminated());
53+
var_dump($fiber->getReturn());
54+
echo implode(',', $log), "\n";
55+
?>
56+
--EXPECT--
57+
int(6)
58+
bool(true)
59+
NULL
60+
bool(true)
61+
int(40)
62+
intercept,enqueue,suspend,resume,suspend
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
--TEST--
2+
Fiber::throw() on a managed fiber delivers the exception at the suspension point
3+
--FILE--
4+
<?php
5+
$queue = new SplQueue();
6+
7+
Async\SchedulerHook::register('test', [
8+
Async\SchedulerHook::INTERCEPT_FIBER => fn (Fiber $fiber): object
9+
=> new class($fiber) {
10+
public function __construct(public readonly Fiber $fiber) {}
11+
},
12+
Async\SchedulerHook::ENQUEUE => function (object $coroutine) use ($queue): bool {
13+
$queue->enqueue($coroutine);
14+
return true;
15+
},
16+
Async\SchedulerHook::RESUME => function (object $coroutine, ?Throwable $error) use ($queue): bool {
17+
echo "resume hook error: ", $error === null ? 'none' : get_class($error), "\n";
18+
$queue->enqueue($coroutine);
19+
return true;
20+
},
21+
Async\SchedulerHook::SUSPEND => function (bool $fromMain, bool $isBailout) use ($queue): bool {
22+
while (!$queue->isEmpty()) {
23+
$fiber = $queue->dequeue()->fiber;
24+
$fiber->isStarted() ? $fiber->resume() : $fiber->start();
25+
26+
if (!$fromMain) {
27+
return true;
28+
}
29+
}
30+
31+
return true;
32+
},
33+
]);
34+
35+
$fiber = new Fiber(function (): string {
36+
try {
37+
Fiber::suspend('waiting');
38+
} catch (RuntimeException $e) {
39+
return 'caught: ' . $e->getMessage();
40+
}
41+
42+
return 'not reached';
43+
});
44+
45+
var_dump($fiber->start());
46+
$fiber->throw(new RuntimeException('boom'));
47+
var_dump($fiber->getReturn());
48+
?>
49+
--EXPECT--
50+
string(7) "waiting"
51+
resume hook error: RuntimeException
52+
string(12) "caught: boom"
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
--TEST--
2+
An uncaught exception in a managed fiber surfaces at the start()/resume() caller
3+
--FILE--
4+
<?php
5+
$queue = new SplQueue();
6+
7+
Async\SchedulerHook::register('test', [
8+
Async\SchedulerHook::INTERCEPT_FIBER => fn (Fiber $fiber): object
9+
=> new class($fiber) {
10+
public function __construct(public readonly Fiber $fiber) {}
11+
},
12+
Async\SchedulerHook::ENQUEUE => function (object $coroutine) use ($queue): bool {
13+
$queue->enqueue($coroutine);
14+
return true;
15+
},
16+
Async\SchedulerHook::SUSPEND => function (bool $fromMain, bool $isBailout) use ($queue): bool {
17+
while (!$queue->isEmpty()) {
18+
$fiber = $queue->dequeue()->fiber;
19+
$fiber->isStarted() ? $fiber->resume() : $fiber->start();
20+
21+
if (!$fromMain) {
22+
return true;
23+
}
24+
}
25+
26+
return true;
27+
},
28+
]);
29+
30+
$fiber = new Fiber(function (): void {
31+
throw new RuntimeException('escaped');
32+
});
33+
34+
try {
35+
$fiber->start();
36+
} catch (RuntimeException $e) {
37+
echo "caught: ", $e->getMessage(), "\n";
38+
}
39+
40+
var_dump($fiber->isTerminated());
41+
?>
42+
--EXPECT--
43+
caught: escaped
44+
bool(true)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
--TEST--
2+
GC destructor phase: the hook brackets the engine run and awaits spawned work
3+
--FILE--
4+
<?php
5+
$queue = new SplQueue();
6+
7+
Async\SchedulerHook::register('test', [
8+
Async\SchedulerHook::INTERCEPT_FIBER => fn (Fiber $fiber): object
9+
=> new class($fiber) {
10+
public function __construct(public readonly Fiber $fiber) {}
11+
},
12+
Async\SchedulerHook::ENQUEUE => function (object $coroutine) use ($queue): bool {
13+
$queue->enqueue($coroutine);
14+
return true;
15+
},
16+
Async\SchedulerHook::SUSPEND => function (bool $fromMain, bool $isBailout) use ($queue): bool {
17+
// Defer: the queue is drained by the GC hook (and after main).
18+
if (!$fromMain) {
19+
return true;
20+
}
21+
22+
while (!$queue->isEmpty()) {
23+
$fiber = $queue->dequeue()->fiber;
24+
$fiber->isStarted() ? $fiber->resume() : $fiber->start();
25+
}
26+
27+
return true;
28+
},
29+
Async\SchedulerHook::GC_DESTRUCTORS => function (callable $run) use ($queue): bool {
30+
echo "hook: before\n";
31+
32+
$run(); // the engine calls every pending destructor
33+
34+
// Await everything the destructors spawned (the "scope" logic:
35+
// membership is the scheduler's own bookkeeping).
36+
while (!$queue->isEmpty()) {
37+
$fiber = $queue->dequeue()->fiber;
38+
$fiber->isStarted() ? $fiber->resume() : $fiber->start();
39+
}
40+
41+
echo "hook: after\n";
42+
return true;
43+
},
44+
]);
45+
46+
class Node
47+
{
48+
public ?Node $other = null;
49+
50+
public function __destruct()
51+
{
52+
echo "destructor\n";
53+
54+
// The destructor spawns concurrent work; the hook must wait for it.
55+
$fiber = new Fiber(function (): void {
56+
echo "spawned by destructor\n";
57+
});
58+
$fiber->start();
59+
}
60+
}
61+
62+
// A garbage cycle: collected only by the GC.
63+
$a = new Node();
64+
$b = new Node();
65+
$a->other = $b;
66+
$b->other = $a;
67+
unset($a, $b);
68+
69+
// The GC reruns after the destructor phase: the finished fiber/coroutine
70+
// pairs collected there carry internal destructors, so the hook brackets
71+
// a second (empty) phase.
72+
$collected = gc_collect_cycles();
73+
var_dump($collected > 0);
74+
echo "end\n";
75+
?>
76+
--EXPECT--
77+
hook: before
78+
destructor
79+
destructor
80+
spawned by destructor
81+
spawned by destructor
82+
hook: after
83+
hook: before
84+
hook: after
85+
bool(true)
86+
end
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
--TEST--
2+
Microtasks: defer() forwards to the scheduler's DEFER hook; the queue is the scheduler's
3+
--FILE--
4+
<?php
5+
$tasks = new SplQueue();
6+
7+
Async\SchedulerHook::register('test', [
8+
Async\SchedulerHook::SUSPEND => function (bool $fromMain, bool $isBailout): bool {
9+
return true;
10+
},
11+
Async\SchedulerHook::DEFER => function (callable $task) use ($tasks): bool {
12+
// The queue is owned by the scheduler, not by the engine.
13+
$tasks->enqueue($task);
14+
return true;
15+
},
16+
]);
17+
18+
Async\SchedulerHook::defer(function () use ($tasks): void {
19+
echo "task 1\n";
20+
21+
// Queued while draining: the scheduler decides the semantics; this
22+
// one drains everything queued before it finishes (classic microtasks).
23+
Async\SchedulerHook::defer(function (): void {
24+
echo "task 3 (queued by task 1)\n";
25+
});
26+
});
27+
28+
Async\SchedulerHook::defer(function (): void {
29+
echo "task 2\n";
30+
});
31+
32+
// The scheduler drains its own queue on its tick; simulate one here.
33+
while (!$tasks->isEmpty()) {
34+
($tasks->dequeue())();
35+
}
36+
37+
echo "drained\n";
38+
?>
39+
--EXPECT--
40+
task 1
41+
task 2
42+
task 3 (queued by task 1)
43+
drained

0 commit comments

Comments
 (0)