Skip to content

Commit ad4ed87

Browse files
authored
Forbid creating of stream objects outside of asyncio (#13101)
1 parent 2cc0223 commit ad4ed87

File tree

5 files changed

+203
-76
lines changed

5 files changed

+203
-76
lines changed

Lib/asyncio/streams.py

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import socket
66
import sys
7+
import warnings
78
import weakref
89

910
if hasattr(socket, 'AF_UNIX'):
@@ -42,11 +43,14 @@ async def open_connection(host=None, port=None, *,
4243
"""
4344
if loop is None:
4445
loop = events.get_event_loop()
45-
reader = StreamReader(limit=limit, loop=loop)
46-
protocol = StreamReaderProtocol(reader, loop=loop)
46+
reader = StreamReader(limit=limit, loop=loop,
47+
_asyncio_internal=True)
48+
protocol = StreamReaderProtocol(reader, loop=loop,
49+
_asyncio_internal=True)
4750
transport, _ = await loop.create_connection(
4851
lambda: protocol, host, port, **kwds)
49-
writer = StreamWriter(transport, protocol, reader, loop)
52+
writer = StreamWriter(transport, protocol, reader, loop,
53+
_asyncio_internal=True)
5054
return reader, writer
5155

5256

@@ -77,9 +81,11 @@ async def start_server(client_connected_cb, host=None, port=None, *,
7781
loop = events.get_event_loop()
7882

7983
def factory():
80-
reader = StreamReader(limit=limit, loop=loop)
84+
reader = StreamReader(limit=limit, loop=loop,
85+
_asyncio_internal=True)
8186
protocol = StreamReaderProtocol(reader, client_connected_cb,
82-
loop=loop)
87+
loop=loop,
88+
_asyncio_internal=True)
8389
return protocol
8490

8591
return await loop.create_server(factory, host, port, **kwds)
@@ -93,11 +99,14 @@ async def open_unix_connection(path=None, *,
9399
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
94100
if loop is None:
95101
loop = events.get_event_loop()
96-
reader = StreamReader(limit=limit, loop=loop)
97-
protocol = StreamReaderProtocol(reader, loop=loop)
102+
reader = StreamReader(limit=limit, loop=loop,
103+
_asyncio_internal=True)
104+
protocol = StreamReaderProtocol(reader, loop=loop,
105+
_asyncio_internal=True)
98106
transport, _ = await loop.create_unix_connection(
99107
lambda: protocol, path, **kwds)
100-
writer = StreamWriter(transport, protocol, reader, loop)
108+
writer = StreamWriter(transport, protocol, reader, loop,
109+
_asyncio_internal=True)
101110
return reader, writer
102111

103112
async def start_unix_server(client_connected_cb, path=None, *,
@@ -107,9 +116,11 @@ async def start_unix_server(client_connected_cb, path=None, *,
107116
loop = events.get_event_loop()
108117

109118
def factory():
110-
reader = StreamReader(limit=limit, loop=loop)
119+
reader = StreamReader(limit=limit, loop=loop,
120+
_asyncio_internal=True)
111121
protocol = StreamReaderProtocol(reader, client_connected_cb,
112-
loop=loop)
122+
loop=loop,
123+
_asyncio_internal=True)
113124
return protocol
114125

115126
return await loop.create_unix_server(factory, path, **kwds)
@@ -125,11 +136,20 @@ class FlowControlMixin(protocols.Protocol):
125136
StreamWriter.drain() must wait for _drain_helper() coroutine.
126137
"""
127138

128-
def __init__(self, loop=None):
139+
def __init__(self, loop=None, *, _asyncio_internal=False):
129140
if loop is None:
130141
self._loop = events.get_event_loop()
131142
else:
132143
self._loop = loop
144+
if not _asyncio_internal:
145+
# NOTE:
146+
# Avoid inheritance from FlowControlMixin
147+
# Copy-paste the code to your project
148+
# if you need flow control helpers
149+
warnings.warn(f"{self.__class__} should be instaniated "
150+
"by asyncio internals only, "
151+
"please avoid its creation from user code",
152+
DeprecationWarning)
133153
self._paused = False
134154
self._drain_waiter = None
135155
self._connection_lost = False
@@ -191,8 +211,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
191211

192212
_source_traceback = None
193213

194-
def __init__(self, stream_reader, client_connected_cb=None, loop=None):
195-
super().__init__(loop=loop)
214+
def __init__(self, stream_reader, client_connected_cb=None, loop=None,
215+
*, _asyncio_internal=False):
216+
super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
196217
if stream_reader is not None:
197218
self._stream_reader_wr = weakref.ref(stream_reader,
198219
self._on_reader_gc)
@@ -253,7 +274,8 @@ def connection_made(self, transport):
253274
if self._client_connected_cb is not None:
254275
self._stream_writer = StreamWriter(transport, self,
255276
reader,
256-
self._loop)
277+
self._loop,
278+
_asyncio_internal=True)
257279
res = self._client_connected_cb(reader,
258280
self._stream_writer)
259281
if coroutines.iscoroutine(res):
@@ -311,7 +333,13 @@ class StreamWriter:
311333
directly.
312334
"""
313335

314-
def __init__(self, transport, protocol, reader, loop):
336+
def __init__(self, transport, protocol, reader, loop,
337+
*, _asyncio_internal=False):
338+
if not _asyncio_internal:
339+
warnings.warn(f"{self.__class__} should be instaniated "
340+
"by asyncio internals only, "
341+
"please avoid its creation from user code",
342+
DeprecationWarning)
315343
self._transport = transport
316344
self._protocol = protocol
317345
# drain() expects that the reader has an exception() method
@@ -388,7 +416,14 @@ class StreamReader:
388416

389417
_source_traceback = None
390418

391-
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
419+
def __init__(self, limit=_DEFAULT_LIMIT, loop=None,
420+
*, _asyncio_internal=False):
421+
if not _asyncio_internal:
422+
warnings.warn(f"{self.__class__} should be instaniated "
423+
"by asyncio internals only, "
424+
"please avoid its creation from user code",
425+
DeprecationWarning)
426+
392427
# The line length limit is a security feature;
393428
# it also doubles as half the buffer limit.
394429

Lib/asyncio/subprocess.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
22

33
import subprocess
4+
import warnings
45

56
from . import events
67
from . import protocols
@@ -18,8 +19,8 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
1819
protocols.SubprocessProtocol):
1920
"""Like StreamReaderProtocol, but for a subprocess."""
2021

21-
def __init__(self, limit, loop):
22-
super().__init__(loop=loop)
22+
def __init__(self, limit, loop, *, _asyncio_internal=False):
23+
super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
2324
self._limit = limit
2425
self.stdin = self.stdout = self.stderr = None
2526
self._transport = None
@@ -42,14 +43,16 @@ def connection_made(self, transport):
4243
stdout_transport = transport.get_pipe_transport(1)
4344
if stdout_transport is not None:
4445
self.stdout = streams.StreamReader(limit=self._limit,
45-
loop=self._loop)
46+
loop=self._loop,
47+
_asyncio_internal=True)
4648
self.stdout.set_transport(stdout_transport)
4749
self._pipe_fds.append(1)
4850

4951
stderr_transport = transport.get_pipe_transport(2)
5052
if stderr_transport is not None:
5153
self.stderr = streams.StreamReader(limit=self._limit,
52-
loop=self._loop)
54+
loop=self._loop,
55+
_asyncio_internal=True)
5356
self.stderr.set_transport(stderr_transport)
5457
self._pipe_fds.append(2)
5558

@@ -58,7 +61,8 @@ def connection_made(self, transport):
5861
self.stdin = streams.StreamWriter(stdin_transport,
5962
protocol=self,
6063
reader=None,
61-
loop=self._loop)
64+
loop=self._loop,
65+
_asyncio_internal=True)
6266

6367
def pipe_data_received(self, fd, data):
6468
if fd == 1:
@@ -104,7 +108,13 @@ def _maybe_close_transport(self):
104108

105109

106110
class Process:
107-
def __init__(self, transport, protocol, loop):
111+
def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
112+
if not _asyncio_internal:
113+
warnings.warn(f"{self.__class__} should be instaniated "
114+
"by asyncio internals only, "
115+
"please avoid its creation from user code",
116+
DeprecationWarning)
117+
108118
self._transport = transport
109119
self._protocol = protocol
110120
self._loop = loop
@@ -195,12 +205,13 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
195205
if loop is None:
196206
loop = events.get_event_loop()
197207
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
198-
loop=loop)
208+
loop=loop,
209+
_asyncio_internal=True)
199210
transport, protocol = await loop.subprocess_shell(
200211
protocol_factory,
201212
cmd, stdin=stdin, stdout=stdout,
202213
stderr=stderr, **kwds)
203-
return Process(transport, protocol, loop)
214+
return Process(transport, protocol, loop, _asyncio_internal=True)
204215

205216

206217
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
@@ -209,10 +220,11 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
209220
if loop is None:
210221
loop = events.get_event_loop()
211222
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
212-
loop=loop)
223+
loop=loop,
224+
_asyncio_internal=True)
213225
transport, protocol = await loop.subprocess_exec(
214226
protocol_factory,
215227
program, *args,
216228
stdin=stdin, stdout=stdout,
217229
stderr=stderr, **kwds)
218-
return Process(transport, protocol, loop)
230+
return Process(transport, protocol, loop, _asyncio_internal=True)

0 commit comments

Comments
 (0)