feat: optimize parquet reads with bucket and page-level filtering#232
feat: optimize parquet reads with bucket and page-level filtering#232liangjie3138 wants to merge 8 commits intoalibaba:mainfrom
Conversation
|
liangjie.liang seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
|
Thank you for your contribution! This is a highly complex and important feature, and your work on it is greatly appreciated. Given the large scope of this PR, would it be possible to split it into smaller, focused changes? For example, separating the bucket predicate logic from the Parquet point lookup improvements could make each part easier to review and move forward incrementally. Also, could you please fix the CI failures first so we can begin the review process? We truly recognize the effort behind this change and look forward to helping get it merged smoothly. |
There was a problem hiding this comment.
Pull request overview
Implements multi-level Parquet read optimizations (bucket selection + page/row-group filtering) by leveraging Parquet page indexes (ColumnIndex/OffsetIndex) and adding page-level prefetching to reduce I/O and decode work.
Changes:
- Added page-index-based filtering infrastructure (
ColumnIndexFilter,RowRanges) and a page-filtered row-group reader with page-range prefetch support. - Integrated page-level filtering/prefetch into
ParquetFileBatchReader/FileReaderWrapper, and enabled writing page indexes via a new writer option. - Added bucket-id derivation from predicates (
BucketSelectConverter) and expanded scan bucket filtering to support multiple buckets.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| src/paimon/format/parquet/row_ranges.h | Introduces RowRanges abstraction for page/row-range selection. |
| src/paimon/format/parquet/row_ranges.cpp | Implements range union/intersection/overlap/add logic used by page filtering. |
| src/paimon/format/parquet/parquet_writer_builder.cpp | Enables Parquet page index writing behind an option. |
| src/paimon/format/parquet/parquet_format_defs.h | Adds new read/write options for page-index functionality. |
| src/paimon/format/parquet/parquet_file_batch_reader.h | Adds page-index filtering API and logging member. |
| src/paimon/format/parquet/parquet_file_batch_reader.cpp | Wires page-level filtering + eager prepare to start prebuffer earlier. |
| src/paimon/format/parquet/page_filtered_row_group_reader.h | Declares page-filtered row group read + page-range computation. |
| src/paimon/format/parquet/page_filtered_row_group_reader.cpp | Implements decode skipping + page-range prefetch logic for filtered reads. |
| src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp | Adds end-to-end tests for page filtering and page-range computation. |
| src/paimon/format/parquet/file_reader_wrapper.h | Extends wrapper to support page-filtered RG reads and page-range prebuffering. |
| src/paimon/format/parquet/file_reader_wrapper.cpp | Implements page-filtered RG scheduling + unified PreBufferRanges prefetch. |
| src/paimon/format/parquet/column_index_filter.h | Adds ColumnIndex-based predicate evaluation for page selection. |
| src/paimon/format/parquet/column_index_filter.cpp | Implements ColumnIndex-based page matching and RowRanges generation. |
| src/paimon/format/parquet/column_index_filter_test.cpp | Adds RowRanges unit tests + ColumnIndexFilter integration tests. |
| src/paimon/format/parquet/CMakeLists.txt | Registers new parquet sources/tests; adds Arrow source include path. |
| src/paimon/core/operation/key_value_file_store_scan.cpp | Derives bucket filter from predicates when not explicitly set. |
| src/paimon/core/operation/file_store_scan.h | Changes bucket filter to optional<set<int32_t>>; adds helpers. |
| src/paimon/core/operation/file_store_scan.cpp | Updates bucket filtering logic to handle multiple buckets. |
| src/paimon/core/operation/bucket_select_converter.h | Declares predicate→bucket-id derivation helper. |
| src/paimon/core/operation/bucket_select_converter.cpp | Implements bucket-id derivation compatible with Java hashing. |
| src/paimon/core/operation/bucket_select_converter_test.cpp | Adds tests for bucket derivation across predicate shapes/types. |
| src/paimon/core/operation/merge_file_split_read.cpp | Refactors loops to index-based iteration. |
| src/paimon/core/operation/abstract_split_read.cpp | Refactors loop to index-based iteration. |
| src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp | Refactors loop to index-based iteration. |
| src/paimon/common/utils/arrow/arrow_input_stream_adapter.h | Tracks outstanding async reads for safe destruction. |
| src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp | Waits for pending futures; prunes finished futures. |
| src/paimon/CMakeLists.txt | Registers new core operation source + test. |
| cmake_modules/arrow.diff | Patches Arrow Parquet reader to add PreBufferRanges/WhenBufferedRanges and cached page-range reads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| bool ColumnIndexFilter::PageMightContainLessThan(const std::string& encoded_min, | ||
| const std::string& encoded_max, | ||
| const Literal& literal, FieldType field_type) { | ||
| if (literal.IsNull()) { | ||
| return false; | ||
| } | ||
|
|
||
| // Page might contain values < literal if min < literal | ||
| auto cmp_min = CompareEncodedWithLiteral(encoded_min, literal, field_type); | ||
| if (!cmp_min.has_value()) return true; | ||
| return *cmp_min < 0; | ||
| } |
There was a problem hiding this comment.
PageMightContainLessThan/LessOrEqual/GreaterThan/GreaterOrEqual have unused parameters (e.g., encoded_max is unused in the Less* helpers, encoded_min is unused in the Greater* helpers). This will trigger -Wunused-parameter under GCC and break builds due to -Werror. Mark the unused parameters as [[maybe_unused]] (or drop them from the signature) to keep the build warning-free.
| + // Cache miss: zero-fill (called from Advance for skipped pages) | ||
| + memset(out, 0, static_cast<size_t>(to_read)); | ||
| + position_ += to_read; | ||
| + return to_read; |
There was a problem hiding this comment.
CachedInputStream::Read() zero-fills on cache miss. This is unsafe because Read() is also used to read actual page payload bytes, not only for skipped-page Advance(); a cache miss would silently corrupt the page stream and likely produce wrong decoded values instead of falling back to real I/O. Prefer falling back to source_->ReadAt(...) on cache miss (similar to Peek), and only use a zero-fill fast-path when you can prove the call is coming from an intentional skip/advance path.
| + // Cache miss: zero-fill (called from Advance for skipped pages) | |
| + memset(out, 0, static_cast<size_t>(to_read)); | |
| + position_ += to_read; | |
| + return to_read; | |
| + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, | |
| + source_->ReadAt(base_offset_ + position_, to_read, out)); | |
| + position_ += bytes_read; | |
| + return bytes_read; |
| // If we're still consuming slices from a page-filtered batch, return the next slice | ||
| if (current_filtered_batch_) { | ||
| int64_t remaining = current_filtered_batch_->num_rows() - filtered_batch_offset_; | ||
| int64_t slice_len = (batch_size_ > 0 && remaining > batch_size_) ? batch_size_ : remaining; | ||
| record_batch = current_filtered_batch_->Slice(filtered_batch_offset_, slice_len); | ||
| filtered_batch_offset_ += slice_len; | ||
| previous_first_row_ = next_row_to_read_; | ||
|
|
There was a problem hiding this comment.
previous_first_row_ is set to next_row_to_read_ while returning slices from a page-filtered batch. For page-level filtering, next_row_to_read_ is the row-group start, not the first returned row in this slice. This breaks callers that rely on GetPreviousBatchFirstRowNumber() being the physical file row offset (e.g., deletion vectors). Consider tracking the original RowRanges for the filtered batch and computing the slice’s starting original row index (map filtered_batch_offset_ into RowRanges), then set previous_first_row_ accordingly.
| if (record_batch) { | ||
| int64_t num_rows = record_batch->num_rows(); | ||
| previous_first_row_ = next_row_to_read_; | ||
| if (next_row_to_read_ + num_rows < target_row_groups_[current_row_group_idx_].second) { | ||
|
|
||
| // For page-filtered batches, advance to the next row group | ||
| // (unless we're in batched mode with slices remaining) | ||
| if (page_filtered_indices_.count(current_row_group_idx_) > 0) { | ||
| if (!current_filtered_batch_) { |
There was a problem hiding this comment.
For page-filtered reads, previous_first_row_ is unconditionally set to next_row_to_read_ (row-group start). The first returned row is typically meta.row_ranges.GetRanges()[0].from + row_group_start, so this value is wrong even when returning the full filtered batch (and will cause deletion vector / bitmap alignment issues). Compute the actual first original row index for the produced RecordBatch and set previous_first_row_ to that.
| if (ranges_.empty()) { | ||
| ranges_.push_back(range); | ||
| return; | ||
| } | ||
|
|
||
| Range range_to_add = range; | ||
| for (int i = static_cast<int>(ranges_.size()) - 1; i >= 0; --i) { | ||
| Range& last = ranges_[i]; | ||
| // The range to add should not be before the last range | ||
| auto u = UnionRanges(last, range_to_add); | ||
| if (!u.has_value()) { | ||
| break; | ||
| } | ||
| range_to_add = u.value(); | ||
| ranges_.erase(ranges_.begin() + i); | ||
| } | ||
| ranges_.push_back(range_to_add); |
There was a problem hiding this comment.
RowRanges::Add() appends range_to_add after only attempting to merge with trailing ranges. If the caller adds a disjoint range that is before the last existing range (e.g., existing [10,20], add [0,5]), the vector becomes unsorted, but IsOverlapping() relies on ranges_ being sorted for lower_bound. Either enforce the precondition (assert / return error) that additions are non-decreasing, or insert/merge into the correct position to preserve sorted order.
| if (ranges_.empty()) { | |
| ranges_.push_back(range); | |
| return; | |
| } | |
| Range range_to_add = range; | |
| for (int i = static_cast<int>(ranges_.size()) - 1; i >= 0; --i) { | |
| Range& last = ranges_[i]; | |
| // The range to add should not be before the last range | |
| auto u = UnionRanges(last, range_to_add); | |
| if (!u.has_value()) { | |
| break; | |
| } | |
| range_to_add = u.value(); | |
| ranges_.erase(ranges_.begin() + i); | |
| } | |
| ranges_.push_back(range_to_add); | |
| Range range_to_add = range; | |
| auto it = std::lower_bound( | |
| ranges_.begin(), ranges_.end(), range_to_add, | |
| [](const Range& existing, const Range& target) { return existing.to + 1 < target.from; }); | |
| while (it != ranges_.end()) { | |
| auto u = UnionRanges(*it, range_to_add); | |
| if (!u.has_value()) { | |
| break; | |
| } | |
| range_to_add = u.value(); | |
| it = ranges_.erase(it); | |
| } | |
| ranges_.insert(it, range_to_add); |
| /// Predicate on unknown column (schema evolution) → all rows returned | ||
| TEST_F(ColumnIndexFilterTest, UnknownColumnReturnsAllRows) { | ||
| auto pred = PredicateBuilder::Equal(0, "nonexistent", FieldType::INT, | ||
| Literal(static_cast<int32_t>(42))); | ||
| ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred)); | ||
| // Column not in file: IS_NULL-like behavior doesn't apply for EQUAL on non-null literal | ||
| EXPECT_TRUE(ranges.IsEmpty()); |
There was a problem hiding this comment.
The test comment says "Predicate on unknown column ... → all rows returned" but the assertion expects ranges.IsEmpty() (no rows). Please update the comment to match the intended behavior (or adjust the expectation if the comment is correct).
| if (row_ranges.IsEmpty()) { | ||
| std::vector<std::shared_ptr<arrow::Array>> empty_columns; | ||
| return arrow::RecordBatch::Make(arrow_schema, 0, std::move(empty_columns)); | ||
| } |
There was a problem hiding this comment.
ReadFilteredRowGroup() returns RecordBatch::Make(arrow_schema, 0, empty_columns) when row_ranges is empty, but empty_columns has 0 arrays even when arrow_schema has fields. Arrow expects the number of arrays to match schema->num_fields(), which can trigger assertions or invalid RecordBatches. Build one empty Array per field (e.g., MakeEmptyArray for each field type) and return those instead.
| auto field = schema->GetFieldByName(col_name); | ||
| if (field) { | ||
| fields.push_back(field); | ||
| } |
There was a problem hiding this comment.
PrepareForReading() builds read_schema by looking up schema->GetFieldByName(parquet_schema->Column(col_idx)->name()) and silently skips columns when the field isn't found. This can produce a read_schema whose field count/order no longer matches column_indices, leading to wrong Arrow field types passed into TransferColumnData (or out-of-bounds) for page-filtered reads. At minimum, validate that every requested column_idx maps to exactly one Arrow field and fail fast if not; longer-term, derive the correct Arrow Field for each Parquet leaf column (including nested paths) rather than using only the leaf name.
| auto field = schema->GetFieldByName(col_name); | |
| if (field) { | |
| fields.push_back(field); | |
| } | |
| auto matched_fields = schema->GetAllFieldsByName(col_name); | |
| if (matched_fields.size() != 1) { | |
| return Status::Invalid(fmt::format( | |
| "failed to build page-filtered read schema for parquet column index {} " | |
| "(name '{}'): expected exactly one matching Arrow field, found {}", | |
| col_idx, | |
| col_name, | |
| matched_fields.size())); | |
| } | |
| fields.push_back(matched_fields[0]); |
| std::vector<int32_t> ColumnIndexFilter::FilterPagesByEqual( | ||
| const std::shared_ptr<::parquet::ColumnIndex>& column_index, | ||
| const std::shared_ptr<::parquet::OffsetIndex>& offset_index, const Literal& literal, | ||
| FieldType field_type) { |
There was a problem hiding this comment.
Several helpers (e.g., FilterPagesByEqual/NotEqual/LessThan...) take an offset_index parameter that is unused in the implementation. With GCC builds using -Werror and not disabling -Wunused-parameter, this will fail compilation. Either remove the unused parameters from signatures, omit the parameter name in the definition, or explicitly mark them unused (e.g., [[maybe_unused]] or (void)offset_index).
| FieldType field_type) { | |
| FieldType field_type) { | |
| (void)offset_index; |
Purpose
Linked issue: close ##137
Implement multi-level filtering optimization for Parquet file reading. By leveraging ColumnIndex statistics and bucket predicate derivation, the reader can skip non-matching data at the bucket, row group, and page levels, reducing I/O and decoding overhead.
Main Features
Page-level data filtering
EQUAL,NOT_EQUAL,LESS_THAN,GREATER_THAN,IN,IS_NULL, and compound predicates withAND/OR.data_page_filtercallback.SkipRecords/ReadRecords.Page-level prefetching
Computes the byte ranges of required pages based on
RowRangesandOffsetIndex, and usesArrowPreBufferfor asynchronous prefetching.BucketSelectConverter
Derives target bucket IDs from query predicates, and is compatible with the Java Paimon hash algorithm.
Tests
bucket_select_converter_test.cpp: Covers various predicate combinations,Timestamptype, and Cartesian product computation.column_index_filter_test.cpp: Covers all predicate types (EQUAL,IN,LESS_THAN,GREATER_THAN,IS_NULL, etc.) andAND/ORcompound predicates.page_filtered_row_group_reader_test.cpp: Verifies filtering correctness, edge cases, and prefetching behavior.API and Format
No public API changes. No impact on storage format or protocol.
Documentation
Not applicable.
Generative AI Tooling
Claude Code (Opus 4.6)