[Experiment] Adaptive filter pushdown #22144
Replaces PR #9's morsel-per-row-group split with an in-decoder strategy swap: one `ParquetPushDecoder` per file, one `BoxStream` per file, and filter placement re-evaluated at every row-group boundary using the shared `SelectivityTracker`.

Gone from the PR #9 approach:
- The chunk loop (`ParquetAccessPlan::split_into_chunks`, `Vec<BoxStream>` returns from `build_stream`).
- Per-chunk `AsyncFileReader::create_reader` minting and per-chunk `RowFilter` rebuilds.
- The `EarlyStoppingStream`-on-chunk-0-only special case for the non-`Clone` `FilePruner`.
- `LazyMorselShared` per-morsel Arc churn — the source of the ~10% aggregate ClickBench regression you flagged in PR #9 review.

`AdaptiveParquetStream` (new in `opener.rs`) drives one row group at a time via `try_next_reader`:
1. Pull a `ParquetRecordBatchReader` for the next row group.
2. Iterate it synchronously; each batch goes through any post-scan filters (which feed per-filter stats into the tracker) and then through the projector.
3. When the reader exhausts, ask the tracker to re-partition filters based on accumulated stats. If the row-filter set changed, build a new `RowFilter` and call the new arrow-rs `ParquetPushDecoder::swap_strategy` before requesting the next reader. Post-scan filters update in lockstep.

`PushBuffers` carries through the swap so already-fetched bytes are preserved, and the optional-filter mid-stream skip mechanism (the existing `OptionalFilterPhysicalExpr` + `tracker.is_filter_skipped`) keeps working unchanged inside `apply_post_scan_filters_with_stats`.

Key files:
- `selectivity.rs` — `SelectivityTracker`, `PartitionedFilters`, `FilterId`, Welford CI bounds. Verbatim from PR #9.
- `row_filter.rs` — new `build_row_filter` signature returning `(Option<RowFilter>, UnbuildableFilters)` plus `total_compressed_bytes`, plus `DatafusionArrowPredicate` stat hooks.
- `physical_expr.rs` — `OptionalFilterPhysicalExpr`, `snapshot_generation` helpers.
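The row-group-boundary loop described above can be sketched with toy types — a minimal simulation, not the real stream (every name here except the described role of `swap_strategy` is hypothetical; the real code drives arrow-rs's `ParquetPushDecoder`):

```rust
// Toy sketch of the per-row-group adaptation loop. A real implementation
// would pull ParquetRecordBatchReaders and rebuild RowFilters; here a
// simple prune-rate tracker stands in for the SelectivityTracker.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Placement {
    RowLevel,
    PostScan,
}

struct Tracker {
    rows_seen: u64,
    rows_pruned: u64,
    threshold: f64, // prune-rate needed to keep/return to row-level
}

impl Tracker {
    fn update(&mut self, seen: u64, pruned: u64) {
        self.rows_seen += seen;
        self.rows_pruned += pruned;
    }

    fn placement(&self) -> Placement {
        if self.rows_seen == 0 {
            return Placement::RowLevel;
        }
        let prune_rate = self.rows_pruned as f64 / self.rows_seen as f64;
        if prune_rate >= self.threshold {
            Placement::RowLevel
        } else {
            Placement::PostScan
        }
    }
}

/// One row group at a time: decode, feed stats, re-evaluate placement
/// at each row-group boundary (where the real code may swap_strategy).
fn drive(row_groups: &[(u64, u64)], tracker: &mut Tracker) -> Vec<Placement> {
    let mut placements = Vec::new();
    for &(seen, pruned) in row_groups {
        // ... decode this row group under the current placement ...
        tracker.update(seen, pruned);
        // Row-group boundary: re-partition filters from accumulated stats.
        placements.push(tracker.placement());
    }
    placements
}
```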
`Display` is **pass-through** here (PR #9 used `Optional(...)`), keeping every existing sqllogictest expected output intact.
- `config.rs` — adds `filter_pushdown_min_bytes_per_sec` / `filter_collecting_byte_ratio_threshold` / `filter_confidence_z`. **`reorder_filters` is preserved as a deprecated no-op** (per request) — the adaptive tracker subsumes it.
- `selectivity_tracker.rs` bench — verbatim.
- Per-file plumbing in `source.rs`: `predicate_conjuncts: Vec<(FilterId, Arc<PhysicalExpr>)>` instead of a single AND-ed predicate, so per-conjunct stats accumulate across files.

Depends on `pydantic/arrow-rs:adaptive-strategy-swap`, which adds `ParquetPushDecoder::can_swap_strategy()` / `swap_strategy(StrategySwap)` and the `StrategySwap` builder. The `Cargo.toml` `[patch.crates-io]` block points at it.

Known gaps:
- Sub-row-group adaptation (would need a `ParquetRecordBatchReader::pause` primitive in arrow-rs to yield a residual `RowSelection`); useful for TPC-DS-style single-huge-row-group files. Deferred.
- The three new config knobs aren't in the proto schema yet; `from_proto` fills them with config defaults so a roundtrip preserves behavior.

Test results:
- `cargo test -p datafusion-datasource-parquet --lib` — 143 passed
- `cargo test -p datafusion --lib` — 410 passed
- `cargo test -p datafusion --test core_integration` — 935 passed
- `cargo test -p datafusion-sqllogictest --test sqllogictests` — all pass except `encrypted_parquet.slt` (pre-existing on upstream/main, not related to this change)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Fix 6 broken intra-doc links in `opener.rs`: `RowFilter`, `PushBuffers`, `AsyncFileReader::create_reader`, and `SelectivityTracker` weren't visible from the doc-comment scope. Reword to plain backticks for the names that don't have a stable in-scope path; route `SelectivityTracker` through `crate::selectivity::SelectivityTracker`.
- Regenerate `docs/source/user-guide/configs.md` via `dev/update_config_docs.sh` to surface the three new `filter_pushdown_min_bytes_per_sec` / `filter_collecting_byte_ratio_threshold` / `filter_confidence_z` rows the CI doc check expects.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…33dd62 Picks up the rustdoc fix from the arrow-rs companion branch so the DataFusion CI doc job resolves cleanly too. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The example asserts `pushdown_rows_pruned=1` to demonstrate that the row-filter path actually evicts rows. Under the adaptive scheduler's default `filter_pushdown_min_bytes_per_sec = 100 MB/s`, a small example file's filter starts on the post-scan path (where `pushdown_rows_pruned` stays 0) and the assertion fails. Set `filter_pushdown_min_bytes_per_sec = 0` to disable the throughput check and force every filter to row-level — the same lever the `physical_plan/parquet.rs` test harness uses. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
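As a sketch, assuming the new knob lands under DataFusion's usual `datafusion.execution.parquet.` namespace (the full key path is my assumption; only the option name comes from this branch's `config.rs`), the fix in SQL form would look like:

```sql
-- Force every filter to row-level by disabling the throughput check.
-- Key path assumed; the option name is from this branch's config.rs.
SET datafusion.execution.parquet.filter_pushdown_min_bytes_per_sec = 0;
```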
Two fixes for benchmark regressions and crashes on hits_partitioned ClickBench queries.

# Hard failures (Q36, Q38, Q41, Q42)

`build_stream` was building the wide ProjectionMask from `user projection ∪ post_scan_conjuncts` only, but a row-level conjunct can get demoted to post-scan mid-stream by `maybe_swap_strategy`. When that happened, the demoted filter's column wasn't in the `stream_schema`, and the post-scan rebase via `reassign_expr_columns` fired a `Schema error: Unable to get field named "..."` against the narrow batch.

Fix: include **every** predicate conjunct's columns in the wide projection regardless of current placement. Filter-only columns are still stripped after post-scan filtering by the projector, so the user-visible schema is unchanged.

# Initial-placement regressions (Q10, Q11, Q13, Q14, Q26)

Queries shaped like `SELECT col, ... FROM t WHERE col <> '' GROUP BY col` had the filter column already in the user projection. The byte-ratio heuristic was counting filter bytes against projection bytes naively, so `MobilePhoneModel_bytes / (MobilePhoneModel_bytes + UserID_bytes) ≈ 0.5` exceeded the 0.20 threshold and pushed the filter to post-scan — even though row-level was strictly better (zero extra I/O, and late materialization saves the UserID decode for pruned rows).

Fix: change the heuristic numerator from `filter_bytes` to **extra** bytes — bytes for filter columns *not* already in the user projection. A filter that only references projection columns now gets `byte_ratio = 0` and starts at row-level.

Threading required: add `projection_columns: &HashSet<usize>` to `SelectivityTracker::partition_filters` (and the inner impl); the opener's `AdaptiveParquetStream` carries it for mid-stream re-evals.

# Test plan

- All 4 hard-failure queries (Q36/Q38/Q41/Q42) now run to completion locally on hits_partitioned.
- 143 datasource-parquet unit tests pass (38 partition_filters call-sites in the test module updated to the new signature).
- Benchmark expectations: the Q23/Q22/Q6 wins should hold; the Q10/Q11/Q13/Q14 regressions should resolve via the better initial placement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ement

Bench showed Q10/Q11/Q13/Q14/Q26 still regressing 1.20-1.47x even after the overlap-aware heuristic. These queries are shaped like `SELECT col, ... FROM t WHERE col <> '' GROUP BY col` — filter column entirely in projection, so `extra_bytes = 0` and `byte_ratio = 0`. The previous heuristic placed them at row-level since `0 <= threshold`, but row-level *isn't* free even at zero extra I/O: predicate-cache eviction on heavy string columns means the filter column gets decoded twice (once for the predicate eval, once for the projection), and the late-materialization payoff depends on a selectivity we don't know yet.

Local timings on hits_partitioned (release mode):

| Query | main + no-pushdown (baseline) | branch (old heuristic) | branch (new heuristic) |
|-------|------------------------------:|-----------------------:|-----------------------:|
| Q23 | 3708 ms | 219 ms* | 219 ms |
| Q22 | 1344 ms | 902 ms* | 902 ms |
| Q26 | 41 ms | 60 ms | 48 ms |
| Q10 | 82 ms | 109 ms | 88 ms |

The Q23/Q22 wins are preserved (Q23 17x faster vs baseline, Q22 1.5x). The Q10/Q26 regressions go from 1.32-1.45x to 1.07-1.17x — the residual is the cost of pushdown_filters=true vs false generally, not our adaptive layer.

Why Q23 isn't hurt: its huge speedup comes from row-group statistics pruning via the TopK dynamic filter on EventTime, not from row-level filter evaluation. Pruning is independent of row-level vs post-scan placement; the dynamic filter still reaches the source and the PruningPredicate still applies. (A local repro confirms this — Q23 actually gets slightly faster on the new heuristic because we skip the double-decode of the heavy URL string column.)

Implementation: change the new-filter row-level condition from `byte_ratio <= threshold` to `extra_bytes > 0 && byte_ratio <= threshold`. Pure-overlap filters (extra_bytes == 0) start at post-scan; the tracker promotes them later if the measured bytes-saved-per-sec justifies it.
Filters with non-zero extra cost that fits within `byte_ratio_threshold` (e.g. a small int predicate against a heavy string projection) still start at row-level — that's the case where the heuristic is genuinely useful.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
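The overlap-aware heuristic above can be sketched as a standalone function — a hypothetical shape (the real logic lives inside `SelectivityTracker::partition_filters` and reads column byte sizes from parquet metadata; here they come from a plain slice indexed by column id):

```rust
use std::collections::HashSet;

/// Overlap-aware byte ratio: only bytes for filter columns NOT already in
/// the user projection count as extra I/O cost.
fn byte_ratio(
    filter_cols: &HashSet<usize>,
    projection_cols: &HashSet<usize>,
    col_bytes: &[u64], // compressed bytes per column, indexed by column id
) -> f64 {
    let extra: u64 = filter_cols
        .difference(projection_cols)
        .map(|&c| col_bytes[c])
        .sum();
    if extra == 0 {
        return 0.0; // pure-overlap filter: zero extra I/O
    }
    let proj: u64 = projection_cols.iter().map(|&c| col_bytes[c]).sum();
    extra as f64 / (extra + proj) as f64
}

/// Refined initial placement: pure-overlap filters (extra == 0) start at
/// post-scan; filters whose extra cost fits the threshold start row-level.
fn starts_at_row_level(
    filter_cols: &HashSet<usize>,
    projection_cols: &HashSet<usize>,
    col_bytes: &[u64],
    threshold: f64,
) -> bool {
    let extra: u64 = filter_cols
        .difference(projection_cols)
        .map(|&c| col_bytes[c])
        .sum();
    extra > 0 && byte_ratio(filter_cols, projection_cols, col_bytes) <= threshold
}
```

For a Q10-shaped query (filter column already projected) this yields `byte_ratio = 0` and a post-scan start; a small int predicate against a heavy string projection clears the threshold and starts row-level.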
Two changes that work together to make Q10/Q11/Q13/Q14/Q26 stop regressing without giving up the Q23/Q22 wins.

# 1. Prune-rate gate on PostScan → RowFilter promotion

Adds a second gate on top of the existing `filter_pushdown_min_bytes_per_sec` CI bound: a filter only gets promoted from post-scan to row-level if it actually prunes ≥ 99% of the rows it sees.

Why: the bytes-saved-per-sec metric is "potential savings if at row-level" (rows_pruned × non-filter-projection-bytes-per-row ÷ eval_time). For ClickBench Q10 (`MobilePhoneModel <> ''`) the selectivity is ~94% and the projection is heavy, so bytes-saved-per-sec clears the 100 MB/s threshold easily. But row-level *actually loses* to post-scan there because survivors are uniformly scattered: at 8K rows per page, p^N for p = 0.94 is ~10^-220 — effectively zero pages can be skipped, and RowSelection-driven decode is just as expensive as a contiguous post-scan read but with extra predicate-cache eviction on the heavy string column.

The 0.99 gate captures the scatter problem structurally:
- Clustered survivors (TopK dynamic filter, hash-join build): prune_rate trivially ≥ 0.99 once K shrinks. Page-skip works. Promote.
- Uniform survivors at moderate selectivity (Q10/Q11/Q13/Q14/Q26): prune_rate stays at 0.5-0.95. Page-skip can't work no matter how big bytes-saved-per-sec is. Stay at post-scan.

Q22's `Title LIKE '%Google%'` (prune_rate ~1.0) and Q23's `URL LIKE '%google%'` (similar) trivially clear the gate, so their big wins are preserved.

# 2. Drop STATS_SAMPLE_INTERVAL (1/32 → every batch)

I added the 1/32 sampling earlier when the per-batch `Instant + tracker.update` was clearly hot — but at the time the heuristic was over-promoting these queries to row-level, making the per-batch path matter much more.
Now that the prune-rate gate keeps them at post-scan, sampling actually *hurts*: with 1/32 the Welford accumulator converges 32× slower, so the tracker takes longer to realize "this filter is bad at row-level" and the in-flight filter flips state more often. Updating every batch is faster on every query I measured (Q23, Q22, Q26, Q10).

`SKIP_FLAG_CHECK_INTERVAL = 4` stays — it gates the OptionalFilter skip-flag check, not the Welford update, and removing *it* added ~200ms to Q22 (the post-update lock-juggle isn't free).

# Local timings (warm, hits_partitioned, 12 partitions)

| Query | main+nopush | branch | Δ |
|-------|------------:|-------:|---|
| Q23 | 3271ms | 168ms | **+19.5x** |
| Q22 | 1069ms | 901ms | +1.19x |
| Q26 | 39ms | 41ms | matches (+2ms) |
| Q10 | 68ms | 59ms | **+1.15x** |

All four are at or near baseline. Q26 is essentially break-even; the residual 2ms is below run-to-run noise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
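The two-gate promotion check above can be sketched as a pure function — a hypothetical standalone form (the real check uses the Welford CI lower bound rather than a point estimate, and reads its inputs from `SelectivityStats`):

```rust
/// Two-gate PostScan → RowFilter promotion sketch.
/// bytes_saved_per_sec models "potential savings if at row-level":
/// rows_pruned × non-filter projection bytes per row ÷ eval seconds.
fn should_promote(
    rows_seen: u64,
    rows_pruned: u64,
    other_projection_bytes_per_row: f64,
    eval_secs: f64,
    min_bytes_per_sec: f64,
) -> bool {
    if rows_seen == 0 || eval_secs <= 0.0 {
        return false;
    }
    let prune_rate = rows_pruned as f64 / rows_seen as f64;
    let bytes_saved_per_sec =
        rows_pruned as f64 * other_projection_bytes_per_row / eval_secs;
    // Gate 1: throughput bound. Gate 2: scatter proxy — only near-total
    // pruning lets RowSelection actually skip whole pages.
    bytes_saved_per_sec >= min_bytes_per_sec && prune_rate >= 0.99
}
```

A Q10-like filter (prune rate ~0.94 on a heavy projection) clears gate 1 easily but fails gate 2, so it stays at post-scan; a Q22/Q23-like filter (prune rate ~1.0) passes both.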
Earlier I had two sampling/gate constants protecting the hot per-batch update path:

- `STATS_SAMPLE_INTERVAL = 32` in opener.rs: skip the `Instant::now` + `tracker.update` work on 31 of every 32 batches.
- `SKIP_FLAG_CHECK_INTERVAL = 4` in selectivity.rs: inside tracker.update, skip the post-stats CI-bound + lock-juggle path on 3 of every 4 calls.

Both were "right" given the prior over-promotion problem (filters landing at row-level when they shouldn't, making the per-batch path hot and the CI calc wasted). With the new `prune_rate >= 0.99` gate those filters stay at post-scan and the measurements no longer support sampling:

- Removing `STATS_SAMPLE_INTERVAL` (every batch updates) is *faster* than 1/32 across Q23/Q22/Q26/Q10. Slower convergence on 1/32 made the tracker take longer to settle, so the in-flight filter chain flipped state more often.
- `SKIP_FLAG_CHECK_INTERVAL = 4` was protecting *non-optional* filters from a wasted-work path (post-stats CI calc + lock release + is_optional HashMap read + lock reacquire) that they didn't need at all. The right fix is to early-return for non-optional filters *before* that path, not to amortize it across 4 calls.

This refactor:
1. Caches `is_optional: bool` inline on `SelectivityStats`. Non-optional filters early-return after the Welford update with a single field load on the already-held stats lock — no extra HashMap, no `RwLock::read()`, no `drop` + reacquire.
2. For optional filters (hash-join build / TopK dynamic), the skip-flag CI check now runs every batch. That's what we want: when a filter's selectivity collapses, the skip flag should fire ASAP. Q26's TopK dynamic filter benefits visibly from this.
3. Drops the now-redundant `SelectivityTracker::is_optional` HashMap and `PartitionResult::new_filter_ids` (which duplicated `new_optional_flags`). The is_optional bit moves to where it's read.
4. Drops the sampling in `apply_post_scan_filters_with_stats`. `tracker.update` is now cheap enough on the fast path that sampling actively hurts (slower convergence outweighs the saved work).
Local timings (warm, hits_partitioned, 12 partitions):
| Query | main+nopush | branch | Δ |
|-------|------------:|-------:|---|
| Q23 | 3271ms | 139ms | **+23.5x** |
| Q22 | 1069ms | 898ms | +1.19x |
| Q26 | 39ms | 39ms | matches |
| Q10 | 68ms | 59ms | **+1.15x** |
143 lib tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
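The inline `is_optional` early-return described in point 1 above can be sketched like this — a toy shape (field names and the exact `SelectivityStats` layout are assumptions; the Welford recurrence itself is standard):

```rust
/// Per-filter stats with the optionality bit cached inline, so non-optional
/// filters never pay for a second map lookup or lock juggle after updating.
struct SelectivityStats {
    is_optional: bool, // cached inline on the stats struct
    count: u64,
    mean: f64, // running mean of per-batch selectivity (Welford)
    m2: f64,   // running sum of squared deviations (Welford)
}

impl SelectivityStats {
    /// Welford online update; returns whether the caller should go on to
    /// run the skip-flag CI check (only meaningful for optional filters).
    fn update(&mut self, selectivity: f64) -> bool {
        self.count += 1;
        let delta = selectivity - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (selectivity - self.mean);
        // Non-optional filters early-return here: one field load on the
        // already-held lock, no HashMap read, no lock release/reacquire.
        self.is_optional
    }

    /// Sample variance from the Welford accumulator.
    fn variance(&self) -> f64 {
        if self.count < 2 {
            0.0
        } else {
            self.m2 / (self.count - 1) as f64
        }
    }
}
```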
Replaces the all-or-nothing batch-level "if matched == 0, all skippable; otherwise 0" computation with a sub-batch windowed analysis fed by a new `count_skippable_bytes` helper. The metric is now, for each batch:

    skippable_bytes_for_batch = total_other_projection_bytes_for_batch
                                × (windows_with_zero_survivors / total_windows)

with W = 8192 rows (short-circuited so total_windows = 1 ⇒ a binary "is the whole batch all-pruned" — equivalent to the old behavior on typical 8K batch sizes, but with the structure in place for finer W on larger pages or different writers).

Why: `filter_pushdown_min_bytes_per_sec` is the right *unit*, but the metric feeding it overestimated savings whenever the filter pruned rows that the row-level decoder couldn't actually drop a page on. A 50% filter on uniform data still costs full I/O at row-level (every page has survivors); a 50% filter on contiguous data lets the decoder skip half the pages. The windowed analysis discriminates these — same formula at post-scan (predicting what row-level would save) and at row-level (measuring what the decoder did skip, modulo within-window RowSelection narrowing, which is an uncounted bonus).

The same metric on both sides means `min_bytes_per_sec` is the only knob; no separate prune-rate gate. The 0.99 gate is now redundant — if the prune rate is high enough that page-skip works, the metric already clears the threshold; if the prune rate is high but scatter is uniform (case C, ClickBench Q10/Q11/Q13/Q14/Q26), the metric stays low and the filter stays at post-scan.

The helper short-circuits when:
- the batch is fully pruned (`true_count == 0`) → all skippable,
- the batch has no zeros (`true_count == n`) → 0 skippable,
- there's only one window (`n ≤ W`) and the answer is determined.

This avoids ~2× per-batch `true_count` work that was visible as a regression when I first wired the helper through.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
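A standalone sketch of the windowed metric and its short-circuits, under assumed shapes (the real helper operates on arrow `BooleanArray`s and the projection byte accounting is more involved; here a plain bool slice stands in, with `true` = row survives):

```rust
const WINDOW: usize = 8192; // W from the commit message

/// Windowed skippable-bytes estimate: the fraction of W-row windows with
/// zero survivors is the fraction of "other projection" bytes that a
/// row-level decoder could plausibly skip.
fn count_skippable_bytes(mask: &[bool], total_other_projection_bytes: u64) -> u64 {
    let n = mask.len();
    if n == 0 {
        return 0;
    }
    let true_count = mask.iter().filter(|&&b| b).count();
    // Short-circuit: fully pruned batch → everything skippable.
    if true_count == 0 {
        return total_other_projection_bytes;
    }
    // Short-circuits: no pruned rows, or a single window that we already
    // know has survivors → nothing skippable.
    if true_count == n || n <= WINDOW {
        return 0;
    }
    let total_windows = (n + WINDOW - 1) / WINDOW;
    let zero_windows = mask
        .chunks(WINDOW)
        .filter(|w| w.iter().all(|&b| !b))
        .count();
    (total_other_projection_bytes as u128 * zero_windows as u128
        / total_windows as u128) as u64
}
```

The short-circuits mean the second `true_count`-style pass only happens when the answer genuinely depends on window structure.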
The last bench (97c62a6) added 5 new individual regressions vs the prior best-bench commit (05590a2): Q4 +39ms, Q31 +64ms, Q35 +48ms, Q40 1.47x, plus several smaller ones. Total query time ticked back up. Two changes between those commits did the damage:

1. Removed `STATS_SAMPLE_INTERVAL=32`. Locally the un-sampled version was faster, but on the 12-vCPU GKE bench every partition contends on the same per-filter `Mutex<SelectivityStats>` and the lock contention dominates. Restoring the 1-in-32 sampling cuts hot-path lock pressure to ~3% of what it was while still giving the Welford accumulator hundreds of samples per query.

2. Removed the `prune_rate >= 0.99` gate. The scatter-aware metric alone is too lenient on ClickBench data: columns like `MobilePhoneModel` and `SearchPhrase` have natural runs of empty values that occasionally cluster into batch-level "all pruned" events even when the filter's overall selectivity isn't high enough for row-level to actually win once arrow-rs's predicate-cache double-decode of heavy string columns kicks in. The prune_rate floor is a belt-and-braces guard; it's compatible with the scatter metric (both must pass) and prunes the cases where the metric over-promotes.

The scatter helper structure stays in place — the `count_skippable_bytes` framework remains so that when arrow-rs exposes pages-skipped-via-RowSelection (option 1 from the earlier plan), the row-level path can swap from the windowed estimate to the true measurement with no formula change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reverts the `1-in-32` sampling gate. The earlier rationale was lock contention on `Mutex<SelectivityStats>`, but the empirical effect on ClickBench is that promotion happens 32× later, which dominates the contention savings for short-running selective queries (Q22/Q23/Q24). Q23 went from 169ms (every-batch) to 443ms (1/32 sampled) on the 12-vCPU bench while regressing many small queries. Sample every batch so the Welford accumulator hits the CI threshold inside the first row group. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The gate was added on the theory that arrow-rs's row-level path double-decoded heavy string columns when filter and projection overlapped, costing more than the ~60-95% selectivity could recover. EXPLAIN ANALYZE on ClickBench refutes that theory: Q23 (`URL LIKE '%google%'`) shows predicate_cache_inner_records=8.76M and predicate_cache_records=83.67K — the cache works correctly; heavy strings are decoded once and reused for both predicate and projection.

The residual ClickBench regressions we attributed to "double-decode" (Q26 / Q31) trace to a different cause: post-scan filtering inside the opener shifts batch-arrival order at the downstream TopK, which changes the convergence point of TopK's dynamic filter and slightly weakens file-stats pruning. Forced row-level promotion of Q26 makes it slower (59ms) than post-scan (41ms), confirming the gate isn't preventing a real regression.

Single promotion gate now: CI lower bound on scatter-aware bytes-saved-per-sec ≥ filter_pushdown_min_bytes_per_sec. This lets strongly selective contiguous filters (90% prune rate, page-aligned runs) get promoted, which the 0.99 cutoff was incorrectly blocking.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a `LimitedBatchCoalescer` to `AdaptiveParquetStream`'s post-scan filter path, mirroring `FilterExec`'s behavior. Without this, inline post-scan filtering yields tiny batches (1-100 rows each on selective predicates) directly to TopK, which delays the dynamic filter from tightening: TopK only progressively improves its threshold one small batch at a time, while `FilterExec`'s coalescer ensures the first batch to TopK already contains thousands of survivors, letting TopK pick a near-optimal top-K threshold in one shot.

Symptom this fixes: on Q26 (`SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10`) at 12 partitions, the branch matches 33-34 file ranges vs main+pushdown=false's 28. With the coalescer, the branch matches 30-32 — closing ~1/3 of the gap. The remaining difference of ~2 ranges is unexplained but small.

Coalescer params match `FilterExec`: target_batch_size from the session, biggest_coalesce_batch_size = target/2 (set inside `LimitedBatchCoalescer::new`).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
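The coalescing idea above can be illustrated with a minimal toy (this is NOT DataFusion's `LimitedBatchCoalescer` — it buffers plain row values instead of RecordBatches, and omits the `biggest_coalesce_batch_size` limit):

```rust
/// Minimal batch coalescer: buffer small post-filter outputs until
/// target_batch_size rows accumulate, so downstream operators (e.g. TopK)
/// see one large batch instead of many tiny ones.
struct Coalescer {
    target: usize,
    buffer: Vec<u64>, // stand-in for buffered rows
}

impl Coalescer {
    fn new(target: usize) -> Self {
        Self { target, buffer: Vec::new() }
    }

    /// Push filtered rows; returns a full batch once the target is reached.
    fn push(&mut self, rows: &[u64]) -> Option<Vec<u64>> {
        self.buffer.extend_from_slice(rows);
        if self.buffer.len() >= self.target {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }

    /// Flush whatever remains at end of stream.
    fn finish(&mut self) -> Option<Vec<u64>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}
```

With a selective predicate emitting 1-100 survivors per batch, downstream sees nothing until thousands have accumulated — which is exactly what lets TopK set a near-final threshold on its first input.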
This reverts commit d146ebe.
When a filter is first observed, consult the per-conjunct row-group statistics pruning rate as a selectivity prior. If the pruning rate >= prior_promote_threshold (default 0.5), place at row-level; if <= prior_demote_threshold (default 0.05) and stats are present, place post-scan; otherwise fall back to the existing byte-ratio heuristic. Skips the prior entirely when no row-group statistics are available for the filter's columns, since "no stats" would otherwise look identical to "genuinely non-selective". Refs report.md §7.2.b
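The prior's decision rule can be sketched as a small function — a hypothetical standalone form of the logic just described (the enum and function names are invented; the thresholds are the commit's stated defaults):

```rust
#[derive(Debug, PartialEq)]
enum InitialPlacement {
    RowLevel,
    PostScan,
    ByteRatioFallback,
}

/// Stats-pruning prior for a first-observed filter. `pruning_rate` is the
/// fraction of row groups the conjunct's statistics pruned; None means no
/// row-group stats exist for the filter's columns.
fn placement_prior(pruning_rate: Option<f64>) -> InitialPlacement {
    match pruning_rate {
        // No stats: skip the prior entirely — "no stats" would otherwise be
        // indistinguishable from "genuinely non-selective".
        None => InitialPlacement::ByteRatioFallback,
        Some(r) if r >= 0.5 => InitialPlacement::RowLevel, // prior_promote_threshold
        Some(r) if r <= 0.05 => InitialPlacement::PostScan, // prior_demote_threshold
        Some(_) => InitialPlacement::ByteRatioFallback,
    }
}
```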
Merge of exp/page-pruning-prior and exp/latency-aware-z. The two levers should compose well: the prior settles initial placement from row-group statistics on the first row group, and the latency-aware z then drives evidence-based moves only when the runtime measurements disagree with the prior. Goal: keep exp1's wins on regression queries (TPC-DS Q26) while avoiding exp2's borderline-flip outliers (ClickBench Q37 under latency).
Documented four hypotheses tried to fix Q64 inside the morselization+adaptive merge: placeholder→postscan, all-postscan via min_bytes=INF, wider stream schema with all demoted filters applied stream-side, and a sanity check confirming the regression is structural to apache#21766+pushdown=true regardless of adaptive logic. None of the four moved Q64 below ~22 s. The fix has to come from within apache#21766 (or sub-RG adaptation in arrow-rs), not the adaptive scheduler.
Implemented a per-conjunct page-pruning prior on exp/page-pruning-prior-v2 (page-first, with the row-group fallback gated on "page index NOT loaded"). Smoke-benched at +18% vs exp3. Two failure modes: page-pruning eval is itself an "extra pruning run" in the cost sense (it walks the page index per conjunct per file), and removing the row-group fallback lost exp3's Q26-style demote. A proper architecture (extracting per-conjunct rates from the existing opener prunings as a side effect) needs ~200-300 LOC of PruningPredicate API additions; documented as a follow-up. exp/pp-plus-laz remains the recommended landing target.
…r pruning
Refactors the prior so it consumes per-FilterId page-pruning rates extracted from the page-index pruning the opener already runs, with NO extra pruning passes.

PagePruningAccessPlanFilter:
- new optional `tags: Option<Vec<usize>>` field
- `new_tagged()` constructor that accepts pre-split conjuncts, each tagged with a caller id (typically FilterId)
- `prune_plan_with_per_conjunct_stats()` variant that runs the same pruning iteration as `prune_plan_with_page_index` but also surfaces a `Vec<PerConjunctPageStats>` with rows-seen / rows-skipped per conjunct.

Opener (`build_stream`):
- When `predicate_conjuncts` is set, build the page filter via `new_tagged` so per-conjunct stats survive the split.
- Reorder: `prune_by_limit` + page-index pruning now run BEFORE the initial `partition_filters` call, so per-FilterId rates are available as the prior on the very first placement decision.
- Capture per-conjunct rates into a `HashMap<FilterId, f64>`, thread them into `AdaptiveParquetStream` as `page_pruning_rates`, and pass them on every `partition_filters` call (initial + mid-stream swap).

`SelectivityTracker::partition_filters`:
- New `page_pruning_rates` parameter.
- The initial-placement prior now reads from this map; it falls back to byte-ratio when no rate is available (page index disabled, multi-column predicate, schema mismatch).
- The old per-conjunct re-evaluation `pruning_rate_for_filter` is no longer called on the production path.

The per-conjunct page-pruning rates path is now the production path. The old per-conjunct row-group re-evaluation helpers (`pruning_rate_for_filter`, `build_per_conjunct_pruning_predicate`) were never reached after the `partition_filters` refactor — removed.

Also drops the temporary debug! traces from page_filter.rs and opener.rs that confirmed the architecture works (TPC-DS Q26 fires the page prior with pruned_rate=0.726 ≥ 0.5 → row-level; cd_gender/marital_status fire with pruned_rate=0.000 ≤ 0.05 → post-scan). ClickBench's hits_partitioned files lack page indexes, so the prior never fires there — it falls back to byte-ratio per the design ("if the user disables page pruning we don't get this data → only seed based on the bytes heuristic").
Force-push: 42c3a8b to 858013c
Perhaps if we had an API such as described here, this would be easier to implement.
The decoder's projection mask is now built from (user projection ∪ initial post-scan filter columns) and rebuilt at any row-group boundary where the optimal mask cols change — e.g. a filter promoting out of post-scan, or a dynamic placeholder waking up and being placed post-scan. arrow-rs's `StrategySwap::with_projection` installs the new mask before the next row group is read; we rebuild `stream_schema`, `projector`, and the post-scan filter rebase to match. The file-open and mid-stream code paths now share a single `build_decoder_projection_state` helper so the (read_plan, stream_schema, projector, rebased post-scan) chain stays in sync. Smoke bench (TPC-DS/TPC-H/ClickBench, `--simulate-latency`): sum-of-medians 1.6% faster vs r6 baseline. Notable per-query wins on filter-only-heavy workloads under latency: ClickBench Q42 -27.5%, Q23 -10.4%; TPC-H Q8/Q9 -8%. Two queries (TPC-DS Q26, TPC-H Q18) looked like regressions in a 3-round bench but cleared as noise on a 5-round rerun. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
The adaptive parquet scheduler in `selectivity::demote_or_drop` only drops a filter (`FilterState::Dropped`) when its expression is wrapped in `OptionalFilterPhysicalExpr`; otherwise the filter can only ever be demoted to post-scan, where it then runs on every batch forever. Hash-join dynamic filters had no wrapper, so for join-heavy queries with low-selectivity build sides (most of TPC-H) they cycled between row-level and post-scan and never got dropped.

On SF1 with `pushdown_filters=true` this dominated runtime — Q9 was 121.5 ms vs 68.5 ms on main (1.77x), Q17 134 ms → 134 ms, etc. Disabling join dynamic filter pushdown entirely brought Q9 to 49 ms, isolating the cost to "running ineffective dynamic filters that never get dropped".

Fix: wrap the `Arc<DynamicFilterPhysicalExpr>` in `OptionalFilterPhysicalExpr` at the `gather_filters_for_pushdown` handoff to mark it as drop-eligible, and unwrap it in `handle_child_pushdown_result` before `Arc::downcast::<DynamicFilterPhysicalExpr>` so the join keeps the bare filter for `.update()` / `.mark_complete()`. Join correctness does not depend on this filter — it's a probe-side scan-time hint — so dropping it is always safe.

TPC-H SF1, pushdown=true (mean of 8 iters, dropping iter 0), branch / main: Q3 0.74x, Q5 0.73x, Q7 0.74x, Q8 0.88x, Q9 0.80x, Q14 0.69x, Q17 0.55x, Q20 0.89x.

All 963 hash-join unit tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
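The wrap-then-unwrap pattern described above can be illustrated generically — toy types standing in for DataFusion's (none of these are the real `PhysicalExpr` APIs; only the shape of "wrap at the handoff, unwrap before downcasting" is from the commit):

```rust
use std::any::Any;
use std::sync::Arc;

// Toy expression trait: just enough to support downcasting.
trait Expr: Any + Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

struct DynamicFilter; // stand-in for DynamicFilterPhysicalExpr
impl Expr for DynamicFilter {
    fn as_any(&self) -> &dyn Any { self }
}

struct OptionalFilter {
    inner: Arc<dyn Expr>, // stand-in for OptionalFilterPhysicalExpr
}
impl Expr for OptionalFilter {
    fn as_any(&self) -> &dyn Any { self }
}

/// Mark the filter drop-eligible at the pushdown handoff: the scan side
/// sees the wrapper and may drop the filter when it proves ineffective.
fn wrap(inner: Arc<dyn Expr>) -> Arc<dyn Expr> {
    Arc::new(OptionalFilter { inner })
}

/// Unwrap before downcasting so the join side still finds the bare
/// dynamic filter (for update()/mark_complete() in the real code).
fn unwrap(expr: &Arc<dyn Expr>) -> Option<&DynamicFilter> {
    let inner: &dyn Any = match expr.as_any().downcast_ref::<OptionalFilter>() {
        Some(opt) => opt.inner.as_any(),
        None => expr.as_any(),
    };
    inner.downcast_ref::<DynamicFilter>()
}
```

The key property: downcasting works whether or not the wrapper was applied, so only the scan-side drop behavior changes.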
run benchmarks baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
run benchmarks baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
🤖 Benchmark running (GKE) — comparing HEAD (cae5fe6) to main using: tpcds

🤖 Benchmark running (GKE) — comparing HEAD (cae5fe6) to main using: clickbench_partitioned

🤖 Benchmark running (GKE) — comparing HEAD (cae5fe6) to main using: tpch
🤖 Benchmark completed (GKE) — tpch: base (merge-base) vs branch

🤖 Benchmark completed (GKE) — tpcds: base (merge-base) vs branch

🤖 Benchmark completed (GKE) — clickbench_partitioned: base (merge-base) vs branch
Three independent fixes for failures introduced by recent commits on this branch:

- page_filter: `prune_plan_with_per_conjunct_stats` was summing matched-page counts per conjunct instead of intersecting them, so the `page_index_pages_pruned` metric reported `N_conjuncts * pages` totals. Mirror the untagged path: keep a `matched_pages_in_group` set seeded with all pages, intersect each conjunct's matches into it, and count once at the end. Fixes `parquet_page_index_exec_metrics_multiple_predicates` and the `explain_analyze.slt` / `limit_pruning.slt` plan-with-metrics diffs (which expected the AND'd count).
- hash_join/exec: re-run `cargo fmt` to satisfy the formatter after the `OptionalFilterPhysicalExpr` wrap commit.
- selectivity: replace the broken intra-doc link `SelectivityTracker::is_optional` (no such method) with a working reference to the free function `is_optional_filter`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
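The intersect-not-sum fix in the first bullet can be shown in miniature — a hypothetical standalone form (page ids as plain integers; the real code walks the page index per row group):

```rust
use std::collections::BTreeSet;

/// A page survives pruning only if EVERY conjunct keeps it, so the
/// per-conjunct matched sets must be intersected, not summed. Counting
/// once at the end avoids the N_conjuncts * pages inflation.
fn pages_pruned(
    all_pages: &BTreeSet<usize>,
    per_conjunct_matches: &[BTreeSet<usize>],
) -> usize {
    // Seed with all pages, then intersect each conjunct's matches in.
    let mut matched = all_pages.clone();
    for m in per_conjunct_matches {
        matched = matched.intersection(m).cloned().collect();
    }
    all_pages.len() - matched.len()
}
```

With pages {0,1,2,3}, one conjunct matching {0,1} and another matching {1,2}, only page 1 survives both, so 3 pages are pruned — whereas summing per-conjunct pruned counts would have reported 2 + 2 = 4.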