diff --git a/bin/ethlambda/src/checkpoint_sync.rs b/bin/ethlambda/src/checkpoint_sync.rs index 8a81edc2..903525f1 100644 --- a/bin/ethlambda/src/checkpoint_sync.rs +++ b/bin/ethlambda/src/checkpoint_sync.rs @@ -5,6 +5,7 @@ use ethlambda_types::primitives::HashTreeRoot as _; use ethlambda_types::state::{State, Validator, anchor_pair_is_consistent}; use libssz::{DecodeError, SszDecode}; use reqwest::Client; +use tracing::{info, warn}; /// Timeout for establishing the HTTP connection to the checkpoint peer. /// Fail fast if the peer is unreachable. @@ -20,6 +21,12 @@ const FINALIZED_STATE_PATH: &str = "/lean/v0/states/finalized"; /// Path of the finalized-block endpoint (relative to the peer's API base URL). const FINALIZED_BLOCK_PATH: &str = "/lean/v0/blocks/finalized"; +/// Maximum number of anchor-pair fetch attempts (initial try included) on a state/block mismatch. +const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3; + +/// Delay between anchor fetch attempts. +const ANCHOR_FETCH_RETRY_DELAY: Duration = Duration::from_secs(1); + #[derive(Debug, thiserror::Error)] pub enum CheckpointSyncError { #[error("HTTP request failed: {0}")] @@ -58,6 +65,10 @@ pub enum CheckpointSyncError { BlockHeaderJustifiedRootMismatch, #[error("anchor block does not match anchor state")] AnchorPairingMismatch, + #[error("all checkpoint peers failed; last error: {0}")] + AllPeersFailed(Box), + #[error("no checkpoint urls configured")] + NoCheckpointUrls, } /// Build the HTTP client used for checkpoint sync fetches. @@ -269,6 +280,69 @@ fn verify_checkpoint_state( Ok(()) } +/// Fetch the finalized anchor from a single checkpoint URL, retrying transient +/// races where the peer advances finalization between the state and block +/// fetches. 
+async fn try_checkpoint_url( + url: &str, + genesis_time: u64, + validators: &[Validator], +) -> Result<(State, SignedBlock), CheckpointSyncError> { + let mut attempt = 1; + loop { + match fetch_finalized_anchor(url, genesis_time, validators).await { + Ok(pair) => return Ok(pair), + Err(CheckpointSyncError::AnchorPairingMismatch) + if attempt < MAX_ANCHOR_FETCH_ATTEMPTS => + { + warn!( + %url, + attempt, + max = MAX_ANCHOR_FETCH_ATTEMPTS, + "Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying" + ); + tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await; + attempt += 1; + } + Err(err) => return Err(err), + } + } +} + +/// Try each checkpoint URL in order, returning the first successful anchor +/// pair. Logs per-peer success/failure. On total failure, returns the last +/// error wrapped in `AllPeersFailed`, or `NoCheckpointUrls` if the list is empty. +pub async fn fetch_anchor_block_and_state( + checkpoint_urls: &[String], + genesis_time: u64, + validators: &[Validator], +) -> Result<(State, SignedBlock), CheckpointSyncError> { + let mut iter = checkpoint_urls.iter().peekable(); + let mut last_err: Option = None; + loop { + let Some(url) = iter.next() else { + return Err(match last_err { + Some(err) => CheckpointSyncError::AllPeersFailed(Box::new(err)), + None => CheckpointSyncError::NoCheckpointUrls, + }); + }; + match try_checkpoint_url(url, genesis_time, validators).await { + Ok(pair) => { + info!(%url, "Checkpoint sync successful with this peer"); + return Ok(pair); + } + Err(err) => { + if iter.peek().is_some() { + warn!(%url, %err, "Checkpoint sync failed for this peer; trying next URL"); + } else { + warn!(%url, %err, "Checkpoint sync failed for this peer; no more URLs to try"); + } + last_err = Some(err); + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 0852dd49..c76e27c0 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -76,14 +76,20 @@ struct CliOptions { /// The node ID 
to look up in annotated_validators.yaml (e.g., "ethlambda_0") #[arg(long)] node_id: String, - /// Base URL of a checkpoint-sync peer's API server (e.g., http://peer:5052). + /// Base URL(s) of checkpoint-sync peer API servers (e.g., http://peer:5052). /// When set, skips genesis initialization and fetches the finalized state - /// and block from the peer's `/lean/v0/states/finalized` and + /// and block from each peer's `/lean/v0/states/finalized` and /// `/lean/v0/blocks/finalized` endpoints. For backward compatibility, a /// URL ending in `/lean/v0/states/finalized` is accepted and the trailing /// path is stripped. - #[arg(long)] - checkpoint_sync_url: Option, + /// + /// Multiple URLs may be supplied for redundancy, either comma-separated + /// (`--checkpoint-sync-url u1,u2`) or by repeating the flag + /// (`--checkpoint-sync-url u1 --checkpoint-sync-url u2`). URLs are tried + /// in order; the first one that succeeds is used and any failures fall + /// over to the next URL. Startup only aborts if every URL fails. + #[arg(long, value_delimiter = ',')] + checkpoint_sync_url: Vec, /// Whether this node acts as a committee aggregator. 
/// /// Seeds the initial value of the live aggregator flag shared by the @@ -206,13 +212,16 @@ async fn main() -> eyre::Result<()> { std::fs::create_dir_all(&data_dir).expect("Failed to create data directory"); let backend = Arc::new(RocksDBBackend::open(&data_dir).expect("Failed to open RocksDB")); - let store = fetch_initial_state( - options.checkpoint_sync_url.as_deref(), - &genesis_config, - backend.clone(), - ) - .await - .inspect_err(|err| error!(%err, "Failed to initialize state"))?; + let clean_checkpoint_urls: Vec = options + .checkpoint_sync_url + .into_iter() + .map(|url| url.trim().to_string()) + .filter(|url| !url.is_empty()) + .collect(); + + let store = fetch_initial_state(&clean_checkpoint_urls, &genesis_config, backend.clone()) + .await + .inspect_err(|err| error!(%err, "Failed to initialize state"))?; let validator_ids: Vec = validator_keys.keys().copied().collect(); @@ -565,10 +574,11 @@ fn read_hex_file_bytes(path: impl AsRef) -> Vec { /// Fetch the initial state for the node. /// -/// If `checkpoint_url` is provided, performs checkpoint sync by downloading -/// and verifying the finalized state AND signed block in parallel from a -/// remote peer. Otherwise, creates a genesis state from the local genesis -/// configuration. +/// If `checkpoint_urls` is empty, creates a genesis state from the local +/// genesis configuration. Otherwise performs checkpoint sync by downloading +/// and verifying the finalized state AND signed block from a peer. URLs are +/// tried in order: the first peer that succeeds wins, and failures fall over +/// to the next URL. Startup only aborts if every URL fails. 
/// /// Fetching the matching signed block lets the local store serve a valid /// anchor via the `BlocksByRoot` req-resp protocol; without it, peers @@ -577,7 +587,7 @@ fn read_hex_file_bytes(path: impl AsRef) -> Vec { /// /// # Arguments /// -/// * `checkpoint_url` - Optional base URL to a peer's API server +/// * `checkpoint_urls` - Zero or more base URLs of peer API servers /// * `genesis` - Genesis configuration (for genesis_time verification and genesis state creation) /// * `backend` - Storage backend for Store creation /// @@ -586,51 +596,30 @@ fn read_hex_file_bytes(path: impl AsRef) -> Vec { /// `Ok(Store)` on success, or `Err(CheckpointSyncError)` if checkpoint sync fails. /// Genesis path is infallible and always returns `Ok`. async fn fetch_initial_state( - checkpoint_url: Option<&str>, + checkpoint_urls: &[String], genesis: &GenesisConfig, backend: Arc, ) -> Result { let validators = genesis.validators(); - let Some(checkpoint_url) = checkpoint_url else { + if checkpoint_urls.is_empty() { info!("No checkpoint sync URL provided, initializing from genesis state"); let genesis_state = State::from_genesis(genesis.genesis_time, validators); return Ok(Store::from_anchor_state(backend, genesis_state)); }; - // Checkpoint sync path - info!(%checkpoint_url, "Starting checkpoint sync"); - - // The state and block are fetched in parallel; if the peer advances - // finalization between the two requests the pair won't match. Retry a - // small number of times so this transient race doesn't fail node startup. 
- const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3; - const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1); - - let mut attempt = 1; - let (state, signed_block) = loop { - match checkpoint_sync::fetch_finalized_anchor( - checkpoint_url, - genesis.genesis_time, - &validators, - ) - .await - { - Ok(pair) => break pair, - Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch) - if attempt < MAX_ANCHOR_FETCH_ATTEMPTS => - { - warn!( - attempt, - max = MAX_ANCHOR_FETCH_ATTEMPTS, - "Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying" - ); - tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await; - attempt += 1; - } - Err(err) => return Err(err), - } - }; + // Checkpoint sync path: try URLs in order, fail over to the next on error. + info!( + url_count = checkpoint_urls.len(), + "Starting checkpoint sync" + ); + + let (state, signed_block) = checkpoint_sync::fetch_anchor_block_and_state( + checkpoint_urls, + genesis.genesis_time, + &validators, + ) + .await?; info!( slot = state.slot,