diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 2f344bdeb7b..4b885f84b7c 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -22,6 +22,7 @@ use graph::{ use graph_chain_ethereum::EthereumAdapter; use graph_chain_ethereum::EthereumAdapterTrait as _; use graph_chain_ethereum::chain::BlockFinality; +use graph_store_postgres::AsyncPgConnection; use graph_store_postgres::BlockStore; use graph_store_postgres::ChainStore; use graph_store_postgres::PoolCoordinator; @@ -236,6 +237,43 @@ pub async fn update_chain_genesis( Ok(()) } +struct ChainSwapOutcome { + latest_backup_name: String, + previous_backup_final_name: Option, + reused_previous_backup: bool, + allocated_chain: bool, +} + +fn backup_name(chain: &str, base: &str) -> String { + format!("{chain}-{base}") +} + +fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String { + format!("{backup_name}-{suffix}") +} + +async fn next_backup_name( + conn: &mut AsyncPgConnection, + chain: &str, + base: &str, +) -> Result { + let backup_name = backup_name(chain, base); + if find_chain(conn, &backup_name).await?.is_none() { + return Ok(backup_name); + } + + let mut suffix = 1usize; + + loop { + let candidate = suffixed_backup_name(&backup_name, suffix); + if find_chain(conn, &candidate).await?.is_none() { + return Ok(candidate); + } + + suffix += 1; + } +} + pub async fn change_block_cache_shard( primary_store: ConnectionPool, store: BlockStore, @@ -250,6 +288,10 @@ pub async fn change_block_cache_shard( .await? .ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?; let old_shard = chain.shard; + let canonical_backup_name = format!("{chain_name}-old"); + + let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?; + let existing_backup_store = store.chain_store(&canonical_backup_name).await; println!("Current shard: {}", old_shard); @@ -257,44 +299,101 @@ pub async fn change_block_cache_shard( .chain_store(&chain_name) .await .ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?; - let new_name = format!("{}-old", &chain_name); let ident = chain_store.chain_identifier().await?; + let target_shard = Shard::new(shard.clone())?; + let reuse_existing_backup = existing_backup + .as_ref() + .map(|backup| backup.shard.as_str() == target_shard.as_str()) + .unwrap_or(false); + + let allocated_chain = if reuse_existing_backup { + None + } else { + let chain = + BlockStore::allocate_chain(&mut conn, &chain_name, &target_shard, &ident).await?; + store.add_chain_store(&chain, true).await?; + Some(chain) + }; - conn.transaction::<(), StoreError, _>(|conn| { + let outcome = conn.transaction::(|conn| { async { - let shard = Shard::new(shard.to_string())?; - - let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?; - - store.add_chain_store(&chain, true).await?; - - // Drop the foreign key constraint on deployment_schemas sql_query( "alter table deployment_schemas drop constraint deployment_schemas_network_fkey;", ) .execute(conn).await?; - // Update the current chain name to chain-old - update_chain_name(conn, &chain_name, &new_name).await?; + let previous_backup_final_name = if let Some(backup) = existing_backup.as_ref() { + let temp_name = next_backup_name(conn, &chain_name, "old").await?; + update_chain_name(conn, &backup.name, &temp_name).await?; + + if reuse_existing_backup { + update_chain_name(conn, &temp_name, &chain_name).await?; + Some(chain_name.clone()) + } else { + Some(temp_name) + } + } else { + None + }; - // Create a new chain with the name in the destination shard - let _ = add_chain(conn, &chain_name, &shard, ident).await?; + let latest_backup_name = next_backup_name(conn, &chain_name, "old").await?; + update_chain_name(conn, &chain_name, &latest_backup_name).await?; + + if !reuse_existing_backup { + add_chain(conn, &chain_name, &target_shard, ident.clone()).await?; + } - // Re-add the foreign key constraint sql_query( "alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);", ) .execute(conn).await?; - Ok(()) + + Ok(ChainSwapOutcome { + latest_backup_name, + previous_backup_final_name, + reused_previous_backup: reuse_existing_backup, + allocated_chain: allocated_chain.is_some(), + }) }.scope_boxed() }).await?; - chain_store.update_name(&new_name).await?; + let ChainSwapOutcome { + latest_backup_name, + previous_backup_final_name, + reused_previous_backup, + allocated_chain, + } = outcome; + + chain_store.update_name(&latest_backup_name).await?; + + if let (Some(backup_store), Some(final_name)) = ( + existing_backup_store.as_ref(), + previous_backup_final_name.as_ref(), + ) { + backup_store.update_name(final_name).await?; + } println!( "Changed block cache shard for {} from {} to {}", chain_name, old_shard, shard ); + println!("Latest backup recorded as `{}`", latest_backup_name); + + if reused_previous_backup { + println!( + "Reused existing backup `{}` as the active `{}` chain", + canonical_backup_name, chain_name + ); + } else if let Some(ref preserved) = previous_backup_final_name { + println!("Preserved earlier backup as `{}`", preserved); + } + + if allocated_chain { + println!( + "Allocated new chain state for `{}` on shard {}", + chain_name, shard + ); + } Ok(()) } @@ -329,3 +428,19 @@ pub async fn ingest( } Ok(()) } + +#[cfg(test)] +mod tests { + use super::{backup_name, suffixed_backup_name}; + + #[test] + fn backup_name_uses_plain_name_first() { + assert_eq!(backup_name("mainnet", "old"), "mainnet-old"); + } + + #[test] + fn suffixed_backup_name_adds_numeric_suffixes() { + assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1"); + assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42"); + } +}