Skip to content

Commit bd054c2

Browse files
vstinnersorcio
authored andcommitted
pythongh-108973: Fix asyncio test_subprocess_consistent_callbacks() (python#109431)
SubprocessProtocol process_exited() method can be called before pipe_data_received() and pipe_connection_lost() methods. Document it and adapt the test for that. Revert commit 282edd7. _child_watcher_callback() calls immediately _process_exited(): don't add an additional delay with call_soon(). The reverted change didn't make _process_exited() more determistic: it can still be called before pipe_connection_lost() for example. Co-authored-by: Davide Rizzo <[email protected]>
1 parent d28c7be commit bd054c2

File tree

4 files changed

+67
-19
lines changed

4 files changed

+67
-19
lines changed

Doc/library/asyncio-llapi-index.rst

+5-5
Original file line numberDiff line numberDiff line change
@@ -484,19 +484,19 @@ Protocol classes can implement the following **callback methods**:
484484
:widths: 50 50
485485
:class: full-width-table
486486

487-
* - ``callback`` :meth:`pipe_data_received()
488-
<SubprocessProtocol.pipe_data_received>`
487+
* - ``callback`` :meth:`~SubprocessProtocol.pipe_data_received`
489488
- Called when the child process writes data into its
490489
*stdout* or *stderr* pipe.
491490

492-
* - ``callback`` :meth:`pipe_connection_lost()
493-
<SubprocessProtocol.pipe_connection_lost>`
491+
* - ``callback`` :meth:`~SubprocessProtocol.pipe_connection_lost`
494492
- Called when one of the pipes communicating with
495493
the child process is closed.
496494

497495
* - ``callback`` :meth:`process_exited()
498496
<SubprocessProtocol.process_exited>`
499-
- Called when the child process has exited.
497+
- Called when the child process has exited. It can be called before
498+
:meth:`~SubprocessProtocol.pipe_data_received` and
499+
:meth:`~SubprocessProtocol.pipe_connection_lost` methods.
500500

501501

502502
Event Loop Policies

Doc/library/asyncio-protocol.rst

+18-1
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,9 @@ factories passed to the :meth:`loop.subprocess_exec` and
708708

709709
Called when the child process has exited.
710710

711+
It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and
712+
:meth:`~SubprocessProtocol.pipe_connection_lost` methods.
713+
711714

712715
Examples
713716
========
@@ -1003,12 +1006,26 @@ The subprocess is created by the :meth:`loop.subprocess_exec` method::
10031006
def __init__(self, exit_future):
10041007
self.exit_future = exit_future
10051008
self.output = bytearray()
1009+
self.pipe_closed = False
1010+
self.exited = False
1011+
1012+
def pipe_connection_lost(self, fd, exc):
1013+
self.pipe_closed = True
1014+
self.check_for_exit()
10061015

10071016
def pipe_data_received(self, fd, data):
10081017
self.output.extend(data)
10091018

10101019
def process_exited(self):
1011-
self.exit_future.set_result(True)
1020+
self.exited = True
1021+
# process_exited() method can be called before
1022+
# pipe_connection_lost() method: wait until both methods are
1023+
# called.
1024+
self.check_for_exit()
1025+
1026+
def check_for_exit(self):
1027+
if self.pipe_closed and self.exited:
1028+
self.exit_future.set_result(True)
10121029

10131030
async def get_date():
10141031
# Get a reference to the event loop as we plan to use

Lib/asyncio/unix_events.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ async def _make_subprocess_transport(self, protocol, args, shell,
226226
return transp
227227

228228
def _child_watcher_callback(self, pid, returncode, transp):
229-
# Skip one iteration for callbacks to be executed
230-
self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
229+
self.call_soon_threadsafe(transp._process_exited, returncode)
231230

232231
async def create_unix_connection(
233232
self, protocol_factory, path=None, *,

Lib/test/test_asyncio/test_subprocess.py

+43-11
Original file line numberDiff line numberDiff line change
@@ -753,21 +753,44 @@ async def main() -> None:
753753

754754
self.loop.run_until_complete(main())
755755

756-
def test_subprocess_consistent_callbacks(self):
756+
def test_subprocess_protocol_events(self):
757+
# gh-108973: Test that all subprocess protocol methods are called.
758+
# The protocol methods are not called in a determistic order.
759+
# The order depends on the event loop and the operating system.
757760
events = []
761+
fds = [1, 2]
762+
expected = [
763+
('pipe_data_received', 1, b'stdout'),
764+
('pipe_data_received', 2, b'stderr'),
765+
('pipe_connection_lost', 1),
766+
('pipe_connection_lost', 2),
767+
'process_exited',
768+
]
769+
per_fd_expected = [
770+
'pipe_data_received',
771+
'pipe_connection_lost',
772+
]
773+
758774
class MyProtocol(asyncio.SubprocessProtocol):
759775
def __init__(self, exit_future: asyncio.Future) -> None:
760776
self.exit_future = exit_future
761777

762778
def pipe_data_received(self, fd, data) -> None:
763779
events.append(('pipe_data_received', fd, data))
780+
self.exit_maybe()
764781

765782
def pipe_connection_lost(self, fd, exc) -> None:
766-
events.append('pipe_connection_lost')
783+
events.append(('pipe_connection_lost', fd))
784+
self.exit_maybe()
767785

768786
def process_exited(self) -> None:
769787
events.append('process_exited')
770-
self.exit_future.set_result(True)
788+
self.exit_maybe()
789+
790+
def exit_maybe(self):
791+
# Only exit when we got all expected events
792+
if len(events) >= len(expected):
793+
self.exit_future.set_result(True)
771794

772795
async def main() -> None:
773796
loop = asyncio.get_running_loop()
@@ -777,15 +800,24 @@ async def main() -> None:
777800
sys.executable, '-c', code, stdin=None)
778801
await exit_future
779802
transport.close()
780-
self.assertEqual(events, [
781-
('pipe_data_received', 1, b'stdout'),
782-
('pipe_data_received', 2, b'stderr'),
783-
'pipe_connection_lost',
784-
'pipe_connection_lost',
785-
'process_exited',
786-
])
787803

788-
self.loop.run_until_complete(main())
804+
return events
805+
806+
events = self.loop.run_until_complete(main())
807+
808+
# First, make sure that we received all events
809+
self.assertSetEqual(set(events), set(expected))
810+
811+
# Second, check order of pipe events per file descriptor
812+
per_fd_events = {fd: [] for fd in fds}
813+
for event in events:
814+
if event == 'process_exited':
815+
continue
816+
name, fd = event[:2]
817+
per_fd_events[fd].append(name)
818+
819+
for fd in fds:
820+
self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events))
789821

790822
def test_subprocess_communicate_stdout(self):
791823
# See https://github.com/python/cpython/issues/100133

0 commit comments

Comments
 (0)