From 8f744fb4ff03cb84bd0c27d0df75bb75b30870bf Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 2 Jul 2026 19:56:34 +0800 Subject: [PATCH] [Python] Add metadata index cache for TsFileDataFrame Persist each shard's MetadataCatalog to a fixed-name index file in the dataset directory so repeated loads skip the expensive native metadata walk. A new use_cache flag (default True) enables it only when a single directory is passed; single-file and list inputs are unchanged. The cache is binary: a pickled sidecar for the small table/device tables plus one numpy int64 structured array per shard for the bulk series stats. Writes are atomic (temp + os.replace); load falls back to a fresh build on a bad magic/version or a changed file set. Source files are not validated, per design. --- python/tests/test_tsfile_dataset.py | 332 +++++++++++++++++++++++++++ python/tsfile/dataset/dataframe.py | 81 ++++++- python/tsfile/dataset/index_cache.py | 255 ++++++++++++++++++++ python/tsfile/dataset/reader.py | 35 ++- 4 files changed, 695 insertions(+), 8 deletions(-) create mode 100644 python/tsfile/dataset/index_cache.py diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index aa28a18f5..e27b0f69f 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -1649,3 +1649,335 @@ def test_dataset_tree_model_omits_non_numeric_measurements(tmp_path): np.testing.assert_array_equal( tsdf["root.a.b.temp"][:], np.array([0.5, 1.5, 2.5]) ) + + +# --------------------------------------------------------------------------- +# Metadata index cache +# --------------------------------------------------------------------------- + +from tsfile.dataset import index_cache +from tsfile.dataset.index_cache import CACHE_FILENAME + + +def _write_weather_dir(dir_path): + """Two table-model shards holding distinct devices -> 4 logical series.""" + _write_weather_rows_file( + dir_path / "part1.tsfile", + { + "time": [0, 1, 2], + "device": ["device_a", "device_a", "device_a"], + "temperature": [20.0, 21.5, 23.0], + "humidity": [50.0, 52.0, 55.0], + }, + ) + _write_weather_rows_file( + dir_path / "part2.tsfile", + { + "time": [0, 1, 2], + "device": ["device_b", "device_b", "device_b"], + "temperature": [10.0, 11.5, 13.0], + "humidity": [40.0, 42.0, 45.0], + }, + ) + + +def _read_all_series(tsdf): + """Snapshot every series' timestamps + values for fresh-vs-cached comparison.""" + snapshot = {} + for name in sorted(map(str, tsdf.list_timeseries())): + series = tsdf[name] + snapshot[name] = (series.timestamps.copy(), series[:].copy()) + return snapshot + + +def _assert_snapshots_equal(a, b): + assert sorted(a.keys()) == sorted(b.keys()) + for name in a: + np.testing.assert_array_equal(a[name][0], b[name][0]) + np.testing.assert_array_equal(a[name][1], b[name][1]) + + +def test_index_cache_created_on_first_load(tmp_path): + _write_weather_dir(tmp_path) + cache_file = tmp_path / CACHE_FILENAME + assert not cache_file.exists() + + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + assert len(tsdf) == 4 + + assert cache_file.exists() + # File starts with the format magic. + assert cache_file.read_bytes()[:8] == b"TSFIDX01" + + +def test_index_cache_second_load_matches_fresh(tmp_path): + _write_weather_dir(tmp_path) + + with TsFileDataFrame(str(tmp_path), show_progress=False) as fresh: + fresh_series = sorted(map(str, fresh.list_timeseries())) + fresh_len = len(fresh) + fresh_reads = _read_all_series(fresh) + fresh_meta = fresh.list_timeseries_metadata() + + with TsFileDataFrame(str(tmp_path), show_progress=False) as cached: + assert sorted(map(str, cached.list_timeseries())) == fresh_series + assert len(cached) == fresh_len + _assert_snapshots_equal(fresh_reads, _read_all_series(cached)) + pd.testing.assert_frame_equal(cached.list_timeseries_metadata(), fresh_meta) + + +def test_index_cache_hit_skips_metadata_walk(tmp_path, monkeypatch): + _write_weather_dir(tmp_path) + + # First load populates the cache (walk runs normally here). + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + expected = sorted(map(str, tsdf.list_timeseries())) + + # On a cache hit the expensive metadata walk must never run. + def boom(self): + raise AssertionError("metadata walk should be skipped on a cache hit") + + monkeypatch.setattr(TsFileSeriesReader, "_cache_metadata", boom) + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + assert sorted(map(str, tsdf.list_timeseries())) == expected + + +def test_use_cache_false_bypasses_read_and_write(tmp_path, monkeypatch): + _write_weather_dir(tmp_path) + cache_file = tmp_path / CACHE_FILENAME + + # use_cache=False on a fresh dir: builds normally, writes no cache. + with TsFileDataFrame(str(tmp_path), show_progress=False, use_cache=False) as tsdf: + assert len(tsdf) == 4 + assert not cache_file.exists() + + # Populate a cache with a default load, then confirm use_cache=False ignores + # it: the metadata walk runs and the cache bytes are left untouched. + with TsFileDataFrame(str(tmp_path), show_progress=False): + pass + assert cache_file.exists() + cache_bytes = cache_file.read_bytes() + + calls = {"n": 0} + original = TsFileSeriesReader._cache_metadata + + def counting(self): + calls["n"] += 1 + return original(self) + + monkeypatch.setattr(TsFileSeriesReader, "_cache_metadata", counting) + with TsFileDataFrame(str(tmp_path), show_progress=False, use_cache=False) as tsdf: + assert len(tsdf) == 4 + assert calls["n"] == 2 # one walk per shard -> cache was ignored + assert cache_file.read_bytes() == cache_bytes # unchanged + + +def test_index_cache_correctness_table_model(tmp_path): + _write_weather_dir(tmp_path) + + with TsFileDataFrame(str(tmp_path), show_progress=False, use_cache=False) as fresh: + fresh_reads = _read_all_series(fresh) + # Aligned read across the union window. + aligned_fresh = fresh.loc[:, list(map(str, fresh.list_timeseries()))] + + # The fresh construction above (use_cache=False) wrote no cache; build one. + with TsFileDataFrame(str(tmp_path), show_progress=False): + pass + with TsFileDataFrame(str(tmp_path), show_progress=False) as cached: + _assert_snapshots_equal(fresh_reads, _read_all_series(cached)) + aligned_cached = cached.loc[:, list(map(str, cached.list_timeseries()))] + np.testing.assert_array_equal( + aligned_fresh.timestamps, aligned_cached.timestamps + ) + np.testing.assert_array_equal(aligned_fresh.values, aligned_cached.values) + + +def test_index_cache_correctness_tree_model(tmp_path): + _write_tree_file(tmp_path / "tree.tsfile") + + with TsFileDataFrame(str(tmp_path), show_progress=False, use_cache=False) as fresh: + fresh_series = sorted(map(str, fresh.list_timeseries())) + fresh_reads = _read_all_series(fresh) + assert fresh.model == "tree" + + # Build the cache, then reload from it. + with TsFileDataFrame(str(tmp_path), show_progress=False): + pass + with TsFileDataFrame(str(tmp_path), show_progress=False) as cached: + assert cached.model == "tree" + assert sorted(map(str, cached.list_timeseries())) == fresh_series + assert "_col_1" in repr(cached) + _assert_snapshots_equal(fresh_reads, _read_all_series(cached)) + + +def test_index_cache_single_file_not_cached(tmp_path): + path = tmp_path / "solo.tsfile" + _write_weather_file(path, 0) + assert index_cache.resolve_cache_path(str(path)) is None + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + assert len(tsdf) == 2 + # No fixed-name cache is written next to a single file. + assert not (tmp_path / CACHE_FILENAME).exists() + + +def test_index_cache_list_input_not_cached(tmp_path): + path1 = tmp_path / "part1.tsfile" + path2 = tmp_path / "part2.tsfile" + _write_weather_rows_file( + path1, + { + "time": [0, 1], + "device": ["device_a", "device_a"], + "temperature": [1.0, 2.0], + "humidity": [3.0, 4.0], + }, + ) + _write_weather_rows_file( + path2, + { + "time": [0, 1], + "device": ["device_b", "device_b"], + "temperature": [5.0, 6.0], + "humidity": [7.0, 8.0], + }, + ) + assert index_cache.resolve_cache_path([str(path1), str(path2)]) is None + + with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as tsdf: + assert len(tsdf) == 4 + assert not (tmp_path / CACHE_FILENAME).exists() + + +def test_index_cache_stale_file_set_falls_back(tmp_path): + _write_weather_dir(tmp_path) + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + assert len(tsdf) == 4 # writes a 2-file cache + + # Add a third shard with a distinct device: the resolved file set no longer + # matches the cache. + _write_weather_rows_file( + tmp_path / "part3.tsfile", + { + "time": [0, 1, 2], + "device": ["device_c", "device_c", "device_c"], + "temperature": [1.0, 2.0, 3.0], + "humidity": [4.0, 5.0, 6.0], + }, + ) + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + assert len(tsdf) == 6 # fell back to a fresh build over all 3 files + + # Cache was rewritten; a pure cache hit now covers all three files. + def boom(self): + raise AssertionError("should be a cache hit") + + import tsfile.dataset.reader as reader_module + + original = reader_module.TsFileSeriesReader._cache_metadata + reader_module.TsFileSeriesReader._cache_metadata = boom + try: + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + assert len(tsdf) == 6 + finally: + reader_module.TsFileSeriesReader._cache_metadata = original + + +def test_index_cache_corrupt_file_rebuilds(tmp_path): + _write_weather_dir(tmp_path) + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + expected = sorted(map(str, tsdf.list_timeseries())) + + # Corrupt the cache; the next load must swallow the failure and rebuild. + cache_file = tmp_path / CACHE_FILENAME + cache_file.write_bytes(b"not a valid index cache") + + with TsFileDataFrame(str(tmp_path), show_progress=False) as tsdf: + assert sorted(map(str, tsdf.list_timeseries())) == expected + # Rebuilt to valid content. + assert cache_file.read_bytes()[:8] == b"TSFIDX01" + + +def test_index_cache_roundtrip_null_and_typed_tags(tmp_path): + # Mirror the nullable-tag device layout, but under a directory so the cache + # is exercised. Interior-null and trailing-null devices must survive. + schema = TableSchema( + "weather", + [ + ColumnSchema("region", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + null_region = pd.DataFrame( + { + "time": [0, 1], + "region": [None, None], + "device": ["alpha", "alpha"], + "temperature": [1.0, 2.0], + } + ) + null_device = pd.DataFrame( + { + "time": [0, 1], + "region": ["north", "north"], + "device": [None, None], + "temperature": [3.0, 4.0], + } + ) + with TsFileTableWriter(str(tmp_path / "tags.tsfile"), schema) as writer: + writer.write_dataframe(null_region) + writer.write_dataframe(null_device) + + with TsFileDataFrame(str(tmp_path), show_progress=False, use_cache=False) as fresh: + fresh_series = sorted(map(str, fresh.list_timeseries())) + fresh_reads = _read_all_series(fresh) + + with TsFileDataFrame(str(tmp_path), show_progress=False): + pass # build the cache + with TsFileDataFrame(str(tmp_path), show_progress=False) as cached: + assert sorted(map(str, cached.list_timeseries())) == fresh_series + _assert_snapshots_equal(fresh_reads, _read_all_series(cached)) + + +def test_index_cache_save_load_unit_roundtrip(tmp_path): + catalog = MetadataCatalog() + t0 = catalog.add_table( + "weather", + ["region", "device"], + [TSDataType.STRING, TSDataType.INT32], + ["temperature", "humidity"], + ) + d0 = catalog.add_device(t0, ("north", 7), 0, 10) + d1 = catalog.add_device(t0, (None, 9), 5, 15) # interior-null tag + catalog.series_stats_by_ref[(d0, 0)] = SeriesStats(3, 0, 10, 3, 0, 10) + catalog.series_stats_by_ref[(d0, 1)] = SeriesStats(2, 0, 5, 2, 0, 5) + catalog.series_stats_by_ref[(d1, 0)] = SeriesStats(4, 5, 15, 4, 5, 15) + + cache_path = str(tmp_path / "unit.tsfidx") + index_cache.save_catalogs(cache_path, ["/abs/a.tsfile"], [catalog]) + paths, catalogs = index_cache.load_catalogs(cache_path) + + assert paths == ["/abs/a.tsfile"] + restored = catalogs[0] + + # Table entries incl. TSDataType tag types and rebuilt field index. + assert [e.table_name for e in restored.table_entries] == ["weather"] + entry = restored.table_entries[0] + assert entry.tag_types == (TSDataType.STRING, TSDataType.INT32) + assert all(isinstance(t, TSDataType) for t in entry.tag_types) + assert entry.get_field_index("humidity") == 1 + + # Device entries incl. interior-null tag and normalization state. + assert [e.tag_values for e in restored.device_entries] == [("north", 7), (None, 9)] + + # Derived lookups rebuilt. + assert restored.table_id_by_name == {"weather": 0} + assert restored.device_id_by_key == { + (0, ("north", 7)): 0, + (0, (None, 9)): 1, + } + + # Stats: same keys, same insertion order, same values. + assert list(restored.series_stats_by_ref.keys()) == [(0, 0), (0, 1), (1, 0)] + assert restored.series_stats_by_ref[(1, 0)] == SeriesStats(4, 5, 15, 4, 5, 15) diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index a65b231bc..7da468548 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -28,6 +28,7 @@ import numpy as np +from . import index_cache from .formatting import format_dataframe_table from .metadata import ( MODEL_TABLE, @@ -620,9 +621,19 @@ def __getitem__(self, key) -> AlignedTimeseries: class TsFileDataFrame: """Lazy-loaded unified numeric dataset view over multiple TsFile shards.""" - def __init__(self, paths: Union[str, List[str]], show_progress: bool = True): + def __init__( + self, + paths: Union[str, List[str]], + show_progress: bool = True, + use_cache: bool = True, + ): self._paths = _expand_paths(paths) self._show_progress = show_progress + self._use_cache = use_cache + # Resolve from the ORIGINAL paths arg (before expansion): the on-disk + # cache is only enabled when a single directory is passed. None means + # "no cache location" -- single-file / list inputs behave as before. + self._cache_path = index_cache.resolve_cache_path(paths) if use_cache else None self._readers: Dict[str, object] = {} self._index = _DataFrameCatalog() self._is_view = False @@ -669,14 +680,74 @@ def _load_metadata(self): """Build the logical cross-file index and the derived per-series caches.""" from .reader import TsFileSeriesReader - if len(self._paths) >= 2: - self._load_metadata_parallel(TsFileSeriesReader) - else: - self._load_metadata_serial(TsFileSeriesReader) + loaded_from_cache = False + if self._use_cache and self._cache_path and os.path.exists(self._cache_path): + loaded_from_cache = self._load_metadata_from_cache(TsFileSeriesReader) + + if not loaded_from_cache: + if len(self._paths) >= 2: + self._load_metadata_parallel(TsFileSeriesReader) + else: + self._load_metadata_serial(TsFileSeriesReader) if not self._index.series: raise ValueError("No valid time series found in the provided TsFile files") + # Persist the freshly built catalogs so the next load skips the walk. + # Only after a fresh build (never re-writing a cache we just read) and + # only when non-empty (the empty check above already guarded this). + if not loaded_from_cache and self._use_cache and self._cache_path: + self._write_index_cache() + + def _load_metadata_from_cache(self, reader_class) -> bool: + """Rebuild readers + index from the on-disk cache; False falls back to a fresh build.""" + try: + cached_paths, catalogs = index_cache.load_catalogs(self._cache_path) + except Exception: + # Corrupt / old / unreadable cache -> rebuild from source. + return False + + # Source files are not validated (per design), but the resolved file + # SET must match: cached device/series indices are positional, so a + # changed set (file added/removed) would misalign. Rebuild if so. + if cached_paths != self._paths: + return False + + catalog_by_path = dict(zip(cached_paths, catalogs)) + total = len(self._paths) + self._show_loading_progress(0, total) + # Iterate in self._paths order so _register_reader replays the exact + # same merge order as a fresh build -> identical series/device order. + for index, file_path in enumerate(self._paths, start=1): + _register_reader( + self._readers, + self._index, + file_path, + reader_class.from_cached_catalog( + file_path, catalog_by_path[file_path], show_progress=False + ), + ) + self._show_loading_progress(index, total) + + self._show_loading_progress( + total, total, sum(reader.series_count for reader in self._readers.values()) + ) + return True + + def _write_index_cache(self): + """Write per-file catalogs to the cache; best-effort (warn on failure).""" + # Emit catalogs in self._paths order so the reload replays merges + # identically. self._readers is keyed by file_path for both build paths. + catalogs = [self._readers[path].catalog for path in self._paths] + try: + index_cache.save_catalogs(self._cache_path, list(self._paths), catalogs) + except Exception as e: + warnings.warn( + f"Failed to write TsFile index cache: {e}", + RuntimeWarning, + stacklevel=2, + ) + def _show_loading_progress(self, done: int, total: int, total_series: int = None): if not self._show_progress or total <= 0: return diff --git a/python/tsfile/dataset/index_cache.py b/python/tsfile/dataset/index_cache.py new file mode 100644 index 000000000..bfd717be0 --- /dev/null +++ b/python/tsfile/dataset/index_cache.py @@ -0,0 +1,255 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""On-disk cache for TsFileDataFrame's per-file metadata catalogs. + +Building a :class:`~tsfile.dataset.metadata.MetadataCatalog` requires a full +native metadata walk (``get_timeseries_metadata``) whose cost scales with the +number of series -- slow and memory-hungry for datasets with many series that +are reloaded repeatedly (e.g. across training runs). The catalog itself is pure +metadata (strings + ints), so it serializes into a small file. + +This module persists the *per-file* catalogs (one ``MetadataCatalog`` per +shard, in load order). On reload the dataframe reopens each file cheaply and +restores its catalog from here, skipping the walk, then replays the same +deterministic cross-file merge (``_register_reader``) to rebuild its global +view identically. + +File layout (single file, written atomically):: + + [ 8 bytes ] magic b"TSFIDX01" + [ 4 bytes ] uint32 header_len (little-endian) + [ header_len bytes ] pickled header dict (the string "sidecar") + [ 4 bytes ] uint32 n_stats_arrays + repeat n_stats_arrays times: + [ .npy blob ] one int64 structured array (np.save framing) + +The bulk ``series_stats_by_ref`` (six int64 per series, can be millions of +rows) goes into a compact numpy structured array per catalog; the comparatively +tiny table/device tables go into the pickled header. Derived catalog fields +(``table_id_by_name``, ``device_id_by_key``, ``TableEntry._field_index_by_name``) +are never stored -- they are rebuilt by replaying ``add_table`` / ``add_device`` +on load. +""" + +import os +import pickle +import struct +from typing import List, Optional, Tuple + +import numpy as np + +from ..constants import TSDataType +from .metadata import MetadataCatalog, SeriesStats + +# Bump when the on-disk layout changes; load_catalogs rejects mismatches so a +# stale cache is transparently rebuilt rather than mis-parsed. +CACHE_VERSION = 1 + +# Fixed cache file name written at the "dataset top" (the directory passed to +# TsFileDataFrame). The extension is deliberately NOT ".tsfile" so the +# directory walk in dataframe._expand_paths (which collects "*.tsfile") never +# picks the cache up as a data shard. Do NOT rename this to end in ".tsfile". +CACHE_FILENAME = ".tsfile_dataframe_index.tsfidx" + +_MAGIC = b"TSFIDX01" + +# One row per (device_id, field_idx) series: the two-int key followed by the +# six SeriesStats ints, all int64. +_STATS_DTYPE = np.dtype( + [ + ("device_id", " Optional[str]: + """Resolve where the fixed-name index cache lives for this ``paths`` input. + + The cache is only enabled when ``paths`` is a single directory string -- the + unambiguous "dataset top". Single-file and list inputs return ``None`` (no + caching; behavior is unchanged for those callers), because there is no + single stable directory to key the cache to. + + ``paths_arg`` must be the ORIGINAL argument passed to + ``TsFileDataFrame.__init__`` (before path expansion), since the rule keys + off whether the user passed one directory. + """ + if isinstance(paths_arg, str) and os.path.isdir(paths_arg): + return os.path.join(os.path.abspath(paths_arg), CACHE_FILENAME) + return None + + +def _catalog_to_header(catalog: MetadataCatalog) -> dict: + """Serialize a catalog's string tables (everything except bulk stats).""" + tables = [ + { + "name": entry.table_name, + "tag_columns": list(entry.tag_columns), + "tag_types": [int(dtype) for dtype in entry.tag_types], + "field_columns": list(entry.field_columns), + } + for entry in catalog.table_entries + ] + devices = [ + { + "table_id": entry.table_id, + "tag_values": list(entry.tag_values), + "min_time": entry.min_time, + "max_time": entry.max_time, + } + for entry in catalog.device_entries + ] + return {"tables": tables, "devices": devices} + + +def _catalog_to_stats_array(catalog: MetadataCatalog) -> np.ndarray: + """Pack ``series_stats_by_ref`` into a structured array in insertion order. + + Insertion order is preserved because ``_register_reader`` and + ``iter_series_paths`` iterate ``series_stats_by_ref`` to assign the global + series order; the load path re-inserts rows in the same order. + """ + items = catalog.series_stats_by_ref + array = np.empty(len(items), dtype=_STATS_DTYPE) + for row, ((device_id, field_idx), stats) in zip(array, items.items()): + row["device_id"] = device_id + row["field_idx"] = field_idx + row["length"] = stats.length + row["min_time"] = stats.min_time + row["max_time"] = stats.max_time + row["timeline_length"] = stats.timeline_length + row["timeline_min_time"] = stats.timeline_min_time + row["timeline_max_time"] = stats.timeline_max_time + return array + + +def save_catalogs( + cache_path: str, file_paths: List[str], catalogs: List[MetadataCatalog] +) -> None: + """Persist per-file catalogs to ``cache_path`` atomically. + + ``file_paths`` and ``catalogs`` are parallel lists in dataframe load order; + the reload replays them in this order so the merged view is rebuilt + identically. Writes a temp file in the same directory then ``os.replace`` so + a concurrent reader never sees a torn file. + """ + header = { + "version": CACHE_VERSION, + "file_paths": list(file_paths), + "catalogs": [_catalog_to_header(catalog) for catalog in catalogs], + } + stats_arrays = [_catalog_to_stats_array(catalog) for catalog in catalogs] + + header_bytes = pickle.dumps(header, protocol=pickle.HIGHEST_PROTOCOL) + tmp_path = f"{cache_path}.tmp.{os.getpid()}" + with open(tmp_path, "wb") as fh: + fh.write(_MAGIC) + fh.write(struct.pack(" MetadataCatalog: + """Rebuild a catalog by replaying the public mutators (rebuilds derived state).""" + catalog = MetadataCatalog() + for table in header_catalog["tables"]: + catalog.add_table( + table["name"], + table["tag_columns"], + [TSDataType(value) for value in table["tag_types"]], + table["field_columns"], + ) + for device in header_catalog["devices"]: + catalog.add_device( + device["table_id"], + tuple(device["tag_values"]), + device["min_time"], + device["max_time"], + ) + # Set stats directly (no public adder), preserving the stored row order. + for row in stats_array: + catalog.series_stats_by_ref[(int(row["device_id"]), int(row["field_idx"]))] = ( + SeriesStats( + length=int(row["length"]), + min_time=int(row["min_time"]), + max_time=int(row["max_time"]), + timeline_length=int(row["timeline_length"]), + timeline_min_time=int(row["timeline_min_time"]), + timeline_max_time=int(row["timeline_max_time"]), + ) + ) + return catalog + + +def load_catalogs(cache_path: str) -> Tuple[List[str], List[MetadataCatalog]]: + """Load per-file paths and catalogs from ``cache_path``. + + Raises :class:`IndexCacheError` on a bad magic, version mismatch, or any + structural corruption so the caller can fall back to a fresh build. + """ + try: + with open(cache_path, "rb") as fh: + magic = fh.read(len(_MAGIC)) + if magic != _MAGIC: + raise IndexCacheError(f"Bad index cache magic: {magic!r}") + (header_len,) = struct.unpack(" None: + """Open the native reader and probe the file model (the cheap half). + + This is everything ``__init__`` does except the expensive + ``_cache_metadata`` metadata walk, so both the normal constructor and + the cache-backed ``from_cached_catalog`` share it. + """ if not os.path.exists(file_path): raise FileNotFoundError(f"TsFile not found: {file_path}") self.file_path = file_path - self.show_progress = show_progress try: self._reader = TsFileReaderPy(file_path) @@ -106,8 +118,25 @@ def __init__(self, file_path: str, show_progress: bool = True): self._table_schemas = self._reader.get_all_table_schemas() self._model_kind: str = MODEL_TREE if not self._table_schemas else MODEL_TABLE - self._catalog = MetadataCatalog() - self._cache_metadata() + @classmethod + def from_cached_catalog( + cls, + file_path: str, + catalog: MetadataCatalog, + show_progress: bool = False, + ) -> "TsFileSeriesReader": + """Build a reader whose catalog comes from an on-disk index cache. + + Opens the file and re-probes the model kind exactly like ``__init__`` + (both cheap), but SKIPS the expensive ``_cache_metadata`` walk, using + the supplied catalog instead. The caller is trusted to pass a catalog + matching the file (source files are not validated, per design). + """ + obj = cls.__new__(cls) + obj.show_progress = show_progress + obj._open_and_probe(file_path) + obj._catalog = catalog + return obj def __del__(self): self.close()