Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct ReaderInitContext {
virtual ~ReaderInitContext() = default;

// ---- Owned by FileScanner, shared by all readers ----
std::vector<ColumnDescriptor>* column_descs = nullptr;
const std::vector<ColumnDescriptor>* column_descs = nullptr;
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx = nullptr;
RuntimeState* state = nullptr;
const TupleDescriptor* tuple_descriptor = nullptr;
Expand Down
79 changes: 79 additions & 0 deletions be/src/format/jni/jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <map>
#include <ostream>
#include <sstream>
#include <tuple>
#include <unordered_map>

#include "core/block/block.h"
#include "core/types.h"
Expand Down Expand Up @@ -67,6 +69,44 @@ JniReader::JniReader(std::string connector_class, std::map<std::string, std::str
_connector_name = split(_connector_class, "/").back();
}

// Captures the column descriptors (and, if not already set, the column-name to
// block-index map) from the init context, then rebuilds _partition_values from
// the scan range's path-derived partition keys.
//
// A key that has no slot of the same name in the tuple descriptor is skipped.
// A key without a corresponding entry in columns_from_path is stored with an
// empty string value. Always returns Status::OK().
Status JniReader::on_before_init_reader(ReaderInitContext* ctx) {
    _column_descs = ctx->column_descs;
    // An index map that was set earlier wins over the one carried by the context.
    if (!_col_name_to_block_idx) {
        _col_name_to_block_idx = ctx->col_name_to_block_idx;
    }
    _partition_values.clear();

    // Without a range, a tuple descriptor, or any path keys there is nothing to collect.
    const bool has_path_keys = ctx->range != nullptr && ctx->tuple_descriptor != nullptr &&
                               ctx->range->__isset.columns_from_path_keys;
    if (!has_path_keys) {
        return Status::OK();
    }

    // Index the slots by column name so every key costs one lookup.
    std::unordered_map<std::string, const SlotDescriptor*> slots_by_name;
    for (auto* slot_desc : ctx->tuple_descriptor->slots()) {
        slots_by_name.emplace(slot_desc->col_name(), slot_desc);
    }

    const auto& path_keys = ctx->range->columns_from_path_keys;
    for (size_t idx = 0; idx < path_keys.size(); ++idx) {
        const auto& partition_key = path_keys[idx];
        const auto found = slots_by_name.find(partition_key);
        if (found == slots_by_name.end()) {
            continue;
        }

        // Stays empty when the range carries no value at this position.
        std::string partition_value;
        if (ctx->range->__isset.columns_from_path && idx < ctx->range->columns_from_path.size()) {
            partition_value = ctx->range->columns_from_path[idx];
        }
        _partition_values.emplace(partition_key,
                                  std::make_tuple(std::move(partition_value), found->second));
    }
    return Status::OK();
}

// Post-read hook: fills the partition columns of `block` for the `*read_rows`
// rows just read. Does nothing (returns OK) when no column descriptors were
// captured, no partition values were collected, no rows were read, or the
// pushed-down aggregation is COUNT.
Status JniReader::on_after_read_block(Block* block, size_t* read_rows) {
    const bool should_fill = _column_descs != nullptr && !_partition_values.empty() &&
                             *read_rows != 0 &&
                             _push_down_agg_type != TPushAggOp::type::COUNT;
    if (should_fill) {
        return _fill_partition_columns(block, *read_rows);
    }
    return Status::OK();
}

// =========================================================================
// JniReader::open (merged from JniConnector::open)
// =========================================================================
Expand Down Expand Up @@ -276,6 +316,45 @@ Status JniReader::_fill_block(Block* block, size_t num_rows) {
return Status::OK();
}

// Writes the collected partition values into the matching columns of `block`.
//
// Only descriptors whose category is PARTITION_KEY and that have an entry in
// _partition_values are handled. The block position comes from
// _col_name_to_block_idx when it is set, otherwise from a map taken from the
// block itself.
//
// Returns InternalError when a partition column is absent from the index map,
// or when the serde reports a deserialized row count different from `num_rows`;
// serde failures are propagated unchanged.
Status JniReader::_fill_partition_columns(Block* block, size_t num_rows) {
    // Fallback storage; it must outlive `name_to_pos` when the member map is unset.
    std::unordered_map<std::string, uint32_t> block_name_to_pos;
    const std::unordered_map<std::string, uint32_t>* name_to_pos = _col_name_to_block_idx;
    if (!name_to_pos) {
        block_name_to_pos = block->get_name_to_pos_map();
        name_to_pos = &block_name_to_pos;
    }

    DataTypeSerDe::FormatOptions text_format_options;
    for (const auto& column_desc : *_column_descs) {
        if (column_desc.category != ColumnCategory::PARTITION_KEY) {
            continue;
        }
        const auto partition_it = _partition_values.find(column_desc.name);
        if (partition_it == _partition_values.end()) {
            continue;
        }
        const auto position_it = name_to_pos->find(column_desc.name);
        if (position_it == name_to_pos->end()) {
            return Status::InternalError("Missing partition column {} in block {}",
                                         column_desc.name, block->dump_structure());
        }

        auto target_column = block->get_by_position(position_it->second).column->assume_mutable();
        const auto& partition_text = std::get<0>(partition_it->second);
        const auto& partition_slot = std::get<1>(partition_it->second);
        auto partition_serde = partition_slot->get_data_type_ptr()->get_serde();

        // NOTE(review): presumably the serde repeats the single text value for
        // `num_rows` rows; confirm against DataTypeSerDe.
        Slice slice(partition_text.data(), partition_text.size());
        uint64_t rows_filled = 0;
        RETURN_IF_ERROR(partition_serde->deserialize_column_from_fixed_json(
                *target_column, slice, num_rows, &rows_filled, text_format_options));
        if (rows_filled != num_rows) {
            return Status::InternalError(
                    "Failed to fill partition column: {}={}. Expected rows: {}, actual: {}",
                    partition_slot->col_name(), partition_text, num_rows, rows_filled);
        }
    }
    return Status::OK();
}

// =========================================================================
// JniReader::_get_statistics (merged from JniConnector::get_statistics)
// =========================================================================
Expand Down
5 changes: 5 additions & 0 deletions be/src/format/jni/jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ class JniReader : public GenericReader {
}

protected:
Status on_before_init_reader(ReaderInitContext* ctx) override;
Status on_after_read_block(Block* block, size_t* read_rows) override;
void _collect_profile_before_close() override;

/**
Expand All @@ -137,6 +139,7 @@ class JniReader : public GenericReader {
private:
static const std::vector<SlotDescriptor*> _s_empty_slot_descs;

Status _fill_partition_columns(Block* block, size_t num_rows);
Status _init_jni_scanner(JNIEnv* env, int batch_size);
Status _fill_block(Block* block, size_t num_rows);
Status _get_statistics(JNIEnv* env, std::map<std::string, std::string>* result);
Expand Down Expand Up @@ -180,6 +183,8 @@ class JniReader : public GenericReader {

// Column name to block index map, passed from FileScanner to avoid repeated map creation
const std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
_partition_values;

void _set_meta(long meta_addr) { _table_meta.set_meta(meta_addr); }
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ Status OrcReader::on_before_init_reader(ReaderInitContext* ctx) {
_fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
RETURN_IF_ERROR(
_extract_partition_values(*ctx->range, ctx->tuple_descriptor, _fill_partition_values));
for (auto& desc : *ctx->column_descs) {
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
Expand Down
2 changes: 1 addition & 1 deletion be/src/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ Status ParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
_fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
RETURN_IF_ERROR(
_extract_partition_values(*ctx->range, ctx->tuple_descriptor, _fill_partition_values));
for (auto& desc : *ctx->column_descs) {
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
Expand Down
4 changes: 2 additions & 2 deletions be/src/format/table/hive_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Status HiveOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
_fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
RETURN_IF_ERROR(
_extract_partition_values(*ctx->range, ctx->tuple_descriptor, _fill_partition_values));
for (auto& desc : *ctx->column_descs) {
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
Expand Down Expand Up @@ -219,7 +219,7 @@ Status HiveParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
_fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
RETURN_IF_ERROR(
_extract_partition_values(*ctx->range, ctx->tuple_descriptor, _fill_partition_values));
for (auto& desc : *ctx->column_descs) {
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
Expand Down
44 changes: 26 additions & 18 deletions be/src/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,9 @@ Status IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
ctx->table_info_node));
} else {
bool exist_field_id = true;
RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id_with_name_mapping(
get_scan_params().history_schema_info.front().root_field, *field_desc,
ctx->table_info_node, exist_field_id));
if (!exist_field_id) {
RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
ctx->table_info_node));
}
ctx->table_info_node));
}

std::unordered_set<std::string> partition_col_names;
Expand All @@ -163,7 +158,7 @@ Status IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {

// Single pass: classify columns, detect $row_id, handle partition fallback.
bool has_partition_from_path = false;
for (auto& desc : *ctx->column_descs) {
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::SYNTHESIZED) {
if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
this->register_synthesized_column_handler(
Expand All @@ -181,13 +176,23 @@ Status IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
});
continue;
}
} else if (desc.category == ColumnCategory::PARTITION_KEY) {
bool has_partition_value = partition_col_names.contains(desc.name);
bool exists_in_file = ctx->table_info_node->children_column_exists(desc.name);
if (!has_partition_value || exists_in_file ||
!config::enable_iceberg_partition_column_fallback) {
// Keep PARTITION_KEY category stable for scan planning, but still read
// from file when the column exists there.
ctx->column_names.push_back(desc.name);
continue;
}
has_partition_from_path = true;
} else if (desc.category == ColumnCategory::REGULAR) {
// Partition fallback: if column is a partition key and NOT in the file
// (checked via field ID matching in table_info_node), read from path instead.
if (partition_col_names.contains(desc.name) &&
!ctx->table_info_node->children_column_exists(desc.name)) {
if (config::enable_iceberg_partition_column_fallback) {
desc.category = ColumnCategory::PARTITION_KEY;
has_partition_from_path = true;
continue;
}
Expand Down Expand Up @@ -432,14 +437,9 @@ Status IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
ctx->table_info_node));
} else {
bool exist_field_id = true;
RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id_with_name_mapping(
get_scan_params().history_schema_info.front().root_field, orc_type_ptr,
ICEBERG_ORC_ATTRIBUTE, ctx->table_info_node, exist_field_id));
if (!exist_field_id) {
RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
ctx->table_info_node));
}
ICEBERG_ORC_ATTRIBUTE, ctx->table_info_node));
}

std::unordered_set<std::string> partition_col_names;
Expand All @@ -450,7 +450,7 @@ Status IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {

// Single pass: classify columns, detect $row_id, handle partition fallback.
bool has_partition_from_path = false;
for (auto& desc : *ctx->column_descs) {
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::SYNTHESIZED) {
if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
this->register_synthesized_column_handler(
Expand All @@ -468,13 +468,21 @@ Status IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
});
continue;
}
} else if (desc.category == ColumnCategory::PARTITION_KEY) {
bool has_partition_value = partition_col_names.contains(desc.name);
bool exists_in_file = ctx->table_info_node->children_column_exists(desc.name);
if (!has_partition_value || exists_in_file ||
!config::enable_iceberg_partition_column_fallback) {
ctx->column_names.push_back(desc.name);
continue;
}
has_partition_from_path = true;
} else if (desc.category == ColumnCategory::REGULAR) {
// Partition fallback: if column is a partition key and NOT in the file
// (checked via field ID matching in table_info_node), read from path instead.
if (partition_col_names.contains(desc.name) &&
!ctx->table_info_node->children_column_exists(desc.name)) {
if (config::enable_iceberg_partition_column_fallback) {
desc.category = ColumnCategory::PARTITION_KEY;
has_partition_from_path = true;
continue;
}
Expand Down
73 changes: 73 additions & 0 deletions be/src/format/table/paimon_cpp_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <algorithm>
#include <mutex>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "arrow/c/bridge.h"
Expand Down Expand Up @@ -62,6 +64,41 @@ PaimonCppReader::PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_d

PaimonCppReader::~PaimonCppReader() = default;

// Captures the column descriptors from the init context and rebuilds
// _partition_values from the scan range's path-derived partition keys.
//
// A key that has no slot of the same name in the tuple descriptor is skipped.
// A key without a corresponding entry in columns_from_path is stored with an
// empty string value. Always returns Status::OK().
Status PaimonCppReader::on_before_init_reader(ReaderInitContext* ctx) {
    _column_descs = ctx->column_descs;
    _partition_values.clear();

    // Without a range, a tuple descriptor, or any path keys there is nothing to collect.
    const bool has_path_keys = ctx->range != nullptr && ctx->tuple_descriptor != nullptr &&
                               ctx->range->__isset.columns_from_path_keys;
    if (!has_path_keys) {
        return Status::OK();
    }

    // Index the slots by column name so every key costs one lookup.
    std::unordered_map<std::string, const SlotDescriptor*> slots_by_name;
    for (auto* slot_desc : ctx->tuple_descriptor->slots()) {
        slots_by_name.emplace(slot_desc->col_name(), slot_desc);
    }

    const auto& path_keys = ctx->range->columns_from_path_keys;
    for (size_t idx = 0; idx < path_keys.size(); ++idx) {
        const auto& partition_key = path_keys[idx];
        const auto found = slots_by_name.find(partition_key);
        if (found == slots_by_name.end()) {
            continue;
        }

        // Stays empty when the range carries no value at this position.
        std::string partition_value;
        if (ctx->range->__isset.columns_from_path && idx < ctx->range->columns_from_path.size()) {
            partition_value = ctx->range->columns_from_path[idx];
        }
        _partition_values.emplace(partition_key,
                                  std::make_tuple(std::move(partition_value), found->second));
    }
    return Status::OK();
}

// Post-read hook: fills the partition columns of `block` for the `*read_rows`
// rows just read. Does nothing (returns OK) when no column descriptors were
// captured, no partition values were collected, no rows were read, or the
// pushed-down aggregation is COUNT.
Status PaimonCppReader::on_after_read_block(Block* block, size_t* read_rows) {
    const bool should_fill = _column_descs != nullptr && !_partition_values.empty() &&
                             *read_rows != 0 &&
                             _push_down_agg_type != TPushAggOp::type::COUNT;
    if (should_fill) {
        return _fill_partition_columns(block, *read_rows);
    }
    return Status::OK();
}

Status PaimonCppReader::init_reader() {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
return Status::OK();
Expand Down Expand Up @@ -151,6 +188,42 @@ Status PaimonCppReader::_get_columns_impl(
return Status::OK();
}

// Writes the collected partition values into the matching columns of `block`.
//
// Only descriptors whose category is PARTITION_KEY and that have an entry in
// _partition_values are handled. The name-to-position map is taken from the
// block the first time it is found empty and reused on later calls.
//
// Returns InternalError when a partition column is absent from the index map,
// or when the serde reports a deserialized row count different from `num_rows`;
// serde failures are propagated unchanged.
Status PaimonCppReader::_fill_partition_columns(Block* block, size_t num_rows) {
    // Populate the cached index map lazily from the block being filled.
    if (_col_name_to_block_idx.empty()) {
        _col_name_to_block_idx = block->get_name_to_pos_map();
    }

    DataTypeSerDe::FormatOptions text_format_options;
    for (const auto& column_desc : *_column_descs) {
        if (column_desc.category != ColumnCategory::PARTITION_KEY) {
            continue;
        }
        const auto partition_it = _partition_values.find(column_desc.name);
        if (partition_it == _partition_values.end()) {
            continue;
        }
        const auto position_it = _col_name_to_block_idx.find(column_desc.name);
        if (position_it == _col_name_to_block_idx.end()) {
            return Status::InternalError("Missing partition column {} in block {}",
                                         column_desc.name, block->dump_structure());
        }

        auto target_column = block->get_by_position(position_it->second).column->assume_mutable();
        const auto& partition_text = std::get<0>(partition_it->second);
        const auto& partition_slot = std::get<1>(partition_it->second);
        auto partition_serde = partition_slot->get_data_type_ptr()->get_serde();

        // NOTE(review): presumably the serde repeats the single text value for
        // `num_rows` rows; confirm against DataTypeSerDe.
        Slice slice(partition_text.data(), partition_text.size());
        uint64_t rows_filled = 0;
        RETURN_IF_ERROR(partition_serde->deserialize_column_from_fixed_json(
                *target_column, slice, num_rows, &rows_filled, text_format_options));
        if (rows_filled != num_rows) {
            return Status::InternalError(
                    "Failed to fill partition column: {}={}. Expected rows: {}, actual: {}",
                    partition_slot->col_name(), partition_text, num_rows, rows_filled);
        }
    }
    return Status::OK();
}

Status PaimonCppReader::close() {
if (_batch_reader) {
_batch_reader->Close();
Expand Down
5 changes: 5 additions & 0 deletions be/src/format/table/paimon_cpp_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ class PaimonCppReader : public GenericReader {
}

protected:
Status on_before_init_reader(ReaderInitContext* ctx) override;
Status on_after_read_block(Block* block, size_t* read_rows) override;
Status _do_init_reader(ReaderInitContext* /*ctx*/) override { return init_reader(); }

private:
Status _fill_partition_columns(Block* block, size_t num_rows);
Status _init_paimon_reader();
Status _decode_split(std::shared_ptr<paimon::Split>* split);
// Resolve paimon table root path for schema/manifest lookup.
Expand All @@ -87,6 +90,8 @@ class PaimonCppReader : public GenericReader {
std::unique_ptr<paimon::BatchReader> _batch_reader;
std::shared_ptr<paimon::Predicate> _predicate;

std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
_partition_values;
std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
int64_t _remaining_table_level_row_count = -1;
cctz::time_zone _ctzz;
Expand Down
Loading
Loading