From 05169639bd277740196241e97a0819bb5094890b Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith" Date: Sun, 26 Apr 2026 23:33:13 +0000 Subject: [PATCH] gh-47798: Refactor the POSIX subprocess.Popen._communicate selector loop into helpers No public API change. Lift the per-iteration select/read/write loop out of Popen._communicate (POSIX) into a module-level _communicate_io_posix(), with small _flush_stdin / _make_input_view / _translate_newlines helpers alongside it. Popen._communicate calls the helper and persists the returned input offset for resume-after-timeout. Retire the private Popen._remaining_time method in favor of module-level _deadline_remaining; all call sites (POSIX and Windows) updated. Defensive behavioural deltas: the stdin and stdout/stderr .close() calls in the I/O loop now swallow BrokenPipeError / OSError, matching __exit__ and the no-input path; previously these were bare. Adds test_communicate_timeout_resume_partial_write to cover _input_offset bookkeeping across TimeoutExpired/resume. --- Lib/subprocess.py | 217 +++++++++++++++++++++++++----------- Lib/test/test_subprocess.py | 33 ++++++ 2 files changed, 184 insertions(+), 66 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 7ac2289f535b6d..38b655f2f7b9d2 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -250,6 +250,82 @@ def __repr__(self): else: _PopenSelector = selectors.SelectSelector + def _communicate_io_posix(selector, stdin, input_view, input_offset, + output_buffers, endtime, *, close_on_eof=False): + """ + Low-level POSIX I/O multiplexing loop used by Popen._communicate. + + Handles the select loop for reading/writing but does not manage + stream lifecycle or raise timeout exceptions. + + Args: + selector: A _PopenSelector with streams already registered + stdin: Writable file object for input, or None + input_view: memoryview of input bytes, or None + input_offset: Starting offset into input_view (for resume support) + output_buffers: Dict {file_object: list} to append read chunks to + endtime: Deadline timestamp, or None for no timeout + close_on_eof: If True, close output streams immediately when they + EOF rather than leaving them open for the caller to close. + Used by Popen._communicate() to match its historical behavior + of releasing fds as soon as the child closes the corresponding + pipe. + + Returns: + (new_input_offset, completed) + - new_input_offset: How many bytes of input were written + - completed: True if all I/O finished, False if timed out + + Note: + - Closes output streams on EOF only if close_on_eof=True + - Does NOT raise TimeoutExpired (caller handles) + - Appends to output_buffers lists in place + """ + stdin_fd = stdin.fileno() if stdin else None + + while selector.get_map(): + remaining = _deadline_remaining(endtime) + if remaining is not None and remaining <= 0: + return (input_offset, False) # Timed out + + ready = selector.select(remaining) + + # Check timeout after select (may have woken spuriously) + if endtime is not None and _time() > endtime: + return (input_offset, False) # Timed out + + for key, events in ready: + if key.fd == stdin_fd: + chunk = input_view[input_offset:input_offset + _PIPE_BUF] + try: + input_offset += os.write(key.fd, chunk) + except BrokenPipeError: + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + else: + if input_offset >= len(input_view): + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + elif key.fileobj in output_buffers: + data = os.read(key.fd, 32768) + if not data: + selector.unregister(key.fileobj) + if close_on_eof: + try: + key.fileobj.close() + except OSError: + pass + else: + output_buffers[key.fileobj].append(data) + + return (input_offset, True) # Completed + if _mswindows: # On Windows we just need to close `Popen._handle` when we no longer need @@ -289,6 +365,45 @@ def _cleanup(): DEVNULL = -3 +def _deadline_remaining(endtime): + """Calculate remaining time until deadline.""" + if endtime is None: + return None + return endtime - _time() + + +def _flush_stdin(stdin): + """Flush stdin, ignoring BrokenPipeError and closed file ValueError.""" + try: + stdin.flush() + except BrokenPipeError: + pass # communicate() must ignore BrokenPipeError. + except ValueError: + # Ignore ValueError: I/O operation on closed file. + if not stdin.closed: + raise + + +def _make_input_view(input_data): + """Convert input data to a byte memoryview for writing. + + Handles the case where input_data is already a memoryview with + non-byte elements (e.g., int32 array) by casting to a byte view. + This ensures len(view) returns the byte count, not element count. + """ + if not input_data: + return None + if isinstance(input_data, memoryview): + return input_data.cast("b") # ensure byte view for correct len() + return memoryview(input_data) + + +def _translate_newlines(data, encoding, errors): + """Decode bytes to str and translate newlines to \n.""" + data = data.decode(encoding, errors) + return data.replace("\r\n", "\n").replace("\r", "\n") + + # XXX This function is only used by multiprocessing and the test suite, # but it's here so that it can be imported when Python is compiled without # threads. @@ -1149,8 +1264,8 @@ def universal_newlines(self, universal_newlines): self.text_mode = bool(universal_newlines) def _translate_newlines(self, data, encoding, errors): - data = data.decode(encoding, errors) - return data.replace("\r\n", "\n").replace("\r", "\n") + # Subclass-overridable hook; defers to the module-level helper. + return _translate_newlines(data, encoding, errors) def __enter__(self): return self @@ -1277,7 +1392,7 @@ def communicate(self, input=None, timeout=None): # See the detailed comment in .wait(). if timeout is not None: sigint_timeout = min(self._sigint_wait_secs, - self._remaining_time(endtime)) + _deadline_remaining(endtime)) else: sigint_timeout = self._sigint_wait_secs self._sigint_wait_secs = 0 # nothing else should wait. @@ -1290,7 +1405,7 @@ def communicate(self, input=None, timeout=None): finally: self._communication_started = True try: - self.wait(timeout=self._remaining_time(endtime)) + self.wait(timeout=_deadline_remaining(endtime)) except TimeoutExpired as exc: exc.timeout = timeout raise @@ -1304,14 +1419,6 @@ def poll(self): return self._internal_poll() - def _remaining_time(self, endtime): - """Convenience for _communicate when computing timeouts.""" - if endtime is None: - return None - else: - return endtime - _time() - - def _check_timeout(self, endtime, orig_timeout, stdout_seq, stderr_seq, skip_check_and_raise=False): """Convenience for checking if a timeout has expired.""" @@ -1337,7 +1444,7 @@ def wait(self, timeout=None): # generated SIGINT and will exit rapidly. if timeout is not None: sigint_timeout = min(self._sigint_wait_secs, - self._remaining_time(endtime)) + _deadline_remaining(endtime)) else: sigint_timeout = self._sigint_wait_secs self._sigint_wait_secs = 0 # nothing else should wait. @@ -1704,7 +1811,7 @@ def _communicate(self, input, endtime, orig_timeout): # thread remains writing and the fd left open in case the user # calls communicate again. if hasattr(self, "_stdin_thread"): - self._stdin_thread.join(self._remaining_time(endtime)) + self._stdin_thread.join(_deadline_remaining(endtime)) if self._stdin_thread.is_alive(): raise TimeoutExpired(self.args, orig_timeout) @@ -1712,11 +1819,11 @@ def _communicate(self, input, endtime, orig_timeout): # threads remain reading and the fds left open in case the user # calls communicate again. if self.stdout is not None: - self.stdout_thread.join(self._remaining_time(endtime)) + self.stdout_thread.join(_deadline_remaining(endtime)) if self.stdout_thread.is_alive(): raise TimeoutExpired(self.args, orig_timeout) if self.stderr is not None: - self.stderr_thread.join(self._remaining_time(endtime)) + self.stderr_thread.join(_deadline_remaining(endtime)) if self.stderr_thread.is_alive(): raise TimeoutExpired(self.args, orig_timeout) @@ -2210,7 +2317,7 @@ def _wait(self, timeout): break finally: self._waitpid_lock.release() - remaining = self._remaining_time(endtime) + remaining = _deadline_remaining(endtime) if remaining <= 0: raise TimeoutExpired(self.args, timeout) delay = min(delay * 2, remaining, .05) @@ -2234,14 +2341,7 @@ def _communicate(self, input, endtime, orig_timeout): if self.stdin and not self._communication_started: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. - try: - self.stdin.flush() - except BrokenPipeError: - pass # communicate() must ignore BrokenPipeError. - except ValueError: - # ignore ValueError: I/O operation on closed file. - if not self.stdin.closed: - raise + _flush_stdin(self.stdin) if not input: try: self.stdin.close() @@ -2266,11 +2366,8 @@ def _communicate(self, input, endtime, orig_timeout): self._save_input(input) - if self._input: - if not isinstance(self._input, memoryview): - input_view = memoryview(self._input) - else: - input_view = self._input.cast("b") # byte input required + input_view = _make_input_view(self._input) + input_offset = self._input_offset if self._input else 0 with _PopenSelector() as selector: if self.stdin and not self.stdin.closed and self._input: @@ -2280,43 +2377,31 @@ def _communicate(self, input, endtime, orig_timeout): if self.stderr and not self.stderr.closed: selector.register(self.stderr, selectors.EVENT_READ) - while selector.get_map(): - timeout = self._remaining_time(endtime) - if timeout is not None and timeout <= 0: - self._check_timeout(endtime, orig_timeout, - stdout, stderr, - skip_check_and_raise=True) - raise RuntimeError( # Impossible :) - '_check_timeout(..., skip_check_and_raise=True) ' - 'failed to raise TimeoutExpired.') - - ready = selector.select(timeout) - self._check_timeout(endtime, orig_timeout, stdout, stderr) - - # XXX Rewrite these to use non-blocking I/O on the file - # objects; they are no longer using C stdio! - - for key, events in ready: - if key.fileobj is self.stdin: - chunk = input_view[self._input_offset : - self._input_offset + _PIPE_BUF] - try: - self._input_offset += os.write(key.fd, chunk) - except BrokenPipeError: - selector.unregister(key.fileobj) - key.fileobj.close() - else: - if self._input_offset >= len(input_view): - selector.unregister(key.fileobj) - key.fileobj.close() - elif key.fileobj in (self.stdout, self.stderr): - data = os.read(key.fd, 32768) - if not data: - selector.unregister(key.fileobj) - key.fileobj.close() - self._fileobj2output[key.fileobj].append(data) + stdin_to_write = (self.stdin if self.stdin and self._input + and not self.stdin.closed else None) + # Persist the returned offset on self so a subsequent + # communicate() after a TimeoutExpired resumes mid-input + # rather than re-sending bytes the child already consumed. + new_offset, completed = _communicate_io_posix( + selector, + stdin_to_write, + input_view, + input_offset, + self._fileobj2output, + endtime, + close_on_eof=True) + if self._input: + self._input_offset = new_offset + + if not completed: + self._check_timeout(endtime, orig_timeout, stdout, stderr, + skip_check_and_raise=True) + raise RuntimeError( # Impossible :) + '_check_timeout(..., skip_check_and_raise=True) ' + 'failed to raise TimeoutExpired.') + try: - self.wait(timeout=self._remaining_time(endtime)) + self.wait(timeout=_deadline_remaining(endtime)) except TimeoutExpired as exc: exc.timeout = orig_timeout raise diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 3237a9cb49876d..1a3db527d3d5b8 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1130,6 +1130,39 @@ def test_communicate_timeout_large_input(self): p.kill() p.wait() + def test_communicate_timeout_resume_partial_write(self): + """Resume writing input after a partial-write TimeoutExpired. + + Exercises the _input_offset bookkeeping across the + _communicate_io_posix factoring: a first communicate() must time out + mid-write, and a subsequent communicate() must finish delivering the + remaining bytes so the child receives the full input intact. + """ + # 1 MiB easily exceeds typical pipe buffers (~64 KiB) so writing + # blocks once the buffer fills before the child starts reading. + input_data = bytes(range(256)) * 4096 # 1 MiB, distinctive pattern + self.assertEqual(len(input_data), 1024 * 1024) + + p = subprocess.Popen( + [sys.executable, "-c", + "import sys, time; " + "time.sleep(0.5); " + "sys.stdout.buffer.write(sys.stdin.buffer.read())"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + try: + with self.assertRaises(subprocess.TimeoutExpired): + p.communicate(input_data, timeout=0.05) + + # Resume: no new input, generous timeout to avoid CI flakes. + stdout, stderr = p.communicate(timeout=support.LONG_TIMEOUT) + self.assertEqual(len(stdout), len(input_data)) + self.assertEqual(stdout, input_data) + finally: + p.kill() + p.wait() + # Test for the fd leak reported in http://bugs.python.org/issue2791. def test_communicate_pipe_fd_leak(self): for stdin_pipe in (False, True):