diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index a5855af17a536..d1837f10cb145 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -86,3 +86,7 @@ harness = false [[bench]] name = "parquet_struct_filter_pushdown" harness = false + +[[bench]] +name = "selectivity_tracker" +harness = false diff --git a/datafusion/datasource-parquet/benches/selectivity_tracker.rs b/datafusion/datasource-parquet/benches/selectivity_tracker.rs new file mode 100644 index 0000000000000..45fed92d1db91 --- /dev/null +++ b/datafusion/datasource-parquet/benches/selectivity_tracker.rs @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Microbenchmarks for [`SelectivityTracker`] hot paths. +//! +//! These benches isolate the tracker from decoder/IO so we can iterate on +//! its data structures independently. The scenarios model the load a +//! ClickBench-style partitioned query puts on the tracker: +//! +//! - a file is opened and each of its row-group morsels asks the tracker +//! where to place each user filter (`partition_filters`); +//! - inside each morsel the decoder hands us one `RecordBatch` at a time +//! and each batch feeds selectivity stats to the tracker (`update`). +//! +//! With the default ClickBench-partitioned workload (100 files × ~2–3 +//! row-group morsels × ~125 batches-per-morsel × ~1–3 filters-per-query), +//! the `update` path fires tens of thousands of times per query and +//! `partition_filters` fires hundreds — both on the scan critical path. +//! +//! Each bench reports the cost of a single representative operation so +//! the per-query overhead follows by simple multiplication. + +use std::sync::Arc; + +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_datasource_parquet::selectivity::{ + FilterId, SelectivityTracker, TrackerConfig, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::expressions::Column; +use parquet::basic::{LogicalType, Type as PhysicalType}; +use parquet::file::metadata::{ + ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData, +}; +use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor, Type as SchemaType}; + +/// How many files a ClickBench-partitioned query typically opens. +const NUM_FILES: usize = 100; +/// Morsels per file — two full-row-group chunks is typical for hits_partitioned. +const MORSELS_PER_FILE: usize = 3; +/// Batches per morsel (row_group_rows / batch_size ≈ 500k / 8k). +const BATCHES_PER_MORSEL: usize = 60; +/// Filters per query — matches the worst regressed ClickBench queries. 
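+/// With these constants one simulated query issues
+/// NUM_FILES × MORSELS_PER_FILE × BATCHES_PER_MORSEL × FILTERS_PER_QUERY
+/// = 100 × 3 × 60 × 3 = 54,000 `update` calls and
+/// NUM_FILES × MORSELS_PER_FILE = 300 `partition_filters` calls.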
+const FILTERS_PER_QUERY: usize = 3; + +fn build_columns(n: usize) -> SchemaDescPtr { + let fields: Vec<_> = (0..n) + .map(|i| { + let name = format!("c{i}"); + SchemaType::primitive_type_builder(&name, PhysicalType::BYTE_ARRAY) + .with_logical_type(Some(LogicalType::String)) + .build() + .unwrap() + .into() + }) + .collect(); + let group = SchemaType::group_type_builder("schema") + .with_fields(fields) + .build() + .unwrap(); + Arc::new(SchemaDescriptor::new(Arc::new(group))) +} + +/// One file with `rg_count` row groups, each nominally `rows_per_rg` rows, +/// `bytes_per_col` compressed bytes per column. +fn build_metadata( + rg_count: usize, + rows_per_rg: i64, + num_cols: usize, + bytes_per_col: i64, +) -> ParquetMetaData { + let schema = build_columns(num_cols); + let row_groups: Vec<_> = (0..rg_count) + .map(|_| { + let cols = (0..num_cols) + .map(|c| { + ColumnChunkMetaData::builder(schema.column(c)) + .set_num_values(rows_per_rg) + .set_total_compressed_size(bytes_per_col) + .build() + .unwrap() + }) + .collect(); + RowGroupMetaData::builder(schema.clone()) + .set_num_rows(rows_per_rg) + .set_column_metadata(cols) + .build() + .unwrap() + }) + .collect(); + let total_rows = rg_count as i64 * rows_per_rg; + let file_meta = FileMetaData::new(1, total_rows, None, None, schema, None); + ParquetMetaData::new(file_meta, row_groups) +} + +/// Produce `F` user filters, each referencing a single column. Column 0 is +/// shared by filter 0 and the projection (filter-in-projection shape, as in +/// ClickBench Q14 `WHERE SearchPhrase <> ''`); the rest sit on columns +/// outside the projection. +fn make_filters(n: usize) -> Vec<(FilterId, Arc)> { + (0..n) + .map(|i| { + let expr: Arc = Arc::new(Column::new(&format!("c{i}"), i)); + (i as FilterId, expr) + }) + .collect() +} + +/// Shared setup: tracker pre-warmed with one `partition_filters` call so +/// the filter stats / state entries exist. Models "second morsel onwards". +fn warm_tracker( + config: TrackerConfig, + filters: &[(FilterId, Arc)], + metadata: &ParquetMetaData, +) -> Arc { + let tracker = Arc::new(config.build()); + // Seed with a round-trip so HashMap entries exist; otherwise the first + // bench iteration pays the "new filter" insertion cost and later ones + // don't. + let _ = tracker.partition_filters_for_test( + filters.to_vec(), + &std::collections::HashSet::new(), + 1_000_000, + metadata, + ); + tracker +} + +/// Per-batch `update` cost. This is the tightest loop — it fires once per +/// decoded batch per active filter. At ClickBench scale that's +/// NUM_FILES × MORSELS_PER_FILE × BATCHES_PER_MORSEL × FILTERS = +/// 54,000 calls per query, so every nanosecond here matters. +fn bench_update(c: &mut Criterion) { + let metadata = build_metadata(2, 500_000, 4, 10_000_000); + let filters = make_filters(FILTERS_PER_QUERY); + let tracker = warm_tracker(TrackerConfig::new(), &filters, &metadata); + + let mut group = c.benchmark_group("selectivity_tracker/update"); + group.throughput(criterion::Throughput::Elements(1)); + group.bench_function("single_call", |b| { + let id = filters[0].0; + b.iter(|| { + tracker.update( + std::hint::black_box(id), + std::hint::black_box(4_096), + std::hint::black_box(8_192), + std::hint::black_box(50_000), + std::hint::black_box(65_536), + ); + }) + }); + + // A realistic per-batch hit: we update every active filter for this + // batch. Mirrors `apply_post_scan_filters_with_stats` calling + // `tracker.update` once per filter per batch. 
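+    // Note: one iteration of this bench is FILTERS_PER_QUERY (= 3) `update`
+    // calls, so divide the reported time by 3 to compare against the
+    // `single_call` bench above.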
+ group.bench_function("per_batch_all_filters", |b| { + b.iter(|| { + for (id, _) in &filters { + tracker.update( + std::hint::black_box(*id), + std::hint::black_box(4_096), + std::hint::black_box(8_192), + std::hint::black_box(50_000), + std::hint::black_box(65_536), + ); + } + }) + }); + group.finish(); +} + +/// Per-morsel `partition_filters` cost. Fires once per row-group morsel, +/// so NUM_FILES × MORSELS_PER_FILE ≈ 300 per query. We measure both the +/// "cold" (first) call and the "warm" (re-partition) case. +fn bench_partition_filters(c: &mut Criterion) { + let metadata = build_metadata(2, 500_000, 4, 10_000_000); + let filters = make_filters(FILTERS_PER_QUERY); + let projection_bytes = 40_000_000usize; + + let mut group = c.benchmark_group("selectivity_tracker/partition_filters"); + group.bench_function("cold_first_call", |b| { + b.iter_batched( + || Arc::new(TrackerConfig::new().build()), + |tracker| { + std::hint::black_box(tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + projection_bytes, + &metadata, + )); + }, + criterion::BatchSize::SmallInput, + ) + }); + + // Warm case: tracker already has state for every filter, matches the + // per-morsel path after morsel 0 of any file. + let warm = warm_tracker(TrackerConfig::new(), &filters, &metadata); + group.bench_function("warm_repeat_call", |b| { + b.iter(|| { + std::hint::black_box(warm.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + projection_bytes, + &metadata, + )); + }) + }); + + // Same warm case but after realistic stats have accumulated — this is + // the path that also evaluates the confidence-bound promote/demote + // branches. Seed the tracker with a credible number of `update` calls + // before measuring. + let promoted = warm_tracker(TrackerConfig::new(), &filters, &metadata); + for _ in 0..500 { + for (id, _) in &filters { + promoted.update(*id, 3_000, 8_192, 50_000, 65_536); + } + } + group.bench_function("warm_with_accumulated_stats", |b| { + b.iter(|| { + std::hint::black_box(promoted.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + projection_bytes, + &metadata, + )); + }) + }); + group.finish(); +} + +/// End-to-end "one file open" cost: one `partition_filters` per morsel +/// plus `update` per batch per filter. This matches what a single +/// ClickBench-partitioned file inflicts on the tracker and lets us read +/// the combined improvement from any optimization in one number. +fn bench_file_scan_simulation(c: &mut Criterion) { + let metadata = build_metadata(2, 500_000, 4, 10_000_000); + let filters = make_filters(FILTERS_PER_QUERY); + let projection_bytes = 40_000_000usize; + let warm = warm_tracker(TrackerConfig::new(), &filters, &metadata); + + let mut group = c.benchmark_group("selectivity_tracker/file_scan"); + group.throughput(criterion::Throughput::Elements( + (MORSELS_PER_FILE * BATCHES_PER_MORSEL * FILTERS_PER_QUERY) as u64, + )); + group.bench_function("one_file", |b| { + b.iter(|| { + for _morsel in 0..MORSELS_PER_FILE { + std::hint::black_box(warm.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + projection_bytes, + &metadata, + )); + for _batch in 0..BATCHES_PER_MORSEL { + for (id, _) in &filters { + warm.update(*id, 3_000, 8_192, 50_000, 65_536); + } + } + } + }) + }); + group.finish(); +} + +/// Full-query simulation: [`NUM_FILES`] sequential file scans on a single +/// tracker instance. 
Closest approximation to the per-query tracker cost +/// a ClickBench user sees. +/// +/// Parameterised on morsels-per-file so we can see how sensitive the +/// total cost is to the morsel-split fan-out. +fn bench_query_simulation(c: &mut Criterion) { + let metadata = build_metadata(2, 500_000, 4, 10_000_000); + let filters = make_filters(FILTERS_PER_QUERY); + let projection_bytes = 40_000_000usize; + + let mut group = c.benchmark_group("selectivity_tracker/query"); + group.sample_size(20); + for morsels in [1usize, 2, 3, 5] { + group.bench_with_input( + BenchmarkId::from_parameter(morsels), + &morsels, + |b, &morsels_per_file| { + b.iter_batched( + || Arc::new(TrackerConfig::new().build()), + |tracker| { + for _file in 0..NUM_FILES { + for _morsel in 0..morsels_per_file { + std::hint::black_box(tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + projection_bytes, + &metadata, + )); + for _batch in 0..BATCHES_PER_MORSEL { + for (id, _) in &filters { + tracker.update(*id, 3_000, 8_192, 50_000, 65_536); + } + } + } + } + }, + criterion::BatchSize::SmallInput, + ) + }, + ); + } + group.finish(); +} + +criterion_group!( + benches, + bench_update, + bench_partition_filters, + bench_file_scan_simulation, + bench_query_simulation, +); +criterion_main!(benches); diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 9a907f4118a86..eb81383a93ca7 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -33,6 +33,7 @@ mod page_filter; mod reader; mod row_filter; mod row_group_filter; +pub mod selectivity; mod sort; pub mod source; mod supported_predicates; diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index fa3e2dd44d9ab..076fffc248718 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -113,6 +113,37 @@ pub struct PagePruningAccessPlanFilter { /// single column predicates (e.g. (`col = 5`) extracted from the overall /// predicate. Must all be true for a row to be included in the result. predicates: Vec, + /// Per-predicate tag (caller-supplied id, typically a `FilterId`). + /// `None` when the filter was constructed without tagging via + /// [`Self::new`]; `Some` when constructed via [`Self::new_tagged`]. + /// The vector has the same length as `predicates`. + tags: Option>, +} + +/// Per-conjunct accumulators surfaced by +/// [`PagePruningAccessPlanFilter::prune_plan_with_per_conjunct_stats`]. +/// One entry per kept predicate (in the same order as `predicates`). +#[derive(Clone, Debug, Default)] +pub struct PerConjunctPageStats { + /// Caller tag (e.g. FilterId) — `None` when the filter was built + /// untagged via [`PagePruningAccessPlanFilter::new`]. + pub tag: Option, + /// Total rows in row groups where this conjunct was evaluated. + pub rows_seen: u64, + /// Rows the page index proved this conjunct alone would skip. + pub rows_skipped: u64, +} + +impl PerConjunctPageStats { + /// Returns the per-conjunct page-pruning rate, or `None` when no + /// rows were evaluated (e.g. the file has no page index for this + /// column, or the predicate's converter couldn't be built). 
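+    ///
+    /// For example, `rows_seen = 1_000_000` and `rows_skipped = 250_000`
+    /// yields `Some(0.25)`.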
+ pub fn pruning_rate(&self) -> Option { + if self.rows_seen == 0 { + return None; + } + Some(self.rows_skipped as f64 / self.rows_seen as f64) + } } impl PagePruningAccessPlanFilter { @@ -148,7 +179,50 @@ impl PagePruningAccessPlanFilter { Some(pp) }) .collect::>(); - Self { predicates } + Self { + predicates, + tags: None, + } + } + + /// Variant of [`Self::new`] that takes already-split conjuncts each + /// carrying a caller tag (usually a `FilterId`). Predicates that + /// fail the same single-column / non-trivial filtering as `new` + /// are dropped, but tags survive for the conjuncts that make it + /// through. Subsequent calls to + /// [`Self::prune_plan_with_per_conjunct_stats`] return per-conjunct + /// pruning stats keyed by tag. + pub fn new_tagged( + conjuncts: &[(usize, Arc)], + schema: &SchemaRef, + ) -> Self { + let mut predicates = Vec::with_capacity(conjuncts.len()); + let mut tags = Vec::with_capacity(conjuncts.len()); + for (id, expr) in conjuncts { + let pp = match PruningPredicate::try_new(Arc::clone(expr), Arc::clone(schema)) + { + Ok(pp) => pp, + Err(e) => { + debug!( + "Ignoring error creating tagged page pruning predicate \ + for filter id {id}: {e}" + ); + continue; + } + }; + if pp.always_true() { + continue; + } + if pp.required_columns().single_column().is_none() { + continue; + } + predicates.push(pp); + tags.push(*id); + } + Self { + predicates, + tags: Some(tags), + } } /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the @@ -336,6 +410,190 @@ impl PagePruningAccessPlanFilter { pub fn filter_number(&self) -> usize { self.predicates.len() } + + /// Like [`Self::prune_plan_with_page_index`] but also surfaces, as a + /// side-effect of the pruning iteration that already runs, a + /// per-conjunct accumulator with the rows that conjunct alone + /// would have proven skippable. Callers use this to seed a + /// per-FilterId selectivity prior without doing any extra pruning + /// work — every page-index lookup that would have happened in + /// `prune_plan_with_page_index` happens exactly once here too. + pub fn prune_plan_with_per_conjunct_stats( + &self, + mut access_plan: ParquetAccessPlan, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + parquet_metadata: &ParquetMetaData, + file_metrics: &ParquetFileMetrics, + ) -> (ParquetAccessPlan, Vec, usize) { + // scoped timer updates on drop + let _timer_guard = file_metrics.page_index_eval_time.timer(); + + let mut per_conjunct: Vec = (0..self.predicates.len()) + .map(|i| PerConjunctPageStats { + tag: self.tags.as_ref().and_then(|t| t.get(i).copied()), + rows_seen: 0, + rows_skipped: 0, + }) + .collect(); + + if self.predicates.is_empty() { + return (access_plan, per_conjunct, 0); + } + + let groups = parquet_metadata.row_groups(); + if groups.is_empty() { + return (access_plan, per_conjunct, 0); + } + + if parquet_metadata.offset_index().is_none() + || parquet_metadata.column_index().is_none() + { + return (access_plan, per_conjunct, 0); + } + + // Same accumulators as the untagged path, plus per-conjunct. + let mut total_skip = 0; + let mut total_select = 0; + let mut total_pages_skip = 0; + let mut total_pages_select = 0; + // Pages we skipped pruning for because row-group stats already + // proved the row group is fully matched — wasted work avoided, + // surfaced as a metric. 
+ let mut total_pages_skipped_by_fully_matched = 0; + + let row_group_indexes = access_plan.row_group_indexes(); + for row_group_index in row_group_indexes { + // Skip page pruning for fully matched row groups: all rows are + // known to satisfy the predicate, so page-level pruning is + // wasted work. Still feed the rows into `rows_seen` per + // conjunct so per-FilterId pruning rates reflect the file's + // full row count rather than just the non-fully-matched part. + if access_plan.is_fully_matched(row_group_index) { + let page_count = + fully_matched_page_count(row_group_index, parquet_metadata); + total_pages_skipped_by_fully_matched += page_count; + let rg_rows = groups[row_group_index].num_rows() as u64; + for stats in per_conjunct.iter_mut() { + stats.rows_seen = stats.rows_seen.saturating_add(rg_rows); + } + continue; + } + let rg_rows = groups[row_group_index].num_rows() as u64; + let mut overall_selection = None; + + let total_pages_in_group = + parquet_metadata.offset_index().map_or(0, |offset_index| { + offset_index[row_group_index] + .first() + .map_or(0, |column| column.page_locations.len()) + }); + // Intersection of per-conjunct matched pages, matching the + // untagged path's behavior so the page-level metric reflects + // the AND of all predicates rather than a per-conjunct sum. + let mut matched_pages_in_group: HashSet = + HashSet::from_iter(0..total_pages_in_group); + + for (i, predicate) in self.predicates.iter().enumerate() { + per_conjunct[i].rows_seen = + per_conjunct[i].rows_seen.saturating_add(rg_rows); + + let column = predicate + .required_columns() + .single_column() + .expect("Page pruning requires single column predicates"); + + let converter = match StatisticsConverter::try_new( + column.name(), + arrow_schema, + parquet_schema, + ) { + Ok(c) => c, + Err(e) => { + debug!( + "Could not create statistics converter for column {}: {e}", + column.name() + ); + continue; + } + }; + + let selection = prune_pages_in_one_row_group( + row_group_index, + predicate, + converter, + parquet_metadata, + file_metrics, + ); + + let Some((selection, page_match_flags)) = selection else { + continue; + }; + let matched_pages_indexes: HashSet<_> = page_match_flags + .into_iter() + .enumerate() + .filter(|x| x.1) + .map(|x| x.0) + .collect(); + matched_pages_in_group.retain(|x| matched_pages_indexes.contains(x)); + + // Per-conjunct skipped rows for this row group: anything + // the predicate's selection didn't include is something + // this conjunct alone proved skippable. 
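+                // Because each conjunct is credited independently, the same
+                // skipped rows can be counted for several conjuncts; these
+                // per-conjunct totals are priors, not a partition of the
+                // combined skip count accumulated in `overall_selection`.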
+ let kept_rows_for_conjunct = selection.row_count() as u64; + let skipped_rows_for_conjunct = + rg_rows.saturating_sub(kept_rows_for_conjunct); + per_conjunct[i].rows_skipped = per_conjunct[i] + .rows_skipped + .saturating_add(skipped_rows_for_conjunct); + + overall_selection = update_selection(overall_selection, selection); + + let selects_any = overall_selection + .as_ref() + .map(|sel| sel.selects_any()) + .unwrap_or(true); + if !selects_any { + break; + } + } + + let pages_matched = matched_pages_in_group.len(); + total_pages_select += pages_matched; + total_pages_skip += total_pages_in_group - pages_matched; + + if let Some(overall_selection) = overall_selection { + let rows_selected = overall_selection.row_count(); + if rows_selected > 0 { + let rows_skipped = overall_selection.skipped_row_count(); + total_skip += rows_skipped; + total_select += rows_selected; + access_plan.scan_selection(row_group_index, overall_selection); + } else { + let rows_skipped = groups[row_group_index].num_rows() as usize; + access_plan.skip(row_group_index); + total_skip += rows_skipped; + } + } + } + + file_metrics.page_index_rows_pruned.add_pruned(total_skip); + file_metrics + .page_index_rows_pruned + .add_matched(total_select); + file_metrics + .page_index_pages_pruned + .add_pruned(total_pages_skip); + file_metrics + .page_index_pages_pruned + .add_matched(total_pages_select); + + ( + access_plan, + per_conjunct, + total_pages_skipped_by_fully_matched, + ) + } } fn update_selection( diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 6dfaa731ae7f9..32f57370923f2 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -1082,6 +1082,25 @@ pub fn build_row_filter( .map(|filters| Some(RowFilter::new(filters))) } +/// Sum the compressed size of the given leaf-column indices across every +/// row group in the file. Used by the adaptive selectivity tracker to +/// weigh the I/O cost of decoding a filter's columns against the cost of +/// the projection. +pub(crate) fn total_compressed_bytes( + column_indices: &[usize], + metadata: &ParquetMetaData, +) -> usize { + let mut total: i64 = 0; + for rg in metadata.row_groups() { + for &idx in column_indices { + if let Some(col) = rg.columns().get(idx) { + total += col.compressed_size(); + } + } + } + total.max(0) as usize +} + #[cfg(test)] mod test { use super::*; diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index c45e69600f70c..b4aa07c9f2378 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -253,6 +253,28 @@ impl RowGroupAccessPlanFilter { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { + self.prune_by_statistics_with_per_conjunct_stats( + arrow_schema, + parquet_schema, + groups, + predicate, + metrics, + ); + } + + /// Variant of [`Self::prune_by_statistics`] that also returns + /// per-conjunct pruning stats produced by + /// [`PruningPredicate::prune_per_conjunct`]. Returns an empty + /// `Vec` when the predicate was not constructed with tagged + /// conjuncts, so callers can ignore it on the untagged path. 
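+    ///
+    /// [`Self::prune_by_statistics`] delegates here and discards the
+    /// returned stats, so both paths share the same pruning logic.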
+ pub fn prune_by_statistics_with_per_conjunct_stats( + &mut self, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + groups: &[RowGroupMetaData], + predicate: &PruningPredicate, + metrics: &ParquetFileMetrics, + ) -> Vec { // scoped timer updates on drop let _timer_guard = metrics.statistics_eval_time.timer(); @@ -275,9 +297,14 @@ impl RowGroupAccessPlanFilter { missing_null_counts_as_zero: true, }; - // try to prune the row groups in a single call - match predicate.prune(&pruning_stats) { - Ok(values) => { + let mut per_conjunct: Vec = Vec::new(); + + // try to prune the row groups in a single call (now also captures + // per-conjunct rates when the predicate was built with + // `try_new_tagged_conjuncts`). + match predicate.prune_per_conjunct(&pruning_stats) { + Ok((values, stats)) => { + per_conjunct = stats; let mut fully_contained_candidates_original_idx: Vec = Vec::new(); for (idx, &value) in row_group_indexes.iter().zip(values.iter()) { if !value { @@ -305,6 +332,8 @@ impl RowGroupAccessPlanFilter { metrics.predicate_evaluation_errors.add(1); } } + + per_conjunct } /// Identifies row groups that are fully matched by the predicate. @@ -607,11 +636,11 @@ impl PruningStatistics for BloomFilterStatistics { } /// Wraps a slice of [`RowGroupMetaData`] in a way that implements [`PruningStatistics`] -struct RowGroupPruningStatistics<'a> { - parquet_schema: &'a SchemaDescriptor, - row_group_metadatas: Vec<&'a RowGroupMetaData>, - arrow_schema: &'a Schema, - missing_null_counts_as_zero: bool, +pub(crate) struct RowGroupPruningStatistics<'a> { + pub(crate) parquet_schema: &'a SchemaDescriptor, + pub(crate) row_group_metadatas: Vec<&'a RowGroupMetaData>, + pub(crate) arrow_schema: &'a Schema, + pub(crate) missing_null_counts_as_zero: bool, } impl<'a> RowGroupPruningStatistics<'a> { diff --git a/datafusion/datasource-parquet/src/selectivity.rs b/datafusion/datasource-parquet/src/selectivity.rs new file mode 100644 index 0000000000000..3c700a62cdacf --- /dev/null +++ b/datafusion/datasource-parquet/src/selectivity.rs @@ -0,0 +1,2616 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Adaptive filter selectivity tracking for Parquet row filters. +//! +//! See [`SelectivityTracker`] for the main entry point, `FilterState` for the +//! per-filter lifecycle, `PartitionedFilters` for the output consumed by +//! `ParquetOpener::open`, and [`FilterId`] for stable filter identification. 
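+//!
+//! Rough call pattern (a sketch — the opener threads schemas and
+//! page-pruning priors through `partition_filters`; the variable names
+//! below are placeholders, see the method docs for exact signatures):
+//!
+//! ```ignore
+//! let tracker = Arc::new(TrackerConfig::new().with_min_bytes_per_sec(1e9).build());
+//! // Once per file open: decide row-level vs post-scan placement.
+//! let partitioned = tracker.partition_filters(
+//!     filters, &projection_cols, projection_bytes,
+//!     &metadata, &arrow_schema, &parquet_schema, &page_pruning_rates,
+//! );
+//! // Once per decoded batch per filter: feed back measured effectiveness.
+//! tracker.update(filter_id, rows_matched, rows_total, eval_nanos, skippable_bytes);
+//! ```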
+ +use arrow::array::BooleanArray; +use arrow::datatypes::SchemaRef; +use log::debug; +use parking_lot::{Mutex, RwLock}; +use parquet::file::metadata::ParquetMetaData; +use parquet::schema::types::SchemaDescriptor; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; + +use datafusion_physical_expr::utils::collect_columns; +use datafusion_physical_expr_common::physical_expr::{ + OptionalFilterPhysicalExpr, PhysicalExpr, snapshot_generation, +}; + +/// Window size for the per-batch scatter analysis fed to +/// [`count_skippable_bytes`]. Approximates a parquet data page so that +/// "windows with zero survivors" tracks "pages a row-level decoder +/// could skip". Hardcoded for now; making this configurable (or +/// deriving it from per-row-group page metadata) is a natural follow-up. +pub(crate) const SKIP_WINDOW_ROWS: usize = 8192; + +/// Compute the bytes that late-materialization can plausibly skip for a +/// batch given the predicate output `bool_arr` and the total non-filter +/// projection bytes for that batch. +/// +/// Splits `bool_arr` into [`SKIP_WINDOW_ROWS`]-sized windows; each window +/// with zero survivors represents a page-sized chunk whose +/// other-projection columns the row-level decoder can skip outright. +/// Returns `total_other_bytes × (empty_windows / total_windows)` — +/// scatter-discounted skippable bytes. +/// +/// Interpretation depends on which side calls this: +/// +/// - **Post-scan path**: a *prediction* of bytes-saved-per-sec the +/// row-level path would achieve. The bool_arr we see is over the wide +/// batch in the same row order the decoder would emit, so for single- +/// predicate filters the prediction is faithful (modulo `W` matching +/// the actual parquet page size). +/// +/// - **Row-level path**: a conservative *measurement* of what the +/// decoder actually skipped — within-window RowSelection narrowing is +/// an additional uncounted bonus. So at row-level this is a *lower +/// bound* of real savings, which is the safe direction for the +/// demote-or-not decision. +// Consumed by the adaptive parquet scan, added later in this stack. +#[expect(dead_code)] +pub(crate) fn count_skippable_bytes( + bool_arr: &BooleanArray, + total_other_bytes: u64, +) -> u64 { + let n = bool_arr.len(); + if n == 0 || total_other_bytes == 0 { + return 0; + } + // Short-circuit on the two extremes: avoids a redundant per-window + // SIMD scan over the same buffer when the answer is already + // determined by the batch-level total. The whole helper otherwise + // costs ~2× per-batch `true_count` for nothing. + let total_matched = bool_arr.true_count(); + if total_matched == 0 { + // Every window empty: full skippable. + return total_other_bytes; + } + if total_matched == n { + // No window empty: nothing skippable. + return 0; + } + let total_windows = n.div_ceil(SKIP_WINDOW_ROWS); + if total_windows == 1 { + // One-window batch with mixed matches → not skippable. Avoids + // a wasted slice+`true_count`. + return 0; + } + let mut empty_windows: u64 = 0; + for i in 0..total_windows { + let start = i * SKIP_WINDOW_ROWS; + let len = SKIP_WINDOW_ROWS.min(n - start); + if bool_arr.slice(start, len).true_count() == 0 { + empty_windows += 1; + } + } + ((total_other_bytes as f64 * empty_windows as f64) / total_windows as f64) as u64 +} + +/// Stable identifier for a filter conjunct, assigned by `ParquetSource::with_predicate`. +pub type FilterId = usize; + +/// Per-filter lifecycle state in the adaptive filter system. 
+/// +/// State transitions: +/// - **(unseen)** → [`RowFilter`](Self::RowFilter) or [`PostScan`](Self::PostScan) +/// on first encounter in [`SelectivityTracker::partition_filters`]. +/// - [`PostScan`](Self::PostScan) → [`RowFilter`](Self::RowFilter) when +/// effectiveness ≥ `min_bytes_per_sec` and enough rows have been observed. +/// - [`RowFilter`](Self::RowFilter) → [`PostScan`](Self::PostScan) when +/// effectiveness is below threshold (mandatory filter). +/// - [`RowFilter`](Self::RowFilter) → [`Dropped`](Self::Dropped) when +/// effectiveness is below threshold and the filter is optional +/// ([`OptionalFilterPhysicalExpr`]). +/// - [`RowFilter`](Self::RowFilter) → [`PostScan`](Self::PostScan)/[`Dropped`](Self::Dropped) +/// on periodic re-evaluation if effectiveness drops below threshold after +/// CI upper bound drops below threshold. +/// - **Any state** → re-evaluated when a dynamic filter's +/// `snapshot_generation` changes. +#[derive(Debug, Clone, Copy, PartialEq)] +pub(crate) enum FilterState { + /// Currently a row filter. + RowFilter, + /// Currently a post-scan filter. + PostScan, + /// Dropped entirely (insufficient throughput and optional). + Dropped, +} + +/// Result of partitioning filters into row filters vs post-scan. +/// +/// Produced by [`SelectivityTracker::partition_filters`], consumed by +/// `ParquetOpener::open` to build row-level predicates and post-scan filters. +/// +/// Filters are partitioned based on their effectiveness threshold. +/// +/// This type is `pub` to support the [selectivity tracker benchmark +/// harness](../../benches/selectivity_tracker.rs); treat the layout as +/// unstable from outside the crate. +#[derive(Debug, Clone, Default)] +#[doc(hidden)] +pub struct PartitionedFilters { + /// Filters promoted past collection — individual chained ArrowPredicates + pub row_filters: Vec<(FilterId, Arc)>, + /// Filters demoted to post-scan (fast path only) + pub post_scan: Vec<(FilterId, Arc)>, +} + +/// Tracks selectivity statistics for a single filter expression. +#[derive(Debug, Clone, Default, Copy, PartialEq)] +struct SelectivityStats { + /// Number of rows that matched (passed) the filter + rows_matched: u64, + /// Total number of rows evaluated + rows_total: u64, + /// Cumulative evaluation time in nanoseconds + eval_nanos: u64, + /// Cumulative bytes across batches this filter has been evaluated on + bytes_seen: u64, + /// Welford's online algorithm: number of per-batch effectiveness samples + sample_count: u64, + /// Welford's online algorithm: running mean of per-batch effectiveness + eff_mean: f64, + /// Welford's online algorithm: running sum of squared deviations (M2) + eff_m2: f64, + /// Whether the underlying expression is wrapped in + /// `OptionalFilterPhysicalExpr`. Cached here (rather than re-checked + /// via [`is_optional_filter`] on every batch) so the per-batch hot + /// path in [`SelectivityTracker::update`] can skip the + /// SKIP_FLAG/CI-bound work entirely for non-optional filters with a + /// single field load on the already-held stats lock — no extra + /// HashMap or `RwLock::read()` per batch. + is_optional: bool, +} + +impl SelectivityStats { + /// Returns the cumulative effectiveness as an opaque ordering score + /// (higher = run first). + /// + /// Computed from `eff_mean` so it matches the Welford-tracked metric + /// fed to CI bounds: per-batch scatter-aware bytes-saved-per-second. + /// Callers should not assume the unit. 
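+    ///
+    /// As a rough sense of scale: a batch whose evaluation took 100 µs and
+    /// made 64 KiB of other-column bytes skippable contributes a sample of
+    /// 65_536 × 1e9 / 100_000 ≈ 6.6e8 (≈ 650 MB/s).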
+ fn effectiveness(&self) -> Option { + if self.sample_count == 0 { + return None; + } + Some(self.eff_mean) + } + + /// Returns the lower bound of a confidence interval on mean effectiveness. + /// + /// Uses Welford's online variance to compute a one-sided CI: + /// `mean - z * stderr`. Returns `None` if fewer than 2 samples. + fn confidence_lower_bound(&self, confidence_z: f64) -> Option { + if self.sample_count < 2 { + return None; + } + let variance = self.eff_m2 / (self.sample_count - 1) as f64; + let stderr = (variance / self.sample_count as f64).sqrt(); + Some(self.eff_mean - confidence_z * stderr) + } + + /// Returns the upper bound of a confidence interval on mean effectiveness. + /// + /// Uses Welford's online variance: `mean + z * stderr`. + /// Returns `None` if fewer than 2 samples. + fn confidence_upper_bound(&self, confidence_z: f64) -> Option { + if self.sample_count < 2 { + return None; + } + let variance = self.eff_m2 / (self.sample_count - 1) as f64; + let stderr = (variance / self.sample_count as f64).sqrt(); + Some(self.eff_mean + confidence_z * stderr) + } + + /// Update stats with new observations. + /// + /// `skippable_bytes` is the caller's already-computed estimate of + /// non-filter projection bytes that late-materialization would + /// actually save for this batch — see [`count_skippable_bytes`] for + /// the windowed scatter calculation. The Welford accumulator tracks + /// `skippable_bytes × 1e9 / eval_nanos` (= scatter-aware + /// bytes-saved-per-second), which is what the promote/demote + /// gates compare against `min_bytes_per_sec`. + fn update( + &mut self, + matched: u64, + total: u64, + eval_nanos: u64, + skippable_bytes: u64, + ) { + self.rows_matched += matched; + self.rows_total += total; + self.eval_nanos += eval_nanos; + self.bytes_seen += skippable_bytes; + + if total > 0 && eval_nanos > 0 { + let batch_eff = skippable_bytes as f64 * 1e9 / eval_nanos as f64; + + self.sample_count += 1; + let delta = batch_eff - self.eff_mean; + self.eff_mean += delta / self.sample_count as f64; + let delta2 = batch_eff - self.eff_mean; + self.eff_m2 += delta * delta2; + } + } +} + +/// Immutable configuration for a [`SelectivityTracker`]. +/// +/// Use the builder methods to customise, then call [`build()`](TrackerConfig::build) +/// to produce a ready-to-use tracker. +#[doc(hidden)] +pub struct TrackerConfig { + /// Minimum bytes/sec throughput for promoting a filter (default: INFINITY = disabled). + pub min_bytes_per_sec: f64, + /// Byte-ratio threshold for initial filter placement (row-level vs post-scan). + /// Computed as `filter_compressed_bytes / projection_compressed_bytes`. + /// When low, the filter columns are small relative to the projection, + /// so row-level placement enables large late-materialization savings. + /// When high, the filter columns dominate the projection, so there's + /// little benefit from late materialization. + /// Default is 0.20. + pub byte_ratio_threshold: f64, + /// Z-score for confidence intervals on filter effectiveness. + /// Lower values (e.g. 1.0 or 0.0) will make the tracker more aggressive about promotion/demotion based on limited data. + /// Higher values (e.g. 3.0) will require more confidence before changing filter states. + /// Default is 2.0, corresponding to ~97.5% one-sided confidence. + /// Set to <= 0.0 to disable confidence intervals and promote/demote based on point estimates alone (not recommended). + /// Set to INFINITY to disable promotion entirely (overrides `min_bytes_per_sec`). 
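+    /// Worked example: with `confidence_z = 2.0`, a mean of 500 MB/s and a
+    /// standard error of 100 MB/s give a lower bound of 300 MB/s and an
+    /// upper bound of 700 MB/s (`mean ± z × stderr`), which is what the
+    /// promote/demote gates compare against `min_bytes_per_sec`.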
+ pub confidence_z: f64, + /// Initial-placement prior threshold: if per-conjunct row-group + /// statistics pruning prunes ≥ this fraction of the file's row + /// groups, place the filter at row-level on first encounter. Set + /// to >1.0 to disable the prior. Default 0.5. + pub prior_promote_threshold: f64, + /// Initial-placement prior threshold: if per-conjunct row-group + /// statistics pruning prunes ≤ this fraction of the file's row + /// groups, place the filter at post-scan on first encounter. Set + /// to <0.0 to disable the prior. Default 0.05. + pub prior_demote_threshold: f64, + /// Per-fetch latency baseline in milliseconds — at this average + /// per-fetch RTT the tracker uses the unmodified `confidence_z`. + /// Above this, `confidence_z` is shrunk proportionally so the + /// tracker becomes more aggressive about state changes when + /// per-request cost is high. 0.0 disables. Default 5.0. + pub latency_z_baseline_ms: f64, + /// Maximum scale factor for the latency-aware z shrink. Default 8.0. + pub latency_z_max_scale: f64, +} + +impl TrackerConfig { + pub fn new() -> Self { + Self { + min_bytes_per_sec: f64::INFINITY, + byte_ratio_threshold: 0.20, + confidence_z: 2.0, + prior_promote_threshold: 0.5, + prior_demote_threshold: 0.05, + latency_z_baseline_ms: 5.0, + latency_z_max_scale: 8.0, + } + } + + pub fn with_min_bytes_per_sec(mut self, v: f64) -> Self { + self.min_bytes_per_sec = v; + self + } + + pub fn with_byte_ratio_threshold(mut self, v: f64) -> Self { + self.byte_ratio_threshold = v; + self + } + + pub fn with_confidence_z(mut self, v: f64) -> Self { + self.confidence_z = v; + self + } + + pub fn with_prior_promote_threshold(mut self, v: f64) -> Self { + self.prior_promote_threshold = v; + self + } + + pub fn with_prior_demote_threshold(mut self, v: f64) -> Self { + self.prior_demote_threshold = v; + self + } + + pub fn with_latency_z_baseline_ms(mut self, v: f64) -> Self { + self.latency_z_baseline_ms = v; + self + } + + pub fn with_latency_z_max_scale(mut self, v: f64) -> Self { + self.latency_z_max_scale = v; + self + } + + pub fn build(self) -> SelectivityTracker { + SelectivityTracker { + config: self, + filter_stats: RwLock::new(HashMap::new()), + skip_flags: RwLock::new(HashMap::new()), + inner: Mutex::new(SelectivityTrackerInner::new()), + total_fetch_ns: AtomicU64::new(0), + total_fetches: AtomicU64::new(0), + } + } +} + +impl Default for TrackerConfig { + fn default() -> Self { + Self::new() + } +} + +/// Cross-file adaptive system that measures filter effectiveness and decides +/// which filters are promoted to row-level predicates (pushed into the Parquet +/// reader) vs. applied post-scan (demoted) or dropped entirely. +/// +/// # Locking design +/// +/// All locks are **private** to this struct — external callers cannot hold a +/// guard across expensive work, and all lock-holding code paths are auditable +/// in this file alone. +/// +/// State is split across two independent locks to minimise contention between +/// the hot per-batch `update()` path and the cold per-file-open +/// `partition_filters()` path: +/// +/// - **`filter_stats`** (`RwLock>>`) +/// — `update()` acquires a *shared read* lock on the outer map, then a +/// per-filter `Mutex` to increment counters. Multiple threads updating +/// *different* filters never contend at all; threads updating the *same* +/// filter serialize only on the cheap per-filter `Mutex` (~100 ns). 
+/// `partition_filters()` also takes a read lock here when it needs to +/// inspect stats for promotion/demotion decisions, so it never blocks +/// `update()` callers. The write lock is taken only briefly in Phase 2 +/// of `partition_filters()` to insert entries for newly-seen filter IDs. +/// +/// - **`inner`** (`Mutex`) — holds the filter +/// state-machine (`filter_states`) and dynamic-filter generation tracking. +/// Only `partition_filters()` acquires this lock (once per file open), so +/// concurrent `update()` calls are completely unaffected. +/// +/// ## Lock ordering (deadlock-free) +/// +/// Locks are always acquired in the order `inner` → `filter_stats` → +/// per-filter `Mutex`. Because `update()` never acquires `inner`, no +/// cycle is possible. +/// +/// ## Correctness of concurrent access +/// +/// `update()` may write stats while `partition_filters()` reads them for +/// promotion/demotion. Both hold a shared `filter_stats` read lock; the +/// per-filter `Mutex` ensures they do not interleave on the same filter's +/// stats. One proceeds first; the other sees a consistent (slightly newer +/// or older) snapshot. This is benign — the single-lock design that +/// preceded this split already allowed stats to change between consecutive +/// reads within `partition_filters()`. +/// +/// On promote/demote, `partition_filters()` zeros a filter's stats via the +/// per-filter `Mutex`. An `update()` running concurrently may write one +/// stale batch's worth of data to the freshly-zeroed stats; this is quickly +/// diluted by hundreds of correct-context batches and is functionally +/// identical to the old design where `update()` queued behind the write +/// lock and ran immediately after. +/// +/// # Filter state machine +/// +/// ```text +/// ┌─────────┐ +/// │ New │ +/// └─────────┘ +/// │ +/// ▼ +/// ┌────────────────────────┐ +/// │ Estimated Cost │ +/// │Bytes needed for filter │ +/// └────────────────────────┘ +/// │ +/// ┌──────────────────┴──────────────────┐ +/// ┌────────▼────────┐ ┌────────▼────────┐ +/// │ Post-scan │ │ Row filter │ +/// │ │ │ │ +/// └─────────────────┘ └─────────────────┘ +/// │ │ +/// ▼ ▼ +/// ┌─────────────────┐ ┌─────────────────┐ +/// │ Effectiveness │ │ Effectiveness │ +/// │ Bytes pruned │ │ Bytes pruned │ +/// │ per │ │ per │ +/// │Second of compute│ │Second of compute│ +/// └─────────────────┘ └─────────────────┘ +/// │ │ +/// └──────────────────┬──────────────────┘ +/// ▼ +/// ┌───────────────────────────────────────────────┐ +/// │ New Scan │ +/// │ Move filters based on effectiveness. │ +/// │ Promote (move post-scan -> row filter). │ +/// │ Demote (move row-filter -> post-scan). │ +/// │ Disable (for optional filters; either row │ +/// │ filter or disabled). │ +/// └───────────────────────────────────────────────┘ +/// │ +/// ┌──────────────────┴──────────────────┐ +/// ┌────────▼────────┐ ┌────────▼────────┐ +/// │ Post-scan │ │ Row filter │ +/// │ │ │ │ +/// └─────────────────┘ └─────────────────┘ +/// ``` +/// +/// See `TrackerConfig` for configuration knobs. +pub struct SelectivityTracker { + config: TrackerConfig, + /// Per-filter selectivity statistics, each individually `Mutex`-protected. + /// + /// The outer `RwLock` is almost always read-locked: both `update()` (hot, + /// per-batch) and `partition_filters()` (cold, per-file-open) only need + /// shared access to look up existing entries. 
The write lock is taken + /// only when `partition_filters()` inserts entries for newly-seen filter + /// IDs — a brief, infrequent operation. + /// + /// Each inner `Mutex` protects a single filter's + /// counters, so concurrent `update()` calls on *different* filters + /// proceed in parallel with zero contention. + /// Cumulative wall time spent inside `AsyncFileReader::get_byte_ranges` + /// across all openers using this tracker. + total_fetch_ns: AtomicU64, + /// Number of byte-range fetches recorded. + total_fetches: AtomicU64, + filter_stats: RwLock>>, + /// Per-filter "skip" flags — when set, the corresponding filter is + /// treated as a no-op by both the row-filter + /// (`DatafusionArrowPredicate::evaluate`) and the post-scan path + /// (`apply_post_scan_filters_with_stats`). This is the mid-stream + /// equivalent of dropping an optional filter: once the per-batch + /// `update()` path proves an `OptionalFilterPhysicalExpr` is + /// CPU-dominated and ineffective, it flips the flag and subsequent + /// batches stop paying the evaluation cost. The decoder still decodes + /// the filter columns (we cannot rebuild it mid-scan), so I/O is not + /// reclaimed; only the predicate evaluation is skipped. + /// + /// Only ever set for filters whose `is_optional` flag (cached on the + /// per-filter [`SelectivityStats`]) is `true` — mandatory filters + /// must always execute or queries return wrong rows. + skip_flags: RwLock>>, + /// Filter lifecycle state machine and dynamic-filter generation tracking. + /// + /// Only `partition_filters()` acquires this lock (once per file open). + /// `update()` never touches it, so the hot per-batch path is completely + /// decoupled from the cold state-machine path. + inner: Mutex, +} + +impl std::fmt::Debug for SelectivityTracker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SelectivityTracker") + .field("config.min_bytes_per_sec", &self.config.min_bytes_per_sec) + .finish() + } +} + +impl Default for SelectivityTracker { + fn default() -> Self { + Self::new() + } +} + +impl SelectivityTracker { + /// Create a new tracker with default settings (feature disabled). + pub fn new() -> Self { + TrackerConfig::new().build() + } + + /// Record one batch of `get_byte_ranges` activity (latency-aware z input). + pub fn record_fetch(&self, ranges: usize, elapsed_ns: u64) { + if ranges == 0 || elapsed_ns == 0 { + return; + } + self.total_fetch_ns.fetch_add(elapsed_ns, Ordering::Relaxed); + self.total_fetches + .fetch_add(ranges as u64, Ordering::Relaxed); + } + + fn avg_fetch_ms(&self) -> f64 { + let fetches = self.total_fetches.load(Ordering::Relaxed); + if fetches == 0 { + return 0.0; + } + let ns = self.total_fetch_ns.load(Ordering::Relaxed) as f64; + ns / fetches as f64 / 1_000_000.0 + } + + fn effective_z(&self) -> f64 { + let z = self.config.confidence_z; + if self.config.latency_z_baseline_ms <= 0.0 { + return z; + } + let avg = self.avg_fetch_ms(); + if avg <= self.config.latency_z_baseline_ms { + return z; + } + let factor = (avg / self.config.latency_z_baseline_ms) + .clamp(1.0, self.config.latency_z_max_scale); + z / factor + } + + /// Update stats for a filter after processing a batch. + /// + /// **Locking:** acquires `filter_stats.read()` (shared) then a per-filter + /// `Mutex`. Never touches `inner`, so this hot per-batch path cannot + /// contend with the cold per-file-open `partition_filters()` path. 
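+    ///
+    /// Example call for an 8_192-row batch where 4_096 rows matched, the
+    /// predicate took 50 µs, and 64 KiB of non-filter projection bytes were
+    /// judged skippable: `update(id, 4_096, 8_192, 50_000, 65_536)`.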
+ /// + /// Silently skips unknown filter IDs (can occur if `update()` is called + /// before `partition_filters()` has registered the filter — in practice + /// this cannot happen because `partition_filters()` runs during file open + /// before any batches are processed). + /// + /// **Mid-stream drop:** after every `SKIP_FLAG_CHECK_INTERVAL`'th batch + /// we evaluate the CI upper bound; if it falls below + /// `min_bytes_per_sec` and the filter is wrapped in + /// `OptionalFilterPhysicalExpr`, we set the per-filter skip flag. + /// Subsequent calls to `DatafusionArrowPredicate::evaluate` (row-level) + /// and `apply_post_scan_filters_with_stats` (post-scan) observe the + /// flag and short-circuit their work for that filter. Mandatory + /// filters are never flagged because doing so would change the result + /// set. + #[doc(hidden)] + pub fn update( + &self, + id: FilterId, + matched: u64, + total: u64, + eval_nanos: u64, + batch_bytes: u64, + ) { + let stats_map = self.filter_stats.read(); + let Some(entry) = stats_map.get(&id) else { + return; + }; + let mut stats = entry.lock(); + stats.update(matched, total, eval_nanos, batch_bytes); + + // Fast path for non-optional filters: nothing else to do. The + // SKIP_FLAG mid-stream drop only applies to + // `OptionalFilterPhysicalExpr`-wrapped filters (hash-join / + // TopK dynamic), and `is_optional` is cached inline on + // `SelectivityStats` at filter registration so this is a single + // field load on the already-held lock. + if !stats.is_optional { + return; + } + + // Optional filter: do the SKIP_FLAG check every batch — there's + // no SKIP_FLAG_CHECK_INTERVAL gate here on purpose. We want + // join/TopK skip flags to fire as soon as stats prove the + // filter's selectivity has collapsed, even mid-row-group. The + // CI-bound calc is cheap arithmetic on already-locked stats. + if !self.config.min_bytes_per_sec.is_finite() { + return; + } + let z = self.effective_z(); + let Some(ub) = stats.confidence_upper_bound(z) else { + return; + }; + if ub >= self.config.min_bytes_per_sec { + return; + } + drop(stats); + drop(stats_map); + + if let Some(flag) = self.skip_flags.read().get(&id) + && !flag.swap(true, Ordering::Release) + { + debug!( + "FilterId {id}: mid-stream skip — CI upper bound {ub} < {} bytes/sec", + self.config.min_bytes_per_sec + ); + } + } + + /// Returns the shared skip flag for `id`, creating one if absent. + /// + /// Cloned into [`crate::row_filter::DatafusionArrowPredicate`] so the + /// row-filter path can short-circuit when the per-batch update path + /// decides the filter has stopped pulling its weight. The post-scan + /// path uses [`Self::is_filter_skipped`] instead — it does not need a + /// long-lived handle. + // Consumed by the adaptive parquet scan, added later in this stack. + #[expect(dead_code)] + pub(crate) fn skip_flag(&self, id: FilterId) -> Arc { + if let Some(existing) = self.skip_flags.read().get(&id) { + return Arc::clone(existing); + } + let mut write = self.skip_flags.write(); + Arc::clone( + write + .entry(id) + .or_insert_with(|| Arc::new(AtomicBool::new(false))), + ) + } + + /// Returns `true` when `id` has been mid-stream-dropped by the tracker. + /// + /// Cheap: a single `RwLock::read` plus an atomic load. Called from the + /// post-scan filter loop in `apply_post_scan_filters_with_stats`. + // Consumed by the adaptive parquet scan, added later in this stack. 
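+    // Intended shape at that call site (sketch only; the caller lands later
+    // in this stack): `if tracker.is_filter_skipped(id) { continue; }`.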
+ #[expect(dead_code)] + pub(crate) fn is_filter_skipped(&self, id: FilterId) -> bool { + self.skip_flags + .read() + .get(&id) + .is_some_and(|f| f.load(Ordering::Acquire)) + } + + /// Partition filters into row-level predicates vs post-scan filters. + /// + /// Called once per file open (cold path). + /// + /// **Locking — two phases:** + /// 1. Acquires `inner` (exclusive) and `filter_stats` (shared read) for + /// all decision logic — promotion, demotion, initial placement, and + /// sorting by effectiveness. Because `filter_stats` is only + /// read-locked, concurrent `update()` calls proceed unblocked. + /// 2. If new filter IDs were seen, briefly acquires `filter_stats` (write) + /// to insert per-filter `Mutex` entries so that future `update()` calls + /// can find them. + #[doc(hidden)] + #[expect(clippy::too_many_arguments)] + pub fn partition_filters( + &self, + filters: Vec<(FilterId, Arc)>, + projection_columns: &std::collections::HashSet, + projection_scan_size: usize, + metadata: &ParquetMetaData, + arrow_schema: &SchemaRef, + parquet_schema: &SchemaDescriptor, + page_pruning_rates: &HashMap, + ) -> PartitionedFilters { + // Phase 1: inner.lock() + filter_stats.read() → all decision logic + let z_eff = self.effective_z(); + let mut guard = self.inner.lock(); + let stats_map = self.filter_stats.read(); + let result = guard.partition_filters( + filters, + projection_columns, + projection_scan_size, + metadata, + arrow_schema, + parquet_schema, + &self.config, + z_eff, + page_pruning_rates, + &stats_map, + ); + drop(stats_map); + drop(guard); + + // Phase 2: if new filters were seen, briefly acquire write locks + // to insert per-filter `Mutex` (with + // `is_optional` cached inline so the per-batch `update()` hot + // path can fast-return for mandatory filters) and an + // `AtomicBool` skip-flag (only consulted for optional filters). + if !result.new_optional_flags.is_empty() { + let mut stats_write = self.filter_stats.write(); + let mut skip_write = self.skip_flags.write(); + for (id, is_optional) in result.new_optional_flags { + stats_write.entry(id).or_insert_with(|| { + Mutex::new(SelectivityStats { + is_optional, + ..Default::default() + }) + }); + skip_write + .entry(id) + .or_insert_with(|| Arc::new(AtomicBool::new(false))); + } + } + + result.partitioned + } + + /// Test-only convenience that derives `arrow_schema` / `parquet_schema` + /// from the parquet metadata and forwards to the public + /// [`Self::partition_filters`]. Lets test code keep its existing call + /// sites without threading two more arguments through every test. + #[doc(hidden)] + pub fn partition_filters_for_test( + &self, + filters: Vec<(FilterId, Arc)>, + projection_columns: &std::collections::HashSet, + projection_scan_size: usize, + metadata: &ParquetMetaData, + ) -> PartitionedFilters { + let parquet_schema = metadata.file_metadata().schema_descr_ptr(); + let arrow_schema: SchemaRef = match parquet::arrow::parquet_to_arrow_schema( + parquet_schema.as_ref(), + None, + ) { + Ok(s) => Arc::new(s), + Err(_) => Arc::new(arrow::datatypes::Schema::empty()), + }; + self.partition_filters( + filters, + projection_columns, + projection_scan_size, + metadata, + &arrow_schema, + parquet_schema.as_ref(), + &HashMap::new(), + ) + } + + /// Test helper: ensure a stats entry exists for the given filter ID. + /// In production, `partition_filters()` inserts entries for new filters. + /// Tests that call `update()` without prior `partition_filters()` need this. 
+ #[cfg(test)] + fn ensure_stats_entry(&self, id: FilterId) { + let map = self.filter_stats.read(); + if map.get(&id).is_none() { + drop(map); + self.filter_stats + .write() + .entry(id) + .or_insert_with(|| Mutex::new(SelectivityStats::default())); + } + } +} + +/// Internal result from [`SelectivityTrackerInner::partition_filters`]. +/// +/// Carries both the partitioned filters and the `(FilterId, is_optional)` +/// entries seen for the first time, so the outer +/// [`SelectivityTracker::partition_filters`] can insert per-filter +/// `Mutex` entries (with `is_optional` cached inline) +/// in a brief Phase 2 write lock. +struct PartitionResult { + partitioned: PartitionedFilters, + /// `(FilterId, is_optional)` entries observed for the first time in + /// this `partition_filters` call. + new_optional_flags: Vec<(FilterId, bool)>, +} + +/// Filter state-machine and generation tracking, guarded by the `Mutex` +/// inside [`SelectivityTracker`]. +/// +/// This struct intentionally does **not** contain per-filter stats — those +/// live in the separate `filter_stats` lock so that the hot `update()` path +/// can modify stats without acquiring this lock. Only the cold +/// `partition_filters()` path (once per file open) needs this lock. +#[derive(Debug)] +struct SelectivityTrackerInner { + /// Per-filter lifecycle state (RowFilter / PostScan / Dropped). + filter_states: HashMap, + /// Last-seen snapshot generation per filter, for detecting when a dynamic + /// filter's selectivity has changed (e.g. hash-join build side grew). + snapshot_generations: HashMap, +} + +impl SelectivityTrackerInner { + fn new() -> Self { + Self { + filter_states: HashMap::new(), + snapshot_generations: HashMap::new(), + } + } + + /// Check and update the snapshot generation for a filter. + fn note_generation( + &mut self, + id: FilterId, + generation: u64, + stats_map: &HashMap>, + ) { + if generation == 0 { + return; + } + match self.snapshot_generations.get(&id) { + Some(&prev_generation) if prev_generation == generation => {} + Some(_) => { + let current_state = self.filter_states.get(&id).copied(); + // Always reset stats since selectivity changed with new generation. + if let Some(entry) = stats_map.get(&id) { + *entry.lock() = SelectivityStats::default(); + } + self.snapshot_generations.insert(id, generation); + + // Optional/dynamic filters only get more selective over time + // (hash join build side accumulates more values). So if the + // filter was already working (RowFilter or PostScan), preserve + // its state. Only un-drop Dropped filters back to PostScan + // so they get another chance with the new selectivity. + if current_state == Some(FilterState::Dropped) { + debug!("FilterId {id} generation changed, un-dropping to PostScan"); + self.filter_states.insert(id, FilterState::PostScan); + } else { + debug!( + "FilterId {id} generation changed, resetting stats but preserving state {current_state:?}" + ); + } + } + None => { + self.snapshot_generations.insert(id, generation); + } + } + } + + /// Get the effectiveness for a filter by ID. + fn get_effectiveness_by_id( + &self, + id: FilterId, + stats_map: &HashMap>, + ) -> Option { + stats_map + .get(&id) + .and_then(|entry| entry.lock().effectiveness()) + } + + /// Demote a filter to post-scan or drop it entirely if optional. 
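+    ///
+    /// Mandatory filters are demoted to post-scan (with their stats reset so
+    /// they are re-measured in the new placement); filters wrapped in
+    /// [`OptionalFilterPhysicalExpr`] are dropped outright, since skipping
+    /// them cannot change query results.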
+ fn demote_or_drop( + &mut self, + id: FilterId, + expr: &Arc, + post_scan: &mut Vec<(FilterId, Arc)>, + stats_map: &HashMap>, + ) { + if expr.downcast_ref::().is_none() { + self.filter_states.insert(id, FilterState::PostScan); + post_scan.push((id, Arc::clone(expr))); + // Reset stats for this filter so it can be re-evaluated as a post-scan filter. + if let Some(entry) = stats_map.get(&id) { + *entry.lock() = SelectivityStats::default(); + } + } else { + self.filter_states.insert(id, FilterState::Dropped); + } + } + + /// Promote a filter to row-level. + fn promote( + &mut self, + id: FilterId, + expr: Arc, + row_filters: &mut Vec<(FilterId, Arc)>, + stats_map: &HashMap>, + ) { + row_filters.push((id, expr)); + self.filter_states.insert(id, FilterState::RowFilter); + // Reset stats for this filter since it will be evaluated at row-level now. + if let Some(entry) = stats_map.get(&id) { + *entry.lock() = SelectivityStats::default(); + } + } + + /// Partition filters into collecting / promoted / post-scan buckets. + #[expect(clippy::too_many_arguments)] + fn partition_filters( + &mut self, + filters: Vec<(FilterId, Arc)>, + projection_columns: &std::collections::HashSet, + projection_scan_size: usize, + metadata: &ParquetMetaData, + arrow_schema: &SchemaRef, + parquet_schema: &SchemaDescriptor, + config: &TrackerConfig, + z_eff: f64, + page_pruning_rates: &HashMap, + stats_map: &HashMap>, + ) -> PartitionResult { + let mut new_optional_flags: Vec<(FilterId, bool)> = Vec::new(); + + // If min_bytes_per_sec is INFINITY -> all filters are post-scan. + if config.min_bytes_per_sec.is_infinite() { + debug!( + "Filter promotion disabled via min_bytes_per_sec=INFINITY; all {} filters post-scan", + filters.len() + ); + // Register all filter IDs so update() can find them + for (id, expr) in &filters { + if !stats_map.contains_key(id) { + new_optional_flags.push((*id, is_optional_filter(expr))); + } + } + return PartitionResult { + partitioned: PartitionedFilters { + row_filters: Vec::new(), + post_scan: filters, + }, + new_optional_flags, + }; + } + // If min_bytes_per_sec is 0 -> all filters are promoted. + if config.min_bytes_per_sec == 0.0 { + debug!( + "All filters promoted via min_bytes_per_sec=0; all {} filters row-level", + filters.len() + ); + // Register all filter IDs so update() can find them + for (id, expr) in &filters { + if !stats_map.contains_key(id) { + new_optional_flags.push((*id, is_optional_filter(expr))); + } + } + return PartitionResult { + partitioned: PartitionedFilters { + row_filters: filters, + post_scan: Vec::new(), + }, + new_optional_flags, + }; + } + + // Note snapshot generations for dynamic filter detection. + // This clears stats for any filter whose generation has changed since the last scan. + // This must be done before any other logic since it can change filter states and stats. + for &(id, ref expr) in &filters { + let generation = snapshot_generation(expr); + self.note_generation(id, generation, stats_map); + } + + // Separate into row filters and post-scan filters based on effectiveness and state. + let mut row_filters: Vec<(FilterId, Arc)> = Vec::new(); + let mut post_scan_filters: Vec<(FilterId, Arc)> = Vec::new(); + + // Use the latency-aware effective z (clamped to <= config.confidence_z). + let confidence_z = z_eff; + for (id, expr) in filters { + let state = self.filter_states.get(&id).copied(); + + let Some(state) = state else { + // New filter: decide initial placement. 
+ // + // We start at row-level only when the filter pulls in a + // small amount of *extra* I/O — bytes for filter columns + // **not already in the user projection** — relative to the + // projection. These are the cases where the row-level + // I/O cost is bounded and late materialization on a + // selective filter is a clear win (think a small int + // column predicate against a heavy string projection). + // + // Two cases default to post-scan instead, with the + // tracker free to promote later if measured + // bytes-saved-per-sec exceeds `min_bytes_per_sec`: + // + // - `extra_bytes == 0`: filter cols are entirely in the + // projection (e.g. `WHERE col <> '' GROUP BY col`). + // There's no I/O to save; the only payoff is late + // materialization on the *non*-filter projection + // columns, which depends on selectivity we don't know + // yet. Empirically (ClickBench Q10/11/13/14/26) + // defaulting these to row-level loses to post-scan + // because predicate-cache eviction on heavy string + // columns means the filter column is decoded twice. + // + // - `byte_ratio > byte_ratio_threshold`: extra I/O is + // too high to justify before we have evidence the + // filter is selective. + // + // Pre-existing snapshot-generation handling + // ([`SelectivityTrackerInner::note_generation`]) keeps + // dynamic filters (hash-join, TopK) at post-scan when + // they re-arm with new values — those rely on row-group + // statistics pruning rather than row-level I/O savings, + // so post-scan is correct for them too. + let filter_columns: Vec = collect_columns(&expr) + .iter() + .map(|col| col.index()) + .collect(); + let extra_columns: Vec = filter_columns + .iter() + .copied() + .filter(|c| !projection_columns.contains(c)) + .collect(); + let extra_bytes = + crate::row_filter::total_compressed_bytes(&extra_columns, metadata); + let byte_ratio = if projection_scan_size > 0 { + extra_bytes as f64 / projection_scan_size as f64 + } else { + 1.0 + }; + + if !stats_map.contains_key(&id) { + new_optional_flags.push((id, is_optional_filter(&expr))); + } + + // Selectivity prior from page-index pruning that the + // opener already ran on this file (see + // `PagePruningAccessPlanFilter::prune_plan_with_per_conjunct_stats`). + // No extra pruning work is done here — we just look up + // this filter's per-conjunct rate. When no rate is + // available (page index disabled, predicate not + // single-column, or schema mismatch), we fall back to + // the existing byte-ratio heuristic. + // + // **Dynamic-filter refresh**: when this conjunct is a + // populated DynamicFilter (snapshot_generation > 0) + // we evaluate a per-conjunct `PruningPredicate` against + // the file's row-group stats *now*, because the + // side-effect rates captured at file open were taken + // when the filter was still a placeholder. This is + // targeted re-evaluation — only for dynamic conjuncts + // that have updated since file open — so it doesn't + // count as an "extra pruning run" on the static path. 
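When neither a page-index prior nor a fresh dynamic rate is available, the byte-ratio fallback further below decides the placement. A minimal standalone restatement of that fallback, with illustrative function name and argument types (the 0.20 default comes from `TrackerConfig`):

/// Initial placement for a brand-new filter when no page-index prior is
/// available: start at row-level only if the filter needs a small amount of
/// extra I/O relative to what the projection reads anyway.
fn initial_placement_by_bytes(
    extra_bytes: u64,          // compressed bytes of filter columns outside the projection
    projection_scan_size: u64, // compressed bytes the projection reads regardless
    byte_ratio_threshold: f64, // defaults to 0.20 per TrackerConfig
) -> bool {
    if extra_bytes == 0 {
        // Filter columns are already in the projection: no up-front I/O to
        // save, so start post-scan and let measured stats promote later.
        return false;
    }
    let byte_ratio = if projection_scan_size > 0 {
        extra_bytes as f64 / projection_scan_size as f64
    } else {
        1.0
    };
    byte_ratio <= byte_ratio_threshold
}

// e.g. 150_000 extra bytes against a 1_000_000-byte projection gives a
// ratio of 0.15 <= 0.20, so the filter starts at row-level; 300_000 extra
// bytes (ratio 0.30) would start post-scan instead and wait for measured
// bytes-saved-per-sec evidence before promotion.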
+ let dynamic_rate = if snapshot_generation(&expr) > 0 { + fresh_rate_for_dynamic_conjunct( + &expr, + arrow_schema, + parquet_schema, + metadata, + ) + } else { + None + }; + let prior = dynamic_rate.or_else(|| page_pruning_rates.get(&id).copied()); + + let row_level = match prior { + Some(p) if p >= config.prior_promote_threshold => { + debug!( + "FilterId {id}: New filter → Row filter via page-prior (pruned_rate={p:.3} >= {}) — {expr}", + config.prior_promote_threshold + ); + true + } + Some(p) if p <= config.prior_demote_threshold => { + debug!( + "FilterId {id}: New filter → Post-scan via page-prior (pruned_rate={p:.3} <= {}) — {expr}", + config.prior_demote_threshold + ); + false + } + _ => { + let r = + extra_bytes > 0 && byte_ratio <= config.byte_ratio_threshold; + debug!( + "FilterId {id}: New filter → {} via byte_ratio (byte_ratio={byte_ratio:.4}, extra_bytes={extra_bytes}, prior={prior:?}) — {expr}", + if r { "Row filter" } else { "Post-scan" } + ); + r + } + }; + + if row_level { + self.filter_states.insert(id, FilterState::RowFilter); + row_filters.push((id, expr)); + } else { + self.filter_states.insert(id, FilterState::PostScan); + post_scan_filters.push((id, expr)); + } + continue; + }; + + match state { + FilterState::RowFilter => { + // Should we demote this filter based on CI upper bound? + if let Some(entry) = stats_map.get(&id) { + let stats = entry.lock(); + if let Some(ub) = stats.confidence_upper_bound(confidence_z) + && ub < config.min_bytes_per_sec + { + drop(stats); + debug!( + "FilterId {id}: Row filter → Post-scan via CI upper bound {ub} < {} bytes/sec — {expr}", + config.min_bytes_per_sec + ); + self.demote_or_drop( + id, + &expr, + &mut post_scan_filters, + stats_map, + ); + continue; + } + } + // If not demoted, keep as row filter. + row_filters.push((id, expr)); + } + FilterState::PostScan => { + // Single gate: scatter-aware CI lower bound on + // bytes-saved-per-sec ≥ `min_bytes_per_sec`. + // + // The metric (see [`SelectivityStats::update`]) + // counts only sub-batch windows the filter empties + // out, so a 50% uniform filter scores ~0 and stays + // at post-scan; a TopK / hash-join / `Title LIKE` + // style filter where most batches drop entirely + // blows past the threshold. + // + // Earlier revisions also required `prune_rate ≥ 99%` + // on the theory that arrow-rs's row-level path + // double-decoded heavy string columns when the + // filter and projection overlapped. EXPLAIN ANALYZE + // on the ClickBench Q23 workload (URL LIKE + // `%google%`) showed the predicate cache is in fact + // active (`predicate_cache_inner_records=8.76M`) + // and the filter column is decoded once. The gate + // was removed; the residual ClickBench regressions + // we attributed to it (Q26 / Q31) trace to a + // different cause: post-scan filtering inside the + // opener changes batch-arrival order at downstream + // TopK, shifting the convergence point of TopK's + // dynamic filter and slightly weakening file-stats + // pruning. That has nothing to do with the + // promotion decision. + if let Some(entry) = stats_map.get(&id) { + let stats = entry.lock(); + if let Some(lb) = stats.confidence_lower_bound(confidence_z) + && lb >= config.min_bytes_per_sec + { + drop(stats); + debug!( + "FilterId {id}: Post-scan → Row filter via CI lower bound {lb} >= {} bytes/sec — {expr}", + config.min_bytes_per_sec + ); + self.promote(id, expr, &mut row_filters, stats_map); + continue; + } + } + // Should we drop this filter if it's optional and ineffective? 
+ // Non-optional filters must stay as post-scan regardless. + if let Some(entry) = stats_map.get(&id) { + let stats = entry.lock(); + if let Some(ub) = stats.confidence_upper_bound(confidence_z) + && ub < config.min_bytes_per_sec + && expr.downcast_ref::().is_some() + { + drop(stats); + debug!( + "FilterId {id}: Post-scan → Dropped via CI upper bound {ub} < {} bytes/sec — {expr}", + config.min_bytes_per_sec + ); + self.filter_states.insert(id, FilterState::Dropped); + continue; + } + } + // Keep as post-scan filter (don't reset stats for mandatory filters). + post_scan_filters.push((id, expr)); + } + FilterState::Dropped => continue, + } + } + + // Sort row filters by: + // - Effectiveness (descending, higher = better) if available for both filters. + // - Scan size (ascending, cheapest first) as fallback — cheap filters prune + // rows before expensive ones, reducing downstream evaluation cost. + let cmp_row_filters = + |(id_a, expr_a): &(FilterId, Arc), + (id_b, expr_b): &(FilterId, Arc)| { + let eff_a = self.get_effectiveness_by_id(*id_a, stats_map); + let eff_b = self.get_effectiveness_by_id(*id_b, stats_map); + if let (Some(eff_a), Some(eff_b)) = (eff_a, eff_b) { + eff_b + .partial_cmp(&eff_a) + .unwrap_or(std::cmp::Ordering::Equal) + } else { + let size_a = filter_scan_size(expr_a, metadata); + let size_b = filter_scan_size(expr_b, metadata); + size_a.cmp(&size_b) + } + }; + row_filters.sort_by(cmp_row_filters); + // Post-scan filters: same logic (cheaper post-scan filters first to reduce + // the batch size for subsequent filters). + post_scan_filters.sort_by(cmp_row_filters); + + debug!( + "Partitioned filters: {} row-level, {} post-scan", + row_filters.len(), + post_scan_filters.len() + ); + PartitionResult { + partitioned: PartitionedFilters { + row_filters, + post_scan: post_scan_filters, + }, + new_optional_flags, + } + } +} + +/// Returns `true` if `expr` is wrapped in [`OptionalFilterPhysicalExpr`]. +fn is_optional_filter(expr: &Arc) -> bool { + expr.downcast_ref::().is_some() +} + +/// Calculate the estimated number of bytes needed to evaluate a filter based on the columns +/// it references as if it were applied to the entire file. +/// This is used for initial placement of new filters before any stats are available, and as a fallback for filters without stats. +fn filter_scan_size(expr: &Arc, metadata: &ParquetMetaData) -> usize { + let columns: Vec = collect_columns(expr) + .iter() + .map(|col| col.index()) + .collect(); + + crate::row_filter::total_compressed_bytes(&columns, metadata) +} + +// (Per-conjunct page-pruning rates are now extracted as a side-effect +// of the opener's existing page-index pruning pass — see +// `PagePruningAccessPlanFilter::prune_plan_with_per_conjunct_stats`. +// `partition_filters` reads them through its `page_pruning_rates` +// parameter; no extra pruning runs happen on the static path.) + +/// Compute a fresh row-group pruning rate for a single dynamic +/// conjunct, evaluated against the file's row-group statistics +/// *now*. Used by `partition_filters` to refresh the prior for +/// dynamic filters that were placeholders when the side-effect +/// rates were captured at file open and have since been populated +/// by the join build side. +/// +/// Returns `None` when the conjunct doesn't translate into a +/// usable pruning predicate (e.g. always-true after rewriting, +/// references columns missing from the schema, contains +/// hash_lookup-style nodes the rewriter can't handle). 
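Both code paths inside the function reduce to the same arithmetic over the boolean keep-mask a pruning pass produces: the rate is simply the fraction of row groups marked prunable. A minimal sketch of that step in isolation (the real function below obtains the mask from `PruningPredicate::prune` against row-group statistics):

/// Fraction of row groups a predicate would prune, given the keep-mask a
/// pruning pass returns (`true` = keep the row group, `false` = prune it).
fn pruned_rate(kept: &[bool]) -> Option<f64> {
    if kept.is_empty() {
        // No row groups means no signal; callers fall back to other priors.
        return None;
    }
    let pruned = kept.iter().filter(|keep| !**keep).count();
    Some(pruned as f64 / kept.len() as f64)
}

// e.g. a mask of [false, false, true, false] prunes 3 of 4 row groups,
// giving a rate of 0.75, comfortably above the 0.5 promote-only cutoff
// applied to the partial-AND path below.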
+fn fresh_rate_for_dynamic_conjunct( + expr: &Arc, + arrow_schema: &SchemaRef, + parquet_schema: &SchemaDescriptor, + metadata: &ParquetMetaData, +) -> Option { + use datafusion_pruning::PruningPredicate; + // Unwrap OptionalFilterPhysicalExpr — pruning should evaluate + // the underlying predicate, not the marker. + let inner = if let Some(opt) = expr.downcast_ref::() { + opt.inner() + } else { + Arc::clone(expr) + }; + let groups = metadata.row_groups(); + if groups.is_empty() { + return None; + } + let stats = crate::row_group_filter::RowGroupPruningStatistics { + parquet_schema, + row_group_metadatas: groups.iter().collect(), + arrow_schema: arrow_schema.as_ref(), + missing_null_counts_as_zero: false, + }; + + // First try: build a PruningPredicate from the whole conjunct. + if let Ok(pp) = + PruningPredicate::try_new(Arc::clone(&inner), Arc::clone(arrow_schema)) + && !pp.always_true() + && let Ok(kept) = pp.prune(&stats) + && !kept.is_empty() + { + let total = kept.len(); + let pruned = total - kept.iter().filter(|b| **b).count(); + return Some(pruned as f64 / total as f64); + } + + // Second try (the AND-with-hash-lookup case): snapshot the + // dynamic filter to materialize its current inner expression, + // then split the AND inside. `split_conjunction` doesn't descend + // into DynamicFilterPhysicalExpr wrappers, so without this step + // the split would return `[dynamic_filter]` and miss the + // prunable parts inside. We take the *max* pruning rate across + // sub-parts as a *promote* signal — if any sub-conjunct prunes + // a high fraction, the whole AND prunes at least that much. We + // deliberately do NOT use this as a demote signal. + let snapshot_result = + datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt( + Arc::clone(&inner), + ) + .ok()?; + let snapshotted = snapshot_result.data; + let parts = datafusion_physical_expr::split_conjunction(&snapshotted); + if parts.len() < 2 { + return None; + } + let mut max_rate: Option = None; + for part in parts { + let Ok(pp) = + PruningPredicate::try_new(Arc::clone(part), Arc::clone(arrow_schema)) + else { + continue; + }; + if pp.always_true() { + continue; + } + let Ok(kept) = pp.prune(&stats) else { continue }; + if kept.is_empty() { + continue; + } + let total = kept.len(); + let pruned = total - kept.iter().filter(|b| **b).count(); + let rate = pruned as f64 / total as f64; + max_rate = Some(max_rate.map_or(rate, |m| m.max(rate))); + } + // Promote-only semantics: only return when the partial-AND rate + // is high enough to be a confident promote signal. Below that we + // return None and let the standard prior / byte-ratio fallback + // run, which won't be misled by an undercounted rate. + max_rate.filter(|&r| r >= 0.5) +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_physical_expr::expressions::Column; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::{ColumnChunkMetaData, FileMetaData, RowGroupMetaData}; + use parquet::schema::types::SchemaDescPtr; + use parquet::schema::types::Type as SchemaType; + use std::sync::Arc; + + mod helper_functions { + use super::*; + + /// Creates test ParquetMetaData with specified row groups and column sizes. 
+ /// + /// # Arguments + /// * `specs` - Vec of (num_rows, vec![compressed_size]) tuples for each row group + pub fn create_test_metadata(specs: Vec<(i64, Vec)>) -> ParquetMetaData { + // Get the maximum number of columns from all specs + let num_columns = specs + .iter() + .map(|(_, sizes)| sizes.len()) + .max() + .unwrap_or(1); + let schema_descr = get_test_schema_descr_with_columns(num_columns); + + let row_group_metadata: Vec<_> = specs + .into_iter() + .map(|(num_rows, column_sizes)| { + let columns = column_sizes + .into_iter() + .enumerate() + .map(|(col_idx, size)| { + ColumnChunkMetaData::builder(schema_descr.column(col_idx)) + .set_num_values(num_rows) + .set_total_compressed_size(size as i64) + .build() + .unwrap() + }) + .collect(); + + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(num_rows) + .set_column_metadata(columns) + .build() + .unwrap() + }) + .collect(); + + let total_rows: i64 = row_group_metadata.iter().map(|rg| rg.num_rows()).sum(); + let file_metadata = + FileMetaData::new(1, total_rows, None, None, schema_descr.clone(), None); + + ParquetMetaData::new(file_metadata, row_group_metadata) + } + + /// Creates a simple column expression with given name and index. + pub fn col_expr(name: &str, index: usize) -> Arc { + Arc::new(Column::new(name, index)) + } + + /// Create schema with specified number of columns, each named "a", "b", etc. + pub fn get_test_schema_descr_with_columns(num_columns: usize) -> SchemaDescPtr { + use parquet::basic::LogicalType; + + let fields: Vec<_> = (0..num_columns) + .map(|i| { + let col_name = format!("{}", (b'a' + i as u8) as char); + SchemaType::primitive_type_builder( + &col_name, + PhysicalType::BYTE_ARRAY, + ) + .with_logical_type(Some(LogicalType::String)) + .build() + .unwrap() + }) + .map(Arc::new) + .collect(); + + let schema = SchemaType::group_type_builder("schema") + .with_fields(fields) + .build() + .unwrap(); + Arc::new(SchemaDescriptor::new(Arc::new(schema))) + } + } + + mod selectivity_stats_tests { + use super::*; + + #[test] + fn test_effectiveness_basic_calculation() { + let mut stats = SelectivityStats::default(); + + // skippable_bytes is now caller-computed (= rows_pruned * + // bytes_per_row in the simple case), so passing 5000 directly + // models the same scenario the old test described: + // "100 rows total, 50 pruned, 100 bytes/row → 5000 saved". + stats.update(50, 100, 1_000_000_000, 5_000); + + let eff = stats.effectiveness().unwrap(); + assert!((eff - 5000.0).abs() < 0.1); + } + + #[test] + fn test_effectiveness_zero_rows_total() { + let mut stats = SelectivityStats::default(); + stats.update(0, 0, 1_000_000_000, 10_000); + + assert_eq!(stats.effectiveness(), None); + } + + #[test] + fn test_effectiveness_zero_eval_nanos() { + let mut stats = SelectivityStats::default(); + stats.update(50, 100, 0, 10_000); + + assert_eq!(stats.effectiveness(), None); + } + + #[test] + fn test_effectiveness_zero_bytes_seen() { + // A batch with zero skippable_bytes is a legitimate sample + // ("filter ran, late-mat had nothing to save") — Welford + // records it as eff=0 rather than discarding it, so the + // demotion path can see "CPU spent, no payoff." + let mut stats = SelectivityStats::default(); + stats.update(50, 100, 1_000_000_000, 0); + + assert_eq!(stats.effectiveness(), Some(0.0)); + } + + #[test] + fn test_effectiveness_all_rows_matched() { + let mut stats = SelectivityStats::default(); + // All rows matched (no pruning) — caller computes + // skippable_bytes = rows_pruned * bytes_per_row = 0. 
+ stats.update(100, 100, 1_000_000_000, 0); + + let eff = stats.effectiveness().unwrap(); + assert_eq!(eff, 0.0); + } + + #[test] + fn test_confidence_bounds_single_sample() { + let mut stats = SelectivityStats::default(); + stats.update(50, 100, 1_000_000_000, 10_000); + + // Single sample returns None for confidence bounds + assert_eq!(stats.confidence_lower_bound(2.0), None); + assert_eq!(stats.confidence_upper_bound(2.0), None); + } + + #[test] + fn test_welford_identical_samples() { + let mut stats = SelectivityStats::default(); + + // Add two identical samples + stats.update(50, 100, 1_000_000_000, 10_000); + stats.update(50, 100, 1_000_000_000, 10_000); + + // Variance should be 0 + assert_eq!(stats.sample_count, 2); + let lb = stats.confidence_lower_bound(2.0).unwrap(); + let ub = stats.confidence_upper_bound(2.0).unwrap(); + + // Both should be equal to the mean since variance is 0 + assert!((lb - ub).abs() < 0.01); + } + + #[test] + fn test_welford_variance_calculation() { + let mut stats = SelectivityStats::default(); + + // Add samples that produce effectiveness values 5000, 6000, 7000 + // (caller-computed skippable_bytes is the lever now). + stats.update(50, 100, 1_000_000_000, 5_000); // eff = 5000 + stats.update(40, 100, 1_000_000_000, 6_000); // eff = 6000 + stats.update(30, 100, 1_000_000_000, 7_000); // eff = 7000 + + // We should have 3 samples + assert_eq!(stats.sample_count, 3); + + // Mean should be 6000 + assert!((stats.eff_mean - 6000.0).abs() < 1.0); + + // Both bounds should be defined + let lb = stats.confidence_lower_bound(1.0).unwrap(); + let ub = stats.confidence_upper_bound(1.0).unwrap(); + + assert!(lb < stats.eff_mean); + assert!(ub > stats.eff_mean); + } + + #[test] + fn test_confidence_bounds_asymmetry() { + let mut stats = SelectivityStats::default(); + + stats.update(50, 100, 1_000_000_000, 10_000); + stats.update(40, 100, 1_000_000_000, 10_000); + + let lb = stats.confidence_lower_bound(2.0).unwrap(); + let ub = stats.confidence_upper_bound(2.0).unwrap(); + + // Bounds should be symmetric around the mean + let lower_dist = stats.eff_mean - lb; + let upper_dist = ub - stats.eff_mean; + + assert!((lower_dist - upper_dist).abs() < 0.01); + } + + #[test] + fn test_welford_incremental_vs_batch() { + // Create two identical stats objects + let mut stats_incremental = SelectivityStats::default(); + let mut stats_batch = SelectivityStats::default(); + + // Incremental: add one at a time + stats_incremental.update(50, 100, 1_000_000_000, 10_000); + stats_incremental.update(40, 100, 1_000_000_000, 10_000); + stats_incremental.update(30, 100, 1_000_000_000, 10_000); + + // Batch: simulate batch update (all at once) + stats_batch.update(120, 300, 3_000_000_000, 30_000); + + // Both should produce the same overall statistics + assert_eq!(stats_incremental.rows_total, stats_batch.rows_total); + assert_eq!(stats_incremental.rows_matched, stats_batch.rows_matched); + + // Means should be close + assert!((stats_incremental.eff_mean - stats_batch.eff_mean).abs() < 100.0); + } + + #[test] + fn test_effectiveness_numerical_stability() { + let mut stats = SelectivityStats::default(); + + // Test with large values to ensure numerical stability + stats.update( + 500_000_000, + 1_000_000_000, + 10_000_000_000_000, + 1_000_000_000_000, + ); + + let eff = stats.effectiveness(); + assert!(eff.is_some()); + assert!(eff.unwrap() > 0.0); + assert!(!eff.unwrap().is_nan()); + assert!(!eff.unwrap().is_infinite()); + } + } + + mod tracker_config_tests { + use super::*; + + #[test] + 
fn test_default_config() { + let config = TrackerConfig::default(); + + assert!(config.min_bytes_per_sec.is_infinite()); + assert_eq!(config.byte_ratio_threshold, 0.20); + assert_eq!(config.confidence_z, 2.0); + } + + #[test] + fn test_with_min_bytes_per_sec() { + let config = TrackerConfig::new().with_min_bytes_per_sec(1000.0); + + assert_eq!(config.min_bytes_per_sec, 1000.0); + } + + #[test] + fn test_with_byte_ratio_threshold() { + let config = TrackerConfig::new().with_byte_ratio_threshold(0.5); + + assert_eq!(config.byte_ratio_threshold, 0.5); + } + + #[test] + fn test_with_confidence_z() { + let config = TrackerConfig::new().with_confidence_z(3.0); + + assert_eq!(config.confidence_z, 3.0); + } + + #[test] + fn test_builder_chain() { + let config = TrackerConfig::new() + .with_min_bytes_per_sec(500.0) + .with_byte_ratio_threshold(0.3) + .with_confidence_z(1.5); + + assert_eq!(config.min_bytes_per_sec, 500.0); + assert_eq!(config.byte_ratio_threshold, 0.3); + assert_eq!(config.confidence_z, 1.5); + } + + #[test] + fn test_build_creates_tracker() { + let tracker = TrackerConfig::new().with_min_bytes_per_sec(1000.0).build(); + + // Tracker should be created and functional + assert_eq!(tracker.config.min_bytes_per_sec, 1000.0); + } + } + + mod state_machine_tests { + use super::helper_functions::*; + use super::*; + + #[test] + fn test_initial_placement_low_byte_ratio() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.2) + .build(); + + // Create metadata: 1 row group, 100 rows, 1000 bytes for column + let metadata = create_test_metadata(vec![(100, vec![1000])]); + + // Filter using column 0 (1000 bytes out of 1000 projection = 100% ratio > 0.2) + // So this should be placed in post-scan initially + let expr = col_expr("a", 0); + let filters = vec![(1, expr)]; + + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // With 100% byte ratio, should go to post-scan + assert_eq!(result.row_filters.len(), 0); + assert_eq!(result.post_scan.len(), 1); + } + + #[test] + fn test_initial_placement_filter_in_projection_low_ratio() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.5) + .build(); + + // Create metadata: 1 row group, 100 rows, 100 bytes for column + let metadata = create_test_metadata(vec![(100, vec![100])]); + + // Filter using column 0 which IS in the projection. 
+ // filter_bytes=100, projection=1000, ratio=0.10 <= 0.5 → RowFilter + let expr = col_expr("a", 0); + let filters = vec![(1, expr)]; + + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + } + + #[test] + fn test_initial_placement_high_byte_ratio() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.5) + .build(); + + // Create metadata: 1 row group, 100 rows, 100 bytes for column + let metadata = create_test_metadata(vec![(100, vec![100])]); + + // Filter using column 0 (100 bytes / 1000 projection = 10% ratio <= 0.5) + // So this should be placed in row-filter immediately + let expr = col_expr("a", 0); + let filters = vec![(1, expr)]; + + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // With 10% byte ratio, should go to row-filter + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + } + + #[test] + fn test_min_bytes_per_sec_infinity_disables_promotion() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(f64::INFINITY) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![100])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr)]; + + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // All filters should go to post_scan when min_bytes_per_sec is INFINITY + assert_eq!(result.row_filters.len(), 0); + assert_eq!(result.post_scan.len(), 1); + } + + #[test] + fn test_min_bytes_per_sec_zero_promotes_all() { + let tracker = TrackerConfig::new().with_min_bytes_per_sec(0.0).build(); + + let metadata = create_test_metadata(vec![(100, vec![1000])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr)]; + + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // All filters should be promoted to row_filters when min_bytes_per_sec is 0 + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + } + + #[test] + fn test_promotion_via_confidence_lower_bound() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.5) // Force to PostScan initially + .with_confidence_z(0.5) // Lower z for easier promotion + .build(); + + let metadata = create_test_metadata(vec![(100, vec![1000])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // First partition: goes to PostScan (high byte ratio) + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.post_scan.len(), 1); + assert_eq!(result.row_filters.len(), 0); + + // Feed high effectiveness stats + for _ in 0..5 { + tracker.update(1, 1, 100, 100_000, 1000); // high effectiveness + } + + // Second partition: should be promoted to RowFilter + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + } + + #[test] + fn test_demotion_via_confidence_upper_bound() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(10000.0) + .with_byte_ratio_threshold(0.1) // Force to RowFilter initially + 
.with_confidence_z(0.5) // Lower z for easier demotion + .build(); + + let metadata = create_test_metadata(vec![(100, vec![100])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // First partition: goes to RowFilter (low byte ratio) + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + + // Feed low effectiveness stats — all rows matched, no rows + // pruned, so caller-computed skippable_bytes is 0. + for _ in 0..5 { + tracker.update(1, 100, 100, 100_000, 0); + } + + // Second partition: should be demoted to PostScan + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 0); + assert_eq!(result.post_scan.len(), 1); + } + + #[test] + fn test_demotion_resets_stats() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(10000.0) + .with_byte_ratio_threshold(0.1) + .with_confidence_z(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![100])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // Start as RowFilter + tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // Add stats — no pruning, so skippable_bytes = 0 + tracker.update(1, 100, 100, 100_000, 0); + tracker.update(1, 100, 100, 100_000, 0); + + // Demote + tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // Stats should be zeroed after demotion + let stats_map = tracker.filter_stats.read(); + assert_eq!( + *stats_map.get(&1).unwrap().lock(), + SelectivityStats::default() + ); + } + + #[test] + fn test_promotion_resets_stats() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(100.0) + .with_byte_ratio_threshold(0.5) + .with_confidence_z(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![1000])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // Start as PostScan + tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // Add stats with high prune_rate so the selectivity gate + // (>= 0.99) lets the promotion fire. 
+ for _ in 0..3 { + tracker.update(1, 1, 100, 100_000, 1000); + } + + // Promote + tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // Stats should be zeroed after promotion + let stats_map = tracker.filter_stats.read(); + assert_eq!( + *stats_map.get(&1).unwrap().lock(), + SelectivityStats::default() + ); + } + + #[test] + fn test_optional_filter_dropping() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(10000.0) + .with_byte_ratio_threshold(0.5) + .with_confidence_z(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![1000])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // Start as PostScan + tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // Feed poor effectiveness stats — no pruning, no skippable_bytes + for _ in 0..5 { + tracker.update(1, 100, 100, 100_000, 0); + } + + // Next partition: should stay as PostScan (not dropped because not optional) + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.post_scan.len(), 1); + assert_eq!(result.row_filters.len(), 0); + } + + #[test] + fn test_persistent_dropped_state() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(10000.0) + .with_byte_ratio_threshold(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![1000])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // Mark filter as dropped by manually setting state + tracker + .inner + .lock() + .filter_states + .insert(1, FilterState::Dropped); + + // On next partition, dropped filters should not reappear + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 0); + assert_eq!(result.post_scan.len(), 0); + } + } + + mod filter_ordering_tests { + use super::helper_functions::*; + use super::*; + + #[test] + fn test_filters_get_partitioned() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1.0) // Very low threshold + .build(); + + let metadata = create_test_metadata(vec![(100, vec![100, 100, 100])]); + let filters = vec![ + (1, col_expr("a", 0)), + (2, col_expr("a", 1)), + (3, col_expr("a", 2)), + ]; + + // Partition should process all filters + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // With min_bytes_per_sec=1.0, filters should be partitioned + assert!(result.row_filters.len() + result.post_scan.len() > 0); + + // Add stats and partition again + tracker.update(1, 60, 100, 1_000_000, 100); + tracker.update(2, 1, 100, 1_000_000, 100); + tracker.update(3, 40, 100, 1_000_000, 100); + + let result2 = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // Filters should still be partitioned + assert!(result2.row_filters.len() + result2.post_scan.len() > 0); + } + + #[test] + fn test_filters_processed_without_stats() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1.0) // Very low threshold + .build(); + + // Different column sizes: 300, 200, 100 bytes + let metadata = create_test_metadata(vec![(100, vec![300, 200, 100])]); + let filters = vec![ + (1, col_expr("a", 0)), + (2, col_expr("a", 1)), + (3, col_expr("a", 2)), + ]; + + // First 
partition - no stats yet + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // All filters should be processed (partitioned into row/post-scan) + assert!(result.row_filters.len() + result.post_scan.len() > 0); + + // Filters should be consistent on repeated calls + let result2 = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!( + result.row_filters.len() + result.post_scan.len(), + result2.row_filters.len() + result2.post_scan.len() + ); + } + + #[test] + fn test_filters_with_partial_stats() { + let tracker = TrackerConfig::new().with_min_bytes_per_sec(1.0).build(); + + // Give filter 2 larger bytes so it's prioritized when falling back to byte ratio + let metadata = create_test_metadata(vec![(100, vec![100, 300, 100])]); + let filters = vec![ + (1, col_expr("a", 0)), + (2, col_expr("a", 1)), + (3, col_expr("a", 2)), + ]; + + // First partition + let result1 = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert!(result1.row_filters.len() + result1.post_scan.len() > 0); + + // Only add stats for filters 1 and 3, not 2 + tracker.update(1, 60, 100, 1_000_000, 100); + tracker.update(3, 60, 100, 1_000_000, 100); + + // Second partition with partial stats + let result2 = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert!(result2.row_filters.len() + result2.post_scan.len() > 0); + } + + #[test] + fn test_ordering_stability_with_identical_values() { + let tracker = TrackerConfig::new().with_min_bytes_per_sec(0.0).build(); + + let metadata = create_test_metadata(vec![(100, vec![100, 100, 100])]); + let filters = vec![ + (1, col_expr("a", 0)), + (2, col_expr("a", 1)), + (3, col_expr("a", 2)), + ]; + + let result1 = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + let result2 = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // Without stats and with identical byte sizes, order should be stable + assert_eq!(result1.row_filters[0].0, result2.row_filters[0].0); + assert_eq!(result1.row_filters[1].0, result2.row_filters[1].0); + assert_eq!(result1.row_filters[2].0, result2.row_filters[2].0); + } + } + + mod dynamic_filter_tests { + use super::helper_functions::*; + use super::*; + + #[test] + fn test_generation_zero_ignored() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![1000])]); + + // Create two filters with same ID but generation 0 and 1 + // Generation 0 should be ignored + let expr1 = col_expr("a", 0); + let filters1 = vec![(1, expr1)]; + + tracker.partition_filters_for_test( + filters1, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + tracker.update(1, 50, 100, 100_000, 1000); + + // Generation 0 doesn't trigger state reset + let snapshot_gen = tracker.inner.lock().snapshot_generations.get(&1).copied(); + assert_eq!(snapshot_gen, None); + } + + #[test] + fn test_generation_change_clears_stats() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.5) + .build(); + + // Pre-populate stats entry so update() can find it + tracker.ensure_stats_entry(1); + + // Initialize 
generation to 100 + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 100, &stats); + } + + // Add stats + tracker.update(1, 50, 100, 100_000, 1000); + tracker.update(1, 50, 100, 100_000, 1000); + + let stats_before = { + let stats_map = tracker.filter_stats.read(); + *stats_map.get(&1).unwrap().lock() != SelectivityStats::default() + }; + assert!(stats_before); + + // Simulate generation change to a different value + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 101, &stats); + } + + // Stats should be zeroed on generation change + let stats_after = { + let stats_map = tracker.filter_stats.read(); + *stats_map.get(&1).unwrap().lock() == SelectivityStats::default() + }; + assert!(stats_after); + } + + #[test] + fn test_generation_unchanged_preserves_stats() { + let tracker = TrackerConfig::new().with_min_bytes_per_sec(1000.0).build(); + + // Pre-populate stats entry so update() can find it + tracker.ensure_stats_entry(1); + + // Manually set generation + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 100, &stats); + } + + // Add stats + tracker.update(1, 50, 100, 100_000, 1000); + tracker.update(1, 50, 100, 100_000, 1000); + + let sample_count_before = { + let stats_map = tracker.filter_stats.read(); + stats_map.get(&1).map(|s| s.lock().sample_count) + }; + assert_eq!(sample_count_before, Some(2)); + + // Call note_generation with same generation + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 100, &stats); + } + + // Stats should be preserved + let sample_count_after = { + let stats_map = tracker.filter_stats.read(); + stats_map.get(&1).map(|s| s.lock().sample_count) + }; + assert_eq!(sample_count_after, Some(2)); + } + + #[test] + fn test_generation_change_preserves_state() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.1) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![100])]); + + // First partition: goes to RowFilter + let expr = col_expr("a", 0); + let filters = vec![(1, expr)]; + tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + let state_before = tracker.inner.lock().filter_states.get(&1).copied(); + assert_eq!(state_before, Some(FilterState::RowFilter)); + + // Simulate generation change + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 100, &stats); + } + + // State should be preserved despite stats being cleared + let state_after = tracker.inner.lock().filter_states.get(&1).copied(); + assert_eq!(state_after, Some(FilterState::RowFilter)); + } + + #[test] + fn test_generation_change_undrops_dropped_filter() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.1) + .build(); + + // Manually set filter state to Dropped + tracker + .inner + .lock() + .filter_states + .insert(1, FilterState::Dropped); + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 100, &stats); + } + + // Simulate generation change + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 101, &stats); + } + + // Dropped filter should be un-dropped to PostScan + let state_after = 
tracker.inner.lock().filter_states.get(&1).copied(); + assert_eq!(state_after, Some(FilterState::PostScan)); + } + + #[test] + fn test_multiple_filters_independent_generation_tracking() { + let tracker = TrackerConfig::new().with_min_bytes_per_sec(1000.0).build(); + + // Pre-populate stats entries so update() can find them + tracker.ensure_stats_entry(1); + tracker.ensure_stats_entry(2); + + // Set generations for multiple filters + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 100, &stats); + inner.note_generation(2, 200, &stats); + } + + // Add stats to both + tracker.update(1, 50, 100, 100_000, 1000); + tracker.update(2, 50, 100, 100_000, 1000); + + // Change generation of filter 1 only + { + let mut inner = tracker.inner.lock(); + let stats = tracker.filter_stats.read(); + inner.note_generation(1, 101, &stats); + } + + // Filter 1 stats should be zeroed, filter 2 preserved + let stats_map = tracker.filter_stats.read(); + assert_eq!( + *stats_map.get(&1).unwrap().lock(), + SelectivityStats::default() + ); + assert_ne!( + *stats_map.get(&2).unwrap().lock(), + SelectivityStats::default() + ); + } + } + + mod integration_tests { + use super::helper_functions::*; + use super::*; + + #[test] + fn test_full_promotion_lifecycle() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(500.0) + .with_byte_ratio_threshold(0.5) // Force initial PostScan + .with_confidence_z(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![1000])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // Step 1: Initial placement (PostScan) + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.post_scan.len(), 1); + assert_eq!(result.row_filters.len(), 0); + + // Step 2: Accumulate high effectiveness stats + for _ in 0..5 { + tracker.update(1, 1, 100, 100_000, 1000); // high effectiveness + } + + // Step 3: Promotion should occur + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + + // Step 4: Continue to partition without additional updates + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + } + + #[test] + fn test_full_demotion_lifecycle() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(10000.0) + .with_byte_ratio_threshold(0.1) // Force initial RowFilter + .with_confidence_z(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![100])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // Step 1: Initial placement (RowFilter) + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + + // Step 2: Accumulate low effectiveness stats — no pruning, + // so skippable_bytes = 0 + for _ in 0..5 { + tracker.update(1, 100, 100, 100_000, 0); + } + + // Step 3: Demotion should occur + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 0); + 
assert_eq!(result.post_scan.len(), 1); + + // Step 4: Continue to partition without additional updates + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 0); + assert_eq!(result.post_scan.len(), 1); + } + + #[test] + fn test_multiple_filters_mixed_states() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.4) // Force PostScan initially (500/1000=0.5 > 0.4) + .with_confidence_z(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![500, 500])]); + let filters = vec![(1, col_expr("a", 0)), (2, col_expr("a", 1))]; + + // Initial partition: both go to PostScan (500/1000 = 0.5 > 0.4) + let result = tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.post_scan.len(), 2); + + // Filter 1: high effectiveness — 99/100 rows pruned out of + // 500 batch bytes ≈ 495 skippable bytes + for _ in 0..3 { + tracker.update(1, 1, 100, 100_000, 495); + } + + // Filter 2: low effectiveness — no rows pruned, so 0 skippable + for _ in 0..3 { + tracker.update(2, 100, 100, 100_000, 0); + } + + // Next partition: Filter 1 promoted, Filter 2 stays PostScan + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 1); + assert_eq!(result.row_filters[0].0, 1); + assert_eq!(result.post_scan[0].0, 2); + } + + #[test] + fn test_empty_filter_list() { + let tracker = TrackerConfig::new().build(); + let metadata = create_test_metadata(vec![(100, vec![1000])]); + let filters = vec![]; + + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + assert_eq!(result.row_filters.len(), 0); + assert_eq!(result.post_scan.len(), 0); + } + + #[test] + fn test_single_filter() { + let tracker = TrackerConfig::new().with_min_bytes_per_sec(0.0).build(); + + let metadata = create_test_metadata(vec![(100, vec![100])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr)]; + + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + assert_eq!(result.row_filters.len(), 1); + assert_eq!(result.post_scan.len(), 0); + } + + #[test] + fn test_zero_effectiveness_stays_at_boundary() { + let tracker = TrackerConfig::new() + .with_min_bytes_per_sec(100.0) + .with_byte_ratio_threshold(0.1) + .with_confidence_z(0.5) + .build(); + + let metadata = create_test_metadata(vec![(100, vec![100])]); + let expr = col_expr("a", 0); + let filters = vec![(1, expr.clone())]; + + // Start as RowFilter + tracker.partition_filters_for_test( + filters.clone(), + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + + // All rows match (zero effectiveness) — no rows pruned, so + // skippable_bytes = 0 + for _ in 0..5 { + tracker.update(1, 100, 100, 100_000, 0); + } + + // Should demote due to CI upper bound being 0 + let result = tracker.partition_filters_for_test( + filters, + &std::collections::HashSet::new(), + 1000, + &metadata, + ); + assert_eq!(result.row_filters.len(), 0); + assert_eq!(result.post_scan.len(), 1); + } + + #[test] + fn test_confidence_z_parameter_stored() { + // Test that different confidence_z values are properly stored in config + let tracker_conservative = TrackerConfig::new() + 
.with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.5) + .with_confidence_z(3.0) // Harder to promote + .build(); + + let tracker_aggressive = TrackerConfig::new() + .with_min_bytes_per_sec(1000.0) + .with_byte_ratio_threshold(0.5) + .with_confidence_z(0.5) // Easier to promote + .build(); + + // Verify configs are stored correctly + assert_eq!(tracker_conservative.config.confidence_z, 3.0); + assert_eq!(tracker_aggressive.config.confidence_z, 0.5); + + // The z-score affects confidence intervals during promotion/demotion decisions. + // With identical stats, higher z requires narrower confidence intervals, + // making promotion harder. With lower z, confidence intervals are wider, + // making promotion easier. This is tested in other integration tests + // that verify actual promotion/demotion behavior. + } + } +} diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 212dca6cd57b0..1def37601dd1b 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -724,6 +724,163 @@ pub fn is_volatile(expr: &Arc) -> bool { is_volatile } +/// A transparent wrapper that marks a [`PhysicalExpr`] as *optional* — i.e., +/// droppable without affecting query correctness. +/// +/// This is used for filters that are performance hints (e.g., dynamic join +/// filters) as opposed to mandatory predicates. The selectivity tracker can +/// detect this wrapper via `expr.as_any().downcast_ref::()` +/// and choose to drop the filter entirely when it is not cost-effective. +/// +/// All [`PhysicalExpr`] methods are delegated to the wrapped inner expression. +/// +/// Currently used by `HashJoinExec` for dynamic join filters. When the +/// selectivity tracker drops such a filter, the join still enforces +/// correctness independently — "dropped" simply means the filter is never +/// applied as a scan-time optimization. +#[derive(Debug)] +pub struct OptionalFilterPhysicalExpr { + inner: Arc, +} + +impl OptionalFilterPhysicalExpr { + /// Create a new optional filter wrapping the given expression. + pub fn new(inner: Arc) -> Self { + Self { inner } + } + + /// Returns a clone of the inner (unwrapped) expression. + pub fn inner(&self) -> Arc { + Arc::clone(&self.inner) + } +} + +impl Display for OptionalFilterPhysicalExpr { + /// Pass through to the inner expression. Surfacing the `Optional(..)` + /// wrapper in plan output would require updating dozens of sqllogictest + /// baselines for what is purely a runtime concept (the adaptive + /// scheduler's permission to drop this filter); plan readers don't need + /// to see it. 
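A short sketch of the observable effect, assuming the wrapper is used directly from the module this diff adds it to (import paths are illustrative and may differ once the type is re-exported):

use std::sync::Arc;

use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::{
    OptionalFilterPhysicalExpr, PhysicalExpr,
};

fn main() {
    let inner: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let wrapped = OptionalFilterPhysicalExpr::new(Arc::clone(&inner));

    // Display is delegated, so EXPLAIN text is identical with or without
    // the wrapper...
    assert_eq!(wrapped.to_string(), inner.to_string());

    // ...and only a downcast reveals that the filter is droppable.
    let as_expr: Arc<dyn PhysicalExpr> = Arc::new(wrapped);
    assert!(
        as_expr
            .as_any()
            .downcast_ref::<OptionalFilterPhysicalExpr>()
            .is_some()
    );
}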
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.inner) + } +} + +impl PartialEq for OptionalFilterPhysicalExpr { + fn eq(&self, other: &Self) -> bool { + self.inner.as_ref() == other.inner.as_ref() + } +} + +impl Eq for OptionalFilterPhysicalExpr {} + +impl Hash for OptionalFilterPhysicalExpr { + fn hash(&self, state: &mut H) { + self.inner.as_ref().hash(state); + } +} + +impl PhysicalExpr for OptionalFilterPhysicalExpr { + fn data_type(&self, input_schema: &Schema) -> Result { + self.inner.data_type(input_schema) + } + + fn nullable(&self, input_schema: &Schema) -> Result { + self.inner.nullable(input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + self.inner.evaluate(batch) + } + + fn return_field(&self, input_schema: &Schema) -> Result { + self.inner.return_field(input_schema) + } + + fn evaluate_selection( + &self, + batch: &RecordBatch, + selection: &BooleanArray, + ) -> Result { + self.inner.evaluate_selection(batch, selection) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + assert_eq_or_internal_err!( + children.len(), + 1, + "OptionalFilterPhysicalExpr: expected 1 child" + ); + Ok(Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone( + &children[0], + )))) + } + + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { + self.inner.evaluate_bounds(children) + } + + fn propagate_constraints( + &self, + interval: &Interval, + children: &[&Interval], + ) -> Result>> { + self.inner.propagate_constraints(interval, children) + } + + #[expect(deprecated)] + fn evaluate_statistics(&self, children: &[&Distribution]) -> Result { + self.inner.evaluate_statistics(children) + } + + #[expect(deprecated)] + fn propagate_statistics( + &self, + parent: &Distribution, + children: &[&Distribution], + ) -> Result>> { + self.inner.propagate_statistics(parent, children) + } + + fn get_properties(&self, children: &[ExprProperties]) -> Result { + self.inner.get_properties(children) + } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.inner.fmt_sql(f) + } + + fn snapshot(&self) -> Result>> { + // Always unwrap the Optional wrapper for snapshot consumers (e.g. PruningPredicate). + // If inner has a snapshot, use it; otherwise return the inner directly. + Ok(Some(match self.inner.snapshot()? { + Some(snap) => snap, + None => Arc::clone(&self.inner), + })) + } + + fn snapshot_generation(&self) -> u64 { + // The wrapper itself is not dynamic; tree-walking picks up + // inner's generation via children(). 
+ 0 + } + + fn is_volatile_node(&self) -> bool { + self.inner.is_volatile_node() + } + + fn placement(&self) -> ExpressionPlacement { + self.inner.placement() + } +} + #[cfg(test)] mod test { use crate::physical_expr::PhysicalExpr; @@ -731,6 +888,7 @@ mod test { use arrow::datatypes::{DataType, Schema}; use datafusion_expr_common::columnar_value::ColumnarValue; use std::fmt::{Display, Formatter}; + use std::hash::{Hash, Hasher}; use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Hash)] @@ -905,4 +1063,104 @@ mod test { &BooleanArray::from(vec![true; 5]), ); } + + #[test] + fn test_optional_filter_downcast() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone(&inner))); + + // Can downcast to detect the wrapper + let as_physical: Arc = optional; + assert!( + as_physical + .downcast_ref::() + .is_some() + ); + + // Inner expr is NOT detectable as optional + assert!(inner.downcast_ref::().is_none()); + } + + #[test] + fn test_optional_filter_delegates_evaluate() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = OptionalFilterPhysicalExpr::new(Arc::clone(&inner)); + + let batch = + unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 5) }; + let result = optional.evaluate(&batch).unwrap(); + let array = result.to_array(5).unwrap(); + assert_eq!(array.len(), 5); + } + + #[test] + fn test_optional_filter_children_and_with_new_children() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone(&inner))); + + // children() returns the inner + let children = optional.children(); + assert_eq!(children.len(), 1); + + // with_new_children preserves the wrapper + let new_inner: Arc = Arc::new(TestExpr {}); + let rewrapped = Arc::clone(&optional) + .with_new_children(vec![new_inner]) + .unwrap(); + assert!( + rewrapped + .downcast_ref::() + .is_some() + ); + } + + #[test] + fn test_optional_filter_inner() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = OptionalFilterPhysicalExpr::new(Arc::clone(&inner)); + + // inner() returns a clone of the wrapped expression + let unwrapped = optional.inner(); + assert!(unwrapped.downcast_ref::().is_some()); + } + + #[test] + fn test_optional_filter_snapshot_generation_zero() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = OptionalFilterPhysicalExpr::new(inner); + + assert_eq!(optional.snapshot_generation(), 0); + } + + #[test] + fn test_optional_filter_eq_hash() { + use super::OptionalFilterPhysicalExpr; + use std::collections::hash_map::DefaultHasher; + + let inner1: Arc = Arc::new(TestExpr {}); + let inner2: Arc = Arc::new(TestExpr {}); + + let opt1 = OptionalFilterPhysicalExpr::new(inner1); + let opt2 = OptionalFilterPhysicalExpr::new(inner2); + + // Same inner type → equal + assert_eq!(opt1, opt2); + + // Same hash + let mut h1 = DefaultHasher::new(); + let mut h2 = DefaultHasher::new(); + opt1.hash(&mut h1); + opt2.hash(&mut h2); + assert_eq!(h1.finish(), h2.finish()); + } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 865887d41e111..1edf45c550bf2 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -936,6 +936,8 @@ message PhysicalExprNode { PhysicalScalarSubqueryExprNode 
scalar_subquery = 22; PhysicalDynamicFilterNode dynamic_filter = 23; + + PhysicalOptionalFilterNode optional_filter = 24; } } @@ -947,6 +949,10 @@ message PhysicalDynamicFilterNode { bool is_complete = 5; } +message PhysicalOptionalFilterNode { + PhysicalExprNode inner = 1; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b8639afd04a89..3a9c4d27a1321 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16972,6 +16972,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::DynamicFilter(v) => { struct_ser.serialize_field("dynamicFilter", v)?; } + physical_expr_node::ExprType::OptionalFilter(v) => { + struct_ser.serialize_field("optionalFilter", v)?; + } } } struct_ser.end() @@ -17022,6 +17025,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "scalarSubquery", "dynamic_filter", "dynamicFilter", + "optional_filter", + "optionalFilter", ]; #[allow(clippy::enum_variant_names)] @@ -17048,6 +17053,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { HashExpr, ScalarSubquery, DynamicFilter, + OptionalFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -17091,6 +17097,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), "scalarSubquery" | "scalar_subquery" => Ok(GeneratedField::ScalarSubquery), "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), + "optionalFilter" | "optional_filter" => Ok(GeneratedField::OptionalFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -17267,6 +17274,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("dynamicFilter")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) +; + } + GeneratedField::OptionalFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("optionalFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::OptionalFilter) ; } } @@ -18380,6 +18394,97 @@ impl<'de> serde::Deserialize<'de> for PhysicalNot { deserializer.deserialize_struct("datafusion.PhysicalNot", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalOptionalFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.inner.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalOptionalFilterNode", len)?; + if let Some(v) = self.inner.as_ref() { + struct_ser.serialize_field("inner", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalOptionalFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "inner", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Inner, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; 
+ + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "inner" => Ok(GeneratedField::Inner), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalOptionalFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalOptionalFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut inner__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Inner => { + if inner__.is_some() { + return Err(serde::de::Error::duplicate_field("inner")); + } + inner__ = map_.next_value()?; + } + } + } + Ok(PhysicalOptionalFilterNode { + inner: inner__, + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalOptionalFilterNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalPlanNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index b742e82ea24ec..c661872454afb 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1336,7 +1336,7 @@ pub struct PhysicalExprNode { pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22, 23" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22, 23, 24" )] pub expr_type: ::core::option::Option, } @@ -1393,6 +1393,8 @@ pub mod physical_expr_node { ScalarSubquery(super::PhysicalScalarSubqueryExprNode), #[prost(message, tag = "23")] DynamicFilter(::prost::alloc::boxed::Box), + #[prost(message, tag = "24")] + OptionalFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1409,6 +1411,11 @@ pub struct PhysicalDynamicFilterNode { pub is_complete: bool, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalOptionalFilterNode { + #[prost(message, optional, boxed, tag = "1")] + pub inner: ::core::option::Option<::prost::alloc::boxed::Box>, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 43ebf0474320a..41807491bda79 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -63,6 +63,7 @@ use crate::{convert_required, protobuf}; use datafusion_physical_expr::expressions::{ DynamicFilterInner, DynamicFilterPhysicalExpr, }; +use datafusion_physical_expr_common::physical_expr::OptionalFilterPhysicalExpr; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -561,6 +562,16 @@ pub fn parse_physical_expr_with_converter( )); base_filter } + ExprType::OptionalFilter(optional_filter) => { + let inner = parse_required_physical_expr( + optional_filter.inner.as_deref(), + ctx, + "inner", + 
input_schema,
+                proto_converter,
+            )?;
+            Arc::new(OptionalFilterPhysicalExpr::new(inner))
+        }
         ExprType::Extension(extension) => {
             let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                 .inputs
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 83c11cfc6b299..84df5acec73bb 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -34,6 +34,7 @@ use datafusion_expr::WindowFrame;
 use datafusion_physical_expr::ScalarFunctionExpr;
 use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
 use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
+use datafusion_physical_expr_common::physical_expr::OptionalFilterPhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use datafusion_physical_plan::expressions::{
     BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr,
@@ -569,6 +570,17 @@ pub fn serialize_physical_expr_with_converter(
                 }),
             )),
         })
+    } else if let Some(opt) = expr.downcast_ref::<OptionalFilterPhysicalExpr>() {
+        let inner_expr =
+            Box::new(proto_converter.physical_expr_to_proto(&opt.inner(), codec)?);
+        Ok(protobuf::PhysicalExprNode {
+            expr_id,
+            expr_type: Some(protobuf::physical_expr_node::ExprType::OptionalFilter(
+                Box::new(protobuf::PhysicalOptionalFilterNode {
+                    inner: Some(inner_expr),
+                }),
+            )),
+        })
     } else {
         let mut buf: Vec<u8> = vec![];
         match codec.try_encode_expr(value, &mut buf) {
diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs
index be17f29eaafa0..334aed77a7b97 100644
--- a/datafusion/pruning/src/lib.rs
+++ b/datafusion/pruning/src/lib.rs
@@ -22,6 +22,6 @@ mod pruning_predicate;
 pub use file_pruner::FilePruner;
 pub use pruning_predicate::{
-    PredicateRewriter, PruningPredicate, PruningStatistics, RequiredColumns,
-    UnhandledPredicateHook, build_pruning_predicate,
+    PerConjunctPruneStats, PredicateRewriter, PruningPredicate, PruningStatistics,
+    RequiredColumns, UnhandledPredicateHook, build_pruning_predicate,
 };
diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs
index 013a06812a13c..986c34e5d21a8 100644
--- a/datafusion/pruning/src/pruning_predicate.rs
+++ b/datafusion/pruning/src/pruning_predicate.rs
@@ -375,6 +375,51 @@ pub struct PruningPredicate {
     ///
     /// See [`PruningPredicate::literal_guarantees`] for more details.
     literal_guarantees: Vec<LiteralGuarantee>,
+    /// Optional per-conjunct sub-predicates, populated when the
+    /// constructor splits a top-level AND into separate
+    /// `PruningPredicate`s. When present, [`Self::prune_per_conjunct`]
+    /// evaluates each sub-predicate to produce per-conjunct pruning
+    /// rates; ordinary [`Self::prune`] is unchanged.
+    ///
+    /// Only the leaves are populated; the sub-predicates themselves
+    /// have `sub_predicates: None`.
+    sub_predicates: Option<Vec<TaggedSubPredicate>>,
+}
+
+/// A per-conjunct sub-predicate paired with an optional caller tag
+/// (typically a `FilterId` chosen by the caller). Lives behind
+/// [`PruningPredicate::sub_predicates`].
+#[derive(Debug, Clone)]
+struct TaggedSubPredicate {
+    /// Caller tag (`None` when constructed without tagging).
+    tag: Option<usize>,
+    /// The per-conjunct PruningPredicate.
+    predicate: PruningPredicate,
+}
+
+/// Result of [`PruningPredicate::prune_per_conjunct`] for one
+/// sub-predicate.
+#[derive(Debug, Clone, Default)]
+pub struct PerConjunctPruneStats {
+    /// Caller tag (e.g. `FilterId`), `None` when constructed via
+    /// [`PruningPredicate::try_new`] without tagging.
+    pub tag: Option<usize>,
+    /// Number of containers (row groups) the sub-predicate was
+    /// evaluated against.
+    pub containers_seen: usize,
+    /// Number of containers this sub-predicate alone would prune.
+    pub containers_pruned: usize,
+}
+
+impl PerConjunctPruneStats {
+    /// Pruning rate for this conjunct, or `None` when no containers
+    /// were evaluated.
+    pub fn pruning_rate(&self) -> Option<f64> {
+        if self.containers_seen == 0 {
+            return None;
+        }
+        Some(self.containers_pruned as f64 / self.containers_seen as f64)
+    }
 }
 
 /// Build a pruning predicate from an optional predicate expression.
@@ -499,9 +544,77 @@ impl PruningPredicate {
             required_columns,
             orig_expr: expr,
             literal_guarantees,
+            sub_predicates: None,
         })
     }
 
+    /// Like [`Self::try_new`] but takes already-split top-level
+    /// conjuncts, each carrying a caller tag (typically a `FilterId`).
+    /// Builds one [`PruningPredicate`] per conjunct as a leaf
+    /// sub-predicate. The wrapper itself is a marker holding the
+    /// leaves; calls to [`Self::prune`] AND the leaves' results,
+    /// avoiding a redundant combined-predicate construction.
+    /// [`Self::prune_per_conjunct`] also reads from the same leaves.
+    ///
+    /// Conjuncts whose individual `try_new` would error or return
+    /// always-true are silently skipped (their tags do not appear in
+    /// the per-conjunct output).
+    pub fn try_new_tagged_conjuncts(
+        tagged: &[(usize, Arc<dyn PhysicalExpr>)],
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        // Build per-conjunct PruningPredicates (each is a leaf, i.e.
+        // its own `sub_predicates` is `None`).
+        let mut sub_predicates: Vec<TaggedSubPredicate> = Vec::new();
+        for (tag, expr) in tagged {
+            match Self::try_new(Arc::clone(expr), Arc::clone(&schema)) {
+                Ok(p) if !p.always_true() => {
+                    sub_predicates.push(TaggedSubPredicate {
+                        tag: Some(*tag),
+                        predicate: p,
+                    });
+                }
+                Ok(_) => {
+                    // always-true: skip; leaves the tag unrepresented.
+                    continue;
+                }
+                Err(e) => {
+                    debug!("try_new_tagged_conjuncts: skipping conjunct {tag}: {e}");
+                    continue;
+                }
+            }
+        }
+
+        // Build a marker wrapper. Its own `predicate_expr` is a
+        // literal `true` placeholder; `prune` is special-cased below
+        // to AND the leaves' results when `sub_predicates` is set.
+        let placeholder_expr: Arc<dyn PhysicalExpr> =
+            Arc::new(phys_expr::Literal::new(ScalarValue::from(true)));
+        let combined_orig: Arc<dyn PhysicalExpr> = if tagged.is_empty() {
+            Arc::clone(&placeholder_expr)
+        } else {
+            datafusion_physical_expr::conjunction(
+                tagged
+                    .iter()
+                    .map(|(_, e)| Arc::clone(e))
+                    .collect::<Vec<_>>(),
+            )
+        };
+        let wrapper = Self {
+            schema,
+            predicate_expr: placeholder_expr,
+            required_columns: RequiredColumns::new(),
+            orig_expr: combined_orig,
+            literal_guarantees: Vec::new(),
+            sub_predicates: if sub_predicates.is_empty() {
+                None
+            } else {
+                Some(sub_predicates)
+            },
+        };
+        Ok(wrapper)
+    }
+
     /// For each set of statistics, evaluates the pruning predicate
     /// and returns a `bool` with the following meaning for a
     /// all rows whose values match the statistics:
@@ -520,6 +633,22 @@ impl PruningPredicate {
         &self,
         statistics: &S,
     ) -> Result<Vec<bool>> {
+        // If we're a tagged-conjunct wrapper (no own predicate_expr,
+        // just leaf sub_predicates), AND the leaves' results.
+        if let Some(sub_predicates) = &self.sub_predicates {
+            let n = statistics.num_containers();
+            let mut combined = vec![true; n];
+            for sub in sub_predicates {
+                let leaf = sub.predicate.prune(statistics)?;
+                for (i, val) in leaf.iter().enumerate() {
+                    if i < combined.len() {
+                        combined[i] = combined[i] && *val;
+                    }
+                }
+            }
+            return Ok(combined);
+        }
+
         let mut builder = BoolVecBuilder::new(statistics.num_containers());
 
         // Try to prove the predicate can't be true for the containers based on
@@ -568,6 +697,50 @@ impl PruningPredicate {
         Ok(builder.build())
     }
 
+    /// Like [`Self::prune`] but also returns per-conjunct pruning
+    /// stats when this predicate was constructed via
+    /// [`Self::try_new_tagged_conjuncts`]. The `Vec<bool>` is the same
+    /// AND'd result `prune` would return; the per-conjunct stats are
+    /// computed by evaluating each leaf sub-predicate against the same
+    /// `statistics` and counting pruned/seen containers.
+    ///
+    /// Returns `(prune_result, vec![])` for predicates constructed via
+    /// the plain [`Self::try_new`] (no sub-predicates available).
+    pub fn prune_per_conjunct<S: PruningStatistics>(
+        &self,
+        statistics: &S,
+    ) -> Result<(Vec<bool>, Vec<PerConjunctPruneStats>)> {
+        let combined = self.prune(statistics)?;
+        let Some(sub_predicates) = &self.sub_predicates else {
+            return Ok((combined, Vec::new()));
+        };
+
+        let total_containers = statistics.num_containers();
+        let mut per_conjunct: Vec<PerConjunctPruneStats> =
+            Vec::with_capacity(sub_predicates.len());
+        for sub in sub_predicates {
+            let kept = sub.predicate.prune(statistics)?;
+            let containers_seen = kept.len();
+            let containers_pruned = containers_seen - kept.iter().filter(|b| **b).count();
+            // Sanity: every sub-predicate evaluates against the same
+            // statistics shape, so `kept.len() == total_containers`.
+            // If implementation drift breaks that, fall back to
+            // skipping this conjunct rather than panicking.
+            if containers_seen != total_containers {
+                debug!(
+                    "prune_per_conjunct: sub-predicate seen={containers_seen} expected={total_containers}, skipping conjunct"
+                );
+                continue;
+            }
+            per_conjunct.push(PerConjunctPruneStats {
+                tag: sub.tag,
+                containers_seen,
+                containers_pruned,
+            });
+        }
+        Ok((combined, per_conjunct))
+    }
+
     /// Return a reference to the input schema
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
@@ -599,6 +772,13 @@ impl PruningPredicate {
     ///
     /// This can happen when a predicate is simplified to a constant `true`
     pub fn always_true(&self) -> bool {
+        // A tagged-conjunct wrapper is never always-true unless every
+        // leaf is (which can't happen; always-true leaves are dropped
+        // at construction). So when sub_predicates is Some and
+        // non-empty, return false.
+        if let Some(subs) = &self.sub_predicates {
+            return subs.is_empty();
+        }
         is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
     }
@@ -614,14 +794,29 @@
     /// used in the predicate. For example, it can be used to avoid reading
     /// unneeded bloom filters (a non trivial operation).
     pub fn literal_columns(&self) -> Vec<String> {
+        // For tagged-conjunct wrappers, union the leaves' columns:
+        // the wrapper's own `literal_guarantees` is empty (its
+        // `predicate_expr` is a literal-true placeholder) but
+        // downstream consumers (e.g. `ParquetOpener` deciding which
+        // bloom filters to fetch) need the full set.
         let mut seen = HashSet::new();
-        self.literal_guarantees
-            .iter()
-            .map(|e| &e.column.name)
-            // avoid duplicates
-            .filter(|name| seen.insert(*name))
-            .map(|s| s.to_string())
-            .collect()
+        let mut out: Vec<String> = Vec::new();
+        if let Some(subs) = &self.sub_predicates {
+            for sub in subs {
+                for name in sub.predicate.literal_columns() {
+                    if seen.insert(name.clone()) {
+                        out.push(name);
+                    }
+                }
+            }
+        }
+        for e in &self.literal_guarantees {
+            let name = &e.column.name;
+            if seen.insert(name.to_string()) {
+                out.push(name.to_string());
+            }
+        }
+        out
     }
 }
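To make the intended call pattern concrete, here is a minimal sketch of how a scan could combine the two new APIs. It is illustrative only, not part of this patch: the function name `prune_with_per_filter_stats`, the `row_group_stats` value, and the `println!` reporting are placeholders, and it assumes the caller already has per-filter conjuncts tagged with `usize` filter ids plus a `PruningStatistics` implementation (such as the row-group statistics the Parquet reader builds).

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_pruning::{PruningPredicate, PruningStatistics};

/// Prune containers with a tagged-conjunct predicate and report how
/// selective each filter was on its own.
fn prune_with_per_filter_stats<S: PruningStatistics>(
    tagged_conjuncts: &[(usize, Arc<dyn PhysicalExpr>)],
    schema: SchemaRef,
    row_group_stats: &S,
) -> Result<Vec<bool>> {
    // One leaf PruningPredicate per tagged conjunct; unsupported or
    // always-true conjuncts are dropped by the constructor.
    let predicate =
        PruningPredicate::try_new_tagged_conjuncts(tagged_conjuncts, schema)?;

    // `kept` is the same AND'd answer plain `prune()` would return;
    // `per_conjunct` adds per-filter seen/pruned container counts.
    let (kept, per_conjunct) = predicate.prune_per_conjunct(row_group_stats)?;

    for stats in &per_conjunct {
        if let (Some(id), Some(rate)) = (stats.tag, stats.pruning_rate()) {
            // A real caller would feed this back into per-filter
            // selectivity tracking keyed by `id`.
            println!(
                "filter {id}: pruned {}/{} row groups ({:.0}%)",
                stats.containers_pruned,
                stats.containers_seen,
                rate * 100.0
            );
        }
    }

    Ok(kept)
}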