Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 332 additions & 0 deletions python/tests/test_tsfile_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1649,3 +1649,335 @@ def test_dataset_tree_model_omits_non_numeric_measurements(tmp_path):
np.testing.assert_array_equal(
tsdf["root.a.b.temp"][:], np.array([0.5, 1.5, 2.5])
)


# ---------------------------------------------------------------------------
# Metadata index cache
# ---------------------------------------------------------------------------

from tsfile.dataset import index_cache
from tsfile.dataset.index_cache import CACHE_FILENAME


def _write_weather_dir(dir_path):
    """Populate ``dir_path`` with two table-model shards, one device each.

    ``part1.tsfile`` carries ``device_a`` and ``part2.tsfile`` carries
    ``device_b``. Each shard has three rows with ``temperature`` and
    ``humidity`` columns, so the directory yields 4 logical series.
    """
    shards = (
        ("part1.tsfile", "device_a", [20.0, 21.5, 23.0], [50.0, 52.0, 55.0]),
        ("part2.tsfile", "device_b", [10.0, 11.5, 13.0], [40.0, 42.0, 45.0]),
    )
    for filename, device, temperature, humidity in shards:
        _write_weather_rows_file(
            dir_path / filename,
            {
                "time": [0, 1, 2],
                "device": [device] * 3,
                "temperature": temperature,
                "humidity": humidity,
            },
        )


def _read_all_series(tsdf):
    """Capture copies of every series' timestamps and values.

    Returns a dict keyed by the stringified series name (inserted in sorted
    order) whose values are ``(timestamps.copy(), series[:].copy())`` tuples.
    Used to compare a freshly built dataset against one loaded from the cache.
    """
    names = [str(entry) for entry in tsdf.list_timeseries()]
    names.sort()

    captured = {}
    for series_name in names:
        handle = tsdf[series_name]
        timestamps = handle.timestamps.copy()
        values = handle[:].copy()
        captured[series_name] = (timestamps, values)
    return captured


def _assert_snapshots_equal(a, b):
    """Assert that two series snapshots hold identical data.

    Both arguments are mappings of series name to a sequence whose first
    element is the timestamps array and whose second element is the values
    array (the shape produced by ``_read_all_series``).

    Raises:
        AssertionError: if the key sets differ, or if any series' timestamps
            or values differ. The message names the offending series and
            which of the two arrays mismatched, so a failure can be
            attributed without re-running under a debugger.
    """
    names_a = sorted(a)
    names_b = sorted(b)
    assert names_a == names_b, f"series names differ: {names_a} != {names_b}"

    for name in a:
        np.testing.assert_array_equal(
            a[name][0],
            b[name][0],
            err_msg=f"timestamps differ for series {name!r}",
        )
        np.testing.assert_array_equal(
            a[name][1],
            b[name][1],
            err_msg=f"values differ for series {name!r}",
        )


def test_index_cache_created_on_first_load(tmp_path):
    """Loading a directory for the first time leaves a cache file behind."""
    _write_weather_dir(tmp_path)
    cache_path = tmp_path / CACHE_FILENAME
    assert not cache_path.exists()

    with TsFileDataFrame(str(tmp_path), show_progress=False) as frame:
        assert len(frame) == 4

    assert cache_path.exists()
    # The first eight bytes of the cache are the format magic.
    magic = cache_path.read_bytes()[:8]
    assert magic == b"TSFIDX01"


def test_index_cache_second_load_matches_fresh(tmp_path):
    """A second load of the same directory reports the same series and data.

    Compares series names, series count, per-series reads and the metadata
    frame between the first load and the load that follows it.
    """
    _write_weather_dir(tmp_path)
    root = str(tmp_path)

    with TsFileDataFrame(root, show_progress=False) as first:
        expected_names = sorted(str(name) for name in first.list_timeseries())
        expected_count = len(first)
        expected_reads = _read_all_series(first)
        expected_meta = first.list_timeseries_metadata()

    with TsFileDataFrame(root, show_progress=False) as second:
        actual_names = sorted(str(name) for name in second.list_timeseries())
        assert actual_names == expected_names
        assert len(second) == expected_count
        _assert_snapshots_equal(expected_reads, _read_all_series(second))
        actual_meta = second.list_timeseries_metadata()
        pd.testing.assert_frame_equal(actual_meta, expected_meta)


def test_index_cache_hit_skips_metadata_walk(tmp_path, monkeypatch):
    """Once a cache exists, loading must not call ``_cache_metadata``."""
    _write_weather_dir(tmp_path)
    root = str(tmp_path)

    # Warm-up load: the walk is still the real one here and the cache is
    # populated as a result.
    with TsFileDataFrame(root, show_progress=False) as warm:
        expected = sorted(str(name) for name in warm.list_timeseries())

    def _fail_if_walked(self):
        # Any invocation means the cache was not used.
        raise AssertionError("metadata walk should be skipped on a cache hit")

    monkeypatch.setattr(TsFileSeriesReader, "_cache_metadata", _fail_if_walked)
    with TsFileDataFrame(root, show_progress=False) as cached:
        names = sorted(str(name) for name in cached.list_timeseries())
        assert names == expected


def test_use_cache_false_bypasses_read_and_write(tmp_path, monkeypatch):
    """``use_cache=False`` neither writes a cache nor reads an existing one."""
    _write_weather_dir(tmp_path)
    root = str(tmp_path)
    cache_path = tmp_path / CACHE_FILENAME

    # Fresh directory with caching disabled: normal build, no cache file.
    with TsFileDataFrame(root, show_progress=False, use_cache=False) as frame:
        assert len(frame) == 4
    assert not cache_path.exists()

    # A default load creates the cache; remember its exact bytes so we can
    # prove the uncached load below leaves it alone.
    with TsFileDataFrame(root, show_progress=False):
        pass
    assert cache_path.exists()
    bytes_before = cache_path.read_bytes()

    walk_count = 0
    real_walk = TsFileSeriesReader._cache_metadata

    def _counting_walk(self):
        nonlocal walk_count
        walk_count += 1
        return real_walk(self)

    monkeypatch.setattr(TsFileSeriesReader, "_cache_metadata", _counting_walk)
    with TsFileDataFrame(root, show_progress=False, use_cache=False) as frame:
        assert len(frame) == 4
    # One walk per shard means the existing cache was ignored.
    assert walk_count == 2
    # The cache file on disk is byte-for-byte unchanged.
    assert cache_path.read_bytes() == bytes_before


def test_index_cache_correctness_table_model(tmp_path):
    """Table-model reads via the cache equal reads from an uncached build.

    Checks both per-series snapshots and an aligned ``.loc`` read over all
    series.
    """
    _write_weather_dir(tmp_path)
    root = str(tmp_path)

    with TsFileDataFrame(root, show_progress=False, use_cache=False) as fresh:
        fresh_reads = _read_all_series(fresh)
        # Aligned read across every series in the dataset.
        fresh_names = [str(name) for name in fresh.list_timeseries()]
        aligned_fresh = fresh.loc[:, fresh_names]

    # The use_cache=False load above wrote nothing, so build the cache now.
    with TsFileDataFrame(root, show_progress=False):
        pass

    with TsFileDataFrame(root, show_progress=False) as cached:
        _assert_snapshots_equal(fresh_reads, _read_all_series(cached))
        cached_names = [str(name) for name in cached.list_timeseries()]
        aligned_cached = cached.loc[:, cached_names]
        np.testing.assert_array_equal(
            aligned_fresh.timestamps, aligned_cached.timestamps
        )
        np.testing.assert_array_equal(
            aligned_fresh.values, aligned_cached.values
        )


def test_index_cache_correctness_tree_model(tmp_path):
    """Tree-model reads via the cache equal reads from an uncached build."""
    _write_tree_file(tmp_path / "tree.tsfile")
    root = str(tmp_path)

    with TsFileDataFrame(root, show_progress=False, use_cache=False) as fresh:
        expected_names = sorted(str(name) for name in fresh.list_timeseries())
        expected_reads = _read_all_series(fresh)
        assert fresh.model == "tree"

    # First default load builds the cache; the second one reloads from it.
    with TsFileDataFrame(root, show_progress=False):
        pass

    with TsFileDataFrame(root, show_progress=False) as cached:
        assert cached.model == "tree"
        actual_names = sorted(str(name) for name in cached.list_timeseries())
        assert actual_names == expected_names
        rendered = repr(cached)
        assert "_col_1" in rendered
        _assert_snapshots_equal(expected_reads, _read_all_series(cached))


def test_index_cache_single_file_not_cached(tmp_path):
    """A single-file input resolves to no cache path and writes no cache."""
    solo_path = tmp_path / "solo.tsfile"
    _write_weather_file(solo_path, 0)
    solo = str(solo_path)

    resolved = index_cache.resolve_cache_path(solo)
    assert resolved is None

    with TsFileDataFrame(solo, show_progress=False) as frame:
        assert len(frame) == 2

    # Nothing with the fixed cache name appears beside the single file.
    sibling_cache = tmp_path / CACHE_FILENAME
    assert not sibling_cache.exists()


def test_index_cache_list_input_not_cached(tmp_path):
    """An explicit list of files resolves to no cache path and writes none."""
    shards = (
        (
            tmp_path / "part1.tsfile",
            {
                "time": [0, 1],
                "device": ["device_a", "device_a"],
                "temperature": [1.0, 2.0],
                "humidity": [3.0, 4.0],
            },
        ),
        (
            tmp_path / "part2.tsfile",
            {
                "time": [0, 1],
                "device": ["device_b", "device_b"],
                "temperature": [5.0, 6.0],
                "humidity": [7.0, 8.0],
            },
        ),
    )
    for shard_path, rows in shards:
        _write_weather_rows_file(shard_path, rows)

    # Pass a fresh list object to each call, as two separate literals would.
    path_strings = tuple(str(shard_path) for shard_path, _ in shards)
    assert index_cache.resolve_cache_path(list(path_strings)) is None

    with TsFileDataFrame(list(path_strings), show_progress=False) as frame:
        assert len(frame) == 4
    assert not (tmp_path / CACHE_FILENAME).exists()


def test_index_cache_stale_file_set_falls_back(tmp_path, monkeypatch):
    """A cache built for fewer files is discarded and rebuilt when a shard is added.

    Steps:
      1. Load a two-shard directory, which writes a 2-file cache.
      2. Add a third shard and reload; all 6 series must be visible, so the
         stale cache cannot have been used as-is.
      3. Reload with ``_cache_metadata`` patched to fail; success proves the
         cache was rewritten to cover all three files.

    ``_cache_metadata`` is patched through the ``monkeypatch`` fixture, like
    the other cache tests in this module, so the original method is restored
    automatically even if an assertion fails.
    """
    _write_weather_dir(tmp_path)
    with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf:
        assert len(tsdf) == 4  # writes a 2-file cache

    # Add a third shard with a distinct device: the resolved file set no
    # longer matches the cache.
    _write_weather_rows_file(
        tmp_path / "part3.tsfile",
        {
            "time": [0, 1, 2],
            "device": ["device_c", "device_c", "device_c"],
            "temperature": [1.0, 2.0, 3.0],
            "humidity": [4.0, 5.0, 6.0],
        },
    )
    with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf:
        assert len(tsdf) == 6  # fell back to a fresh build over all 3 files

    # Cache was rewritten; a pure cache hit now covers all three files.
    def boom(self):
        raise AssertionError("should be a cache hit")

    monkeypatch.setattr(TsFileSeriesReader, "_cache_metadata", boom)
    with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf:
        assert len(tsdf) == 6


def test_index_cache_corrupt_file_rebuilds(tmp_path):
    """A garbage cache file is tolerated and replaced with a valid one."""
    _write_weather_dir(tmp_path)
    root = str(tmp_path)

    with TsFileDataFrame(root, show_progress=False) as first:
        expected = sorted(str(name) for name in first.list_timeseries())

    # Overwrite the cache with junk; the following load has to absorb the
    # read failure and rebuild instead of raising.
    cache_path = tmp_path / CACHE_FILENAME
    cache_path.write_bytes(b"not a valid index cache")

    with TsFileDataFrame(root, show_progress=False) as second:
        names = sorted(str(name) for name in second.list_timeseries())
        assert names == expected

    # The cache on disk starts with the format magic again.
    magic = cache_path.read_bytes()[:8]
    assert magic == b"TSFIDX01"


def test_index_cache_roundtrip_null_and_typed_tags(tmp_path):
    """Devices with null tag values read the same through the cache.

    One device has a null ``region`` (interior null, since ``device`` follows
    it) and the other a null ``device`` (trailing null). The file is written
    under a directory so that the directory-level cache is exercised.
    """
    tag_columns = [
        ColumnSchema("region", TSDataType.STRING, ColumnCategory.TAG),
        ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
    ]
    field_columns = [
        ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
    ]
    schema = TableSchema("weather", tag_columns + field_columns)

    frames = [
        # Interior null: region is missing, device is present.
        pd.DataFrame(
            {
                "time": [0, 1],
                "region": [None, None],
                "device": ["alpha", "alpha"],
                "temperature": [1.0, 2.0],
            }
        ),
        # Trailing null: region is present, device is missing.
        pd.DataFrame(
            {
                "time": [0, 1],
                "region": ["north", "north"],
                "device": [None, None],
                "temperature": [3.0, 4.0],
            }
        ),
    ]
    with TsFileTableWriter(str(tmp_path / "tags.tsfile"), schema) as writer:
        for frame in frames:
            writer.write_dataframe(frame)

    root = str(tmp_path)
    with TsFileDataFrame(root, show_progress=False, use_cache=False) as fresh:
        expected_names = sorted(str(name) for name in fresh.list_timeseries())
        expected_reads = _read_all_series(fresh)

    with TsFileDataFrame(root, show_progress=False):
        pass  # build the cache

    with TsFileDataFrame(root, show_progress=False) as cached:
        actual_names = sorted(str(name) for name in cached.list_timeseries())
        assert actual_names == expected_names
        _assert_snapshots_equal(expected_reads, _read_all_series(cached))


def test_index_cache_save_load_unit_roundtrip(tmp_path):
    """``save_catalogs`` then ``load_catalogs`` restores a hand-built catalog.

    Builds one ``MetadataCatalog`` with a single table, two devices (one with
    an interior-null tag) and three series-stat entries, writes it, reads it
    back and checks table entries, device tag values, derived lookup dicts
    and the stats mapping.
    """
    catalog = MetadataCatalog()
    table_id = catalog.add_table(
        "weather",
        ["region", "device"],
        [TSDataType.STRING, TSDataType.INT32],
        ["temperature", "humidity"],
    )
    north_id = catalog.add_device(table_id, ("north", 7), 0, 10)
    # Interior-null tag: the first tag is None, the second is set.
    null_region_id = catalog.add_device(table_id, (None, 9), 5, 15)

    stats_in_order = (
        ((north_id, 0), SeriesStats(3, 0, 10, 3, 0, 10)),
        ((north_id, 1), SeriesStats(2, 0, 5, 2, 0, 5)),
        ((null_region_id, 0), SeriesStats(4, 5, 15, 4, 5, 15)),
    )
    for series_ref, stats in stats_in_order:
        catalog.series_stats_by_ref[series_ref] = stats

    cache_path = str(tmp_path / "unit.tsfidx")
    index_cache.save_catalogs(cache_path, ["/abs/a.tsfile"], [catalog])
    paths, catalogs = index_cache.load_catalogs(cache_path)

    assert paths == ["/abs/a.tsfile"]
    restored = catalogs[0]

    # Table entries: name, tag types as real TSDataType members, and a
    # working field-name lookup.
    table_names = [entry.table_name for entry in restored.table_entries]
    assert table_names == ["weather"]
    table_entry = restored.table_entries[0]
    assert table_entry.tag_types == (TSDataType.STRING, TSDataType.INT32)
    for tag_type in table_entry.tag_types:
        assert isinstance(tag_type, TSDataType)
    assert table_entry.get_field_index("humidity") == 1

    # Device entries: tag values survive, including the interior None.
    device_tags = [entry.tag_values for entry in restored.device_entries]
    assert device_tags == [("north", 7), (None, 9)]

    # Derived lookups are rebuilt on load.
    assert restored.table_id_by_name == {"weather": 0}
    expected_device_ids = {
        (0, ("north", 7)): 0,
        (0, (None, 9)): 1,
    }
    assert restored.device_id_by_key == expected_device_ids

    # Stats: same keys in the same insertion order, and equal values.
    stat_keys = list(restored.series_stats_by_ref.keys())
    assert stat_keys == [(0, 0), (0, 1), (1, 0)]
    expected_last = SeriesStats(4, 5, 15, 4, 5, 15)
    assert restored.series_stats_by_ref[(1, 0)] == expected_last
Loading
Loading