From 217463eca1e551c4e593608e7cf47b647b3bfcca Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 9 Apr 2025 18:50:39 +0000 Subject: [PATCH 1/5] Refactor BatchLogRecordProcessor --- .../sdk/_logs/_internal/export/__init__.py | 225 +++++++----------- opentelemetry-sdk/tests/logs/test_export.py | 224 ++++++++--------- 2 files changed, 176 insertions(+), 273 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index 434dc745ccf..dd5f7f472a6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -21,8 +21,7 @@ import sys import threading from os import environ, linesep -from time import time_ns -from typing import IO, Callable, Deque, List, Optional, Sequence +from typing import IO, Callable, Deque, Optional, Sequence from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -55,6 +54,12 @@ class LogExportResult(enum.Enum): FAILURE = 1 +class BatchLogExportStrategy(enum.Enum): + EXPORT_ALL = 0 + EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1 + EXPORT_AT_LEAST_ONE_BATCH = 2 + + class LogExporter(abc.ABC): """Interface for exporting logs. @@ -140,14 +145,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n return True -class _FlushRequest: - __slots__ = ["event", "num_log_records"] - - def __init__(self): - self.event = threading.Event() - self.num_log_records = 0 - - _BSP_RESET_ONCE = Once() @@ -166,8 +163,6 @@ class BatchLogRecordProcessor(LogRecordProcessor): """ _queue: Deque[LogData] - _flush_request: _FlushRequest | None - _log_records: List[LogData | None] def __init__( self, @@ -189,7 +184,7 @@ def __init__( max_export_batch_size = ( BatchLogRecordProcessor._default_max_export_batch_size() ) - + # Not used. No way currently to pass timeout to export. if export_timeout_millis is None: export_timeout_millis = ( BatchLogRecordProcessor._default_export_timeout_millis() @@ -201,26 +196,43 @@ def __init__( self._exporter = exporter self._max_queue_size = max_queue_size - self._schedule_delay_millis = schedule_delay_millis + self._schedule_delay = schedule_delay_millis / 1e3 self._max_export_batch_size = max_export_batch_size + # Not used. No way currently to pass timeout to export. self._export_timeout_millis = export_timeout_millis + # Deque is thread safe. self._queue = collections.deque([], max_queue_size) self._worker_thread = threading.Thread( name="OtelBatchLogRecordProcessor", target=self.worker, daemon=True, ) - self._condition = threading.Condition(threading.Lock()) + self._shutdown = False - self._flush_request = None - self._log_records = [None] * self._max_export_batch_size + self._export_lock = threading.Lock() + self._worker_sleep = threading.Event() self._worker_thread.start() if hasattr(os, "register_at_fork"): os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access self._pid = os.getpid() + def _should_export_batch( + self, batch_strategy: BatchLogExportStrategy, num_iterations: int + ) -> bool: + if not self._queue: + return False + # Always continue to export while queue length exceeds max batch size. 
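+        # This holds regardless of the requested strategy, so it is checked first.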
+        if len(self._queue) >= self._max_export_batch_size:
+            return True
+        if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
+            return True
+        if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
+            return num_iterations == 0
+        return False
+
     def _at_fork_reinit(self):
-        self._condition = threading.Condition(threading.Lock())
+        self._export_lock = threading.Lock()
+        self._worker_sleep = threading.Event()
         self._queue.clear()
         self._worker_thread = threading.Thread(
             name="OtelBatchLogRecordProcessor",
@@ -231,152 +243,75 @@ def _at_fork_reinit(self):
         self._pid = os.getpid()

     def worker(self):
-        timeout = self._schedule_delay_millis / 1e3
-        flush_request: Optional[_FlushRequest] = None
         while not self._shutdown:
-            with self._condition:
-                if self._shutdown:
-                    # shutdown may have been called, avoid further processing
-                    break
-                flush_request = self._get_and_unset_flush_request()
-                if (
-                    len(self._queue) < self._max_export_batch_size
-                    and flush_request is None
-                ):
-                    self._condition.wait(timeout)
-
-                flush_request = self._get_and_unset_flush_request()
-                if not self._queue:
-                    timeout = self._schedule_delay_millis / 1e3
-                    self._notify_flush_request_finished(flush_request)
-                    flush_request = None
-                    continue
-                if self._shutdown:
-                    break
-
-            start_ns = time_ns()
-            self._export(flush_request)
-            end_ns = time_ns()
-            # subtract the duration of this export call to the next timeout
-            timeout = self._schedule_delay_millis / 1e3 - (
-                (end_ns - start_ns) / 1e9
-            )
-
-            self._notify_flush_request_finished(flush_request)
-            flush_request = None
-
-        # there might have been a new flush request while export was running
-        # and before the done flag switched to true
-        with self._condition:
-            shutdown_flush_request = self._get_and_unset_flush_request()
-
-        # flush the remaining logs
-        self._drain_queue()
-        self._notify_flush_request_finished(flush_request)
-        self._notify_flush_request_finished(shutdown_flush_request)
-
-    def _export(self, flush_request: Optional[_FlushRequest] = None):
-        """Exports logs considering the given flush_request.
-
-        If flush_request is not None then logs are exported in batches
-        until the number of exported logs reached or exceeded the num of logs in
-        flush_request, otherwise exports at max max_export_batch_size logs.
-        """
-        if flush_request is None:
-            self._export_batch()
-            return
-
-        num_log_records = flush_request.num_log_records
-        while self._queue:
-            exported = self._export_batch()
-            num_log_records -= exported
-
-            if num_log_records <= 0:
+            # Lots of strategies in the spec for setting next timeout.
+            # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
+            # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than the threshold.
+            sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
+            if self._shutdown:
                 break
-
-    def _export_batch(self) -> int:
-        """Exports at most max_export_batch_size logs and returns the number of
-            exported logs.
- """ - idx = 0 - while idx < self._max_export_batch_size and self._queue: - record = self._queue.pop() - self._log_records[idx] = record - idx += 1 - token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) - try: - self._exporter.export(self._log_records[:idx]) # type: ignore - except Exception: # pylint: disable=broad-exception-caught - _logger.exception("Exception while exporting logs.") - detach(token) - - for index in range(idx): - self._log_records[index] = None - return idx - - def _drain_queue(self): - """Export all elements until queue is empty. - - Can only be called from the worker thread context because it invokes - `export` that is not thread safe. - """ - while self._queue: - self._export_batch() - - def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]: - flush_request = self._flush_request - self._flush_request = None - if flush_request is not None: - flush_request.num_log_records = len(self._queue) - return flush_request - - @staticmethod - def _notify_flush_request_finished( - flush_request: Optional[_FlushRequest] = None, - ): - if flush_request is not None: - flush_request.event.set() - - def _get_or_create_flush_request(self) -> _FlushRequest: - if self._flush_request is None: - self._flush_request = _FlushRequest() - return self._flush_request + self._export( + BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD + if sleep_interrupted + else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH + ) + self._worker_sleep.clear() + self._export(BatchLogExportStrategy.EXPORT_ALL) + + def _export(self, batch_strategy: BatchLogExportStrategy) -> None: + with self._export_lock: + iteration = 0 + # We could see concurrent export calls from worker and force_flush. We call _should_export_batch + # once the lock is obtained to see if we still need to make the requested export. + while self._should_export_batch(batch_strategy, iteration): + iteration += 1 + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + self._exporter.export( + [ + # Oldest records are at the back, so pop from there. + self._queue.pop() + for _ in range( + min( + self._max_export_batch_size, + len(self._queue), + ) + ) + ] + ) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + detach(token) def emit(self, log_data: LogData) -> None: - """Adds the `LogData` to queue and notifies the waiting threads - when size of queue reaches max_export_batch_size. - """ if self._shutdown: + _logger.warning("Shutdown called, ignoring log.") return if self._pid != os.getpid(): _BSP_RESET_ONCE.do_once(self._at_fork_reinit) + if len(self._queue) == self._max_queue_size: + _logger.warning("Queue full, dropping log.") self._queue.appendleft(log_data) if len(self._queue) >= self._max_export_batch_size: - with self._condition: - self._condition.notify() + self._worker_sleep.set() def shutdown(self): + if self._shutdown: + return + # Prevents emit and force_flush from further calling export. self._shutdown = True - with self._condition: - self._condition.notify_all() + # Interrupts sleep in the worker, if it's sleeping. + self._worker_sleep.set() + # Main worker loop should exit after one final export call with flush all strategy. 
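+        # Joining the worker below blocks until that final export has completed.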
self._worker_thread.join() self._exporter.shutdown() def force_flush(self, timeout_millis: Optional[int] = None) -> bool: - if timeout_millis is None: - timeout_millis = self._export_timeout_millis if self._shutdown: - return True - - with self._condition: - flush_request = self._get_or_create_flush_request() - self._condition.notify_all() - - ret = flush_request.event.wait(timeout_millis / 1e3) - if not ret: - _logger.warning("Timeout was exceeded in force_flush().") - return ret + return + # Blocking call to export. + self._export(BatchLogExportStrategy.EXPORT_ALL) @staticmethod def _default_max_queue_size(): diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 2e00bad6538..6b3c3a91603 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -19,7 +19,7 @@ import time import unittest from concurrent.futures import ThreadPoolExecutor -from unittest.mock import Mock, patch +from unittest.mock import Mock, call, patch from opentelemetry._logs import SeverityNumber from opentelemetry.sdk import trace @@ -44,10 +44,15 @@ ) from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationScope -from opentelemetry.test.concurrency_test import ConcurrencyTestBase + from opentelemetry.trace import TraceFlags from opentelemetry.trace.span import INVALID_SPAN_CONTEXT +EMPTY_LOG = LogData( + log_record=LogRecord(), + instrumentation_scope=InstrumentationScope("example", "example"), +) + class TestSimpleLogRecordProcessor(unittest.TestCase): def test_simple_log_record_processor_default_level(self): @@ -326,7 +331,7 @@ def test_simple_log_record_processor_different_msg_types_with_formatter( self.assertEqual(expected, emitted) -class TestBatchLogRecordProcessor(ConcurrencyTestBase): +class TestBatchLogRecordProcessor(unittest.TestCase): def test_emit_call_log_record(self): exporter = InMemoryLogExporter() log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter)) @@ -351,7 +356,7 @@ def test_args(self): ) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 1024) - self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._schedule_delay, 2.5) self.assertEqual(log_record_processor._max_export_batch_size, 256) self.assertEqual(log_record_processor._export_timeout_millis, 15000) @@ -369,7 +374,7 @@ def test_env_vars(self): log_record_processor = BatchLogRecordProcessor(exporter) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 1024) - self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._schedule_delay, 2.5) self.assertEqual(log_record_processor._max_export_batch_size, 256) self.assertEqual(log_record_processor._export_timeout_millis, 15000) @@ -378,7 +383,7 @@ def test_args_defaults(self): log_record_processor = BatchLogRecordProcessor(exporter) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 2048) - self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._schedule_delay, 5) self.assertEqual(log_record_processor._max_export_batch_size, 512) self.assertEqual(log_record_processor._export_timeout_millis, 30000) @@ -398,7 +403,7 @@ def test_args_env_var_value_error(self): 
_logger.disabled = False self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 2048) - self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._schedule_delay, 5) self.assertEqual(log_record_processor._max_export_batch_size, 512) self.assertEqual(log_record_processor._export_timeout_millis, 30000) @@ -413,7 +418,7 @@ def test_args_none_defaults(self): ) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 2048) - self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._schedule_delay, 5) self.assertEqual(log_record_processor._max_export_batch_size, 512) self.assertEqual(log_record_processor._export_timeout_millis, 30000) @@ -463,161 +468,129 @@ def test_validation_negative_max_queue_size(self): max_export_batch_size=101, ) - def test_shutdown(self): - exporter = InMemoryLogExporter() - log_record_processor = BatchLogRecordProcessor(exporter) - - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("shutdown") - logger.addHandler(LoggingHandler(logger_provider=provider)) - - with self.assertLogs(level=logging.WARNING): - logger.warning("warning message: %s", "possible upcoming heatwave") - with self.assertLogs(level=logging.WARNING): - logger.error("Very high rise in temperatures across the globe") - with self.assertLogs(level=logging.WARNING): - logger.critical("Temperature hits high 420 C in Hyderabad") + def test_logs_exported_once_batch_size_reached(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + max_queue_size=15, + max_export_batch_size=15, + # Will not reach this during the test, this sleep should be interrupted when batch size is reached. + schedule_delay_millis=30000, + ) + before_export = time.time_ns() + for _ in range(15): + log_record_processor.emit(EMPTY_LOG) + # Wait a bit for the worker thread to wake up and call export. + time.sleep(0.1) + exporter.export.assert_called_once() + after_export = time.time_ns() + # Shows the worker's 30 second sleep was interrupted within a second. + self.assertTrue((after_export - before_export) < 1e9) + + # pylint: disable=no-self-use + def test_logs_exported_once_schedule_delay_reached(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + # Should not reach this during the test, instead export should be called when delay millis is hit. + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=100, + ) + for _ in range(15): + log_record_processor.emit(EMPTY_LOG) + time.sleep(0.11) + exporter.export.assert_has_calls( + [call([EMPTY_LOG]) for _ in range(15)] + ) + def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + # Neither of these thresholds should be hit before test ends. + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + ) + # This log should be flushed because it was written before shutdown. 
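+        # shutdown() drains the queue with the EXPORT_ALL strategy before stopping the exporter.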
+ log_record_processor.emit(EMPTY_LOG) log_record_processor.shutdown() self.assertTrue(exporter._stopped) - finished_logs = exporter.get_finished_logs() - expected = [ - ("warning message: possible upcoming heatwave", "WARN"), - ("Very high rise in temperatures across the globe", "ERROR"), - ( - "Temperature hits high 420 C in Hyderabad", - "CRITICAL", - ), - ] - emitted = [ - (item.log_record.body, item.log_record.severity_text) - for item in finished_logs - ] - self.assertEqual(expected, emitted) - for item in finished_logs: - self.assertEqual(item.instrumentation_scope.name, "shutdown") - - def test_force_flush(self): - exporter = InMemoryLogExporter() - log_record_processor = BatchLogRecordProcessor(exporter) - - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("force_flush") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) - - logger.critical("Earth is burning") - log_record_processor.force_flush() - finished_logs = exporter.get_finished_logs() - self.assertEqual(len(finished_logs), 1) - log_record = finished_logs[0].log_record - self.assertEqual(log_record.body, "Earth is burning") - self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) - self.assertEqual( - finished_logs[0].instrumentation_scope.name, "force_flush" + with self.assertLogs(level="WARNING") as log: + # This log should not be flushed. + log_record_processor.emit(EMPTY_LOG) + self.assertEqual(len(log.output), 1) + self.assertEqual(len(log.records), 1) + self.assertIn("Shutdown called, ignoring log.", log.output[0]) + exporter.export.assert_called_once_with([EMPTY_LOG]) + + # pylint: disable=no-self-use + def test_force_flush_flushes_logs(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + # Neither of these thresholds should be hit before test ends. 
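+            # Only the explicit force_flush call below should trigger an export.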
+ max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, ) - - def test_log_record_processor_too_many_logs(self): - exporter = InMemoryLogExporter() - log_record_processor = BatchLogRecordProcessor(exporter) - - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("many_logs") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) - - for log_no in range(1000): - logger.critical("Log no: %s", log_no) - - self.assertTrue(log_record_processor.force_flush()) - finised_logs = exporter.get_finished_logs() - self.assertEqual(len(finised_logs), 1000) - for item in finised_logs: - self.assertEqual(item.instrumentation_scope.name, "many_logs") + for _ in range(10): + log_record_processor.emit(EMPTY_LOG) + log_record_processor.force_flush() + exporter.export.assert_called_once_with([EMPTY_LOG for _ in range(10)]) def test_with_multiple_threads(self): exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor(exporter) - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("threads") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) - def bulk_log_and_flush(num_logs): for _ in range(num_logs): - logger.critical("Critical message") - self.assertTrue(log_record_processor.force_flush()) + log_record_processor.emit(EMPTY_LOG) + log_record_processor.force_flush() with ThreadPoolExecutor(max_workers=69) as executor: - futures = [] for idx in range(69): - future = executor.submit(bulk_log_and_flush, idx + 1) - futures.append(future) + executor.submit(bulk_log_and_flush, idx + 1) executor.shutdown() finished_logs = exporter.get_finished_logs() self.assertEqual(len(finished_logs), 2415) - for item in finished_logs: - self.assertEqual(item.instrumentation_scope.name, "threads") @unittest.skipUnless( hasattr(os, "fork"), "needs *nix", ) def test_batch_log_record_processor_fork(self): - # pylint: disable=invalid-name exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor( exporter, max_export_batch_size=64, - schedule_delay_millis=10, + schedule_delay_millis=30000, ) - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("test-fork") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) - - logger.critical("yolo") - time.sleep(0.5) # give some time for the exporter to upload - - self.assertTrue(log_record_processor.force_flush()) - self.assertEqual(len(exporter.get_finished_logs()), 1) - exporter.clear() + # These are not expected to be flushed. Calling fork clears any logs not flushed. 
+ for _ in range(10): + log_record_processor.emit(EMPTY_LOG) multiprocessing.set_start_method("fork") def child(conn): - def _target(): - logger.critical("Critical message child") - - self.run_with_many_threads(_target, 100) - - time.sleep(0.5) + for _ in range(100): + log_record_processor.emit(EMPTY_LOG) + log_record_processor.force_flush() logs = exporter.get_finished_logs() conn.send(len(logs) == 100) conn.close() parent_conn, child_conn = multiprocessing.Pipe() - p = multiprocessing.Process(target=child, args=(child_conn,)) - p.start() + process = multiprocessing.Process(target=child, args=(child_conn,)) + process.start() self.assertTrue(parent_conn.recv()) - p.join() - - log_record_processor.shutdown() + process.join() + self.assertTrue(len(exporter.get_finished_logs()) == 0) class TestConsoleLogExporter(unittest.TestCase): @@ -661,11 +634,6 @@ def formatter(record): # pylint: disable=unused-argument mock_stdout = Mock() exporter = ConsoleLogExporter(out=mock_stdout, formatter=formatter) - log_data = LogData( - log_record=LogRecord(), - instrumentation_scope=InstrumentationScope( - "first_name", "first_version" - ), - ) - exporter.export([log_data]) + exporter.export([EMPTY_LOG]) + mock_stdout.write.assert_called_once_with(mock_record_str) From 41cd520312cc737bcc8d1f29c76f92f2e1df318b Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 18 Apr 2025 17:55:20 +0000 Subject: [PATCH 2/5] Respond to comments --- .../sdk/_logs/_internal/export/__init__.py | 19 +++++++------- .../sdk/environment_variables/__init__.py | 1 + opentelemetry-sdk/tests/logs/test_export.py | 25 +++++++++---------- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index dd5f7f472a6..f92a494a2e7 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -199,6 +199,7 @@ def __init__( self._schedule_delay = schedule_delay_millis / 1e3 self._max_export_batch_size = max_export_batch_size # Not used. No way currently to pass timeout to export. + # TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do. self._export_timeout_millis = export_timeout_millis # Deque is thread safe. self._queue = collections.deque([], max_queue_size) @@ -210,7 +211,7 @@ def __init__( self._shutdown = False self._export_lock = threading.Lock() - self._worker_sleep = threading.Event() + self._worker_awaken = threading.Event() self._worker_thread.start() if hasattr(os, "register_at_fork"): os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access @@ -224,15 +225,15 @@ def _should_export_batch( # Always continue to export while queue length exceeds max batch size. 
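         # This holds regardless of the requested strategy, so it is checked first.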
         if len(self._queue) >= self._max_export_batch_size:
             return True
-        if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
+        if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
             return True
-        if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
+        if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
             return num_iterations == 0
         return False

     def _at_fork_reinit(self):
         self._export_lock = threading.Lock()
-        self._worker_sleep = threading.Event()
+        self._worker_awaken = threading.Event()
         self._queue.clear()
         self._worker_thread = threading.Thread(
             name="OtelBatchLogRecordProcessor",
@@ -247,7 +248,7 @@ def worker(self):
             # Lots of strategies in the spec for setting next timeout.
             # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
             # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than the threshold.
-            sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
+            sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
             if self._shutdown:
                 break
             self._export(
@@ -255,7 +256,7 @@ def worker(self):
                 if sleep_interrupted
                 else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
             )
-            self._worker_sleep.clear()
+            self._worker_awaken.clear()
         self._export(BatchLogExportStrategy.EXPORT_ALL)

     def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
@@ -285,7 +286,7 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:

     def emit(self, log_data: LogData) -> None:
         if self._shutdown:
-            _logger.warning("Shutdown called, ignoring log.")
+            _logger.info("Shutdown called, ignoring log.")
             return
         if self._pid != os.getpid():
             _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
@@ -294,7 +295,7 @@ def emit(self, log_data: LogData) -> None:
             _logger.warning("Queue full, dropping log.")
         self._queue.appendleft(log_data)
         if len(self._queue) >= self._max_export_batch_size:
-            self._worker_sleep.set()
+            self._worker_awaken.set()

     def shutdown(self):
         if self._shutdown:
@@ -302,7 +303,7 @@ def shutdown(self):
         # Prevents emit and force_flush from further calling export.
         self._shutdown = True
         # Interrupts sleep in the worker, if it's sleeping.
-        self._worker_sleep.set()
+        self._worker_awaken.set()
         # Main worker loop should exit after one final export call with flush all strategy.
         self._worker_thread.join()
         self._exporter.shutdown()
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py
index 4f69143084c..23b634fcd85 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py
@@ -87,6 +87,7 @@
 .. envvar:: OTEL_BLRP_EXPORT_TIMEOUT

 The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor.
+This environment variable currently does nothing; see https://github.com/open-telemetry/opentelemetry-python/issues/4555.
 Default: 30000
 """

diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py
index 6b3c3a91603..6c9e126bf40 100644
--- a/opentelemetry-sdk/tests/logs/test_export.py
+++ b/opentelemetry-sdk/tests/logs/test_export.py
@@ -485,24 +485,19 @@ def test_logs_exported_once_batch_size_reached(self):
         exporter.export.assert_called_once()
         after_export = time.time_ns()
         # Shows the worker's 30 second sleep was interrupted within a second.
- self.assertTrue((after_export - before_export) < 1e9) + self.assertLess(after_export - before_export, 1e9) - # pylint: disable=no-self-use def test_logs_exported_once_schedule_delay_reached(self): exporter = Mock() log_record_processor = BatchLogRecordProcessor( exporter=exporter, - # Should not reach this during the test, instead export should be called when delay millis is hit. max_queue_size=15, max_export_batch_size=15, schedule_delay_millis=100, ) - for _ in range(15): - log_record_processor.emit(EMPTY_LOG) - time.sleep(0.11) - exporter.export.assert_has_calls( - [call([EMPTY_LOG]) for _ in range(15)] - ) + log_record_processor.emit(EMPTY_LOG) + time.sleep(0.11) + exporter.export.assert_called_once_with([EMPTY_LOG]) def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self): exporter = Mock() @@ -516,15 +511,16 @@ def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self): # This log should be flushed because it was written before shutdown. log_record_processor.emit(EMPTY_LOG) log_record_processor.shutdown() + exporter.export.assert_called_once_with([EMPTY_LOG]) self.assertTrue(exporter._stopped) - with self.assertLogs(level="WARNING") as log: + with self.assertLogs(level="INFO") as log: # This log should not be flushed. log_record_processor.emit(EMPTY_LOG) self.assertEqual(len(log.output), 1) self.assertEqual(len(log.records), 1) self.assertIn("Shutdown called, ignoring log.", log.output[0]) - exporter.export.assert_called_once_with([EMPTY_LOG]) + exporter.export.assert_called_once() # pylint: disable=no-self-use def test_force_flush_flushes_logs(self): @@ -570,7 +566,9 @@ def test_batch_log_record_processor_fork(self): max_export_batch_size=64, schedule_delay_millis=30000, ) - # These are not expected to be flushed. Calling fork clears any logs not flushed. + # These logs should be flushed only from the parent process. + # _at_fork_reinit should be called in the child process, to + # clear these logs in the child process. for _ in range(10): log_record_processor.emit(EMPTY_LOG) @@ -590,7 +588,8 @@ def child(conn): process.start() self.assertTrue(parent_conn.recv()) process.join() - self.assertTrue(len(exporter.get_finished_logs()) == 0) + log_record_processor.force_flush() + self.assertTrue(len(exporter.get_finished_logs()) == 10) class TestConsoleLogExporter(unittest.TestCase): From 839424478bf73717f43a1c35a6230a54d199b5aa Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 18 Apr 2025 18:03:20 +0000 Subject: [PATCH 3/5] Fix lint --- opentelemetry-sdk/tests/logs/test_export.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 6c9e126bf40..fbe51f7f64d 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -19,7 +19,7 @@ import time import unittest from concurrent.futures import ThreadPoolExecutor -from unittest.mock import Mock, call, patch +from unittest.mock import Mock, patch from opentelemetry._logs import SeverityNumber from opentelemetry.sdk import trace @@ -44,7 +44,6 @@ ) from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationScope - from opentelemetry.trace import TraceFlags from opentelemetry.trace.span import INVALID_SPAN_CONTEXT @@ -487,6 +486,7 @@ def test_logs_exported_once_batch_size_reached(self): # Shows the worker's 30 second sleep was interrupted within a second. 
self.assertLess(after_export - before_export, 1e9) + # pylint: disable=no-self-use def test_logs_exported_once_schedule_delay_reached(self): exporter = Mock() log_record_processor = BatchLogRecordProcessor( From 08134878d401d7a81b2dfbef337b72ce278cee46 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 18 Apr 2025 18:04:40 +0000 Subject: [PATCH 4/5] Add delay for windows test. --- opentelemetry-sdk/tests/logs/test_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index fbe51f7f64d..3a23d56fb71 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -496,7 +496,7 @@ def test_logs_exported_once_schedule_delay_reached(self): schedule_delay_millis=100, ) log_record_processor.emit(EMPTY_LOG) - time.sleep(0.11) + time.sleep(0.2) exporter.export.assert_called_once_with([EMPTY_LOG]) def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self): From a088789afed2f6b46a21bb101cb4ace379360671 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Mon, 21 Apr 2025 15:53:00 +0000 Subject: [PATCH 5/5] Fix fork test --- opentelemetry-sdk/tests/logs/test_export.py | 37 ++++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 3a23d56fb71..58704115aab 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -44,6 +44,7 @@ ) from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.test.concurrency_test import ConcurrencyTestBase from opentelemetry.trace import TraceFlags from opentelemetry.trace.span import INVALID_SPAN_CONTEXT @@ -559,7 +560,7 @@ def bulk_log_and_flush(num_logs): hasattr(os, "fork"), "needs *nix", ) - def test_batch_log_record_processor_fork(self): + def test_batch_log_record_processor_fork_clears_logs_from_child(self): exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor( exporter, @@ -572,15 +573,13 @@ def test_batch_log_record_processor_fork(self): for _ in range(10): log_record_processor.emit(EMPTY_LOG) + # The below test also needs this, but it can only be set once. 
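+        # (multiprocessing.set_start_method raises a RuntimeError if called twice.)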
multiprocessing.set_start_method("fork") def child(conn): - for _ in range(100): - log_record_processor.emit(EMPTY_LOG) log_record_processor.force_flush() - logs = exporter.get_finished_logs() - conn.send(len(logs) == 100) + conn.send(len(logs) == 0) conn.close() parent_conn, child_conn = multiprocessing.Pipe() @@ -591,6 +590,34 @@ def child(conn): log_record_processor.force_flush() self.assertTrue(len(exporter.get_finished_logs()) == 10) + @unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", + ) + def test_batch_log_record_processor_fork_doesnot_deadlock(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor( + exporter, + max_export_batch_size=64, + schedule_delay_millis=30000, + ) + + def child(conn): + def _target(): + log_record_processor.emit(EMPTY_LOG) + + ConcurrencyTestBase.run_with_many_threads(_target, 100) + log_record_processor.force_flush() + logs = exporter.get_finished_logs() + conn.send(len(logs) == 100) + conn.close() + + parent_conn, child_conn = multiprocessing.Pipe() + process = multiprocessing.Process(target=child, args=(child_conn,)) + process.start() + self.assertTrue(parent_conn.recv()) + process.join() + class TestConsoleLogExporter(unittest.TestCase): def test_export(self): # pylint: disable=no-self-use