perf: phased codecpipeline #3885

Open
d-v-b wants to merge 6 commits into zarr-developers:main from d-v-b:perf/prepared-write-v2

Conversation

Contributor

@d-v-b d-v-b commented Apr 8, 2026

This PR defines a new codec pipeline class called PhasedCodecPipeline that enables much higher performance for chunk encoding and decoding than the current BatchedCodecPipeline.

The approach here is to completely ignore how the v3 spec defines array -> bytes codecs 😆. Instead of treating codecs as functions that mix IO and compute, we treat codec encoding and decoding as a sequence:

  1. Preparatory IO (async): fetch exactly what we need to fetch from storage, given the codecs we have. So if there's a sharding codec in the first array -> bytes position, the codec pipeline knows it must fetch the shard index, then fetch the involved subchunks, before passing them to compute.
  2. Pure compute (sync): apply filters and compressors. Safe to parallelize over chunks.
  3. Final IO (async, only if writing): reconcile the in-memory compressed chunks against our model of the stored chunk, then write out the bytes.

Basically, we use the first array -> bytes codec to figure out what kind of preparatory IO and final IO we need to perform, and the rest of the codecs to figure out what kind of chunk encoding we need to do. Separating IO from compute in different phases makes things simpler and faster.
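To make the three phases concrete, here is a minimal sketch of the idea using only the standard library. It is not the code in this PR: the dict-backed `STORE`, the `fetch` / `store_chunk` helpers, and the use of zlib as the only codec are all assumptions made for illustration.

```python
from __future__ import annotations

import asyncio
import zlib
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory "store": key -> encoded bytes.
STORE: dict[str, bytes] = {}


async def fetch(key: str) -> bytes | None:
    """Phase 1 (async IO): fetch the stored bytes for one chunk."""
    await asyncio.sleep(0)  # stand-in for real storage latency
    return STORE.get(key)


def decode(raw: bytes | None) -> bytes | None:
    """Phase 2 (sync compute): a pure function that performs no IO."""
    return None if raw is None else zlib.decompress(raw)


def encode(chunk: bytes) -> bytes:
    """Phase 2 (sync compute) on the write side."""
    return zlib.compress(chunk)


async def store_chunk(key: str, encoded: bytes) -> None:
    """Phase 3 (async IO): write the encoded bytes."""
    await asyncio.sleep(0)
    STORE[key] = encoded


async def write_chunks(chunks: dict[str, bytes]) -> None:
    # Every chunk is overwritten whole here, so no preparatory IO is needed.
    # Note: pool.map blocks the event loop until compute finishes; a real
    # pipeline would have to decide how to hand work off to the pool.
    with ThreadPoolExecutor() as pool:
        encoded = list(pool.map(encode, chunks.values()))
    await asyncio.gather(*(store_chunk(k, e) for k, e in zip(chunks, encoded)))


async def read_chunks(keys: list[str]) -> list[bytes | None]:
    raw = await asyncio.gather(*(fetch(k) for k in keys))  # phase 1
    with ThreadPoolExecutor() as pool:  # phase 2
        return list(pool.map(decode, raw))
```

The point of the sketch is only that `decode` and `encode` never touch the store, so they can be parallelized over chunks without any coordination with the IO layer.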

Happy to chat more about this direction. IMO the spec should be re-written with this framing, because it makes much more sense than trying to shoe-horn sharding in as a codec.

I don't want to make our benchmarking suite any bigger, but on my laptop this codec pipeline is 2-5x faster than BatchedCodecPipeline for a lot of workloads. I can include some of those benchmarks later.

This was mostly written by Claude, based on previous work in #3719. All these changes should be non-breaking, so I think this is in principle safe for us to play around with in a patch or minor release.

Edit: this PR depends on changes submitted in #3907 and #3908

@github-actions github-actions bot added the "needs release notes" label (automatically applied to PRs which haven't added release notes) Apr 8, 2026

codecov bot commented Apr 8, 2026

Codecov Report

❌ Patch coverage is 84.68013% with 91 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.73%. Comparing base (dd5a321) to head (1be5563).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/zarr/core/codec_pipeline.py | 76.40% | 63 Missing ⚠️ |
| src/zarr/codecs/sharding.py | 90.83% | 22 Missing ⚠️ |
| src/zarr/codecs/numcodecs/_codecs.py | 83.33% | 4 Missing ⚠️ |
| src/zarr/storage/_local.py | 93.75% | 1 Missing ⚠️ |
| src/zarr/storage/_memory.py | 94.73% | 1 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3885      +/-   ##
==========================================
- Coverage   93.10%   92.73%   -0.37%     
==========================================
  Files          85       85              
  Lines       11193    11731     +538     
==========================================
+ Hits        10421    10879     +458     
- Misses        772      852      +80     
| Files with missing lines | Coverage Δ |
|---|---|
| src/zarr/abc/store.py | 96.34% <100.00%> (+0.04%) ⬆️ |
| src/zarr/codecs/_v2.py | 94.11% <100.00%> (+0.50%) ⬆️ |
| src/zarr/core/array.py | 97.74% <100.00%> (+0.02%) ⬆️ |
| src/zarr/core/config.py | 100.00% <ø> (ø) |
| src/zarr/storage/_local.py | 95.27% <93.75%> (-0.14%) ⬇️ |
| src/zarr/storage/_memory.py | 94.44% <94.73%> (-0.04%) ⬇️ |
| src/zarr/codecs/numcodecs/_codecs.py | 95.45% <83.33%> (-0.94%) ⬇️ |
| src/zarr/codecs/sharding.py | 90.17% <90.83%> (+0.77%) ⬆️ |
| src/zarr/core/codec_pipeline.py | 84.76% <76.40%> (-9.42%) ⬇️ |

... and 1 file with indirect coverage changes


Contributor Author

d-v-b commented Apr 9, 2026

@TomAugspurger how would this design work with CUDA codecs?

@d-v-b d-v-b force-pushed the perf/prepared-write-v2 branch from 5d3064e to b67a5a0 Compare April 15, 2026 09:51
@github-actions github-actions bot removed the "needs release notes" label (automatically applied to PRs which haven't added release notes) Apr 15, 2026
@d-v-b d-v-b force-pushed the perf/prepared-write-v2 branch 2 times, most recently from a84a15a to 68a7cdc Compare April 17, 2026 10:41
Comment thread src/zarr/core/codec_pipeline.py Outdated
Comment on lines +943 to +962
# Phase 1: fetch all chunks (IO, sequential)
raw_buffers: list[Buffer | None] = [
    bg.get_sync(prototype=cs.prototype)  # type: ignore[attr-defined]
    for bg, cs, *_ in batch
]

# Phase 2: decode (compute, optionally threaded)
def _decode_one(raw: Buffer | None, chunk_spec: ArraySpec) -> NDBuffer | None:
    if raw is None:
        return None
    return transform.decode_chunk(raw, chunk_spec)

specs = [cs for _, cs, *_ in batch]
if n_workers > 0 and len(batch) > 1:
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        decoded_list = list(pool.map(_decode_one, raw_buffers, specs))
else:
    decoded_list = [
        _decode_one(raw, spec) for raw, spec in zip(raw_buffers, specs, strict=True)
    ]
Contributor

@ilan-gold ilan-gold Apr 17, 2026

Why isn't this all multi-threaded i.e., the I/O as well?

Contributor Author

I should benchmark this, but my expectation was that IO against memory storage and local storage is not compute-limited, so threads wouldn't remove a real bottleneck. For memory storage I'm sure this is true; I'm not sure about local storage, though.

d-v-b and others added 6 commits April 17, 2026 22:51
Adds a SupportsSetRange protocol to zarr.abc.store for stores that
allow overwriting a byte range within an existing value. Implementations
are added for LocalStore (using file-handle seek+write) and MemoryStore
(in-memory bytearray slice assignment).

This is the prerequisite for the partial-shard write fast path in
ShardingCodec, which can patch individual inner-chunk slots without
rewriting the entire shard blob when the inner codec chain is fixed-size.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
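
As a rough illustration of the byte-range overwrite described in the commit message above, the sketch below defines a protocol and a toy in-memory store. The method name `set_range` and its `(key, value, start)` signature are assumptions; the protocol added in this PR may look different.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsSetRange(Protocol):
    # Hypothetical signature, for illustration only.
    def set_range(self, key: str, value: bytes, start: int) -> None: ...


class ToyMemoryStore:
    """Toy store that keeps each value in a mutable bytearray."""

    def __init__(self) -> None:
        self._data: dict[str, bytearray] = {}

    def set(self, key: str, value: bytes) -> None:
        self._data[key] = bytearray(value)

    def get(self, key: str) -> bytes:
        return bytes(self._data[key])

    def set_range(self, key: str, value: bytes, start: int) -> None:
        buf = self._data[key]
        end = start + len(value)
        if start < 0 or end > len(buf):
            raise ValueError("byte range falls outside the existing value")
        # Same-length slice assignment, so the stored value keeps its size.
        buf[start:end] = value
```

A file-backed variant would presumably open the file for update, seek to `start`, and write `value`, which is what the commit message describes for LocalStore.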
V2Codec, BytesCodec, BloscCodec, etc. previously only implemented the
async _decode_single / _encode_single methods. Add their sync
counterparts (_decode_sync / _encode_sync) so that the upcoming
SyncCodecPipeline can dispatch through them without spinning up an
event loop.

For codecs that wrap external compressors (numcodecs.Zstd, numcodecs.Blosc,
the V2 fallback chain), the sync versions just call the underlying
compressor's blocking API directly instead of routing through
asyncio.to_thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
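
The sync/async split described in this commit message could look roughly like the following. The method names are taken from the commit message, but the class itself and its bytes-in, bytes-out signatures are simplifications assumed for this sketch (the real codecs operate on buffers and chunk specs).

```python
import asyncio
import zlib


class ToyZlibCodec:
    """Hypothetical codec exposing both sync and async entry points."""

    def _encode_sync(self, data: bytes) -> bytes:
        # Call the compressor's blocking API directly.
        return zlib.compress(data)

    def _decode_sync(self, data: bytes) -> bytes:
        return zlib.decompress(data)

    async def _encode_single(self, data: bytes) -> bytes:
        # The async path routes the same work through a worker thread.
        return await asyncio.to_thread(self._encode_sync, data)

    async def _decode_single(self, data: bytes) -> bytes:
        return await asyncio.to_thread(self._decode_sync, data)
```

With this shape, a pipeline that is already running on a worker thread can call `_decode_sync` directly and skip the event loop entirely.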
…arallelism

Adds SyncCodecPipeline alongside BatchedCodecPipeline. The new pipeline
runs codecs through their sync entry points (_decode_sync / _encode_sync)
and dispatches per-chunk work to a module-level thread pool sized by
the codec_pipeline.max_workers config (default = os.cpu_count()).

Each chunk's full lifecycle (fetch + decode + scatter for reads;
get-existing + merge + encode + set/delete for writes) runs as one
pool task — overlapping IO of one chunk with compute of another.
Scatter into the shared output buffer is thread-safe because chunks
have non-overlapping output selections.

The async wrappers (read/write) detect SupportsGetSync/SupportsSetSync
stores and dispatch to the sync fast path, passing the configured
max_workers. Other stores fall through to the async path, which still
uses asyncio.concurrent_map at async.concurrency.

Notes on perf:
- Default (None → cpu_count) is tuned for chunks ≥ ~512 KB.
- Small chunks (≤ 64 KB) regress 1.5-3x because pool dispatch overhead
  (~30-50 µs/task) dominates per-chunk work. Workaround:
  zarr.config.set({"codec_pipeline.max_workers": 1}).
- For large chunks on local/memory stores, IO+compute parallelism
  yields 1.7-2.5x over BatchedCodecPipeline on direct-API reads and
  ~2.5x on roundtrip.

ChunkTransform encapsulates the sync codec chain. It caches resolved
ArraySpecs across calls with the same chunk_spec — combined with the
constant-ArraySpec optimization in indexing, hot-path overhead is
minimized.

Includes test scaffolding for the new pipeline (test_sync_codec_pipeline)
and config plumbing for the max_workers key.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
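
The "one pool task per chunk" read path from this commit message can be sketched as below. This is a toy under several assumptions: a dict-backed store, a flat bytearray as the output buffer, zlib as the codec, and a pool created per call (the commit message describes a module-level pool). `read_into` and `CHUNK` are hypothetical names.

```python
from __future__ import annotations

import os
import zlib
from concurrent.futures import ThreadPoolExecutor

CHUNK = 4  # decoded bytes per chunk in this toy 1-D layout


def read_into(
    store: dict[int, bytes],
    out: bytearray,
    n_chunks: int,
    max_workers: int | None = None,
) -> None:
    def task(i: int) -> None:
        raw = store.get(i)  # fetch
        # decode, or fall back to a zero fill value for a missing chunk
        decoded = bytes(CHUNK) if raw is None else zlib.decompress(raw)
        # scatter: each task writes a disjoint slice of the shared buffer
        out[i * CHUNK:(i + 1) * CHUNK] = decoded

    workers = max_workers or os.cpu_count() or 1
    if workers == 1 or n_chunks <= 1:
        # Serial path: avoids pool dispatch overhead for small workloads.
        for i in range(n_chunks):
            task(i)
        return
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(task, range(n_chunks)))  # drain to surface exceptions
```

Passing `max_workers=1` takes the serial branch, which mirrors the workaround suggested above for small chunks.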
Adds _encode_partial_sync and _decode_partial_sync to ShardingCodec.
For fixed-size inner codec chains and stores that implement
SupportsSetRange, partial writes patch individual inner-chunk slots
in-place instead of rewriting the whole shard:

  - Reads existing shard index (one byte-range get).
  - For each affected inner chunk: decodes the slot, merges the new
    region, re-encodes.
  - Writes each modified slot at its deterministic byte offset, then
    rewrites just the index.

For variable-size inner codecs (e.g. with compression) or stores that
don't support byte-range writes, falls through to a full-shard rewrite
matching BatchedCodecPipeline semantics.

The partial-decode path computes a ReadPlan from the shard index and
issues one byte-range get per overlapping chunk, decoding only what
the read selection touches.

Both paths are dispatched from SyncCodecPipeline via the existing
supports_partial_decode / supports_partial_encode protocol checks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
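
The in-place slot patching relies on every encoded inner chunk having the same size, so the byte offset of slot `i` is known without reading any data. The sketch below shows that arithmetic on a toy shard held in a bytearray. The layout (data slots followed by an index of little-endian uint64 `(offset, nbytes)` pairs, with `2**64 - 1` marking an absent chunk) follows the sharding index convention, but the index checksum is omitted and all names here are hypothetical. In the PR the writes would go through the store's byte-range API rather than a local buffer.

```python
from __future__ import annotations

import struct

SLOT = 8           # fixed encoded size of one inner chunk, in bytes
N = 4              # inner chunks per shard
EMPTY = 2**64 - 1  # index marker for an absent inner chunk
INDEX_START = SLOT * N


def empty_shard() -> bytearray:
    index = struct.pack(f"<{2 * N}Q", *([EMPTY] * 2 * N))
    return bytearray(SLOT * N) + bytearray(index)


def patch_slot(shard: bytearray, i: int, encoded: bytes) -> None:
    if len(encoded) != SLOT:
        raise ValueError("in-place patching requires fixed-size encoded chunks")
    offset = i * SLOT  # deterministic because every slot has the same size
    shard[offset:offset + SLOT] = encoded
    # Rewrite only this chunk's (offset, nbytes) index entry.
    struct.pack_into("<2Q", shard, INDEX_START + 16 * i, offset, SLOT)


def read_slot(shard: bytearray, i: int) -> bytes | None:
    offset, nbytes = struct.unpack_from("<2Q", shard, INDEX_START + 16 * i)
    if offset == EMPTY and nbytes == EMPTY:
        return None
    return bytes(shard[offset:offset + nbytes])
```

With a variable-size inner codec the new encoding may not fit in the old slot, which is why that case falls back to a full-shard rewrite.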
Two new test files:

  test_codec_invariants — asserts contract-level properties that every
  codec / shard / buffer combination must satisfy: round-trip exactness,
  prototype propagation, fill-value handling, all-empty shard handling.

  test_pipeline_parity — exhaustive matrix asserting that
  SyncCodecPipeline and BatchedCodecPipeline produce semantically
  identical results across codec configs, layouts (including
  nested sharding), write sequences, and write_empty_chunks settings.
  Three checks per cell:
    1. Same array contents on read.
    2. Same set of store keys after writes.
    3. Each pipeline reads the other's output identically (catches
       layout-divergence bugs).

These tests pinned the design throughout the SyncCodecPipeline +
partial-shard development.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
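
The three parity checks listed above can be expressed compactly. The sketch below is not the test code from this PR: `ToyPipeline` and `check_parity` are hypothetical, with two zlib compression levels standing in for two pipeline implementations, and an all-zero chunk treated as "empty" (deleted rather than stored) to give the key-set check something to verify.

```python
from __future__ import annotations

import zlib


class ToyPipeline:
    """Hypothetical pipeline; different levels give different stored bytes."""

    def __init__(self, level: int) -> None:
        self.level = level

    def write(self, store: dict[str, bytes], key: str, data: bytes) -> None:
        if not any(data):
            store.pop(key, None)  # an all-zero chunk is not stored
        else:
            store[key] = zlib.compress(data, self.level)

    def read(self, store: dict[str, bytes], key: str) -> bytes:
        return zlib.decompress(store[key])


def check_parity(a: ToyPipeline, b: ToyPipeline, writes: list[tuple[str, bytes]]) -> None:
    store_a: dict[str, bytes] = {}
    store_b: dict[str, bytes] = {}
    for key, data in writes:
        a.write(store_a, key, data)
        b.write(store_b, key, data)
    # Check 2: same set of store keys after writes.
    assert store_a.keys() == store_b.keys()
    for key in store_a:
        expected = a.read(store_a, key)
        # Check 1: same contents on read.
        assert b.read(store_b, key) == expected
        # Check 3: each pipeline reads the other's output identically.
        assert a.read(store_b, key) == expected
        assert b.read(store_a, key) == expected
```

The stored bytes are allowed to differ between the two pipelines; only the decoded contents and the key sets have to agree.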
Adds .gitignore entries for .claude/, CLAUDE.md, and docs/superpowers/
so local IDE/agent planning artifacts don't get committed by accident.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@d-v-b d-v-b force-pushed the perf/prepared-write-v2 branch from aa111a2 to 1be5563 Compare April 17, 2026 21:04
selected = decoded[chunk_selection]
if drop_axes:
    selected = selected.squeeze(axis=drop_axes)
out[out_selection] = selected
Contributor

@ilan-gold ilan-gold Apr 18, 2026

It might be worth experimenting with moving this assignment, out[out_selection] = selected, outside the thread pool execution since, IIRC, it holds the GIL and is probably non-trivial time-wise.

Contributor

The memory usage will probably go up a bit though....
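
One way the experiment suggested in this thread could be set up is sketched below: worker threads only decode, and the calling thread performs the scatter as results arrive. The dict-backed store, zlib codec, flat bytearray output, and the names `decode_one` / `read_scatter_outside_pool` are all assumptions made for the sketch, and whether this is faster would need benchmarking.

```python
from __future__ import annotations

import zlib
from concurrent.futures import ThreadPoolExecutor

CHUNK = 4  # decoded bytes per chunk in this toy 1-D layout


def decode_one(raw: bytes | None) -> bytes:
    # Worker threads only decode; they never touch the output buffer.
    return bytes(CHUNK) if raw is None else zlib.decompress(raw)


def read_scatter_outside_pool(
    store: dict[int, bytes],
    out: bytearray,
    n_chunks: int,
    max_workers: int = 4,
) -> None:
    raws = [store.get(i) for i in range(n_chunks)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order; the assignment into the
        # shared buffer happens here, in the calling thread.
        for i, decoded in enumerate(pool.map(decode_one, raws)):
            out[i * CHUNK:(i + 1) * CHUNK] = decoded
```

The memory trade-off mentioned above shows up here because each decoded chunk has to stay alive until the calling thread gets around to copying it, instead of being copied and released inside the task that produced it.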
