Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 112 additions & 4 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ use graph::{
blockchain::{BlockPtr, IngestorError, block_stream::BlockWithTriggers},
prelude::{
BlockNumber, ChainStore, CheapClone, DynTryFuture, Error, EthereumCallCache, Logger,
TimeoutError,
TimeoutError, serde_json,
anyhow::{self, Context, anyhow, bail, ensure},
debug, error, hex, info, retry, trace, warn,
},
};
use itertools::Itertools;
use serde_json::Value;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::convert::TryFrom;
use std::iter::FromIterator;
Expand Down Expand Up @@ -91,6 +92,9 @@ type AlloyProvider = FillProvider<
AnyNetworkBare,
>;

const MISSING_TRACE_OUTPUT_ERROR: &str =
"data did not match any variant of untagged enum TraceOutput";

#[derive(Clone)]
pub struct EthereumAdapter {
logger: Logger,
Expand Down Expand Up @@ -256,7 +260,20 @@ impl EthereumAdapter {
let alloy_trace_filter = Self::build_trace_filter(from, to, &addresses);
let start = Instant::now();

let result = self.alloy.trace_filter(&alloy_trace_filter).await;
let result = match self.alloy.trace_filter(&alloy_trace_filter).await {
Ok(traces) => Ok(traces),
Err(error) if Self::is_missing_trace_output_error(&error) => {
warn!(
&logger,
"trace_filter returned traces with missing result.output; retrying with compatibility parser";
"from" => from,
"to" => to,
);
self.trace_filter_with_compat_patch(&alloy_trace_filter)
.await
}
Err(error) => Err(Error::from(error)),
};

if let Ok(traces) = &result {
self.log_trace_results(&logger, from, to, traces.len());
Expand All @@ -271,7 +288,7 @@ impl EthereumAdapter {
&logger,
);

result.map_err(Error::from)
result
}

fn build_trace_filter(
Expand Down Expand Up @@ -313,7 +330,7 @@ impl EthereumAdapter {
&self,
subgraph_metrics: &Arc<SubgraphEthRpcMetrics>,
elapsed: f64,
result: &Result<Vec<LocalizedTransactionTrace>, RpcError<TransportErrorKind>>,
result: &Result<Vec<LocalizedTransactionTrace>, Error>,
from: BlockNumber,
to: BlockNumber,
logger: &ProviderLogger,
Expand All @@ -332,6 +349,46 @@ impl EthereumAdapter {
}
}

fn is_missing_trace_output_error(error: &RpcError<TransportErrorKind>) -> bool {
error.to_string().contains(MISSING_TRACE_OUTPUT_ERROR)
}

async fn trace_filter_with_compat_patch(
&self,
alloy_trace_filter: &AlloyTraceFilter,
) -> Result<Vec<LocalizedTransactionTrace>, Error> {
let mut batch = alloy::rpc::client::BatchRequest::new(self.alloy.client());
let trace_future = batch
.add_call::<(AlloyTraceFilter,), Value>("trace_filter", &(alloy_trace_filter.clone(),))
.map_err(Error::from)?;

batch.send().await.map_err(Error::from)?;

let mut raw_traces = trace_future.await.map_err(Error::from)?;
Self::patch_missing_trace_output(&mut raw_traces);

serde_json::from_value(raw_traces).map_err(Error::from)
}

fn patch_missing_trace_output(raw_traces: &mut Value) {
let Some(traces) = raw_traces.as_array_mut() else {
return;
};

for trace in traces {
let Some(result) = trace.get_mut("result") else {
continue;
};
let Some(result_obj) = result.as_object_mut() else {
continue;
};

if result_obj.contains_key("gasUsed") && !result_obj.contains_key("output") {
result_obj.insert("output".to_owned(), Value::String("0x".to_owned()));
}
}
}

// This is a lazy check for block receipt support. It is only called once and then the result is
// cached. The result is not used for anything critical, so it is fine to be lazy.
async fn check_block_receipt_support_and_update_cache(
Expand Down Expand Up @@ -2710,6 +2767,7 @@ mod tests {
use graph::blockchain::BlockPtr;
use graph::components::ethereum::AnyNetworkBare;
use graph::prelude::alloy::primitives::{Address, B256, Bytes};
use graph::prelude::alloy::providers::ext::TraceApi;
use graph::prelude::alloy::providers::ProviderBuilder;
use graph::prelude::alloy::providers::mock::Asserter;
use graph::prelude::{EthereumCall, LightEthereumBlock, create_minimal_block_for_test};
Expand Down Expand Up @@ -2861,6 +2919,56 @@ mod tests {
.unwrap();
}

#[graph::test]
async fn missing_output_trace_repro() {
let trace_filter_response = r#"[{
"action": {
"from": "0xf7cf0d9398d06d5cb7e4d37dc1e18a829bfff934",
"value": "0x0",
"gas": "0x0",
"init": "0x",
"address": "0xf7cf0d9398d06d5cb7e4d37dc1e18a829bfff934",
"refund_address": "0x4c3ccc98c01103be72bcfd29e1d2454c98d1a6e3",
"balance": "0x0"
},
"blockHash": "0x6b747793a61c3ce4e5f3355cf80edcb6aa465913ed43f4b0136d93803cf330f3",
"blockNumber": 66762070,
"result": {
"gasUsed": "0x0"
},
"subtraces": 0,
"traceAddress": [1, 1],
"transactionHash": "0x5b3dc50c4c7bd9b0e80469b21febbc5d1b54b364a01b22b1e9c426e4632e0b8f",
"transactionPosition": 0,
"type": "suicide"
}]"#;

let json_value: Value = serde_json::from_str(trace_filter_response).unwrap();
let asserter = Asserter::new();
let provider = ProviderBuilder::<_, _, AnyNetworkBare>::default()
.network::<AnyNetworkBare>()
.connect_mocked_client(asserter.clone());

asserter.push_success(&json_value);

let err = provider
.trace_filter(
&graph::prelude::alloy::rpc::types::trace::filter::TraceFilter::default(),
)
.await
.expect_err("trace_filter should fail to deserialize when result.output is missing");

assert!(
err.to_string()
.contains(super::MISSING_TRACE_OUTPUT_ERROR),
"unexpected error: {err:#}"
);

let mut patched: Value = serde_json::from_str(trace_filter_response).unwrap();
super::EthereumAdapter::patch_missing_trace_output(&mut patched);
assert_eq!(patched[0]["result"]["output"], Value::String("0x".to_string()));
}

#[test]
fn parse_block_triggers_specific_call_not_found() {
let block = create_minimal_block_for_test(2, hash(2));
Expand Down