Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ thiserror = "2.0"
ulid = { version = "1.0", features = ["serde"] }
uuid = { version = "1", features = ["v4"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
rustc-hash = "2"
futures-core = "0.3.31"
tempfile = "3.20.0"
lazy_static = "1.4.0"
Expand Down
174 changes: 173 additions & 1 deletion src/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use opentelemetry_proto::tonic::metrics::v1::{
};
use serde_json::{Map, Value};

use rustc_hash::FxHasher;
use std::hash::Hasher;
use tracing::info_span;

use crate::metrics::increment_metrics_collected_by_date;
Expand All @@ -31,7 +33,7 @@ use super::otel_utils::{
convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some,
};

pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 36] = [
pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 37] = [
"metric_name",
"metric_description",
"metric_unit",
Expand Down Expand Up @@ -68,8 +70,58 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 36] = [
"scope_dropped_attributes_count",
"resource_dropped_attributes_count",
"resource_schema_url",
// Precomputed per-sample identity of the physical series. Stable
// u64 hash of `metric_name` + sorted attribute key/value pairs,
// stored as a decimal-encoded string so arrow-json infers Utf8 and
// we get byte-exact roundtrip. Int64/Float64 inference dropped bits
// for hashes near the high range; string sidesteps that entirely.
// Lets the query layer group samples into physical series via a
// single column read instead of decoding every label column and
// hashing per row.
"__series_hash",
];

/// Compute a stable u64 identifier for the physical series a sample
/// belongs to. Hashes `metric_name` plus every attribute key/value pair
/// that survived OTel flattening — everything in the flattened data
/// point that isn't a known sample-level field is treated as a label.
///
/// Hash output must be stable across process restarts and matching at
/// query time. Uses rustc-hash's FxHasher (fast, deterministic,
/// non-cryptographic) and feeds keys in sorted order so the hash
/// doesn't depend on JSON Map iteration order.
fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
let known: std::collections::HashSet<&str> =
OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect();
let mut label_pairs: Vec<(&str, String)> = dp
.iter()
.filter(|(k, _)| !known.contains(k.as_str()))
.map(|(k, v)| {
let v_str = match v {
Value::String(s) => s.clone(),
other => other.to_string(),
};
(k.as_str(), v_str)
})
.collect();
label_pairs.sort_by(|a, b| a.0.cmp(b.0));

let mut hasher = FxHasher::default();
// Include metric_name in the identity. Without it, two different
// metrics with identical label sets would collide into one series.
if let Some(Value::String(name)) = dp.get("metric_name") {
hasher.write(name.as_bytes());
hasher.write(b"\0");
}
for (k, v) in &label_pairs {
hasher.write(k.as_bytes());
hasher.write(b"=");
hasher.write(v.as_bytes());
hasher.write(b"\0");
}
hasher.finish()
}

/// otel metrics event has json array for exemplar
/// this function flatten the exemplar json array
/// and returns a `Map` of the exemplar json
Expand Down Expand Up @@ -564,6 +616,19 @@ fn process_resource_metrics<T, S, M>(
for (k, v) in &envelope {
dp.insert(k.clone(), v.clone());
}
// Compute the physical-series hash AFTER envelope merge
// so resource/scope attributes participate in series
// identity (they're labels from the query layer's
// perspective). Computed once per data point — O(label
// count) per sample, ~200 ns at typical attribute counts.
let series_hash = compute_series_hash(&dp);
// Stored as decimal-encoded string. Arrow-json
// infers Utf8, preserving all 64 bits — Int64/Float64
// inference truncated values near the high range.
dp.insert(
"__series_hash".to_string(),
Value::String(series_hash.to_string()),
);
vec_otel_json.push(Value::Object(dp));
}
}
Expand Down Expand Up @@ -655,3 +720,110 @@ fn flatten_data_point_flags(flags: u32) -> Map<String, Value> {
);
data_point_flags_json
}

#[cfg(test)]
mod tests {
use super::*;

fn make_dp() -> Map<String, Value> {
let mut dp = Map::new();
dp.insert(
"metric_name".to_string(),
Value::String("counter.app.metric_0006".into()),
);
dp.insert(
"time_unix_nano".to_string(),
Value::String("2026-05-19T09:00:00Z".into()),
);
dp.insert("data_point_value".to_string(), Value::Number(1000.into()));
dp.insert("is_monotonic".to_string(), Value::Bool(true));
dp.insert("service.name".to_string(), Value::String("api".into()));
dp.insert("http.method".to_string(), Value::String("GET".into()));
dp.insert("request.id".to_string(), Value::String("req-1".into()));
dp
}

#[test]
fn series_hash_stable_across_runs() {
// Same input → same hash. Locks in the wire contract between
// ingest and query layers; any algorithm change here breaks
// grouping for already-ingested data.
let dp = make_dp();
let h1 = compute_series_hash(&dp);
let h2 = compute_series_hash(&dp);
assert_eq!(h1, h2);
}

#[test]
fn series_hash_independent_of_label_insertion_order() {
// serde_json::Map preserves insertion order; query side may see
// labels in different order. Hash must be insertion-order-agnostic.
let mut a = Map::new();
a.insert("metric_name".to_string(), Value::String("m".into()));
a.insert("service.name".to_string(), Value::String("api".into()));
a.insert("http.method".to_string(), Value::String("GET".into()));

let mut b = Map::new();
b.insert("http.method".to_string(), Value::String("GET".into()));
b.insert("metric_name".to_string(), Value::String("m".into()));
b.insert("service.name".to_string(), Value::String("api".into()));

assert_eq!(compute_series_hash(&a), compute_series_hash(&b));
}

#[test]
fn series_hash_changes_with_label_value() {
let dp = make_dp();
let mut dp2 = dp.clone();
dp2.insert("service.name".to_string(), Value::String("billing".into()));
assert_ne!(compute_series_hash(&dp), compute_series_hash(&dp2));
}

#[test]
fn series_hash_changes_with_metric_name() {
// Two different metrics with identical labels must hash to
// different values, otherwise samples for `requests_total` and
// `latency_seconds` would collide into one logical series.
let dp = make_dp();
let mut dp2 = dp.clone();
dp2.insert(
"metric_name".to_string(),
Value::String("other.metric".into()),
);
assert_ne!(compute_series_hash(&dp), compute_series_hash(&dp2));
}

#[test]
fn series_hash_ignores_sample_level_fields() {
// time_unix_nano and data_point_value belong to the SAMPLE, not
// the series. Hash must be identical across samples of the same
// physical series taken at different times with different values.
let dp = make_dp();
let mut dp_later = dp.clone();
dp_later.insert(
"time_unix_nano".to_string(),
Value::String("2026-05-19T10:00:00Z".into()),
);
dp_later.insert("data_point_value".to_string(), Value::Number(2000.into()));
assert_eq!(compute_series_hash(&dp), compute_series_hash(&dp_later));
}

#[test]
fn series_hash_distinguishes_label_kv_swap() {
// Pathological pair: {a=bc, d=e} vs {a=b, cd=e}. A naive
// concatenation hash would emit identical bytes. Delimiters in
// the hasher input prevent this — verify here so a future
// optimisation can't silently regress collision resistance.
let mut a = Map::new();
a.insert("metric_name".to_string(), Value::String("m".into()));
a.insert("a".to_string(), Value::String("bc".into()));
a.insert("d".to_string(), Value::String("e".into()));

let mut b = Map::new();
b.insert("metric_name".to_string(), Value::String("m".into()));
b.insert("a".to_string(), Value::String("b".into()));
b.insert("cd".to_string(), Value::String("e".into()));

assert_ne!(compute_series_hash(&a), compute_series_hash(&b));
}
}
93 changes: 88 additions & 5 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

use arrow_array::RecordBatch;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_ipc::reader::StreamReader;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
Expand Down Expand Up @@ -589,12 +589,26 @@ impl Stream {
Encoding::DELTA_BINARY_PACKED,
);

// Create sorting columns
let mut sorting_column_vec = vec![SortingColumn {
// Build sorting columns. For OTel-metrics streams, put
// `metric_name` ahead of the time partition so per-page parquet
// min/max stats can prune by metric (PromQL's universal selector
// predicate). The actual row order is enforced at write time
// (`sort_batch_for_metric_pruning`) — this just advertises the
// sort in parquet footer metadata so readers can rely on it.
let is_otel_metrics = self.is_otel_metrics();
let mut sorting_column_vec: Vec<SortingColumn> = Vec::new();
if is_otel_metrics && let Ok(name_idx) = merged_schema.index_of("metric_name") {
sorting_column_vec.push(SortingColumn {
column_idx: name_idx as i32,
descending: false,
nulls_first: false,
});
}
sorting_column_vec.push(SortingColumn {
column_idx: time_partition_idx as i32,
descending: true,
nulls_first: true,
}];
});

// Describe custom partition column encodings and sorting
if let Some(custom_partition) = custom_partition {
Expand All @@ -616,6 +630,66 @@ impl Stream {
props.set_sorting_columns(Some(sorting_column_vec)).build()
}

/// True if this stream's log_source carries the OTel-metrics
/// format. Determines whether per-batch sort and metric_name-first
/// SortingColumn metadata get applied at write time.
fn is_otel_metrics(&self) -> bool {
self.get_log_source()
.iter()
.any(|s| matches!(s.log_source_format, LogSource::OtelMetrics))
}

/// Permute a `RecordBatch` so rows are ordered by
/// `(metric_name ASC, time_partition DESC)`. Required for parquet
/// page-index pruning to be effective on PromQL's
/// `metric_name = 'X'` selector — without this, pages within a row
/// group hold interleaved metrics and per-page min/max stats span
/// every metric in the stream, killing pruning.
///
/// Bails out without sorting when either source column is missing
/// (non-metric stream, schema drift) so the caller can write the
/// batch unchanged.
fn sort_batch_for_metric_pruning(
batch: &RecordBatch,
time_partition_field: &str,
) -> Result<RecordBatch, StagingError> {
use arrow::compute::{SortColumn, kernels::sort::SortOptions, lexsort_to_indices, take};
let schema = batch.schema();
let Some(name_idx) = schema.index_of("metric_name").ok() else {
return Ok(batch.clone());
};
let Some(time_idx) = schema.index_of(time_partition_field).ok() else {
return Ok(batch.clone());
};
if batch.num_rows() < 2 {
return Ok(batch.clone());
}

let sort_cols = vec![
SortColumn {
values: batch.column(name_idx).clone(),
options: Some(SortOptions {
descending: false,
nulls_first: false,
}),
},
SortColumn {
values: batch.column(time_idx).clone(),
options: Some(SortOptions {
descending: true,
nulls_first: false,
}),
},
Comment on lines +676 to +682
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

nulls_first mismatch between parquet metadata and actual sort order.

The SortOptions for time_partition uses nulls_first: false (line 680), but parquet_writer_props advertises nulls_first: true in the SortingColumn metadata (line 610). This inconsistency means the parquet footer will incorrectly describe the data's sort order, potentially causing readers that rely on this metadata for optimized merging or scanning to produce incorrect results.

🐛 Proposed fix to align nulls_first
         SortColumn {
             values: batch.column(time_idx).clone(),
             options: Some(SortOptions {
                 descending: true,
-                nulls_first: false,
+                nulls_first: true,
             }),
         },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
SortColumn {
values: batch.column(time_idx).clone(),
options: Some(SortOptions {
descending: true,
nulls_first: false,
}),
},
SortColumn {
values: batch.column(time_idx).clone(),
options: Some(SortOptions {
descending: true,
nulls_first: true,
}),
},
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` around lines 676 - 682, The SortOptions for the
time partition currently sets nulls_first: false while the parquet_writer_props
SortingColumn metadata advertises nulls_first: true, causing a mismatch; update
the SortOptions construction used for time_partition (the SortColumn with
values: batch.column(time_idx)) to set nulls_first: true so it matches the
parquet_writer_props/SortingColumn metadata and keep SortColumn/SortOptions and
parquet_writer_props consistent.

];
let indices = lexsort_to_indices(&sort_cols, None)?;
let columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|c| take(c.as_ref(), &indices, None))
.collect::<Result<_, _>>()?;
Ok(RecordBatch::try_new(schema, columns)?)
}

fn reset_staging_metrics(&self, tenant_id: &Option<String>) {
let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
metrics::STAGING_FILES
Expand Down Expand Up @@ -739,8 +813,17 @@ impl Stream {
.open(part_path)
.map_err(|_| StagingError::Create)?;
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
let sort_for_metric_pruning = self.is_otel_metrics();
let time_partition_field = time_partition
.map(|s| s.as_str().to_string())
.unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_string());
for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
writer.write(record)?;
if sort_for_metric_pruning {
let sorted = Self::sort_batch_for_metric_pruning(record, &time_partition_field)?;
writer.write(&sorted)?;
} else {
writer.write(record)?;
}
}
writer.close()?;

Expand Down
Loading
Loading