Skip to content

Commit 3a91378

Browse files
author
Girish Chandrashekar
committed
Implement shutdown procedure for OTLP grpc exporters
- Add `_shutdown` variable for checking if the exporter has been shutdown. - Prevent export if the `_shutdown` flag has been set. Log a warning message is exporter has been shutdown. - Use thread lock to synchronize the last export call before shutdown timeout. The `shutdown` method will wait until the `timeout_millis` if there is an ongoing export. If there is no ongiong export, set the `_shutdown` flag to prevent further exports and return. - Add unit tests for the `OTLPExporterMixIn` and the sub classes for traces and metrics.
1 parent f879d38 commit 3a91378

File tree

6 files changed

+283
-107
lines changed

6 files changed

+283
-107
lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

+92-79
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414

1515
"""OTLP Exporter"""
1616

17-
from logging import getLogger
17+
import threading
1818
from abc import ABC, abstractmethod
1919
from collections.abc import Sequence
20+
from logging import getLogger
2021
from os import environ
2122
from time import sleep
2223
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union
2324
from typing import Sequence as TypingSequence
2425
from typing import TypeVar
2526
from urllib.parse import urlparse
26-
from opentelemetry.sdk.trace import ReadableSpan
2727

2828
import backoff
2929
from google.rpc.error_details_pb2 import RetryInfo
@@ -37,6 +37,9 @@
3737
ssl_channel_credentials,
3838
)
3939

40+
from opentelemetry.exporter.otlp.proto.grpc import (
41+
_OTLP_GRPC_HEADERS,
42+
)
4043
from opentelemetry.proto.common.v1.common_pb2 import (
4144
AnyValue,
4245
ArrayValue,
@@ -51,12 +54,10 @@
5154
OTEL_EXPORTER_OTLP_INSECURE,
5255
OTEL_EXPORTER_OTLP_TIMEOUT,
5356
)
54-
from opentelemetry.sdk.resources import Resource as SDKResource
5557
from opentelemetry.sdk.metrics.export import MetricsData
58+
from opentelemetry.sdk.resources import Resource as SDKResource
59+
from opentelemetry.sdk.trace import ReadableSpan
5660
from opentelemetry.util.re import parse_env_headers
57-
from opentelemetry.exporter.otlp.proto.grpc import (
58-
_OTLP_GRPC_HEADERS,
59-
)
6061

6162
logger = getLogger(__name__)
6263
SDKDataT = TypeVar("SDKDataT")
@@ -92,7 +93,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]:
9293

9394

9495
def _translate_value(value: Any) -> KeyValue:
95-
9696
if isinstance(value, bool):
9797
any_value = AnyValue(bool_value=value)
9898

@@ -131,16 +131,15 @@ def _translate_key_values(key: str, value: Any) -> KeyValue:
131131

132132

133133
def get_resource_data(
134-
sdk_resource_scope_data: Dict[SDKResource, ResourceDataT],
135-
resource_class: Callable[..., TypingResourceT],
136-
name: str,
134+
sdk_resource_scope_data: Dict[SDKResource, ResourceDataT],
135+
resource_class: Callable[..., TypingResourceT],
136+
name: str,
137137
) -> List[TypingResourceT]:
138-
139138
resource_data = []
140139

141140
for (
142-
sdk_resource,
143-
scope_data,
141+
sdk_resource,
142+
scope_data,
144143
) in sdk_resource_scope_data.items():
145144

146145
collector_resource = Resource()
@@ -215,15 +214,15 @@ class OTLPExporterMixin(
215214
"""
216215

217216
def __init__(
218-
self,
219-
endpoint: Optional[str] = None,
220-
insecure: Optional[bool] = None,
221-
credentials: Optional[ChannelCredentials] = None,
222-
headers: Optional[
223-
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
224-
] = None,
225-
timeout: Optional[int] = None,
226-
compression: Optional[Compression] = None,
217+
self,
218+
endpoint: Optional[str] = None,
219+
insecure: Optional[bool] = None,
220+
credentials: Optional[ChannelCredentials] = None,
221+
headers: Optional[
222+
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
223+
] = None,
224+
timeout: Optional[int] = None,
225+
compression: Optional[Compression] = None,
227226
):
228227
super().__init__()
229228

@@ -265,10 +264,10 @@ def __init__(
265264
self._collector_kwargs = None
266265

267266
compression = (
268-
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
269-
if compression is None
270-
else compression
271-
) or Compression.NoCompression
267+
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
268+
if compression is None
269+
else compression
270+
) or Compression.NoCompression
272271

273272
if insecure:
274273
self._client = self._stub(
@@ -282,9 +281,12 @@ def __init__(
282281
secure_channel(endpoint, credentials, compression=compression)
283282
)
284283

284+
self._export_lock = threading.Lock()
285+
self._shutdown = False
286+
285287
@abstractmethod
286288
def _translate_data(
287-
self, data: TypingSequence[SDKDataT]
289+
self, data: TypingSequence[SDKDataT]
288290
) -> ExportServiceRequestT:
289291
pass
290292

@@ -300,8 +302,13 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
300302
return output
301303

302304
def _export(
303-
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
305+
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
304306
) -> ExportResultT:
307+
# After the call to shutdown, subsequent calls to Export are
308+
# not allowed and should return a Failure result.
309+
if self._shutdown:
310+
logger.warning("Exporter already shutdown, ignoring batch")
311+
return self._result.FAILURE
305312

306313
# FIXME remove this check if the export type for traces
307314
# gets updated to a class that represents the proto
@@ -317,69 +324,75 @@ def _export(
317324
# exponentially. Once delay is greater than max_value, the yielded
318325
# value will remain constant.
319326
for delay in _expo(max_value=max_value):
320-
321-
if delay == max_value:
327+
if delay == max_value or self._shutdown:
322328
return self._result.FAILURE
323329

324-
try:
325-
self._client.Export(
326-
request=self._translate_data(data),
327-
metadata=self._headers,
328-
timeout=self._timeout,
329-
)
330+
with self._export_lock:
331+
try:
332+
self._client.Export(
333+
request=self._translate_data(data),
334+
metadata=self._headers,
335+
timeout=self._timeout,
336+
)
330337

331-
return self._result.SUCCESS
338+
return self._result.SUCCESS
332339

333-
except RpcError as error:
340+
except RpcError as error:
334341

335-
if error.code() in [
336-
StatusCode.CANCELLED,
337-
StatusCode.DEADLINE_EXCEEDED,
338-
StatusCode.RESOURCE_EXHAUSTED,
339-
StatusCode.ABORTED,
340-
StatusCode.OUT_OF_RANGE,
341-
StatusCode.UNAVAILABLE,
342-
StatusCode.DATA_LOSS,
343-
]:
342+
if error.code() in [
343+
StatusCode.CANCELLED,
344+
StatusCode.DEADLINE_EXCEEDED,
345+
StatusCode.RESOURCE_EXHAUSTED,
346+
StatusCode.ABORTED,
347+
StatusCode.OUT_OF_RANGE,
348+
StatusCode.UNAVAILABLE,
349+
StatusCode.DATA_LOSS,
350+
]:
344351

345-
retry_info_bin = dict(error.trailing_metadata()).get(
346-
"google.rpc.retryinfo-bin"
347-
)
348-
if retry_info_bin is not None:
349-
retry_info = RetryInfo()
350-
retry_info.ParseFromString(retry_info_bin)
351-
delay = (
352-
retry_info.retry_delay.seconds
353-
+ retry_info.retry_delay.nanos / 1.0e9
352+
retry_info_bin = dict(error.trailing_metadata()).get(
353+
"google.rpc.retryinfo-bin"
354+
)
355+
if retry_info_bin is not None:
356+
retry_info = RetryInfo()
357+
retry_info.ParseFromString(retry_info_bin)
358+
delay = (
359+
retry_info.retry_delay.seconds
360+
+ retry_info.retry_delay.nanos / 1.0e9
361+
)
362+
363+
logger.warning(
364+
(
365+
"Transient error %s encountered while exporting "
366+
"%s, retrying in %ss."
367+
),
368+
error.code(),
369+
self._exporting,
370+
delay,
371+
)
372+
sleep(delay)
373+
continue
374+
else:
375+
logger.error(
376+
"Failed to export %s, error code: %s",
377+
self._exporting,
378+
error.code(),
354379
)
355380

356-
logger.warning(
357-
(
358-
"Transient error %s encountered while exporting "
359-
"%s, retrying in %ss."
360-
),
361-
error.code(),
362-
self._exporting,
363-
delay,
364-
)
365-
sleep(delay)
366-
continue
367-
else:
368-
logger.error(
369-
"Failed to export %s, error code: %s",
370-
self._exporting,
371-
error.code(),
372-
)
373-
374-
if error.code() == StatusCode.OK:
375-
return self._result.SUCCESS
381+
if error.code() == StatusCode.OK:
382+
return self._result.SUCCESS
376383

377-
return self._result.FAILURE
384+
return self._result.FAILURE
378385

379386
return self._result.FAILURE
380387

381-
def shutdown(self) -> None:
382-
pass
388+
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
389+
if self._shutdown:
390+
logger.warning("Exporter already shutdown, ignoring call")
391+
return
392+
# wait for the last export if any
393+
self._export_lock.acquire(timeout=timeout_millis)
394+
self._shutdown = True
395+
self._export_lock.release()
383396

384397
@property
385398
@abstractmethod

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ def _split_metrics_data(
387387
yield MetricsData(resource_metrics=split_resource_metrics)
388388

389389
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
390-
pass
390+
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
391391

392392
@property
393393
def _exporting(self) -> str:

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,9 @@ def _translate_data(
290290
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
291291
return self._export(spans)
292292

293+
def shutdown(self) -> None:
294+
OTLPExporterMixin.shutdown(self)
295+
293296
def force_flush(self, timeout_millis: int = 30000) -> bool:
294297
return True
295298

0 commit comments

Comments
 (0)