Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,16 @@ class CoreOptions:
)
)

# Controls overwrite semantics for partitioned tables. When True (the default),
# an overwrite commit with no explicit static partition spec only replaces the
# partitions that actually appear in the new data; see
# FileStoreCommit.overwrite, which consults this option.
DYNAMIC_PARTITION_OVERWRITE: ConfigOption[bool] = (
    ConfigOptions.key("dynamic-partition-overwrite")
    .boolean_type()
    .default_value(True)
    .with_description(
        "Whether only overwrite dynamic partition when overwriting a partitioned table "
        "with dynamic partition columns. Works only when the table has partition keys."
    )
)

def __init__(self, options: Options):
    """Create a typed accessor view over the table's raw key/value options."""
    self.options = options

Expand Down Expand Up @@ -622,3 +632,6 @@ def read_batch_size(self, default=None) -> int:

def add_column_before_partition(self) -> bool:
    """Resolve the ADD_COLUMN_BEFORE_PARTITION option, falling back to False."""
    flag = self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)
    return flag

def dynamic_partition_overwrite(self) -> bool:
    """Resolve the `dynamic-partition-overwrite` option (declared default: True)."""
    # Pass the declared default explicitly, matching the other accessors in this
    # class (e.g. add_column_before_partition), so a missing key yields True even
    # if Options.get does not resolve the ConfigOption's own default.
    return self.options.get(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, True)
64 changes: 64 additions & 0 deletions paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,70 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_dynamic_partition_overwrite(self):
    """Overwrite without a static partition spec must only replace the
    partitions present in the new data, leaving other partitions intact."""
    pa_schema = pa.schema([
        ('f0', pa.int32()),
        ('f1', pa.string())
    ])
    # f0 is the single partition key.
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
    self.rest_catalog.create_table('default.test_dynamic_overwrite', schema, False)
    table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
    read_builder = table.new_read_builder()

    # Seed one row in each of three partitions (f0=1, 2, 3).
    self._batch_write(table, pd.DataFrame({
        'f0': [1, 2, 3],
        'f1': ['apple', 'banana', 'cherry'],
    }))

    # dynamic overwrite partition f0=1 only; f0=2, f0=3 untouched
    self._batch_overwrite(table, pd.DataFrame({
        'f0': [1],
        'f1': ['watermelon'],
    }))

    self._assert_table_equals(read_builder, pd.DataFrame({
        'f0': pd.array([1, 2, 3], dtype='int32'),
        'f1': ['watermelon', 'banana', 'cherry'],
    }), sort_by='f0')

    # dynamic overwrite partitions f0=1, f0=2; f0=3 untouched
    self._batch_overwrite(table, pd.DataFrame({
        'f0': [1, 2],
        'f1': ['mango', 'grape'],
    }))

    self._assert_table_equals(read_builder, pd.DataFrame({
        'f0': pd.array([1, 2, 3], dtype='int32'),
        'f1': ['mango', 'grape', 'cherry'],
    }), sort_by='f0')

def _batch_write(self, table, df):
    """Append *df* to *table* as a single batch write-and-commit cycle."""
    builder = table.new_batch_write_builder()
    writer = builder.new_write()
    committer = builder.new_commit()
    writer.write_pandas(df)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

def _batch_overwrite(self, table, df, partition=None):
    """Overwrite *table* with *df*; partition=None requests dynamic overwrite."""
    builder = table.new_batch_write_builder().overwrite(partition)
    writer = builder.new_write()
    committer = builder.new_commit()
    writer.write_pandas(df)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
    """Read the whole table through *read_builder* and compare to *expected_df*."""
    scan = read_builder.new_scan()
    reader = read_builder.new_read()
    actual = reader.to_pandas(scan.plan().splits())
    if sort_by:
        actual = actual.sort_values(by=sort_by)
    pd.testing.assert_frame_equal(
        actual.reset_index(drop=True), expected_df.reset_index(drop=True))

def test_full_data_types(self):
simple_pa_schema = pa.schema([
('f0', pa.int8()),
Expand Down
64 changes: 64 additions & 0 deletions paimon-python/pypaimon/tests/reader_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,70 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_dynamic_partition_overwrite(self):
    """Overwrite without a static partition spec must only replace the
    partitions present in the new data, leaving other partitions intact."""
    pa_schema = pa.schema([
        ('f0', pa.int32()),
        ('f1', pa.string())
    ])
    # f0 is the single partition key.
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
    self.catalog.create_table('default.test_dynamic_overwrite', schema, False)
    table = self.catalog.get_table('default.test_dynamic_overwrite')
    read_builder = table.new_read_builder()

    # Seed one row in each of three partitions (f0=1, 2, 3).
    self._batch_write(table, pd.DataFrame({
        'f0': [1, 2, 3],
        'f1': ['apple', 'banana', 'cherry'],
    }))

    # dynamic overwrite partition f0=1 only; f0=2, f0=3 untouched
    self._batch_overwrite(table, pd.DataFrame({
        'f0': [1],
        'f1': ['watermelon'],
    }))

    self._assert_table_equals(read_builder, pd.DataFrame({
        'f0': pd.array([1, 2, 3], dtype='int32'),
        'f1': ['watermelon', 'banana', 'cherry'],
    }), sort_by='f0')

    # dynamic overwrite partitions f0=1, f0=2; f0=3 untouched
    self._batch_overwrite(table, pd.DataFrame({
        'f0': [1, 2],
        'f1': ['mango', 'grape'],
    }))

    self._assert_table_equals(read_builder, pd.DataFrame({
        'f0': pd.array([1, 2, 3], dtype='int32'),
        'f1': ['mango', 'grape', 'cherry'],
    }), sort_by='f0')

def _batch_write(self, table, df):
    """Append *df* to *table* as a single batch write-and-commit cycle."""
    builder = table.new_batch_write_builder()
    writer = builder.new_write()
    committer = builder.new_commit()
    writer.write_pandas(df)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

def _batch_overwrite(self, table, df, partition=None):
    """Overwrite *table* with *df*; partition=None requests dynamic overwrite."""
    builder = table.new_batch_write_builder().overwrite(partition)
    writer = builder.new_write()
    committer = builder.new_commit()
    writer.write_pandas(df)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
    """Read the whole table through *read_builder* and compare to *expected_df*."""
    scan = read_builder.new_scan()
    reader = read_builder.new_read()
    actual = reader.to_pandas(scan.plan().splits())
    if sort_by:
        actual = actual.sort_values(by=sort_by)
    pd.testing.assert_frame_equal(
        actual.reset_index(drop=True), expected_df.reset_index(drop=True))

def test_full_data_types(self):
simple_pa_schema = pa.schema([
('f0', pa.int8()),
Expand Down
64 changes: 64 additions & 0 deletions paimon-python/pypaimon/tests/rest/rest_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,70 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_dynamic_partition_overwrite(self):
    """Overwrite without a static partition spec must only replace the
    partitions present in the new data, leaving other partitions intact."""
    pa_schema = pa.schema([
        ('f0', pa.int32()),
        ('f1', pa.string())
    ])
    # f0 is the single partition key.
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
    self.rest_catalog.create_table('default.test_dynamic_overwrite', schema, False)
    table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
    read_builder = table.new_read_builder()

    # Seed one row in each of three partitions (f0=1, 2, 3).
    self._batch_write(table, pd.DataFrame({
        'f0': [1, 2, 3],
        'f1': ['apple', 'banana', 'cherry'],
    }))

    # dynamic overwrite partition f0=1 only; f0=2, f0=3 untouched
    self._batch_overwrite(table, pd.DataFrame({
        'f0': [1],
        'f1': ['watermelon'],
    }))

    self._assert_table_equals(read_builder, pd.DataFrame({
        'f0': pd.array([1, 2, 3], dtype='int32'),
        'f1': ['watermelon', 'banana', 'cherry'],
    }), sort_by='f0')

    # dynamic overwrite partitions f0=1, f0=2; f0=3 untouched
    self._batch_overwrite(table, pd.DataFrame({
        'f0': [1, 2],
        'f1': ['mango', 'grape'],
    }))

    self._assert_table_equals(read_builder, pd.DataFrame({
        'f0': pd.array([1, 2, 3], dtype='int32'),
        'f1': ['mango', 'grape', 'cherry'],
    }), sort_by='f0')

def _batch_write(self, table, df):
    """Append *df* to *table* as a single batch write-and-commit cycle."""
    builder = table.new_batch_write_builder()
    writer = builder.new_write()
    committer = builder.new_commit()
    writer.write_pandas(df)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

def _batch_overwrite(self, table, df, partition=None):
    """Overwrite *table* with *df*; partition=None requests dynamic overwrite."""
    builder = table.new_batch_write_builder().overwrite(partition)
    writer = builder.new_write()
    committer = builder.new_commit()
    writer.write_pandas(df)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
    """Read the whole table through *read_builder* and compare to *expected_df*."""
    scan = read_builder.new_scan()
    reader = read_builder.new_read()
    actual = reader.to_pandas(scan.plan().splits())
    if sort_by:
        actual = actual.sort_values(by=sort_by)
    pd.testing.assert_frame_equal(
        actual.reset_index(drop=True), expected_df.reset_index(drop=True))

def test_parquet_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
Expand Down
72 changes: 48 additions & 24 deletions paimon-python/pypaimon/write/file_store_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,37 +159,33 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):

def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
    """Commit the given commit messages in overwrite mode."""
    logger.info(
        "Ready to overwrite to table %s, number of commit messages: %d",
        self.table.identifier,
        len(commit_messages),
    )
    # Partition filter is built from dynamic or static partition according to options.
    use_dynamic = (len(self.table.partition_keys) > 0
                   and self.table.options.dynamic_partition_overwrite())
    if use_dynamic:
        if not commit_messages:
            # In dynamic mode an empty commit deletes nothing, so there is
            # nothing to do at all.
            return
        partition_filter = self._create_dynamic_partition_filter(commit_messages)
    else:
        partition_filter = self._create_static_partition_filter(overwrite_partition, commit_messages)

    self._try_commit(
        commit_kind="OVERWRITE",
        commit_identifier=commit_identifier,
        commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
            snapshot, partition_filter, commit_messages),
        detect_conflicts=True,
        allow_rollback=False,
    )

def drop_partitions(self, partitions: List[Dict[str, str]], commit_identifier: int) -> None:
if not partitions:
Expand Down Expand Up @@ -518,6 +514,34 @@ def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier,
return True
return False

def _create_dynamic_partition_filter(self, commit_messages: List[CommitMessage]):
    """Build a partition filter from the unique partitions present in commit_messages.

    Returns an OR over the distinct partitions touched, where each partition is
    the AND of per-key predicates. NULL partition values are matched with an
    explicit IS NULL predicate, because an equality predicate against None never
    matches a null value.
    """
    predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
    predicates = []
    seen_partitions = set()
    for msg in commit_messages:
        partition_values = tuple(msg.partition)
        if partition_values in seen_partitions:
            continue
        seen_partitions.add(partition_values)
        per_key = []
        for name, value in zip(self.table.partition_keys, msg.partition):
            if value is None:
                # equal(name, None) would never select the null partition;
                # use IS NULL so dynamic overwrite also replaces it.
                per_key.append(predicate_builder.is_null(name))
            else:
                per_key.append(predicate_builder.equal(name, value))
        predicates.append(predicate_builder.and_predicates(per_key))
    return predicate_builder.or_predicates(predicates)

def _create_static_partition_filter(self, overwrite_partition, commit_messages: List[CommitMessage]):
    """Build a partition filter from the explicit overwrite_partition spec.

    Returns None when no static partition spec is given. Otherwise verifies
    that every commit message's partition lies inside the spec and raises
    RuntimeError if one does not.
    """
    if not overwrite_partition:
        return None
    builder = PredicateBuilder(self.table.partition_keys_fields)
    per_key = [builder.equal(key, value) for key, value in overwrite_partition.items()]
    partition_filter = builder.and_predicates(per_key)
    # Sanity check: all changes must stay within the given static partition.
    for msg in commit_messages:
        row = OffsetRow(msg.partition, 0, len(msg.partition))
        if not partition_filter.test(row):
            raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes "
                               f"in {msg.partition} does not belong to this partition")
    return partition_filter

def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_messages):
"""Generate commit entries for OVERWRITE mode based on latest snapshot."""
entries = []
Expand Down
Loading