Commit b296c55

srikanthccv authored and Alex Boten committed
Add LogProcessors implementation (open-telemetry#1916)
1 parent 9445d0d commit b296c55

File tree: 5 files changed (+878, -12 lines)

opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py (+156, -10)
@@ -14,9 +14,11 @@

 import abc
 import atexit
+import concurrent.futures
 import logging
 import os
-from typing import Any, Optional, cast
+import threading
+from typing import Any, Callable, Optional, Tuple, Union, cast

 from opentelemetry.sdk.environment_variables import (
     OTEL_PYTHON_LOG_EMITTER_PROVIDER,
@@ -27,6 +29,7 @@
 from opentelemetry.trace import get_current_span
 from opentelemetry.trace.span import TraceFlags
 from opentelemetry.util._providers import _load_provider
+from opentelemetry.util._time import _time_ns
 from opentelemetry.util.types import Attributes

 _logger = logging.getLogger(__name__)
@@ -112,6 +115,135 @@ def force_flush(self, timeout_millis: int = 30000):
         """


+# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
+# pylint:disable=no-member
+class SynchronousMultiLogProcessor(LogProcessor):
+    """Implementation of :class:`LogProcessor` that forwards all received
+    events to a list of log processors sequentially.
+
+    The underlying log processors are called in sequential order as they were
+    added.
+    """
+
+    def __init__(self):
+        # use a tuple to avoid race conditions when adding a new log and
+        # iterating through it on "emit".
+        self._log_processors = ()  # type: Tuple[LogProcessor, ...]
+        self._lock = threading.Lock()
+
+    def add_log_processor(self, log_processor: LogProcessor) -> None:
+        """Adds a LogProcessor to the list of log processors handled by this instance"""
+        with self._lock:
+            self._log_processors = self._log_processors + (log_processor,)
+
+    def emit(self, log_data: LogData) -> None:
+        for lp in self._log_processors:
+            lp.emit(log_data)
+
+    def shutdown(self) -> None:
+        """Shutdown the log processors one by one"""
+        for lp in self._log_processors:
+            lp.shutdown()
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Force flush the log processors one by one
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for logs to be
+                exported. If the first n log processors exceeded the timeout
+                then remaining log processors will not be flushed.
+
+        Returns:
+            True if all the log processors flushed the logs within timeout,
+            False otherwise.
+        """
+        deadline_ns = _time_ns() + timeout_millis * 1000000
+        for lp in self._log_processors:
+            current_ts = _time_ns()
+            if current_ts >= deadline_ns:
+                return False
+
+            if not lp.force_flush((deadline_ns - current_ts) // 1000000):
+                return False
+
+        return True
+
+
+class ConcurrentMultiLogProcessor(LogProcessor):
+    """Implementation of :class:`LogProcessor` that forwards all received
+    events to a list of log processors in parallel.
+
+    Calls to the underlying log processors are forwarded in parallel by
+    submitting them to a thread pool executor and waiting until each log
+    processor finished its work.
+
+    Args:
+        max_workers: The number of threads managed by the thread pool executor
+            and thus defining how many log processors can work in parallel.
+    """
+
+    def __init__(self, max_workers: int = 2):
+        # use a tuple to avoid race conditions when adding a new log and
+        # iterating through it on "emit".
+        self._log_processors = ()  # type: Tuple[LogProcessor, ...]
+        self._lock = threading.Lock()
+        self._executor = concurrent.futures.ThreadPoolExecutor(
+            max_workers=max_workers
+        )
+
+    def add_log_processor(self, log_processor: LogProcessor):
+        with self._lock:
+            self._log_processors = self._log_processors + (log_processor,)
+
+    def _submit_and_wait(
+        self,
+        func: Callable[[LogProcessor], Callable[..., None]],
+        *args: Any,
+        **kwargs: Any,
+    ):
+        futures = []
+        for lp in self._log_processors:
+            future = self._executor.submit(func(lp), *args, **kwargs)
+            futures.append(future)
+        for future in futures:
+            future.result()
+
+    def emit(self, log_data: LogData):
+        self._submit_and_wait(lambda lp: lp.emit, log_data)
+
+    def shutdown(self):
+        self._submit_and_wait(lambda lp: lp.shutdown)
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Force flush the log processors in parallel.
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for logs to be
+                exported.
+
+        Returns:
+            True if all the log processors flushed the logs within timeout,
+            False otherwise.
+        """
+        futures = []
+        for lp in self._log_processors:
+            future = self._executor.submit(lp.force_flush, timeout_millis)
+            futures.append(future)
+
+        done_futures, not_done_futures = concurrent.futures.wait(
+            futures, timeout_millis / 1e3
+        )
+
+        if not_done_futures:
+            return False
+
+        for future in done_futures:
+            if not future.result():
+                return False
+
+        return True
+
+
 class OTLPHandler(logging.Handler):
     """A handler class which writes logging records, in OTLP format, to
     a network destination or file.
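
The two fan-out classes above differ mainly in how they spend a flush budget: the synchronous variant walks the processors in registration order and hands each one only the time remaining before the deadline, while the concurrent variant submits every force_flush to its thread pool and performs a single bounded wait. A minimal sketch of that difference, using a hypothetical SlowProcessor defined here for illustration (it is not part of this commit):

import time

from opentelemetry.sdk.logs import (
    ConcurrentMultiLogProcessor,
    LogProcessor,
    SynchronousMultiLogProcessor,
)


class SlowProcessor(LogProcessor):
    """Hypothetical processor whose flush takes roughly 100 ms."""

    def emit(self, log_data):
        pass

    def shutdown(self):
        pass

    def force_flush(self, timeout_millis=30000):
        time.sleep(0.1)
        return True


sync_mp = SynchronousMultiLogProcessor()
conc_mp = ConcurrentMultiLogProcessor(max_workers=2)
for mp in (sync_mp, conc_mp):
    mp.add_log_processor(SlowProcessor())
    mp.add_log_processor(SlowProcessor())

# Sequential: each processor is flushed with whatever is left of the deadline.
print(sync_mp.force_flush())      # True, takes about 200 ms in total
# Parallel: both flushes run at once under a single wait() bounded by 150 ms.
print(conc_mp.force_flush(150))   # True if both finish within the budget

Because the synchronous flush returns False as soon as the deadline is crossed, a slow early processor can prevent later ones from being flushed at all, which is exactly what the docstring above warns about.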
@@ -155,36 +287,49 @@ def flush(self) -> None:


 class LogEmitter:
-    # TODO: Add multi_log_processor
     def __init__(
         self,
         resource: Resource,
+        multi_log_processor: Union[
+            SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor
+        ],
         instrumentation_info: InstrumentationInfo,
     ):
         self._resource = resource
+        self._multi_log_processor = multi_log_processor
         self._instrumentation_info = instrumentation_info

     @property
     def resource(self):
         return self._resource

     def emit(self, record: LogRecord):
-        # TODO: multi_log_processor.emit
-        pass
+        """Emits the :class:`LogData` by associating :class:`LogRecord`
+        and instrumentation info.
+        """
+        log_data = LogData(record, self._instrumentation_info)
+        self._multi_log_processor.emit(log_data)

+    # TODO: Should this flush everything in pipeline?
+    # Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290
     def flush(self):
-        # TODO: multi_log_processor.force_flush
-        pass
+        """Ensure all logging output has been flushed."""
+        self._multi_log_processor.force_flush()


 class LogEmitterProvider:
-    # TODO: Add multi_log_processor
     def __init__(
         self,
         resource: Resource = Resource.create(),
         shutdown_on_exit: bool = True,
+        multi_log_processor: Union[
+            SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor
+        ] = None,
     ):
         self._resource = resource
+        self._multi_log_processor = (
+            multi_log_processor or SynchronousMultiLogProcessor()
+        )
         self._at_exit_handler = None
         if shutdown_on_exit:
             self._at_exit_handler = atexit.register(self.shutdown)
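
LogEmitterProvider now owns a single multi-processor and passes it to every LogEmitter it creates; when none is supplied it falls back to SynchronousMultiLogProcessor. A sketch of that wiring, with a hypothetical InMemoryProcessor standing in for a real exporter-backed processor (nothing below is added by this commit):

from opentelemetry.sdk.logs import (
    ConcurrentMultiLogProcessor,
    LogEmitterProvider,
    LogProcessor,
)


class InMemoryProcessor(LogProcessor):
    """Hypothetical processor that just collects LogData for inspection."""

    def __init__(self):
        self.received = []

    def emit(self, log_data):
        self.received.append(log_data)

    def shutdown(self):
        pass

    def force_flush(self, timeout_millis=30000):
        return True


# Default wiring: a SynchronousMultiLogProcessor is created internally.
provider = LogEmitterProvider()

# Opt in to parallel fan-out by passing the concurrent variant explicitly.
parallel_provider = LogEmitterProvider(
    multi_log_processor=ConcurrentMultiLogProcessor(max_workers=4),
    shutdown_on_exit=False,
)

store = InMemoryProcessor()
parallel_provider.add_log_processor(store)

# Every emitter created by the provider shares the same multi-processor,
# so records emitted through it end up in `store.received`.
emitter = parallel_provider.get_log_emitter("example.module", "0.1")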
@@ -200,6 +345,7 @@ def get_log_emitter(
     ) -> LogEmitter:
         return LogEmitter(
             self._resource,
+            self._multi_log_processor,
             InstrumentationInfo(
                 instrumenting_module_name, instrumenting_module_verison
             ),
@@ -210,11 +356,11 @@ def add_log_processor(self, log_processor: LogProcessor):

         The log processors are invoked in the same order they are registered.
         """
-        # TODO: multi_log_processor.add_log_processor.
+        self._multi_log_processor.add_log_processor(log_processor)

     def shutdown(self):
         """Shuts down the log processors."""
-        # TODO: multi_log_processor.shutdown
+        self._multi_log_processor.shutdown()
         if self._at_exit_handler is not None:
             atexit.unregister(self._at_exit_handler)
             self._at_exit_handler = None
@@ -230,7 +376,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
             True if all the log processors flushed the logs within timeout,
             False otherwise.
         """
-        # TODO: multi_log_processor.force_flush
+        return self._multi_log_processor.force_flush(timeout_millis)


 _LOG_EMITTER_PROVIDER = None
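
With the TODO stubs replaced, add_log_processor, shutdown and force_flush on the provider all delegate straight to its multi-processor, and LogEmitter.emit feeds it through LogData. An end-to-end sketch, assuming LogRecord in this module accepts all-optional keyword fields such as body (the PrintProcessor below is hypothetical):

from opentelemetry.sdk.logs import LogEmitterProvider, LogProcessor, LogRecord


class PrintProcessor(LogProcessor):
    """Hypothetical processor used only for this sketch."""

    def emit(self, log_data):
        # A real processor would hand log_data to an exporter here.
        print(log_data)

    def shutdown(self):
        pass

    def force_flush(self, timeout_millis=30000):
        return True


provider = LogEmitterProvider(shutdown_on_exit=False)
provider.add_log_processor(PrintProcessor())

emitter = provider.get_log_emitter(__name__, "0.1")
# emit() wraps the record in LogData together with the emitter's
# InstrumentationInfo and fans it out via the multi-processor.
emitter.emit(LogRecord(body="hello logs"))  # assumes `body=` exists on LogRecord

# Both calls now forward to the multi-processor instead of passing silently.
provider.force_flush()
provider.shutdown()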
