|
19 | 19 | import os
|
20 | 20 | import sys
|
21 | 21 | import threading
|
22 |
| -from os import linesep |
| 22 | +from os import environ, linesep |
23 | 23 | from time import time_ns
|
24 | 24 | from typing import IO, Callable, Deque, List, Optional, Sequence
|
25 | 25 |
|
|
30 | 30 | set_value,
|
31 | 31 | )
|
32 | 32 | from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
|
| 33 | +from opentelemetry.sdk.environment_variables import ( |
| 34 | + OTEL_BLRP_EXPORT_TIMEOUT, |
| 35 | + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, |
| 36 | + OTEL_BLRP_MAX_QUEUE_SIZE, |
| 37 | + OTEL_BLRP_SCHEDULE_DELAY, |
| 38 | +) |
33 | 39 | from opentelemetry.util._once import Once
|
34 | 40 |
|
| 41 | +_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000 |
| 42 | +_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512 |
| 43 | +_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000 |
| 44 | +_DEFAULT_MAX_QUEUE_SIZE = 2048 |
| 45 | +_ENV_VAR_INT_VALUE_ERROR_MESSAGE = ( |
| 46 | + "Unable to parse value for %s as integer. Defaulting to %s." |
| 47 | +) |
| 48 | + |
35 | 49 | _logger = logging.getLogger(__name__)
|
36 | 50 |
|
37 | 51 |
|
@@ -142,20 +156,54 @@ class BatchLogRecordProcessor(LogRecordProcessor):
|
142 | 156 | """This is an implementation of LogRecordProcessor which creates batches of
|
143 | 157 | received logs in the export-friendly LogData representation and
|
144 | 158 | send to the configured LogExporter, as soon as they are emitted.
|
| 159 | +
|
| 160 | + `BatchLogRecordProcessor` is configurable with the following environment |
| 161 | + variables which correspond to constructor parameters: |
| 162 | +
|
| 163 | + - :envvar:`OTEL_BLRP_SCHEDULE_DELAY` |
| 164 | + - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` |
| 165 | + - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` |
| 166 | + - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` |
145 | 167 | """
|
146 | 168 |
|
147 | 169 | def __init__(
|
148 | 170 | self,
|
149 | 171 | exporter: LogExporter,
|
150 |
| - schedule_delay_millis: int = 5000, |
151 |
| - max_export_batch_size: int = 512, |
152 |
| - export_timeout_millis: int = 30000, |
| 172 | + schedule_delay_millis: float = None, |
| 173 | + max_export_batch_size: int = None, |
| 174 | + export_timeout_millis: float = None, |
| 175 | + max_queue_size: int = None, |
153 | 176 | ):
|
| 177 | + if max_queue_size is None: |
| 178 | + max_queue_size = BatchLogRecordProcessor._default_max_queue_size() |
| 179 | + |
| 180 | + if schedule_delay_millis is None: |
| 181 | + schedule_delay_millis = ( |
| 182 | + BatchLogRecordProcessor._default_schedule_delay_millis() |
| 183 | + ) |
| 184 | + |
| 185 | + if max_export_batch_size is None: |
| 186 | + max_export_batch_size = ( |
| 187 | + BatchLogRecordProcessor._default_max_export_batch_size() |
| 188 | + ) |
| 189 | + |
| 190 | + if export_timeout_millis is None: |
| 191 | + export_timeout_millis = ( |
| 192 | + BatchLogRecordProcessor._default_export_timeout_millis() |
| 193 | + ) |
| 194 | + |
| 195 | + BatchLogRecordProcessor._validate_arguments( |
| 196 | + max_queue_size, schedule_delay_millis, max_export_batch_size |
| 197 | + ) |
| 198 | + |
154 | 199 | self._exporter = exporter
|
| 200 | + self._max_queue_size = max_queue_size |
155 | 201 | self._schedule_delay_millis = schedule_delay_millis
|
156 | 202 | self._max_export_batch_size = max_export_batch_size
|
157 | 203 | self._export_timeout_millis = export_timeout_millis
|
158 |
| - self._queue = collections.deque() # type: Deque[LogData] |
| 204 | + self._queue = collections.deque( |
| 205 | + [], max_queue_size |
| 206 | + ) # type: Deque[LogData] |
159 | 207 | self._worker_thread = threading.Thread(
|
160 | 208 | name="OtelBatchLogRecordProcessor",
|
161 | 209 | target=self.worker,
|
@@ -333,3 +381,86 @@ def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
|
333 | 381 | if not ret:
|
334 | 382 | _logger.warning("Timeout was exceeded in force_flush().")
|
335 | 383 | return ret
|
| 384 | + |
| 385 | + @staticmethod |
| 386 | + def _default_max_queue_size(): |
| 387 | + try: |
| 388 | + return int( |
| 389 | + environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE) |
| 390 | + ) |
| 391 | + except ValueError: |
| 392 | + _logger.exception( |
| 393 | + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, |
| 394 | + OTEL_BLRP_MAX_QUEUE_SIZE, |
| 395 | + _DEFAULT_MAX_QUEUE_SIZE, |
| 396 | + ) |
| 397 | + return _DEFAULT_MAX_QUEUE_SIZE |
| 398 | + |
| 399 | + @staticmethod |
| 400 | + def _default_schedule_delay_millis(): |
| 401 | + try: |
| 402 | + return int( |
| 403 | + environ.get( |
| 404 | + OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS |
| 405 | + ) |
| 406 | + ) |
| 407 | + except ValueError: |
| 408 | + _logger.exception( |
| 409 | + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, |
| 410 | + OTEL_BLRP_SCHEDULE_DELAY, |
| 411 | + _DEFAULT_SCHEDULE_DELAY_MILLIS, |
| 412 | + ) |
| 413 | + return _DEFAULT_SCHEDULE_DELAY_MILLIS |
| 414 | + |
| 415 | + @staticmethod |
| 416 | + def _default_max_export_batch_size(): |
| 417 | + try: |
| 418 | + return int( |
| 419 | + environ.get( |
| 420 | + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, |
| 421 | + _DEFAULT_MAX_EXPORT_BATCH_SIZE, |
| 422 | + ) |
| 423 | + ) |
| 424 | + except ValueError: |
| 425 | + _logger.exception( |
| 426 | + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, |
| 427 | + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, |
| 428 | + _DEFAULT_MAX_EXPORT_BATCH_SIZE, |
| 429 | + ) |
| 430 | + return _DEFAULT_MAX_EXPORT_BATCH_SIZE |
| 431 | + |
| 432 | + @staticmethod |
| 433 | + def _default_export_timeout_millis(): |
| 434 | + try: |
| 435 | + return int( |
| 436 | + environ.get( |
| 437 | + OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS |
| 438 | + ) |
| 439 | + ) |
| 440 | + except ValueError: |
| 441 | + _logger.exception( |
| 442 | + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, |
| 443 | + OTEL_BLRP_EXPORT_TIMEOUT, |
| 444 | + _DEFAULT_EXPORT_TIMEOUT_MILLIS, |
| 445 | + ) |
| 446 | + return _DEFAULT_EXPORT_TIMEOUT_MILLIS |
| 447 | + |
| 448 | + @staticmethod |
| 449 | + def _validate_arguments( |
| 450 | + max_queue_size, schedule_delay_millis, max_export_batch_size |
| 451 | + ): |
| 452 | + if max_queue_size <= 0: |
| 453 | + raise ValueError("max_queue_size must be a positive integer.") |
| 454 | + |
| 455 | + if schedule_delay_millis <= 0: |
| 456 | + raise ValueError("schedule_delay_millis must be positive.") |
| 457 | + |
| 458 | + if max_export_batch_size <= 0: |
| 459 | + raise ValueError( |
| 460 | + "max_export_batch_size must be a positive integer." |
| 461 | + ) |
| 462 | + |
| 463 | + if max_export_batch_size > max_queue_size: |
| 464 | + raise ValueError( |
| 465 | + "max_export_batch_size must be less than or equal to max_queue_size." |
| 466 | + ) |
0 commit comments