From 3dac65a562bee74d6883650a311e72b1f6911e58 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 15:29:03 +0200 Subject: [PATCH 1/7] fix(tools/talis): wait-for-chain + atomic keyring + one-command driver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three race conditions surfaced repeatedly on a fresh AWS bring-up of the Fibre throughput experiment. Each one had the same shape: a talis subcommand "succeeded" at the CLI level (or returned the txhash with --yes) before the chain had actually applied the work, leaving downstream steps to fail in confusing ways. This commit makes each step verify *outcome*, not just *invocation*, so the experiment can go from a fresh `talis up` to a running loadgen without manual intervention. • setup-fibre script (fibre_setup.go) now: - polls `celestia-appd status` for `latest_block_height>0` before submitting any tx — fixes the silent-noop where set-host + 100× deposit-to-escrow all bounced with "celestia-app is not ready; please wait for first block"; - retries `set-host` in a loop until the validator's host shows up in `query valaddr providers` — fixes the case where --yes returns the txhash before block inclusion and the tx silently lands in the mempool but never confirms; - verifies fibre-0's escrow account is funded on-chain before the tmux session exits — same silent-failure mode as set-host, but on the deposit side. The talis-CLI step also now cross-checks all validators are registered from a single vantage point before returning, so a concurrent set-host race surfaces as an error instead of a half-empty provider list start-fibre would cache forever. • fibre-bootstrap-evnode (fibre_bootstrap_evnode.go) now stages the keyring scp into a tmp directory and `mv`s it atomically into place. The previous direct `scp -r` to /root/keyring-fibre/keyring-test created the directory before transferring its contents — the evnode init script's `[ -d keyring-test ]` poll passed mid-transfer, the daemon launched with no fibre-0.info, and crashed with `keyring entry "fibre-0" not found`. • evnode_init.sh (genesis.go) now waits for the specific keyring-test/fibre-0.info file rather than just the keyring-test directory. Belt-and-braces: the bootstrap mv is already atomic on the same filesystem, but the file-level guard means a hand-pushed keyring (not via talis) can't trip the same race. • New `talis fibre-experiment` umbrella command runs up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode in order. Each step uses the same binary as a subprocess; failures in any step abort the chain. Operator goes from a prepared root dir to a running loadgen with one command, instead of remembering the sequence. Verified by 5-min sustained loadgen against julien/fiber HEAD with PR #3287 (concurrent submitter) merged: 47.65 MB/s @ 99.999 % ok, up from the prior 24.57 MB/s baseline (the gap is PR #3287's overlapping uploads — these talis fixes just stop the deploy from silently breaking before throughput matters). 
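All three fixes reduce to the same shape: poll against on-chain (or on-disk) state under a bounded deadline, and treat CLI/RPC acceptance as meaningless until the outcome is observable. A minimal Go sketch of that shape, for orientation only (the checks in the diff below are written inline and differ in detail; `waitForOutcome` is not a name this patch adds):

    // waitForOutcome polls check until it succeeds or timeout elapses.
    // Every fix in this patch is a specialisation of this loop, where
    // check is a status/query call against the chain or filesystem,
    // never the exit status of the tx submission itself.
    // Assumes "fmt" and "time" imports.
    func waitForOutcome(timeout, interval time.Duration, check func() (bool, error)) error {
        deadline := time.Now().Add(timeout)
        for {
            ok, err := check()
            if err == nil && ok {
                return nil
            }
            if time.Now().After(deadline) {
                return fmt.Errorf("condition not met within %s (last error: %v)", timeout, err)
            }
            time.Sleep(interval)
        }
    }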
--- tools/talis/fibre_bootstrap_evnode.go | 34 ++++++-- tools/talis/fibre_experiment.go | 89 ++++++++++++++++++++ tools/talis/fibre_setup.go | 113 +++++++++++++++++++++++++- tools/talis/genesis.go | 12 ++- tools/talis/main.go | 1 + 5 files changed, 239 insertions(+), 10 deletions(-) create mode 100644 tools/talis/fibre_experiment.go diff --git a/tools/talis/fibre_bootstrap_evnode.go b/tools/talis/fibre_bootstrap_evnode.go index 5df12d41ae..c8d76822b3 100644 --- a/tools/talis/fibre_bootstrap_evnode.go +++ b/tools/talis/fibre_bootstrap_evnode.go @@ -125,21 +125,45 @@ to fetch, then SCPs to each evnode-*.`, defer wg.Done() log.Printf("[%s] pushing JWT + keyring", ev.Name) + // JWT is small + atomic on the receive side because + // it's a single file, so we push it directly. if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, localJWT, "/root/bridge-jwt.txt", false); err != nil { errCh <- fmt.Errorf("[%s] push JWT: %w", ev.Name, err) return } - // mkdir the parent so scp lands at the exact path - // evnode_init.sh waits for. - if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, "mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test"); err != nil { - errCh <- fmt.Errorf("[%s] mkdir keyring-fibre: %w", ev.Name, err) + // Keyring push is staged through a tmp dir and + // promoted via mv. Without staging, evnode_init.sh's + // poll loop (which tests `[ -d keyring-test ]`) + // passes the moment scp -r mkdir's the directory, + // long before fibre-0.info is on disk. evnode then + // launches mid-scp and dies with `keyring entry + // "fibre-0" not found`. mv is atomic on the same + // filesystem so the init script either sees nothing + // (keep waiting) or the fully-populated dir (start + // the daemon cleanly). + stageDir := "/root/.keyring-fibre.staging" + prep := fmt.Sprintf( + "rm -rf %s && mkdir -p %s && mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test", + stageDir, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, prep); err != nil { + errCh <- fmt.Errorf("[%s] stage keyring: %w", ev.Name, err) return } - if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), "/root/keyring-fibre/keyring-test", true); err != nil { + stageDest := stageDir + "/keyring-test" + if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), stageDest, true); err != nil { errCh <- fmt.Errorf("[%s] push keyring: %w", ev.Name, err) return } + promote := fmt.Sprintf( + "mv %s /root/keyring-fibre/keyring-test && rmdir %s", + stageDest, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, promote); err != nil { + errCh <- fmt.Errorf("[%s] promote keyring: %w", ev.Name, err) + return + } log.Printf("[%s] ✓ pushed; daemon should start within ~10s", ev.Name) }(ev) diff --git a/tools/talis/fibre_experiment.go b/tools/talis/fibre_experiment.go new file mode 100644 index 0000000000..585992d5fe --- /dev/null +++ b/tools/talis/fibre_experiment.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + + "github.com/spf13/cobra" +) + +// fibreExperimentCmd is the one-command driver for the Fibre throughput +// experiment. It assumes the operator has already populated the +// experiment directory with a config.json + scripts/ + base config.toml + +// app.toml (i.e. ran `talis init` + `talis add` for validators / bridge +// / evnode / loadgen) and that build artefacts are at $rootDir/build. 
+// +// It then invokes — in order — the same subcommands the operator would +// run by hand: +// +// 1. up — provision instances +// 2. genesis -b — stage validator/bridge/evnode/loadgen payloads +// 3. deploy — ship payloads + start init scripts +// 4. setup-fibre — register host + deposit escrow on each validator +// 5. start-fibre — launch the fibre server on each validator +// 6. fibre-bootstrap-evnode — scp bridge JWT + fibre keyring onto evnode-* +// +// Each step is invoked via os/exec on the running binary. Any failure +// surfaces immediately; nothing is retried at this layer (the +// individual subcommands handle their own waits + retries). +// +// After step 6 returns, evnode-* daemons start within ~10 s and the +// load-gen's init script auto-launches evnode-txsim. The operator +// reads the final TXSIM: line from the load-gen. +func fibreExperimentCmd() *cobra.Command { + var ( + rootDir string + buildDir string + ) + + cmd := &cobra.Command{ + Use: "fibre-experiment", + Short: "End-to-end driver: up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode", + Long: `Run every step needed to bring up a Fibre throughput experiment from a +prepared root directory. Equivalent to invoking each subcommand in +sequence; included so the operator doesn't have to remember the order +or watch for inter-step races.`, + RunE: func(cmd *cobra.Command, args []string) error { + self, err := os.Executable() + if err != nil { + return fmt.Errorf("locate own binary: %w", err) + } + + steps := []struct { + name string + args []string + }{ + {"up", []string{"up", "-d", rootDir}}, + {"genesis", []string{"genesis", "-d", rootDir, "-b", buildDir}}, + {"deploy", []string{"deploy", "-d", rootDir}}, + {"setup-fibre", []string{"setup-fibre", "-d", rootDir}}, + {"start-fibre", []string{"start-fibre", "-d", rootDir}}, + {"fibre-bootstrap-evnode", []string{"fibre-bootstrap-evnode", "-d", rootDir}}, + } + + for _, s := range steps { + fmt.Printf("\n=== talis %s ===\n", s.name) + c := exec.Command(self, s.args...) + c.Stdout = os.Stdout + c.Stderr = os.Stderr + c.Env = os.Environ() + if err := c.Run(); err != nil { + return fmt.Errorf("step %q failed: %w", s.name, err) + } + } + + fmt.Println() + fmt.Println("=== fibre-experiment complete ===") + fmt.Println("evnode aggregator(s) start within ~10 s and load-gen init") + fmt.Println("scripts auto-launch evnode-txsim once evnode's /stats responds.") + fmt.Println("Final TXSIM: line lands at /root/txsim.log on each load-gen host.") + return nil + }, + } + + cmd.Flags().StringVarP(&rootDir, "directory", "d", ".", "experiment root directory") + cmd.Flags().StringVarP(&buildDir, "build-dir", "b", "./build", "directory containing the cross-compiled linux/amd64 binaries") + + return cmd +} diff --git a/tools/talis/fibre_setup.go b/tools/talis/fibre_setup.go index 28f7115b80..6e88fa1ef9 100644 --- a/tools/talis/fibre_setup.go +++ b/tools/talis/fibre_setup.go @@ -51,17 +51,67 @@ func setupFibreCmd() *cobra.Command { // Build script: register host + deposit escrow for validator + all fibre accounts var sb strings.Builder + // 0. Block until the chain has produced at least one block. + // Without this, the very next tx returns + // `celestia-app is not ready; please wait for first block` + // from the local node — the call appears to succeed at + // the CLI level (`--yes` returns the txhash before block + // inclusion), but the tx never lands. Polling explicitly + // avoids the `sleep 10` heuristic that used to be here. 
+ sb.WriteString(fmt.Sprintf( + "echo 'waiting for chain to produce first block...'\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " H=$(celestia-appd status --chain-id %s 2>/dev/null | "+ + " grep -oE '\"latest_block_height\":\"[0-9]+\"' | "+ + " grep -oE '[0-9]+' | head -1)\n"+ + " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n"+ + " echo \"chain is at height $H\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo 'FATAL: chain never produced a block within 5m' >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " sleep 3\n"+ + "done\n", + cfg.ChainID, + )) + // 1. Register fibre host address. Plain `host:port` form — // x/valaddr requires it; the gRPC client dials it via the // passthrough resolver. Don't prefix `dns:///` here. + // + // Retry until `query valaddr providers` shows OUR host + // — `--yes` returns the txhash before inclusion, so a + // single one-shot call can succeed at the RPC layer + // while the chain rejects the tx (mempool full, signer + // not yet in validator set, …) and we'd never know. + // 5-minute deadline so a stuck chain doesn't loop + // forever. sb.WriteString(fmt.Sprintf( - "celestia-appd tx valaddr set-host %s:%d "+ + "HOST=%s:%d\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " celestia-appd tx valaddr set-host \"$HOST\" "+ "--from validator --keyring-backend=test --home .celestia-app "+ - "--chain-id %s --fees %s --yes\n", + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q \"\\\"host\\\": *\\\"$HOST\\\"\"; then\n"+ + " echo \"set-host confirmed: $HOST\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: set-host did not register $HOST after 5m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " echo 'set-host pending, retrying...'\n"+ + "done\n", val.PublicIP, fibrePort, cfg.ChainID, fees, + cfg.ChainID, )) - sb.WriteString("sleep 10\n") // 2. Deposit escrow for each fibre worker account for i := range fibreAccounts { @@ -76,6 +126,31 @@ func setupFibreCmd() *cobra.Command { )) } + // 3. Verify the FIRST fibre account's escrow actually + // landed before we let the tmux session exit. If even + // fibre-0 isn't funded, every Fibre upload from the + // runner fails with `escrow account not found for + // signer …` — same silent-failure mode as set-host. + // Other accounts (fibre-1..N) are funded best-effort: + // the runner only signs with fibre-0 by default. 
+ sb.WriteString(fmt.Sprintf( + "FIBRE0_ADDR=$(celestia-appd keys show fibre-0 --keyring-backend test --home .celestia-app -a)\n"+ + "DEADLINE=$(( $(date +%%s) + 180 ))\n"+ + "while true; do\n"+ + " if celestia-appd query fibre escrow \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q '\"amount\"'; then\n"+ + " echo \"escrow confirmed for fibre-0 ($FIBRE0_ADDR)\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: fibre-0 escrow not present after 3m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " sleep 5\n"+ + "done\n", + cfg.ChainID, + )) + script := sb.String() sem <- struct{}{} @@ -103,6 +178,38 @@ func setupFibreCmd() *cobra.Command { if err := waitForTmuxSessions(cfg.Validators, resolvedSSHKeyPath, SetupFibreSessionName, 10*time.Minute); err != nil { return fmt.Errorf("waiting for setup-fibre sessions: %w", err) } + + // CLI-side verification that every validator's host is on + // the chain's provider list before we hand off to start- + // fibre / fibre-bootstrap-evnode. The per-validator script + // above already self-verifies its own host, but we + // re-check here from a single vantage point so a + // concurrent set-host race across validators surfaces + // before downstream steps cache an empty registry. + if len(cfg.Validators) > 0 { + expected := len(cfg.Validators) + queryHost := cfg.Validators[0].PublicIP + queryCmd := fmt.Sprintf( + "celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null | grep -o '\"host\"' | wc -l", + cfg.ChainID, + ) + deadline := time.Now().Add(5 * time.Minute) + for { + out, err := sshExec("root", queryHost, resolvedSSHKeyPath, queryCmd) + if err == nil { + count := 0 + _, _ = fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &count) + if count >= expected { + break + } + fmt.Printf(" valaddr providers: %d/%d registered, retrying...\n", count, expected) + } + if time.Now().After(deadline) { + return fmt.Errorf("only some validators registered as fibre providers within 5m — re-run setup-fibre") + } + time.Sleep(5 * time.Second) + } + } fmt.Println("Validator setup done!") // Deposit escrow for encoder accounts. diff --git a/tools/talis/genesis.go b/tools/talis/genesis.go index 9a4f481a81..e1b229efaa 100644 --- a/tools/talis/genesis.go +++ b/tools/talis/genesis.go @@ -558,9 +558,17 @@ mkdir -p "$EVNODE_HOME" # 2. /root/keyring-fibre/keyring-test cosmos-sdk file keyring with # a Fibre payment account # Without them the daemon would crash immediately on startup. -echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR..." +# +# We check for the *specific* fibre-0.info file (not just the +# keyring-test directory) because a non-atomic scp -r would create +# the directory before transferring its contents — so testing -d +# alone passes mid-scp and the daemon would launch with an empty +# keyring. fibre-bootstrap-evnode now also stages-and-mvs so this +# check should never see a partial state, but keep the file-level +# guard as a defence-in-depth. +echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR/keyring-test/fibre-0.info..." 
WAITED=0 -until [ -s "$BRIDGE_JWT_FILE" ] && [ -d "$FIBRE_KEYRING_DIR/keyring-test" ]; do +until [ -s "$BRIDGE_JWT_FILE" ] && [ -f "$FIBRE_KEYRING_DIR/keyring-test/fibre-0.info" ]; do sleep 5 WAITED=$((WAITED + 5)) if [ $((WAITED % 60)) -eq 0 ]; then diff --git a/tools/talis/main.go b/tools/talis/main.go index 016b1ee340..899fe8cf54 100644 --- a/tools/talis/main.go +++ b/tools/talis/main.go @@ -37,6 +37,7 @@ func main() { fibreTxsimCmd(), fibreThroughputCmd(), fibreBootstrapEvnodeCmd(), + fibreExperimentCmd(), resourceMonitorCmd(), downloadResourcesCmd(), syncNodeCmd(), From 2d53cd0d0a1499a65116886eb16b0e1acb001bd5 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:03 +0200 Subject: [PATCH 2/7] fix(tools/talis): finalize fibre setup race fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three follow-up bugs surfaced from the PR #3303 follow-up verification run on a 3-validator AWS Fibre cluster: - aws.go: CreateAWSInstances exited 0 even when individual instance launches failed, so `talis up` lied about success and downstream steps proceeded against a partial cluster. Returns a joined error now so failure cascades stop early. - download.go: sshExec used cmd.CombinedOutput, mixing SSH warnings (the "Warning: Permanently added '...'..." chatter on stderr) into bytes the caller hands to fmt.Sscanf("%d"). The CLI-side providers cross-check parsed those warnings as 0 and looped until its 5-min deadline even though a direct SSH query showed all 3 providers registered. Switch to cmd.Output() (stdout only) and add `-q -o LogLevel=ERROR` to silence the chatter for any caller that does combine streams. - fibre_setup.go: the per-validator escrow verification used `celestia-appd query fibre escrow` which doesn't exist — the actual subcommand is `escrow-account`. The query errored on every retry, the grep for "amount" never matched, and the script wedged on the 3-min deadline reporting `FATAL: fibre-0 escrow not present`. Switch to `escrow-account` and key on `"found":true` (the explicit existence flag in the response). Also wrap the fibre-0 deposit-to-escrow itself in a retry loop matching set-host — same `--yes`-returns-before-inclusion silent-failure mode bit it. fibre-1..N stay best-effort. 
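As a side note on the download.go fix: the mis-parse is easy to reproduce standalone. A rough sketch (not part of this diff) of why CombinedOutput made the providers cross-check read 0:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // ssh prints the host-key warning on stderr; CombinedOutput
        // prepends it to the digit the caller actually wants.
        out := "Warning: Permanently added '1.2.3.4' (ED25519) to the list of known hosts.\n3"
        var count int
        _, _ = fmt.Sscanf(strings.TrimSpace(out), "%d", &count)
        fmt.Println(count) // prints 0, not 3: %d stops at the first non-digit
    }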
Co-Authored-By: Claude Opus 4.7 (1M context) --- tools/talis/aws.go | 14 +++++- tools/talis/download.go | 14 +++++- tools/talis/fibre_setup.go | 95 +++++++++++++++++++++----------------- 3 files changed, 77 insertions(+), 46 deletions(-) diff --git a/tools/talis/aws.go b/tools/talis/aws.go index 249dc89bbd..afcef6ebaa 100644 --- a/tools/talis/aws.go +++ b/tools/talis/aws.go @@ -350,16 +350,28 @@ func CreateAWSInstances(ctx context.Context, insts []Instance, sshKey, keyName s close(results) }() - var created []Instance + var ( + created []Instance + failures []string + ) for res := range results { if res.err != nil { fmt.Printf("❌ %s failed after %v %v\n", res.inst.Name, res.timeRequired, res.err) + failures = append(failures, fmt.Sprintf("%s: %v", res.inst.Name, res.err)) } else { created = append(created, res.inst) fmt.Printf("✅ %s is up (public=%s) in %v\n", res.inst.Name, res.inst.PublicIP, res.timeRequired) } fmt.Printf("---- Progress: %d/%d\n", len(created), total) } + if len(failures) > 0 { + // Surface partial-failure as an error so `talis up` exits + // non-zero; without this, downstream genesis runs against a + // half-provisioned config and fails much later with confusing + // "X has no public IP yet" messages. + return created, fmt.Errorf("%d/%d instance(s) failed to launch: %s", + len(failures), total, strings.Join(failures, "; ")) + } return created, nil } diff --git a/tools/talis/download.go b/tools/talis/download.go index 99284d0326..ddff69e102 100644 --- a/tools/talis/download.go +++ b/tools/talis/download.go @@ -193,16 +193,26 @@ func compressAndDownload(table, localPath, user, host, sshKeyPath string) error return nil } -// sshExec runs a command on a remote host via SSH and returns the combined output. +// sshExec runs a command on a remote host via SSH and returns stdout only. +// +// We intentionally do NOT use CombinedOutput here. ssh prints connection +// chatter ("Warning: Permanently added '...' to the list of known hosts.") +// on stderr, and a previous `CombinedOutput` revision caused +// `fmt.Sscanf(out, "%d")` parses to silently return 0 because the leading +// stderr line had no digits. Capturing only stdout keeps numeric output +// parseable; -q + LogLevel=ERROR further suppresses the chatter for any +// caller that does combine streams. func sshExec(user, host, sshKeyPath, command string) ([]byte, error) { cmd := exec.Command("ssh", + "-q", + "-o", "LogLevel=ERROR", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-i", sshKeyPath, fmt.Sprintf("%s@%s", user, host), command, ) - return cmd.CombinedOutput() + return cmd.Output() } func sftpDownload(remotePath, localPath, user, host, sshKeyPath string) error { diff --git a/tools/talis/fibre_setup.go b/tools/talis/fibre_setup.go index 6e88fa1ef9..382e780c50 100644 --- a/tools/talis/fibre_setup.go +++ b/tools/talis/fibre_setup.go @@ -58,25 +58,24 @@ func setupFibreCmd() *cobra.Command { // the CLI level (`--yes` returns the txhash before block // inclusion), but the tx never lands. Polling explicitly // avoids the `sleep 10` heuristic that used to be here. 
- sb.WriteString(fmt.Sprintf( - "echo 'waiting for chain to produce first block...'\n"+ - "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ - "while true; do\n"+ - " H=$(celestia-appd status --chain-id %s 2>/dev/null | "+ - " grep -oE '\"latest_block_height\":\"[0-9]+\"' | "+ - " grep -oE '[0-9]+' | head -1)\n"+ - " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n"+ - " echo \"chain is at height $H\"\n"+ - " break\n"+ - " fi\n"+ - " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ - " echo 'FATAL: chain never produced a block within 5m' >&2\n"+ - " exit 1\n"+ - " fi\n"+ - " sleep 3\n"+ + sb.WriteString( + "echo 'waiting for chain to produce first block...'\n" + + "DEADLINE=$(( $(date +%s) + 300 ))\n" + + "while true; do\n" + + " H=$(celestia-appd status 2>/dev/null | " + + " grep -oE '\"latest_block_height\":\"[0-9]+\"' | " + + " grep -oE '[0-9]+' | head -1)\n" + + " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n" + + " echo \"chain is at height $H\"\n" + + " break\n" + + " fi\n" + + " if [ $(date +%s) -gt $DEADLINE ]; then\n" + + " echo 'FATAL: chain never produced a block within 5m' >&2\n" + + " exit 1\n" + + " fi\n" + + " sleep 3\n" + "done\n", - cfg.ChainID, - )) + ) // 1. Register fibre host address. Plain `host:port` form — // x/valaddr requires it; the gRPC client dials it via the @@ -113,44 +112,54 @@ func setupFibreCmd() *cobra.Command { cfg.ChainID, )) - // 2. Deposit escrow for each fibre worker account - for i := range fibreAccounts { - keyName := fmt.Sprintf("fibre-%d", i) - sb.WriteString(fmt.Sprintf( - "celestia-appd tx fibre deposit-to-escrow %s "+ - "--from %s --keyring-backend=test --home .celestia-app "+ - "--chain-id %s --fees %s --yes\n", - escrowAmount, - keyName, - cfg.ChainID, fees, - )) - } - - // 3. Verify the FIRST fibre account's escrow actually - // landed before we let the tmux session exit. If even - // fibre-0 isn't funded, every Fibre upload from the - // runner fails with `escrow account not found for - // signer …` — same silent-failure mode as set-host. - // Other accounts (fibre-1..N) are funded best-effort: - // the runner only signs with fibre-0 by default. + // 2. Deposit escrow for fibre-0 inside a retry loop. + // Same silent-failure mode as set-host: `--yes` returns + // the txhash before inclusion, so a single bounced tx + // (mempool full, signer not yet propagated, …) leaves + // the runner failing every upload with + // `escrow account not found for signer …`. fibre-0 is + // the one the runner actually signs with by default, + // so it's the only one we hard-block on. 
sb.WriteString(fmt.Sprintf( "FIBRE0_ADDR=$(celestia-appd keys show fibre-0 --keyring-backend test --home .celestia-app -a)\n"+ - "DEADLINE=$(( $(date +%%s) + 180 ))\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ "while true; do\n"+ - " if celestia-appd query fibre escrow \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ - " | grep -q '\"amount\"'; then\n"+ + " celestia-appd tx fibre deposit-to-escrow %s "+ + "--from fibre-0 --keyring-backend=test --home .celestia-app "+ + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query fibre escrow-account \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q '\"found\":true'; then\n"+ " echo \"escrow confirmed for fibre-0 ($FIBRE0_ADDR)\"\n"+ " break\n"+ " fi\n"+ " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ - " echo \"FATAL: fibre-0 escrow not present after 3m\" >&2\n"+ + " echo \"FATAL: fibre-0 escrow did not land after 5m\" >&2\n"+ " exit 1\n"+ " fi\n"+ - " sleep 5\n"+ + " echo 'fibre-0 escrow pending, retrying...'\n"+ "done\n", + escrowAmount, + cfg.ChainID, fees, cfg.ChainID, )) + // 3. Best-effort fund fibre-1..N. The runner only signs + // with fibre-0 by default, so a missing one of these + // doesn't block uploads — they exist as headroom for + // future signer rotation. + for i := 1; i < fibreAccounts; i++ { + keyName := fmt.Sprintf("fibre-%d", i) + sb.WriteString(fmt.Sprintf( + "celestia-appd tx fibre deposit-to-escrow %s "+ + "--from %s --keyring-backend=test --home .celestia-app "+ + "--chain-id %s --fees %s --yes\n", + escrowAmount, + keyName, + cfg.ChainID, fees, + )) + } + script := sb.String() sem <- struct{}{} From 6a49a248121b77abfe148ccbed0746de0336c649 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:23 +0200 Subject: [PATCH 3/7] fix(fibre): bound submitter memory to avoid 64 GiB OOM under load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reproduced an OOM at 63.8 GiB on c6in.8xlarge (matching the prior 63.9 GiB note in evnode-fibre's main.go that warned against raising UploadMemoryBudget). On a 3-validator cluster running evnode-txsim at 80 MiB/s, evnode-fibre's submitter queue grew to 50 pending data + 26 pending headers, each blob ≈32 MiB raw. With 3 fan-out targets and per-attempt retry buffers in flight, total in-memory upload state crossed 64 GiB and the kernel killed evnode 30 s into the load test. The loadgen-side TXSIM number collapsed from the prior journal's 47.65 MB/s to 0.66 MB/s as evnode died. Two changes, one root cause (a Fibre upload stall snowballs): - pkg/config: ApplyFiberDefaults sets MaxPendingHeadersAndData to 10 (was 50). The cap is what bounds in-flight blob copies + retry buffers; 50 was sized when evnode hadn't yet seen the pathological case where Fibre uploads time out, retries accumulate, and GC pressure pushes uploads even slower in a positive-feedback loop. 10 keeps the in-flight footprint bounded while still letting healthy uploads pipeline. - evnode-fibre: override SubmitConfig.Fibre to set RPCTimeout to 30 s (upstream default 15 s). Verified live: with the pending cap at 10, a 17-blob 115 MiB upload completes in ~1.5 s — well below the 15 s default. The 30 s margin only matters for transient slow paths during signature collection across 3 FSPs; the cap fix is the load-bearing change. Drop the stale main.go comment claiming we don't override SubmitConfig.Fibre. 
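Back-of-envelope footprint behind the new cap, counting only the primary in-flight blob copies (a sketch; the per-attempt retry buffers and GC feedback loop are what amplified the old figure to the observed ~64 GiB and are not modelled here):

    package main

    import "fmt"

    func main() {
        const blobMiB, fanOut = 32, 3 // numbers from the incident above
        for _, pending := range []int{50, 10} {
            fmt.Printf("pending cap %2d: at least %4d MiB of blob copies in flight\n",
                pending, pending*blobMiB*fanOut)
        }
        // pending cap 50: at least 4800 MiB
        // pending cap 10: at least  960 MiB
    }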
Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/config/config.go | 7 ++++++- .../cmd/evnode-fibre/main.go | 20 +++++++++++-------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 7cbb780a21..43719c1047 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() { } c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second} - c.Node.MaxPendingHeadersAndData = 50 + // Tighter pending cap (was 50). At 50, a Fibre upload stall lets the + // submitter accumulate 50 × ~32 MiB blob copies + their per-validator + // retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and + // OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint + // bounded while still letting healthy uploads pipeline. + c.Node.MaxPendingHeadersAndData = 10 } // GetNamespace returns the namespace for header submissions. diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 7b8b73080f..8e980c3983 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -51,6 +51,7 @@ import ( "github.com/celestiaorg/celestia-app/v9/app" "github.com/celestiaorg/celestia-app/v9/app/encoding" + appfibre "github.com/celestiaorg/celestia-app/v9/fibre" "github.com/celestiaorg/celestia-node/api/client" cnp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" @@ -210,14 +211,16 @@ func run(cli cliFlags) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Construct the celestia-node-fiber adapter. We don't override - // SubmitConfig.Fibre — the Fibre client defaults (UploadMemoryBudget - // 512 MiB, RPCTimeout 15 s) are sized for the FSP-side concurrency - // the validators can actually absorb. We tried bumping the budget - // to 4 GiB to allow more in-flight blobs; with 16 upload workers - // the FSPs couldn't keep up and the box OOM'd at 63.9 GB. Leaving - // the defaults in place means the upload pipeline self-bounds at - // roughly what the FSPs can sustain. + // Construct the celestia-node-fiber adapter. The Fibre client + // defaults (UploadMemoryBudget 512 MiB, RPCTimeout 15 s) are sized + // for FSP-side concurrency. Bumping the budget alone caused 64 GiB + // OOMs (4 GiB budget × 16 workers), so we leave that conservative + // AND raise RPCTimeout to 30 s so a slow-but-healthy validator + // signature collection isn't cut short under load — under busy + // conditions a 32 MiB row upload + sig aggregation can exceed the + // 15 s default. + fibreCfg := appfibre.DefaultClientConfig() + fibreCfg.RPCTimeout = 30 * time.Second adapter, err := cnfiber.New(ctx, cnfiber.Config{ Client: client.Config{ ReadConfig: client.ReadConfig{ @@ -231,6 +234,7 @@ func run(cli cliFlags) error { CoreGRPCConfig: client.CoreGRPCConfig{ Addr: cli.coreGRPCAddr, }, + Fibre: &fibreCfg, }, }, }, kr) From 5fa456e661fcbf97612009bd37431a1ec5d2f25e Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:37 +0200 Subject: [PATCH 4/7] feat(fibre): log per-Submit upload duration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Fibre Submit path was opaque: failures showed up as DeadlineExceeded with no signal of how long the upload actually took, and successes only logged at debug level inside the upstream library. 
During load-test debugging this turned into a guessing game — was the cluster slow, the deadline too tight, or something stuck mid-RPC? Add a single info-level (warn-on-failure) log line in fiberDAClient.Submit covering the Upload call: duration, flat blob bytes, blob count. Cheap (one time.Since) and gives the operator concrete numbers — e.g. "17 blobs / 115 MiB / 1.5 s" — to reason about whether RPCTimeout, pending cap, or batch sizing is the right knob to turn next. Co-Authored-By: Claude Opus 4.7 (1M context) --- block/internal/da/fiber_client.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 3a4f9eb754..31df5603a2 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -89,7 +89,23 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na flat := flattenBlobs(data) nsID := namespace[len(namespace)-10:] + uploadStart := time.Now() result, err := c.fiber.Upload(context.Background(), nsID, flat) + uploadDuration := time.Since(uploadStart) + if err != nil { + c.logger.Warn(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(data)). + Err(err). + Msg("fiber upload duration (failed)") + } else { + c.logger.Info(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(data)). + Msg("fiber upload duration (ok)") + } if err != nil { code := datypes.StatusError switch { From cae5cd26fd7d15bae01f4ef85dc2cc565e95ab4a Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:43:46 +0200 Subject: [PATCH 5/7] fix(fibre): split DA Submit batches at Fibre's 128 MiB upload cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under sustained txsim load (~50 MiB/s) the DA submitter batched 10 block_data items into one Upload(), producing a flat payload of 144 MiB. Fibre's per-upload cap is hard at ~128 MiB ("blob size exceeds maximum allowed size: data size 144366912 exceeds maximum 134217723") and rejected every batched upload. With MaxPendingHeadersAndData=10 that took down 170 consecutive submissions before the node halted itself with "Data exceeds DA blob size limit". Wrap the Upload call in a chunker that groups input blobs into ≤120 MiB chunks (8 MiB headroom under Fibre's cap for the per-blob length-prefix overhead added by flattenBlobs) and uploads each chunk separately. Aggregates submitted counts and BlobIDs across chunks; on first chunk failure, returns the error with the partially-submitted count so the submitter's retry/backoff logic sees a coherent state instead of all-or-nothing. Single oversized blobs (already validated against DefaultMaxBlobSize earlier in Submit) still land alone and fail server-side, but at least don't drag healthy peers into the same rejected batch. 
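A quick sanity check of the split behaviour (an illustrative test, not included in this patch; it assumes chunkBlobsForFibre and fibreUploadChunkBudget land exactly as in the diff below):

    // Ten ~14 MiB block_data items flatten to ~140 MiB, past Fibre's
    // ~128 MiB reject threshold. With the 120 MiB budget the chunker
    // should emit an 8-blob (~112 MiB) chunk plus a 2-blob remainder.
    func TestChunkBlobsForFibreSplitsOversizedBatch(t *testing.T) {
        data := make([][]byte, 10)
        for i := range data {
            data[i] = make([]byte, 14*1024*1024)
        }
        chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget)
        if len(chunks) != 2 || len(chunks[0]) != 8 || len(chunks[1]) != 2 {
            t.Fatalf("unexpected split: %d chunks", len(chunks))
        }
    }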
Co-Authored-By: Claude Opus 4.7 (1M context) --- block/internal/da/fiber_client.go | 117 +++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 35 deletions(-) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 31df5603a2..70902673e9 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -87,52 +87,65 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } - flat := flattenBlobs(data) + // Fibre's per-upload cap is ~128 MiB (hard server-side reject: + // "data size %d exceeds maximum 134217723"). flattenBlobs adds + // 4 bytes per blob + 4 prefix, so we target 120 MiB per chunk + // to leave overhead room and avoid borderline rejects. + chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget) nsID := namespace[len(namespace)-10:] - uploadStart := time.Now() - result, err := c.fiber.Upload(context.Background(), nsID, flat) - uploadDuration := time.Since(uploadStart) - if err != nil { - c.logger.Warn(). - Dur("duration", uploadDuration). - Int("flat_size", len(flat)). - Int("blob_count", len(data)). - Err(err). - Msg("fiber upload duration (failed)") - } else { + + ids := make([][]byte, 0, len(chunks)) + var submitted int + for chunkIdx, chunk := range chunks { + flat := flattenBlobs(chunk) + uploadStart := time.Now() + result, err := c.fiber.Upload(context.Background(), nsID, flat) + uploadDuration := time.Since(uploadStart) + if err != nil { + c.logger.Warn(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). + Err(err). + Msg("fiber upload duration (failed)") + code := datypes.StatusError + switch { + case errors.Is(err, context.Canceled): + code = datypes.StatusContextCanceled + case errors.Is(err, context.DeadlineExceeded): + code = datypes.StatusContextDeadline + } + c.logger.Error().Err(err).Msg("fiber upload failed") + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: code, + Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err), + SubmittedCount: uint64(submitted), + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } c.logger.Info(). Dur("duration", uploadDuration). Int("flat_size", len(flat)). - Int("blob_count", len(data)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). 
Msg("fiber upload duration (ok)") - } - if err != nil { - code := datypes.StatusError - switch { - case errors.Is(err, context.Canceled): - code = datypes.StatusContextCanceled - case errors.Is(err, context.DeadlineExceeded): - code = datypes.StatusContextDeadline - } - c.logger.Error().Err(err).Msg("fiber upload failed") - return datypes.ResultSubmit{ - BaseResult: datypes.BaseResult{ - Code: code, - Message: fmt.Sprintf("fiber upload failed for blob: %v", err), - SubmittedCount: uint64(len(data) - 1), - BlobSize: blobSize, - Timestamp: time.Now(), - }, - } + ids = append(ids, result.BlobID) + submitted += len(chunk) } - c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") + c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusSuccess, - IDs: [][]byte{result.BlobID}, - SubmittedCount: uint64(len(data)), + IDs: ids, + SubmittedCount: uint64(submitted), Height: 0, /* TODO */ BlobSize: blobSize, Timestamp: time.Now(), @@ -140,6 +153,40 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } +// fibreUploadChunkBudget is the target maximum flattened size of a single +// Fibre Upload call. Fibre rejects payloads above ~128 MiB +// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for +// flattenBlobs's per-blob length prefixes and for any future overhead. +const fibreUploadChunkBudget = 120 * 1024 * 1024 + +// chunkBlobsForFibre groups data into chunks whose flattened size stays +// below budget. Per-blob length-prefix overhead matches flattenBlobs. +// A single oversized blob (already validated against DefaultMaxBlobSize +// above) lands in its own chunk; the upload still fails server-side but +// at least we don't drag healthy peers down with it. +func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte { + if len(data) == 0 { + return nil + } + chunks := make([][][]byte, 0, 1) + cur := make([][]byte, 0, len(data)) + curSize := 4 // flattenBlobs's count prefix + for _, b := range data { + entry := 4 + len(b) + if len(cur) > 0 && curSize+entry > budget { + chunks = append(chunks, cur) + cur = make([][]byte, 0, len(data)) + curSize = 4 + } + cur = append(cur, b) + curSize += entry + } + if len(cur) > 0 { + chunks = append(chunks, cur) + } + return chunks +} + func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { return c.retrieve(ctx, height, namespace, true) } From 5c1eab988aa6d5a8c342d374ccacfc87b8db9b1a Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:43:58 +0200 Subject: [PATCH 6/7] fix(evnode-fibre): cap per-block data at 100 MiB to fit a Fibre upload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to the submitter chunking fix. The submitter can split a multi-blob batch into ≤120 MiB Fibre uploads, but a *single* block_data item that exceeds 128 MiB still ends up alone in its own chunk and fails server-side ("blob size exceeds maximum allowed size"). Lower the per-block cap to 100 MiB so under high-throughput txsim a single block can't grow past Fibre's hard limit, and update the comment to explain the relationship between this cap and Fibre's ~128 MiB upload reject threshold. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- tools/celestia-node-fiber/cmd/evnode-fibre/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 8e980c3983..458905bc88 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -306,7 +306,12 @@ func run(cli cliFlags) error { // Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s, // DA.BlockTime=1s, MaxPendingHeadersAndData=0, plus 120 MiB blob cap. cfg.ApplyFiberDefaults() - block.SetMaxBlobSize(120 * 1024 * 1024) + // 100 MiB — bounded by Fibre's hard ~128 MiB per-upload cap (we + // hit `data size exceeds maximum 134217723` at 128 MiB - 5 B). + // Set the per-block data cap below that so each block_data item + // fits in a single Fibre upload after the submitter splits a + // multi-blob batch into ≤120 MiB chunks. + block.SetMaxBlobSize(100 * 1024 * 1024) cfg.P2P.ListenAddress = cli.p2pListen cfg.P2P.DisableConnectionGater = true cfg.RPC.Address = cli.rpcListen From 046c0226332eecb44011ae943133b042d17745a1 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:53:38 +0200 Subject: [PATCH 7/7] fix(evnode-fibre): enforce maxBytes in inMemExecutor.FilterTxs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stub executor used by the runner returned FilterOK for every transaction unconditionally, ignoring the maxBytes budget plumbed through SoloSequencer.GetNextBatch. Under sustained txsim load (~50 MiB/s, 8 concurrent senders) the mempool would accumulate ~50K txs while a 100 MiB upload was in flight; on the next batch the sequencer drained ALL of them into one block (~369 MiB raw), the submitter saw a single item exceeding the per-blob cap, and halted the node with `single item exceeds DA blob size limit`. Walk the input txs in arrival order, accumulate sizes against maxBytes, and return FilterPostpone past the budget so the sequencer puts the overflow back on its queue. Verified live: blocks now cap at ~10K txs / ~100 MiB and evnode sustains 58.77 MB/s DA upload throughput through a 5-min txsim run with zero crashes (was 0 → crash within 30 s before this fix). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cmd/evnode-fibre/main.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 458905bc88..0173609eec 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -551,10 +551,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu return coreexecution.ExecutionInfo{MaxGas: 0}, nil } -func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { +// FilterTxs admits txs in arrival order until the maxBytes budget is +// reached, then postpones the rest back to the sequencer queue so they +// land in a future batch. Skipping this enforcement (a previous version +// returned FilterOK unconditionally) lets a single block sweep up the +// entire mempool — under sustained txsim load that produced 369 MiB +// blocks that exceeded Fibre's per-upload cap and crashed the node +// with `single item exceeds DA blob size limit`. 
+func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { st := make([]coreexecution.FilterStatus, len(txs)) - for i := range st { + var used uint64 + for i, tx := range txs { + size := uint64(len(tx)) + if maxBytes > 0 && used+size > maxBytes { + st[i] = coreexecution.FilterPostpone + continue + } st[i] = coreexecution.FilterOK + used += size } return st, nil }
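
Postscript on the FilterTxs change: the budget is enforced per-tx with FilterPostpone rather than by breaking out of the loop, so a later small tx can still be admitted after a large one is postponed. An illustrative test (not part of this patch; it assumes a zero-value inMemExecutor is usable and reuses the coreexecution import alias from main.go):

    func TestFilterTxsPostponesPastBudget(t *testing.T) {
        e := &inMemExecutor{}
        txs := [][]byte{
            make([]byte, 60), // admitted, used=60
            make([]byte, 30), // admitted, used=90
            make([]byte, 30), // 90+30 > 100: postponed
            make([]byte, 5),  // still fits (used=95): admitted
        }
        st, err := e.FilterTxs(context.Background(), txs, 100, 0, false)
        if err != nil {
            t.Fatal(err)
        }
        want := []coreexecution.FilterStatus{
            coreexecution.FilterOK,
            coreexecution.FilterOK,
            coreexecution.FilterPostpone,
            coreexecution.FilterOK,
        }
        for i, w := range want {
            if st[i] != w {
                t.Fatalf("tx %d: got %v, want %v", i, st[i], w)
            }
        }
    }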