Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions paimon-python/pypaimon/tests/sql_system_table_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""End-to-end tests for Paimon system tables via pypaimon SQL.

Exercises the `<table>$<system_name>` syntax handled by the paimon-rust
DataFusion integration. A non-partitioned table with one snapshot is
created in setUpClass and queried by each test.
"""

import json
import os
import tempfile
import unittest

import pyarrow as pa

from pypaimon_rust.datafusion import SQLContext


# Optional warehouse location taken from the environment. When it is unset
# or empty, the test class falls back to a temporary directory.
WAREHOUSE = os.getenv("PAIMON_TEST_WAREHOUSE")
# Name of the table the tests create, query, and drop.
TABLE_NAME = "sql_system_test_table"
# Number of rows inserted into the test table.
ROW_COUNT = 3


class SQLSystemTableTest(unittest.TestCase):
    """Query Paimon system tables through ``SQLContext.sql``.

    ``setUpClass`` creates a two-column table with three rows in a single
    INSERT; every test then reads one ``<table>$<system_name>`` system table
    and checks its schema and, where applicable, its contents.
    """

    @classmethod
    def _new_context(cls):
        """Return a fresh ``SQLContext`` with the test warehouse registered."""
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": cls.warehouse})
        return ctx

    @classmethod
    def setUpClass(cls):
        """Create the warehouse (if needed) and the populated test table.

        Uses the ``WAREHOUSE`` location when it is set; otherwise a temporary
        directory is created and removed again in ``tearDownClass``.
        """
        cls._tmpdir = None
        if WAREHOUSE:
            cls.warehouse = WAREHOUSE
        else:
            cls._tmpdir = tempfile.TemporaryDirectory(prefix="paimon-sql-systest-")
            cls.warehouse = cls._tmpdir.name

        try:
            ctx = cls._new_context()
            ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
            ctx.sql(f"CREATE TABLE {TABLE_NAME} (id INT, name STRING)")
            ctx.sql(
                f"INSERT INTO {TABLE_NAME} VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')"
            )
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so
            # the temporary warehouse has to be removed here.
            if cls._tmpdir is not None:
                cls._tmpdir.cleanup()
                cls._tmpdir = None
            raise
        cls.ctx = ctx

    @classmethod
    def tearDownClass(cls):
        """Drop the test table and remove the temporary warehouse, if any."""
        try:
            ctx = cls._new_context()
            ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
        finally:
            # Always remove the temporary directory, even if the DROP fails.
            if cls._tmpdir is not None:
                cls._tmpdir.cleanup()

    def _query(self, system_name: str) -> pa.Table:
        """Return all rows of ``<TABLE_NAME>$<system_name>`` as a pyarrow Table."""
        batches = self.ctx.sql(f"SELECT * FROM {TABLE_NAME}${system_name}")
        # NOTE(review): pa.Table.from_batches needs a schema or at least one
        # batch; confirm ctx.sql yields a batch even for empty results.
        return pa.Table.from_batches(batches)

    def test_options_system_table(self):
        """The options system table exposes ``key`` and ``value`` columns."""
        table = self._query("options")
        self.assertListEqual(table.schema.names, ["key", "value"])

    def test_schemas_system_table(self):
        """The schemas system table lists unique schemas with the table's fields."""
        table = self._query("schemas")
        self.assertListEqual(
            table.schema.names,
            ["schema_id", "fields", "partition_keys", "primary_keys",
             "options", "comment", "update_time"],
        )
        self.assertGreaterEqual(table.num_rows, 1, "should have at least one schema")
        ids = table.column("schema_id").to_pylist()
        self.assertEqual(sorted(ids), sorted(set(ids)), "schema_id should be unique")
        # The fields column holds a JSON-encoded list of field descriptors.
        fields = json.loads(table.column("fields").to_pylist()[0])
        self.assertEqual([f["name"] for f in fields], ["id", "name"])

    def test_snapshots_system_table(self):
        """The single INSERT yields exactly one snapshot covering all rows."""
        table = self._query("snapshots")
        names = table.schema.names
        for required in (
            "snapshot_id", "schema_id", "commit_user", "commit_identifier",
            "commit_kind", "commit_time", "base_manifest_list",
            "delta_manifest_list", "total_record_count",
        ):
            self.assertIn(required, names)
        self.assertEqual(table.num_rows, 1, "single batch write should produce one snapshot")
        self.assertEqual(table.column("total_record_count").to_pylist()[0], ROW_COUNT)

    def test_tags_system_table_empty(self):
        """The tags system table has the expected columns and no rows."""
        table = self._query("tags")
        self.assertListEqual(
            table.schema.names,
            ["tag_name", "snapshot_id", "schema_id", "commit_time",
             "record_count", "create_time", "time_retained"],
        )
        self.assertEqual(table.num_rows, 0, "no tags created")

    def test_branches_system_table_empty(self):
        """The branches system table lists no branch other than ``main``."""
        table = self._query("branches")
        self.assertListEqual(table.schema.names, ["branch_name", "create_time"])
        non_main = [b for b in table.column("branch_name").to_pylist() if b != "main"]
        self.assertEqual(non_main, [], "no user-created branches")

    def test_manifests_system_table(self):
        """The manifests system table reports non-empty manifests with added files."""
        table = self._query("manifests")
        names = table.schema.names
        for required in (
            "file_name", "file_size", "num_added_files",
            "num_deleted_files", "schema_id",
        ):
            self.assertIn(required, names)
        self.assertGreaterEqual(table.num_rows, 1, "snapshot should have manifests")
        for size in table.column("file_size").to_pylist():
            self.assertGreater(size, 0)
        total_added = sum(table.column("num_added_files").to_pylist())
        self.assertGreaterEqual(total_added, 1, "single write should add at least one file")


if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()
Loading