@@ -376,7 +376,8 @@ def __init__(
376
376
self ._flush_pending = False
377
377
self ._subprocess_flush_pending = False
378
378
self ._io_loop = pub_thread .io_loop
379
- self ._new_buffer ()
379
+ self ._buffer_lock = threading .RLock ()
380
+ self ._buffer = StringIO ()
380
381
self .echo = None
381
382
self ._isatty = bool (isatty )
382
383
@@ -533,7 +534,8 @@ def write(self, string: str) -> int:
533
534
534
535
is_child = (not self ._is_master_process ())
535
536
# only touch the buffer in the IO thread to avoid races
536
- self .pub_thread .schedule (lambda : self ._buffer .write (string ))
537
+ with self ._buffer_lock :
538
+ self ._buffer .write (string )
537
539
if is_child :
538
540
# mp.Pool cannot be trusted to flush promptly (or ever),
539
541
# and this helps.
@@ -558,17 +560,15 @@ def writable(self):
558
560
return True
559
561
560
562
def _flush_buffer (self ):
561
- """clear the current buffer and return the current buffer data.
562
-
563
- This should only be called in the IO thread.
564
- """
565
- data = ''
566
- if self ._buffer is not None :
567
- buf = self ._buffer
568
- self ._new_buffer ()
569
- data = buf .getvalue ()
570
- buf .close ()
563
+ """clear the current buffer and return the current buffer data."""
564
+ buf = self ._rotate_buffer ()
565
+ data = buf .getvalue ()
566
+ buf .close ()
571
567
return data
572
568
573
- def _new_buffer (self ):
574
- self ._buffer = StringIO ()
569
+ def _rotate_buffer (self ):
570
+ """Returns the current buffer and replaces it with an empty buffer."""
571
+ with self ._buffer_lock :
572
+ old_buffer = self ._buffer
573
+ self ._buffer = StringIO ()
574
+ return old_buffer
0 commit comments