Skip to content

Commit 9ee6872

Browse files
committed
Refactor BatchLogRecordProcessor and associated tests (open-telemetry#4535)
1 parent 211c49e commit 9ee6872

File tree

3 files changed

+56
-23
lines changed

3 files changed

+56
-23
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -210,6 +210,7 @@ def __init__(
210210
self._schedule_delay = schedule_delay_millis / 1e3
211211
self._max_export_batch_size = max_export_batch_size
212212
# Not used. No way currently to pass timeout to export.
213+
# TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
213214
self._export_timeout_millis = export_timeout_millis
214215
# Deque is thread safe.
215216
self._queue = collections.deque([], max_queue_size)
@@ -218,9 +219,10 @@ def __init__(
218219
target=self.worker,
219220
daemon=True,
220221
)
222+
221223
self._shutdown = False
222224
self._export_lock = threading.Lock()
223-
self._worker_sleep = threading.Event()
225+
self._worker_awaken = threading.Event()
224226
self._worker_thread.start()
225227
if hasattr(os, "register_at_fork"):
226228
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
@@ -235,15 +237,15 @@ def _should_export_batch(
235237
# Always continue to export while queue length exceeds max batch size.
236238
if len(self._queue) >= self._max_export_batch_size:
237239
return True
238-
if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
240+
if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
239241
return True
240-
if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
242+
if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
241243
return num_iterations == 0
242244
return False
243245

244246
def _at_fork_reinit(self):
245247
self._export_lock = threading.Lock()
246-
self._worker_sleep = threading.Event()
248+
self._worker_awaken = threading.Event()
247249
self._queue.clear()
248250
self._worker_thread = threading.Thread(
249251
name="OtelBatchLogRecordProcessor",
@@ -258,15 +260,15 @@ def worker(self):
258260
# Lots of strategies in the spec for setting next timeout.
259261
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
260262
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than threshold.
261-
sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
263+
sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
262264
if self._shutdown:
263265
break
264266
self._export(
265267
BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
266268
if sleep_interrupted
267269
else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
268270
)
269-
self._worker_sleep.clear()
271+
self._worker_awaken.clear()
270272
self._export(BatchLogExportStrategy.EXPORT_ALL)
271273

272274
def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
@@ -296,7 +298,7 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
296298

297299
def emit(self, log_data: LogData) -> None:
298300
if self._shutdown:
299-
_logger.warning("Shutdown called, ignoring log.")
301+
_logger.info("Shutdown called, ignoring log.")
300302
return
301303
if self._pid != os.getpid():
302304
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)
@@ -305,15 +307,15 @@ def emit(self, log_data: LogData) -> None:
305307
_logger.warning("Queue full, dropping log.")
306308
self._queue.appendleft(log_data)
307309
if len(self._queue) >= self._max_export_batch_size:
308-
self._worker_sleep.set()
310+
self._worker_awaken.set()
309311

310312
def shutdown(self):
311313
if self._shutdown:
312314
return
313315
# Prevents emit and force_flush from further calling export.
314316
self._shutdown = True
315317
# Interrupts sleep in the worker, if it's sleeping.
316-
self._worker_sleep.set()
318+
self._worker_awaken.set()
317319
# Main worker loop should exit after one final export call with flush all strategy.
318320
self._worker_thread.join()
319321
self._exporter.shutdown()

opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,7 @@
8787
.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT
8888
8989
The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor.
90+
This environment variable currently does nothing, see https://github.com/open-telemetry/opentelemetry-python/issues/4555.
9091
Default: 30000
9192
"""
9293

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 44 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -486,24 +486,20 @@ def test_logs_exported_once_batch_size_reached(self):
486486
exporter.export.assert_called_once()
487487
after_export = time.time_ns()
488488
# Shows the worker's 30 second sleep was interrupted within a second.
489-
self.assertTrue((after_export - before_export) < 1e9)
489+
self.assertLess(after_export - before_export, 1e9)
490490

491491
# pylint: disable=no-self-use
492492
def test_logs_exported_once_schedule_delay_reached(self):
493493
exporter = Mock()
494494
log_record_processor = BatchLogRecordProcessor(
495495
exporter=exporter,
496-
# Should not reach this during the test, instead export should be called when delay millis is hit.
497496
max_queue_size=15,
498497
max_export_batch_size=15,
499498
schedule_delay_millis=100,
500499
)
501-
for _ in range(15):
502-
log_record_processor.emit(EMPTY_LOG)
503-
time.sleep(0.11)
504-
exporter.export.assert_has_calls(
505-
[call([EMPTY_LOG]) for _ in range(15)]
506-
)
500+
log_record_processor.emit(EMPTY_LOG)
501+
time.sleep(0.2)
502+
exporter.export.assert_called_once_with([EMPTY_LOG])
507503

508504
def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
509505
exporter = Mock()
@@ -517,15 +513,16 @@ def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
517513
# This log should be flushed because it was written before shutdown.
518514
log_record_processor.emit(EMPTY_LOG)
519515
log_record_processor.shutdown()
516+
exporter.export.assert_called_once_with([EMPTY_LOG])
520517
self.assertTrue(exporter._stopped)
521518

522-
with self.assertLogs(level="WARNING") as log:
519+
with self.assertLogs(level="INFO") as log:
523520
# This log should not be flushed.
524521
log_record_processor.emit(EMPTY_LOG)
525522
self.assertEqual(len(log.output), 1)
526523
self.assertEqual(len(log.records), 1)
527524
self.assertIn("Shutdown called, ignoring log.", log.output[0])
528-
exporter.export.assert_called_once_with([EMPTY_LOG])
525+
exporter.export.assert_called_once()
529526

530527
# pylint: disable=no-self-use
531528
def test_force_flush_flushes_logs(self):
@@ -554,6 +551,7 @@ def bulk_log_and_flush(num_logs):
554551
with ThreadPoolExecutor(max_workers=69) as executor:
555552
for idx in range(69):
556553
executor.submit(bulk_log_and_flush, idx + 1)
554+
557555
executor.shutdown()
558556

559557
finished_logs = exporter.get_finished_logs()
@@ -563,21 +561,53 @@ def bulk_log_and_flush(num_logs):
563561
hasattr(os, "fork"),
564562
"needs *nix",
565563
)
566-
def test_batch_log_record_processor_fork(self):
564+
def test_batch_log_record_processor_fork_clears_logs_from_child(self):
567565
exporter = InMemoryLogExporter()
568566
log_record_processor = BatchLogRecordProcessor(
569567
exporter,
570568
max_export_batch_size=64,
571569
schedule_delay_millis=30000,
572570
)
573-
# These are not expected to be flushed. Calling fork clears any logs not flushed.
571+
# These logs should be flushed only from the parent process.
572+
# _at_fork_reinit should be called in the child process, to
573+
# clear these logs in the child process.
574574
for _ in range(10):
575575
log_record_processor.emit(EMPTY_LOG)
576+
577+
# The below test also needs this, but it can only be set once.
576578
multiprocessing.set_start_method("fork")
577579

578580
def child(conn):
579-
for _ in range(100):
581+
log_record_processor.force_flush()
582+
logs = exporter.get_finished_logs()
583+
conn.send(len(logs) == 0)
584+
conn.close()
585+
586+
parent_conn, child_conn = multiprocessing.Pipe()
587+
process = multiprocessing.Process(target=child, args=(child_conn,))
588+
process.start()
589+
self.assertTrue(parent_conn.recv())
590+
process.join()
591+
log_record_processor.force_flush()
592+
self.assertTrue(len(exporter.get_finished_logs()) == 10)
593+
594+
@unittest.skipUnless(
595+
hasattr(os, "fork"),
596+
"needs *nix",
597+
)
598+
def test_batch_log_record_processor_fork_doesnot_deadlock(self):
599+
exporter = InMemoryLogExporter()
600+
log_record_processor = BatchLogRecordProcessor(
601+
exporter,
602+
max_export_batch_size=64,
603+
schedule_delay_millis=30000,
604+
)
605+
606+
def child(conn):
607+
def _target():
580608
log_record_processor.emit(EMPTY_LOG)
609+
610+
ConcurrencyTestBase.run_with_many_threads(_target, 100)
581611
log_record_processor.force_flush()
582612
logs = exporter.get_finished_logs()
583613
conn.send(len(logs) == 100)
@@ -588,7 +618,6 @@ def child(conn):
588618
process.start()
589619
self.assertTrue(parent_conn.recv())
590620
process.join()
591-
self.assertTrue(len(exporter.get_finished_logs()) == 0)
592621

593622
def test_batch_log_record_processor_gc(self):
594623
# Given a BatchLogRecordProcessor
@@ -650,4 +679,5 @@ def formatter(record): # pylint: disable=unused-argument
650679
mock_stdout = Mock()
651680
exporter = ConsoleLogExporter(out=mock_stdout, formatter=formatter)
652681
exporter.export([EMPTY_LOG])
682+
653683
mock_stdout.write.assert_called_once_with(mock_record_str)

0 commit comments

Comments
 (0)