diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index a4eb113c89b..254c5f6b96d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -22,8 +22,7 @@ import threading import weakref from os import environ, linesep -from time import time_ns -from typing import IO, Callable, Deque, List, Optional, Sequence +from typing import IO, Callable, Deque, Optional, Sequence from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -56,6 +55,12 @@ class LogExportResult(enum.Enum): FAILURE = 1 +class BatchLogExportStrategy(enum.Enum): + EXPORT_ALL = 0 + EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1 + EXPORT_AT_LEAST_ONE_BATCH = 2 + + class LogExporter(abc.ABC): """Interface for exporting logs. @@ -141,14 +146,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n return True -class _FlushRequest: - __slots__ = ["event", "num_log_records"] - - def __init__(self): - self.event = threading.Event() - self.num_log_records = 0 - - _BSP_RESET_ONCE = Once() @@ -167,8 +164,6 @@ class BatchLogRecordProcessor(LogRecordProcessor): """ _queue: Deque[LogData] - _flush_request: _FlushRequest | None - _log_records: List[LogData | None] def __init__( self, @@ -190,7 +185,7 @@ def __init__( max_export_batch_size = ( BatchLogRecordProcessor._default_max_export_batch_size() ) - + # Not used. No way currently to pass timeout to export. if export_timeout_millis is None: export_timeout_millis = ( BatchLogRecordProcessor._default_export_timeout_millis() @@ -202,27 +197,45 @@ def __init__( self._exporter = exporter self._max_queue_size = max_queue_size - self._schedule_delay_millis = schedule_delay_millis + self._schedule_delay = schedule_delay_millis / 1e3 self._max_export_batch_size = max_export_batch_size + # Not used. 
No way currently to pass timeout to export. + # TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do. self._export_timeout_millis = export_timeout_millis + # Deque is thread safe. self._queue = collections.deque([], max_queue_size) self._worker_thread = threading.Thread( name="OtelBatchLogRecordProcessor", target=self.worker, daemon=True, ) - self._condition = threading.Condition(threading.Lock()) + self._shutdown = False - self._flush_request = None - self._log_records = [None] * self._max_export_batch_size + self._export_lock = threading.Lock() + self._worker_awaken = threading.Event() self._worker_thread.start() if hasattr(os, "register_at_fork"): weak_reinit = weakref.WeakMethod(self._at_fork_reinit) os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda self._pid = os.getpid() + def _should_export_batch( + self, batch_strategy: BatchLogExportStrategy, num_iterations: int + ) -> bool: + if not self._queue: + return False + # Always continue to export while queue length exceeds max batch size. 
+ if len(self._queue) >= self._max_export_batch_size: + return True + if batch_strategy is BatchLogExportStrategy.EXPORT_ALL: + return True + if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH: + return num_iterations == 0 + return False + def _at_fork_reinit(self): - self._condition = threading.Condition(threading.Lock()) + self._export_lock = threading.Lock() + self._worker_awaken = threading.Event() self._queue.clear() self._worker_thread = threading.Thread( name="OtelBatchLogRecordProcessor", @@ -233,152 +246,75 @@ def _at_fork_reinit(self): self._pid = os.getpid() def worker(self): - timeout = self._schedule_delay_millis / 1e3 - flush_request: Optional[_FlushRequest] = None while not self._shutdown: - with self._condition: - if self._shutdown: - # shutdown may have been called, avoid further processing - break - flush_request = self._get_and_unset_flush_request() - if ( - len(self._queue) < self._max_export_batch_size - and flush_request is None - ): - self._condition.wait(timeout) - - flush_request = self._get_and_unset_flush_request() - if not self._queue: - timeout = self._schedule_delay_millis / 1e3 - self._notify_flush_request_finished(flush_request) - flush_request = None - continue - if self._shutdown: - break - - start_ns = time_ns() - self._export(flush_request) - end_ns = time_ns() - # subtract the duration of this export call to the next timeout - timeout = self._schedule_delay_millis / 1e3 - ( - (end_ns - start_ns) / 1e9 - ) - - self._notify_flush_request_finished(flush_request) - flush_request = None - - # there might have been a new flush request while export was running - # and before the done flag switched to true - with self._condition: - shutdown_flush_request = self._get_and_unset_flush_request() - - # flush the remaining logs - self._drain_queue() - self._notify_flush_request_finished(flush_request) - self._notify_flush_request_finished(shutdown_flush_request) - - def _export(self, flush_request: Optional[_FlushRequest] = 
None): - """Exports logs considering the given flush_request. - - If flush_request is not None then logs are exported in batches - until the number of exported logs reached or exceeded the num of logs in - flush_request, otherwise exports at max max_export_batch_size logs. - """ - if flush_request is None: - self._export_batch() - return - - num_log_records = flush_request.num_log_records - while self._queue: - exported = self._export_batch() - num_log_records -= exported - - if num_log_records <= 0: + # Lots of strategies in the spec for setting next timeout. + # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor. + # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than threshold. + sleep_interrupted = self._worker_awaken.wait(self._schedule_delay) + if self._shutdown: break - - def _export_batch(self) -> int: - """Exports at most max_export_batch_size logs and returns the number of - exported logs. - """ - idx = 0 - while idx < self._max_export_batch_size and self._queue: - record = self._queue.pop() - self._log_records[idx] = record - idx += 1 - token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) - try: - self._exporter.export(self._log_records[:idx]) # type: ignore - except Exception: # pylint: disable=broad-exception-caught - _logger.exception("Exception while exporting logs.") - detach(token) - - for index in range(idx): - self._log_records[index] = None - return idx - - def _drain_queue(self): - """Export all elements until queue is empty. - - Can only be called from the worker thread context because it invokes - `export` that is not thread safe. 
- """ - while self._queue: - self._export_batch() - - def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]: - flush_request = self._flush_request - self._flush_request = None - if flush_request is not None: - flush_request.num_log_records = len(self._queue) - return flush_request - - @staticmethod - def _notify_flush_request_finished( - flush_request: Optional[_FlushRequest] = None, - ): - if flush_request is not None: - flush_request.event.set() - - def _get_or_create_flush_request(self) -> _FlushRequest: - if self._flush_request is None: - self._flush_request = _FlushRequest() - return self._flush_request + self._export( + BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD + if sleep_interrupted + else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH + ) + self._worker_awaken.clear() + self._export(BatchLogExportStrategy.EXPORT_ALL) + + def _export(self, batch_strategy: BatchLogExportStrategy) -> None: + with self._export_lock: + iteration = 0 + # We could see concurrent export calls from worker and force_flush. We call _should_export_batch + # once the lock is obtained to see if we still need to make the requested export. + while self._should_export_batch(batch_strategy, iteration): + iteration += 1 + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + self._exporter.export( + [ + # Oldest records are at the back, so pop from there. + self._queue.pop() + for _ in range( + min( + self._max_export_batch_size, + len(self._queue), + ) + ) + ] + ) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + detach(token) def emit(self, log_data: LogData) -> None: - """Adds the `LogData` to queue and notifies the waiting threads - when size of queue reaches max_export_batch_size. 
- """ if self._shutdown: + _logger.info("Shutdown called, ignoring log.") return if self._pid != os.getpid(): _BSP_RESET_ONCE.do_once(self._at_fork_reinit) + if len(self._queue) == self._max_queue_size: + _logger.warning("Queue full, dropping log.") self._queue.appendleft(log_data) if len(self._queue) >= self._max_export_batch_size: - with self._condition: - self._condition.notify() + self._worker_awaken.set() def shutdown(self): + if self._shutdown: + return + # Prevents emit and force_flush from further calling export. self._shutdown = True - with self._condition: - self._condition.notify_all() + # Interrupts sleep in the worker, if it's sleeping. + self._worker_awaken.set() + # Main worker loop should exit after one final export call with flush all strategy. self._worker_thread.join() self._exporter.shutdown() def force_flush(self, timeout_millis: Optional[int] = None) -> bool: - if timeout_millis is None: - timeout_millis = self._export_timeout_millis if self._shutdown: - return True - - with self._condition: - flush_request = self._get_or_create_flush_request() - self._condition.notify_all() - - ret = flush_request.event.wait(timeout_millis / 1e3) - if not ret: - _logger.warning("Timeout was exceeded in force_flush().") - return ret + return + # Blocking call to export. + self._export(BatchLogExportStrategy.EXPORT_ALL) @staticmethod def _default_max_queue_size(): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py index 4f69143084c..23b634fcd85 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py @@ -87,6 +87,7 @@ .. envvar:: OTEL_BLRP_EXPORT_TIMEOUT The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor. 
+This environment variable currently does nothing, see https://github.com/open-telemetry/opentelemetry-python/issues/4555. Default: 30000 """ diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index b9ec0ac2e7f..6511b137a92 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -50,6 +50,11 @@ from opentelemetry.trace import TraceFlags from opentelemetry.trace.span import INVALID_SPAN_CONTEXT +EMPTY_LOG = LogData( + log_record=LogRecord(), + instrumentation_scope=InstrumentationScope("example", "example"), +) + class TestSimpleLogRecordProcessor(unittest.TestCase): def test_simple_log_record_processor_default_level(self): @@ -328,7 +333,7 @@ def test_simple_log_record_processor_different_msg_types_with_formatter( self.assertEqual(expected, emitted) -class TestBatchLogRecordProcessor(ConcurrencyTestBase): +class TestBatchLogRecordProcessor(unittest.TestCase): def test_emit_call_log_record(self): exporter = InMemoryLogExporter() log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter)) @@ -353,7 +358,7 @@ def test_args(self): ) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 1024) - self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._schedule_delay, 2.5) self.assertEqual(log_record_processor._max_export_batch_size, 256) self.assertEqual(log_record_processor._export_timeout_millis, 15000) @@ -371,7 +376,7 @@ def test_env_vars(self): log_record_processor = BatchLogRecordProcessor(exporter) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 1024) - self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._schedule_delay, 2.5) self.assertEqual(log_record_processor._max_export_batch_size, 256) 
self.assertEqual(log_record_processor._export_timeout_millis, 15000) @@ -380,7 +385,7 @@ def test_args_defaults(self): log_record_processor = BatchLogRecordProcessor(exporter) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 2048) - self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._schedule_delay, 5) self.assertEqual(log_record_processor._max_export_batch_size, 512) self.assertEqual(log_record_processor._export_timeout_millis, 30000) @@ -400,7 +405,7 @@ def test_args_env_var_value_error(self): _logger.disabled = False self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 2048) - self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._schedule_delay, 5) self.assertEqual(log_record_processor._max_export_batch_size, 512) self.assertEqual(log_record_processor._export_timeout_millis, 30000) @@ -415,7 +420,7 @@ def test_args_none_defaults(self): ) self.assertEqual(log_record_processor._exporter, exporter) self.assertEqual(log_record_processor._max_queue_size, 2048) - self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._schedule_delay, 5) self.assertEqual(log_record_processor._max_export_batch_size, 512) self.assertEqual(log_record_processor._export_timeout_millis, 30000) @@ -465,161 +470,155 @@ def test_validation_negative_max_queue_size(self): max_export_batch_size=101, ) - def test_shutdown(self): - exporter = InMemoryLogExporter() - log_record_processor = BatchLogRecordProcessor(exporter) - - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("shutdown") - logger.addHandler(LoggingHandler(logger_provider=provider)) - - with self.assertLogs(level=logging.WARNING): - logger.warning("warning message: %s", "possible 
upcoming heatwave") - with self.assertLogs(level=logging.WARNING): - logger.error("Very high rise in temperatures across the globe") - with self.assertLogs(level=logging.WARNING): - logger.critical("Temperature hits high 420 C in Hyderabad") + def test_logs_exported_once_batch_size_reached(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + max_queue_size=15, + max_export_batch_size=15, + # Will not reach this during the test, this sleep should be interrupted when batch size is reached. + schedule_delay_millis=30000, + ) + before_export = time.time_ns() + for _ in range(15): + log_record_processor.emit(EMPTY_LOG) + # Wait a bit for the worker thread to wake up and call export. + time.sleep(0.1) + exporter.export.assert_called_once() + after_export = time.time_ns() + # Shows the worker's 30 second sleep was interrupted within a second. + self.assertLess(after_export - before_export, 1e9) + + # pylint: disable=no-self-use + def test_logs_exported_once_schedule_delay_reached(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=100, + ) + log_record_processor.emit(EMPTY_LOG) + time.sleep(0.2) + exporter.export.assert_called_once_with([EMPTY_LOG]) + def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + # Neither of these thresholds should be hit before test ends. + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + ) + # This log should be flushed because it was written before shutdown. 
+ log_record_processor.emit(EMPTY_LOG) log_record_processor.shutdown() + exporter.export.assert_called_once_with([EMPTY_LOG]) self.assertTrue(exporter._stopped) - finished_logs = exporter.get_finished_logs() - expected = [ - ("warning message: possible upcoming heatwave", "WARN"), - ("Very high rise in temperatures across the globe", "ERROR"), - ( - "Temperature hits high 420 C in Hyderabad", - "CRITICAL", - ), - ] - emitted = [ - (item.log_record.body, item.log_record.severity_text) - for item in finished_logs - ] - self.assertEqual(expected, emitted) - for item in finished_logs: - self.assertEqual(item.instrumentation_scope.name, "shutdown") - - def test_force_flush(self): - exporter = InMemoryLogExporter() - log_record_processor = BatchLogRecordProcessor(exporter) - - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("force_flush") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) - - logger.critical("Earth is burning") - log_record_processor.force_flush() - finished_logs = exporter.get_finished_logs() - self.assertEqual(len(finished_logs), 1) - log_record = finished_logs[0].log_record - self.assertEqual(log_record.body, "Earth is burning") - self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) - self.assertEqual( - finished_logs[0].instrumentation_scope.name, "force_flush" + with self.assertLogs(level="INFO") as log: + # This log should not be flushed. + log_record_processor.emit(EMPTY_LOG) + self.assertEqual(len(log.output), 1) + self.assertEqual(len(log.records), 1) + self.assertIn("Shutdown called, ignoring log.", log.output[0]) + exporter.export.assert_called_once() + + # pylint: disable=no-self-use + def test_force_flush_flushes_logs(self): + exporter = Mock() + log_record_processor = BatchLogRecordProcessor( + exporter=exporter, + # Neither of these thresholds should be hit before test ends. 
+ max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, ) - - def test_log_record_processor_too_many_logs(self): - exporter = InMemoryLogExporter() - log_record_processor = BatchLogRecordProcessor(exporter) - - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("many_logs") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) - - for log_no in range(1000): - logger.critical("Log no: %s", log_no) - - self.assertTrue(log_record_processor.force_flush()) - finised_logs = exporter.get_finished_logs() - self.assertEqual(len(finised_logs), 1000) - for item in finised_logs: - self.assertEqual(item.instrumentation_scope.name, "many_logs") + for _ in range(10): + log_record_processor.emit(EMPTY_LOG) + log_record_processor.force_flush() + exporter.export.assert_called_once_with([EMPTY_LOG for _ in range(10)]) def test_with_multiple_threads(self): exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor(exporter) - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) - - logger = logging.getLogger("threads") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) - def bulk_log_and_flush(num_logs): for _ in range(num_logs): - logger.critical("Critical message") - self.assertTrue(log_record_processor.force_flush()) + log_record_processor.emit(EMPTY_LOG) + log_record_processor.force_flush() with ThreadPoolExecutor(max_workers=69) as executor: - futures = [] for idx in range(69): - future = executor.submit(bulk_log_and_flush, idx + 1) - futures.append(future) + executor.submit(bulk_log_and_flush, idx + 1) executor.shutdown() finished_logs = exporter.get_finished_logs() self.assertEqual(len(finished_logs), 2415) - for item in finished_logs: - self.assertEqual(item.instrumentation_scope.name, "threads") @unittest.skipUnless( hasattr(os, "fork"), "needs *nix", ) - def 
test_batch_log_record_processor_fork(self): - # pylint: disable=invalid-name + def test_batch_log_record_processor_fork_clears_logs_from_child(self): exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor( exporter, max_export_batch_size=64, - schedule_delay_millis=10, + schedule_delay_millis=30000, ) - provider = LoggerProvider() - provider.add_log_record_processor(log_record_processor) + # These logs should be flushed only from the parent process. + # _at_fork_reinit should be called in the child process, to + # clear these logs in the child process. + for _ in range(10): + log_record_processor.emit(EMPTY_LOG) - logger = logging.getLogger("test-fork") - logger.propagate = False - logger.addHandler(LoggingHandler(logger_provider=provider)) + # The below test also needs this, but it can only be set once. + multiprocessing.set_start_method("fork") - logger.critical("yolo") - time.sleep(0.5) # give some time for the exporter to upload + def child(conn): + log_record_processor.force_flush() + logs = exporter.get_finished_logs() + conn.send(len(logs) == 0) + conn.close() - self.assertTrue(log_record_processor.force_flush()) - self.assertEqual(len(exporter.get_finished_logs()), 1) - exporter.clear() + parent_conn, child_conn = multiprocessing.Pipe() + process = multiprocessing.Process(target=child, args=(child_conn,)) + process.start() + self.assertTrue(parent_conn.recv()) + process.join() + log_record_processor.force_flush() + self.assertTrue(len(exporter.get_finished_logs()) == 10) - multiprocessing.set_start_method("fork") + @unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", + ) + def test_batch_log_record_processor_fork_doesnot_deadlock(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor( + exporter, + max_export_batch_size=64, + schedule_delay_millis=30000, + ) def child(conn): def _target(): - logger.critical("Critical message child") - - self.run_with_many_threads(_target, 100) - - 
time.sleep(0.5) + log_record_processor.emit(EMPTY_LOG) + ConcurrencyTestBase.run_with_many_threads(_target, 100) + log_record_processor.force_flush() logs = exporter.get_finished_logs() conn.send(len(logs) == 100) conn.close() parent_conn, child_conn = multiprocessing.Pipe() - p = multiprocessing.Process(target=child, args=(child_conn,)) - p.start() + process = multiprocessing.Process(target=child, args=(child_conn,)) + process.start() self.assertTrue(parent_conn.recv()) - p.join() - - log_record_processor.shutdown() + process.join() def test_batch_log_record_processor_gc(self): # Given a BatchLogRecordProcessor @@ -680,11 +679,6 @@ def formatter(record): # pylint: disable=unused-argument mock_stdout = Mock() exporter = ConsoleLogExporter(out=mock_stdout, formatter=formatter) - log_data = LogData( - log_record=LogRecord(), - instrumentation_scope=InstrumentationScope( - "first_name", "first_version" - ), - ) - exporter.export([log_data]) + exporter.export([EMPTY_LOG]) + mock_stdout.write.assert_called_once_with(mock_record_str)