
Overhaul publish_dataset extension#9217

Merged
crusaderky merged 10 commits into dask:main from crusaderky:publish_dataset
Apr 29, 2026

Conversation


@crusaderky crusaderky commented Apr 1, 2026

This PR thoroughly reviews the publish_dataset API:

Bugfixes

  • Fixed bug where publish_dataset(..., override=True) would cause any keys from the original dataset to become immortal unless they were also present in the new dataset
  • Fixed race condition where the user calls get_dataset() and immediately afterwards unpublish_dataset(), believing that they are holding a local reference to the futures, while the scheduler hasn't recorded that reference by the time unpublish_dataset lands on the scheduler. This caused the client to end up holding a reference to forgotten keys. This is symmetrical to what Fix race condition for published futures with annotations #8577 fixed for publish_dataset.
  • Fixed memory leak, introduced by Fix race condition for published futures with annotations #8577, where each call to publish_dataset would create an immortal asyncio.Event instance on the scheduler.

New features

  • Added syntax publish_dataset({name: value, ...}). Note that this is the only way one can publish multiple datasets with non-string names at once.
  • get_dataset and unpublish_dataset can now get/delete multiple datasets at once.
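  
As a rough illustration of the semantics described above, here is a toy in-memory sketch. `ToyDatasets` and its method names are hypothetical stand-ins; the real extension lives in distributed/publish.py and talks to the scheduler over RPC.

```python
from typing import Any, Hashable


class ToyDatasets:
    """Toy in-memory stand-in for the publish extension (hypothetical)."""

    def __init__(self) -> None:
        self._data: dict[Hashable, Any] = {}

    def publish(self, datasets: dict[Hashable, Any], override: bool = False) -> None:
        # Dict syntax: publish many datasets, with arbitrary hashable names,
        # in a single call
        if not override:
            dupes = [name for name in datasets if name in self._data]
            if dupes:
                raise KeyError(f"Datasets already exist: {dupes}")
        self._data.update(datasets)

    def get(self, *names: Hashable) -> list[Any]:
        # Fetch multiple datasets at once
        return [self._data[name] for name in names]

    def unpublish(self, *names: Hashable) -> None:
        # Delete multiple datasets at once
        for name in names:
            del self._data[name]


ds = ToyDatasets()
ds.publish({("x", 1): [1, 2, 3], 42: "hello"})  # non-string names in one call
assert ds.get(("x", 1), 42) == [[1, 2, 3], "hello"]
ds.unpublish(("x", 1), 42)
```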

Performance improvements

  • publish_dataset previously performed 2 RPC calls per dataset, had a latency of 2x RTT, and typically opened one additional TCP connection for each dataset beyond the first. It now performs a single RPC call with 1x RTT latency, regardless of the number of datasets being published.

This, together with the new features, should offer a significant speedup for users who were previously publishing/getting/deleting many datasets at once (and, for all cases other than publishing string-named datasets, were forced into a tight for loop of RPC calls). This is particularly significant when the client-scheduler comms suffer from high latency, e.g. on Coiled.
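
A minimal sketch of why collapsing per-dataset calls into one payload helps. `FakeRPC` is a made-up stand-in with a fixed per-call round trip, not distributed's actual RPC machinery.

```python
import time


class FakeRPC:
    """Hypothetical RPC channel: one fixed round trip per call."""

    def __init__(self, rtt: float = 0.0) -> None:
        self.calls = 0
        self.rtt = rtt
        self.store: dict[str, object] = {}

    def call(self, payload: dict[str, object]) -> None:
        self.calls += 1
        time.sleep(self.rtt)  # one round trip per call
        self.store.update(payload)


# Old shape: one call per dataset, so latency scales with the dataset count
rpc = FakeRPC()
for name, value in {"a": 1, "b": 2, "c": 3}.items():
    rpc.call({name: value})
assert rpc.calls == 3

# New shape: one call regardless of the number of datasets
rpc2 = FakeRPC()
rpc2.call({"a": 1, "b": 2, "c": 3})
assert rpc2.calls == 1
assert rpc2.store == rpc.store
```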

Minor tweaks

  • The override flag was undocumented. Added documentation.
  • Support for names other than strings was undocumented. Added documentation.
  • Clarified the documentation of publish_dataset with regard to keys that have not been persisted. Added test coverage for the use case.
  • The syntax for publishing a tuple of collections under a single name, publish_dataset(x, y, name="foo"), was untested. Added test coverage.
  • Cleaned up test suite
  • Achieved 100% test coverage
  • Added type annotations

@crusaderky crusaderky requested a review from fjetter as a code owner April 1, 2026 11:44
Comment thread distributed/publish.py
Comment on lines -48 to -51
if not override and name in self.datasets:
raise KeyError("Dataset %s already exists" % name)
self.scheduler.client_desires_keys(keys, f"published-{stringify(name)}")
self.datasets[name] = {"data": data, "keys": keys}

@crusaderky crusaderky Apr 1, 2026


Fixed bug where publish_dataset(..., override=True) would cause any keys from the original dataset to become immortal unless they were also present in the new dataset

Comment thread distributed/publish.py Outdated
out = self.datasets.pop(name, {"keys": []})
self.scheduler.client_releases_keys(out["keys"], f"published-{stringify(name)}")
async def delete(self, names: tuple[Key, ...], uid: bytes) -> None:
await self._sync_batched_send(uid)
Collaborator Author


Fixed race condition where the user calls get_dataset() and immediately afterwards unpublish_dataset(), thinking that they are holding a reference to the futures locally, but the scheduler hasn't noted it by the time unpublish_dataset lands on the scheduler. This caused the client to hold a reference to forgotten keys. This is symmetrical to what #8577 fixed for publish_dataset.

Comment thread distributed/publish.py
try:
await self._flush_received[uid].wait()
finally:
del self._flush_received[uid]
Collaborator Author


Fixed memory leak where each call to publish_dataset would create an immortal asyncio.Event instance on the scheduler.

Note that there is still a potential memory leak here, where the client flushes the batched comms but then disconnects before the RPC call can execute.
I tried fixing this use case but gave up, as I ended up with code that was both severely over-engineered and fragile to race conditions. Namely, one must be careful when checking scheduler.clients, because the register-client endpoint is neither an async RPC nor a batched comm, and I found myself in cases where the batched comms had arrived but the client hadn't been registered yet.
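
For illustration, a stripped-down asyncio sketch of the fixed handshake; `SchedulerSide` and its method names are approximations, not the real extension code. The Event is created on demand in a defaultdict and deleted again in a finally block, so it no longer outlives the RPC call that waited on it.

```python
import asyncio
from collections import defaultdict


class SchedulerSide:
    """Toy stand-in for the scheduler-side extension (hypothetical names)."""

    def __init__(self) -> None:
        self._flush_received: defaultdict[bytes, asyncio.Event] = defaultdict(
            asyncio.Event
        )

    def on_flush_message(self, uid: bytes) -> None:
        # Arrives over the batched comms channel; may run before or after
        # sync_batched_send, thanks to the defaultdict
        self._flush_received[uid].set()

    async def sync_batched_send(self, uid: bytes) -> None:
        # Arrives over the RPC channel; must not proceed before the flush lands
        try:
            await self._flush_received[uid].wait()
        finally:
            # The fix: drop the Event instead of keeping it forever
            del self._flush_received[uid]


async def demo() -> int:
    s = SchedulerSide()
    task = asyncio.create_task(s.sync_batched_send(b"uid-1"))
    await asyncio.sleep(0)  # let the RPC handler block on the Event
    s.on_flush_message(b"uid-1")
    await task
    return len(s._flush_received)  # 0: no Events left behind


assert asyncio.run(demo()) == 0
```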

Member


One Event object leaked per very rare situation sounds OK to me.

Comment thread distributed/scheduler.py
Comment on lines +5663 to +5665
warnings.warn(
f"Client {client!r} desires key {k!r} but key is unknown."
)
Collaborator Author


This came up many times during this exercise.

Comment thread distributed/client.py

def publish_dataset(
self, *args: Any, name: Key | None = None, override: bool = False, **kwargs
):

@crusaderky crusaderky Apr 1, 2026


Aside: offering four different syntaxes to achieve the same thing is in clear violation of the Zen of Python:

There should be one-- and preferably only one --obvious way to do it.

Removing some of these syntaxes, however, would be a breaking change and is beyond the scope of this PR.

Comment thread distributed/client.py
def publish_dataset(self, *args, **kwargs):
@overload
def publish_dataset(
self, *args: Mapping[Key, Any], override: bool = False, **kwargs

@crusaderky crusaderky Apr 1, 2026


Frustratingly, mypy complains if you write

    @overload
    def publish_dataset(
        self, arg: Mapping[Key, Any], /, *, override: bool = False, **kwargs
    ): ...

which would be more correct.
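
For context, a runnable, stripped-down sketch of the overload shape that was kept. `Toy` is hypothetical; at runtime only the implementation signature matters, so the looser `*args` overload does not change behaviour — it only affects static type checking.

```python
from typing import Any, Mapping, overload


class Toy:
    """Hypothetical class illustrating the overload shape discussed above."""

    @overload
    def publish_dataset(
        self, *args: Mapping[str, Any], override: bool = False
    ) -> None: ...
    @overload
    def publish_dataset(
        self, *args: Any, name: str, override: bool = False
    ) -> None: ...

    def publish_dataset(self, *args: Any, **kwargs: Any) -> None:
        # Runtime: a single mapping argument publishes many datasets at once;
        # record the call for demonstration purposes
        self.last_call = (args, kwargs)


t = Toy()
t.publish_dataset({"a": 1, "b": 2})
assert t.last_call == (({"a": 1, "b": 2},), {})
```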

Comment thread distributed/client.py
Comment on lines -3011 to -3012
kwargs : dict
additional keyword arguments to _get_dataset

@crusaderky crusaderky Apr 1, 2026


kwargs actually contain the situationally useful asynchronous=False to be passed to a synchronous client.
However their lack of documentation is endemic and is out of scope for this PR.
This change simply aligns the documentation of this method to all other client methods.

Comment on lines -119 to -122
x = delayed(inc)(1)
y = delayed(inc)(2)
await c.publish_dataset(x=1, y=2)
datasets = await c.scheduler.publish_list()
assert datasets == ("x", "y")

await c.publish_dataset(x=x, y=y)

@crusaderky crusaderky Apr 1, 2026


This was misleading: these datasets are not persisted, so they are no different from plain Python objects.



@gen_cluster(client=True, worker_kwargs={"resources": {"A": 1}})
async def test_publish_submit_ordering(c, s, a, b):

@crusaderky crusaderky Apr 1, 2026


Test from #8577. Now folded into test_publish_unpublish_wait_for_batched_comms.
I found this test to be quite confusing, as it mentions annotations and resources, but the race condition has nothing to do with them.

async def test_publish_submit_ordering(c, s, a, b):
RESOURCES = {"A": 1}
@gen_cluster(client=True)
async def test_publish_unpublish_wait_for_batched_comms(c, s, a, b):
Collaborator Author


Commenting out

await self._sync_batched_send(uid)

either in PublishExtension.put or in PublishExtension.delete consistently trips this test.

@github-actions

github-actions Bot commented Apr 1, 2026

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    31 files ±0       31 suites ±0     11h 12m 11s ⏱️ (-1m 40s)
 4,123 tests +9:      4,015 ✅ (+10)      104 💤 (±0)   4 ❌ (-1)
59,783 runs +135:    57,305 ✅ (+137)   2,474 💤 (±0)   4 ❌ (-2)

For more details on these failures, see this check.

Results for commit 98628e7. ± Comparison against base commit 8e28688.

This pull request removes 3 and adds 12 tests. Note that renamed tests count towards both.
distributed.tests.test_publish ‑ test_datasets_republish
distributed.tests.test_publish ‑ test_publish_non_string_key
distributed.tests.test_publish ‑ test_publish_submit_ordering
distributed.tests.test_publish ‑ test_publish_bad_syntax
distributed.tests.test_publish ‑ test_publish_dataset_override
distributed.tests.test_publish ‑ test_publish_dataset_override_partial
distributed.tests.test_publish ‑ test_publish_dataset_override_releases_old_keys
distributed.tests.test_publish ‑ test_publish_multiple_collections_one_name
distributed.tests.test_publish ‑ test_publish_multiple_names
distributed.tests.test_publish ‑ test_publish_non_string_names[8]
distributed.tests.test_publish ‑ test_publish_non_string_names[9.0]
distributed.tests.test_publish ‑ test_publish_non_string_names[name0]
distributed.tests.test_publish ‑ test_publish_unpersisted
…

♻️ This comment has been updated with latest results.

@crusaderky

All test failures are unrelated.

@crusaderky

@dask/maintenance May I have a review please?

@martindurant

I love that you're giving this code some love. I'll try to set aside some time to look through.


@martindurant martindurant left a comment


Looks like good changes to me


Comment thread distributed/publish.py Outdated
calls. Return True if the client is still connected; False otherwise.
"""
try:
await self._flush_received[uid].wait()
Member


Is any exception possible besides KeyError? In that case, the finally would raise it too (maybe this should be pop).

Collaborator Author


No, you can't use pop.
This leverages defaultdict, so either:

  1. flush_batched_send runs first. The event is created and immediately set; wait here does not block.
  2. OR _sync_batched_send runs first. The event is created unset, which causes wait to block. Later on, flush_batched_send retrieves the event and sets it.

Collaborator Author


I'll add a comment explaining this.


@crusaderky crusaderky Apr 29, 2026


Is any exception possible besides KeyError?

wait can raise CancelledError, which should only happen when the event loop is shut down.
I'm trying to figure out if it happens if the TCP/IP channel collapses, but I'm not finding the code for it.

Comment thread distributed/publish.py Outdated
override: bool,
uid: bytes,
) -> None:
await self._sync_batched_send(uid)
Member


Might this wait forever, leaving the data in limbo?
I think your argument is that the Event should be set first, right?


@crusaderky crusaderky Apr 29, 2026


Might this wait forever, leaving the data in limbo?

put is an asyncio task invoked by the RPC channel. I'm unsure whether it gets cancelled if the channel collapses.
I'm adding a check that re-tests every 30s whether the client is still connected.

the Event should be set first, right?

The opposite: most frequently, _sync_batched_send will run first and block.

@crusaderky

@martindurant all comments addressed

@martindurant

Does the test coverage report make sense?

@crusaderky

crusaderky commented Apr 29, 2026

Does the test coverage report make sense?

Coverage reporting is currently broken; see #9216.

@martindurant

OK, +1 from me then

@crusaderky

Thank you!

@crusaderky crusaderky merged commit f431280 into dask:main Apr 29, 2026
30 of 37 checks passed
@crusaderky crusaderky deleted the publish_dataset branch April 29, 2026 15:43
