diff --git a/Cargo.lock b/Cargo.lock index 0f1361659..37e4b67f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3945,6 +3945,7 @@ dependencies = [ "relative-path", "reqwest", "rstest", + "rustc-hash", "rustls", "rustls-pemfile", "sasl2-sys", diff --git a/Cargo.toml b/Cargo.toml index f8c016042..58613f4cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -181,6 +181,7 @@ thiserror = "2.0" ulid = { version = "1.0", features = ["serde"] } uuid = { version = "1", features = ["v4"] } xxhash-rust = { version = "0.8", features = ["xxh3"] } +rustc-hash = "2" futures-core = "0.3.31" tempfile = "3.20.0" lazy_static = "1.4.0" diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 4e5f1d054..c6f017c28 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -23,6 +23,8 @@ use opentelemetry_proto::tonic::metrics::v1::{ }; use serde_json::{Map, Value}; +use rustc_hash::FxHasher; +use std::hash::Hasher; use tracing::info_span; use crate::metrics::increment_metrics_collected_by_date; @@ -31,7 +33,7 @@ use super::otel_utils::{ convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, }; -pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 36] = [ +pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 37] = [ "metric_name", "metric_description", "metric_unit", @@ -68,8 +70,58 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 36] = [ "scope_dropped_attributes_count", "resource_dropped_attributes_count", "resource_schema_url", + // Precomputed per-sample identity of the physical series. Stable + // u64 hash of `metric_name` + sorted attribute key/value pairs, + // stored as a decimal-encoded string so arrow-json infers Utf8 and + // we get byte-exact roundtrip. Int64/Float64 inference dropped bits + // for hashes near the high range; string sidesteps that entirely. + // Lets the query layer group samples into physical series via a + // single column read instead of decoding every label column and + // hashing per row. + "__series_hash", ]; +/// Compute a stable u64 identifier for the physical series a sample +/// belongs to. Hashes `metric_name` plus every attribute key/value pair +/// that survived OTel flattening — everything in the flattened data +/// point that isn't a known sample-level field is treated as a label. +/// +/// Hash output must be stable across process restarts and matching at +/// query time. Uses rustc-hash's FxHasher (fast, deterministic, +/// non-cryptographic) and feeds keys in sorted order so the hash +/// doesn't depend on JSON Map iteration order. +fn compute_series_hash(dp: &Map) -> u64 { + let known: std::collections::HashSet<&str> = + OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect(); + let mut label_pairs: Vec<(&str, String)> = dp + .iter() + .filter(|(k, _)| !known.contains(k.as_str())) + .map(|(k, v)| { + let v_str = match v { + Value::String(s) => s.clone(), + other => other.to_string(), + }; + (k.as_str(), v_str) + }) + .collect(); + label_pairs.sort_by(|a, b| a.0.cmp(b.0)); + + let mut hasher = FxHasher::default(); + // Include metric_name in the identity. Without it, two different + // metrics with identical label sets would collide into one series. + if let Some(Value::String(name)) = dp.get("metric_name") { + hasher.write(name.as_bytes()); + hasher.write(b"\0"); + } + for (k, v) in &label_pairs { + hasher.write(k.as_bytes()); + hasher.write(b"="); + hasher.write(v.as_bytes()); + hasher.write(b"\0"); + } + hasher.finish() +} + /// otel metrics event has json array for exemplar /// this function flatten the exemplar json array /// and returns a `Map` of the exemplar json @@ -564,6 +616,19 @@ fn process_resource_metrics( for (k, v) in &envelope { dp.insert(k.clone(), v.clone()); } + // Compute the physical-series hash AFTER envelope merge + // so resource/scope attributes participate in series + // identity (they're labels from the query layer's + // perspective). Computed once per data point — O(label + // count) per sample, ~200 ns at typical attribute counts. + let series_hash = compute_series_hash(&dp); + // Stored as decimal-encoded string. Arrow-json + // infers Utf8, preserving all 64 bits — Int64/Float64 + // inference truncated values near the high range. + dp.insert( + "__series_hash".to_string(), + Value::String(series_hash.to_string()), + ); vec_otel_json.push(Value::Object(dp)); } } @@ -655,3 +720,110 @@ fn flatten_data_point_flags(flags: u32) -> Map { ); data_point_flags_json } + +#[cfg(test)] +mod tests { + use super::*; + + fn make_dp() -> Map { + let mut dp = Map::new(); + dp.insert( + "metric_name".to_string(), + Value::String("counter.app.metric_0006".into()), + ); + dp.insert( + "time_unix_nano".to_string(), + Value::String("2026-05-19T09:00:00Z".into()), + ); + dp.insert("data_point_value".to_string(), Value::Number(1000.into())); + dp.insert("is_monotonic".to_string(), Value::Bool(true)); + dp.insert("service.name".to_string(), Value::String("api".into())); + dp.insert("http.method".to_string(), Value::String("GET".into())); + dp.insert("request.id".to_string(), Value::String("req-1".into())); + dp + } + + #[test] + fn series_hash_stable_across_runs() { + // Same input → same hash. Locks in the wire contract between + // ingest and query layers; any algorithm change here breaks + // grouping for already-ingested data. + let dp = make_dp(); + let h1 = compute_series_hash(&dp); + let h2 = compute_series_hash(&dp); + assert_eq!(h1, h2); + } + + #[test] + fn series_hash_independent_of_label_insertion_order() { + // serde_json::Map preserves insertion order; query side may see + // labels in different order. Hash must be insertion-order-agnostic. + let mut a = Map::new(); + a.insert("metric_name".to_string(), Value::String("m".into())); + a.insert("service.name".to_string(), Value::String("api".into())); + a.insert("http.method".to_string(), Value::String("GET".into())); + + let mut b = Map::new(); + b.insert("http.method".to_string(), Value::String("GET".into())); + b.insert("metric_name".to_string(), Value::String("m".into())); + b.insert("service.name".to_string(), Value::String("api".into())); + + assert_eq!(compute_series_hash(&a), compute_series_hash(&b)); + } + + #[test] + fn series_hash_changes_with_label_value() { + let dp = make_dp(); + let mut dp2 = dp.clone(); + dp2.insert("service.name".to_string(), Value::String("billing".into())); + assert_ne!(compute_series_hash(&dp), compute_series_hash(&dp2)); + } + + #[test] + fn series_hash_changes_with_metric_name() { + // Two different metrics with identical labels must hash to + // different values, otherwise samples for `requests_total` and + // `latency_seconds` would collide into one logical series. + let dp = make_dp(); + let mut dp2 = dp.clone(); + dp2.insert( + "metric_name".to_string(), + Value::String("other.metric".into()), + ); + assert_ne!(compute_series_hash(&dp), compute_series_hash(&dp2)); + } + + #[test] + fn series_hash_ignores_sample_level_fields() { + // time_unix_nano and data_point_value belong to the SAMPLE, not + // the series. Hash must be identical across samples of the same + // physical series taken at different times with different values. + let dp = make_dp(); + let mut dp_later = dp.clone(); + dp_later.insert( + "time_unix_nano".to_string(), + Value::String("2026-05-19T10:00:00Z".into()), + ); + dp_later.insert("data_point_value".to_string(), Value::Number(2000.into())); + assert_eq!(compute_series_hash(&dp), compute_series_hash(&dp_later)); + } + + #[test] + fn series_hash_distinguishes_label_kv_swap() { + // Pathological pair: {a=bc, d=e} vs {a=b, cd=e}. A naive + // concatenation hash would emit identical bytes. Delimiters in + // the hasher input prevent this — verify here so a future + // optimisation can't silently regress collision resistance. + let mut a = Map::new(); + a.insert("metric_name".to_string(), Value::String("m".into())); + a.insert("a".to_string(), Value::String("bc".into())); + a.insert("d".to_string(), Value::String("e".into())); + + let mut b = Map::new(); + b.insert("metric_name".to_string(), Value::String("m".into())); + b.insert("a".to_string(), Value::String("b".into())); + b.insert("cd".to_string(), Value::String("e".into())); + + assert_ne!(compute_series_hash(&a), compute_series_hash(&b)); + } +} diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index b7aec7ef2..6c75dec75 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -17,7 +17,7 @@ * */ -use arrow_array::RecordBatch; +use arrow_array::{ArrayRef, RecordBatch}; use arrow_ipc::reader::StreamReader; use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; @@ -589,12 +589,26 @@ impl Stream { Encoding::DELTA_BINARY_PACKED, ); - // Create sorting columns - let mut sorting_column_vec = vec![SortingColumn { + // Build sorting columns. For OTel-metrics streams, put + // `metric_name` ahead of the time partition so per-page parquet + // min/max stats can prune by metric (PromQL's universal selector + // predicate). The actual row order is enforced at write time + // (`sort_batch_for_metric_pruning`) — this just advertises the + // sort in parquet footer metadata so readers can rely on it. + let is_otel_metrics = self.is_otel_metrics(); + let mut sorting_column_vec: Vec = Vec::new(); + if is_otel_metrics && let Ok(name_idx) = merged_schema.index_of("metric_name") { + sorting_column_vec.push(SortingColumn { + column_idx: name_idx as i32, + descending: false, + nulls_first: false, + }); + } + sorting_column_vec.push(SortingColumn { column_idx: time_partition_idx as i32, descending: true, - nulls_first: true, - }]; + nulls_first: false, + }); // Describe custom partition column encodings and sorting if let Some(custom_partition) = custom_partition { @@ -616,6 +630,66 @@ impl Stream { props.set_sorting_columns(Some(sorting_column_vec)).build() } + /// True if this stream's log_source carries the OTel-metrics + /// format. Determines whether per-batch sort and metric_name-first + /// SortingColumn metadata get applied at write time. + fn is_otel_metrics(&self) -> bool { + self.get_log_source() + .iter() + .any(|s| matches!(s.log_source_format, LogSource::OtelMetrics)) + } + + /// Permute a `RecordBatch` so rows are ordered by + /// `(metric_name ASC, time_partition DESC)`. Required for parquet + /// page-index pruning to be effective on PromQL's + /// `metric_name = 'X'` selector — without this, pages within a row + /// group hold interleaved metrics and per-page min/max stats span + /// every metric in the stream, killing pruning. + /// + /// Bails out without sorting when either source column is missing + /// (non-metric stream, schema drift) so the caller can write the + /// batch unchanged. + fn sort_batch_for_metric_pruning( + batch: &RecordBatch, + time_partition_field: &str, + ) -> Result { + use arrow::compute::{SortColumn, kernels::sort::SortOptions, lexsort_to_indices, take}; + let schema = batch.schema(); + let Some(name_idx) = schema.index_of("metric_name").ok() else { + return Ok(batch.clone()); + }; + let Some(time_idx) = schema.index_of(time_partition_field).ok() else { + return Ok(batch.clone()); + }; + if batch.num_rows() < 2 { + return Ok(batch.clone()); + } + + let sort_cols = vec![ + SortColumn { + values: batch.column(name_idx).clone(), + options: Some(SortOptions { + descending: false, + nulls_first: false, + }), + }, + SortColumn { + values: batch.column(time_idx).clone(), + options: Some(SortOptions { + descending: true, + nulls_first: false, + }), + }, + ]; + let indices = lexsort_to_indices(&sort_cols, None)?; + let columns: Vec = batch + .columns() + .iter() + .map(|c| take(c.as_ref(), &indices, None)) + .collect::>()?; + Ok(RecordBatch::try_new(schema, columns)?) + } + fn reset_staging_metrics(&self, tenant_id: &Option) { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); metrics::STAGING_FILES @@ -739,8 +813,43 @@ impl Stream { .open(part_path) .map_err(|_| StagingError::Create)?; let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?; - for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { - writer.write(record)?; + let sort_for_metric_pruning = self.is_otel_metrics(); + let time_partition_field = time_partition.map_or_else( + || DEFAULT_TIMESTAMP_KEY.to_string(), + |s| s.as_str().to_string(), + ); + + if sort_for_metric_pruning { + // Buffer batches up to the row-group target, then + // concat + sort + write as a single contiguous batch. The + // ArrowWriter splits the sorted batch into row groups at the + // same boundary, so the row order survives intact and + // per-page (metric_name min, max) stats narrow to the slice + // each page actually carries. + let target = self.options.row_group_size; + let mut buffer: Vec = Vec::new(); + let mut buffered_rows: usize = 0; + for record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { + buffered_rows += record.num_rows(); + buffer.push(record); + if buffered_rows >= target { + let combined = arrow::compute::concat_batches(schema, &buffer)?; + let sorted = + Self::sort_batch_for_metric_pruning(&combined, &time_partition_field)?; + writer.write(&sorted)?; + buffer.clear(); + buffered_rows = 0; + } + } + if !buffer.is_empty() { + let combined = arrow::compute::concat_batches(schema, &buffer)?; + let sorted = Self::sort_batch_for_metric_pruning(&combined, &time_partition_field)?; + writer.write(&sorted)?; + } + } else { + for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { + writer.write(record)?; + } } writer.close()?; diff --git a/src/query/mod.rs b/src/query/mod.rs index e1df94c1a..2cc49b068 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -43,7 +43,6 @@ use datafusion::sql::parser::DFParser; use datafusion::sql::resolve::resolve_table_references; use datafusion::sql::sqlparser::dialect::PostgreSqlDialect; use futures::Stream; -use futures::stream::select_all; use itertools::Itertools; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; @@ -55,6 +54,8 @@ use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use sysinfo::System; use tokio::runtime::Runtime; +use tokio_stream::wrappers::ReceiverStream; +use tracing::Instrument; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; @@ -74,21 +75,7 @@ use crate::storage::{ObjectStorageProvider, ObjectStoreFormat}; use crate::utils::time::TimeRange; /// Boxed record-batch stream used as the streaming half of query results. -type BoxedBatchStream = Pin< - Box< - RecordBatchStreamAdapter< - select_all::SelectAll< - Pin< - Box< - dyn RecordBatchStream< - Item = Result, - > + Send, - >, - >, - >, - >, - >, ->; +type BoxedBatchStream = SendableRecordBatchStream; /// Result type returned by query execution: either collected batches or a streaming adapter, plus field names. type QueryResult = Result<(Either, BoxedBatchStream>, Vec), ExecuteError>; @@ -320,19 +307,36 @@ impl Query { active_streams: AtomicUsize::new(output_partitions), }); - let streams = execute_stream_partitioned(plan.clone(), task_ctx.clone())? - .into_iter() - .map(|s| { - let wrapped = - PartitionedMetricMonitor::new(s, monitor_state.clone(), tenant_id.clone()); - Box::pin(wrapped) as SendableRecordBatchStream - }) - .collect_vec(); - - let merged_stream = futures::stream::select_all(streams); + let partition_streams = execute_stream_partitioned(plan.clone(), task_ctx.clone())?; + let n = partition_streams.len(); + // Bound channel so a slow consumer backpressures producers — caps peak memory. + let (tx, rx) = tokio::sync::mpsc::channel::< + Result, + >((num_cpus::get() * 4).max(n * 2).max(1)); + + for s in partition_streams { + let wrapped = + PartitionedMetricMonitor::new(s, monitor_state.clone(), tenant_id.clone()); + let tx = tx.clone(); + let span = tracing::Span::current(); + tokio::spawn( + async move { + let mut stream: SendableRecordBatchStream = Box::pin(wrapped); + use futures::StreamExt; + while let Some(batch) = stream.next().await { + if tx.send(batch).await.is_err() { + break; + } + } + } + .instrument(span), + ); + } + drop(tx); - let final_stream = RecordBatchStreamAdapter::new(plan.schema(), merged_stream); - Either::Right(Box::pin(final_stream)) + let merged = ReceiverStream::new(rx); + let final_stream = RecordBatchStreamAdapter::new(plan.schema(), merged); + Either::Right(Box::pin(final_stream) as SendableRecordBatchStream) }; Ok((results, fields))