diff --git a/.gitignore b/.gitignore
index 540f0e2..382c7f6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,10 @@ lp_rpc.proto
 # environment file
 env
+
+# IDE
+.vscode/
+*.code-workspace
+.idea/
+.codex
+.claude
diff --git a/docs/runner-sdk.md b/docs/runner-sdk.md
new file mode 100644
index 0000000..c209523
--- /dev/null
+++ b/docs/runner-sdk.md
@@ -0,0 +1,279 @@
+# Runner SDK
+
+> Status: **draft**. Interface not yet locked. Examples still landing —
+> see [Coverage](#coverage) and [Not yet implemented](#not-yet-implemented).
+>
+> Source: [`src/livepeer_gateway/runner/`](../src/livepeer_gateway/runner/).
+> Tracking PR: [livepeer/livepeer-python-gateway#7][pr7].
+
+## Where this fits
+
+This is the **deploy half** of the Livepeer Python SDK — "I run
+pipelines on the network." The other half is the **client / gateway
+side** — "I send requests to the network" — tracked in epic
+[#9][issue9]. Both will ship from one monorepo using PEP 420 namespace
+packages: shared `livepeer` import root, separate distributions
+(`livepeer-runner`, `livepeer-client`, `livepeer-trickle`) installed
+independently. Today the code lives under `livepeer_gateway.runner`
+pending the restructure (tracked as C12 in [#8][issue8]).
+Full architecture: [pipeline-sdk spec][spec].
+
+## What the SDK provides
+
+A runner author writes one Python class, calls `serve(pipeline)`, gets a
+FastAPI app the orchestrator can hit. Two base classes cover the two
+transport modes:
+
+| Base class | Transport | Hook surface |
+| --- | --- | --- |
+| [`Pipeline`](../src/livepeer_gateway/runner/pipeline.py) | `POST /run` — request/response, or SSE when `run()` yields | `setup()`, `run(**kwargs)` |
+| [`LivePipeline`](../src/livepeer_gateway/runner/live_pipeline.py) | trickle (BYOC live) over `POST /stream/{start,stop,params}` | `setup()`, `on_stream_start`, `process_video`, `process_audio`, `on_params_update`, `on_stream_stop`, plus `emit_event` / `emit_data` |
+
+`make_app(pipeline)` dispatches on the base class. Health is `GET /health`
+on both.
+
+## `Pipeline` — batch
+
+```python
+class Sentiment(Pipeline):
+    def setup(self) -> None:
+        self.model = load_model()
+
+    def run(self, text: str) -> dict:
+        return {"label": self.model(text)}
+```
+
+- `run` signature is introspected. A single `BaseModel` parameter is
+  used directly; bare params get wrapped in a generated model.
+- Generator `run` is auto-detected → response framed as SSE.
+- Return type annotation, if `BaseModel`, becomes the OpenAPI response.
+
+Examples: [`hello_world`](../examples/runner/hello_world/),
+[`sentiment`](../examples/runner/sentiment/),
+[`image_upscale`](../examples/runner/image_upscale/),
+[`llm`](../examples/runner/llm/) (SSE).
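+
+For streaming output, make `run` a generator; the SDK detects it and frames
+the response as SSE (this is what the `llm` example does). A minimal sketch,
+where `load_model` and its `.stream()` API are illustrative stand-ins, not
+SDK names:
+
+```python
+class Chat(Pipeline):
+    def setup(self) -> None:
+        self.model = load_model()  # illustrative helper, as above
+
+    def run(self, prompt: str):
+        # Each yielded dict becomes one SSE event on the /run response.
+        for token in self.model.stream(prompt):
+            yield {"token": token}
+```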
+
+## `LivePipeline` — trickle
+
+Minimal — video transform only:
+
+```python
+class Grayscale(LivePipeline):
+    async def process_video(self, frame: VideoFrame) -> VideoFrame | None:
+        return frame.to_grayscale()
+```
+
+Audio in, structured data out — uses per-session state and the side
+channels:
+
+```python
+class LiveTranscribe(LivePipeline):
+    def setup(self) -> None:
+        self._model = WhisperModel("tiny.en")  # loaded once at container start
+
+    async def on_stream_start(self, params: dict) -> None:
+        self._buffer = np.empty(0, dtype=np.float32)  # fresh per session
+        await self.emit_event({"type": "ready", "model": "whisper-tiny.en"})
+
+    async def process_audio(self, frame: AudioFrame) -> AudioFrame:
+        self._buffer = np.concatenate([self._buffer, _to_pcm(frame)])
+        if self._buffer.size >= WINDOW_SAMPLES:
+            # faster-whisper returns (segments, info); iterate the segments.
+            segments, _ = self._model.transcribe(self._buffer[:WINDOW_SAMPLES])
+            for seg in segments:
+                await self.emit_data({"type": "transcript", "text": seg.text})
+            self._buffer = self._buffer[WINDOW_SAMPLES:]
+        return frame  # passthrough — caller still gets the audio track
+
+    async def on_stream_stop(self) -> None:
+        await self.emit_event({"type": "stopped"})
+```
+
+Lifecycle, in call order per session:
+
+1. `setup()` — once, before the server binds (sync).
+2. `on_stream_start(params)` — per-session init, before the first frame.
+3. `process_video(frame)` / `process_audio(frame)` — per-frame transform.
+   Return `None` to drop.
+4. `on_params_update(params)` — caller posted new params mid-stream.
+5. `on_stream_stop()` — per-session cleanup. `emit_data` / `emit_event`
+   still work here.
+
+Side channels, callable from any hook during a session:
+
+- `emit_event({...})` — JSON on the events trickle channel (lifecycle,
+  telemetry).
+- `emit_data({...})` — JSON on the data trickle channel (transcripts,
+  detections, classifications). No-op if the orchestrator didn't
+  enable a data channel.
+
+### Heartbeat
+
+The SDK auto-publishes a `{"type": "heartbeat", "timestamp": ...}` event
+on the events channel every 10 seconds while a session is active. This
+exists to satisfy the gateway's 30s events-gap watchdog
+(`byoc/trickle.go: maxEventGap`) — without it, the gateway tears down
+sessions whose pipelines aren't emitting their own events. User
+`emit_event` calls also reset the watchdog, so a pipeline that emits
+its own telemetry doesn't strictly need the heartbeat — but it's always
+on. The task is started before the frame loop (so `events_url` shows
+activity during cold start) and cancelled on `/stream/stop`.
+
+The payload is intentionally minimal today; FPS / throughput fields are
+in [Not yet implemented](#not-yet-implemented).
+
+### Track allocation by introspection
+
+A track is allocated **iff** its `process_*` hook is overridden — except
+a stock `LivePipeline` (no overrides), which allocates both as a
+passthrough relay. See
+[`_emit_flags`](../src/livepeer_gateway/runner/live_pipeline.py).
+
+Concretely:
+
+| Override | Video out | Audio out |
+| --- | --- | --- |
+| neither (passthrough) | yes | yes |
+| `process_video` only | yes | no |
+| `process_audio` only | no | yes |
+| both | yes | yes |
+
+Examples: [`live_tint`](../examples/runner/live_tint/) (video only,
+also exercises `on_params_update`), [`live_depth`](../examples/runner/live_depth/)
+(video only, GPU model), [`live_transcribe`](../examples/runner/live_transcribe/)
+(audio in → data out).
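+
+A sketch of the `on_params_update` hook, in the spirit of `live_tint`
+(the `strength` field and `apply_tint` helper are illustrative, not taken
+from that example):
+
+```python
+class Tint(LivePipeline):
+    async def on_stream_start(self, params: dict) -> None:
+        self._strength = float(params.get("strength", 0.5))
+
+    async def on_params_update(self, params: dict) -> None:
+        # Caller POSTed /stream/params mid-stream; takes effect next frame.
+        self._strength = float(params.get("strength", self._strength))
+        await self.emit_event({"type": "params_applied", "strength": self._strength})
+
+    async def process_video(self, frame: VideoFrame) -> VideoFrame:
+        return apply_tint(frame, self._strength)  # illustrative helper
+```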
+
+## Coverage
+
+| Mode | Example | Status |
+| --- | --- | --- |
+| Batch sync | hello_world, sentiment, image_upscale | ✅ |
+| Batch streaming (SSE) | llm | ✅ |
+| Live video → video | live_tint, live_depth | ✅ |
+| Live audio → data | live_transcribe | ⏳ |
+| Live audio → audio | _planned: pitch-shift_ | ⏳ |
+| Live video + audio | _planned: combined_ | ⏳ |
+
+> [!NOTE]
+> `live_transcribe` is feature-complete on the runner side but is marked
+> ⏳ pending two known transport bugs that drop records before they reach
+> the SSE caller — one SDK-side ([#12][issue12]: the first segment of every
+> trickle channel duplicate-writes seg 0; one-line fix), one upstream
+> ([go-livepeer#3924][go3924]: the gateway tears down the data SSE proxy
+> before the runner's final `emit_data` is relayed). Combined effect:
+> 5 transcripts emitted → 3 delivered. Promote to ✅ once both land.
+
+## Open questions
+
+Design points still open before the surface is declared stable.
+
+1. **`emit_data` / `emit_event` outside an active session.** Today:
+   silent no-op. Alternative: raise. Silent matches "the orchestrator
+   didn't enable this channel" but masks programming errors.
+2. **Single-session constraint.** `LivePipeline._session` is one slot;
+   `/stream/start` returns 409 if busy. Multi-session is deferred to
+   post-C8 (demand-driven). Documenting the constraint as a contract
+   means callers won't lean on it being temporary.
+
+## Not yet implemented
+
+Items that are decided in principle but not in the SDK yet. Tracked in
+[the SDK epic][issue8].
+
+### `livepeer.yaml` + `livepeer push`
+
+The biggest pending piece. Today every example ships a hand-written
+Dockerfile, `register_capability.py`, and `docker-compose.yml`. A
+manifest + CLI collapses that to a one-file declaration:
+
+Before — `examples/runner/sentiment/`:
+
+```text
+Dockerfile             ← hand-written
+docker-compose.yml     ← hand-written
+register_capability.py ← boilerplate
+requirements.txt
+prepare_models.py
+pipeline.py
+README.md
+test.sh
+```
+
+After:
+
+```text
+livepeer.yaml          ← single source of truth
+prepare_models.py
+pipeline.py
+README.md
+```
+
+Sample manifest (shape illustrative, not final):
+
+```yaml
+pipeline_id: sentiment
+pipeline: pipeline:Sentiment  # module:class
+
+python:
+  version: "3.11"
+  packages:
+    - transformers==4.40.0
+    - torch==2.3.0
+
+system:
+  packages:
+    - ffmpeg
+
+build:
+  prepare: prepare_models.py  # runs at image build, caches weights
+
+gpu: false  # or "T4", "L4", etc.
+```
+
+`livepeer push` parses the manifest, generates the Dockerfile, builds the
+image, embeds the schema as an image label, and registers the capability
+on the network. Builders never write Dockerfiles or call the
+registration RPC.
+
+The manifest is the recommended path, not the only one. Builders who want
+full control over the container — custom base images, multi-stage builds,
+non-trivial system setup — can still write their own Dockerfile, call
+`serve(pipeline)` from any entrypoint, and register the capability
+manually. `livepeer push` is sugar; the underlying contract (HTTP
+endpoints + image labels) is what the orchestrator cares about.
+
+### Other pending items
+
+- **Schema generation (`/openapi.json`) + discovery doc (`GET /`).**
+  Phases C3 / C4 / C5 in the staircase.
+- **Health state machine** on `/health` (`LOADING → OK | ERROR | IDLE`).
+  Currently `setup()` blocks the bind; examples use a docker
+  healthcheck workaround.
+- **Structured error events.** `emit_event` is freeform JSON today.
+  We want a typed `error` event shape so the orchestrator and caller
+  can react uniformly to runner-side failures (model OOM, GPU lost,
+  per-frame error budget exceeded) — see the sketch after this list.
+- **FPS / throughput in heartbeats.** Today's heartbeat is
+  `{type, timestamp}` only. Adding observed input fps + processed-frame
+  fps lets the orchestrator detect a slow pipeline before the
+  events-gap watchdog fires.
+- **Inbound JSON streaming.** `/stream/params` is one-shot today. Pipelines
+  that need a continuous JSON stream from the caller (live prompts,
+  streaming text, high-frequency control beyond a single param dict)
+  have nothing — likely an `on_control(payload)` hook exposing the
+  `control_url` stream directly. Tracked: [#8][issue8].
+- **Lock down `_session` access.** The user-facing `emit_event` /
+  `emit_data` methods reach into `pipeline._session` directly, leaking
+  the private attribute as an accidental subclass contract. Replace
+  with a private accessor or move the methods to operate on session
+  state passed in. Tracked: [#8][issue8].
+- **Multi-session per pipeline.** Capacity demand-driven; post-C8.
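+
+To make the structured-error bullet concrete, one possible shape. This is
+purely a hypothetical sketch, not a committed schema:
+
+```python
+# Hypothetical: a typed error event a pipeline might emit on failure.
+await self.emit_event({
+    "type": "error",
+    "code": "model_oom",      # stable, machine-matchable identifier
+    "message": "CUDA out of memory on frame 1832",
+    "fatal": True,            # hint that the session should be torn down
+})
+```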
+
+Tracking PR: [livepeer/livepeer-python-gateway#7][pr7]. Upstream
+go-livepeer work and the broader SDK roadmap are tracked in the
+[SDK epic][issue8].
+
+[pr7]: https://github.com/livepeer/livepeer-python-gateway/pull/7
+[issue8]: https://github.com/livepeer/livepeer-python-gateway/issues/8
+[issue9]: https://github.com/livepeer/livepeer-python-gateway/issues/9
+[issue12]: https://github.com/livepeer/livepeer-python-gateway/issues/12
+[go3924]: https://github.com/livepeer/go-livepeer/issues/3924
+[spec]: https://github.com/rickstaa/livepeer-specs/blob/main/docs/developer-journey/pipeline-sdk.md
diff --git a/examples/runner/hello_world/Dockerfile b/examples/runner/hello_world/Dockerfile
new file mode 100644
index 0000000..c5803b8
--- /dev/null
+++ b/examples/runner/hello_world/Dockerfile
@@ -0,0 +1,15 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+# Install the package from source — pulls aiohttp, grpcio, protobuf, av per
+# pyproject.toml. Build context is the repo root.
+COPY pyproject.toml README ./
+COPY src /app/src
+RUN pip install --no-cache-dir /app
+
+COPY examples/runner/hello_world/pipeline.py /app/pipeline.py
+
+EXPOSE 5000
+
+CMD ["python", "/app/pipeline.py"]
diff --git a/examples/runner/hello_world/README.md b/examples/runner/hello_world/README.md
new file mode 100644
index 0000000..1ee80a4
--- /dev/null
+++ b/examples/runner/hello_world/README.md
@@ -0,0 +1,61 @@
+# Hello world (BYOC)
+
+> [!NOTE]
+> **TODO** — `test.sh` and the `gateway:` compose service collapse into a single Python script using the client SDK once [livepeer/livepeer-python-gateway#6](https://github.com/livepeer/livepeer-python-gateway/pull/6) merges.
+
+Smallest end-to-end test of the Pipeline SDK against a real
+[go-livepeer](https://github.com/livepeer/go-livepeer) BYOC stack. A `Pipeline`
+subclass returns `{"message": "hello, X"}` over HTTP. Registered as a BYOC
+capability, called through the gateway, response flows back end-to-end.
+
+## Run
+
+> [!WARNING]
+> Only one example can run at a time — all share container names
+> (`gateway`, `orchestrator`, …) and ports (`1935`, `9935`, `5000`). If
+> `./test.sh` fails at the capability-registration step, run `docker
+> compose down` in the other example's directory first.
+
+```bash
+docker compose up -d
+./test.sh
+docker compose down
+```
+
+`test.sh` prints `PASS` on success.
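+
+What `test.sh` does, reduced to one request: a stdlib-only sketch (the
+client SDK from [#6](https://github.com/livepeer/livepeer-python-gateway/pull/6)
+will replace it):
+
+```python
+import base64, json, urllib.request
+
+# Same header test.sh builds: capability routing metadata, base64-encoded.
+header = base64.b64encode(json.dumps({
+    "request": "{}", "parameters": "{}",
+    "capability": "hello-world", "timeout_seconds": 30,
+}).encode()).decode()
+
+req = urllib.request.Request(
+    "http://localhost:9935/process/request/run",
+    data=json.dumps({"name": "livepeer"}).encode(),
+    headers={"Livepeer": header, "Content-Type": "application/json"},
+)
+print(urllib.request.urlopen(req).read().decode())  # {"message": "hello, livepeer"}
+```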
+
+## Browse the API
+
+- Swagger UI: <http://localhost:5000/docs>
+- ReDoc: <http://localhost:5000/redoc>
+- OpenAPI JSON: <http://localhost:5000/openapi.json>
+
+## What's running
+
+```mermaid
+sequenceDiagram
+    autonumber
+    participant curl
+    participant gateway
+    participant orchestrator
+    participant hello_world as hello_world<br/>(SDK container)
+
+    curl->>gateway: POST /process/request/run
+    gateway->>orchestrator: forward (Livepeer-signed)
+    orchestrator->>hello_world: POST /run {"name":"..."}
+    hello_world-->>orchestrator: {"message":"hello, ..."}
+    orchestrator-->>gateway: response
+    gateway-->>curl: response
+```
+
+Three compose services:
+
+| Service | What it is |
+| --- | --- |
+| `gateway`, `orchestrator` | `livepeer/go-livepeer:master` from Docker Hub |
+| `hello_world` | The pipeline container — a [BYOC](https://github.com/livepeer/go-livepeer/blob/main/doc/byoc.md) capability built with `livepeer_gateway.runner`. Attached via HTTP register, not the `-aiWorker` mechanism: the SDK self-registers by POSTing to `orchestrator:8935/capability/register` on startup. |
+
+First `docker compose up` pulls `livepeer/go-livepeer:master` (a few hundred
+MB, cached afterwards) and builds the `hello_world` image locally.
diff --git a/examples/runner/hello_world/docker-compose.yml b/examples/runner/hello_world/docker-compose.yml
new file mode 100644
index 0000000..8d87b94
--- /dev/null
+++ b/examples/runner/hello_world/docker-compose.yml
@@ -0,0 +1,54 @@
+services:
+  # Mirrors go-livepeer/doc/byoc.md: on-chain mode against a public Arbitrum
+  # RPC, but `pricePerUnit 0` keeps the registration free — no balance ever
+  # leaves the auto-generated keystore. No real chain interaction occurs.
+
+  orchestrator:
+    image: livepeer/go-livepeer:master
+    container_name: orchestrator
+    command: >
+      -network arbitrum-one-mainnet
+      -orchestrator
+      -ethUrl https://arb1.arbitrum.io/rpc
+      -ethPassword secret-password
+      -pricePerUnit 0
+      -serviceAddr=orchestrator:8935
+      -orchSecret=orch-secret
+      -v 6
+
+  # TODO: see README — service removed after migration to the Python client SDK.
+  gateway:
+    image: livepeer/go-livepeer:master
+    container_name: gateway
+    command: >
+      -network arbitrum-one-mainnet
+      -gateway
+      -ethUrl https://arb1.arbitrum.io/rpc
+      -ethPassword secret-password
+      -orchAddr=orchestrator:8935
+      -httpAddr=0.0.0.0:9935
+      -httpIngest
+      -v 6
+    ports:
+      - "9935:9935"
+    depends_on:
+      - orchestrator
+
+  hello_world:
+    build:
+      context: ../../..
+      dockerfile: examples/runner/hello_world/Dockerfile
+    container_name: hello_world
+    ports:
+      - "5000:5000"
+    environment:
+      LIVEPEER_ORCH_URL: https://orchestrator:8935
+      LIVEPEER_ORCH_SECRET: orch-secret
+      LIVEPEER_CAPABILITY_NAME: hello-world
+      LIVEPEER_CAPABILITY_URL: http://hello_world:5000
+    depends_on:
+      - orchestrator
+
+networks:
+  default:
+    name: livepeer
diff --git a/examples/runner/hello_world/pipeline.py b/examples/runner/hello_world/pipeline.py
new file mode 100644
index 0000000..1264559
--- /dev/null
+++ b/examples/runner/hello_world/pipeline.py
@@ -0,0 +1,15 @@
+"""Hello-world BYOC pipeline. Run via ``docker compose up``."""
+
+import logging
+
+from livepeer_gateway.runner import Pipeline, serve
+
+logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
+
+
+class HelloWorld(Pipeline):
+    def run(self, name: str = "world") -> dict:
+        return {"message": f"hello, {name}"}
+
+
+if __name__ == "__main__":
+    serve(HelloWorld())
diff --git a/examples/runner/hello_world/test.sh b/examples/runner/hello_world/test.sh
new file mode 100755
index 0000000..092c483
--- /dev/null
+++ b/examples/runner/hello_world/test.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+# E2E: send a request through the gateway, assert the response from the
+# hello_world container comes back through the orchestrator.
+
+# TODO: see README — migration to the Python client SDK.
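+# NOTE: `base64 -w0` (used below) is GNU coreutils syntax; macOS/BSD base64
+# has no `-w` flag, so pipe through `tr -d '\n'` there instead.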
+
+set -euo pipefail
+cd "$(dirname "$0")"
+
+GATEWAY_URL="${GATEWAY_URL:-http://localhost:9935}"
+NAME="${NAME:-livepeer}"
+EXPECTED_MSG="hello, ${NAME}"
+
+echo "Waiting for capability registration..."
+# SDK self-registers inside the hello_world container; look for the log line
+# emitted by livepeer_gateway.runner.registration.register().
+# TODO: switch to `curl /status` once the SDK exposes a status endpoint
+# (Phase 2 of auto-registration). Structured check beats log grep.
+for _ in $(seq 30); do
+  if docker logs hello_world 2>&1 | grep -q "registered capability=hello-world"; then
+    echo "  registered."
+    break
+  fi
+  sleep 1
+done
+if ! docker logs hello_world 2>&1 | grep -q "registered capability=hello-world"; then
+  echo "FAIL: hello_world container hasn't logged registration success."
+  echo "Make sure 'docker compose up -d --wait --build' completed first."
+  exit 1
+fi
+
+LIVEPEER_HDR=$(printf '%s' '{"request":"{}","parameters":"{}","capability":"hello-world","timeout_seconds":30}' | base64 -w0)
+
+echo "Sending request through gateway..."
+RESPONSE=$(curl -fsS -X POST "${GATEWAY_URL}/process/request/run" \
+  -H "Livepeer: ${LIVEPEER_HDR}" \
+  -H "Content-Type: application/json" \
+  -d "{\"name\":\"${NAME}\"}")
+
+echo "Response: ${RESPONSE}"
+
+if echo "${RESPONSE}" | grep -qE "\"message\"[[:space:]]*:[[:space:]]*\"${EXPECTED_MSG}\""; then
+  echo "PASS"
+  exit 0
+fi
+
+echo "FAIL"
+exit 1
diff --git a/examples/runner/image_upscale/Dockerfile b/examples/runner/image_upscale/Dockerfile
new file mode 100644
index 0000000..8481785
--- /dev/null
+++ b/examples/runner/image_upscale/Dockerfile
@@ -0,0 +1,26 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+# SDK install (in-repo source until livepeer-gateway publishes; will collapse
+# to a single `pip install livepeer-gateway` line once on PyPI).
+COPY pyproject.toml README ./
+COPY src /app/src
+RUN pip install --no-cache-dir /app
+
+# Pipeline-specific deps. The requirements.txt sets --extra-index-url to
+# pull the CPU-only torch wheel (~200 MB vs ~5 GB for the CUDA variant).
+COPY examples/runner/image_upscale/requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Bake model weights at build time so setup() loads from local disk in
+# milliseconds.
+COPY examples/runner/image_upscale/prepare_models.py /app/prepare_models.py
+RUN python /app/prepare_models.py
+
+# Pipeline code last so edits don't invalidate the bake layer above.
+COPY examples/runner/image_upscale/pipeline.py /app/pipeline.py
+
+EXPOSE 5000
+
+CMD ["python", "/app/pipeline.py"]
diff --git a/examples/runner/image_upscale/README.md b/examples/runner/image_upscale/README.md
new file mode 100644
index 0000000..684b1f1
--- /dev/null
+++ b/examples/runner/image_upscale/README.md
@@ -0,0 +1,111 @@
+# Image upscale (BYOC)
+
+> [!NOTE]
+> **TODO** — `test.sh` and the `gateway:` compose service collapse into a single Python script using the client SDK once [livepeer/livepeer-python-gateway#6](https://github.com/livepeer/livepeer-python-gateway/pull/6) merges.
+
+A ~2x image super-resolution BYOC capability — proves the SDK handles binary
+I/O cleanly via Pydantic's `Base64Bytes`. Built on
+[Swin2SR](https://huggingface.co/caidas/swin2SR-classical-sr-x2-64), small
+enough to run on CPU.
+
+A `Pipeline` subclass loads the model once in `setup()`, then takes a
+base64-encoded image on each `POST /run` and returns the upscaled PNG.
+The processor pads inputs to its window size before upscaling, so output
+dimensions are at least 2x input but may be slightly larger. Registered as
+a BYOC capability, called through the gateway, response flows back
+end-to-end.
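+
+The `Base64Bytes` round trip at the model boundary, in miniature. A
+pydantic-only sketch with a toy payload:
+
+```python
+from pydantic import Base64Bytes, BaseModel
+
+class Payload(BaseModel):
+    image: Base64Bytes
+
+p = Payload.model_validate_json('{"image": "aGVsbG8="}')
+assert p.image == b"hello"   # decoded to raw bytes before run() sees it
+print(p.model_dump_json())   # bytes re-encode to base64 on the way out
+```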
+
+## Run
+
+> [!WARNING]
+> Only one example can run at a time — all share container names
+> (`gateway`, `orchestrator`, …) and ports (`1935`, `9935`, `5000`). If
+> `./test.sh` fails at the capability-registration step, run `docker
+> compose down` in the other example's directory first.
+
+```bash
+docker compose up -d --wait --build
+./test.sh
+docker compose down
+```
+
+`test.sh` prints `PASS` on success.
+
+`prepare_models.py` bakes the model into the image at build time so
+`setup()` loads from local cache in milliseconds.
+
+## Browse the API
+
+- Swagger UI: <http://localhost:5000/docs>
+- ReDoc: <http://localhost:5000/redoc>
+- OpenAPI JSON: <http://localhost:5000/openapi.json>
+
+## What's running
+
+```mermaid
+sequenceDiagram
+    autonumber
+    participant curl
+    participant gateway
+    participant orchestrator
+    participant image_upscale as image_upscale<br/>(SDK container)
+
+    curl->>gateway: POST /process/request/run
+    gateway->>orchestrator: forward (Livepeer-signed)
+    orchestrator->>image_upscale: POST /run {"image":"<b64>"}
+    image_upscale-->>orchestrator: {"image":"<b64>","width":W,"height":H}
+    orchestrator-->>gateway: response
+    gateway-->>curl: response
+```
+
+Three compose services:
+
+| Service | What it is |
+| --- | --- |
+| `gateway`, `orchestrator` | `livepeer/go-livepeer:master` from Docker Hub |
+| `image_upscale` | The pipeline container — a [BYOC](https://github.com/livepeer/go-livepeer/blob/main/doc/byoc.md) capability built with `livepeer_gateway.runner`; the SDK self-registers it with the orchestrator on startup. |
+
+The pipeline service has a healthcheck that probes `GET /health` until the
+model finishes loading. The SDK registers only after `setup()` completes
+(it blocks the bind), so the orchestrator never sees a "registered but not
+loaded" container.
+
+## Binary I/O contract
+
+Both `image` fields use Pydantic's `Base64Bytes`:
+
+- **Input** — `image` is a base64-encoded string in the JSON body. Pydantic
+  decodes it to `bytes` before `run()` runs.
+- **Output** — `image` is `bytes` in the pipeline; Pydantic encodes back to
+  base64 in the JSON response.
+
+`width` and `height` are returned alongside for convenience. The pipeline
+always emits PNG; document the format in the field description if you need
+to surface it to callers.
+
+## Try with your own image
+
+```bash
+TEST_IMAGE=/path/to/your.png \
+INPUT_WIDTH=$W INPUT_HEIGHT=$H \
+./test.sh
+```
+
+The test asserts output is at least 2x input dimensions.
+
+Or manually:
+
+```bash
+INPUT_B64=$(base64 -w0 < your.png)
+
+LIVEPEER_HDR=$(printf '%s' \
+  '{"request":"{}","parameters":"{}","capability":"image-upscale","timeout_seconds":60}' \
+  | base64 -w0)
+
+curl -X POST http://localhost:9935/process/request/run \
+  -H "Livepeer: ${LIVEPEER_HDR}" \
+  -H "Content-Type: application/json" \
+  -d "{\"image\":\"${INPUT_B64}\"}" \
+  | jq -r '.image' | base64 -d > upscaled.png
+```
diff --git a/examples/runner/image_upscale/docker-compose.yml b/examples/runner/image_upscale/docker-compose.yml
new file mode 100644
index 0000000..92da7ac
--- /dev/null
+++ b/examples/runner/image_upscale/docker-compose.yml
@@ -0,0 +1,64 @@
+services:
+  # Mirrors go-livepeer/doc/byoc.md: on-chain mode against a public Arbitrum
+  # RPC, but `pricePerUnit 0` keeps the registration free — no balance ever
+  # leaves the auto-generated keystore. No real chain interaction occurs.
+
+  orchestrator:
+    image: livepeer/go-livepeer:master
+    container_name: orchestrator
+    command: >
+      -network arbitrum-one-mainnet
+      -orchestrator
+      -ethUrl https://arb1.arbitrum.io/rpc
+      -ethPassword secret-password
+      -pricePerUnit 0
+      -serviceAddr=orchestrator:8935
+      -orchSecret=orch-secret
+      -v 6
+
+  # TODO: see README — service removed after migration to the Python client SDK.
+  gateway:
+    image: livepeer/go-livepeer:master
+    container_name: gateway
+    command: >
+      -network arbitrum-one-mainnet
+      -gateway
+      -ethUrl https://arb1.arbitrum.io/rpc
+      -ethPassword secret-password
+      -orchAddr=orchestrator:8935
+      -httpAddr=0.0.0.0:9935
+      -httpIngest
+      -v 6
+    ports:
+      - "9935:9935"
+    depends_on:
+      - orchestrator
+
+  image_upscale:
+    build:
+      context: ../../..
+      dockerfile: examples/runner/image_upscale/Dockerfile
+    container_name: image_upscale
+    ports:
+      - "5000:5000"
+    # Healthcheck waits for setup() to finish loading the model before the
+    # orchestrator can route requests here. Without it, the test could fire
+    # before the model is loaded.
+    healthcheck:
+      test: ["CMD", "python", "-c",
+             "import urllib.request; urllib.request.urlopen('http://localhost:5000/health').read()"]
+      interval: 5s
+      timeout: 5s
+      retries: 30
+      start_period: 60s
+    environment:
+      LIVEPEER_ORCH_URL: https://orchestrator:8935
+      LIVEPEER_ORCH_SECRET: orch-secret
+      LIVEPEER_CAPABILITY_NAME: image-upscale
+      LIVEPEER_CAPABILITY_URL: http://image_upscale:5000
+    depends_on:
+      - orchestrator
+
+networks:
+  default:
+    name: livepeer
diff --git a/examples/runner/image_upscale/pipeline.py b/examples/runner/image_upscale/pipeline.py
new file mode 100644
index 0000000..14bca41
--- /dev/null
+++ b/examples/runner/image_upscale/pipeline.py
@@ -0,0 +1,57 @@
+"""Image upscale BYOC pipeline. Run via ``docker compose up`` — see README.md."""
+
+import base64
+import io
+import logging
+
+import numpy as np
+import torch
+from PIL import Image
+from pydantic import Base64Bytes, BaseModel, Field
+from transformers import Swin2SRForImageSuperResolution, Swin2SRImageProcessor
+
+from livepeer_gateway.runner import Pipeline, serve
+
+logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
+
+
+class UpscaleInput(BaseModel):
+    image: Base64Bytes = Field(description="Source image, base64-encoded PNG/JPEG")
+
+
+class UpscaleOutput(BaseModel):
+    image: str = Field(description="Upscaled image, base64-encoded PNG")
+    width: int
+    height: int
+
+
+class ImageUpscaler(Pipeline):
+    def setup(self):
+        # Loads from local HF cache populated at Docker build time.
+        model_id = "caidas/swin2SR-classical-sr-x2-64"
+        self.processor = Swin2SRImageProcessor.from_pretrained(model_id)
+        self.model = Swin2SRForImageSuperResolution.from_pretrained(model_id)
+
+    def run(self, params: UpscaleInput) -> UpscaleOutput:
+        src = Image.open(io.BytesIO(params.image)).convert("RGB")
+
+        inputs = self.processor(images=src, return_tensors="pt")
+        with torch.no_grad():
+            outputs = self.model(**inputs)
+
+        # CHW float [0, 1] → HWC uint8
+        chw = outputs.reconstruction.squeeze().clamp(0, 1).cpu().numpy()
+        hwc = np.moveaxis(chw, 0, -1)
+        upscaled = Image.fromarray((hwc * 255.0).round().astype(np.uint8))
+
+        buf = io.BytesIO()
+        upscaled.save(buf, format="PNG")
+        return UpscaleOutput(
+            image=base64.b64encode(buf.getvalue()).decode(),
+            width=upscaled.width,
+            height=upscaled.height,
+        )
+
+
+if __name__ == "__main__":
+    serve(ImageUpscaler())
diff --git a/examples/runner/image_upscale/prepare_models.py b/examples/runner/image_upscale/prepare_models.py
new file mode 100644
index 0000000..f0c8ca7
--- /dev/null
+++ b/examples/runner/image_upscale/prepare_models.py
@@ -0,0 +1,11 @@
+"""Download model weights into the local HF cache at build time.
+
+Invoked by the Dockerfile so ``setup()`` loads from local disk in
+milliseconds instead of pulling from HF Hub on every container start.
+""" + +from transformers import Swin2SRForImageSuperResolution, Swin2SRImageProcessor + +model_id = "caidas/swin2SR-classical-sr-x2-64" +Swin2SRImageProcessor.from_pretrained(model_id) +Swin2SRForImageSuperResolution.from_pretrained(model_id) diff --git a/examples/runner/image_upscale/requirements.txt b/examples/runner/image_upscale/requirements.txt new file mode 100644 index 0000000..80fa612 --- /dev/null +++ b/examples/runner/image_upscale/requirements.txt @@ -0,0 +1,6 @@ +--extra-index-url https://download.pytorch.org/whl/cpu + +transformers +torch +Pillow +numpy diff --git a/examples/runner/image_upscale/test.sh b/examples/runner/image_upscale/test.sh new file mode 100755 index 0000000..65bd0b5 --- /dev/null +++ b/examples/runner/image_upscale/test.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# E2E: send a request through the gateway, assert the upscaled image +# (2x of the 32x32 fixture = 64x64) comes back through the orchestrator. + +# TODO: see README — migration to the Python client SDK. + +set -euo pipefail +cd "$(dirname "$0")" + +GATEWAY_URL="${GATEWAY_URL:-http://localhost:9935}" +TEST_IMAGE="${TEST_IMAGE:-test_image.png}" +INPUT_WIDTH="${INPUT_WIDTH:-64}" +INPUT_HEIGHT="${INPUT_HEIGHT:-64}" + +echo "Waiting for capability registration..." +# SDK self-registers inside the pipeline container; look for the log line +# emitted by livepeer_gateway.runner.registration.register(). +# TODO: switch to `curl /status` once the SDK exposes a status endpoint +# (Phase 2 of auto-registration). Structured check beats log grep. +for _ in $(seq 30); do + if docker logs image_upscale 2>&1 | grep -q "registered capability=image-upscale"; then + echo " registered." + break + fi + sleep 1 +done +if ! docker logs image_upscale 2>&1 | grep -q "registered capability=image-upscale"; then + echo "FAIL: image_upscale container hasn't logged registration success." >&2 + echo "Make sure 'docker compose up -d --wait --build' completed first." >&2 + exit 1 +fi + +INPUT_B64=$(base64 -w0 < "${TEST_IMAGE}") +LIVEPEER_HDR=$(printf '%s' '{"request":"{}","parameters":"{}","capability":"image-upscale","timeout_seconds":60}' | base64 -w0) + +echo "Sending request through gateway..." +RESPONSE=$(curl -fsS -X POST "${GATEWAY_URL}/process/request/run" \ + -H "Livepeer: ${LIVEPEER_HDR}" \ + -H "Content-Type: application/json" \ + -d "{\"image\":\"${INPUT_B64}\"}") + +# Trim the base64 image from the echoed response — keeps stdout readable. +echo "Response (image truncated): $(echo "${RESPONSE}" | sed 's/\("image":"\)[^"]*/\1/')" + +WIDTH=$(echo "${RESPONSE}" | grep -oE '"width"[[:space:]]*:[[:space:]]*[0-9]+' | grep -oE '[0-9]+$') +HEIGHT=$(echo "${RESPONSE}" | grep -oE '"height"[[:space:]]*:[[:space:]]*[0-9]+' | grep -oE '[0-9]+$') + +# The Swin2SR processor pads inputs to its window size before upscaling, so +# output is at least 2x input but may be slightly larger. 
+if [ "${WIDTH}" -ge $((INPUT_WIDTH * 2)) ] && [ "${HEIGHT}" -ge $((INPUT_HEIGHT * 2)) ]; then + echo "PASS (${WIDTH}x${HEIGHT}, >=2x of ${INPUT_WIDTH}x${INPUT_HEIGHT})" + exit 0 +fi + +echo "FAIL: expected >=${INPUT_WIDTH}x${INPUT_HEIGHT} doubled, got ${WIDTH}x${HEIGHT}" +exit 1 diff --git a/examples/runner/image_upscale/test_image.png b/examples/runner/image_upscale/test_image.png new file mode 100644 index 0000000..c419e41 Binary files /dev/null and b/examples/runner/image_upscale/test_image.png differ diff --git a/examples/runner/live_depth/.gitignore b/examples/runner/live_depth/.gitignore new file mode 100644 index 0000000..ee88966 --- /dev/null +++ b/examples/runner/live_depth/.gitignore @@ -0,0 +1 @@ +assets/ diff --git a/examples/runner/live_depth/Dockerfile b/examples/runner/live_depth/Dockerfile new file mode 100644 index 0000000..9f93e82 --- /dev/null +++ b/examples/runner/live_depth/Dockerfile @@ -0,0 +1,29 @@ +FROM pytorch/pytorch:2.4.0-cuda12.1-cudnn9-runtime + +WORKDIR /app + +# SDK install (in-repo source until livepeer-gateway publishes; will collapse +# to a single `pip install livepeer` line once on PyPI). +COPY pyproject.toml README ./ +COPY src /app/src +RUN pip install --no-cache-dir /app + +# Bake the depth model into the image so first stream skips the download. +# Switch variant by editing this line and `_MODEL_ID` in pipeline.py. +# +# transformers<4.49 because 4.49+ uses torch.library.custom_op signatures +# that torch 2.4 (our base) can't parse. Lift after upgrading to torch>=2.5. +RUN pip install --no-cache-dir \ + "transformers>=4.45,<4.49" \ + accelerate>=0.34 \ + opencv-python-headless>=4.10 \ + pillow>=10.0 \ + && python -c "from transformers import pipeline; \ +pipeline(task='depth-estimation', model='depth-anything/Depth-Anything-V2-Base-hf')" + +# Pipeline code last so edits don't invalidate the SDK install layer. +COPY examples/runner/live_depth/pipeline.py /app/pipeline.py + +EXPOSE 5000 + +CMD ["python", "/app/pipeline.py"] diff --git a/examples/runner/live_depth/Dockerfile.mediamtx b/examples/runner/live_depth/Dockerfile.mediamtx new file mode 100644 index 0000000..e194c24 --- /dev/null +++ b/examples/runner/live_depth/Dockerfile.mediamtx @@ -0,0 +1,8 @@ +# bluenviron/mediamtx ships as a scratch image (no shell, no curl), but +# `runOnReady` needs curl to call back into the gateway's BYOC ingest webhook. +# Repackage on alpine + curl — same mediamtx binary, ~10 MB image. +FROM alpine:3.20 +RUN apk add --no-cache curl ca-certificates +COPY --from=bluenviron/mediamtx:latest /mediamtx /mediamtx +COPY --from=bluenviron/mediamtx:latest /mediamtx.yml /mediamtx.default.yml +ENTRYPOINT ["/mediamtx"] diff --git a/examples/runner/live_depth/README.md b/examples/runner/live_depth/README.md new file mode 100644 index 0000000..989fa06 --- /dev/null +++ b/examples/runner/live_depth/README.md @@ -0,0 +1,85 @@ +# Live depth (BYOC, real-time, GPU) + +> [!NOTE] +> **TODO** — `test.sh`, `demo.sh`, and the `gateway:` compose service collapse into a single Python script using the client SDK once [livepeer/livepeer-python-gateway#6](https://github.com/livepeer/livepeer-python-gateway/pull/6) merges. + + +Real-time monocular depth estimation via [DepthAnything V2](https://github.com/DepthAnything/Depth-Anything-V2). +Each video frame goes through the model; the depth map replaces the luma +plane and the chroma planes are zeroed, so the egress reads as a +grayscale "bright = close, dark = far" visualization. 
+
+## Requirements
+
+- **NVIDIA GPU** with CUDA 12.x driver
+- [`nvidia-container-toolkit`](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html)
+  installed and registered with Docker
+- Several GB of free disk (image bundles PyTorch + CUDA runtime + model)
+
+Verify with:
+
+```bash
+docker info | grep -i nvidia   # should show "Runtimes: ... nvidia ..."
+docker run --rm --gpus all nvidia/cuda:12.1.0-base-ubuntu22.04 nvidia-smi
+```
+
+Without GPU access the container will still start, but PyTorch falls back
+to CPU — depth inference is too slow to sustain real-time and the publisher
+tears down within seconds.
+
+## Run
+
+> [!WARNING]
+> Only one example can run at a time — all share container names
+> (`gateway`, `orchestrator`, …) and ports (`1935`, `9935`, `5000`). If
+> `./test.sh` fails at the capability-registration step, run `docker
+> compose down` in the other example's directory first.
+
+```bash
+docker compose up -d --wait --build  # first build downloads model + CUDA layers
+
+./test.sh  # CI: real depth-rich clip, asserts non-trivial depth output
+./demo.sh  # interactive: webcam in, live depth ffplay window
+
+docker compose down
+```
+
+The pipeline container starts in `LOADING` state until `setup()` finishes
+loading the model on GPU and runs the warm pass.
+
+### `test.sh` — automated assertion
+
+Pushes a depth-rich basketball clip (auto-downloaded to `assets/sample.mp4`
+on first run) through the full BYOC chain, captures the egress, and asserts
+both that chroma is zeroed *and* that the luma plane has spatial variance — so
+a no-op pipeline can't pass by accidentally producing uniform output.
+
+### `demo.sh` — live webcam viewer
+
+Pushes your webcam through the depth pipeline and opens an ffplay window
+showing the depth output in real time. Bright = close, dark = far. Wave
+your hand toward the camera to see the gradient shift.
+
+Defaults to 640×480 @ 15fps. Bump or override via env vars:
+
+```bash
+WEBCAM_FPS=30 ./demo.sh              # smoother, may stall on slower CPUs
+WEBCAM_RES=1280x720 ./demo.sh
+WEBCAM_DEVICE=/dev/video1 ./demo.sh  # Linux: pick a different camera
+```
+
+If the live viewer disconnects mid-stream, you've hit the throughput
+ceiling — drop FPS or resolution.
+
+## Throughput
+
+DepthAnything V2 Base with `torch.float16` on a 30/40-series GPU runs at
+~25 ms per frame — comfortably inside the 33 ms (30 fps) or 66 ms (15 fps)
+budget. Switch variant by editing `_MODEL_ID` in `pipeline.py` (and the
+matching bake line in the `Dockerfile`).
+
+| Variant | Params | VRAM (fp16) | Latency on RTX 3090 |
+| --- | --- | --- | --- |
+| `Small-hf` | 24 M | ~1 GB | ~15 ms |
+| `Base-hf` (default) | 97 M | ~2 GB | ~25 ms |
+| `Large-hf` | 335 M | ~3 GB | ~80 ms |
diff --git a/examples/runner/live_depth/demo.sh b/examples/runner/live_depth/demo.sh
new file mode 100755
index 0000000..993db1b
--- /dev/null
+++ b/examples/runner/live_depth/demo.sh
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash
+# Live demo — push your webcam through the depth pipeline and watch
+# the result in ffplay. Minimal complement to test.sh (which uses a real
+# sample clip + capture-to-file for CI assertions).
+#
+# Path: webcam → ffmpeg push (RTMP) → mediamtx → gateway → orch → runner →
+#       orch → mediamtx → ffplay pull (RTMP)
+#
+# Prerequisites:
+#   - Stack up: `docker compose up -d --wait --build`
+#   - ffmpeg + ffplay on host
+#   - Webcam: /dev/video0 (Linux) or first AVFoundation device (macOS)
+
+# TODO: see README — migration to the Python client SDK.
+
+set -euo pipefail
+cd "$(dirname "$0")"
+
+GATEWAY_URL="${GATEWAY_URL:-http://localhost:9935}"
+
+# 640x480 @ 15fps — sustains real-time on any 30/40-series GPU.
+WEBCAM_RES="${WEBCAM_RES:-640x480}"
+WEBCAM_FPS="${WEBCAM_FPS:-15}"
+
+case "$(uname -s)" in
+  Linux*)
+    WEBCAM_DEVICE="${WEBCAM_DEVICE:-/dev/video0}"
+    # `-thread_queue_size` absorbs v4l2 driver hiccups; `+discardcorrupt`
+    # drops bad-buffer frames USB webcams emit at startup.
+    INPUT_FLAGS=(-thread_queue_size 1024 -fflags +discardcorrupt
+      -f v4l2 -framerate "${WEBCAM_FPS}" -video_size "${WEBCAM_RES}"
+      -i "${WEBCAM_DEVICE}")
+    ;;
+  Darwin*)
+    # `0` = first AVFoundation video device. Use
+    # `ffmpeg -f avfoundation -list_devices true -i ""` to enumerate.
+    WEBCAM_DEVICE="${WEBCAM_DEVICE:-0}"
+    INPUT_FLAGS=(-thread_queue_size 1024 -fflags +discardcorrupt
+      -f avfoundation -framerate "${WEBCAM_FPS}" -video_size "${WEBCAM_RES}"
+      -i "${WEBCAM_DEVICE}")
+    ;;
+  *)
+    echo "Unsupported platform $(uname -s); only Linux and macOS are wired up." >&2
+    exit 1
+    ;;
+esac
+
+echo "Waiting for capability registration..."
+# SDK self-registers inside the pipeline container; look for the log line
+# emitted by livepeer_gateway.runner.registration.register().
+# TODO: switch to `curl /status` once the SDK exposes a status endpoint
+# (Phase 2 of auto-registration). Structured check beats log grep.
+for _ in $(seq 30); do
+  if docker logs live_depth 2>&1 | grep -q "registered capability=live-depth"; then
+    echo "  registered."
+    break
+  fi
+  sleep 1
+done
+if ! docker logs live_depth 2>&1 | grep -q "registered capability=live-depth"; then
+  echo "FAIL: live_depth container hasn't logged registration success." >&2
+  echo "Make sure 'docker compose up -d --wait --build' completed first." >&2
+  exit 1
+fi
+
+# `parameters` is a stringified JSON; enable_video_{ingress,egress} drive
+# trickle channel creation (go-livepeer byoc/types.go). 600s timeout for long demos.
+LIVEPEER_HDR=$(printf '%s' \
+  '{"request":"{}","parameters":"{\"enable_video_ingress\":true,\"enable_video_egress\":true}","capability":"live-depth","timeout_seconds":600}' \
+  | base64 -w0)
+
+# Best-effort session cleanup; registered early to catch Ctrl-C.
+# `${STREAM_ID:-}` so an early failure (before stream/start succeeded) doesn't
+# trip `set -u` when the trap fires.
+trap 'kill -INT "${PUSH_PID:-}" 2>/dev/null || true; curl -fsS -X POST "${GATEWAY_URL}/process/stream/${STREAM_ID:-}/stop" -H "Livepeer: ${LIVEPEER_HDR}" -d "{}" >/dev/null 2>&1 || true' EXIT
+
+echo "Starting stream session..."
+RESPONSE=$(curl -fsS -X POST "${GATEWAY_URL}/process/stream/start" \
+  -H "Livepeer: ${LIVEPEER_HDR}" -d '{}')
+
+STREAM_ID=$(echo "${RESPONSE}" | python3 -c "import json,sys; print(json.load(sys.stdin)['stream_id'])")
+RTMP_IN=$(echo "${RESPONSE}" | python3 -c "import json,sys; print(json.load(sys.stdin)['rtmp_url'])")
+RTMP_OUT=$(echo "${RESPONSE}" | python3 -c "import json,sys; print(json.load(sys.stdin)['rtmp_output_url'].split(',')[0])")
+
+# Gateway URLs use docker-DNS name `mediamtx`, only resolvable inside compose.
+RTMP_IN="${RTMP_IN/mediamtx:/localhost:}"
+RTMP_OUT="${RTMP_OUT/mediamtx:/localhost:}"
+
+echo "  stream_id=${STREAM_ID}"
+echo "  webcam   =${WEBCAM_DEVICE} @ ${WEBCAM_RES} ${WEBCAM_FPS}fps"
+echo "  rtmp_in  =${RTMP_IN}"
+echo "  rtmp_out =${RTMP_OUT}"
+
+echo "Pushing webcam to gateway..."
+# `-g 30` = 1s GOP so first segment lands quickly (else 8s+ latency).
+# `-vf format=yuv420p` normalizes YUYV/NV12 before x264.
+# `-loglevel error` mutes the cosmetic v4l2 startup warning.
+ffmpeg -loglevel error -re \
+  "${INPUT_FLAGS[@]}" \
+  -vf format=yuv420p \
+  -c:v libx264 -preset ultrafast -tune zerolatency -g 30 \
+  -f flv "${RTMP_IN}" </dev/null &
+PUSH_PID=$!
+
+# Retry until mediamtx serves egress; low-latency flags = faster first packet.
+echo -n "Waiting for processed stream"
+for _ in $(seq 1 "${RETRIES:-30}"); do
+  # Normal probe — readiness needs reliable confirmation, not low latency.
+  if ffmpeg -loglevel error -y \
+    -i "${RTMP_OUT}" -t 0.1 -f null - </dev/null; then
+    echo " ok."
+    break
+  fi
+  echo -n "."
+  sleep 1
+done
+
+echo "Opening live viewer (close the window or Ctrl-C to stop)..."
+# `nobuffer` + `low_delay` for low playback latency; no `probesize 32` —
+# too aggressive for RTMP (causes I/O error on open before ffplay locks).
+ffplay -loglevel warning \
+  -fflags nobuffer -flags low_delay \
+  -window_title "live_depth (webcam)" \
+  "${RTMP_OUT}"
diff --git a/examples/runner/live_depth/docker-compose.yml b/examples/runner/live_depth/docker-compose.yml
new file mode 100644
index 0000000..b95bc00
--- /dev/null
+++ b/examples/runner/live_depth/docker-compose.yml
@@ -0,0 +1,85 @@
+services:
+  # Mirrors go-livepeer/doc/byoc.md: on-chain mode against a public Arbitrum
+  # RPC, but `pricePerUnit 0` keeps the registration free — no balance ever
+  # leaves the auto-generated keystore. No real chain interaction occurs.
+
+  orchestrator:
+    image: livepeer/go-livepeer:master
+    container_name: orchestrator
+    command: >
+      -network arbitrum-one-mainnet
+      -orchestrator
+      -ethUrl https://arb1.arbitrum.io/rpc
+      -ethPassword secret-password
+      -pricePerUnit 0
+      -serviceAddr=orchestrator:8935
+      -orchSecret=orch-secret
+      -v 6
+
+  # TODO: see README — service removed after migration to the Python client SDK.
+  gateway:
+    image: livepeer/go-livepeer:master
+    container_name: gateway
+    command: >
+      -network arbitrum-one-mainnet
+      -gateway
+      -ethUrl https://arb1.arbitrum.io/rpc
+      -ethPassword secret-password
+      -orchAddr=orchestrator:8935
+      -httpAddr=0.0.0.0:9935
+      -httpIngest
+      -v 6
+    environment:
+      LIVE_AI_PLAYBACK_HOST: rtmp://mediamtx:1935
+    ports:
+      - "9935:9935"
+    depends_on:
+      - orchestrator
+      - mediamtx
+
+  # RTMP frontend. Custom build adds curl to the scratch base so mediamtx.yml's
+  # `runOnReady` can call the gateway — no Livepeer mediamtx fork required.
+  mediamtx:
+    build:
+      context: .
+      dockerfile: Dockerfile.mediamtx
+    container_name: mediamtx
+    ports:
+      - "1935:1935"  # RTMP push/pull
+    volumes:
+      - ./mediamtx.yml:/mediamtx.yml:ro
+
+  live_depth:
+    build:
+      context: ../../..
+      dockerfile: examples/runner/live_depth/Dockerfile
+    container_name: live_depth
+    ports:
+      - "5000:5000"
+    # Enable GPU access — requires nvidia-container-toolkit on the host.
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: 1
+              capabilities: [gpu]
+    # Healthcheck waits for setup() to load the model on GPU. Larger retry
+    # budget than the CPU examples so model load + the first warm pass don't
+    # fail health probes.
+    healthcheck:
+      test: ["CMD", "python", "-c",
+             "import urllib.request; urllib.request.urlopen('http://localhost:5000/health').read()"]
+      interval: 5s
+      timeout: 5s
+      retries: 60
+      start_period: 60s
+    environment:
+      LIVEPEER_ORCH_URL: https://orchestrator:8935
+      LIVEPEER_ORCH_SECRET: orch-secret
+      LIVEPEER_CAPABILITY_NAME: live-depth
+      LIVEPEER_CAPABILITY_URL: http://live_depth:5000
+    depends_on:
+      - orchestrator
+
+networks:
+  default:
+    name: livepeer
diff --git a/examples/runner/live_depth/mediamtx.yml b/examples/runner/live_depth/mediamtx.yml
new file mode 100644
index 0000000..e411d15
--- /dev/null
+++ b/examples/runner/live_depth/mediamtx.yml
@@ -0,0 +1,30 @@
+# Bridges stock bluenviron mediamtx → BYOC gateway, no Livepeer fork needed.
+# See go-livepeer byoc/stream_gateway.go:StartStreamRTMPIngest.
+
+# Gateway hits the API from another container for stream-exists checks and
+# input kicks; default ACL is localhost-only and would 401.
+api: yes
+authInternalUsers:
+  - user: any
+    pass:
+    ips: []
+    permissions:
+      - action: publish
+      - action: read
+      - action: playback
+      - action: api
+
+paths:
+  # Egress paths the gateway publishes to (-out). Skip the ingest
+  # hook — gateway is the publisher, not an external client, and would
+  # 404 looking these up.
+  '~.*-out$':
+    runOnReadyRestart: no
+  all_others:
+    # No shell in the image, so curl is invoked directly (no `sh -c`).
+    runOnReady: >
+      curl -fsS -X POST
+      http://gateway:9935/process/stream/$MTX_PATH/rtmp
+      -F source_id=$MTX_SOURCE_ID
+      -F source_type=$MTX_SOURCE_TYPE
+    runOnReadyRestart: no
diff --git a/examples/runner/live_depth/pipeline.py b/examples/runner/live_depth/pipeline.py
new file mode 100644
index 0000000..5145f91
--- /dev/null
+++ b/examples/runner/live_depth/pipeline.py
@@ -0,0 +1,73 @@
+"""Real-time monocular depth via DepthAnything V2 on GPU — bright = close, dark = far."""
+
+from __future__ import annotations
+
+import logging
+
+import cv2
+import numpy as np
+import torch
+from transformers import pipeline as hf_pipeline
+
+from livepeer_gateway.runner import LivePipeline, serve
+from livepeer_gateway.runner.frames import VideoFrame
+
+logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
+_LOG = logging.getLogger(__name__)
+
+_MODEL_ID = "depth-anything/Depth-Anything-V2-Base-hf"
+
+# 128 = neutral 8-bit YUV chroma; keeps output grayscale so depth-as-luma reads clean.
+_NEUTRAL_CHROMA = 128
+
+
+class LiveDepth(LivePipeline):
+    """Per-frame monocular depth estimation; bright = close, dark = far."""
+
+    def setup(self) -> None:
+        device = "cuda" if torch.cuda.is_available() else "cpu"
+        dtype = torch.float16 if device == "cuda" else torch.float32
+        _LOG.info("Loading %s on %s (dtype=%s)...", _MODEL_ID, device, dtype)
+        self._pipe = hf_pipeline(
+            task="depth-estimation",
+            model=_MODEL_ID,
+            device=device,
+            torch_dtype=dtype,
+        )
+
+        # Warm pass so the first real frame doesn't include compile / cache cost.
+        from PIL import Image
+        self._pipe(Image.fromarray(np.zeros((384, 384, 3), dtype=np.uint8)))
+        _LOG.info("Model loaded.")
+
+    async def process_video(self, frame: VideoFrame) -> VideoFrame:
+        av_frame = frame.frame
+        if "yuv" not in av_frame.format.name.lower():
+            return frame  # passthrough for unexpected pixel formats
+
+        # HF depth-estimation returns {"depth": PIL.Image uint8 auto-normalized
+        # for display, "predicted_depth": raw tensor}; we want the visualization.
+        depth_pil = self._pipe(av_frame.to_image())["depth"]
+        depth = np.asarray(depth_pil, dtype=np.uint8)
+
+        # Defensive: HF normally returns the depth at the input size already.
+        if depth.shape != (av_frame.height, av_frame.width):
+            depth = cv2.resize(depth, (av_frame.width, av_frame.height))
+
+        # Write depth into the Y plane and zero chroma so the output reads as grayscale.
+        y_plane = av_frame.planes[0]
+        if y_plane.line_size == av_frame.width:
+            y_plane.update(depth.tobytes())
+        else:
+            buf = np.zeros((y_plane.height, y_plane.line_size), dtype=np.uint8)
+            buf[: depth.shape[0], : depth.shape[1]] = depth
+            y_plane.update(buf.tobytes())
+        for plane_idx in (1, 2):
+            plane = av_frame.planes[plane_idx]
+            plane.update(bytes([_NEUTRAL_CHROMA]) * (plane.line_size * plane.height))
+
+        return frame
+
+
+if __name__ == "__main__":
+    serve(LiveDepth())
diff --git a/examples/runner/live_depth/test.sh b/examples/runner/live_depth/test.sh
new file mode 100755
index 0000000..3a35db9
--- /dev/null
+++ b/examples/runner/live_depth/test.sh
@@ -0,0 +1,138 @@
+#!/usr/bin/env bash
+# E2E test for live_depth — pushes a depth-rich sample clip through the
+# full BYOC stack, asserts the egress is grayscale (chroma zeroed) AND that
+# the luma plane has spatial variance (i.e., the depth model produced a
+# non-uniform output rather than a fixed grayscale fill).
+#
+# Path: ffmpeg push (RTMP) → mediamtx → gateway → orch → runner → orch → mediamtx → ffmpeg pull (RTMP)
+
+# TODO: see README — migration to the Python client SDK.
+
+set -euo pipefail
+cd "$(dirname "$0")"
+
+GATEWAY_URL="${GATEWAY_URL:-http://localhost:9935}"
+OUTPUT_FILE="${OUTPUT_FILE:-/tmp/live_depth_output.mts}"
+
+echo "Waiting for capability registration..."
+# SDK self-registers inside the pipeline container; look for the log line
+# emitted by livepeer_gateway.runner.registration.register().
+# TODO: switch to `curl /status` once the SDK exposes a status endpoint
+# (Phase 2 of auto-registration). Structured check beats log grep.
+for _ in $(seq 30); do
+  if docker logs live_depth 2>&1 | grep -q "registered capability=live-depth"; then
+    echo "  registered."
+    break
+  fi
+  sleep 1
+done
+if ! docker logs live_depth 2>&1 | grep -q "registered capability=live-depth"; then
+  echo "FAIL: live_depth container hasn't logged registration success." >&2
+  echo "Make sure 'docker compose up -d --wait --build' completed first." >&2
+  exit 1
+fi
+
+# `parameters` is a stringified JSON; enable_video_{ingress,egress} drive
+# trickle channel creation (go-livepeer byoc/types.go).
+LIVEPEER_HDR=$(printf '%s' \
+  '{"request":"{}","parameters":"{\"enable_video_ingress\":true,\"enable_video_egress\":true}","capability":"live-depth","timeout_seconds":60}' \
+  | base64 -w0)
+
+# Best-effort session cleanup; registered early to catch Ctrl-C.
+# `${STREAM_ID:-}` so an early failure (before stream/start succeeded) doesn't
+# trip `set -u` when the trap fires.
+trap 'curl -fsS -X POST "${GATEWAY_URL}/process/stream/${STREAM_ID:-}/stop" -H "Livepeer: ${LIVEPEER_HDR}" -d "{}" >/dev/null 2>&1 || true' EXIT
+
+echo "Starting stream session..."
+RESPONSE=$(curl -fsS -X POST "${GATEWAY_URL}/process/stream/start" \
+  -H "Livepeer: ${LIVEPEER_HDR}" -d '{}')
+
+STREAM_ID=$(echo "${RESPONSE}" | python3 -c "import json,sys; print(json.load(sys.stdin)['stream_id'])")
+RTMP_IN=$(echo "${RESPONSE}" | python3 -c "import json,sys; print(json.load(sys.stdin)['rtmp_url'])")
+RTMP_OUT=$(echo "${RESPONSE}" | python3 -c "import json,sys; print(json.load(sys.stdin)['rtmp_output_url'].split(',')[0])")
+echo "  stream_id=${STREAM_ID}"
+echo "  rtmp_in  =${RTMP_IN}"
+echo "  rtmp_out =${RTMP_OUT}"
+
+RTMP_IN="${RTMP_IN/mediamtx:/localhost:}"
+RTMP_OUT="${RTMP_OUT/mediamtx:/localhost:}"
+
+# Depth-rich clip from the DepthAnything V2 repo (cached under assets/ on
+# first run).
+SAMPLE_URL="${SAMPLE_URL:-https://raw.githubusercontent.com/DepthAnything/Depth-Anything-V2/main/assets/examples_video/basketball.mp4}"
+SAMPLE_FILE="${SAMPLE_FILE:-assets/sample.mp4}"
+if [ ! -f "${SAMPLE_FILE}" ]; then
+  echo "Downloading sample video..."
+  mkdir -p "$(dirname "${SAMPLE_FILE}")"
+  curl -fsSL -o "${SAMPLE_FILE}" "${SAMPLE_URL}"
+fi
+
+# `-stream_loop -1` loops the clip until the `-t` budget is hit.
+echo "Pushing depth-rich sample..."
+ffmpeg -loglevel error -re -stream_loop -1 -i "${SAMPLE_FILE}" \
+  -vf "scale=640:480" -an \
+  -c:v libx264 -preset ultrafast -tune zerolatency -g 30 \
+  -t 60 -f flv "${RTMP_IN}" </dev/null &
+PUSH_PID=$!
+
+# Re-encode (not `-c:v copy`) so the capture starts with fresh SPS/PPS.
+# A mid-stream RTMP pull lands after the originals, leaving a copied stream undecodable.
+echo -n "Pulling processed stream"
+PULL_OK=0
+for _ in $(seq 1 "${RETRIES:-20}"); do
+  if ffmpeg -y -loglevel error \
+    -i "${RTMP_OUT}" -t 5 \
+    -c:v libx264 -preset ultrafast -an \
+    "${OUTPUT_FILE}" </dev/null; then
+    echo " ok."
+    PULL_OK=1
+    break
+  fi
+  echo -n "."
+  sleep 2
+done
+
+# SIGINT (not SIGTERM/SIGKILL) lets ffmpeg flush its RTMP trailer cleanly.
+kill -INT ${PUSH_PID} 2>/dev/null || true
+wait ${PUSH_PID} 2>/dev/null || true
+
+if [ ${PULL_OK} -ne 1 ]; then
+  echo
+  echo "FAIL: pull never succeeded after retries"
+  exit 1
+fi
+
+if [ ! -s "${OUTPUT_FILE}" ]; then
+  echo "FAIL: no bytes received from gateway egress"
+  exit 1
+fi
+
+# Validate the depth pipeline actually ran:
+#   1. Extract a PNG frame (direct ffprobe on the .mts trips on mid-stream H.264 issues).
+#   2. Read luma min/max + chroma averages via signalstats.
+#      ffprobe emits CSV in Y*→U*→V* order regardless of -show_entries; match `read` to that.
+#   3. Assert U ≈ V ≈ 128 (chroma was zeroed).
+#   4. Assert YHIGH - YLOW > 50 (luma non-uniform — real depth maps span ~150+,
+#      while a uniform fill spans 0, so a chroma-only no-op can't pass).
+FRAME_PNG="${OUTPUT_FILE%.mts}_frame.png"
+ffmpeg -v error -i "${OUTPUT_FILE}" -frames:v 1 -y "${FRAME_PNG}" </dev/null \
+  || { echo "FAIL: could not extract a frame from ${OUTPUT_FILE}"; exit 1; }
+IFS=, read -r YLOW YHIGH U V < <(ffprobe -v error -f lavfi \
+  -i "movie=${FRAME_PNG},signalstats" \
+  -show_entries frame_tags=lavfi.signalstats.YLOW,lavfi.signalstats.YHIGH,lavfi.signalstats.UAVG,lavfi.signalstats.VAVG \
+  -of csv=p=0)
+awk -v u="${U:-0}" -v v="${V:-0}" 'BEGIN { exit !(u>123 && u<133 && v>123 && v<133) }' \
+  || { echo "FAIL: chroma not zeroed (U=${U} V=${V}, expected ≈128)"; exit 1; }
+RANGE=$(awk -v hi="${YHIGH:-0}" -v lo="${YLOW:-0}" 'BEGIN { print hi - lo }')
+awk -v r="${RANGE:-0}" 'BEGIN { exit !(r > 50.0) }' \
+  || { echo "FAIL: luma plane too uniform (YHIGH-YLOW=${RANGE}); depth model may not be producing varied output"; exit 1; }
+
+OUTPUT_SIZE=$(stat -c %s "${OUTPUT_FILE}" 2>/dev/null || stat -f %z "${OUTPUT_FILE}")
+echo "PASS (${OUTPUT_SIZE} bytes, U=${U} V=${V} YHIGH-YLOW=${RANGE} → ${OUTPUT_FILE})"
+
+# Open in ffplay (skipped in non-TTY or via SKIP_VIEWER=1).
+if [ -t 1 ] && [ "${SKIP_VIEWER:-0}" != "1" ]; then
+  echo "Opening captured clip..."
+  ffplay -loglevel error -window_title "live_depth (captured)" \
+    -x 640 -y 480 -autoexit "${OUTPUT_FILE}"
+fi
diff --git a/examples/runner/live_detect/.gitignore b/examples/runner/live_detect/.gitignore
new file mode 100644
index 0000000..ee88966
--- /dev/null
+++ b/examples/runner/live_detect/.gitignore
@@ -0,0 +1 @@
+assets/
diff --git a/examples/runner/live_detect/Dockerfile b/examples/runner/live_detect/Dockerfile
new file mode 100644
index 0000000..1731d49
--- /dev/null
+++ b/examples/runner/live_detect/Dockerfile
@@ -0,0 +1,31 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+# OpenCV (ultralytics dep) needs libgl1 + libglib2.0-0 at runtime.
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends libgl1 libglib2.0-0 \
+    && rm -rf /var/lib/apt/lists/*
+
+# SDK install (in-repo source until livepeer-gateway publishes; will collapse
+# to a single `pip install livepeer-gateway` line once on PyPI).
+COPY pyproject.toml README ./
+COPY src /app/src
+RUN pip install --no-cache-dir /app
+
+# Pipeline-specific deps + pre-download both models. Whisper via faster-whisper
+# (CTranslate2 CPU build), detection via ultralytics YOLO. Baking the weights
+# avoids cold-start network fetches at stream-start. Specifiers are quoted so
+# the shell doesn't parse `>=` as a redirect.
+RUN pip install --no-cache-dir \
+    "faster-whisper>=1.0.0" \
+    "ultralytics>=8.0.0,<9" \
+    && python -c "from faster_whisper import WhisperModel; \
+WhisperModel('tiny.en', device='cpu', compute_type='int8')" \
+    && python -c "from ultralytics import YOLO; YOLO('yolov8n.pt')"
+
+# Pipeline code last so edits don't invalidate the model bake layer above.
+COPY examples/runner/live_detect/pipeline.py /app/pipeline.py
+
+EXPOSE 5000
+
+CMD ["python", "/app/pipeline.py"]
diff --git a/examples/runner/live_detect/Dockerfile.mediamtx b/examples/runner/live_detect/Dockerfile.mediamtx
new file mode 100644
index 0000000..e194c24
--- /dev/null
+++ b/examples/runner/live_detect/Dockerfile.mediamtx
@@ -0,0 +1,8 @@
+# bluenviron/mediamtx ships as a scratch image (no shell, no curl), but
+# `runOnReady` needs curl to call back into the gateway's BYOC ingest webhook.
+# Repackage on alpine + curl — same mediamtx binary, ~10 MB image.
+FROM alpine:3.20
+RUN apk add --no-cache curl ca-certificates
+COPY --from=bluenviron/mediamtx:latest /mediamtx /mediamtx
+COPY --from=bluenviron/mediamtx:latest /mediamtx.yml /mediamtx.default.yml
+ENTRYPOINT ["/mediamtx"]
diff --git a/examples/runner/live_detect/README.md b/examples/runner/live_detect/README.md
new file mode 100644
index 0000000..9027068
--- /dev/null
+++ b/examples/runner/live_detect/README.md
@@ -0,0 +1,123 @@
+# Live detect (BYOC, real-time, multi-modal)
+
+> [!NOTE]
+> **TODO** — `test.sh`, `demo.sh`, `_format_records.py`, and the `gateway:` compose service will collapse into a single Python script using the client SDK once [livepeer/livepeer-python-gateway#6](https://github.com/livepeer/livepeer-python-gateway/pull/6) merges.
+
+Real-time **object detection + speech-to-text** on the same stream.
+Detection via [YOLOv8n](https://github.com/ultralytics/ultralytics) (~6 MB,
+CPU-fast); transcription via [faster-whisper tiny.en](https://github.com/SYSTRAN/faster-whisper)
+(~150 MB, int8-quantized). Both pipelines emit on the same `data` trickle
+channel as separate record types — `{"type": "detection", ...}` and
+`{"type": "transcript", ...}`.
+
+This is the canonical example of a **multi-modal `LivePipeline`** — it
+exercises both `process_video` and `process_audio` overrides on the same
+pipeline class:
+
+| Hook | What this example does |
+| --- | --- |
+| `setup()` | Load YOLOv8n + whisper tiny.en at container start (both baked into the image) |
+| `on_stream_start(params)` | Reset per-session state, build resampler, warm both models with dummy input, emit `ready` event |
+| `process_video(frame)` | Run YOLO every 15th frame (~1 inference/sec at 15 fps); emit `detection` records via `emit_data` |
+| `process_audio(frame)` | Resample to 16 kHz mono → buffer 3 s windows → whisper transcribe → emit `transcript` records via `emit_data` |
+| `emit_data({...})` | Two record types interleave on the same `data_url` stream |
+| `on_stream_stop()` | Flush partial audio buffer + emit final stats event |
+
+Both `process_*` hooks are overridden, so the SDK's track allocation by
+introspection enables **both** output tracks. Both return the input frame
+unchanged — the caller sees the original video + audio plus the structured
+detection / transcript records on the data channel.
+
+## Run
+
+> [!WARNING]
+> Only one example can run at a time — all share container names
+> (`gateway`, `orchestrator`, …) and ports (`1935`, `9935`, `5000`). If
+> `./test.sh` fails at the capability-registration step, run `docker
+> compose down` in the other example's directory first.
+
+```bash
+docker compose up -d --wait --build   # both models are baked into the image (~250 MB total)
+
+./test.sh   # CI: composited pedestrian clip + JFK audio
+./demo.sh   # interactive: webcam + mic, see records live
+
+docker compose down
+```
+
+The pipeline container starts in `LOADING` state until `setup()` finishes
+loading both models. The healthcheck reflects this — the first `up` takes
+~20 s while the YOLOv8n + whisper-tiny.en weights load from the baked layer.
+
+### `test.sh` — automated assertion
+
+Auto-downloads two public-domain fixtures into `assets/` on first run:
+
+- **Video** — OpenCV's canonical [`vtest.avi`](https://raw.githubusercontent.com/opencv/opencv/4.x/samples/data/vtest.avi) (~8 MB, 80 s plaza-pedestrian clip, looped)
+- **Audio** — JFK's "Ask not..." inaugural (`jfk.flac`, also used by `live_transcribe`)
+
+`test.sh` composites both fixtures into a single FLV push (sketched below).
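+A rough sketch of that composite push (illustrative; the real `test.sh`
+derives `RTMP_IN` from the gateway's `/process/stream/start` response, as the
+other examples do, and bounds the run with a duration budget):
+
+```bash
+# Loop both fixtures (the JFK clip is much shorter than the 80 s video),
+# map video from input 0 and audio from input 1, and push one FLV stream.
+ffmpeg -re -stream_loop -1 -i assets/vtest.avi \
+       -stream_loop -1 -i assets/jfk.flac \
+  -map 0:v -map 1:a -vf "scale=640:480" \
+  -c:v libx264 -preset ultrafast -tune zerolatency -g 30 \
+  -c:a aac -t 60 -f flv "${RTMP_IN}"
+```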
+It then asserts:
+
+- At least one `detection` record contains `person` (vtest reliably yields 5–9 per frame)
+- At least one `transcript` record contains `country` (the JFK speech says it three times)
+
+Override either fixture with environment variables:
+
+```bash
+VIDEO_URL=https://example.com/your-video.mp4 ./test.sh
+AUDIO_URL=https://example.com/your-audio.flac ./test.sh
+```
+
+### `demo.sh` — interactive webcam + mic
+
+Pushes audio + video from your webcam and microphone for `DURATION` seconds
+(default 60) and prints both detections and transcripts to the terminal as
+they arrive. Show things to the camera, speak — `Ctrl-C` to stop early.
+
+```bash
+DURATION=30 ./demo.sh                            # shorter session
+PULSE_SOURCE=alsa_input.usb-... ./demo.sh        # specific Linux mic
+VIDEO_DEVICE=/dev/video1 ./demo.sh               # specific Linux webcam
+MAC_VIDEO_DEVICE=1 MAC_AUDIO_DEVICE=0 ./demo.sh  # macOS — pick devices
+```
+
+Sample output while looking at a coffee mug and speaking:
+
+```text
+[T  0] Hello, this is a test.
+[D  0] frame=15 person(0.91), cup(0.78)
+[D  1] frame=30 person(0.93), cup(0.79), laptop(0.62)
+[T  1] I'm just talking and showing things to the camera.
+[D  2] frame=45 person(0.92), cup(0.81)
+```
+
+Each line is one record from the `data_url` SSE stream — `[T#]` for
+transcripts, `[D#]` for detections — the same payload a real caller would
+receive.
+
+## Throughput
+
+| Component | CPU cost (per call) | Cadence | Total CPU budget |
+| --- | --- | --- | --- |
+| YOLOv8n (640×640, CPU) | ~30 ms | every 15th frame (~1 Hz) | ~30 ms/sec |
+| whisper tiny.en (3 s window, CPU int8) | ~500 ms | every 3 s | ~170 ms/sec |
+| Combined | — | — | ~200 ms/sec — real-time-friendly |
+
+Both run faster than real time at 15 fps video + 16 kHz audio on a modest
+CPU. To trade CPU budget for accuracy, raise the YOLO cadence (run every
+frame), swap to `yolov8s` / `yolov8m` (larger models), or use whisper
+`base.en`.
+
+## Going further
+
+- **Better video understanding** — for natural-language scene descriptions,
+  swap YOLOv8n for a small vision-language model like
+  [Moondream2](https://huggingface.co/vikhyatk/moondream2) (1.6 GB, ~300 ms
+  per inference on GPU). Same `LivePipeline` lifecycle, different model in
+  `process_video`. Out of scope here — tracked as a separate `live_describe`
+  example.
+- **Multi-class filtering** — restrict emissions to a class subset (e.g.
+  only `person` + `cat` + `dog`) by filtering on `label` before `emit_data`.
+- **Bounding box overlays** — modify `process_video` to draw boxes on the
+  outgoing frame using OpenCV / PyAV before returning, so subscribers see
+  annotated video.
+- **Production-grade transcription** — see `live_transcribe`'s README
+  "Going further" section for `whisper_streaming` / WhisperLive / native
+  streaming ASR pointers. The same model swap applies here.
diff --git a/examples/runner/live_detect/_format_records.py b/examples/runner/live_detect/_format_records.py
new file mode 100644
index 0000000..6bd7233
--- /dev/null
+++ b/examples/runner/live_detect/_format_records.py
@@ -0,0 +1,35 @@
+"""Pretty-print BYOC data-channel SSE records, one per line.
+
+Reads JSON-shaped data lines from stdin (one record per line, as emitted
+by `pipeline.emit_data` and proxied through the gateway's SSE endpoint)
+and renders each one for human-readable terminal output. Used by demo.sh.
+"""
+
+# TODO: see README — migration to the Python client SDK.
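+# Record shapes handled below (field names taken from the handlers; assumed
+# to match what this example's pipeline.py emits):
+#   {"type": "transcript", "index": 3, "text": "..."}
+#   {"type": "detection", "index": 3, "frame": 45,
+#    "objects": [{"label": "person", "conf": 0.91}]}
+#   {"type": "ready"} / {"type": "stopped"}  (printed as bare markers)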
+
+from __future__ import annotations
+
+import json
+import sys
+
+
+def main() -> None:
+    for line in sys.stdin:
+        try:
+            rec = json.loads(line)
+        except json.JSONDecodeError:
+            continue
+        kind = rec.get("type")
+        if kind == "transcript":
+            print("[T{:3d}] {}".format(rec["index"], rec["text"]), flush=True)
+        elif kind == "detection":
+            objs = ", ".join(
+                "{}({:.2f})".format(o["label"], o["conf"]) for o in rec["objects"]
+            )
+            print("[D{:3d}] frame={} {}".format(rec["index"], rec["frame"], objs), flush=True)
+        elif kind in ("ready", "stopped"):
+            print(" ({})".format(kind), flush=True)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/runner/live_detect/demo.sh b/examples/runner/live_detect/demo.sh
new file mode 100755
index 0000000..ea6b47f
--- /dev/null
+++ b/examples/runner/live_detect/demo.sh
@@ -0,0 +1,131 @@
+#!/usr/bin/env bash
+# Live demo — push your webcam + microphone through the BYOC stack and watch
+# both detection and transcript records arrive in real time. Detections fire
+# every ~1 s (one YOLO inference per second of video); transcripts fire every
+# ~3 s (one whisper window).
+#
+# Path: webcam + mic → ffmpeg push (RTMP) → mediamtx → gateway → orch →
+#       runner → emit_data → orch trickle (data) → gateway
+#       SSE proxy → curl subscriber → pretty-printed records
+#
+# Prerequisites:
+#   - Stack up: `docker compose up -d --wait --build`
+#   - ffmpeg + curl + python3 on host
+#   - Webcam + microphone:
+#       Linux — pulseaudio default mic + /dev/video0; override via
+#         PULSE_SOURCE=... and VIDEO_DEVICE=...
+#       macOS — first AVFoundation video + audio device (`:0`); override
+#         via MAC_VIDEO_DEVICE=... MAC_AUDIO_DEVICE=...
+
+# TODO: see README — migration to the Python client SDK.
+
+set -euo pipefail
+cd "$(dirname "$0")"
+
+# Use the system ffmpeg — capture demuxers (pulse/alsa/avfoundation/v4l2)
+# must be compiled in. The transcoding-only ffmpeg that ships with
+# go-livepeer lacks them, so a Livepeer dev's $PATH ffmpeg often won't open
+# the webcam/mic.
+FFMPEG="${FFMPEG:-${HOMEBREW_PREFIX:+$HOMEBREW_PREFIX/bin/ffmpeg}}"
+[ -x "${FFMPEG:-}" ] || FFMPEG="/usr/bin/ffmpeg"
+[ -x "${FFMPEG}" ] || FFMPEG="$(command -v ffmpeg)"
+
+GATEWAY_URL="${GATEWAY_URL:-http://localhost:9935}"
+DURATION="${DURATION:-60}"
+
+case "$(uname -s)" in
+  Linux*)
+    VIDEO_INPUT=(-f v4l2 -framerate 15 -video_size 640x480 -i "${VIDEO_DEVICE:-/dev/video0}")
+    AUDIO_INPUT=(-f pulse -i "${PULSE_SOURCE:-default}")
+    AUDIO_MAP="1:a"
+    ;;
+  Darwin*)
+    # AVFoundation: