gh-124397: Add free-threading support for iterators. #148894
Open
rhettinger
wants to merge
8
commits into
python:main
from
rhettinger:iterator_synchronization
19ddd2e Issue-124397: Add free-threading support for iterators. (rhettinger)
4aa242d Add blurb (rhettinger)
e0c44be More wordsmithing (rhettinger)
adcb718 Clarify use of the lock. Add message to the ValueError. (rhettinger)
4c2bad0 Include "threading" in the reference (rhettinger)
d9dde84 Support send(), throw(), and close() for generators. (rhettinger)
fcb9ee8 Tweak wording. Add doctest. (rhettinger)
314ec67 Merge branch 'main' into iterator_synchronization (rhettinger)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1436,3 +1436,146 @@ is equivalent to:: | |
| Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`, | ||
| :class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as | ||
| :keyword:`with` statement context managers. | ||
|
|
||
|
|
||
| Iterator synchronization | ||
| ------------------------ | ||
|
|
||
| By default, Python iterators do not support concurrent access. Most iterators make | ||
| no guarantees when accessed simultaneously from multiple threads. Generator | ||
| iterators, for example, raise :exc:`ValueError` if one of their iterator methods | ||
| is called while the generator is already executing. The tools in this section | ||
| allow reliable concurrency support to be added to ordinary iterators and | ||
| iterator-producing callables. | ||
|
|
||
| The :class:`serialize` wrapper lets multiple threads share a single iterator and | ||
| take turns consuming from it. While one thread is running ``__next__()``, the | ||
| others block until the iterator becomes available. Each value produced by the | ||
| underlying iterator is delivered to exactly one caller. | ||
|
|
||
| The :func:`concurrent_tee` function lets multiple threads each receive the full | ||
| stream of values from one underlying iterator. It creates independent iterators | ||
| that all draw from the same source. Values are buffered until consumed by all | ||
| of the derived iterators. | ||
|
|
||
| .. class:: serialize(iterable) | ||
|
|
||
| Return an iterator wrapper that serializes concurrent calls to | ||
| :meth:`~iterator.__next__` using a lock. | ||
|
|
||
| If the wrapped iterator also defines :meth:`~generator.send`, | ||
| :meth:`~generator.throw`, or :meth:`~generator.close`, those calls | ||
| are serialized as well. | ||
|
|
||
| This makes it possible to share a single iterator, including a generator | ||
| iterator, between multiple threads. A lock assures that calls are handled | ||
| one at a time. No values are duplicated or skipped by the wrapper itself. | ||
| Each item from the underlying iterator is given to exactly one caller. | ||
|
|
||
| This wrapper does not copy or buffer values. Threads that call | ||
| :func:`next` while another thread is already advancing the iterator will | ||
| block until the active call completes. | ||
|
|
||
| Example: | ||
|
|
||
| .. doctest:: | ||
|
|
||
| import threading | ||
|
|
||
| def count(): | ||
| for i in range(5): | ||
| yield i | ||
|
|
||
| it = threading.serialize(count()) | ||
|
|
||
| def worker(): | ||
| for item in it: | ||
| print(threading.current_thread().name, item) | ||
|
|
||
| threads = [threading.Thread(target=worker) for _ in range(2)] | ||
| for thread in threads: | ||
| thread.start() | ||
| for thread in threads: | ||
| thread.join() | ||
|
|
||
| In this example, each number is printed exactly once, but the work is shared | ||
| between the two threads. | ||
|
|
||
| .. function:: synchronized(func) | ||
|
|
||
| Wrap an iterator-producing callable so that each iterator it returns is | ||
| automatically passed through :class:`serialize`. | ||
|
|
||
| This is especially useful as a :term:`decorator` for generator functions, | ||
| allowing their generator-iterators to be consumed from multiple threads. | ||
|
|
||
| Example: | ||
|
|
||
| .. doctest:: | ||
|
|
||
| import threading | ||
|
|
||
| @threading.synchronized | ||
| def counter(): | ||
| i = 0 | ||
| while True: | ||
| yield i | ||
| i += 1 | ||
|
|
||
| it = counter() | ||
|
|
||
| def worker(): | ||
| for _ in range(5): | ||
| print(next(it)) | ||
|
|
||
| threads = [threading.Thread(target=worker) for _ in range(2)] | ||
| for thread in threads: | ||
| thread.start() | ||
| for thread in threads: | ||
| thread.join() | ||
|
|
||
| The returned wrapper preserves the metadata of *func*, such as its name and | ||
| wrapped function reference. | ||
|
|
||
| .. function:: concurrent_tee(iterable, n=2) | ||
|
|
||
| Return *n* independent iterators from a single input *iterable*, with | ||
| guaranteed behavior when the derived iterators are consumed concurrently. | ||
|
|
||
| This function is similar to :func:`itertools.tee`, but is intended for cases | ||
| where the source iterator may feed consumers running in different threads. | ||
| Each returned iterator yields every value from the underlying iterable, in | ||
| the same order. | ||
|
|
||
| Internally, values are buffered until every derived iterator has consumed | ||
| them. | ||
|
|
||
| The returned iterators share the same underlying synchronization lock. Each | ||
| individual derived iterator is intended to be consumed by one thread at a | ||
| time. If a single derived iterator must itself be shared by multiple | ||
| threads, wrap it with :class:`serialize`. | ||
|
|
||
| If *n* is ``0``, return an empty tuple. If *n* is negative, raise | ||
| :exc:`ValueError`. | ||
|
|
||
| Example: | ||
|
|
||
| .. doctest:: | ||
|
|
||
| import threading | ||
|
|
||
| source = range(5) | ||
|
Contributor
This doesn't seem like a very compelling example because Maybe something like a simple generator would be more useful as an example?
||
| left, right = threading.concurrent_tee(source) | ||
|
|
||
| def consume(name, iterable): | ||
| for item in iterable: | ||
| print(name, item) | ||
|
|
||
| t1 = threading.Thread(target=consume, args=("left", left)) | ||
| t2 = threading.Thread(target=consume, args=("right", right)) | ||
| t1.start() | ||
| t2.start() | ||
| t1.join() | ||
| t2.join() | ||
|
|
||
| In this example, both consumer threads see the full sequence ``0`` through ``4``. | ||
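The locking behaviour that the diff describes for `serialize` can be illustrated with a small pure-Python sketch. This is not the implementation in the PR: the class name `SerializedIterator` and the helper `run_demo` are hypothetical, and only `__next__` is covered (the `send()`, `throw()`, and `close()` forwarding mentioned in the diff is left out). The sketch assumes nothing beyond the standard `threading.Lock`.

```python
import threading


class SerializedIterator:
    """Hypothetical stand-in for the proposed wrapper (not the PR's code)."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # Only one thread at a time may advance the underlying iterator.
        # StopIteration propagates normally; the lock is released first.
        with self._lock:
            return next(self._it)


def count(n):
    # A plain generator; calling next() on it from two threads at once
    # without a lock can raise "ValueError: generator already executing".
    for i in range(n):
        yield i


def run_demo(n_items=1000, n_threads=4):
    """Share one generator between threads and collect everything seen."""
    shared = SerializedIterator(count(n_items))
    seen = []
    seen_lock = threading.Lock()

    def worker():
        for item in shared:
            with seen_lock:
                seen.append(item)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(seen)
```

Calling `run_demo()` returns every value from the generator exactly once, regardless of which thread happened to receive it, which matches the "delivered to exactly one caller" wording in the diff.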
Needs `>>>` for doctest to run. Alternatively, use `.. code-block:: python`.
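The point in the review comment above can be checked with the standard-library `doctest` parser, which only recognises lines that begin with the `>>>` prompt as examples. The sketch below is an illustration under the assumption that the Sphinx `.. doctest::` directive expects the same interactive-session format. The names `WITHOUT_PROMPTS`, `WITH_PROMPTS`, and `count_examples` are made up for this demonstration.

```python
import doctest

# Plain source code, as written in the diff's example blocks.
WITHOUT_PROMPTS = """
import threading
print("hello")
"""

# The same statements written as an interactive session.
WITH_PROMPTS = """
>>> import threading
>>> print("hello")
hello
"""

parser = doctest.DocTestParser()


def count_examples(text):
    """Return how many runnable examples the doctest parser finds."""
    return len(parser.get_examples(text))
```

Here `count_examples(WITHOUT_PROMPTS)` finds no examples, so nothing would be executed or checked, while `count_examples(WITH_PROMPTS)` finds two.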