diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index f6d3ca3e..0852dd49 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -221,7 +221,12 @@ async fn main() -> eyre::Result<()> { // and the API server (which exposes GET/POST admin endpoints). let aggregator = AggregatorController::new(options.is_aggregator); - let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()); + let blockchain = BlockChain::spawn( + store.clone(), + validator_keys, + aggregator.clone(), + attestation_committee_count, + ); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the // AggregatorController — subnet subscriptions are decided once here and diff --git a/crates/blockchain/src/coverage.rs b/crates/blockchain/src/coverage.rs new file mode 100644 index 00000000..ebb6f5a7 --- /dev/null +++ b/crates/blockchain/src/coverage.rs @@ -0,0 +1,194 @@ +//! Attestation aggregate coverage emission. +//! +//! Pure observability — nothing here feeds back into fork choice or the state +//! transition. The emitters build `Vec` locals (`seen` for validators, +//! `has_subnet` for subnets, with subnet = `vid % committee_count`, matching +//! the gossip subnet assignment in `crates/net/p2p/src/lib.rs`) and push the +//! resulting counts to the coverage gauges registered in +//! [`crate::metrics`]. + +use ethlambda_storage::Store; +use ethlambda_types::attestation::{AggregatedAttestation, AggregationBits, validator_indices}; + +use crate::metrics; + +/// Pre-merge snapshot of `new_payloads` participant bits, used by the +/// attestation aggregate coverage report. +/// +/// Each entry is tagged with its attestation `data.slot` (the voting round) so +/// the consumer can filter to a single round at emit time — `new_payloads` may +/// hold entries spanning more than one slot. Holds raw participant bits; the +/// consumer constructs coverage bitsets at emit time using the current +/// validator and committee counts. +#[derive(Debug, Clone)] +pub(crate) struct CoverageSnapshot { + pub(crate) entries: Vec<(u64, AggregationBits)>, +} + +/// Capture the participant bits of every entry in `new_payloads` for the +/// attestation aggregate coverage report. Each entry is tagged with its +/// attestation `data.slot` so the post-block report can filter to a single +/// voting round (`new_payloads` may span multiple slots). +/// +/// Returns `None` when `new_payloads` is empty so callers can keep their last +/// non-empty snapshot rather than overwriting it with nothing — a node that +/// missed a round still reports the round it last saw. +pub(crate) fn snapshot_new_payloads(store: &Store) -> Option { + let entries = store.new_aggregated_payload_participants(); + if entries.is_empty() { + return None; + } + Some(CoverageSnapshot { entries }) +} + +fn cov_add(seen: &mut [bool], has_subnet: &mut [bool], bits: &AggregationBits) { + let cc = has_subnet.len(); + if cc == 0 { + return; + } + for vid in validator_indices(bits) { + let vid = vid as usize; + if vid < seen.len() { + seen[vid] = true; + has_subnet[vid % cc] = true; + } + } +} + +fn cov_record(section: &str, seen: &[bool], has_subnet: &[bool]) { + metrics::set_attestation_aggregate_coverage_validators( + section, + "combined", + seen.iter().filter(|&&b| b).count() as i64, + ); + metrics::set_attestation_aggregate_coverage_subnets( + section, + has_subnet.iter().filter(|&&b| b).count() as i64, + ); +} + +fn or_into(dst: &mut [bool], src: &[bool]) { + for (d, &s) in dst.iter_mut().zip(src) { + *d |= s; + } +} + +/// Post-block coverage report for `reporting_slot`. Emits `timely` / `late` / +/// `block` / `combined` sections plus the `diff_validators` symmetric +/// difference between `block` and `timely`. Called at interval 1 of the +/// next slot. +pub(crate) fn emit_post_block_coverage( + store: &Store, + pre_merge_coverage: Option<&CoverageSnapshot>, + committee_count: u64, + reporting_slot: u64, +) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let (mut timely_v, mut timely_s) = (vec![false; validator_count], vec![false; cc]); + let (mut late_v, mut late_s) = (vec![false; validator_count], vec![false; cc]); + let (mut block_v, mut block_s) = (vec![false; validator_count], vec![false; cc]); + + // Every section is the same cohort: validators whose attestations *for* + // `reporting_slot` (`data.slot == reporting_slot`) were seen via that + // channel. + + // `timely`: pre-merge snapshot of `new_payloads`, filtered to this round. + if let Some(snap) = pre_merge_coverage { + for (data_slot, bits) in &snap.entries { + if *data_slot == reporting_slot { + cov_add(&mut timely_v, &mut timely_s, bits); + } + } + } + // `late`: current `new_payloads` for this round (arrived after the promote). + for (data_slot, bits) in store.new_aggregated_payload_participants() { + if data_slot == reporting_slot { + cov_add(&mut late_v, &mut late_s, &bits); + } + } + // `block`: attestations included in the canonical head block. At interval 1 + // the head is normally the block proposed at `reporting_slot + 1`, which + // carries this round's votes; filter by `data.slot` so we count the same + // cohort even if the head is at a different slot. + if let Some(block) = store.get_block(&store.head()) { + for att in block.body.attestations.iter() { + if att.data.slot == reporting_slot { + cov_add(&mut block_v, &mut block_s, &att.aggregation_bits); + } + } + } + + let mut combined_v = timely_v.clone(); + let mut combined_s = timely_s.clone(); + or_into(&mut combined_v, &late_v); + or_into(&mut combined_s, &late_s); + or_into(&mut combined_v, &block_v); + or_into(&mut combined_s, &block_s); + + // Only report a round once the canonical head block actually carries its + // votes (`block_v` non-empty). Gating on `combined` instead would still + // fire on a missed slot — the `timely` snapshot for the round is populated + // while `block_v` is all-false — pushing exactly the misleading + // `block_only=0, timely_only=N` the diff is meant to avoid. When there is + // no block for the round the gauges retain their previous value. + if !block_v.iter().any(|&b| b) { + return; + } + + cov_record("timely", &timely_v, &timely_s); + cov_record("late", &late_v, &late_s); + cov_record("block", &block_v, &block_s); + cov_record("combined", &combined_v, &combined_s); + + let (block_only, timely_only) = + block_v + .iter() + .zip(timely_v.iter()) + .fold((0i64, 0i64), |(b, t), (bv, tv)| match (bv, tv) { + (true, false) => (b + 1, t), + (false, true) => (b, t + 1), + _ => (b, t), + }); + metrics::set_attestation_aggregate_coverage_diff_validators("block_only", block_only); + metrics::set_attestation_aggregate_coverage_diff_validators("timely_only", timely_only); +} + +/// `agg_start_new` coverage from `new_payloads`, called right before fork- +/// choice aggregation runs at interval 2. +pub(crate) fn emit_agg_start_new_coverage(store: &Store, committee_count: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let mut seen = vec![false; validator_count]; + let mut has_subnet = vec![false; cc]; + for (_slot, bits) in store.new_aggregated_payload_participants() { + cov_add(&mut seen, &mut has_subnet, &bits); + } + cov_record("agg_start_new", &seen, &has_subnet); +} + +/// `proposal_combined` coverage for a block we are about to publish: the full +/// set of validators included across the block's aggregated attestations. +pub(crate) fn emit_proposal_coverage<'a>( + store: &Store, + committee_count: u64, + selected: impl IntoIterator, +) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + let mut combined_v = vec![false; validator_count]; + let mut combined_s = vec![false; cc]; + for att in selected { + cov_add(&mut combined_v, &mut combined_s, &att.aggregation_bits); + } + cov_record("proposal_combined", &combined_v, &combined_s); +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index e053517b..1a6fb766 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -28,6 +28,7 @@ use crate::store::StoreError; pub mod aggregation; pub mod block_builder; +pub(crate) mod coverage; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -70,6 +71,7 @@ impl BlockChain { store: Store, validator_keys: HashMap, aggregator: AggregatorController, + attestation_committee_count: u64, ) -> BlockChain { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); @@ -96,6 +98,8 @@ impl BlockChain { pending_block_parents: HashMap::new(), current_aggregation: None, last_tick_instant: None, + attestation_committee_count, + pre_merge_coverage: None, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -149,6 +153,17 @@ pub struct BlockChainServer { /// Last tick instant for measuring interval duration. last_tick_instant: Option, + + /// Number of attestation committees (= subnet count). Used by the + /// attestation aggregate coverage emission. + attestation_committee_count: u64, + + /// Pre-merge `new_payloads` snapshot for the attestation aggregate coverage + /// report. Captured at the end-of-slot promote (interval 4), read at the + /// next slot boundary. Owned solely by the actor and only touched from the + /// single-threaded message loop, so no synchronization is needed. + /// Observability-only. + pre_merge_coverage: Option, } impl BlockChainServer { @@ -190,6 +205,23 @@ impl BlockChainServer { .then(|| self.get_our_proposer(slot)) .flatten(); + // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote + // (interval 4), so the post-block report for this round sees its + // "timely" cohort just before it is promoted out of `new_payloads`. + // + // Only interval 4 — not the proposer's interval-0 promote. By interval 0 + // the round's votes have already been promoted at the previous slot's + // interval 4; `new_payloads` then holds only stragglers, and snapshotting + // them here would overwrite the good interval-4 snapshot the report still + // needs (those stragglers surface in the `late` section instead). Skip + // empty snapshots so a missed round keeps the last set we saw. Pure + // observability. + if interval == 4 + && let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) + { + self.pre_merge_coverage = Some(snapshot); + } + // Tick the store first - this accepts attestations at interval 0 if we have a proposal store::on_tick( &mut self.store, @@ -198,6 +230,7 @@ impl BlockChainServer { ); if interval == 2 && is_aggregator { + coverage::emit_agg_start_new_coverage(&self.store, self.attestation_committee_count); self.start_aggregation_session(slot, ctx).await; } @@ -210,6 +243,18 @@ impl BlockChainServer { // Reuse the same snapshot so self-delivery decisions match the rest // of the tick. if interval == 1 { + // Emit the post-block coverage report for the previous slot. Fired + // at interval 1 (not 0) so the block carrying `slot - 1`'s votes — + // proposed at interval 0 of this slot — has typically been received + // and processed, letting the `block` section see the same round. + if slot > 0 { + coverage::emit_post_block_coverage( + &self.store, + self.pre_merge_coverage.as_ref(), + self.attestation_committee_count, + slot - 1, + ); + } self.produce_attestations(slot, is_aggregator); } @@ -351,6 +396,12 @@ impl BlockChainServer { return; }; + coverage::emit_proposal_coverage( + &self.store, + self.attestation_committee_count, + block.body.attestations.iter(), + ); + // Sign the block root with the proposal key let block_root = block.hash_tree_root(); let Ok(proposer_signature) = self diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index a9aef7a0..8e0c6d5e 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -4,6 +4,25 @@ use std::time::Duration; use ethlambda_metrics::*; +// --- Label sets --- + +/// Section labels for attestation aggregate coverage gauges. Order matches +/// the names printed in slot/report logs. +/// +/// Slot is the X-axis (time series), not a label dimension. +pub const ATTESTATION_AGGREGATE_COVERAGE_SECTIONS: &[&str] = &[ + "timely", + "late", + "block", + "combined", + "agg_start_new", + "proposal_combined", +]; + +/// Validator-coverage delta directions between block payloads and +/// locally-aggregated pre-merge (`timely`) payloads. +pub const ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS: &[&str] = &["block_only", "timely_only"]; + // --- Gauges --- static LEAN_HEAD_SLOT: std::sync::LazyLock = std::sync::LazyLock::new(|| { @@ -104,6 +123,43 @@ static LEAN_TABLE_BYTES: std::sync::LazyLock = std::sync::LazyLock: .unwrap() }); +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_validators", + "Validator coverage in attestation aggregate reports, labeled by section and \ + subnet. Only subnet=combined (the section total) is currently emitted; the \ + subnet=subnet_N per-subnet breakdown is reserved and not yet populated. \ + Updated each slot (slot is the X-axis).", + &["section", "subnet"] + ) + .unwrap() + }); + +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_subnets", + "Number of covered subnets in attestation aggregate reports, labeled by section. \ + Updated each slot (slot is the X-axis).", + &["section"] + ) + .unwrap() + }); + +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_diff_validators", + "Count of validators in the symmetric difference between block-included aggregates \ + and locally-aggregated pre-merge (timely) aggregates for the same slot. \ + direction=block_only: in block but not in local pool. direction=timely_only: in \ + local pool but not in block. Updated each slot (slot is the X-axis).", + &["direction"] + ) + .unwrap() + }); + // --- Counters --- static LEAN_ATTESTATIONS_VALID_TOTAL: std::sync::LazyLock = @@ -409,6 +465,25 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_IS_AGGREGATOR); std::sync::LazyLock::force(&LEAN_ATTESTATION_COMMITTEE_COUNT); std::sync::LazyLock::force(&LEAN_TABLE_BYTES); + // Attestation aggregate coverage (leanMetrics: Fork-Choice Metrics). + // Per upstream leanSpec, seed only the combined-subnet series for each + // section; per-subnet series appear lazily when instrumentation writes them. + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS); + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS); + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS); + for §ion in ATTESTATION_AGGREGATE_COVERAGE_SECTIONS { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS + .with_label_values(&[section, "combined"]) + .set(0); + LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS + .with_label_values(&[section]) + .set(0); + } + for &direction in ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS + .with_label_values(&[direction]) + .set(0); + } // Counters std::sync::LazyLock::force(&LEAN_ATTESTATIONS_VALID_TOTAL); std::sync::LazyLock::force(&LEAN_ATTESTATIONS_INVALID_TOTAL); @@ -609,6 +684,27 @@ pub fn set_attestation_committee_count(count: u64) { LEAN_ATTESTATION_COMMITTEE_COUNT.set(count.try_into().unwrap_or_default()); } +/// Set `lean_attestation_aggregate_coverage_validators{section, subnet}`. +pub fn set_attestation_aggregate_coverage_validators(section: &str, subnet: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS + .with_label_values(&[section, subnet]) + .set(value); +} + +/// Set `lean_attestation_aggregate_coverage_subnets{section}`. +pub fn set_attestation_aggregate_coverage_subnets(section: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS + .with_label_values(&[section]) + .set(value); +} + +/// Set `lean_attestation_aggregate_coverage_diff_validators{direction}`. +pub fn set_attestation_aggregate_coverage_diff_validators(direction: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS + .with_label_values(&[direction]) + .set(value); +} + /// Observe the depth of a fork choice reorg. pub fn observe_fork_choice_reorg_depth(depth: u64) { LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64); diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 04b42745..8ea1f850 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -4,7 +4,10 @@ use std::sync::{Arc, LazyLock, Mutex}; use crate::api::{StorageBackend, StorageWriteBatch, Table}; use ethlambda_types::{ - attestation::{AttestationData, HashedAttestationData, bits_is_subset, blank_xmss_signature}, + attestation::{ + AggregationBits, AttestationData, HashedAttestationData, bits_is_subset, + blank_xmss_signature, + }, block::{ AggregatedSignatureProof, AttestationSignatures, Block, BlockBody, BlockHeader, BlockSignatures, SignedBlock, @@ -1251,6 +1254,28 @@ impl Store { self.known_payloads.lock().unwrap().len() } + /// Returns the participant bitfields of every pending (new) aggregated + /// payload, one entry per proof, each tagged with its attestation + /// `data.slot`. + /// + /// Used by the attestation aggregate coverage report, which needs only the + /// bitfields. Clones just the `AggregationBits` — not the proofs — so it + /// avoids deep-copying the multi-megabyte `proof_data` blobs that a full + /// payload snapshot would carry. + pub fn new_aggregated_payload_participants(&self) -> Vec<(u64, AggregationBits)> { + let buf = self.new_payloads.lock().unwrap(); + buf.data + .values() + .flat_map(|entry| { + let slot = entry.data.slot; + entry + .proofs + .iter() + .map(move |proof| (slot, proof.participants.clone())) + }) + .collect() + } + /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap();