
gh-129205: Experiment BytesIO._readfrom() #130098

Closed · wants to merge 8 commits
1 change: 1 addition & 0 deletions Include/internal/pycore_global_objects_fini_generated.h

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Include/internal/pycore_global_strings.h
@@ -405,6 +405,7 @@ struct _Py_global_strings {
         STRUCT_FOR_ID(entrypoint)
         STRUCT_FOR_ID(env)
         STRUCT_FOR_ID(errors)
+        STRUCT_FOR_ID(estimate)
         STRUCT_FOR_ID(event)
         STRUCT_FOR_ID(eventmask)
         STRUCT_FOR_ID(exc_type)
1 change: 1 addition & 0 deletions Include/internal/pycore_runtime_init_generated.h

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions Include/internal/pycore_unicodeobject_generated.h

(Generated file; diff not rendered.)

12 changes: 4 additions & 8 deletions Lib/_compression.py
@@ -111,14 +111,10 @@ def read(self, size=-1):
         return data

     def readall(self):
-        chunks = []
-        # sys.maxsize means the max length of output buffer is unlimited,
-        # so that the whole input buffer can be decompressed within one
-        # .decompress() call.
-        while data := self.read(sys.maxsize):
-            chunks.append(data)
-
-        return b"".join(chunks)
+        # FIXME(cmaloney): non-blocking support?
+        bio = io.BytesIO()
+        bio.readfrom(self)
+        return bio.getvalue()

     # Rewind the file to the beginning of the data stream.
     def _rewind(self):
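For context, the replaced loop and the new call are meant to be behaviorally equivalent. Roughly, readfrom(self) amounts to the pure-Python sketch below (the chunk size is illustrative; the real method grows a single buffer in place rather than joining chunks):

    import io

    def readall_equivalent(stream, chunk_size=64 * 1024):
        # Pull chunks until a read returns b"" (EOF) and collect them into
        # one contiguous result, as BytesIO.readfrom(stream) would.
        bio = io.BytesIO()
        while chunk := stream.read(chunk_size):
            bio.write(chunk)
        return bio.getvalue()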
112 changes: 86 additions & 26 deletions Lib/_pyio.py
@@ -926,6 +926,82 @@ def read1(self, size=-1):
"""
return self.read(size)

def readfrom(self, file, /, *, estimate=None, limit=None):
"""Efficiently read from the provided file and return True if hit end.

Returns True if and only if a read into a non-zero length buffer
returns 0 bytes. On most systems this indicates end of file / stream.
"""
if self.closed:
raise ValueError("read from closed file")

# In order to detect end of file, need a read() of at least 1
# byte which returns size 0. Oversize the buffer by 1 byte so the
# I/O can be completed with two read() calls (one for all data, one
# for EOF) without needing to resize the buffer.
target_read = None
if estimate is not None:
target_read = int(estimate) + 1
else:
target_read = DEFAULT_BUFFER_SIZE
[Inline review comment from the PR author] nit: max(DEFAULT_BUFFER_SIZE, len(self._buffer) - self._pos); if there's already lots of space, use it.

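As code, the reviewer's suggestion would replace the else branch with something like this (a sketch of the nit, not part of the PR):

    target_read = max(DEFAULT_BUFFER_SIZE, len(self._buffer) - self._pos)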
+        # Cap to limit.
+        if limit is not None:
+            limit = int(limit)
+            if limit == 0:  # Nothing to read.
+                return False
+            if limit < 0:
+                raise ValueError(f"limit must be non-negative, got {limit}")
+            target_read = min(target_read, limit)
+
+        # Expand the buffer so the target read can complete in one call.
+        if len(self._buffer) < target_read + self._pos:
+            self._buffer.resize(self._pos + target_read)
+
+        if isinstance(file, int):  # File descriptor
+            read_fn = lambda: os.readinto(file, memoryview(self._buffer)[self._pos:])
+        elif file_readinto := getattr(file, "readinto", None):
+            read_fn = lambda: file_readinto(memoryview(self._buffer)[self._pos:])
+        elif file_read := getattr(file, "read", None):
+            def read_fn():
+                data = file_read(len(self._buffer) - self._pos)
+                self._buffer[self._pos:self._pos + len(data)] = data
+                return len(data)
+        else:
+            raise TypeError("file must be a file descriptor or have "
+                            "a readinto() or read() method")
+
+        found_eof = False
+        start_pos = self._pos
+        try:
+            while n := read_fn():
+                self._pos += n
+                # Expand buffer if needed.
+                if len(self._buffer) - self._pos <= 0:
+                    bytes_read = self._pos - start_pos
+                    target_read = _new_buffersize(bytes_read)
+
+                    # Keep buffer size <= limit, so the limit only needs to
+                    # be checked when resizing.
+                    if limit is not None:
+                        remaining = limit - bytes_read
+                        if remaining <= 0:
+                            assert remaining == 0, "should never pass limit"
+                            break
+                        target_read = min(remaining, target_read)
+
+                    self._buffer.resize(target_read + len(self._buffer))
+            else:
+                assert len(self._buffer) - self._pos >= 1, \
+                    "a read into a size-0 buffer would report a spurious EOF"
+                found_eof = True
+
+        except BlockingIOError:
+            pass
+
+        # Remove all excess bytes.
+        self._buffer.resize(self._pos)
+        return found_eof
+
     def write(self, b):
         if self.closed:
             raise ValueError("write to closed file")
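A minimal usage sketch of the proposed method (readfrom exists only on this PR's branch, so the exact name and defaults may change):

    import io
    import os

    r, w = os.pipe()
    os.write(w, b"hello")
    os.close(w)

    bio = io.BytesIO()
    # estimate=5 sizes the first read at 6 bytes (the data plus 1 byte to
    # observe EOF), so the transfer completes in two os.readinto() calls.
    hit_eof = bio.readfrom(r, estimate=5)
    os.close(r)
    assert hit_eof and bio.getvalue() == b"hello"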
@@ -1666,38 +1742,22 @@ def readall(self):
         """
         self._checkClosed()
         self._checkReadable()
-        if self._stat_atopen is None or self._stat_atopen.st_size <= 0:
-            bufsize = DEFAULT_BUFFER_SIZE
-        else:
-            # In order to detect end of file, need a read() of at least 1
-            # byte which returns size 0. Oversize the buffer by 1 byte so the
-            # I/O can be completed with two read() calls (one for all data, one
-            # for EOF) without needing to resize the buffer.
-            bufsize = self._stat_atopen.st_size + 1
-
-        if self._stat_atopen.st_size > 65536:
+        estimate = None
+        if self._stat_atopen and self._stat_atopen.st_size >= 0:
+            estimate = self._stat_atopen.st_size
+        if estimate is not None and estimate > 65536:
             try:
                 pos = os.lseek(self._fd, 0, SEEK_CUR)
-                if self._stat_atopen.st_size >= pos:
-                    bufsize = self._stat_atopen.st_size - pos + 1
+                estimate = estimate - pos if estimate > pos else 0
             except OSError:
                 pass

-        result = bytearray(bufsize)
-        bytes_read = 0
-        try:
-            while n := os.readinto(self._fd, memoryview(result)[bytes_read:]):
-                bytes_read += n
-                if bytes_read >= len(result):
-                    result.resize(_new_buffersize(bytes_read))
-        except BlockingIOError:
-            if not bytes_read:
-                return None
+        bio = BytesIO()
+        found_eof = bio.readfrom(self._fd, estimate=estimate)
+        result = bio.getvalue()
+        # No limit passed to readfrom, so not finding EOF means the read blocked.
+        return result if result or found_eof else None

-        assert len(result) - bytes_read >= 1, \
-            "os.readinto buffer size 0 will result in erroneous EOF / returns 0"
-        result.resize(bytes_read)
-        return bytes(result)

     def readinto(self, buffer):
         """Same as RawIOBase.readinto()."""
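The buffer-sizing comments above hinge on one detail: EOF is only observable as a 0-byte result from a read into a non-empty buffer. The self-contained demonstration below uses plain os calls, not the PR's code; the path argument is caller-supplied, and os.readinto requires Python 3.14+ (substitute os.read on older versions):

    import os

    def count_reads(path):
        # With a buffer of st_size + 1 bytes, a regular file is consumed in
        # exactly two reads: one that returns all st_size bytes and one that
        # returns 0 bytes to signal EOF. A buffer of exactly st_size bytes
        # would need a resize (or a third call) before EOF could be observed.
        size = os.stat(path).st_size
        buf = bytearray(size + 1)
        view = memoryview(buf)
        pos = reads = 0
        fd = os.open(path, os.O_RDONLY)
        try:
            while True:
                n = os.readinto(fd, view[pos:])
                reads += 1
                if n == 0:
                    return reads  # typically 2 for a regular file
                pos += n
        finally:
            os.close(fd)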
9 changes: 3 additions & 6 deletions Lib/subprocess.py
@@ -1921,12 +1921,9 @@ def _execute_child(self, args, executable, preexec_fn, close_fds,

                 # Wait for exec to fail or succeed; possibly raising an
                 # exception (limited in size)
-                errpipe_data = bytearray()
-                while True:
-                    part = os.read(errpipe_read, 50000)
-                    errpipe_data += part
-                    if not part or len(errpipe_data) > 50000:
-                        break
+                bio = io.BytesIO()
+                bio.readfrom(errpipe_read, estimate=0, limit=50_000)
+                errpipe_data = bio.getvalue()
             finally:
                 # be sure the FD is closed no matter what
                 os.close(errpipe_read)
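One behavioral note: the old loop could overshoot the cap by up to one 50,000-byte chunk (it only checked the total after appending), while readfrom's limit never lets the buffer exceed the cap. A pure-os sketch of the capped-read semantics the new call provides here (an illustration, not the PR's implementation):

    import os

    def read_capped(fd, limit=50_000):
        # Read until EOF (an empty read) or until `limit` bytes total,
        # never overshooting the cap.
        chunks, total = [], 0
        while total < limit and (part := os.read(fd, limit - total)):
            chunks.append(part)
            total += len(part)
        return b"".join(chunks)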