
feat: optimize parquet reads with bucket and page-level filtering#232

Open
liangjie3138 wants to merge 8 commits into alibaba:main from liangjie3138:dev_parquet

Conversation

@liangjie3138

Purpose

Linked issue: closes #137

Implement multi-level filtering optimization for Parquet file reading. By leveraging ColumnIndex statistics and bucket predicate derivation, the reader can skip non-matching data at the bucket, row group, and page levels, reducing I/O and decoding overhead.

Main Features

  1. Page-level data filtering

    • ColumnIndexFilter: Filters data pages based on Parquet ColumnIndex min/max statistics. Supports predicates such as EQUAL, NOT_EQUAL, LESS_THAN, GREATER_THAN, IN, IS_NULL, and compound predicates with AND/OR.
    • PageFilteredRowGroupReader: Reads row groups after page-level filtering:
      • I/O layer: Skips non-matching pages through the data_page_filter callback.
      • Decoding layer: Skips rows through SkipRecords/ReadRecords.
  2. Page-level prefetching
    Computes the byte ranges of required pages based on RowRanges and OffsetIndex, and uses ArrowPreBuffer for asynchronous prefetching.

  3. BucketSelectConverter
    Derives target bucket IDs from query predicates, and is compatible with the Java Paimon hash algorithm.

Tests

  • bucket_select_converter_test.cpp: Covers various predicate combinations, Timestamp type, and Cartesian product computation.
  • column_index_filter_test.cpp: Covers all predicate types (EQUAL, IN, LESS_THAN, GREATER_THAN, IS_NULL, etc.) and AND/OR compound predicates.
  • page_filtered_row_group_reader_test.cpp: Verifies filtering correctness, edge cases, and prefetching behavior.

API and Format

No public API changes. No impact on storage format or protocol.

Documentation

Not applicable.

Generative AI Tooling

Claude Code (Opus 4.6)

@CLAassistant

CLAassistant commented Apr 16, 2026

CLA assistant check
All committers have signed the CLA.

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
0 out of 2 committers have signed the CLA.

❌ liangjie.liang
❌ liangjie3138


liangjie.liang does not appear to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
Have you signed the CLA already, but the status is still pending? Let us recheck it.

@liangjie3138 changed the title from "Dev parquet" to "feat: optimize parquet reads with bucket and page-level filtering" on Apr 16, 2026
@lxy-9602
Collaborator

lxy-9602 commented Apr 17, 2026

Thank you for your contribution! This is a highly complex and important feature, and your work on it is greatly appreciated.

Given the large scope of this PR, would it be possible to split it into smaller, focused changes? For example, separating the bucket predicate logic from the Parquet point lookup improvements could make each part easier to review and move forward incrementally.

Also, could you please fix the CI failures first so we can begin the review process?

We truly recognize the effort behind this change and look forward to helping get it merged smoothly.


Copilot AI left a comment


Pull request overview

Implements multi-level Parquet read optimizations (bucket selection + page/row-group filtering) by leveraging Parquet page indexes (ColumnIndex/OffsetIndex) and adding page-level prefetching to reduce I/O and decode work.

Changes:

  • Added page-index-based filtering infrastructure (ColumnIndexFilter, RowRanges) and a page-filtered row-group reader with page-range prefetch support.
  • Integrated page-level filtering/prefetch into ParquetFileBatchReader/FileReaderWrapper, and enabled writing page indexes via a new writer option.
  • Added bucket-id derivation from predicates (BucketSelectConverter) and expanded scan bucket filtering to support multiple buckets.

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 9 comments.

File Description
src/paimon/format/parquet/row_ranges.h Introduces RowRanges abstraction for page/row-range selection.
src/paimon/format/parquet/row_ranges.cpp Implements range union/intersection/overlap/add logic used by page filtering.
src/paimon/format/parquet/parquet_writer_builder.cpp Enables Parquet page index writing behind an option.
src/paimon/format/parquet/parquet_format_defs.h Adds new read/write options for page-index functionality.
src/paimon/format/parquet/parquet_file_batch_reader.h Adds page-index filtering API and logging member.
src/paimon/format/parquet/parquet_file_batch_reader.cpp Wires page-level filtering + eager prepare to start prebuffer earlier.
src/paimon/format/parquet/page_filtered_row_group_reader.h Declares page-filtered row group read + page-range computation.
src/paimon/format/parquet/page_filtered_row_group_reader.cpp Implements decode skipping + page-range prefetch logic for filtered reads.
src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp Adds end-to-end tests for page filtering and page-range computation.
src/paimon/format/parquet/file_reader_wrapper.h Extends wrapper to support page-filtered RG reads and page-range prebuffering.
src/paimon/format/parquet/file_reader_wrapper.cpp Implements page-filtered RG scheduling + unified PreBufferRanges prefetch.
src/paimon/format/parquet/column_index_filter.h Adds ColumnIndex-based predicate evaluation for page selection.
src/paimon/format/parquet/column_index_filter.cpp Implements ColumnIndex-based page matching and RowRanges generation.
src/paimon/format/parquet/column_index_filter_test.cpp Adds RowRanges unit tests + ColumnIndexFilter integration tests.
src/paimon/format/parquet/CMakeLists.txt Registers new parquet sources/tests; adds Arrow source include path.
src/paimon/core/operation/key_value_file_store_scan.cpp Derives bucket filter from predicates when not explicitly set.
src/paimon/core/operation/file_store_scan.h Changes bucket filter to optional<set<int32_t>>; adds helpers.
src/paimon/core/operation/file_store_scan.cpp Updates bucket filtering logic to handle multiple buckets.
src/paimon/core/operation/bucket_select_converter.h Declares predicate→bucket-id derivation helper.
src/paimon/core/operation/bucket_select_converter.cpp Implements bucket-id derivation compatible with Java hashing.
src/paimon/core/operation/bucket_select_converter_test.cpp Adds tests for bucket derivation across predicate shapes/types.
src/paimon/core/operation/merge_file_split_read.cpp Refactors loops to index-based iteration.
src/paimon/core/operation/abstract_split_read.cpp Refactors loop to index-based iteration.
src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp Refactors loop to index-based iteration.
src/paimon/common/utils/arrow/arrow_input_stream_adapter.h Tracks outstanding async reads for safe destruction.
src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp Waits for pending futures; prunes finished futures.
src/paimon/CMakeLists.txt Registers new core operation source + test.
cmake_modules/arrow.diff Patches Arrow Parquet reader to add PreBufferRanges/WhenBufferedRanges and cached page-range reads.


Comment on lines +703 to +714
bool ColumnIndexFilter::PageMightContainLessThan(const std::string& encoded_min,
                                                 const std::string& encoded_max,
                                                 const Literal& literal, FieldType field_type) {
    if (literal.IsNull()) {
        return false;
    }

    // Page might contain values < literal if min < literal
    auto cmp_min = CompareEncodedWithLiteral(encoded_min, literal, field_type);
    if (!cmp_min.has_value()) return true;
    return *cmp_min < 0;
}

Copilot AI Apr 17, 2026


PageMightContainLessThan/LessOrEqual/GreaterThan/GreaterOrEqual have unused parameters (e.g., encoded_max is unused in the Less* helpers, encoded_min is unused in the Greater* helpers). This will trigger -Wunused-parameter under GCC and break builds due to -Werror. Mark the unused parameters as [[maybe_unused]] (or drop them from the signature) to keep the build warning-free.

Comment thread cmake_modules/arrow.diff
Comment on lines +287 to +290
+ // Cache miss: zero-fill (called from Advance for skipped pages)
+ memset(out, 0, static_cast<size_t>(to_read));
+ position_ += to_read;
+ return to_read;

Copilot AI Apr 17, 2026


CachedInputStream::Read() zero-fills on cache miss. This is unsafe because Read() is also used to read actual page payload bytes, not only for skipped-page Advance(); a cache miss would silently corrupt the page stream and likely produce wrong decoded values instead of falling back to real I/O. Prefer falling back to source_->ReadAt(...) on cache miss (similar to Peek), and only use a zero-fill fast-path when you can prove the call is coming from an intentional skip/advance path.

Suggested change
+ // Cache miss: zero-fill (called from Advance for skipped pages)
+ memset(out, 0, static_cast<size_t>(to_read));
+ position_ += to_read;
+ return to_read;
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+ source_->ReadAt(base_offset_ + position_, to_read, out));
+ position_ += bytes_read;
+ return bytes_read;

Comment on lines +147 to +154
// If we're still consuming slices from a page-filtered batch, return the next slice
if (current_filtered_batch_) {
int64_t remaining = current_filtered_batch_->num_rows() - filtered_batch_offset_;
int64_t slice_len = (batch_size_ > 0 && remaining > batch_size_) ? batch_size_ : remaining;
record_batch = current_filtered_batch_->Slice(filtered_batch_offset_, slice_len);
filtered_batch_offset_ += slice_len;
previous_first_row_ = next_row_to_read_;


Copilot AI Apr 17, 2026


previous_first_row_ is set to next_row_to_read_ while returning slices from a page-filtered batch. For page-level filtering, next_row_to_read_ is the row-group start, not the first returned row in this slice. This breaks callers that rely on GetPreviousBatchFirstRowNumber() being the physical file row offset (e.g., deletion vectors). Consider tracking the original RowRanges for the filtered batch and computing the slice’s starting original row index (map filtered_batch_offset_ into RowRanges), then set previous_first_row_ accordingly.

Comment on lines 198 to +205
if (record_batch) {
int64_t num_rows = record_batch->num_rows();
previous_first_row_ = next_row_to_read_;
if (next_row_to_read_ + num_rows < target_row_groups_[current_row_group_idx_].second) {

// For page-filtered batches, advance to the next row group
// (unless we're in batched mode with slices remaining)
if (page_filtered_indices_.count(current_row_group_idx_) > 0) {
if (!current_filtered_batch_) {

Copilot AI Apr 17, 2026


For page-filtered reads, previous_first_row_ is unconditionally set to next_row_to_read_ (row-group start). The first returned row is typically meta.row_ranges.GetRanges()[0].from + row_group_start, so this value is wrong even when returning the full filtered batch (and will cause deletion vector / bitmap alignment issues). Compute the actual first original row index for the produced RecordBatch and set previous_first_row_ to that.

Comment on lines +125 to +141
if (ranges_.empty()) {
ranges_.push_back(range);
return;
}

Range range_to_add = range;
for (int i = static_cast<int>(ranges_.size()) - 1; i >= 0; --i) {
Range& last = ranges_[i];
// The range to add should not be before the last range
auto u = UnionRanges(last, range_to_add);
if (!u.has_value()) {
break;
}
range_to_add = u.value();
ranges_.erase(ranges_.begin() + i);
}
ranges_.push_back(range_to_add);

Copilot AI Apr 17, 2026


RowRanges::Add() appends range_to_add after only attempting to merge with trailing ranges. If the caller adds a disjoint range that is before the last existing range (e.g., existing [10,20], add [0,5]), the vector becomes unsorted, but IsOverlapping() relies on ranges_ being sorted for lower_bound. Either enforce the precondition (assert / return error) that additions are non-decreasing, or insert/merge into the correct position to preserve sorted order.

Suggested change
if (ranges_.empty()) {
ranges_.push_back(range);
return;
}
Range range_to_add = range;
for (int i = static_cast<int>(ranges_.size()) - 1; i >= 0; --i) {
Range& last = ranges_[i];
// The range to add should not be before the last range
auto u = UnionRanges(last, range_to_add);
if (!u.has_value()) {
break;
}
range_to_add = u.value();
ranges_.erase(ranges_.begin() + i);
}
ranges_.push_back(range_to_add);
Range range_to_add = range;
auto it = std::lower_bound(
ranges_.begin(), ranges_.end(), range_to_add,
[](const Range& existing, const Range& target) { return existing.to + 1 < target.from; });
while (it != ranges_.end()) {
auto u = UnionRanges(*it, range_to_add);
if (!u.has_value()) {
break;
}
range_to_add = u.value();
it = ranges_.erase(it);
}
ranges_.insert(it, range_to_add);

Comment on lines +467 to +473
/// Predicate on unknown column (schema evolution) → all rows returned
TEST_F(ColumnIndexFilterTest, UnknownColumnReturnsAllRows) {
auto pred = PredicateBuilder::Equal(0, "nonexistent", FieldType::INT,
Literal(static_cast<int32_t>(42)));
ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred));
// Column not in file: IS_NULL-like behavior doesn't apply for EQUAL on non-null literal
EXPECT_TRUE(ranges.IsEmpty());

Copilot AI Apr 17, 2026


The test comment says "Predicate on unknown column ... → all rows returned" but the assertion expects ranges.IsEmpty() (no rows). Please update the comment to match the intended behavior (or adjust the expectation if the comment is correct).

Comment on lines +200 to +203
if (row_ranges.IsEmpty()) {
std::vector<std::shared_ptr<arrow::Array>> empty_columns;
return arrow::RecordBatch::Make(arrow_schema, 0, std::move(empty_columns));
}

Copilot AI Apr 17, 2026


ReadFilteredRowGroup() returns RecordBatch::Make(arrow_schema, 0, empty_columns) when row_ranges is empty, but empty_columns has 0 arrays even when arrow_schema has fields. Arrow expects the number of arrays to match schema->num_fields(), which can trigger assertions or invalid RecordBatches. Build one empty Array per field (e.g., MakeEmptyArray for each field type) and return those instead.

Comment on lines +295 to +298
auto field = schema->GetFieldByName(col_name);
if (field) {
fields.push_back(field);
}

Copilot AI Apr 17, 2026


PrepareForReading() builds read_schema by looking up schema->GetFieldByName(parquet_schema->Column(col_idx)->name()) and silently skips columns when the field isn't found. This can produce a read_schema whose field count/order no longer matches column_indices, leading to wrong Arrow field types passed into TransferColumnData (or out-of-bounds) for page-filtered reads. At minimum, validate that every requested column_idx maps to exactly one Arrow field and fail fast if not; longer-term, derive the correct Arrow Field for each Parquet leaf column (including nested paths) rather than using only the leaf name.

Suggested change
auto field = schema->GetFieldByName(col_name);
if (field) {
fields.push_back(field);
}
auto matched_fields = schema->GetAllFieldsByName(col_name);
if (matched_fields.size() != 1) {
return Status::Invalid(fmt::format(
"failed to build page-filtered read schema for parquet column index {} "
"(name '{}'): expected exactly one matching Arrow field, found {}",
col_idx,
col_name,
matched_fields.size()));
}
fields.push_back(matched_fields[0]);

std::vector<int32_t> ColumnIndexFilter::FilterPagesByEqual(
const std::shared_ptr<::parquet::ColumnIndex>& column_index,
const std::shared_ptr<::parquet::OffsetIndex>& offset_index, const Literal& literal,
FieldType field_type) {

Copilot AI Apr 17, 2026


Several helpers (e.g., FilterPagesByEqual/NotEqual/LessThan...) take an offset_index parameter that is unused in the implementation. With GCC builds using -Werror and not disabling -Wunused-parameter, this will fail compilation. Either remove the unused parameters from signatures, omit the parameter name in the definition, or explicitly mark them unused (e.g., [[maybe_unused]] or (void)offset_index).

Suggested change
FieldType field_type) {
FieldType field_type) {
(void)offset_index;



4 participants