diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 3a4f9eb754..70902673e9 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -87,36 +87,65 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } - flat := flattenBlobs(data) + // Fibre's per-upload cap is ~128 MiB (hard server-side reject: + // "data size %d exceeds maximum 134217723"). flattenBlobs adds + // 4 bytes per blob + 4 prefix, so we target 120 MiB per chunk + // to leave overhead room and avoid borderline rejects. + chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget) nsID := namespace[len(namespace)-10:] - result, err := c.fiber.Upload(context.Background(), nsID, flat) - if err != nil { - code := datypes.StatusError - switch { - case errors.Is(err, context.Canceled): - code = datypes.StatusContextCanceled - case errors.Is(err, context.DeadlineExceeded): - code = datypes.StatusContextDeadline - } - c.logger.Error().Err(err).Msg("fiber upload failed") - return datypes.ResultSubmit{ - BaseResult: datypes.BaseResult{ - Code: code, - Message: fmt.Sprintf("fiber upload failed for blob: %v", err), - SubmittedCount: uint64(len(data) - 1), - BlobSize: blobSize, - Timestamp: time.Now(), - }, + + ids := make([][]byte, 0, len(chunks)) + var submitted int + for chunkIdx, chunk := range chunks { + flat := flattenBlobs(chunk) + uploadStart := time.Now() + result, err := c.fiber.Upload(context.Background(), nsID, flat) + uploadDuration := time.Since(uploadStart) + if err != nil { + c.logger.Warn(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). + Err(err). + Msg("fiber upload duration (failed)") + code := datypes.StatusError + switch { + case errors.Is(err, context.Canceled): + code = datypes.StatusContextCanceled + case errors.Is(err, context.DeadlineExceeded): + code = datypes.StatusContextDeadline + } + c.logger.Error().Err(err).Msg("fiber upload failed") + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: code, + Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err), + SubmittedCount: uint64(submitted), + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } } + c.logger.Info(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). + Msg("fiber upload duration (ok)") + ids = append(ids, result.BlobID) + submitted += len(chunk) } - c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") + c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusSuccess, - IDs: [][]byte{result.BlobID}, - SubmittedCount: uint64(len(data)), + IDs: ids, + SubmittedCount: uint64(submitted), Height: 0, /* TODO */ BlobSize: blobSize, Timestamp: time.Now(), @@ -124,6 +153,40 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } +// fibreUploadChunkBudget is the target maximum flattened size of a single +// Fibre Upload call. Fibre rejects payloads above ~128 MiB +// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for +// flattenBlobs's per-blob length prefixes and for any future overhead. +const fibreUploadChunkBudget = 120 * 1024 * 1024 + +// chunkBlobsForFibre groups data into chunks whose flattened size stays +// below budget. Per-blob length-prefix overhead matches flattenBlobs. +// A single oversized blob (already validated against DefaultMaxBlobSize +// above) lands in its own chunk; the upload still fails server-side but +// at least we don't drag healthy peers down with it. +func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte { + if len(data) == 0 { + return nil + } + chunks := make([][][]byte, 0, 1) + cur := make([][]byte, 0, len(data)) + curSize := 4 // flattenBlobs's count prefix + for _, b := range data { + entry := 4 + len(b) + if len(cur) > 0 && curSize+entry > budget { + chunks = append(chunks, cur) + cur = make([][]byte, 0, len(data)) + curSize = 4 + } + cur = append(cur, b) + curSize += entry + } + if len(cur) > 0 { + chunks = append(chunks, cur) + } + return chunks +} + func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { return c.retrieve(ctx, height, namespace, true) } diff --git a/pkg/config/config.go b/pkg/config/config.go index 7cbb780a21..43719c1047 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() { } c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second} - c.Node.MaxPendingHeadersAndData = 50 + // Tighter pending cap (was 50). At 50, a Fibre upload stall lets the + // submitter accumulate 50 × ~32 MiB blob copies + their per-validator + // retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and + // OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint + // bounded while still letting healthy uploads pipeline. + c.Node.MaxPendingHeadersAndData = 10 } // GetNamespace returns the namespace for header submissions. diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 7b8b73080f..0173609eec 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -51,6 +51,7 @@ import ( "github.com/celestiaorg/celestia-app/v9/app" "github.com/celestiaorg/celestia-app/v9/app/encoding" + appfibre "github.com/celestiaorg/celestia-app/v9/fibre" "github.com/celestiaorg/celestia-node/api/client" cnp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" @@ -210,14 +211,16 @@ func run(cli cliFlags) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Construct the celestia-node-fiber adapter. We don't override - // SubmitConfig.Fibre — the Fibre client defaults (UploadMemoryBudget - // 512 MiB, RPCTimeout 15 s) are sized for the FSP-side concurrency - // the validators can actually absorb. We tried bumping the budget - // to 4 GiB to allow more in-flight blobs; with 16 upload workers - // the FSPs couldn't keep up and the box OOM'd at 63.9 GB. Leaving - // the defaults in place means the upload pipeline self-bounds at - // roughly what the FSPs can sustain. + // Construct the celestia-node-fiber adapter. The Fibre client + // defaults (UploadMemoryBudget 512 MiB, RPCTimeout 15 s) are sized + // for FSP-side concurrency. Bumping the budget alone caused 64 GiB + // OOMs (4 GiB budget × 16 workers), so we leave that conservative + // AND raise RPCTimeout to 30 s so a slow-but-healthy validator + // signature collection isn't cut short under load — under busy + // conditions a 32 MiB row upload + sig aggregation can exceed the + // 15 s default. + fibreCfg := appfibre.DefaultClientConfig() + fibreCfg.RPCTimeout = 30 * time.Second adapter, err := cnfiber.New(ctx, cnfiber.Config{ Client: client.Config{ ReadConfig: client.ReadConfig{ @@ -231,6 +234,7 @@ func run(cli cliFlags) error { CoreGRPCConfig: client.CoreGRPCConfig{ Addr: cli.coreGRPCAddr, }, + Fibre: &fibreCfg, }, }, }, kr) @@ -302,7 +306,12 @@ func run(cli cliFlags) error { // Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s, // DA.BlockTime=1s, MaxPendingHeadersAndData=0, plus 120 MiB blob cap. cfg.ApplyFiberDefaults() - block.SetMaxBlobSize(120 * 1024 * 1024) + // 100 MiB — bounded by Fibre's hard ~128 MiB per-upload cap (we + // hit `data size exceeds maximum 134217723` at 128 MiB - 5 B). + // Set the per-block data cap below that so each block_data item + // fits in a single Fibre upload after the submitter splits a + // multi-blob batch into ≤120 MiB chunks. + block.SetMaxBlobSize(100 * 1024 * 1024) cfg.P2P.ListenAddress = cli.p2pListen cfg.P2P.DisableConnectionGater = true cfg.RPC.Address = cli.rpcListen @@ -542,10 +551,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu return coreexecution.ExecutionInfo{MaxGas: 0}, nil } -func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { +// FilterTxs admits txs in arrival order until the maxBytes budget is +// reached, then postpones the rest back to the sequencer queue so they +// land in a future batch. Skipping this enforcement (a previous version +// returned FilterOK unconditionally) lets a single block sweep up the +// entire mempool — under sustained txsim load that produced 369 MiB +// blocks that exceeded Fibre's per-upload cap and crashed the node +// with `single item exceeds DA blob size limit`. +func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { st := make([]coreexecution.FilterStatus, len(txs)) - for i := range st { + var used uint64 + for i, tx := range txs { + size := uint64(len(tx)) + if maxBytes > 0 && used+size > maxBytes { + st[i] = coreexecution.FilterPostpone + continue + } st[i] = coreexecution.FilterOK + used += size } return st, nil } diff --git a/tools/talis/aws.go b/tools/talis/aws.go index 249dc89bbd..afcef6ebaa 100644 --- a/tools/talis/aws.go +++ b/tools/talis/aws.go @@ -350,16 +350,28 @@ func CreateAWSInstances(ctx context.Context, insts []Instance, sshKey, keyName s close(results) }() - var created []Instance + var ( + created []Instance + failures []string + ) for res := range results { if res.err != nil { fmt.Printf("❌ %s failed after %v %v\n", res.inst.Name, res.timeRequired, res.err) + failures = append(failures, fmt.Sprintf("%s: %v", res.inst.Name, res.err)) } else { created = append(created, res.inst) fmt.Printf("✅ %s is up (public=%s) in %v\n", res.inst.Name, res.inst.PublicIP, res.timeRequired) } fmt.Printf("---- Progress: %d/%d\n", len(created), total) } + if len(failures) > 0 { + // Surface partial-failure as an error so `talis up` exits + // non-zero; without this, downstream genesis runs against a + // half-provisioned config and fails much later with confusing + // "X has no public IP yet" messages. + return created, fmt.Errorf("%d/%d instance(s) failed to launch: %s", + len(failures), total, strings.Join(failures, "; ")) + } return created, nil } diff --git a/tools/talis/download.go b/tools/talis/download.go index 99284d0326..ddff69e102 100644 --- a/tools/talis/download.go +++ b/tools/talis/download.go @@ -193,16 +193,26 @@ func compressAndDownload(table, localPath, user, host, sshKeyPath string) error return nil } -// sshExec runs a command on a remote host via SSH and returns the combined output. +// sshExec runs a command on a remote host via SSH and returns stdout only. +// +// We intentionally do NOT use CombinedOutput here. ssh prints connection +// chatter ("Warning: Permanently added '...' to the list of known hosts.") +// on stderr, and a previous `CombinedOutput` revision caused +// `fmt.Sscanf(out, "%d")` parses to silently return 0 because the leading +// stderr line had no digits. Capturing only stdout keeps numeric output +// parseable; -q + LogLevel=ERROR further suppresses the chatter for any +// caller that does combine streams. func sshExec(user, host, sshKeyPath, command string) ([]byte, error) { cmd := exec.Command("ssh", + "-q", + "-o", "LogLevel=ERROR", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-i", sshKeyPath, fmt.Sprintf("%s@%s", user, host), command, ) - return cmd.CombinedOutput() + return cmd.Output() } func sftpDownload(remotePath, localPath, user, host, sshKeyPath string) error { diff --git a/tools/talis/fibre_bootstrap_evnode.go b/tools/talis/fibre_bootstrap_evnode.go index 5df12d41ae..c8d76822b3 100644 --- a/tools/talis/fibre_bootstrap_evnode.go +++ b/tools/talis/fibre_bootstrap_evnode.go @@ -125,21 +125,45 @@ to fetch, then SCPs to each evnode-*.`, defer wg.Done() log.Printf("[%s] pushing JWT + keyring", ev.Name) + // JWT is small + atomic on the receive side because + // it's a single file, so we push it directly. if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, localJWT, "/root/bridge-jwt.txt", false); err != nil { errCh <- fmt.Errorf("[%s] push JWT: %w", ev.Name, err) return } - // mkdir the parent so scp lands at the exact path - // evnode_init.sh waits for. - if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, "mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test"); err != nil { - errCh <- fmt.Errorf("[%s] mkdir keyring-fibre: %w", ev.Name, err) + // Keyring push is staged through a tmp dir and + // promoted via mv. Without staging, evnode_init.sh's + // poll loop (which tests `[ -d keyring-test ]`) + // passes the moment scp -r mkdir's the directory, + // long before fibre-0.info is on disk. evnode then + // launches mid-scp and dies with `keyring entry + // "fibre-0" not found`. mv is atomic on the same + // filesystem so the init script either sees nothing + // (keep waiting) or the fully-populated dir (start + // the daemon cleanly). + stageDir := "/root/.keyring-fibre.staging" + prep := fmt.Sprintf( + "rm -rf %s && mkdir -p %s && mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test", + stageDir, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, prep); err != nil { + errCh <- fmt.Errorf("[%s] stage keyring: %w", ev.Name, err) return } - if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), "/root/keyring-fibre/keyring-test", true); err != nil { + stageDest := stageDir + "/keyring-test" + if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), stageDest, true); err != nil { errCh <- fmt.Errorf("[%s] push keyring: %w", ev.Name, err) return } + promote := fmt.Sprintf( + "mv %s /root/keyring-fibre/keyring-test && rmdir %s", + stageDest, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, promote); err != nil { + errCh <- fmt.Errorf("[%s] promote keyring: %w", ev.Name, err) + return + } log.Printf("[%s] ✓ pushed; daemon should start within ~10s", ev.Name) }(ev) diff --git a/tools/talis/fibre_experiment.go b/tools/talis/fibre_experiment.go new file mode 100644 index 0000000000..585992d5fe --- /dev/null +++ b/tools/talis/fibre_experiment.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + + "github.com/spf13/cobra" +) + +// fibreExperimentCmd is the one-command driver for the Fibre throughput +// experiment. It assumes the operator has already populated the +// experiment directory with a config.json + scripts/ + base config.toml + +// app.toml (i.e. ran `talis init` + `talis add` for validators / bridge +// / evnode / loadgen) and that build artefacts are at $rootDir/build. +// +// It then invokes — in order — the same subcommands the operator would +// run by hand: +// +// 1. up — provision instances +// 2. genesis -b — stage validator/bridge/evnode/loadgen payloads +// 3. deploy — ship payloads + start init scripts +// 4. setup-fibre — register host + deposit escrow on each validator +// 5. start-fibre — launch the fibre server on each validator +// 6. fibre-bootstrap-evnode — scp bridge JWT + fibre keyring onto evnode-* +// +// Each step is invoked via os/exec on the running binary. Any failure +// surfaces immediately; nothing is retried at this layer (the +// individual subcommands handle their own waits + retries). +// +// After step 6 returns, evnode-* daemons start within ~10 s and the +// load-gen's init script auto-launches evnode-txsim. The operator +// reads the final TXSIM: line from the load-gen. +func fibreExperimentCmd() *cobra.Command { + var ( + rootDir string + buildDir string + ) + + cmd := &cobra.Command{ + Use: "fibre-experiment", + Short: "End-to-end driver: up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode", + Long: `Run every step needed to bring up a Fibre throughput experiment from a +prepared root directory. Equivalent to invoking each subcommand in +sequence; included so the operator doesn't have to remember the order +or watch for inter-step races.`, + RunE: func(cmd *cobra.Command, args []string) error { + self, err := os.Executable() + if err != nil { + return fmt.Errorf("locate own binary: %w", err) + } + + steps := []struct { + name string + args []string + }{ + {"up", []string{"up", "-d", rootDir}}, + {"genesis", []string{"genesis", "-d", rootDir, "-b", buildDir}}, + {"deploy", []string{"deploy", "-d", rootDir}}, + {"setup-fibre", []string{"setup-fibre", "-d", rootDir}}, + {"start-fibre", []string{"start-fibre", "-d", rootDir}}, + {"fibre-bootstrap-evnode", []string{"fibre-bootstrap-evnode", "-d", rootDir}}, + } + + for _, s := range steps { + fmt.Printf("\n=== talis %s ===\n", s.name) + c := exec.Command(self, s.args...) + c.Stdout = os.Stdout + c.Stderr = os.Stderr + c.Env = os.Environ() + if err := c.Run(); err != nil { + return fmt.Errorf("step %q failed: %w", s.name, err) + } + } + + fmt.Println() + fmt.Println("=== fibre-experiment complete ===") + fmt.Println("evnode aggregator(s) start within ~10 s and load-gen init") + fmt.Println("scripts auto-launch evnode-txsim once evnode's /stats responds.") + fmt.Println("Final TXSIM: line lands at /root/txsim.log on each load-gen host.") + return nil + }, + } + + cmd.Flags().StringVarP(&rootDir, "directory", "d", ".", "experiment root directory") + cmd.Flags().StringVarP(&buildDir, "build-dir", "b", "./build", "directory containing the cross-compiled linux/amd64 binaries") + + return cmd +} diff --git a/tools/talis/fibre_setup.go b/tools/talis/fibre_setup.go index 28f7115b80..382e780c50 100644 --- a/tools/talis/fibre_setup.go +++ b/tools/talis/fibre_setup.go @@ -51,20 +51,104 @@ func setupFibreCmd() *cobra.Command { // Build script: register host + deposit escrow for validator + all fibre accounts var sb strings.Builder + // 0. Block until the chain has produced at least one block. + // Without this, the very next tx returns + // `celestia-app is not ready; please wait for first block` + // from the local node — the call appears to succeed at + // the CLI level (`--yes` returns the txhash before block + // inclusion), but the tx never lands. Polling explicitly + // avoids the `sleep 10` heuristic that used to be here. + sb.WriteString( + "echo 'waiting for chain to produce first block...'\n" + + "DEADLINE=$(( $(date +%s) + 300 ))\n" + + "while true; do\n" + + " H=$(celestia-appd status 2>/dev/null | " + + " grep -oE '\"latest_block_height\":\"[0-9]+\"' | " + + " grep -oE '[0-9]+' | head -1)\n" + + " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n" + + " echo \"chain is at height $H\"\n" + + " break\n" + + " fi\n" + + " if [ $(date +%s) -gt $DEADLINE ]; then\n" + + " echo 'FATAL: chain never produced a block within 5m' >&2\n" + + " exit 1\n" + + " fi\n" + + " sleep 3\n" + + "done\n", + ) + // 1. Register fibre host address. Plain `host:port` form — // x/valaddr requires it; the gRPC client dials it via the // passthrough resolver. Don't prefix `dns:///` here. + // + // Retry until `query valaddr providers` shows OUR host + // — `--yes` returns the txhash before inclusion, so a + // single one-shot call can succeed at the RPC layer + // while the chain rejects the tx (mempool full, signer + // not yet in validator set, …) and we'd never know. + // 5-minute deadline so a stuck chain doesn't loop + // forever. sb.WriteString(fmt.Sprintf( - "celestia-appd tx valaddr set-host %s:%d "+ + "HOST=%s:%d\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " celestia-appd tx valaddr set-host \"$HOST\" "+ "--from validator --keyring-backend=test --home .celestia-app "+ - "--chain-id %s --fees %s --yes\n", + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q \"\\\"host\\\": *\\\"$HOST\\\"\"; then\n"+ + " echo \"set-host confirmed: $HOST\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: set-host did not register $HOST after 5m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " echo 'set-host pending, retrying...'\n"+ + "done\n", val.PublicIP, fibrePort, cfg.ChainID, fees, + cfg.ChainID, + )) + + // 2. Deposit escrow for fibre-0 inside a retry loop. + // Same silent-failure mode as set-host: `--yes` returns + // the txhash before inclusion, so a single bounced tx + // (mempool full, signer not yet propagated, …) leaves + // the runner failing every upload with + // `escrow account not found for signer …`. fibre-0 is + // the one the runner actually signs with by default, + // so it's the only one we hard-block on. + sb.WriteString(fmt.Sprintf( + "FIBRE0_ADDR=$(celestia-appd keys show fibre-0 --keyring-backend test --home .celestia-app -a)\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " celestia-appd tx fibre deposit-to-escrow %s "+ + "--from fibre-0 --keyring-backend=test --home .celestia-app "+ + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query fibre escrow-account \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q '\"found\":true'; then\n"+ + " echo \"escrow confirmed for fibre-0 ($FIBRE0_ADDR)\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: fibre-0 escrow did not land after 5m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " echo 'fibre-0 escrow pending, retrying...'\n"+ + "done\n", + escrowAmount, + cfg.ChainID, fees, + cfg.ChainID, )) - sb.WriteString("sleep 10\n") - // 2. Deposit escrow for each fibre worker account - for i := range fibreAccounts { + // 3. Best-effort fund fibre-1..N. The runner only signs + // with fibre-0 by default, so a missing one of these + // doesn't block uploads — they exist as headroom for + // future signer rotation. + for i := 1; i < fibreAccounts; i++ { keyName := fmt.Sprintf("fibre-%d", i) sb.WriteString(fmt.Sprintf( "celestia-appd tx fibre deposit-to-escrow %s "+ @@ -103,6 +187,38 @@ func setupFibreCmd() *cobra.Command { if err := waitForTmuxSessions(cfg.Validators, resolvedSSHKeyPath, SetupFibreSessionName, 10*time.Minute); err != nil { return fmt.Errorf("waiting for setup-fibre sessions: %w", err) } + + // CLI-side verification that every validator's host is on + // the chain's provider list before we hand off to start- + // fibre / fibre-bootstrap-evnode. The per-validator script + // above already self-verifies its own host, but we + // re-check here from a single vantage point so a + // concurrent set-host race across validators surfaces + // before downstream steps cache an empty registry. + if len(cfg.Validators) > 0 { + expected := len(cfg.Validators) + queryHost := cfg.Validators[0].PublicIP + queryCmd := fmt.Sprintf( + "celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null | grep -o '\"host\"' | wc -l", + cfg.ChainID, + ) + deadline := time.Now().Add(5 * time.Minute) + for { + out, err := sshExec("root", queryHost, resolvedSSHKeyPath, queryCmd) + if err == nil { + count := 0 + _, _ = fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &count) + if count >= expected { + break + } + fmt.Printf(" valaddr providers: %d/%d registered, retrying...\n", count, expected) + } + if time.Now().After(deadline) { + return fmt.Errorf("only some validators registered as fibre providers within 5m — re-run setup-fibre") + } + time.Sleep(5 * time.Second) + } + } fmt.Println("Validator setup done!") // Deposit escrow for encoder accounts. diff --git a/tools/talis/genesis.go b/tools/talis/genesis.go index 9a4f481a81..e1b229efaa 100644 --- a/tools/talis/genesis.go +++ b/tools/talis/genesis.go @@ -558,9 +558,17 @@ mkdir -p "$EVNODE_HOME" # 2. /root/keyring-fibre/keyring-test cosmos-sdk file keyring with # a Fibre payment account # Without them the daemon would crash immediately on startup. -echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR..." +# +# We check for the *specific* fibre-0.info file (not just the +# keyring-test directory) because a non-atomic scp -r would create +# the directory before transferring its contents — so testing -d +# alone passes mid-scp and the daemon would launch with an empty +# keyring. fibre-bootstrap-evnode now also stages-and-mvs so this +# check should never see a partial state, but keep the file-level +# guard as a defence-in-depth. +echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR/keyring-test/fibre-0.info..." WAITED=0 -until [ -s "$BRIDGE_JWT_FILE" ] && [ -d "$FIBRE_KEYRING_DIR/keyring-test" ]; do +until [ -s "$BRIDGE_JWT_FILE" ] && [ -f "$FIBRE_KEYRING_DIR/keyring-test/fibre-0.info" ]; do sleep 5 WAITED=$((WAITED + 5)) if [ $((WAITED % 60)) -eq 0 ]; then diff --git a/tools/talis/main.go b/tools/talis/main.go index 016b1ee340..899fe8cf54 100644 --- a/tools/talis/main.go +++ b/tools/talis/main.go @@ -37,6 +37,7 @@ func main() { fibreTxsimCmd(), fibreThroughputCmd(), fibreBootstrapEvnodeCmd(), + fibreExperimentCmd(), resourceMonitorCmd(), downloadResourcesCmd(), syncNodeCmd(),