Skip to content

Add a timeout param to all OTLP grpc / http export calls #4560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b15f8c8
Add a timeout to export calls
DylanRussell Apr 22, 2025
e0cf233
feat: Updated and added examples (logs and metrics) (#4559)
Jayclifford345 Apr 23, 2025
1b1e8d8
opentelemetry-sdk: use stable code attributes (#4508)
xrmx Apr 23, 2025
adbec50
bugfix(exporter): ensure response is closed (#4477)
codeboten Apr 23, 2025
00329e0
Refactor BatchLogRecordProcessor and associated tests (#4535)
DylanRussell Apr 24, 2025
e46db88
infra: Automate SHA procedure during releases (#4547)
emdneto Apr 28, 2025
561f347
Add timeout millis param to export.
DylanRussell Apr 28, 2025
8269f54
Refactor BatchLogRecordProcessor
DylanRussell Apr 9, 2025
072c6bc
build(deps): bump jinja2 (#4534)
dependabot[bot] Apr 11, 2025
ea17936
logs: fix serialization of Extended attributes (#4342)
xrmx Apr 18, 2025
dae1288
feat: Updated and added examples (logs and metrics) (#4559)
Jayclifford345 Apr 23, 2025
4def4ac
opentelemetry-sdk: use stable code attributes (#4508)
xrmx Apr 23, 2025
211c49e
bugfix(exporter): ensure response is closed (#4477)
codeboten Apr 23, 2025
9ee6872
Refactor BatchLogRecordProcessor and associated tests (#4535)
DylanRussell Apr 24, 2025
e6ac352
infra: Automate SHA procedure during releases (#4547)
emdneto Apr 28, 2025
9f9c3b1
Refactor BatchLogRecordProcessor
DylanRussell Apr 9, 2025
c40c9bf
Add a timeout to export calls
DylanRussell Apr 22, 2025
b299c7b
Add timeout millis param to export.
DylanRussell Apr 28, 2025
1233e24
Refactor BatchLogRecordProcessor
DylanRussell Apr 9, 2025
ed344a9
Refactor BatchLogRecordProcessor and associated tests (#4535)
DylanRussell Apr 24, 2025
953ad93
Refactor BatchLogRecordProcessor
DylanRussell Apr 9, 2025
0e1be3a
Merge remote-tracking branch 'origin/retry' into retry
DylanRussell Apr 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -79,7 +79,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down Expand Up @@ -107,8 +107,12 @@ def _translate_data(
) -> ExportLogsServiceRequest:
return encode_logs(data)

def export(self, batch: Sequence[LogData]) -> LogExportResult:
return self._export(batch)
def export(
self, batch: Sequence[LogData], timeout_millis: Optional[float] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't timeout_millis be Optional[int]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel too strongly about it, I think float or int both are fine.. I switched it to int though

) -> LogExportResult:
return self._export(
batch, timeout_millis / 1e3 if timeout_millis else None
)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

"""OTLP Exporter"""

import json
import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Expand All @@ -35,7 +35,6 @@
from urllib.parse import urlparse

from deprecated import deprecated
from google.rpc.error_details_pb2 import RetryInfo

from grpc import (
ChannelCredentials,
Expand All @@ -47,7 +46,6 @@
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
Expand All @@ -74,6 +72,29 @@
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util.re import parse_env_headers

json_config = json.dumps(
{
"methodConfig": [
{
"name": [dict()],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "64s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"CANCELLED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"DATA_LOSS",
],
},
}
]
}
)
logger = getLogger(__name__)
SDKDataT = TypeVar("SDKDataT")
ResourceDataT = TypeVar("ResourceDataT")
Expand Down Expand Up @@ -195,7 +216,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
super().__init__()
Expand Down Expand Up @@ -232,7 +253,7 @@ def __init__(
else:
self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS)

self._timeout = timeout or int(
self._timeout = timeout or float(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10)
)
self._collector_kwargs = None
Expand All @@ -245,7 +266,11 @@ def __init__(

if insecure:
self._channel = insecure_channel(
self._endpoint, compression=compression
self._endpoint,
compression=compression,
options=[
("grpc.service_config", json_config),
],
)
else:
credentials = _get_credentials(
Expand All @@ -255,7 +280,12 @@ def __init__(
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)
self._channel = secure_channel(
self._endpoint, credentials, compression=compression
self._endpoint,
credentials,
compression=compression,
options=[
("grpc.service_config", json_config),
],
)
self._client = self._stub(self._channel)

Expand All @@ -269,90 +299,35 @@ def _translate_data(
pass

def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
self,
data: Union[TypingSequence[ReadableSpan], MetricsData],
timeout_sec: Optional[float] = None,
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
# logger.warning(
# "Transient error %s encountered while exporting %s, retrying in %ss.",
# error.code(),
# data.__class__.__name__,
# delay,
# )
max_value = 64
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=(timeout_sec or self._timeout),
)
return self._result.SUCCESS
except RpcError as error:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
return self._result.FAILURE

with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE

return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

from __future__ import annotations

import time
from dataclasses import replace
from logging import getLogger
from os import environ
from typing import Iterable, List, Tuple, Union
from typing import Iterable, List, Optional, Tuple, Union
from typing import Sequence as TypingSequence

from grpc import ChannelCredentials, Compression
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(
credentials: ChannelCredentials | None = None,
headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str]
| None = None,
timeout: int | None = None,
timeout: float | None = None,
compression: Compression | None = None,
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
Expand All @@ -124,7 +125,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down Expand Up @@ -158,17 +159,22 @@ def _translate_data(
def export(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
timeout_millis: Optional[float] = None,
**kwargs,
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
timeout_sec = (
timeout_millis / 1e3 if timeout_millis else self._timeout # pylint: disable=protected-access
)
if self._max_export_batch_size is None:
return self._export(data=metrics_data)
return self._export(metrics_data, timeout_sec)

export_result = MetricExportResult.SUCCESS

deadline_sec = time.time() + timeout_sec
for split_metrics_data in self._split_metrics_data(metrics_data):
split_export_result = self._export(data=split_metrics_data)
time_remaining_sec = deadline_sec - time.time()
split_export_result = self._export(
split_metrics_data, time_remaining_sec
)

if split_export_result is MetricExportResult.FAILURE:
export_result = MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -112,7 +112,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand All @@ -139,8 +139,14 @@ def _translate_data(
) -> ExportTraceServiceRequest:
return encode_spans(data)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._export(spans)
def export(
self,
spans: Sequence[ReadableSpan],
timeout_millis: Optional[float] = None,
) -> SpanExportResult:
return self._export(
spans, timeout_millis / 1e3 if timeout_millis else None
)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ asgiref==3.7.2
Deprecated==1.2.14
googleapis-common-protos==1.63.2
grpcio==1.66.2
grpcio-status==1.66.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be added as dev depdency to the package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only used for the tests, so I don't think so ?

importlib-metadata==6.11.0
iniconfig==2.0.0
packaging==24.0
Expand Down
Loading