Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 259 additions & 1 deletion datafusion/datasource-parquet/src/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,37 @@ pub struct PagePruningAccessPlanFilter {
/// single column predicates (e.g. (`col = 5`) extracted from the overall
/// predicate. Must all be true for a row to be included in the result.
predicates: Vec<PruningPredicate>,
/// Per-predicate tag (caller-supplied id, typically a `FilterId`).
/// `None` when the filter was constructed without tagging via
/// [`Self::new`]; `Some` when constructed via [`Self::new_tagged`].
/// The vector has the same length as `predicates`.
tags: Option<Vec<usize>>,
}

/// Per-conjunct accumulators surfaced by
/// [`PagePruningAccessPlanFilter::prune_plan_with_per_conjunct_stats`].
/// One entry per kept predicate (in the same order as `predicates`).
///
/// `rows_skipped` counts rows each conjunct *alone* proved skippable,
/// independently of the other conjuncts, so the per-conjunct sums are
/// not comparable to the AND-of-all-predicates row metric.
#[derive(Clone, Debug, Default)]
pub struct PerConjunctPageStats {
    /// Caller tag (e.g. FilterId) — `None` when the filter was built
    /// untagged via [`PagePruningAccessPlanFilter::new`].
    pub tag: Option<usize>,
    /// Total rows in row groups where this conjunct was evaluated.
    pub rows_seen: u64,
    /// Rows the page index proved this conjunct alone would skip.
    pub rows_skipped: u64,
}

impl PerConjunctPageStats {
    /// Fraction of evaluated rows this conjunct proved skippable
    /// (`rows_skipped / rows_seen`).
    ///
    /// Returns `None` when no rows were evaluated — for example when
    /// the file has no page index for this column, or the predicate's
    /// statistics converter couldn't be built — so callers can tell
    /// "no signal" apart from "0% pruned".
    pub fn pruning_rate(&self) -> Option<f64> {
        match self.rows_seen {
            0 => None,
            seen => Some(self.rows_skipped as f64 / seen as f64),
        }
    }
}

impl PagePruningAccessPlanFilter {
Expand Down Expand Up @@ -148,7 +179,50 @@ impl PagePruningAccessPlanFilter {
Some(pp)
})
.collect::<Vec<_>>();
Self { predicates }
Self {
predicates,
tags: None,
}
}

/// Variant of [`Self::new`] that takes already-split conjuncts each
/// carrying a caller tag (usually a `FilterId`). Predicates that
/// fail the same single-column / non-trivial filtering as `new`
/// are dropped, but tags survive for the conjuncts that make it
/// through. Subsequent calls to
/// [`Self::prune_plan_with_per_conjunct_stats`] return per-conjunct
/// pruning stats keyed by tag.
pub fn new_tagged(
conjuncts: &[(usize, Arc<dyn PhysicalExpr>)],
schema: &SchemaRef,
) -> Self {
let mut predicates = Vec::with_capacity(conjuncts.len());
let mut tags = Vec::with_capacity(conjuncts.len());
for (id, expr) in conjuncts {
let pp = match PruningPredicate::try_new(Arc::clone(expr), Arc::clone(schema))
{
Ok(pp) => pp,
Err(e) => {
debug!(
"Ignoring error creating tagged page pruning predicate \
for filter id {id}: {e}"
);
continue;
}
};
if pp.always_true() {
continue;
}
if pp.required_columns().single_column().is_none() {
continue;
}
predicates.push(pp);
tags.push(*id);
}
Self {
predicates,
tags: Some(tags),
}
}

/// Returns an updated [`ParquetAccessPlan`] by applying predicates to the
Expand Down Expand Up @@ -336,6 +410,190 @@ impl PagePruningAccessPlanFilter {
/// Returns the number of single-column predicates (one per kept
/// conjunct) that this filter evaluates.
pub fn filter_number(&self) -> usize {
    self.predicates.len()
}

/// Like [`Self::prune_plan_with_page_index`] but also surfaces, as a
/// side-effect of the pruning iteration that already runs, a
/// per-conjunct accumulator with the rows that conjunct alone
/// would have proven skippable. Callers use this to seed a
/// per-FilterId selectivity prior without doing any extra pruning
/// work — every page-index lookup that would have happened in
/// `prune_plan_with_page_index` happens exactly once here too.
///
/// Returns the (possibly narrowed) access plan, one
/// [`PerConjunctPageStats`] per kept predicate (same order as
/// `predicates`), and the count of pages whose pruning was skipped
/// because their row group was already fully matched.
pub fn prune_plan_with_per_conjunct_stats(
    &self,
    mut access_plan: ParquetAccessPlan,
    arrow_schema: &Schema,
    parquet_schema: &SchemaDescriptor,
    parquet_metadata: &ParquetMetaData,
    file_metrics: &ParquetFileMetrics,
) -> (ParquetAccessPlan, Vec<PerConjunctPageStats>, usize) {
    // scoped timer updates on drop
    let _timer_guard = file_metrics.page_index_eval_time.timer();

    // One accumulator per predicate; when `tags` is present it is
    // index-aligned with `predicates`, so `get(i)` resolves the tag.
    let mut per_conjunct: Vec<PerConjunctPageStats> = (0..self.predicates.len())
        .map(|i| PerConjunctPageStats {
            tag: self.tags.as_ref().and_then(|t| t.get(i).copied()),
            rows_seen: 0,
            rows_skipped: 0,
        })
        .collect();

    if self.predicates.is_empty() {
        return (access_plan, per_conjunct, 0);
    }

    let groups = parquet_metadata.row_groups();
    if groups.is_empty() {
        return (access_plan, per_conjunct, 0);
    }

    // Page-level pruning needs both the offset index and the column
    // index; without them the plan is returned untouched (all stats
    // stay zero, so `pruning_rate` reports `None`).
    if parquet_metadata.offset_index().is_none()
        || parquet_metadata.column_index().is_none()
    {
        return (access_plan, per_conjunct, 0);
    }

    // Same accumulators as the untagged path, plus per-conjunct.
    let mut total_skip = 0;
    let mut total_select = 0;
    let mut total_pages_skip = 0;
    let mut total_pages_select = 0;
    // Pages we skipped pruning for because row-group stats already
    // proved the row group is fully matched — wasted work avoided,
    // surfaced as a metric.
    let mut total_pages_skipped_by_fully_matched = 0;

    let row_group_indexes = access_plan.row_group_indexes();
    for row_group_index in row_group_indexes {
        // Skip page pruning for fully matched row groups: all rows are
        // known to satisfy the predicate, so page-level pruning is
        // wasted work. Still feed the rows into `rows_seen` per
        // conjunct so per-FilterId pruning rates reflect the file's
        // full row count rather than just the non-fully-matched part.
        if access_plan.is_fully_matched(row_group_index) {
            let page_count =
                fully_matched_page_count(row_group_index, parquet_metadata);
            total_pages_skipped_by_fully_matched += page_count;
            let rg_rows = groups[row_group_index].num_rows() as u64;
            for stats in per_conjunct.iter_mut() {
                stats.rows_seen = stats.rows_seen.saturating_add(rg_rows);
            }
            continue;
        }
        let rg_rows = groups[row_group_index].num_rows() as u64;
        let mut overall_selection = None;

        // NOTE(review): the group's page count is read from the first
        // column's page locations only — assumes that count is
        // representative for the page metric; confirm this mirrors
        // the untagged path.
        let total_pages_in_group =
            parquet_metadata.offset_index().map_or(0, |offset_index| {
                offset_index[row_group_index]
                    .first()
                    .map_or(0, |column| column.page_locations.len())
            });
        // Intersection of per-conjunct matched pages, matching the
        // untagged path's behavior so the page-level metric reflects
        // the AND of all predicates rather than a per-conjunct sum.
        let mut matched_pages_in_group: HashSet<usize> =
            HashSet::from_iter(0..total_pages_in_group);

        for (i, predicate) in self.predicates.iter().enumerate() {
            // Counted up front: even when the conjunct cannot be
            // evaluated below (converter error, no page index for its
            // column) this group's rows still land in `rows_seen`.
            per_conjunct[i].rows_seen =
                per_conjunct[i].rows_seen.saturating_add(rg_rows);

            // Invariant upheld by `new` / `new_tagged`, which drop
            // multi-column predicates.
            let column = predicate
                .required_columns()
                .single_column()
                .expect("Page pruning requires single column predicates");

            let converter = match StatisticsConverter::try_new(
                column.name(),
                arrow_schema,
                parquet_schema,
            ) {
                Ok(c) => c,
                Err(e) => {
                    debug!(
                        "Could not create statistics converter for column {}: {e}",
                        column.name()
                    );
                    continue;
                }
            };

            let selection = prune_pages_in_one_row_group(
                row_group_index,
                predicate,
                converter,
                parquet_metadata,
                file_metrics,
            );

            // `None` means no usable page index for this column in
            // this group — the conjunct contributes no selection.
            let Some((selection, page_match_flags)) = selection else {
                continue;
            };
            // Page indexes this conjunct matched; intersect them into
            // the group-wide matched set.
            let matched_pages_indexes: HashSet<_> = page_match_flags
                .into_iter()
                .enumerate()
                .filter(|x| x.1)
                .map(|x| x.0)
                .collect();
            matched_pages_in_group.retain(|x| matched_pages_indexes.contains(x));

            // Per-conjunct skipped rows for this row group: anything
            // the predicate's selection didn't include is something
            // this conjunct alone proved skippable.
            let kept_rows_for_conjunct = selection.row_count() as u64;
            let skipped_rows_for_conjunct =
                rg_rows.saturating_sub(kept_rows_for_conjunct);
            per_conjunct[i].rows_skipped = per_conjunct[i]
                .rows_skipped
                .saturating_add(skipped_rows_for_conjunct);

            overall_selection = update_selection(overall_selection, selection);

            // Once the combined selection selects nothing, further
            // conjuncts cannot widen it — stop early.
            // NOTE(review): this break also skips the `rows_seen`
            // update for the remaining conjuncts in this row group, so
            // their rates are computed over fewer rows than earlier
            // conjuncts' — confirm this asymmetry is intended.
            let selects_any = overall_selection
                .as_ref()
                .map(|sel| sel.selects_any())
                .unwrap_or(true);
            if !selects_any {
                break;
            }
        }

        let pages_matched = matched_pages_in_group.len();
        total_pages_select += pages_matched;
        // `matched_pages_in_group` starts as the full page set and
        // only shrinks, so this subtraction cannot underflow.
        total_pages_skip += total_pages_in_group - pages_matched;

        if let Some(overall_selection) = overall_selection {
            let rows_selected = overall_selection.row_count();
            if rows_selected > 0 {
                let rows_skipped = overall_selection.skipped_row_count();
                total_skip += rows_skipped;
                total_select += rows_selected;
                access_plan.scan_selection(row_group_index, overall_selection);
            } else {
                // Combined selection kept nothing: skip the whole row
                // group and count all of its rows as pruned.
                let rows_skipped = groups[row_group_index].num_rows() as usize;
                access_plan.skip(row_group_index);
                total_skip += rows_skipped;
            }
        }
    }

    file_metrics.page_index_rows_pruned.add_pruned(total_skip);
    file_metrics
        .page_index_rows_pruned
        .add_matched(total_select);
    file_metrics
        .page_index_pages_pruned
        .add_pruned(total_pages_skip);
    file_metrics
        .page_index_pages_pruned
        .add_matched(total_pages_select);

    (
        access_plan,
        per_conjunct,
        total_pages_skipped_by_fully_matched,
    )
}
}

fn update_selection(
Expand Down
45 changes: 37 additions & 8 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,28 @@ impl RowGroupAccessPlanFilter {
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) {
self.prune_by_statistics_with_per_conjunct_stats(
arrow_schema,
parquet_schema,
groups,
predicate,
metrics,
);
}

/// Variant of [`Self::prune_by_statistics`] that also returns
/// per-conjunct pruning stats produced by
/// [`PruningPredicate::prune_per_conjunct`]. Returns an empty
/// `Vec` when the predicate was not constructed with tagged
/// conjuncts, so callers can ignore it on the untagged path.
pub fn prune_by_statistics_with_per_conjunct_stats(
&mut self,
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
groups: &[RowGroupMetaData],
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) -> Vec<datafusion_pruning::PerConjunctPruneStats> {
// scoped timer updates on drop
let _timer_guard = metrics.statistics_eval_time.timer();

Expand All @@ -275,9 +297,14 @@ impl RowGroupAccessPlanFilter {
missing_null_counts_as_zero: true,
};

// try to prune the row groups in a single call
match predicate.prune(&pruning_stats) {
Ok(values) => {
let mut per_conjunct: Vec<datafusion_pruning::PerConjunctPruneStats> = Vec::new();

// try to prune the row groups in a single call (now also captures
// per-conjunct rates when the predicate was built with
// `try_new_tagged_conjuncts`).
match predicate.prune_per_conjunct(&pruning_stats) {
Ok((values, stats)) => {
per_conjunct = stats;
let mut fully_contained_candidates_original_idx: Vec<usize> = Vec::new();
for (idx, &value) in row_group_indexes.iter().zip(values.iter()) {
if !value {
Expand Down Expand Up @@ -305,6 +332,8 @@ impl RowGroupAccessPlanFilter {
metrics.predicate_evaluation_errors.add(1);
}
}

per_conjunct
}

/// Identifies row groups that are fully matched by the predicate.
Expand Down Expand Up @@ -607,11 +636,11 @@ impl PruningStatistics for BloomFilterStatistics {
}

/// Wraps a slice of [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
pub(crate) struct RowGroupPruningStatistics<'a> {
    /// Parquet-level schema descriptor used to locate column statistics.
    pub(crate) parquet_schema: &'a SchemaDescriptor,
    /// The row groups whose statistics are exposed for pruning.
    pub(crate) row_group_metadatas: Vec<&'a RowGroupMetaData>,
    /// Arrow-level schema used to resolve column names and types.
    pub(crate) arrow_schema: &'a Schema,
    /// When `true`, a missing null count in the statistics is treated
    /// as zero nulls instead of unknown.
    pub(crate) missing_null_counts_as_zero: bool,
}

impl<'a> RowGroupPruningStatistics<'a> {
Expand Down
Loading
Loading