From 25134a0e45540fe20ed14f5f6f21ed3619527861 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Thu, 30 Apr 2026 16:23:30 +0800 Subject: [PATCH] [python] Support FileStoreCommit overwrite dynamic partition --- .../pypaimon/common/options/core_options.py | 13 ++++ .../tests/py36/rest_ao_read_write_test.py | 64 +++++++++++++++++ .../pypaimon/tests/reader_base_test.py | 64 +++++++++++++++++ .../tests/rest/rest_read_write_test.py | 64 +++++++++++++++++ .../pypaimon/write/file_store_commit.py | 72 ++++++++++++------- 5 files changed, 253 insertions(+), 24 deletions(-) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 6a4b51c8c33a..3fe2e7945578 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -442,6 +442,16 @@ class CoreOptions: ) ) + DYNAMIC_PARTITION_OVERWRITE: ConfigOption[bool] = ( + ConfigOptions.key("dynamic-partition-overwrite") + .boolean_type() + .default_value(True) + .with_description( + "Whether only overwrite dynamic partition when overwriting a partitioned table " + "with dynamic partition columns. Works only when the table has partition keys." + ) + ) + def __init__(self, options: Options): self.options = options @@ -622,3 +632,6 @@ def read_batch_size(self, default=None) -> int: def add_column_before_partition(self) -> bool: return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False) + + def dynamic_partition_overwrite(self) -> bool: + return self.options.get(CoreOptions.DYNAMIC_PARTITION_OVERWRITE) diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index cfdf33b755d5..fa94284fa842 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -129,6 +129,70 @@ def test_overwrite(self): pd.testing.assert_frame_equal( actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) + def test_dynamic_partition_overwrite(self): + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0']) + self.rest_catalog.create_table('default.test_dynamic_overwrite', schema, False) + table = self.rest_catalog.get_table('default.test_dynamic_overwrite') + read_builder = table.new_read_builder() + + self._batch_write(table, pd.DataFrame({ + 'f0': [1, 2, 3], + 'f1': ['apple', 'banana', 'cherry'], + })) + + # dynamic overwrite partition f0=1 only; f0=2, f0=3 untouched + self._batch_overwrite(table, pd.DataFrame({ + 'f0': [1], + 'f1': ['watermelon'], + })) + + self._assert_table_equals(read_builder, pd.DataFrame({ + 'f0': pd.array([1, 2, 3], dtype='int32'), + 'f1': ['watermelon', 'banana', 'cherry'], + }), sort_by='f0') + + # dynamic overwrite partitions f0=1, f0=2; f0=3 untouched + self._batch_overwrite(table, pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['mango', 'grape'], + })) + + self._assert_table_equals(read_builder, pd.DataFrame({ + 'f0': pd.array([1, 2, 3], dtype='int32'), + 'f1': ['mango', 'grape', 'cherry'], + }), sort_by='f0') + + def _batch_write(self, table, df): + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + def _batch_overwrite(self, table, df, partition=None): + write_builder = table.new_batch_write_builder().overwrite(partition) + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + def _assert_table_equals(self, read_builder, expected_df, sort_by=None): + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df = table_read.to_pandas(table_scan.plan().splits()) + if sort_by: + actual_df = actual_df.sort_values(by=sort_by) + pd.testing.assert_frame_equal( + actual_df.reset_index(drop=True), expected_df.reset_index(drop=True)) + def test_full_data_types(self): simple_pa_schema = pa.schema([ ('f0', pa.int8()), diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 657678f9eabc..45bf134e5bb5 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -163,6 +163,70 @@ def test_overwrite(self): pd.testing.assert_frame_equal( actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) + def test_dynamic_partition_overwrite(self): + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0']) + self.catalog.create_table('default.test_dynamic_overwrite', schema, False) + table = self.catalog.get_table('default.test_dynamic_overwrite') + read_builder = table.new_read_builder() + + self._batch_write(table, pd.DataFrame({ + 'f0': [1, 2, 3], + 'f1': ['apple', 'banana', 'cherry'], + })) + + # dynamic overwrite partition f0=1 only; f0=2, f0=3 untouched + self._batch_overwrite(table, pd.DataFrame({ + 'f0': [1], + 'f1': ['watermelon'], + })) + + self._assert_table_equals(read_builder, pd.DataFrame({ + 'f0': pd.array([1, 2, 3], dtype='int32'), + 'f1': ['watermelon', 'banana', 'cherry'], + }), sort_by='f0') + + # dynamic overwrite partitions f0=1, f0=2; f0=3 untouched + self._batch_overwrite(table, pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['mango', 'grape'], + })) + + self._assert_table_equals(read_builder, pd.DataFrame({ + 'f0': pd.array([1, 2, 3], dtype='int32'), + 'f1': ['mango', 'grape', 'cherry'], + }), sort_by='f0') + + def _batch_write(self, table, df): + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + def _batch_overwrite(self, table, df, partition=None): + write_builder = table.new_batch_write_builder().overwrite(partition) + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + def _assert_table_equals(self, read_builder, expected_df, sort_by=None): + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df = table_read.to_pandas(table_scan.plan().splits()) + if sort_by: + actual_df = actual_df.sort_values(by=sort_by) + pd.testing.assert_frame_equal( + actual_df.reset_index(drop=True), expected_df.reset_index(drop=True)) + def test_full_data_types(self): simple_pa_schema = pa.schema([ ('f0', pa.int8()), diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py index 455cc6cdf97f..385a43293b8c 100644 --- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py @@ -116,6 +116,70 @@ def test_overwrite(self): pd.testing.assert_frame_equal( actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) + def test_dynamic_partition_overwrite(self): + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0']) + self.rest_catalog.create_table('default.test_dynamic_overwrite', schema, False) + table = self.rest_catalog.get_table('default.test_dynamic_overwrite') + read_builder = table.new_read_builder() + + self._batch_write(table, pd.DataFrame({ + 'f0': [1, 2, 3], + 'f1': ['apple', 'banana', 'cherry'], + })) + + # dynamic overwrite partition f0=1 only; f0=2, f0=3 untouched + self._batch_overwrite(table, pd.DataFrame({ + 'f0': [1], + 'f1': ['watermelon'], + })) + + self._assert_table_equals(read_builder, pd.DataFrame({ + 'f0': pd.array([1, 2, 3], dtype='int32'), + 'f1': ['watermelon', 'banana', 'cherry'], + }), sort_by='f0') + + # dynamic overwrite partitions f0=1, f0=2; f0=3 untouched + self._batch_overwrite(table, pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['mango', 'grape'], + })) + + self._assert_table_equals(read_builder, pd.DataFrame({ + 'f0': pd.array([1, 2, 3], dtype='int32'), + 'f1': ['mango', 'grape', 'cherry'], + }), sort_by='f0') + + def _batch_write(self, table, df): + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + def _batch_overwrite(self, table, df, partition=None): + write_builder = table.new_batch_write_builder().overwrite(partition) + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_pandas(df) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + def _assert_table_equals(self, read_builder, expected_df, sort_by=None): + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df = table_read.to_pandas(table_scan.plan().splits()) + if sort_by: + actual_df = actual_df.sort_values(by=sort_by) + pd.testing.assert_frame_equal( + actual_df.reset_index(drop=True), expected_df.reset_index(drop=True)) + def test_parquet_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_parquet', schema, False) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 832a39ba6887..8d7227dd5460 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -159,37 +159,33 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int): """Commit the given commit messages in overwrite mode.""" - if not commit_messages: - return - logger.info( "Ready to overwrite to table %s, number of commit messages: %d", self.table.identifier, len(commit_messages), ) + skip_overwrite = False partition_filter = None - # sanity check, all changes must be done within the given partition, meanwhile build a partition filter - if len(overwrite_partition) > 0: - predicate_builder = PredicateBuilder(self.table.partition_keys_fields) - sub_predicates = [] - for key, value in overwrite_partition.items(): - sub_predicates.append(predicate_builder.equal(key, value)) - partition_filter = predicate_builder.and_predicates(sub_predicates) - - for msg in commit_messages: - row = OffsetRow(msg.partition, 0, len(msg.partition)) - if not partition_filter.test(row): - raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes " - f"in {msg.partition} does not belong to this partition") - self._try_commit( - commit_kind="OVERWRITE", - commit_identifier=commit_identifier, - commit_entries_plan=lambda snapshot: self._generate_overwrite_entries( - snapshot, partition_filter, commit_messages), - detect_conflicts=True, - allow_rollback=False, - ) + # Partition filter is built from dynamic or static partition according to options. + if len(self.table.partition_keys) > 0 and self.table.options.dynamic_partition_overwrite(): + if not commit_messages: + # In dynamic mode, if there are no changes to commit, no data will be deleted + skip_overwrite = True + else: + partition_filter = self._create_dynamic_partition_filter(commit_messages) + else: + partition_filter = self._create_static_partition_filter(overwrite_partition, commit_messages) + + if not skip_overwrite: + self._try_commit( + commit_kind="OVERWRITE", + commit_identifier=commit_identifier, + commit_entries_plan=lambda snapshot: self._generate_overwrite_entries( + snapshot, partition_filter, commit_messages), + detect_conflicts=True, + allow_rollback=False, + ) def drop_partitions(self, partitions: List[Dict[str, str]], commit_identifier: int) -> None: if not partitions: @@ -518,6 +514,34 @@ def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, return True return False + def _create_dynamic_partition_filter(self, commit_messages: List[CommitMessage]): + """Build a partition filter from the unique partitions present in commit_messages.""" + predicate_builder = PredicateBuilder(self.table.partition_keys_fields) + predicates = [] + seen_partitions = set() + for msg in commit_messages: + partition_values = tuple(msg.partition) + if partition_values not in seen_partitions: + seen_partitions.add(partition_values) + equalities = [predicate_builder.equal(name, value) + for name, value in zip(self.table.partition_keys, msg.partition)] + predicates.append(predicate_builder.and_predicates(equalities)) + return predicate_builder.or_predicates(predicates) + + def _create_static_partition_filter(self, overwrite_partition, commit_messages: List[CommitMessage]): + """Build a partition filter from the explicit overwrite_partition spec.""" + if not overwrite_partition: + return None + predicate_builder = PredicateBuilder(self.table.partition_keys_fields) + equalities = [predicate_builder.equal(key, value) for key, value in overwrite_partition.items()] + partition_filter = predicate_builder.and_predicates(equalities) + for msg in commit_messages: + row = OffsetRow(msg.partition, 0, len(msg.partition)) + if not partition_filter.test(row): + raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes " + f"in {msg.partition} does not belong to this partition") + return partition_filter + def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_messages): """Generate commit entries for OVERWRITE mode based on latest snapshot.""" entries = []