Skip to content

Commit ed344a9

Browse files
committed
Refactor BatchLogRecordProcessor and associated tests (open-telemetry#4535)
1 parent 1233e24 commit ed344a9

File tree

2 files changed

+28
-23
lines changed

2 files changed

+28
-23
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -210,6 +210,7 @@ def __init__(
210210
self._schedule_delay = schedule_delay_millis / 1e3
211211
self._max_export_batch_size = max_export_batch_size
212212
# Not used. No way currently to pass timeout to export.
213+
# TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
213214
self._export_timeout_millis = export_timeout_millis
214215
# Deque is thread safe.
215216
self._queue = collections.deque([], max_queue_size)
@@ -218,9 +219,10 @@ def __init__(
218219
target=self.worker,
219220
daemon=True,
220221
)
222+
221223
self._shutdown = False
222224
self._export_lock = threading.Lock()
223-
self._worker_sleep = threading.Event()
225+
self._worker_awaken = threading.Event()
224226
self._worker_thread.start()
225227
if hasattr(os, "register_at_fork"):
226228
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
@@ -235,15 +237,15 @@ def _should_export_batch(
235237
# Always continue to export while queue length exceeds max batch size.
236238
if len(self._queue) >= self._max_export_batch_size:
237239
return True
238-
if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
240+
if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
239241
return True
240-
if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
242+
if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
241243
return num_iterations == 0
242244
return False
243245

244246
def _at_fork_reinit(self):
245247
self._export_lock = threading.Lock()
246-
self._worker_sleep = threading.Event()
248+
self._worker_awaken = threading.Event()
247249
self._queue.clear()
248250
self._worker_thread = threading.Thread(
249251
name="OtelBatchLogRecordProcessor",
@@ -258,15 +260,15 @@ def worker(self):
258260
# Lots of strategies in the spec for setting next timeout.
259261
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
260262
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than the threshold.
261-
sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
263+
sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
262264
if self._shutdown:
263265
break
264266
self._export(
265267
BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
266268
if sleep_interrupted
267269
else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
268270
)
269-
self._worker_sleep.clear()
271+
self._worker_awaken.clear()
270272
self._export(BatchLogExportStrategy.EXPORT_ALL)
271273

272274
def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
@@ -296,7 +298,7 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
296298

297299
def emit(self, log_data: LogData) -> None:
298300
if self._shutdown:
299-
_logger.warning("Shutdown called, ignoring log.")
301+
_logger.info("Shutdown called, ignoring log.")
300302
return
301303
if self._pid != os.getpid():
302304
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)
@@ -305,15 +307,15 @@ def emit(self, log_data: LogData) -> None:
305307
_logger.warning("Queue full, dropping log.")
306308
self._queue.appendleft(log_data)
307309
if len(self._queue) >= self._max_export_batch_size:
308-
self._worker_sleep.set()
310+
self._worker_awaken.set()
309311

310312
def shutdown(self):
311313
if self._shutdown:
312314
return
313315
# Prevents emit and force_flush from further calling export.
314316
self._shutdown = True
315317
# Interrupts sleep in the worker, if it's sleeping.
316-
self._worker_sleep.set()
318+
self._worker_awaken.set()
317319
# Main worker loop should exit after one final export call with flush all strategy.
318320
self._worker_thread.join()
319321
self._exporter.shutdown()

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -486,24 +486,20 @@ def test_logs_exported_once_batch_size_reached(self):
486486
exporter.export.assert_called_once()
487487
after_export = time.time_ns()
488488
# Shows the worker's 30 second sleep was interrupted within a second.
489-
self.assertTrue((after_export - before_export) < 1e9)
489+
self.assertLess(after_export - before_export, 1e9)
490490

491491
# pylint: disable=no-self-use
492492
def test_logs_exported_once_schedule_delay_reached(self):
493493
exporter = Mock()
494494
log_record_processor = BatchLogRecordProcessor(
495495
exporter=exporter,
496-
# Should not reach this during the test, instead export should be called when delay millis is hit.
497496
max_queue_size=15,
498497
max_export_batch_size=15,
499498
schedule_delay_millis=100,
500499
)
501-
for _ in range(15):
502-
log_record_processor.emit(EMPTY_LOG)
503-
time.sleep(0.11)
504-
exporter.export.assert_has_calls(
505-
[call([EMPTY_LOG]) for _ in range(15)]
506-
)
500+
log_record_processor.emit(EMPTY_LOG)
501+
time.sleep(0.2)
502+
exporter.export.assert_called_once_with([EMPTY_LOG])
507503

508504
def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
509505
exporter = Mock()
@@ -520,13 +516,13 @@ def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
520516
exporter.export.assert_called_once_with([EMPTY_LOG])
521517
self.assertTrue(exporter._stopped)
522518

523-
with self.assertLogs(level="WARNING") as log:
519+
with self.assertLogs(level="INFO") as log:
524520
# This log should not be flushed.
525521
log_record_processor.emit(EMPTY_LOG)
526522
self.assertEqual(len(log.output), 1)
527523
self.assertEqual(len(log.records), 1)
528524
self.assertIn("Shutdown called, ignoring log.", log.output[0])
529-
exporter.export.assert_called_once_with([EMPTY_LOG])
525+
exporter.export.assert_called_once()
530526

531527
# pylint: disable=no-self-use
532528
def test_force_flush_flushes_logs(self):
@@ -555,6 +551,7 @@ def bulk_log_and_flush(num_logs):
555551
with ThreadPoolExecutor(max_workers=69) as executor:
556552
for idx in range(69):
557553
executor.submit(bulk_log_and_flush, idx + 1)
554+
558555
executor.shutdown()
559556

560557
finished_logs = exporter.get_finished_logs()
@@ -564,16 +561,20 @@ def bulk_log_and_flush(num_logs):
564561
hasattr(os, "fork"),
565562
"needs *nix",
566563
)
567-
def test_batch_log_record_processor_fork(self):
564+
def test_batch_log_record_processor_fork_clears_logs_from_child(self):
568565
exporter = InMemoryLogExporter()
569566
log_record_processor = BatchLogRecordProcessor(
570567
exporter,
571568
max_export_batch_size=64,
572569
schedule_delay_millis=30000,
573570
)
574-
# These are not expected to be flushed. Calling fork clears any logs not flushed.
571+
# These logs should be flushed only from the parent process.
572+
# _at_fork_reinit should be called in the child process, to
573+
# clear these logs in the child process.
575574
for _ in range(10):
576575
log_record_processor.emit(EMPTY_LOG)
576+
577+
# The below test also needs this, but it can only be set once.
577578
multiprocessing.set_start_method("fork")
578579

579580
def child(conn):
@@ -603,8 +604,10 @@ def test_batch_log_record_processor_fork_doesnot_deadlock(self):
603604
)
604605

605606
def child(conn):
606-
for _ in range(100):
607+
def _target():
607608
log_record_processor.emit(EMPTY_LOG)
609+
610+
ConcurrencyTestBase.run_with_many_threads(_target, 100)
608611
log_record_processor.force_flush()
609612
logs = exporter.get_finished_logs()
610613
conn.send(len(logs) == 100)
@@ -615,7 +618,6 @@ def child(conn):
615618
process.start()
616619
self.assertTrue(parent_conn.recv())
617620
process.join()
618-
self.assertTrue(len(exporter.get_finished_logs()) == 0)
619621

620622
def test_batch_log_record_processor_gc(self):
621623
# Given a BatchLogRecordProcessor
@@ -677,4 +679,5 @@ def formatter(record): # pylint: disable=unused-argument
677679
mock_stdout = Mock()
678680
exporter = ConsoleLogExporter(out=mock_stdout, formatter=formatter)
679681
exporter.export([EMPTY_LOG])
682+
680683
mock_stdout.write.assert_called_once_with(mock_record_str)

0 commit comments

Comments
 (0)