Skip to content

Switch gRPC exporters to use official gRPC retry config. Make timeout encompass retries/backoffs #4564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -79,7 +79,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

"""OTLP Exporter"""

import json
import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Expand All @@ -35,7 +35,6 @@
from urllib.parse import urlparse

from deprecated import deprecated
from google.rpc.error_details_pb2 import RetryInfo

from grpc import (
ChannelCredentials,
Expand All @@ -47,7 +46,6 @@
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
Expand All @@ -74,6 +72,34 @@
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util.re import parse_env_headers

_JSON_CONFIG = json.dumps(
{
"methodConfig": [
{
"name": [dict()],
"retryPolicy": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this correctly implement the OTLP spec? https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures

I'm wondering about RetryInfo and partial errors in particular

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't realize RetryInfo was part of OTLP spec. gRPC retry policy does not make use of RetryInfo, and it isn't mentioned in https://github.com/grpc/proposal/blob/master/A6-client-retries.md or https://grpc.io/docs/guides/retry/#retry-configuration. My reading of the partial success part is that the status code is still OK in that case, which this does handle correctly (by not retrying).

So you think we should remove this then? I feel it's nice to let the gRPC library handle this stuff instead of hand rolling our own code.. If gRPC changes something around retries I'm assuming they'd update their own retry handling code and we wouldn't have to worry about it.

The gRFC (https://github.com/grpc/proposal/blob/master/A6-client-retries.md) also mentions how the retryPolicy can be used for hedging and respect server pushback, but neither of those appear to be implemented yet

# 5 is the maximum number of attempts allowed by the gRPC retry policy.
# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt.
# Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting
# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the
# timeout. See https://grpc.io/docs/guides/retry/ for more details.
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "9s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"CANCELLED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"DATA_LOSS",
],
},
}
]
}
)
logger = getLogger(__name__)
SDKDataT = TypeVar("SDKDataT")
ResourceDataT = TypeVar("ResourceDataT")
Expand Down Expand Up @@ -197,7 +223,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
super().__init__()
Expand Down Expand Up @@ -234,7 +260,7 @@ def __init__(
else:
self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS)

self._timeout = timeout or int(
self._timeout = timeout or float(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10)
)
self._collector_kwargs = None
Expand All @@ -247,7 +273,11 @@ def __init__(

if insecure:
self._channel = insecure_channel(
self._endpoint, compression=compression
self._endpoint,
compression=compression,
options=[
("grpc.service_config", _JSON_CONFIG),
],
)
else:
credentials = _get_credentials(
Expand All @@ -257,7 +287,12 @@ def __init__(
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)
self._channel = secure_channel(
self._endpoint, credentials, compression=compression
self._endpoint,
credentials,
compression=compression,
options=[
("grpc.service_config", _JSON_CONFIG),
],
)
self._client = self._stub(self._channel)

Expand All @@ -271,91 +306,34 @@ def _translate_data(
pass

def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
self,
data: Union[TypingSequence[ReadableSpan], MetricsData],
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
# logger.warning(
# "Transient error %s encountered while exporting %s, retrying in %ss.",
# error.code(),
# data.__class__.__name__,
# delay,
# )
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):
if delay == self._MAX_RETRY_TIMEOUT or self._shutdown:
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
return self._result.SUCCESS
except RpcError as error:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
return self._result.FAILURE

with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE

return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
credentials: ChannelCredentials | None = None,
headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str]
| None = None,
timeout: int | None = None,
timeout: float | None = None,
compression: Compression | None = None,
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
Expand All @@ -124,7 +124,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down Expand Up @@ -172,7 +172,6 @@ def export(

if split_export_result is MetricExportResult.FAILURE:
export_result = MetricExportResult.FAILURE

return export_result

def _split_metrics_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -112,7 +112,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ asgiref==3.7.2
Deprecated==1.2.14
googleapis-common-protos==1.63.2
grpcio==1.66.2
grpcio-status==1.66.0
importlib-metadata==6.11.0
iniconfig==2.0.0
packaging==24.0
Expand Down
Loading
Loading