Skip to content

Refactor BatchLogRecordProcessor and associated tests #4535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import threading
import weakref
from os import environ, linesep
from time import time_ns
from typing import IO, Callable, Deque, List, Optional, Sequence
from typing import IO, Callable, Deque, Optional, Sequence

from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
Expand Down Expand Up @@ -56,6 +55,12 @@ class LogExportResult(enum.Enum):
FAILURE = 1


@enum.unique
class BatchLogExportStrategy(enum.Enum):
    """Selects how many batches a single export pass should send.

    The strategy is interpreted by
    ``BatchLogRecordProcessor._should_export_batch``. Under every strategy,
    exporting continues while the queue holds at least
    ``max_export_batch_size`` records and stops once the queue is empty.
    """

    # Keep exporting until the queue is empty.
    EXPORT_ALL = 0
    # Export only while the queue still holds at least one full batch.
    EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
    # Export the first batch even if it is not full; after that, continue
    # only while full batches remain.
    EXPORT_AT_LEAST_ONE_BATCH = 2


class LogExporter(abc.ABC):
"""Interface for exporting logs.

Expand Down Expand Up @@ -141,14 +146,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
return True


class _FlushRequest:
    """Pairs a completion event with the number of records a flush covers."""

    # A tuple keeps the slot declaration immutable.
    __slots__ = ("event", "num_log_records")

    def __init__(self):
        # Set when the request has been handled, releasing any waiter.
        self.event = threading.Event()
        # Number of queued log records associated with this request.
        self.num_log_records = 0


_BSP_RESET_ONCE = Once()


Expand All @@ -167,8 +164,6 @@ class BatchLogRecordProcessor(LogRecordProcessor):
"""

_queue: Deque[LogData]
_flush_request: _FlushRequest | None
_log_records: List[LogData | None]

def __init__(
self,
Expand All @@ -190,7 +185,7 @@ def __init__(
max_export_batch_size = (
BatchLogRecordProcessor._default_max_export_batch_size()
)

# Not used. No way currently to pass timeout to export.
if export_timeout_millis is None:
export_timeout_millis = (
BatchLogRecordProcessor._default_export_timeout_millis()
Expand All @@ -202,27 +197,45 @@ def __init__(

self._exporter = exporter
self._max_queue_size = max_queue_size
self._schedule_delay_millis = schedule_delay_millis
self._schedule_delay = schedule_delay_millis / 1e3
self._max_export_batch_size = max_export_batch_size
# Not used. No way currently to pass timeout to export.
# TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
self._export_timeout_millis = export_timeout_millis
# Deque is thread safe.
self._queue = collections.deque([], max_queue_size)
self._worker_thread = threading.Thread(
name="OtelBatchLogRecordProcessor",
target=self.worker,
daemon=True,
)
self._condition = threading.Condition(threading.Lock())

self._shutdown = False
self._flush_request = None
self._log_records = [None] * self._max_export_batch_size
self._export_lock = threading.Lock()
self._worker_awaken = threading.Event()
self._worker_thread.start()
if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
self._pid = os.getpid()

def _should_export_batch(
    self, batch_strategy: BatchLogExportStrategy, num_iterations: int
) -> bool:
    """Decide whether another batch should be exported in this pass.

    Args:
        batch_strategy: Strategy governing the current export pass.
        num_iterations: Number of batches already exported in this pass.

    Returns:
        True if another batch should be exported, False otherwise.
    """
    if not self._queue:
        return False
    # A full batch (queue length at least max batch size) is always
    # exported, whatever the strategy.
    if len(self._queue) >= self._max_export_batch_size:
        return True
    if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
        return True
    if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
        # A partial batch is only sent as the first batch of the pass.
        return num_iterations == 0
    # Remaining strategy: nothing to export below a full batch.
    return False

def _at_fork_reinit(self):
self._condition = threading.Condition(threading.Lock())
self._export_lock = threading.Lock()
self._worker_awaken = threading.Event()
self._queue.clear()
self._worker_thread = threading.Thread(
name="OtelBatchLogRecordProcessor",
Expand All @@ -233,152 +246,75 @@ def _at_fork_reinit(self):
self._pid = os.getpid()

def worker(self):
timeout = self._schedule_delay_millis / 1e3
flush_request: Optional[_FlushRequest] = None
while not self._shutdown:
with self._condition:
if self._shutdown:
# shutdown may have been called, avoid further processing
break
flush_request = self._get_and_unset_flush_request()
if (
len(self._queue) < self._max_export_batch_size
and flush_request is None
):
self._condition.wait(timeout)

flush_request = self._get_and_unset_flush_request()
if not self._queue:
timeout = self._schedule_delay_millis / 1e3
self._notify_flush_request_finished(flush_request)
flush_request = None
continue
if self._shutdown:
break

start_ns = time_ns()
self._export(flush_request)
end_ns = time_ns()
# subtract the duration of this export call to the next timeout
timeout = self._schedule_delay_millis / 1e3 - (
(end_ns - start_ns) / 1e9
)

self._notify_flush_request_finished(flush_request)
flush_request = None

# there might have been a new flush request while export was running
# and before the done flag switched to true
with self._condition:
shutdown_flush_request = self._get_and_unset_flush_request()

# flush the remaining logs
self._drain_queue()
self._notify_flush_request_finished(flush_request)
self._notify_flush_request_finished(shutdown_flush_request)

def _export(self, flush_request: Optional[_FlushRequest] = None):
"""Exports logs considering the given flush_request.

If flush_request is not None then logs are exported in batches
until the number of exported logs reached or exceeded the num of logs in
flush_request, otherwise exports at max max_export_batch_size logs.
"""
if flush_request is None:
self._export_batch()
return

num_log_records = flush_request.num_log_records
while self._queue:
exported = self._export_batch()
num_log_records -= exported

if num_log_records <= 0:
# Lots of strategies in the spec for setting next timeout.
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than threshold.
sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
if self._shutdown:
break

def _export_batch(self) -> int:
    """Export at most ``max_export_batch_size`` logs.

    Records are popped from the queue into the preallocated
    ``self._log_records`` buffer and the filled prefix is passed to the
    exporter. Exceptions raised by the exporter are logged, not propagated.

    Returns:
        The number of log records handed to the exporter.
    """
    idx = 0
    while idx < self._max_export_batch_size and self._queue:
        self._log_records[idx] = self._queue.pop()
        idx += 1
    token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
    try:
        self._exporter.export(self._log_records[:idx])  # type: ignore
    except Exception:  # pylint: disable=broad-exception-caught
        _logger.exception("Exception while exporting logs.")
    finally:
        # Always restore the previous context, even if the exporter raises
        # something that is not an ``Exception`` subclass.
        detach(token)

    # Drop references to the exported records so the buffer can be reused.
    for index in range(idx):
        self._log_records[index] = None
    return idx

def _drain_queue(self):
    """Export all elements until queue is empty.

    Can only be called from the worker thread context because it invokes
    `export` that is not thread safe.
    """
    # Each call to _export_batch removes up to one batch from the queue.
    while self._queue:
        self._export_batch()

def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
    """Take the pending flush request, if any, leaving none pending.

    Returns:
        The request that was pending, with ``num_log_records`` set to the
        current queue length, or None if no request was pending.
    """
    flush_request = self._flush_request
    self._flush_request = None
    if flush_request is not None:
        # Snapshot how many records are queued at the time of pickup.
        flush_request.num_log_records = len(self._queue)
    return flush_request

@staticmethod
def _notify_flush_request_finished(
flush_request: Optional[_FlushRequest] = None,
):
if flush_request is not None:
flush_request.event.set()

def _get_or_create_flush_request(self) -> _FlushRequest:
    """Return the pending flush request, creating one if none is pending.

    Returns:
        The request stored in ``self._flush_request``; callers arriving
        while one is pending receive that same request.
    """
    if self._flush_request is None:
        self._flush_request = _FlushRequest()
    return self._flush_request
self._export(
BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
if sleep_interrupted
else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
)
self._worker_awaken.clear()
self._export(BatchLogExportStrategy.EXPORT_ALL)

def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
    """Export queued log records in batches as directed by ``batch_strategy``.

    The whole pass runs while holding ``self._export_lock``. Exceptions
    raised by the exporter are logged, not propagated.

    Args:
        batch_strategy: Strategy passed to ``_should_export_batch`` to
            decide whether each further batch is exported.
    """
    with self._export_lock:
        iteration = 0
        # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
        # once the lock is obtained to see if we still need to make the requested export.
        while self._should_export_batch(batch_strategy, iteration):
            iteration += 1
            token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
            try:
                batch_size = min(
                    self._max_export_batch_size, len(self._queue)
                )
                self._exporter.export(
                    # Oldest records are at the back, so pop from there.
                    [self._queue.pop() for _ in range(batch_size)]
                )
            except Exception:  # pylint: disable=broad-exception-caught
                _logger.exception("Exception while exporting logs.")
            finally:
                # Always restore the previous context, even if the exporter
                # raises something that is not an ``Exception`` subclass.
                detach(token)

def emit(self, log_data: LogData) -> None:
"""Adds the `LogData` to queue and notifies the waiting threads
when size of queue reaches max_export_batch_size.
"""
if self._shutdown:
_logger.info("Shutdown called, ignoring log.")
return
if self._pid != os.getpid():
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)

if len(self._queue) == self._max_queue_size:
_logger.warning("Queue full, dropping log.")
self._queue.appendleft(log_data)
if len(self._queue) >= self._max_export_batch_size:
with self._condition:
self._condition.notify()
self._worker_awaken.set()

def shutdown(self):
if self._shutdown:
return
# Prevents emit and force_flush from further calling export.
self._shutdown = True
with self._condition:
self._condition.notify_all()
# Interrupts sleep in the worker, if it's sleeping.
self._worker_awaken.set()
# Main worker loop should exit after one final export call with flush all strategy.
self._worker_thread.join()
self._exporter.shutdown()

def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
if timeout_millis is None:
timeout_millis = self._export_timeout_millis
if self._shutdown:
return True

with self._condition:
flush_request = self._get_or_create_flush_request()
self._condition.notify_all()

ret = flush_request.event.wait(timeout_millis / 1e3)
if not ret:
_logger.warning("Timeout was exceeded in force_flush().")
return ret
return
# Blocking call to export.
self._export(BatchLogExportStrategy.EXPORT_ALL)

@staticmethod
def _default_max_queue_size():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT

The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor.
This environment variable currently has no effect; see https://github.com/open-telemetry/opentelemetry-python/issues/4555.
Default: 30000
"""

Expand Down
Loading