From 6622da51487964c14ddfd47c8bfa930f82691cdd Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 20 May 2026 17:34:27 -0300 Subject: [PATCH 1/8] feat: add attestation aggregate coverage metrics Port leanSpec PR #735: register three IntGaugeVec metrics describing attestation aggregate coverage, with default zero-valued series so dashboards render from a fresh node startup. - lean_attestation_aggregate_coverage_validators (section, subnet) - lean_attestation_aggregate_coverage_subnets (section) - lean_attestation_aggregate_coverage_diff_validators (direction) ATTESTATION_AGGREGATE_COVERAGE_SECTIONS and ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS are exported as the single source of truth for label sets. init() forces the new statics and seeds combined-subnet, section, and direction series to 0 (18 default series total). Per-subnet (subnet="subnet_N") series appear lazily when instrumentation writes them. Registration only. The producer side (per-slot coverage computation, de-duplication across payloads, and the chain-status log line) ports zeam #876 and lands in a follow-up PR. The diff_validators help text diverges from upstream's terse phrasing to spell out the symmetric-difference semantics (block_only: in block but not in local timely pool; timely_only: the reverse). Metric name, labels, and values are unchanged. --- crates/blockchain/src/metrics.rs | 76 ++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index a9aef7a0..32850507 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -4,6 +4,27 @@ use std::time::Duration; use ethlambda_metrics::*; +// --- Label sets --- + +/// Section labels for attestation aggregate coverage gauges. Order matches +/// the names printed in slot/report logs. +/// +/// Slot is the X-axis (time series), not a label dimension. 
+pub const ATTESTATION_AGGREGATE_COVERAGE_SECTIONS: &[&str] = &[ + "timely", + "late", + "block", + "combined", + "agg_start_new", + "proposal_payloads", + "proposal_gossip", + "proposal_combined", +]; + +/// Validator-coverage delta directions between block payloads and +/// locally-aggregated pre-merge (`timely`) payloads. +pub const ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS: &[&str] = &["block_only", "timely_only"]; + // --- Gauges --- static LEAN_HEAD_SLOT: std::sync::LazyLock = std::sync::LazyLock::new(|| { @@ -104,6 +125,42 @@ static LEAN_TABLE_BYTES: std::sync::LazyLock = std::sync::LazyLock: .unwrap() }); +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_validators", + "Validator coverage in attestation aggregate reports, labeled by section and \ + subnet. subnet=combined is the section total; subnet=subnet_N is per-subnet \ + coverage. Updated each slot (slot is the X-axis).", + &["section", "subnet"] + ) + .unwrap() + }); + +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_subnets", + "Number of covered subnets in attestation aggregate reports, labeled by section. \ + Updated each slot (slot is the X-axis).", + &["section"] + ) + .unwrap() + }); + +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_diff_validators", + "Count of validators in the symmetric difference between block-included aggregates \ + and locally-aggregated pre-merge (timely) aggregates for the same slot. \ + direction=block_only: in block but not in local pool. direction=timely_only: in \ + local pool but not in block. 
Updated each slot (slot is the X-axis).", + &["direction"] + ) + .unwrap() + }); + // --- Counters --- static LEAN_ATTESTATIONS_VALID_TOTAL: std::sync::LazyLock = @@ -409,6 +466,25 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_IS_AGGREGATOR); std::sync::LazyLock::force(&LEAN_ATTESTATION_COMMITTEE_COUNT); std::sync::LazyLock::force(&LEAN_TABLE_BYTES); + // Attestation aggregate coverage (leanMetrics: Fork-Choice Metrics). + // Per upstream leanSpec, seed only the combined-subnet series for each + // section; per-subnet series appear lazily when instrumentation writes them. + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS); + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS); + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS); + for &section in ATTESTATION_AGGREGATE_COVERAGE_SECTIONS { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS + .with_label_values(&[section, "combined"]) + .set(0); + LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS + .with_label_values(&[section]) + .set(0); + } + for &direction in ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS + .with_label_values(&[direction]) + .set(0); + } // Counters std::sync::LazyLock::force(&LEAN_ATTESTATIONS_VALID_TOTAL); std::sync::LazyLock::force(&LEAN_ATTESTATIONS_INVALID_TOTAL); From 4c2e65fdd17936b460af85efe584a5d8be70b67e Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Thu, 21 May 2026 17:14:45 -0300 Subject: [PATCH 2/8] feat: emit attestation aggregate coverage metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port the producer side of zeam #876 on top of the metrics registered in the previous commit. After this commit, all 18 coverage series receive real per-slot updates from chain activity. 
Five emission sites: - accept_new_attestations (store.rs): captures `new_payloads` participant bits BEFORE promote and stashes them as a CoverageSnapshot on the Store. Read at the next slot boundary to populate the `timely` section ("prev_new" in zeam). - on_block_core (store.rs): mirrors the imported block's per-AttData aggregation bits into Store::last_block_coverage. Observability-only; fork choice is unchanged. - on_tick interval 0 (lib.rs): emits the post-block-merge report for `slot - 1`. Computes `timely`/`late`/`block`/`combined` from the stashed snapshots and the current `new_payloads`, then emits the diff_validators direction counts as the symmetric difference between `block` and `timely`. - start_aggregation_session (lib.rs): emits `agg_start_new` from the current `new_payloads` right before fork-choice aggregation runs at interval 2. - propose_block (lib.rs): emits `proposal_payloads`, `proposal_gossip`, and `proposal_combined` after the block is built. Each validator set in the block is classified by whether the AttestationData has a matching known-payload proof. New module crates/blockchain/src/coverage.rs holds the Coverage type (seen + has_subnet bitsets, derived subnet via vid % committee_count to match the gossip subnet assignment) plus the 3 emission helpers and 6 unit tests covering add_bits, merge_from, diff_counts, empty/zero/out-of-range edge cases. Storage gets a CoverageSnapshot type and two Arc<Mutex<Option<CoverageSnapshot>>> fields on Store. No proofs are duplicated — only AggregationBits are captured, keeping the per-slot allocation in the tens of bytes per entry. The pre-merge capture happens inside accept_new_attestations just before promote_new_aggregated_payloads, so consumer-side timing concerns stay in the existing tick path. BlockChain::spawn now takes attestation_committee_count as a parameter; bin/ethlambda/src/main.rs already resolves the value (CLI > validator-config.yaml > 1) and passes it through. 
The number of attestation committees was previously only known to P2P (for subnet subscriptions); the coverage emitters need it to derive subnet ids. --- bin/ethlambda/src/main.rs | 7 +- crates/blockchain/src/coverage.rs | 321 ++++++++++++++++++++++++++++++ crates/blockchain/src/lib.rs | 24 +++ crates/blockchain/src/metrics.rs | 21 ++ crates/blockchain/src/store.rs | 34 +++- crates/storage/src/lib.rs | 2 +- crates/storage/src/store.rs | 64 +++++- 7 files changed, 469 insertions(+), 4 deletions(-) create mode 100644 crates/blockchain/src/coverage.rs diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index f6d3ca3e..0852dd49 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -221,7 +221,12 @@ async fn main() -> eyre::Result<()> { // and the API server (which exposes GET/POST admin endpoints). let aggregator = AggregatorController::new(options.is_aggregator); - let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()); + let blockchain = BlockChain::spawn( + store.clone(), + validator_keys, + aggregator.clone(), + attestation_committee_count, + ); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the // AggregatorController — subnet subscriptions are decided once here and diff --git a/crates/blockchain/src/coverage.rs b/crates/blockchain/src/coverage.rs new file mode 100644 index 00000000..c827e1ee --- /dev/null +++ b/crates/blockchain/src/coverage.rs @@ -0,0 +1,321 @@ +//! Per-slot attestation aggregate coverage computation. +//! +//! Mirrors the producer side of [zeam#876](https://github.com/blockblaz/zeam/pull/876) +//! on top of the metrics registered by leanSpec PR #735. +//! +//! All `Coverage` instances are bound to a fixed `(validator_count, +//! committee_count)` pair from genesis state; ethlambda's validator set +//! is immutable across slots, so no resize handling is required. 
+ +use ethlambda_storage::Store; +use ethlambda_types::{ + attestation::{AggregationBits, validator_indices}, + block::AggregatedSignatureProof, + primitives::HashTreeRoot, +}; + +use crate::metrics; + +/// Per-validator and per-subnet presence bitsets for one coverage section. +#[derive(Debug, Clone)] +pub struct Coverage { + seen: Vec, + has_subnet: Vec, +} + +impl Coverage { + pub fn new(validator_count: usize, committee_count: usize) -> Self { + Self { + seen: vec![false; validator_count], + has_subnet: vec![false; committee_count], + } + } + + /// Subnet for validator `vid` matches `crates/net/p2p/src/lib.rs:241` + /// (`vid % committee_count`). + pub fn add_bits(&mut self, bits: &AggregationBits) { + let committee_count = self.has_subnet.len(); + if committee_count == 0 { + return; + } + for vid in validator_indices(bits) { + let vid = vid as usize; + if vid < self.seen.len() { + self.seen[vid] = true; + self.has_subnet[vid % committee_count] = true; + } + } + } + + /// Convenience: merge all `proofs` for one entry. + pub fn add_proofs(&mut self, proofs: &[AggregatedSignatureProof]) { + for proof in proofs { + self.add_bits(&proof.participants); + } + } + + pub fn merge_from(&mut self, other: &Self) { + for (dst, &src) in self.seen.iter_mut().zip(other.seen.iter()) { + *dst |= src; + } + for (dst, &src) in self.has_subnet.iter_mut().zip(other.has_subnet.iter()) { + *dst |= src; + } + } + + pub fn count_seen(&self) -> usize { + self.seen.iter().filter(|&&b| b).count() + } + + pub fn count_subnets(&self) -> usize { + self.has_subnet.iter().filter(|&&b| b).count() + } + + pub fn seen(&self) -> &[bool] { + &self.seen + } + + /// Mark validator `vid` (and its derived subnet) as covered. + pub fn mark(&mut self, vid: usize, subnet: usize) { + if vid < self.seen.len() { + self.seen[vid] = true; + } + if subnet < self.has_subnet.len() { + self.has_subnet[subnet] = true; + } + } +} + +/// Symmetric-difference counts: `(a_only, b_only)` validators. 
+pub fn diff_counts(a: &Coverage, b: &Coverage) -> (usize, usize) { + let len = a.seen.len().min(b.seen.len()); + let mut a_only = 0; + let mut b_only = 0; + for i in 0..len { + match (a.seen[i], b.seen[i]) { + (true, false) => a_only += 1, + (false, true) => b_only += 1, + _ => {} + } + } + (a_only, b_only) +} + +/// Emit `validators{section, subnet="combined"}` + `subnets{section}` for one section. +/// +/// Per-subnet (`subnet="subnet_N"`) series intentionally stay at zero until a +/// future PR wires per-subnet emission; this matches zeam's current emission +/// pattern (one series per section). +pub fn record_section(section: &str, coverage: &Coverage) { + metrics::set_attestation_aggregate_coverage_validators( + section, + "combined", + coverage.count_seen() as i64, + ); + metrics::set_attestation_aggregate_coverage_subnets(section, coverage.count_subnets() as i64); +} + +/// Emit `diff_validators{direction}` for both directions. +pub fn record_diff(block_only: usize, timely_only: usize) { + metrics::set_attestation_aggregate_coverage_diff_validators("block_only", block_only as i64); + metrics::set_attestation_aggregate_coverage_diff_validators("timely_only", timely_only as i64); +} + +/// Emit the post-block-merge coverage report for `reporting_slot` (the slot +/// that just finished). Reads pre-merge / late / block snapshots from the +/// Store, computes `combined` as their union, and records all 5 metrics. +pub fn emit_post_block_report(store: &Store, committee_count: u64, reporting_slot: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + + // `timely` — pre-merge snapshot of new_payloads (i.e., "prev_new" in zeam). 
+ let mut timely = Coverage::new(validator_count, cc); + if let Some(snap) = store.pre_merge_new_coverage() + && snap.slot == reporting_slot + { + for bits in &snap.participant_bits { + timely.add_bits(bits); + } + } + + // `late` — current new_payloads that match the reporting slot + // (arrived after the last merge). + let mut late = Coverage::new(validator_count, cc); + for (data, proofs) in store.new_aggregated_payloads().values() { + if data.slot == reporting_slot { + late.add_proofs(proofs); + } + } + + // `block` — participant bits from the most-recently-imported block, + // if and only if its slot matches. + let mut block = Coverage::new(validator_count, cc); + if let Some(snap) = store.last_block_coverage() + && snap.slot == reporting_slot + { + for bits in &snap.participant_bits { + block.add_bits(bits); + } + } + + // `combined` — union of all three sources. + let mut combined = Coverage::new(validator_count, cc); + combined.merge_from(&timely); + combined.merge_from(&late); + combined.merge_from(&block); + + record_section("timely", &timely); + record_section("late", &late); + record_section("block", &block); + record_section("combined", &combined); + + let (block_only, timely_only) = diff_counts(&block, &timely); + record_diff(block_only, timely_only); +} + +/// Emit `agg_start_new` coverage. Called right before fork-choice aggregation +/// runs (interval 2). +pub fn emit_agg_start_new(store: &Store, committee_count: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let mut cov = Coverage::new(validator_count, committee_count as usize); + for (_, proofs) in store.new_aggregated_payloads().values() { + cov.add_proofs(proofs); + } + record_section("agg_start_new", &cov); +} + +/// Emit `proposal_payloads`, `proposal_gossip`, `proposal_combined` for a +/// block we are about to publish. 
We classify validators set in the final +/// block as either covered by an existing known-payload proof for that +/// AttestationData (`payloads`) or as gossip-only (`gossip`). +pub fn emit_proposal_coverage<'a, I>(store: &Store, committee_count: u64, selected: I) +where + I: IntoIterator, +{ + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + + let mut combined = Coverage::new(validator_count, cc); + let mut payloads = Coverage::new(validator_count, cc); + let mut gossip = Coverage::new(validator_count, cc); + + // For each AttestationData in the final block, OR together the known + // payload proofs for that data — those validators are payload-covered. + let known = store.known_aggregated_payloads(); + let mut payload_seen = vec![false; validator_count]; + for att in selected { + combined.add_bits(&att.aggregation_bits); + let data_root = att.data.hash_tree_root(); + if let Some((_, proofs)) = known.get(&data_root) { + for proof in proofs { + for vid in validator_indices(&proof.participants) { + let vid = vid as usize; + if vid < payload_seen.len() { + payload_seen[vid] = true; + } + } + } + } + } + + for (vid, &is_final) in combined.seen().iter().enumerate() { + if !is_final { + continue; + } + let subnet = vid % cc; + if payload_seen[vid] { + payloads.mark(vid, subnet); + } else { + gossip.mark(vid, subnet); + } + } + + record_section("proposal_payloads", &payloads); + record_section("proposal_gossip", &gossip); + record_section("proposal_combined", &combined); +} + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_types::attestation::AggregationBits; + + fn make_bits(len: usize, indices: &[usize]) -> AggregationBits { + let mut bits = AggregationBits::with_length(len).unwrap(); + for &i in indices { + bits.set(i, true).unwrap(); + } + bits + } + + #[test] + fn add_bits_marks_validators_and_subnets() { + // 8 validators, 4 subnets → vid 1 → 
subnet 1, vid 5 → subnet 1, vid 6 → subnet 2. + let mut cov = Coverage::new(8, 4); + cov.add_bits(&make_bits(8, &[1, 5, 6])); + + assert!(!cov.seen()[0]); + assert!(cov.seen()[1]); + assert!(cov.seen()[5]); + assert!(cov.seen()[6]); + assert_eq!(cov.count_seen(), 3); + assert_eq!(cov.count_subnets(), 2); + } + + #[test] + fn merge_from_is_set_union() { + let mut a = Coverage::new(8, 4); + a.add_bits(&make_bits(8, &[0, 1])); + let mut b = Coverage::new(8, 4); + b.add_bits(&make_bits(8, &[1, 2])); + + a.merge_from(&b); + assert_eq!(a.count_seen(), 3); + assert!(a.seen()[0] && a.seen()[1] && a.seen()[2]); + } + + #[test] + fn diff_counts_is_symmetric_difference() { + let mut block = Coverage::new(8, 4); + block.add_bits(&make_bits(8, &[0, 1, 2])); + let mut timely = Coverage::new(8, 4); + timely.add_bits(&make_bits(8, &[1, 2, 3])); + + let (block_only, timely_only) = diff_counts(&block, &timely); + assert_eq!(block_only, 1); + assert_eq!(timely_only, 1); + } + + #[test] + fn empty_coverage_counts_zero() { + let cov = Coverage::new(8, 4); + assert_eq!(cov.count_seen(), 0); + assert_eq!(cov.count_subnets(), 0); + } + + #[test] + fn zero_committee_count_is_inert() { + let mut cov = Coverage::new(8, 0); + cov.add_bits(&make_bits(8, &[0, 1, 2])); + assert_eq!(cov.count_seen(), 0); + assert_eq!(cov.count_subnets(), 0); + } + + #[test] + fn add_bits_ignores_out_of_range_indices() { + let mut cov = Coverage::new(4, 2); + cov.add_bits(&make_bits(8, &[0, 5])); + assert!(cov.seen()[0]); + assert_eq!(cov.count_seen(), 1); + assert_eq!(cov.count_subnets(), 1); + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index f07a7a7e..7f9a354a 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -27,6 +27,7 @@ use tracing::{error, info, trace, warn}; use crate::store::StoreError; pub mod aggregation; +pub mod coverage; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -60,6 +61,7 @@ impl BlockChain { store: 
Store, validator_keys: HashMap, aggregator: AggregatorController, + attestation_committee_count: u64, ) -> BlockChain { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); @@ -86,6 +88,7 @@ impl BlockChain { pending_block_parents: HashMap::new(), current_aggregation: None, last_tick_instant: None, + attestation_committee_count, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -139,6 +142,10 @@ pub struct BlockChainServer { /// Last tick instant for measuring interval duration. last_tick_instant: Option, + + /// Number of attestation committees (= subnet count). Used by the + /// attestation aggregate coverage emission. + attestation_committee_count: u64, } impl BlockChainServer { @@ -187,7 +194,18 @@ impl BlockChainServer { proposer_validator_id.is_some(), ); + // Emit the post-block attestation aggregate coverage report for the + // previous slot at the start of each new slot. 
+ if interval == 0 && slot > 0 { + coverage::emit_post_block_report( + &self.store, + self.attestation_committee_count, + slot - 1, + ); + } + if interval == 2 && is_aggregator { + coverage::emit_agg_start_new(&self.store, self.attestation_committee_count); self.start_aggregation_session(slot, ctx).await; } @@ -341,6 +359,12 @@ impl BlockChainServer { return; }; + coverage::emit_proposal_coverage( + &self.store, + self.attestation_committee_count, + block.body.attestations.iter(), + ); + // Sign the block root with the proposal key let block_root = block.hash_tree_root(); let Ok(proposer_signature) = self diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 32850507..fac3adfc 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -685,6 +685,27 @@ pub fn set_attestation_committee_count(count: u64) { LEAN_ATTESTATION_COMMITTEE_COUNT.set(count.try_into().unwrap_or_default()); } +/// Set `lean_attestation_aggregate_coverage_validators{section, subnet}`. +pub fn set_attestation_aggregate_coverage_validators(section: &str, subnet: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS + .with_label_values(&[section, subnet]) + .set(value); +} + +/// Set `lean_attestation_aggregate_coverage_subnets{section}`. +pub fn set_attestation_aggregate_coverage_subnets(section: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS + .with_label_values(&[section]) + .set(value); +} + +/// Set `lean_attestation_aggregate_coverage_diff_validators{direction}`. +pub fn set_attestation_aggregate_coverage_diff_validators(direction: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS + .with_label_values(&[direction]) + .set(value); +} + /// Observe the depth of a fork choice reorg. 
pub fn observe_fork_choice_reorg_depth(depth: u64) { LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 935d3f75..d5626b75 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -5,7 +5,7 @@ use ethlambda_state_transition::{ attestation_data_matches_chain, is_proposer, justified_slots_ops, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ForkCheckpoints, Store}; +use ethlambda_storage::{CoverageSnapshot, ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, attestation::{ @@ -40,12 +40,35 @@ pub struct PostBlockCheckpoints { /// Accept new aggregated payloads, promoting them to known for fork choice. fn accept_new_attestations(store: &mut Store, log_tree: bool) { + snapshot_pre_merge_new_coverage(store); store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); update_head(store, log_tree); } +/// Capture the participant bits of every entry in `new_payloads` for the +/// attestation aggregate coverage report. Stored on the Store so the +/// post-block report at the next slot boundary can read it. +fn snapshot_pre_merge_new_coverage(store: &Store) { + let new_payloads = store.new_aggregated_payloads(); + if new_payloads.is_empty() { + return; + } + let mut slot: u64 = 0; + let mut participant_bits: Vec = Vec::new(); + for (data, proofs) in new_payloads.values() { + slot = data.slot; + for proof in proofs { + participant_bits.push(proof.participants.clone()); + } + } + store.save_pre_merge_new_coverage(CoverageSnapshot { + slot, + participant_bits, + }); +} + /// Update the head based on the fork choice rule. 
/// /// When `log_tree` is true, also computes block weights and logs an ASCII @@ -514,11 +537,13 @@ fn on_block_core( // Store one proof per attestation data in known aggregated payloads. let mut known_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); + let mut block_participant_bits: Vec = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { known_entries.push((HashedAttestationData::new(att.data.clone()), proof.clone())); + block_participant_bits.push(att.aggregation_bits.clone()); // Count each participating validator as a valid attestation let count = validator_indices(&att.aggregation_bits).count() as u64; metrics::inc_attestations_valid(count); @@ -526,6 +551,13 @@ fn on_block_core( store.insert_known_aggregated_payloads_batch(known_entries); + // Capture block-included participant bits for the attestation aggregate + // coverage report (observability-only; does not affect fork choice). + store.save_last_block_coverage(CoverageSnapshot { + slot, + participant_bits: block_participant_bits, + }); + // Update forkchoice head based on new block and attestations update_head(store, false); diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 9662a36c..ca067f9b 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,4 +3,4 @@ pub mod backend; mod store; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{ForkCheckpoints, GetForkchoiceStoreError, Store}; +pub use store::{CoverageSnapshot, ForkCheckpoints, GetForkchoiceStoreError, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 04b42745..8927b885 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -4,7 +4,10 @@ use std::sync::{Arc, LazyLock, Mutex}; use crate::api::{StorageBackend, StorageWriteBatch, Table}; use ethlambda_types::{ - attestation::{AttestationData, HashedAttestationData, 
bits_is_subset, blank_xmss_signature}, + attestation::{ + AggregationBits, AttestationData, HashedAttestationData, bits_is_subset, + blank_xmss_signature, + }, block::{ AggregatedSignatureProof, AttestationSignatures, Block, BlockBody, BlockHeader, BlockSignatures, SignedBlock, @@ -485,6 +488,17 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { (slot, root) } +/// Snapshot of `AggregationBits` for one slot, used by the attestation +/// aggregate coverage report. +/// +/// Holds raw participant bits; the consumer (blockchain crate) constructs +/// `Coverage` at emit time using the current validator and committee counts. +#[derive(Debug, Clone)] +pub struct CoverageSnapshot { + pub slot: u64, + pub participant_bits: Vec, +} + /// Fork choice store backed by a pluggable storage backend. /// /// The Store maintains all state required for fork choice and block processing: @@ -507,6 +521,12 @@ pub struct Store { known_payloads: Arc>, /// In-memory gossip signatures, consumed at interval 2 aggregation. gossip_signatures: Arc>, + /// Snapshot of `new_payloads` participant bits captured right before each + /// promote-to-known. Observability-only. + pre_merge_new_coverage: Arc>>, + /// Snapshot of the most-recently-imported block's aggregated attestation + /// participant bits. Reset on each imported block. Observability-only. + last_block_coverage: Arc>>, } impl Store { @@ -641,6 +661,8 @@ impl Store { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + pre_merge_new_coverage: Arc::new(Mutex::new(None)), + last_block_coverage: Arc::new(Mutex::new(None)), } } @@ -1251,6 +1273,42 @@ impl Store { self.known_payloads.lock().unwrap().len() } + /// Returns a snapshot of new (pending) payloads as (AttestationData, Vec) pairs. + /// + /// Mirrors [`known_aggregated_payloads`]. Used by the attestation aggregate + /// coverage report to compute coverage from `new_payloads` before promote. 
+ pub fn new_aggregated_payloads( + &self, + ) -> HashMap)> { + let buf = self.new_payloads.lock().unwrap(); + buf.data + .iter() + .map(|(root, entry)| (*root, (entry.data.clone(), entry.proofs.clone()))) + .collect() + } + + // ============ Coverage Snapshots ============ + // + // Observability-only state captured by `accept_new_attestations` and + // `on_block_core` in the blockchain crate. Read once per slot by the + // attestation aggregate coverage report. + + pub fn save_pre_merge_new_coverage(&self, snapshot: CoverageSnapshot) { + *self.pre_merge_new_coverage.lock().unwrap() = Some(snapshot); + } + + pub fn pre_merge_new_coverage(&self) -> Option { + self.pre_merge_new_coverage.lock().unwrap().clone() + } + + pub fn save_last_block_coverage(&self, snapshot: CoverageSnapshot) { + *self.last_block_coverage.lock().unwrap() = Some(snapshot); + } + + pub fn last_block_coverage(&self) -> Option { + self.last_block_coverage.lock().unwrap().clone() + } + /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap(); @@ -1424,6 +1482,8 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + pre_merge_new_coverage: Arc::new(Mutex::new(None)), + last_block_coverage: Arc::new(Mutex::new(None)), } } @@ -1437,6 +1497,8 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + pre_merge_new_coverage: Arc::new(Mutex::new(None)), + last_block_coverage: Arc::new(Mutex::new(None)), } } } From 453d3121764d24b8f0fd2fe5cdf2a82a51fd8fe4 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 26 May 2026 14:56:08 -0300 Subject: [PATCH 3/8] refactor(blockchain): inline coverage emitters into lib.rs Delete crates/blockchain/src/coverage.rs and move the three coverage emitters (post_block, agg_start_new, proposal) into lib.rs as private free functions. 
The Coverage struct is replaced with Vec<bool> locals (seen for validators, has_subnet for subnets), plus three small helpers (cov_add, cov_record, or_into). Each emit_* function was called from exactly one site, so the previous abstraction added no DRY benefit. The 6 unit tests in coverage.rs were exercising the wrapper's bitset ops, not the emission contract, so they go away with the wrapper. Behavior, metric names, labels, and values are unchanged. Net effect on the PR: roughly -200 lines. --- crates/blockchain/src/coverage.rs | 321 ------------------------------ crates/blockchain/src/lib.rs | 192 +++++++++++++++++- 2 files changed, 183 insertions(+), 330 deletions(-) delete mode 100644 crates/blockchain/src/coverage.rs diff --git a/crates/blockchain/src/coverage.rs b/crates/blockchain/src/coverage.rs deleted file mode 100644 index c827e1ee..00000000 --- a/crates/blockchain/src/coverage.rs +++ /dev/null @@ -1,321 +0,0 @@ -//! Per-slot attestation aggregate coverage computation. -//! -//! Mirrors the producer side of [zeam#876](https://github.com/blockblaz/zeam/pull/876) -//! on top of the metrics registered by leanSpec PR #735. -//! -//! All `Coverage` instances are bound to a fixed `(validator_count, -//! committee_count)` pair from genesis state; ethlambda's validator set -//! is immutable across slots, so no resize handling is required. - -use ethlambda_storage::Store; -use ethlambda_types::{ - attestation::{AggregationBits, validator_indices}, - block::AggregatedSignatureProof, - primitives::HashTreeRoot, -}; - -use crate::metrics; - -/// Per-validator and per-subnet presence bitsets for one coverage section. 
-#[derive(Debug, Clone)] -pub struct Coverage { - seen: Vec, - has_subnet: Vec, -} - -impl Coverage { - pub fn new(validator_count: usize, committee_count: usize) -> Self { - Self { - seen: vec![false; validator_count], - has_subnet: vec![false; committee_count], - } - } - - /// Subnet for validator `vid` matches `crates/net/p2p/src/lib.rs:241` - /// (`vid % committee_count`). - pub fn add_bits(&mut self, bits: &AggregationBits) { - let committee_count = self.has_subnet.len(); - if committee_count == 0 { - return; - } - for vid in validator_indices(bits) { - let vid = vid as usize; - if vid < self.seen.len() { - self.seen[vid] = true; - self.has_subnet[vid % committee_count] = true; - } - } - } - - /// Convenience: merge all `proofs` for one entry. - pub fn add_proofs(&mut self, proofs: &[AggregatedSignatureProof]) { - for proof in proofs { - self.add_bits(&proof.participants); - } - } - - pub fn merge_from(&mut self, other: &Self) { - for (dst, &src) in self.seen.iter_mut().zip(other.seen.iter()) { - *dst |= src; - } - for (dst, &src) in self.has_subnet.iter_mut().zip(other.has_subnet.iter()) { - *dst |= src; - } - } - - pub fn count_seen(&self) -> usize { - self.seen.iter().filter(|&&b| b).count() - } - - pub fn count_subnets(&self) -> usize { - self.has_subnet.iter().filter(|&&b| b).count() - } - - pub fn seen(&self) -> &[bool] { - &self.seen - } - - /// Mark validator `vid` (and its derived subnet) as covered. - pub fn mark(&mut self, vid: usize, subnet: usize) { - if vid < self.seen.len() { - self.seen[vid] = true; - } - if subnet < self.has_subnet.len() { - self.has_subnet[subnet] = true; - } - } -} - -/// Symmetric-difference counts: `(a_only, b_only)` validators. 
-pub fn diff_counts(a: &Coverage, b: &Coverage) -> (usize, usize) { - let len = a.seen.len().min(b.seen.len()); - let mut a_only = 0; - let mut b_only = 0; - for i in 0..len { - match (a.seen[i], b.seen[i]) { - (true, false) => a_only += 1, - (false, true) => b_only += 1, - _ => {} - } - } - (a_only, b_only) -} - -/// Emit `validators{section, subnet="combined"}` + `subnets{section}` for one section. -/// -/// Per-subnet (`subnet="subnet_N"`) series intentionally stay at zero until a -/// future PR wires per-subnet emission; this matches zeam's current emission -/// pattern (one series per section). -pub fn record_section(section: &str, coverage: &Coverage) { - metrics::set_attestation_aggregate_coverage_validators( - section, - "combined", - coverage.count_seen() as i64, - ); - metrics::set_attestation_aggregate_coverage_subnets(section, coverage.count_subnets() as i64); -} - -/// Emit `diff_validators{direction}` for both directions. -pub fn record_diff(block_only: usize, timely_only: usize) { - metrics::set_attestation_aggregate_coverage_diff_validators("block_only", block_only as i64); - metrics::set_attestation_aggregate_coverage_diff_validators("timely_only", timely_only as i64); -} - -/// Emit the post-block-merge coverage report for `reporting_slot` (the slot -/// that just finished). Reads pre-merge / late / block snapshots from the -/// Store, computes `combined` as their union, and records all 5 metrics. -pub fn emit_post_block_report(store: &Store, committee_count: u64, reporting_slot: u64) { - let validator_count = store.head_state().validators.len(); - if validator_count == 0 || committee_count == 0 { - return; - } - let cc = committee_count as usize; - - // `timely` — pre-merge snapshot of new_payloads (i.e., "prev_new" in zeam). 
- let mut timely = Coverage::new(validator_count, cc); - if let Some(snap) = store.pre_merge_new_coverage() - && snap.slot == reporting_slot - { - for bits in &snap.participant_bits { - timely.add_bits(bits); - } - } - - // `late` — current new_payloads that match the reporting slot - // (arrived after the last merge). - let mut late = Coverage::new(validator_count, cc); - for (data, proofs) in store.new_aggregated_payloads().values() { - if data.slot == reporting_slot { - late.add_proofs(proofs); - } - } - - // `block` — participant bits from the most-recently-imported block, - // if and only if its slot matches. - let mut block = Coverage::new(validator_count, cc); - if let Some(snap) = store.last_block_coverage() - && snap.slot == reporting_slot - { - for bits in &snap.participant_bits { - block.add_bits(bits); - } - } - - // `combined` — union of all three sources. - let mut combined = Coverage::new(validator_count, cc); - combined.merge_from(&timely); - combined.merge_from(&late); - combined.merge_from(&block); - - record_section("timely", &timely); - record_section("late", &late); - record_section("block", &block); - record_section("combined", &combined); - - let (block_only, timely_only) = diff_counts(&block, &timely); - record_diff(block_only, timely_only); -} - -/// Emit `agg_start_new` coverage. Called right before fork-choice aggregation -/// runs (interval 2). -pub fn emit_agg_start_new(store: &Store, committee_count: u64) { - let validator_count = store.head_state().validators.len(); - if validator_count == 0 || committee_count == 0 { - return; - } - let mut cov = Coverage::new(validator_count, committee_count as usize); - for (_, proofs) in store.new_aggregated_payloads().values() { - cov.add_proofs(proofs); - } - record_section("agg_start_new", &cov); -} - -/// Emit `proposal_payloads`, `proposal_gossip`, `proposal_combined` for a -/// block we are about to publish. 
We classify validators set in the final -/// block as either covered by an existing known-payload proof for that -/// AttestationData (`payloads`) or as gossip-only (`gossip`). -pub fn emit_proposal_coverage<'a, I>(store: &Store, committee_count: u64, selected: I) -where - I: IntoIterator, -{ - let validator_count = store.head_state().validators.len(); - if validator_count == 0 || committee_count == 0 { - return; - } - let cc = committee_count as usize; - - let mut combined = Coverage::new(validator_count, cc); - let mut payloads = Coverage::new(validator_count, cc); - let mut gossip = Coverage::new(validator_count, cc); - - // For each AttestationData in the final block, OR together the known - // payload proofs for that data — those validators are payload-covered. - let known = store.known_aggregated_payloads(); - let mut payload_seen = vec![false; validator_count]; - for att in selected { - combined.add_bits(&att.aggregation_bits); - let data_root = att.data.hash_tree_root(); - if let Some((_, proofs)) = known.get(&data_root) { - for proof in proofs { - for vid in validator_indices(&proof.participants) { - let vid = vid as usize; - if vid < payload_seen.len() { - payload_seen[vid] = true; - } - } - } - } - } - - for (vid, &is_final) in combined.seen().iter().enumerate() { - if !is_final { - continue; - } - let subnet = vid % cc; - if payload_seen[vid] { - payloads.mark(vid, subnet); - } else { - gossip.mark(vid, subnet); - } - } - - record_section("proposal_payloads", &payloads); - record_section("proposal_gossip", &gossip); - record_section("proposal_combined", &combined); -} - -#[cfg(test)] -mod tests { - use super::*; - use ethlambda_types::attestation::AggregationBits; - - fn make_bits(len: usize, indices: &[usize]) -> AggregationBits { - let mut bits = AggregationBits::with_length(len).unwrap(); - for &i in indices { - bits.set(i, true).unwrap(); - } - bits - } - - #[test] - fn add_bits_marks_validators_and_subnets() { - // 8 validators, 4 subnets → vid 1 → 
subnet 1, vid 5 → subnet 1, vid 6 → subnet 2. - let mut cov = Coverage::new(8, 4); - cov.add_bits(&make_bits(8, &[1, 5, 6])); - - assert!(!cov.seen()[0]); - assert!(cov.seen()[1]); - assert!(cov.seen()[5]); - assert!(cov.seen()[6]); - assert_eq!(cov.count_seen(), 3); - assert_eq!(cov.count_subnets(), 2); - } - - #[test] - fn merge_from_is_set_union() { - let mut a = Coverage::new(8, 4); - a.add_bits(&make_bits(8, &[0, 1])); - let mut b = Coverage::new(8, 4); - b.add_bits(&make_bits(8, &[1, 2])); - - a.merge_from(&b); - assert_eq!(a.count_seen(), 3); - assert!(a.seen()[0] && a.seen()[1] && a.seen()[2]); - } - - #[test] - fn diff_counts_is_symmetric_difference() { - let mut block = Coverage::new(8, 4); - block.add_bits(&make_bits(8, &[0, 1, 2])); - let mut timely = Coverage::new(8, 4); - timely.add_bits(&make_bits(8, &[1, 2, 3])); - - let (block_only, timely_only) = diff_counts(&block, &timely); - assert_eq!(block_only, 1); - assert_eq!(timely_only, 1); - } - - #[test] - fn empty_coverage_counts_zero() { - let cov = Coverage::new(8, 4); - assert_eq!(cov.count_seen(), 0); - assert_eq!(cov.count_subnets(), 0); - } - - #[test] - fn zero_committee_count_is_inert() { - let mut cov = Coverage::new(8, 0); - cov.add_bits(&make_bits(8, &[0, 1, 2])); - assert_eq!(cov.count_seen(), 0); - assert_eq!(cov.count_subnets(), 0); - } - - #[test] - fn add_bits_ignores_out_of_range_indices() { - let mut cov = Coverage::new(4, 2); - cov.add_bits(&make_bits(8, &[0, 5])); - assert!(cov.seen()[0]); - assert_eq!(cov.count_seen(), 1); - assert_eq!(cov.count_subnets(), 1); - } -} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 7f9a354a..a8037e7f 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -7,7 +7,10 @@ use ethlambda_storage::{ALL_TABLES, Store}; use ethlambda_types::{ ShortRoot, aggregator::AggregatorController, - attestation::{SignedAggregatedAttestation, SignedAttestation}, + attestation::{ + AggregatedAttestation, 
AggregationBits, SignedAggregatedAttestation, SignedAttestation, + validator_indices, + }, block::{BlockSignatures, SignedBlock}, primitives::{H256, HashTreeRoot as _}, }; @@ -27,7 +30,6 @@ use tracing::{error, info, trace, warn}; use crate::store::StoreError; pub mod aggregation; -pub mod coverage; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -197,15 +199,11 @@ impl BlockChainServer { // Emit the post-block attestation aggregate coverage report for the // previous slot at the start of each new slot. if interval == 0 && slot > 0 { - coverage::emit_post_block_report( - &self.store, - self.attestation_committee_count, - slot - 1, - ); + emit_post_block_coverage(&self.store, self.attestation_committee_count, slot - 1); } if interval == 2 && is_aggregator { - coverage::emit_agg_start_new(&self.store, self.attestation_committee_count); + emit_agg_start_new_coverage(&self.store, self.attestation_committee_count); self.start_aggregation_session(slot, ctx).await; } @@ -359,7 +357,7 @@ impl BlockChainServer { return; }; - coverage::emit_proposal_coverage( + emit_proposal_coverage( &self.store, self.attestation_committee_count, block.body.attestations.iter(), @@ -789,3 +787,179 @@ impl Handler for BlockChainServer { } } } + +// --- Attestation aggregate coverage emission --- +// +// `seen` tracks covered validators; `has_subnet` tracks covered subnets +// (subnet = `vid % committee_count`, matching the gossip subnet assignment +// in `crates/net/p2p/src/lib.rs`). Pure observability — no fork-choice or +// state-transition effect. 
+ +fn cov_add(seen: &mut [bool], has_subnet: &mut [bool], bits: &AggregationBits) { + let cc = has_subnet.len(); + if cc == 0 { + return; + } + for vid in validator_indices(bits) { + let vid = vid as usize; + if vid < seen.len() { + seen[vid] = true; + has_subnet[vid % cc] = true; + } + } +} + +fn cov_record(section: &str, seen: &[bool], has_subnet: &[bool]) { + metrics::set_attestation_aggregate_coverage_validators( + section, + "combined", + seen.iter().filter(|&&b| b).count() as i64, + ); + metrics::set_attestation_aggregate_coverage_subnets( + section, + has_subnet.iter().filter(|&&b| b).count() as i64, + ); +} + +fn or_into(dst: &mut [bool], src: &[bool]) { + for (d, &s) in dst.iter_mut().zip(src) { + *d |= s; + } +} + +/// Post-block coverage report for `reporting_slot`. Emits `timely` / `late` / +/// `block` / `combined` sections plus the `diff_validators` symmetric +/// difference between `block` and `timely`. Called at interval 0 of the +/// next slot. +fn emit_post_block_coverage(store: &Store, committee_count: u64, reporting_slot: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let (mut timely_v, mut timely_s) = (vec![false; validator_count], vec![false; cc]); + let (mut late_v, mut late_s) = (vec![false; validator_count], vec![false; cc]); + let (mut block_v, mut block_s) = (vec![false; validator_count], vec![false; cc]); + + // `timely`: pre-merge snapshot of `new_payloads` captured before promote. + if let Some(snap) = store.pre_merge_new_coverage() + && snap.slot == reporting_slot + { + for bits in &snap.participant_bits { + cov_add(&mut timely_v, &mut timely_s, bits); + } + } + // `late`: current `new_payloads` matching the reporting slot + // (arrived after the last promote). 
+ for (data, proofs) in store.new_aggregated_payloads().values() { + if data.slot == reporting_slot { + for proof in proofs { + cov_add(&mut late_v, &mut late_s, &proof.participants); + } + } + } + // `block`: participant bits from the most-recently-imported block. + if let Some(snap) = store.last_block_coverage() + && snap.slot == reporting_slot + { + for bits in &snap.participant_bits { + cov_add(&mut block_v, &mut block_s, bits); + } + } + + let mut combined_v = timely_v.clone(); + let mut combined_s = timely_s.clone(); + or_into(&mut combined_v, &late_v); + or_into(&mut combined_s, &late_s); + or_into(&mut combined_v, &block_v); + or_into(&mut combined_s, &block_s); + + cov_record("timely", &timely_v, &timely_s); + cov_record("late", &late_v, &late_s); + cov_record("block", &block_v, &block_s); + cov_record("combined", &combined_v, &combined_s); + + let (block_only, timely_only) = + block_v + .iter() + .zip(timely_v.iter()) + .fold((0i64, 0i64), |(b, t), (bv, tv)| match (bv, tv) { + (true, false) => (b + 1, t), + (false, true) => (b, t + 1), + _ => (b, t), + }); + metrics::set_attestation_aggregate_coverage_diff_validators("block_only", block_only); + metrics::set_attestation_aggregate_coverage_diff_validators("timely_only", timely_only); +} + +/// `agg_start_new` coverage from `new_payloads`, called right before fork- +/// choice aggregation runs at interval 2. 
+fn emit_agg_start_new_coverage(store: &Store, committee_count: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let mut seen = vec![false; validator_count]; + let mut has_subnet = vec![false; cc]; + for (_, proofs) in store.new_aggregated_payloads().values() { + for proof in proofs { + cov_add(&mut seen, &mut has_subnet, &proof.participants); + } + } + cov_record("agg_start_new", &seen, &has_subnet); +} + +/// `proposal_payloads` / `proposal_gossip` / `proposal_combined` for a block +/// we are about to publish. Each block-included validator is classified by +/// whether its `AttestationData` has a matching known-payload proof. +fn emit_proposal_coverage<'a>( + store: &Store, + committee_count: u64, + selected: impl IntoIterator, +) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let mut combined_v = vec![false; validator_count]; + let mut combined_s = vec![false; cc]; + let mut payload_seen = vec![false; validator_count]; + let known = store.known_aggregated_payloads(); + for att in selected { + cov_add(&mut combined_v, &mut combined_s, &att.aggregation_bits); + let data_root = att.data.hash_tree_root(); + if let Some((_, proofs)) = known.get(&data_root) { + for proof in proofs { + for vid in validator_indices(&proof.participants) { + let vid = vid as usize; + if vid < payload_seen.len() { + payload_seen[vid] = true; + } + } + } + } + } + + let mut payloads_v = vec![false; validator_count]; + let mut payloads_s = vec![false; cc]; + let mut gossip_v = vec![false; validator_count]; + let mut gossip_s = vec![false; cc]; + for vid in 0..validator_count { + if !combined_v[vid] { + continue; + } + let subnet = vid % cc; + if payload_seen[vid] { + payloads_v[vid] = true; + payloads_s[subnet] = true; + } else { + gossip_v[vid] = 
true; + gossip_s[subnet] = true; + } + } + cov_record("proposal_payloads", &payloads_v, &payloads_s); + cov_record("proposal_gossip", &gossip_v, &gossip_s); + cov_record("proposal_combined", &combined_v, &combined_s); +} From a9c4ea5f59cfa32667ae853b7d059999ac2dd7f0 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 26 May 2026 15:38:57 -0300 Subject: [PATCH 4/8] refactor(blockchain): drop always-zero proposal_payloads/proposal_gossip coverage emit_proposal_coverage classified block-included validators as payload- vs gossip-sourced by checking known_aggregated_payloads. But ethlambda builds blocks exclusively from that same pool (build_block -> extend_proofs_greedily), so every included validator is by construction payload-seen: proposal_gossip was always 0 and proposal_payloads always equalled proposal_combined. Keep only proposal_combined (the set of validators in the proposed block). This also removes the per-attestation hash_tree_root lookup on the block- building path and the cross-AttestationData payload_seen contamination the old classification carried. Default-seeded series drop from 18 to 14. --- crates/blockchain/src/lib.rs | 38 ++------------------------------ crates/blockchain/src/metrics.rs | 2 -- 2 files changed, 2 insertions(+), 38 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index a8037e7f..2255d80c 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -910,9 +910,8 @@ fn emit_agg_start_new_coverage(store: &Store, committee_count: u64) { cov_record("agg_start_new", &seen, &has_subnet); } -/// `proposal_payloads` / `proposal_gossip` / `proposal_combined` for a block -/// we are about to publish. Each block-included validator is classified by -/// whether its `AttestationData` has a matching known-payload proof. +/// `proposal_combined` coverage for a block we are about to publish: the full +/// set of validators included across the block's aggregated attestations. 
fn emit_proposal_coverage<'a>( store: &Store, committee_count: u64, @@ -925,41 +924,8 @@ fn emit_proposal_coverage<'a>( let cc = committee_count as usize; let mut combined_v = vec![false; validator_count]; let mut combined_s = vec![false; cc]; - let mut payload_seen = vec![false; validator_count]; - let known = store.known_aggregated_payloads(); for att in selected { cov_add(&mut combined_v, &mut combined_s, &att.aggregation_bits); - let data_root = att.data.hash_tree_root(); - if let Some((_, proofs)) = known.get(&data_root) { - for proof in proofs { - for vid in validator_indices(&proof.participants) { - let vid = vid as usize; - if vid < payload_seen.len() { - payload_seen[vid] = true; - } - } - } - } - } - - let mut payloads_v = vec![false; validator_count]; - let mut payloads_s = vec![false; cc]; - let mut gossip_v = vec![false; validator_count]; - let mut gossip_s = vec![false; cc]; - for vid in 0..validator_count { - if !combined_v[vid] { - continue; - } - let subnet = vid % cc; - if payload_seen[vid] { - payloads_v[vid] = true; - payloads_s[subnet] = true; - } else { - gossip_v[vid] = true; - gossip_s[subnet] = true; - } } - cov_record("proposal_payloads", &payloads_v, &payloads_s); - cov_record("proposal_gossip", &gossip_v, &gossip_s); cov_record("proposal_combined", &combined_v, &combined_s); } diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index fac3adfc..fbfff10d 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -16,8 +16,6 @@ pub const ATTESTATION_AGGREGATE_COVERAGE_SECTIONS: &[&str] = &[ "block", "combined", "agg_start_new", - "proposal_payloads", - "proposal_gossip", "proposal_combined", ]; From 04569eec35d45718d8d1ac8492c8f7ed5e7ac65e Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 26 May 2026 19:07:09 -0300 Subject: [PATCH 5/8] fix(blockchain): correct attestation aggregate coverage slot keying Key every coverage section by the attestation data.slot (voting round) and 
fire the per-slot report at interval 1, matching zeam #876: - timely: CoverageSnapshot now stores (data.slot, bits) per entry and the report filters by data.slot, removing the non-deterministic slot picked from HashMap iteration order. - block: read from the canonical head block at report time (filtered to data.slot) instead of an unconditionally-overwritten per-block snapshot, so fork blocks can't poison it. Removes last_block_coverage from Store. - trigger: emit at interval 1 (not 0) so the block carrying the reporting slot's votes has been received and processed. - has_any gate: emit nothing when no section has coverage for the round, instead of a misleading block_only=0 / timely_only=N reading. --- crates/blockchain/src/lib.rs | 54 +++++++++++++++++++++------------- crates/blockchain/src/store.rs | 26 +++++----------- crates/storage/src/store.rs | 36 ++++++++--------------- 3 files changed, 53 insertions(+), 63 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 34320931..423d7717 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -197,12 +197,6 @@ impl BlockChainServer { proposer_validator_id.is_some(), ); - // Emit the post-block attestation aggregate coverage report for the - // previous slot at the start of each new slot. - if interval == 0 && slot > 0 { - emit_post_block_coverage(&self.store, self.attestation_committee_count, slot - 1); - } - if interval == 2 && is_aggregator { emit_agg_start_new_coverage(&self.store, self.attestation_committee_count); self.start_aggregation_session(slot, ctx).await; @@ -217,6 +211,13 @@ impl BlockChainServer { // Reuse the same snapshot so self-delivery decisions match the rest // of the tick. if interval == 1 { + // Emit the post-block coverage report for the previous slot. 
Fired + // at interval 1 (not 0) so the block carrying `slot - 1`'s votes — + // proposed at interval 0 of this slot — has typically been received + // and processed, letting the `block` section see the same round. + if slot > 0 { + emit_post_block_coverage(&self.store, self.attestation_committee_count, slot - 1); + } self.produce_attestations(slot, is_aggregator); } @@ -842,16 +843,19 @@ fn emit_post_block_coverage(store: &Store, committee_count: u64, reporting_slot: let (mut late_v, mut late_s) = (vec![false; validator_count], vec![false; cc]); let (mut block_v, mut block_s) = (vec![false; validator_count], vec![false; cc]); - // `timely`: pre-merge snapshot of `new_payloads` captured before promote. - if let Some(snap) = store.pre_merge_new_coverage() - && snap.slot == reporting_slot - { - for bits in &snap.participant_bits { - cov_add(&mut timely_v, &mut timely_s, bits); + // Every section is the same cohort: validators whose attestations *for* + // `reporting_slot` (`data.slot == reporting_slot`) were seen via that + // channel. + + // `timely`: pre-merge snapshot of `new_payloads`, filtered to this round. + if let Some(snap) = store.pre_merge_new_coverage() { + for (data_slot, bits) in &snap.entries { + if *data_slot == reporting_slot { + cov_add(&mut timely_v, &mut timely_s, bits); + } } } - // `late`: current `new_payloads` matching the reporting slot - // (arrived after the last promote). + // `late`: current `new_payloads` for this round (arrived after the promote). for (data, proofs) in store.new_aggregated_payloads().values() { if data.slot == reporting_slot { for proof in proofs { @@ -859,12 +863,15 @@ fn emit_post_block_coverage(store: &Store, committee_count: u64, reporting_slot: } } } - // `block`: participant bits from the most-recently-imported block. 
- if let Some(snap) = store.last_block_coverage() - && snap.slot == reporting_slot - { - for bits in &snap.participant_bits { - cov_add(&mut block_v, &mut block_s, bits); + // `block`: attestations included in the canonical head block. At interval 1 + // the head is normally the block proposed at `reporting_slot + 1`, which + // carries this round's votes; filter by `data.slot` so we count the same + // cohort even if the head is at a different slot. + if let Some(block) = store.get_block(&store.head()) { + for att in block.body.attestations.iter() { + if att.data.slot == reporting_slot { + cov_add(&mut block_v, &mut block_s, &att.aggregation_bits); + } } } @@ -875,6 +882,13 @@ fn emit_post_block_coverage(store: &Store, committee_count: u64, reporting_slot: or_into(&mut combined_v, &block_v); or_into(&mut combined_s, &block_s); + // Emit nothing when there is no coverage for this round, rather than + // pushing a misleading all-zero report (e.g. `block_only=0, timely_only=N` + // on a missed slot). Gauges retain their previous value. + if !combined_v.iter().any(|&b| b) { + return; + } + cov_record("timely", &timely_v, &timely_s); cov_record("late", &late_v, &late_s); cov_record("block", &block_v, &block_s); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 91e25fde..4233f3e7 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -35,25 +35,22 @@ fn accept_new_attestations(store: &mut Store, log_tree: bool) { } /// Capture the participant bits of every entry in `new_payloads` for the -/// attestation aggregate coverage report. Stored on the Store so the -/// post-block report at the next slot boundary can read it. +/// attestation aggregate coverage report. Each entry is tagged with its +/// attestation `data.slot` so the post-block report can filter to a single +/// voting round (`new_payloads` may span multiple slots). Stored on the Store +/// so the report at the next slot boundary can read it. 
fn snapshot_pre_merge_new_coverage(store: &Store) { let new_payloads = store.new_aggregated_payloads(); if new_payloads.is_empty() { return; } - let mut slot: u64 = 0; - let mut participant_bits: Vec = Vec::new(); + let mut entries: Vec<(u64, AggregationBits)> = Vec::new(); for (data, proofs) in new_payloads.values() { - slot = data.slot; for proof in proofs { - participant_bits.push(proof.participants.clone()); + entries.push((data.slot, proof.participants.clone())); } } - store.save_pre_merge_new_coverage(CoverageSnapshot { - slot, - participant_bits, - }); + store.save_pre_merge_new_coverage(CoverageSnapshot { entries }); } /// Update the head based on the fork choice rule. @@ -524,13 +521,11 @@ fn on_block_core( // Store one proof per attestation data in known aggregated payloads. let mut known_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); - let mut block_participant_bits: Vec = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { known_entries.push((HashedAttestationData::new(att.data.clone()), proof.clone())); - block_participant_bits.push(att.aggregation_bits.clone()); // Count each participating validator as a valid attestation let count = validator_indices(&att.aggregation_bits).count() as u64; metrics::inc_attestations_valid(count); @@ -538,13 +533,6 @@ fn on_block_core( store.insert_known_aggregated_payloads_batch(known_entries); - // Capture block-included participant bits for the attestation aggregate - // coverage report (observability-only; does not affect fork choice). 
- store.save_last_block_coverage(CoverageSnapshot { - slot, - participant_bits: block_participant_bits, - }); - // Update forkchoice head based on new block and attestations update_head(store, false); diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 8927b885..f66cb747 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -488,15 +488,17 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { (slot, root) } -/// Snapshot of `AggregationBits` for one slot, used by the attestation -/// aggregate coverage report. +/// Pre-merge snapshot of `new_payloads` participant bits, used by the +/// attestation aggregate coverage report. /// -/// Holds raw participant bits; the consumer (blockchain crate) constructs -/// `Coverage` at emit time using the current validator and committee counts. +/// Each entry is tagged with its attestation `data.slot` (the voting round) so +/// the consumer can filter to a single round at emit time — `new_payloads` may +/// hold entries spanning more than one slot. Holds raw participant bits; the +/// consumer (blockchain crate) constructs coverage bitsets at emit time using +/// the current validator and committee counts. #[derive(Debug, Clone)] pub struct CoverageSnapshot { - pub slot: u64, - pub participant_bits: Vec, + pub entries: Vec<(u64, AggregationBits)>, } /// Fork choice store backed by a pluggable storage backend. @@ -524,9 +526,6 @@ pub struct Store { /// Snapshot of `new_payloads` participant bits captured right before each /// promote-to-known. Observability-only. pre_merge_new_coverage: Arc>>, - /// Snapshot of the most-recently-imported block's aggregated attestation - /// participant bits. Reset on each imported block. Observability-only. 
- last_block_coverage: Arc>>, } impl Store { @@ -662,7 +661,6 @@ impl Store { GOSSIP_SIGNATURE_CAP, ))), pre_merge_new_coverage: Arc::new(Mutex::new(None)), - last_block_coverage: Arc::new(Mutex::new(None)), } } @@ -1287,11 +1285,11 @@ impl Store { .collect() } - // ============ Coverage Snapshots ============ + // ============ Coverage Snapshot ============ // - // Observability-only state captured by `accept_new_attestations` and - // `on_block_core` in the blockchain crate. Read once per slot by the - // attestation aggregate coverage report. + // Observability-only state captured by `accept_new_attestations` in the + // blockchain crate. Read once per slot by the attestation aggregate + // coverage report. pub fn save_pre_merge_new_coverage(&self, snapshot: CoverageSnapshot) { *self.pre_merge_new_coverage.lock().unwrap() = Some(snapshot); @@ -1301,14 +1299,6 @@ impl Store { self.pre_merge_new_coverage.lock().unwrap().clone() } - pub fn save_last_block_coverage(&self, snapshot: CoverageSnapshot) { - *self.last_block_coverage.lock().unwrap() = Some(snapshot); - } - - pub fn last_block_coverage(&self) -> Option { - self.last_block_coverage.lock().unwrap().clone() - } - /// Returns the number of gossip signature entries stored. 
pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap(); @@ -1483,7 +1473,6 @@ mod tests { GOSSIP_SIGNATURE_CAP, ))), pre_merge_new_coverage: Arc::new(Mutex::new(None)), - last_block_coverage: Arc::new(Mutex::new(None)), } } @@ -1498,7 +1487,6 @@ mod tests { GOSSIP_SIGNATURE_CAP, ))), pre_merge_new_coverage: Arc::new(Mutex::new(None)), - last_block_coverage: Arc::new(Mutex::new(None)), } } } From 6d8800165d5c08f265ed3998126d4b06c825759a Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 27 May 2026 15:25:31 -0300 Subject: [PATCH 6/8] Move attestation coverage snapshot state out of the storage crate The storage Store carried observability-only state for the attestation aggregate coverage report (the CoverageSnapshot type and a pre_merge_new_coverage field), which has no fork-choice or state-transition role. Move it into the blockchain crate behind a new PreMergeCoverage handle owned by the BlockChainServer actor, threaded through on_tick / accept_new_attestations / get_proposal_head / produce_block_with_signatures as an Option so the snapshot is captured at promotion and read at the next slot boundary exactly as before. Tests and the RPC test-driver pass None. Pure refactor, no metric behavior change. 
--- crates/blockchain/src/lib.rs | 64 +++++++++++++++++-- crates/blockchain/src/store.rs | 41 +++++++----- .../blockchain/tests/forkchoice_spectests.rs | 4 +- .../blockchain/tests/signature_spectests.rs | 2 +- crates/net/rpc/src/test_driver.rs | 9 ++- crates/storage/src/lib.rs | 2 +- crates/storage/src/store.rs | 38 +---------- 7 files changed, 96 insertions(+), 64 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 423d7717..5263c7dc 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime}; use ethlambda_network_api::{BlockChainToP2PRef, InitP2P}; @@ -92,6 +93,7 @@ impl BlockChain { current_aggregation: None, last_tick_instant: None, attestation_committee_count, + pre_merge_coverage: PreMergeCoverage::default(), } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -149,6 +151,11 @@ pub struct BlockChainServer { /// Number of attestation committees (= subnet count). Used by the /// attestation aggregate coverage emission. attestation_committee_count: u64, + + /// Pre-merge `new_payloads` snapshot for the attestation aggregate coverage + /// report. Written during attestation promotion, read at the next slot + /// boundary. Observability-only. + pre_merge_coverage: PreMergeCoverage, } impl BlockChainServer { @@ -195,6 +202,7 @@ impl BlockChainServer { &mut self.store, timestamp_ms, proposer_validator_id.is_some(), + Some(&self.pre_merge_coverage), ); if interval == 2 && is_aggregator { @@ -216,7 +224,12 @@ impl BlockChainServer { // proposed at interval 0 of this slot — has typically been received // and processed, letting the `block` section see the same round. 
if slot > 0 { - emit_post_block_coverage(&self.store, self.attestation_committee_count, slot - 1); + emit_post_block_coverage( + &self.store, + &self.pre_merge_coverage, + self.attestation_committee_count, + slot - 1, + ); } self.produce_attestations(slot, is_aggregator); } @@ -352,8 +365,13 @@ impl BlockChainServer { // Build the block with attestation signatures let Ok((block, attestation_signatures, _post_checkpoints)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) + store::produce_block_with_signatures( + &mut self.store, + slot, + validator_id, + Some(&self.pre_merge_coverage), + ) + .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) else { metrics::inc_block_building_failures(); return; @@ -797,6 +815,37 @@ impl Handler for BlockChainServer { // in `crates/net/p2p/src/lib.rs`). Pure observability — no fork-choice or // state-transition effect. +/// Pre-merge snapshot of `new_payloads` participant bits, used by the +/// attestation aggregate coverage report. +/// +/// Each entry is tagged with its attestation `data.slot` (the voting round) so +/// the consumer can filter to a single round at emit time — `new_payloads` may +/// hold entries spanning more than one slot. Holds raw participant bits; the +/// consumer constructs coverage bitsets at emit time using the current +/// validator and committee counts. +#[derive(Debug, Clone)] +pub(crate) struct CoverageSnapshot { + pub(crate) entries: Vec<(u64, AggregationBits)>, +} + +/// Observability-only handle holding the most recent pre-merge `new_payloads` +/// snapshot. Owned by the blockchain actor; written during attestation +/// promotion (`accept_new_attestations`) and read once per slot by the +/// post-block coverage report. Kept out of the storage `Store` since it carries +/// no fork-choice or state-transition state. 
+#[derive(Clone, Default)] +pub struct PreMergeCoverage(Arc>>); + +impl PreMergeCoverage { + pub(crate) fn save(&self, snapshot: CoverageSnapshot) { + *self.0.lock().unwrap() = Some(snapshot); + } + + pub(crate) fn get(&self) -> Option { + self.0.lock().unwrap().clone() + } +} + fn cov_add(seen: &mut [bool], has_subnet: &mut [bool], bits: &AggregationBits) { let cc = has_subnet.len(); if cc == 0 { @@ -833,7 +882,12 @@ fn or_into(dst: &mut [bool], src: &[bool]) { /// `block` / `combined` sections plus the `diff_validators` symmetric /// difference between `block` and `timely`. Called at interval 0 of the /// next slot. -fn emit_post_block_coverage(store: &Store, committee_count: u64, reporting_slot: u64) { +fn emit_post_block_coverage( + store: &Store, + pre_merge_coverage: &PreMergeCoverage, + committee_count: u64, + reporting_slot: u64, +) { let validator_count = store.head_state().validators.len(); if validator_count == 0 || committee_count == 0 { return; @@ -848,7 +902,7 @@ fn emit_post_block_coverage(store: &Store, committee_count: u64, reporting_slot: // channel. // `timely`: pre-merge snapshot of `new_payloads`, filtered to this round. 
- if let Some(snap) = store.pre_merge_new_coverage() { + if let Some(snap) = pre_merge_coverage.get() { for (data_slot, bits) in &snap.entries { if *data_slot == reporting_slot { cov_add(&mut timely_v, &mut timely_s, bits); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 4233f3e7..4dad9f11 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use ethlambda_state_transition::{is_proposer, slot_is_justifiable_after}; -use ethlambda_storage::{CoverageSnapshot, ForkCheckpoints, Store}; +use ethlambda_storage::{ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, attestation::{ @@ -17,8 +17,8 @@ use ethlambda_types::{ use tracing::{info, trace, warn}; use crate::{ - GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA, - MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, + CoverageSnapshot, GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA, + MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, PreMergeCoverage, block_builder::{PostBlockCheckpoints, build_block}, metrics, }; @@ -26,8 +26,10 @@ use crate::{ const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; /// Accept new aggregated payloads, promoting them to known for fork choice. 
-fn accept_new_attestations(store: &mut Store, log_tree: bool) { - snapshot_pre_merge_new_coverage(store); +fn accept_new_attestations(store: &mut Store, log_tree: bool, coverage: Option<&PreMergeCoverage>) { + if let Some(coverage) = coverage { + snapshot_pre_merge_new_coverage(store, coverage); + } store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); @@ -37,9 +39,10 @@ fn accept_new_attestations(store: &mut Store, log_tree: bool) { /// Capture the participant bits of every entry in `new_payloads` for the /// attestation aggregate coverage report. Each entry is tagged with its /// attestation `data.slot` so the post-block report can filter to a single -/// voting round (`new_payloads` may span multiple slots). Stored on the Store -/// so the report at the next slot boundary can read it. -fn snapshot_pre_merge_new_coverage(store: &Store) { +/// voting round (`new_payloads` may span multiple slots). Stored on the +/// actor-owned `PreMergeCoverage` handle so the report at the next slot +/// boundary can read it. +fn snapshot_pre_merge_new_coverage(store: &Store, coverage: &PreMergeCoverage) { let new_payloads = store.new_aggregated_payloads(); if new_payloads.is_empty() { return; @@ -50,7 +53,7 @@ fn snapshot_pre_merge_new_coverage(store: &Store) { entries.push((data.slot, proof.participants.clone())); } } - store.save_pre_merge_new_coverage(CoverageSnapshot { entries }); + coverage.save(CoverageSnapshot { entries }); } /// Update the head based on the fork choice rule. @@ -212,7 +215,12 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// 800ms interval. 
Slot and interval-within-slot are derived as: /// slot = store.time() / INTERVALS_PER_SLOT /// interval = store.time() % INTERVALS_PER_SLOT -pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { +pub fn on_tick( + store: &mut Store, + timestamp_ms: u64, + has_proposal: bool, + coverage: Option<&PreMergeCoverage>, +) { // Convert UNIX timestamp (ms) to interval count since genesis let genesis_time_ms = store.config().genesis_time * 1000; let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); @@ -245,7 +253,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { 0 => { // Start of slot - process attestations if proposal exists if should_signal_proposal { - accept_new_attestations(store, false); + accept_new_attestations(store, false, coverage); } } 1 => { @@ -260,7 +268,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { } 4 => { // End of slot - accept accumulated attestations and log tree - accept_new_attestations(store, true); + accept_new_attestations(store, true, coverage); } _ => unreachable!("slots only have 5 intervals"), } @@ -676,15 +684,15 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// /// Ensures store is up-to-date and processes any pending attestations /// before returning the canonical head. 
-fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { +fn get_proposal_head(store: &mut Store, slot: u64, coverage: Option<&PreMergeCoverage>) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time_ms, true); + on_tick(store, slot_time_ms, true, coverage); // Process any pending attestations before proposal - accept_new_attestations(store, false); + accept_new_attestations(store, false, coverage); store.head() } @@ -697,9 +705,10 @@ pub fn produce_block_with_signatures( store: &mut Store, slot: u64, validator_index: u64, + coverage: Option<&PreMergeCoverage>, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { // Get parent block and state to build upon - let head_root = get_proposal_head(store, slot); + let head_root = get_proposal_head(store, slot, coverage); let head_state = store .get_state(&head_root) .ok_or(StoreError::MissingParentState { diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index e095991d..a15a2108 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -95,7 +95,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time_ms, true); + store::on_tick(&mut store, block_time_ms, true, None); let result = store::on_block_without_verification(&mut store, signed_block); assert_step_outcome(step_idx, step.valid, result)?; } @@ -111,7 +111,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { (None, None) => panic!("tick step missing both time and interval"), }; let has_proposal = step.has_proposal.unwrap_or(false); - store::on_tick(&mut store, timestamp_ms, 
has_proposal); + store::on_tick(&mut store, timestamp_ms, has_proposal, None); } "attestation" => { let att_data = step diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 5f6b0bd8..61987d2e 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -50,7 +50,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time_ms = genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut st, block_time_ms, true); + store::on_tick(&mut st, block_time_ms, true, None); // Process the block (this includes signature verification) let result = store::on_block(&mut st, signed_block); diff --git a/crates/net/rpc/src/test_driver.rs b/crates/net/rpc/src/test_driver.rs index 7bf4033a..bf434228 100644 --- a/crates/net/rpc/src/test_driver.rs +++ b/crates/net/rpc/src/test_driver.rs @@ -347,7 +347,12 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { } (None, None) => return Err("tick step missing time and interval".to_string()), }; - store::on_tick(store, timestamp_ms, step.has_proposal.unwrap_or(false)); + store::on_tick( + store, + timestamp_ms, + step.has_proposal.unwrap_or(false), + None, + ); Ok(()) } "block" => { @@ -359,7 +364,7 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { // before importing so the future-slot guard doesn't reject it. 
let block_time_ms = store.config().genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(store, block_time_ms, true); + store::on_tick(store, block_time_ms, true, None); store::on_block_without_verification(store, signed_block).map_err(|e| e.to_string()) } "attestation" => { diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index ca067f9b..9662a36c 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,4 +3,4 @@ pub mod backend; mod store; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{CoverageSnapshot, ForkCheckpoints, GetForkchoiceStoreError, Store}; +pub use store::{ForkCheckpoints, GetForkchoiceStoreError, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index f66cb747..43bc2522 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -4,10 +4,7 @@ use std::sync::{Arc, LazyLock, Mutex}; use crate::api::{StorageBackend, StorageWriteBatch, Table}; use ethlambda_types::{ - attestation::{ - AggregationBits, AttestationData, HashedAttestationData, bits_is_subset, - blank_xmss_signature, - }, + attestation::{AttestationData, HashedAttestationData, bits_is_subset, blank_xmss_signature}, block::{ AggregatedSignatureProof, AttestationSignatures, Block, BlockBody, BlockHeader, BlockSignatures, SignedBlock, @@ -488,19 +485,6 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { (slot, root) } -/// Pre-merge snapshot of `new_payloads` participant bits, used by the -/// attestation aggregate coverage report. -/// -/// Each entry is tagged with its attestation `data.slot` (the voting round) so -/// the consumer can filter to a single round at emit time — `new_payloads` may -/// hold entries spanning more than one slot. Holds raw participant bits; the -/// consumer (blockchain crate) constructs coverage bitsets at emit time using -/// the current validator and committee counts. 
-#[derive(Debug, Clone)] -pub struct CoverageSnapshot { - pub entries: Vec<(u64, AggregationBits)>, -} - /// Fork choice store backed by a pluggable storage backend. /// /// The Store maintains all state required for fork choice and block processing: @@ -523,9 +507,6 @@ pub struct Store { known_payloads: Arc>, /// In-memory gossip signatures, consumed at interval 2 aggregation. gossip_signatures: Arc>, - /// Snapshot of `new_payloads` participant bits captured right before each - /// promote-to-known. Observability-only. - pre_merge_new_coverage: Arc>>, } impl Store { @@ -660,7 +641,6 @@ impl Store { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), - pre_merge_new_coverage: Arc::new(Mutex::new(None)), } } @@ -1285,20 +1265,6 @@ impl Store { .collect() } - // ============ Coverage Snapshot ============ - // - // Observability-only state captured by `accept_new_attestations` in the - // blockchain crate. Read once per slot by the attestation aggregate - // coverage report. - - pub fn save_pre_merge_new_coverage(&self, snapshot: CoverageSnapshot) { - *self.pre_merge_new_coverage.lock().unwrap() = Some(snapshot); - } - - pub fn pre_merge_new_coverage(&self) -> Option { - self.pre_merge_new_coverage.lock().unwrap().clone() - } - /// Returns the number of gossip signature entries stored. 
pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap(); @@ -1472,7 +1438,6 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), - pre_merge_new_coverage: Arc::new(Mutex::new(None)), } } @@ -1486,7 +1451,6 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), - pre_merge_new_coverage: Arc::new(Mutex::new(None)), } } } From c751426892be9d34d68ad5c0a099b40419c878d3 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 27 May 2026 16:22:38 -0300 Subject: [PATCH 7/8] refactor(blockchain): move attestation coverage emission into its own module Address review feedback: relocate all attestation aggregate coverage observability into a new crates/blockchain/src/coverage.rs. The pre-merge new_payloads snapshot is now captured by the actor in BlockChainServer::on_tick (when proposing or at the end of the slot) instead of being threaded as a parameter through store::on_tick / accept_new_attestations / get_proposal_head / produce_block_with_signatures. Since the snapshot is owned solely by the actor and only touched from its single-threaded message loop, the Arc handle is replaced by a plain Option field. No behavior change - coverage emission is observability-only. --- crates/blockchain/src/coverage.rs | 201 +++++++++++++++ crates/blockchain/src/lib.rs | 236 ++---------------- crates/blockchain/src/store.rs | 53 +--- .../blockchain/tests/forkchoice_spectests.rs | 4 +- .../blockchain/tests/signature_spectests.rs | 2 +- crates/net/rpc/src/test_driver.rs | 9 +- 6 files changed, 242 insertions(+), 263 deletions(-) create mode 100644 crates/blockchain/src/coverage.rs diff --git a/crates/blockchain/src/coverage.rs b/crates/blockchain/src/coverage.rs new file mode 100644 index 00000000..e249ae4f --- /dev/null +++ b/crates/blockchain/src/coverage.rs @@ -0,0 +1,201 @@ +//! Attestation aggregate coverage emission. +//! +//! 
Pure observability — nothing here feeds back into fork choice or the state +//! transition. The emitters build `Vec` locals (`seen` for validators, +//! `has_subnet` for subnets, with subnet = `vid % committee_count`, matching +//! the gossip subnet assignment in `crates/net/p2p/src/lib.rs`) and push the +//! resulting counts to the coverage gauges registered in +//! [`crate::metrics`]. + +use ethlambda_storage::Store; +use ethlambda_types::attestation::{AggregatedAttestation, AggregationBits, validator_indices}; + +use crate::metrics; + +/// Pre-merge snapshot of `new_payloads` participant bits, used by the +/// attestation aggregate coverage report. +/// +/// Each entry is tagged with its attestation `data.slot` (the voting round) so +/// the consumer can filter to a single round at emit time — `new_payloads` may +/// hold entries spanning more than one slot. Holds raw participant bits; the +/// consumer constructs coverage bitsets at emit time using the current +/// validator and committee counts. +#[derive(Debug, Clone)] +pub(crate) struct CoverageSnapshot { + pub(crate) entries: Vec<(u64, AggregationBits)>, +} + +/// Capture the participant bits of every entry in `new_payloads` for the +/// attestation aggregate coverage report. Each entry is tagged with its +/// attestation `data.slot` so the post-block report can filter to a single +/// voting round (`new_payloads` may span multiple slots). +/// +/// Returns `None` when `new_payloads` is empty so callers can keep their last +/// non-empty snapshot rather than overwriting it with nothing — a node that +/// missed a round still reports the round it last saw. 
+pub(crate) fn snapshot_new_payloads(store: &Store) -> Option { + let new_payloads = store.new_aggregated_payloads(); + if new_payloads.is_empty() { + return None; + } + let mut entries: Vec<(u64, AggregationBits)> = Vec::new(); + for (data, proofs) in new_payloads.values() { + for proof in proofs { + entries.push((data.slot, proof.participants.clone())); + } + } + Some(CoverageSnapshot { entries }) +} + +fn cov_add(seen: &mut [bool], has_subnet: &mut [bool], bits: &AggregationBits) { + let cc = has_subnet.len(); + if cc == 0 { + return; + } + for vid in validator_indices(bits) { + let vid = vid as usize; + if vid < seen.len() { + seen[vid] = true; + has_subnet[vid % cc] = true; + } + } +} + +fn cov_record(section: &str, seen: &[bool], has_subnet: &[bool]) { + metrics::set_attestation_aggregate_coverage_validators( + section, + "combined", + seen.iter().filter(|&&b| b).count() as i64, + ); + metrics::set_attestation_aggregate_coverage_subnets( + section, + has_subnet.iter().filter(|&&b| b).count() as i64, + ); +} + +fn or_into(dst: &mut [bool], src: &[bool]) { + for (d, &s) in dst.iter_mut().zip(src) { + *d |= s; + } +} + +/// Post-block coverage report for `reporting_slot`. Emits `timely` / `late` / +/// `block` / `combined` sections plus the `diff_validators` symmetric +/// difference between `block` and `timely`. Called at interval 1 of the +/// next slot. 
+pub(crate) fn emit_post_block_coverage( + store: &Store, + pre_merge_coverage: Option<&CoverageSnapshot>, + committee_count: u64, + reporting_slot: u64, +) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let (mut timely_v, mut timely_s) = (vec![false; validator_count], vec![false; cc]); + let (mut late_v, mut late_s) = (vec![false; validator_count], vec![false; cc]); + let (mut block_v, mut block_s) = (vec![false; validator_count], vec![false; cc]); + + // Every section is the same cohort: validators whose attestations *for* + // `reporting_slot` (`data.slot == reporting_slot`) were seen via that + // channel. + + // `timely`: pre-merge snapshot of `new_payloads`, filtered to this round. + if let Some(snap) = pre_merge_coverage { + for (data_slot, bits) in &snap.entries { + if *data_slot == reporting_slot { + cov_add(&mut timely_v, &mut timely_s, bits); + } + } + } + // `late`: current `new_payloads` for this round (arrived after the promote). + for (data, proofs) in store.new_aggregated_payloads().values() { + if data.slot == reporting_slot { + for proof in proofs { + cov_add(&mut late_v, &mut late_s, &proof.participants); + } + } + } + // `block`: attestations included in the canonical head block. At interval 1 + // the head is normally the block proposed at `reporting_slot + 1`, which + // carries this round's votes; filter by `data.slot` so we count the same + // cohort even if the head is at a different slot. 
+ if let Some(block) = store.get_block(&store.head()) { + for att in block.body.attestations.iter() { + if att.data.slot == reporting_slot { + cov_add(&mut block_v, &mut block_s, &att.aggregation_bits); + } + } + } + + let mut combined_v = timely_v.clone(); + let mut combined_s = timely_s.clone(); + or_into(&mut combined_v, &late_v); + or_into(&mut combined_s, &late_s); + or_into(&mut combined_v, &block_v); + or_into(&mut combined_s, &block_s); + + // Emit nothing when there is no coverage for this round, rather than + // pushing a misleading all-zero report (e.g. `block_only=0, timely_only=N` + // on a missed slot). Gauges retain their previous value. + if !combined_v.iter().any(|&b| b) { + return; + } + + cov_record("timely", &timely_v, &timely_s); + cov_record("late", &late_v, &late_s); + cov_record("block", &block_v, &block_s); + cov_record("combined", &combined_v, &combined_s); + + let (block_only, timely_only) = + block_v + .iter() + .zip(timely_v.iter()) + .fold((0i64, 0i64), |(b, t), (bv, tv)| match (bv, tv) { + (true, false) => (b + 1, t), + (false, true) => (b, t + 1), + _ => (b, t), + }); + metrics::set_attestation_aggregate_coverage_diff_validators("block_only", block_only); + metrics::set_attestation_aggregate_coverage_diff_validators("timely_only", timely_only); +} + +/// `agg_start_new` coverage from `new_payloads`, called right before fork- +/// choice aggregation runs at interval 2. 
+pub(crate) fn emit_agg_start_new_coverage(store: &Store, committee_count: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let mut seen = vec![false; validator_count]; + let mut has_subnet = vec![false; cc]; + for (_, proofs) in store.new_aggregated_payloads().values() { + for proof in proofs { + cov_add(&mut seen, &mut has_subnet, &proof.participants); + } + } + cov_record("agg_start_new", &seen, &has_subnet); +} + +/// `proposal_combined` coverage for a block we are about to publish: the full +/// set of validators included across the block's aggregated attestations. +pub(crate) fn emit_proposal_coverage<'a>( + store: &Store, + committee_count: u64, + selected: impl IntoIterator, +) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let mut combined_v = vec![false; validator_count]; + let mut combined_s = vec![false; cc]; + for att in selected { + cov_add(&mut combined_v, &mut combined_s, &att.aggregation_bits); + } + cov_record("proposal_combined", &combined_v, &combined_s); +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 5263c7dc..414c95ad 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,5 +1,4 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime}; use ethlambda_network_api::{BlockChainToP2PRef, InitP2P}; @@ -8,10 +7,7 @@ use ethlambda_storage::{ALL_TABLES, Store}; use ethlambda_types::{ ShortRoot, aggregator::AggregatorController, - attestation::{ - AggregatedAttestation, AggregationBits, SignedAggregatedAttestation, SignedAttestation, - validator_indices, - }, + attestation::{SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, SignedBlock}, 
primitives::{H256, HashTreeRoot as _}, }; @@ -32,6 +28,7 @@ use crate::store::StoreError; pub mod aggregation; pub mod block_builder; +pub(crate) mod coverage; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -93,7 +90,7 @@ impl BlockChain { current_aggregation: None, last_tick_instant: None, attestation_committee_count, - pre_merge_coverage: PreMergeCoverage::default(), + pre_merge_coverage: None, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -153,9 +150,11 @@ pub struct BlockChainServer { attestation_committee_count: u64, /// Pre-merge `new_payloads` snapshot for the attestation aggregate coverage - /// report. Written during attestation promotion, read at the next slot - /// boundary. Observability-only. - pre_merge_coverage: PreMergeCoverage, + /// report. Captured before the store promotes attestations (when proposing + /// or at the end of the slot), read at the next slot boundary. Owned solely + /// by the actor and only touched from the single-threaded message loop, so + /// no synchronization is needed. Observability-only. + pre_merge_coverage: Option, } impl BlockChainServer { @@ -197,16 +196,25 @@ impl BlockChainServer { .then(|| self.get_our_proposer(slot)) .flatten(); + // Snapshot the pre-merge `new_payloads` set before the store may promote + // it on this tick, so the post-block coverage report can see the + // "timely" cohort. Promotion happens when proposing (interval 0) or at + // the end of the slot (interval 4); skip empty snapshots so a missed + // round keeps the last set we saw. Pure observability. 
+ let will_promote = interval == 4 || (interval == 0 && proposer_validator_id.is_some()); + if will_promote && let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) { + self.pre_merge_coverage = Some(snapshot); + } + // Tick the store first - this accepts attestations at interval 0 if we have a proposal store::on_tick( &mut self.store, timestamp_ms, proposer_validator_id.is_some(), - Some(&self.pre_merge_coverage), ); if interval == 2 && is_aggregator { - emit_agg_start_new_coverage(&self.store, self.attestation_committee_count); + coverage::emit_agg_start_new_coverage(&self.store, self.attestation_committee_count); self.start_aggregation_session(slot, ctx).await; } @@ -224,9 +232,9 @@ impl BlockChainServer { // proposed at interval 0 of this slot — has typically been received // and processed, letting the `block` section see the same round. if slot > 0 { - emit_post_block_coverage( + coverage::emit_post_block_coverage( &self.store, - &self.pre_merge_coverage, + self.pre_merge_coverage.as_ref(), self.attestation_committee_count, slot - 1, ); @@ -365,19 +373,14 @@ impl BlockChainServer { // Build the block with attestation signatures let Ok((block, attestation_signatures, _post_checkpoints)) = - store::produce_block_with_signatures( - &mut self.store, - slot, - validator_id, - Some(&self.pre_merge_coverage), - ) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) + store::produce_block_with_signatures(&mut self.store, slot, validator_id) + .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) else { metrics::inc_block_building_failures(); return; }; - emit_proposal_coverage( + coverage::emit_proposal_coverage( &self.store, self.attestation_committee_count, block.body.attestations.iter(), @@ -807,194 +810,3 @@ impl Handler for BlockChainServer { } } } - -// --- Attestation aggregate coverage emission --- -// -// `seen` tracks covered validators; `has_subnet` tracks covered subnets -// (subnet = `vid 
% committee_count`, matching the gossip subnet assignment -// in `crates/net/p2p/src/lib.rs`). Pure observability — no fork-choice or -// state-transition effect. - -/// Pre-merge snapshot of `new_payloads` participant bits, used by the -/// attestation aggregate coverage report. -/// -/// Each entry is tagged with its attestation `data.slot` (the voting round) so -/// the consumer can filter to a single round at emit time — `new_payloads` may -/// hold entries spanning more than one slot. Holds raw participant bits; the -/// consumer constructs coverage bitsets at emit time using the current -/// validator and committee counts. -#[derive(Debug, Clone)] -pub(crate) struct CoverageSnapshot { - pub(crate) entries: Vec<(u64, AggregationBits)>, -} - -/// Observability-only handle holding the most recent pre-merge `new_payloads` -/// snapshot. Owned by the blockchain actor; written during attestation -/// promotion (`accept_new_attestations`) and read once per slot by the -/// post-block coverage report. Kept out of the storage `Store` since it carries -/// no fork-choice or state-transition state. 
-#[derive(Clone, Default)] -pub struct PreMergeCoverage(Arc>>); - -impl PreMergeCoverage { - pub(crate) fn save(&self, snapshot: CoverageSnapshot) { - *self.0.lock().unwrap() = Some(snapshot); - } - - pub(crate) fn get(&self) -> Option { - self.0.lock().unwrap().clone() - } -} - -fn cov_add(seen: &mut [bool], has_subnet: &mut [bool], bits: &AggregationBits) { - let cc = has_subnet.len(); - if cc == 0 { - return; - } - for vid in validator_indices(bits) { - let vid = vid as usize; - if vid < seen.len() { - seen[vid] = true; - has_subnet[vid % cc] = true; - } - } -} - -fn cov_record(section: &str, seen: &[bool], has_subnet: &[bool]) { - metrics::set_attestation_aggregate_coverage_validators( - section, - "combined", - seen.iter().filter(|&&b| b).count() as i64, - ); - metrics::set_attestation_aggregate_coverage_subnets( - section, - has_subnet.iter().filter(|&&b| b).count() as i64, - ); -} - -fn or_into(dst: &mut [bool], src: &[bool]) { - for (d, &s) in dst.iter_mut().zip(src) { - *d |= s; - } -} - -/// Post-block coverage report for `reporting_slot`. Emits `timely` / `late` / -/// `block` / `combined` sections plus the `diff_validators` symmetric -/// difference between `block` and `timely`. Called at interval 0 of the -/// next slot. -fn emit_post_block_coverage( - store: &Store, - pre_merge_coverage: &PreMergeCoverage, - committee_count: u64, - reporting_slot: u64, -) { - let validator_count = store.head_state().validators.len(); - if validator_count == 0 || committee_count == 0 { - return; - } - let cc = committee_count as usize; - let (mut timely_v, mut timely_s) = (vec![false; validator_count], vec![false; cc]); - let (mut late_v, mut late_s) = (vec![false; validator_count], vec![false; cc]); - let (mut block_v, mut block_s) = (vec![false; validator_count], vec![false; cc]); - - // Every section is the same cohort: validators whose attestations *for* - // `reporting_slot` (`data.slot == reporting_slot`) were seen via that - // channel. 
- - // `timely`: pre-merge snapshot of `new_payloads`, filtered to this round. - if let Some(snap) = pre_merge_coverage.get() { - for (data_slot, bits) in &snap.entries { - if *data_slot == reporting_slot { - cov_add(&mut timely_v, &mut timely_s, bits); - } - } - } - // `late`: current `new_payloads` for this round (arrived after the promote). - for (data, proofs) in store.new_aggregated_payloads().values() { - if data.slot == reporting_slot { - for proof in proofs { - cov_add(&mut late_v, &mut late_s, &proof.participants); - } - } - } - // `block`: attestations included in the canonical head block. At interval 1 - // the head is normally the block proposed at `reporting_slot + 1`, which - // carries this round's votes; filter by `data.slot` so we count the same - // cohort even if the head is at a different slot. - if let Some(block) = store.get_block(&store.head()) { - for att in block.body.attestations.iter() { - if att.data.slot == reporting_slot { - cov_add(&mut block_v, &mut block_s, &att.aggregation_bits); - } - } - } - - let mut combined_v = timely_v.clone(); - let mut combined_s = timely_s.clone(); - or_into(&mut combined_v, &late_v); - or_into(&mut combined_s, &late_s); - or_into(&mut combined_v, &block_v); - or_into(&mut combined_s, &block_s); - - // Emit nothing when there is no coverage for this round, rather than - // pushing a misleading all-zero report (e.g. `block_only=0, timely_only=N` - // on a missed slot). Gauges retain their previous value. 
- if !combined_v.iter().any(|&b| b) { - return; - } - - cov_record("timely", &timely_v, &timely_s); - cov_record("late", &late_v, &late_s); - cov_record("block", &block_v, &block_s); - cov_record("combined", &combined_v, &combined_s); - - let (block_only, timely_only) = - block_v - .iter() - .zip(timely_v.iter()) - .fold((0i64, 0i64), |(b, t), (bv, tv)| match (bv, tv) { - (true, false) => (b + 1, t), - (false, true) => (b, t + 1), - _ => (b, t), - }); - metrics::set_attestation_aggregate_coverage_diff_validators("block_only", block_only); - metrics::set_attestation_aggregate_coverage_diff_validators("timely_only", timely_only); -} - -/// `agg_start_new` coverage from `new_payloads`, called right before fork- -/// choice aggregation runs at interval 2. -fn emit_agg_start_new_coverage(store: &Store, committee_count: u64) { - let validator_count = store.head_state().validators.len(); - if validator_count == 0 || committee_count == 0 { - return; - } - let cc = committee_count as usize; - let mut seen = vec![false; validator_count]; - let mut has_subnet = vec![false; cc]; - for (_, proofs) in store.new_aggregated_payloads().values() { - for proof in proofs { - cov_add(&mut seen, &mut has_subnet, &proof.participants); - } - } - cov_record("agg_start_new", &seen, &has_subnet); -} - -/// `proposal_combined` coverage for a block we are about to publish: the full -/// set of validators included across the block's aggregated attestations. 
-fn emit_proposal_coverage<'a>( - store: &Store, - committee_count: u64, - selected: impl IntoIterator, -) { - let validator_count = store.head_state().validators.len(); - if validator_count == 0 || committee_count == 0 { - return; - } - let cc = committee_count as usize; - let mut combined_v = vec![false; validator_count]; - let mut combined_s = vec![false; cc]; - for att in selected { - cov_add(&mut combined_v, &mut combined_s, &att.aggregation_bits); - } - cov_record("proposal_combined", &combined_v, &combined_s); -} diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 4dad9f11..97c06615 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -5,8 +5,8 @@ use ethlambda_storage::{ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, attestation::{ - AggregationBits, Attestation, AttestationData, HashedAttestationData, - SignedAggregatedAttestation, SignedAttestation, validator_indices, + Attestation, AttestationData, HashedAttestationData, SignedAggregatedAttestation, + SignedAttestation, validator_indices, }, block::{AggregatedSignatureProof, Block, SignedBlock}, checkpoint::Checkpoint, @@ -17,8 +17,8 @@ use ethlambda_types::{ use tracing::{info, trace, warn}; use crate::{ - CoverageSnapshot, GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA, - MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, PreMergeCoverage, + GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA, + MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, block_builder::{PostBlockCheckpoints, build_block}, metrics, }; @@ -26,36 +26,13 @@ use crate::{ const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; /// Accept new aggregated payloads, promoting them to known for fork choice. 
-fn accept_new_attestations(store: &mut Store, log_tree: bool, coverage: Option<&PreMergeCoverage>) { - if let Some(coverage) = coverage { - snapshot_pre_merge_new_coverage(store, coverage); - } +fn accept_new_attestations(store: &mut Store, log_tree: bool) { store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); update_head(store, log_tree); } -/// Capture the participant bits of every entry in `new_payloads` for the -/// attestation aggregate coverage report. Each entry is tagged with its -/// attestation `data.slot` so the post-block report can filter to a single -/// voting round (`new_payloads` may span multiple slots). Stored on the -/// actor-owned `PreMergeCoverage` handle so the report at the next slot -/// boundary can read it. -fn snapshot_pre_merge_new_coverage(store: &Store, coverage: &PreMergeCoverage) { - let new_payloads = store.new_aggregated_payloads(); - if new_payloads.is_empty() { - return; - } - let mut entries: Vec<(u64, AggregationBits)> = Vec::new(); - for (data, proofs) in new_payloads.values() { - for proof in proofs { - entries.push((data.slot, proof.participants.clone())); - } - } - coverage.save(CoverageSnapshot { entries }); -} - /// Update the head based on the fork choice rule. /// /// When `log_tree` is true, also computes block weights and logs an ASCII @@ -215,12 +192,7 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// 800ms interval. 
Slot and interval-within-slot are derived as: /// slot = store.time() / INTERVALS_PER_SLOT /// interval = store.time() % INTERVALS_PER_SLOT -pub fn on_tick( - store: &mut Store, - timestamp_ms: u64, - has_proposal: bool, - coverage: Option<&PreMergeCoverage>, -) { +pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { // Convert UNIX timestamp (ms) to interval count since genesis let genesis_time_ms = store.config().genesis_time * 1000; let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); @@ -253,7 +225,7 @@ pub fn on_tick( 0 => { // Start of slot - process attestations if proposal exists if should_signal_proposal { - accept_new_attestations(store, false, coverage); + accept_new_attestations(store, false); } } 1 => { @@ -268,7 +240,7 @@ pub fn on_tick( } 4 => { // End of slot - accept accumulated attestations and log tree - accept_new_attestations(store, true, coverage); + accept_new_attestations(store, true); } _ => unreachable!("slots only have 5 intervals"), } @@ -684,15 +656,15 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// /// Ensures store is up-to-date and processes any pending attestations /// before returning the canonical head. 
-fn get_proposal_head(store: &mut Store, slot: u64, coverage: Option<&PreMergeCoverage>) -> H256 { +fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time_ms, true, coverage); + on_tick(store, slot_time_ms, true); // Process any pending attestations before proposal - accept_new_attestations(store, false, coverage); + accept_new_attestations(store, false); store.head() } @@ -705,10 +677,9 @@ pub fn produce_block_with_signatures( store: &mut Store, slot: u64, validator_index: u64, - coverage: Option<&PreMergeCoverage>, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { // Get parent block and state to build upon - let head_root = get_proposal_head(store, slot, coverage); + let head_root = get_proposal_head(store, slot); let head_state = store .get_state(&head_root) .ok_or(StoreError::MissingParentState { diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index a15a2108..e095991d 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -95,7 +95,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time_ms, true, None); + store::on_tick(&mut store, block_time_ms, true); let result = store::on_block_without_verification(&mut store, signed_block); assert_step_outcome(step_idx, step.valid, result)?; } @@ -111,7 +111,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { (None, None) => panic!("tick step missing both time and interval"), }; let has_proposal = step.has_proposal.unwrap_or(false); - store::on_tick(&mut store, timestamp_ms, 
has_proposal, None); + store::on_tick(&mut store, timestamp_ms, has_proposal); } "attestation" => { let att_data = step diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 61987d2e..5f6b0bd8 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -50,7 +50,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time_ms = genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut st, block_time_ms, true, None); + store::on_tick(&mut st, block_time_ms, true); // Process the block (this includes signature verification) let result = store::on_block(&mut st, signed_block); diff --git a/crates/net/rpc/src/test_driver.rs b/crates/net/rpc/src/test_driver.rs index bf434228..7bf4033a 100644 --- a/crates/net/rpc/src/test_driver.rs +++ b/crates/net/rpc/src/test_driver.rs @@ -347,12 +347,7 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { } (None, None) => return Err("tick step missing time and interval".to_string()), }; - store::on_tick( - store, - timestamp_ms, - step.has_proposal.unwrap_or(false), - None, - ); + store::on_tick(store, timestamp_ms, step.has_proposal.unwrap_or(false)); Ok(()) } "block" => { @@ -364,7 +359,7 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { // before importing so the future-slot guard doesn't reject it. 
let block_time_ms = store.config().genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(store, block_time_ms, true, None); + store::on_tick(store, block_time_ms, true); store::on_block_without_verification(store, signed_block).map_err(|e| e.to_string()) } "attestation" => { From 6fcefeced4e8b9688d147f97f1e5f4fa42f9dc71 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 27 May 2026 17:47:18 -0300 Subject: [PATCH 8/8] fix(blockchain): correct attestation coverage emission per review Address MegaRedHand's review findings on the coverage observability: - Drop the proposer interval-0 pre-merge snapshot. By interval 0 the round's votes are already promoted, so new_payloads holds only stragglers; capturing them overwrote the good interval-4 snapshot and made the timely section under-report on slots this node proposes. Snapshot only at interval 4; stragglers now surface in the late section as intended. - Gate the post-block report on block_v (the round's votes appearing in the canonical head block) instead of combined_v. Gating on combined still fired on a missed slot, pushing the misleading block_only=0, timely_only=N the diff is meant to avoid. - Add Store::new_aggregated_payload_participants() returning only the participant bitfields, replacing new_aggregated_payloads() which deep-cloned the multi-megabyte proof_data blobs on every read just to extract bits. - Correct the validators-gauge help text: only subnet=combined is emitted; the per-subnet subnet_N breakdown is reserved and not yet populated. No fork-choice or state-transition change - coverage stays observability-only. 
--- crates/blockchain/src/coverage.rs | 35 +++++++++++++------------------ crates/blockchain/src/lib.rs | 29 +++++++++++++++---------- crates/blockchain/src/metrics.rs | 5 +++-- crates/storage/src/store.rs | 29 +++++++++++++++++-------- 4 files changed, 55 insertions(+), 43 deletions(-) diff --git a/crates/blockchain/src/coverage.rs b/crates/blockchain/src/coverage.rs index e249ae4f..ebb6f5a7 100644 --- a/crates/blockchain/src/coverage.rs +++ b/crates/blockchain/src/coverage.rs @@ -34,16 +34,10 @@ pub(crate) struct CoverageSnapshot { /// non-empty snapshot rather than overwriting it with nothing — a node that /// missed a round still reports the round it last saw. pub(crate) fn snapshot_new_payloads(store: &Store) -> Option { - let new_payloads = store.new_aggregated_payloads(); - if new_payloads.is_empty() { + let entries = store.new_aggregated_payload_participants(); + if entries.is_empty() { return None; } - let mut entries: Vec<(u64, AggregationBits)> = Vec::new(); - for (data, proofs) in new_payloads.values() { - for proof in proofs { - entries.push((data.slot, proof.participants.clone())); - } - } Some(CoverageSnapshot { entries }) } @@ -111,11 +105,9 @@ pub(crate) fn emit_post_block_coverage( } } // `late`: current `new_payloads` for this round (arrived after the promote). - for (data, proofs) in store.new_aggregated_payloads().values() { - if data.slot == reporting_slot { - for proof in proofs { - cov_add(&mut late_v, &mut late_s, &proof.participants); - } + for (data_slot, bits) in store.new_aggregated_payload_participants() { + if data_slot == reporting_slot { + cov_add(&mut late_v, &mut late_s, &bits); } } // `block`: attestations included in the canonical head block. At interval 1 @@ -137,10 +129,13 @@ pub(crate) fn emit_post_block_coverage( or_into(&mut combined_v, &block_v); or_into(&mut combined_s, &block_s); - // Emit nothing when there is no coverage for this round, rather than - // pushing a misleading all-zero report (e.g. 
`block_only=0, timely_only=N` - // on a missed slot). Gauges retain their previous value. - if !combined_v.iter().any(|&b| b) { + // Only report a round once the canonical head block actually carries its + // votes (`block_v` non-empty). Gating on `combined` instead would still + // fire on a missed slot — the `timely` snapshot for the round is populated + // while `block_v` is all-false — pushing exactly the misleading + // `block_only=0, timely_only=N` the diff is meant to avoid. When there is + // no block for the round the gauges retain their previous value. + if !block_v.iter().any(|&b| b) { return; } @@ -172,10 +167,8 @@ pub(crate) fn emit_agg_start_new_coverage(store: &Store, committee_count: u64) { let cc = committee_count as usize; let mut seen = vec![false; validator_count]; let mut has_subnet = vec![false; cc]; - for (_, proofs) in store.new_aggregated_payloads().values() { - for proof in proofs { - cov_add(&mut seen, &mut has_subnet, &proof.participants); - } + for (_slot, bits) in store.new_aggregated_payload_participants() { + cov_add(&mut seen, &mut has_subnet, &bits); } cov_record("agg_start_new", &seen, &has_subnet); } diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 1d892ee9..1a6fb766 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -159,10 +159,10 @@ pub struct BlockChainServer { attestation_committee_count: u64, /// Pre-merge `new_payloads` snapshot for the attestation aggregate coverage - /// report. Captured before the store promotes attestations (when proposing - /// or at the end of the slot), read at the next slot boundary. Owned solely - /// by the actor and only touched from the single-threaded message loop, so - /// no synchronization is needed. Observability-only. + /// report. Captured at the end-of-slot promote (interval 4), read at the + /// next slot boundary. 
Owned solely by the actor and only touched from the + /// single-threaded message loop, so no synchronization is needed. + /// Observability-only. pre_merge_coverage: Option, } @@ -205,13 +205,20 @@ impl BlockChainServer { .then(|| self.get_our_proposer(slot)) .flatten(); - // Snapshot the pre-merge `new_payloads` set before the store may promote - // it on this tick, so the post-block coverage report can see the - // "timely" cohort. Promotion happens when proposing (interval 0) or at - // the end of the slot (interval 4); skip empty snapshots so a missed - // round keeps the last set we saw. Pure observability. - let will_promote = interval == 4 || (interval == 0 && proposer_validator_id.is_some()); - if will_promote && let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) { + // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote + // (interval 4), so the post-block report for this round sees its + // "timely" cohort just before it is promoted out of `new_payloads`. + // + // Only interval 4 — not the proposer's interval-0 promote. By interval 0 + // the round's votes have already been promoted at the previous slot's + // interval 4; `new_payloads` then holds only stragglers, and snapshotting + // them here would overwrite the good interval-4 snapshot the report still + // needs (those stragglers surface in the `late` section instead). Skip + // empty snapshots so a missed round keeps the last set we saw. Pure + // observability. + if interval == 4 + && let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) + { self.pre_merge_coverage = Some(snapshot); } diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index fbfff10d..8e0c6d5e 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -128,8 +128,9 @@ static LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS: std::sync::LazyLock) pairs. 
+ /// Returns the participant bitfields of every pending (new) aggregated + /// payload, one entry per proof, each tagged with its attestation + /// `data.slot`. /// - /// Mirrors [`known_aggregated_payloads`]. Used by the attestation aggregate - /// coverage report to compute coverage from `new_payloads` before promote. - pub fn new_aggregated_payloads( - &self, - ) -> HashMap)> { + /// Used by the attestation aggregate coverage report, which needs only the + /// bitfields. Clones just the `AggregationBits` — not the proofs — so it + /// avoids deep-copying the multi-megabyte `proof_data` blobs that a full + /// payload snapshot would carry. + pub fn new_aggregated_payload_participants(&self) -> Vec<(u64, AggregationBits)> { let buf = self.new_payloads.lock().unwrap(); buf.data - .iter() - .map(|(root, entry)| (*root, (entry.data.clone(), entry.proofs.clone()))) + .values() + .flat_map(|entry| { + let slot = entry.data.slot; + entry + .proofs + .iter() + .map(move |proof| (slot, proof.participants.clone())) + }) .collect() }