diff --git a/CHANGELOG.md b/CHANGELOG.md index 001fbf28a58..17f608342cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - typecheck: add sdk/resources and drop mypy ([#4578](https://github.com/open-telemetry/opentelemetry-python/pull/4578)) -- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more - clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) - and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)). +- Update OTLP gRPC/HTTP exporters: the export timeout is now inclusive of all retries and backoffs, + and an unnecessary 32-second sleep that occurred after all retries had completed/failed was removed. + Update gRPC OTLP Exporters to use the official gRPC retry policy config. The `RetryInfo` proto in the error + response will now be ignored, and servers should now use the gRPC-supported header `grpc-retry-pushback-ms`. + ([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)). 
- Use PEP702 for marking deprecations ([#4522](https://github.com/open-telemetry/opentelemetry-python/pull/4522)) - Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml index 40b4950b99a..77ff64ea5f8 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml @@ -29,6 +29,7 @@ classifiers = [ ] dependencies = [ "opentelemetry-proto == 1.34.0.dev", + "requests ~= 2.7", ] [project.urls] diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py index 2f49502cf1d..c239a4ed18a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py @@ -17,18 +17,18 @@ import logging from collections.abc import Sequence -from itertools import count from typing import ( Any, Callable, Dict, - Iterator, List, Mapping, Optional, TypeVar, ) +import requests + from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue from opentelemetry.proto.common.v1.common_pb2 import ( ArrayValue as PB2ArrayValue, @@ -110,6 +110,14 @@ def _encode_key_value( ) +def _is_retryable(resp: requests.Response) -> bool: + if resp.status_code == 408: + return True + if resp.status_code >= 500 and resp.status_code <= 599: + return True + return False + + def _encode_array( array: Sequence[Any], allow_null: bool = False ) -> Sequence[PB2AnyValue]: @@ -177,38 +185,3 @@ def _get_resource_data( ) ) return resource_data - - -def _create_exp_backoff_generator(max_value: 
int = 0) -> Iterator[int]: - """ - Generates an infinite sequence of exponential backoff values. The sequence starts - from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified - and non-zero, the generated values will not exceed this maximum, capping at max_value - instead of growing indefinitely. - - Parameters: - - max_value (int, optional): The maximum value to yield. If 0 or not provided, the - sequence grows without bound. - - Returns: - Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or - capped at max_value. - - Example: - ``` - gen = _create_exp_backoff_generator(max_value=10) - for _ in range(5): - print(next(gen)) - ``` - This will print: - 1 - 2 - 4 - 8 - 10 - - Note: this functionality used to be handled by the 'backoff' package. - """ - for i in count(0): - out = 2**i - yield min(out, max_value) if max_value else out diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt b/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt index 1c295c81ca5..46ce953c4c3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt +++ b/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt @@ -1,6 +1,7 @@ asgiref==3.7.2 importlib-metadata==6.11.0 iniconfig==2.0.0 +requests == 2.7.0 packaging==24.0 pluggy==1.5.0 protobuf==5.26.1 diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py deleted file mode 100644 index 789a184ad04..00000000000 --- a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import TestCase - -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, -) - - -class TestBackoffGenerator(TestCase): - def test_exp_backoff_generator(self): - generator = _create_exp_backoff_generator() - self.assertEqual(next(generator), 1) - self.assertEqual(next(generator), 2) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 8) - self.assertEqual(next(generator), 16) - - def test_exp_backoff_generator_with_max(self): - generator = _create_exp_backoff_generator(max_value=4) - self.assertEqual(next(generator), 1) - self.assertEqual(next(generator), 2) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 4) - - def test_exp_backoff_generator_with_odd_max(self): - # use a max_value that's not in the set - generator = _create_exp_backoff_generator(max_value=11) - self.assertEqual(next(generator), 1) - self.assertEqual(next(generator), 2) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 8) - self.assertEqual(next(generator), 11) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 8f629899d77..e66f9dbcca7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ 
b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -58,7 +58,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): if insecure is None: @@ -79,7 +79,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 259f1ddb91b..dfa9597a2f1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -14,12 +14,12 @@ """OTLP Exporter""" +import json import threading from abc import ABC, abstractmethod from collections.abc import Sequence # noqa: F401 from logging import getLogger from os import environ -from time import sleep from typing import ( # noqa: F401 Any, Callable, @@ -34,7 +34,6 @@ from typing import Sequence as TypingSequence from urllib.parse import urlparse -from google.rpc.error_details_pb2 import RetryInfo from typing_extensions import deprecated from grpc import ( @@ -47,7 +46,6 @@ ssl_channel_credentials, ) from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, _get_resource_data, ) from opentelemetry.exporter.otlp.proto.grpc import ( @@ -74,6 +72,35 @@ from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.util.re import parse_env_headers +# 5 is the maximum allowable attempts allowed by grpc 
retry policy. +# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt, +# plus or minus a 20% jitter. Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting +# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the +# timeout, and all fail. A header `grpc-retry-pushback-ms` when set by the server will override +# and take precedence over this backoff. See https://grpc.io/docs/guides/retry/ for more details. +_GRPC_RETRY_POLICY = json.dumps( + { + "methodConfig": [ + { + "name": [dict()], + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "1s", + "maxBackoff": "9s", + "backoffMultiplier": 2, + "retryableStatusCodes": [ + "UNAVAILABLE", + "CANCELLED", + "RESOURCE_EXHAUSTED", + "ABORTED", + "OUT_OF_RANGE", + "DATA_LOSS", + ], + }, + } + ] + } +) logger = getLogger(__name__) SDKDataT = TypeVar("SDKDataT") ResourceDataT = TypeVar("ResourceDataT") @@ -186,8 +213,6 @@ class OTLPExporterMixin( compression: gRPC compression method to use """ - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: Optional[str] = None, @@ -196,7 +221,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): super().__init__() @@ -233,7 +258,7 @@ def __init__( else: self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10) ) self._collector_kwargs = None @@ -246,7 +271,11 @@ def __init__( if insecure: self._channel = insecure_channel( - self._endpoint, compression=compression + self._endpoint, + compression=compression, + options=[ + ("grpc.service_config", _GRPC_RETRY_POLICY), + ], ) else: credentials = _get_credentials( @@ -256,7 +285,12 @@ def __init__( OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) 
self._channel = secure_channel( - self._endpoint, credentials, compression=compression + self._endpoint, + credentials, + compression=compression, + options=[ + ("grpc.service_config", _GRPC_RETRY_POLICY), + ], ) self._client = self._stub(self._channel) @@ -270,10 +304,9 @@ def _translate_data( pass def _export( - self, data: Union[TypingSequence[ReadableSpan], MetricsData] + self, + data: Union[TypingSequence[ReadableSpan], MetricsData], ) -> ExportResultT: - # After the call to shutdown, subsequent calls to Export are - # not allowed and should return a Failure result. if self._shutdown: logger.warning("Exporter already shutdown, ignoring batch") return self._result.FAILURE @@ -281,80 +314,24 @@ def _export( # FIXME remove this check if the export type for traces # gets updated to a class that represents the proto # TracesData and use the code below instead. - # logger.warning( - # "Transient error %s encountered while exporting %s, retrying in %ss.", - # error.code(), - # data.__class__.__name__, - # delay, - # ) - # expo returns a generator that yields delay values which grow - # exponentially. Once delay is greater than max_value, the yielded - # value will remain constant. 
- for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT or self._shutdown: + with self._export_lock: + try: + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=self._timeout, + ) + return self._result.SUCCESS + except RpcError as error: + logger.error( + "Failed to export %s to %s, error code: %s", + self._exporting, + self._endpoint, + error.code(), + exc_info=error.code() == StatusCode.UNKNOWN, + ) return self._result.FAILURE - with self._export_lock: - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, - ) - - return self._result.SUCCESS - - except RpcError as error: - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 - ) - - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s to %s, retrying in %ss." 
- ), - error.code(), - self._exporting, - self._endpoint, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s to %s, error code: %s", - self._exporting, - self._endpoint, - error.code(), - exc_info=error.code() == StatusCode.UNKNOWN, - ) - - if error.code() == StatusCode.OK: - return self._result.SUCCESS - - return self._result.FAILURE - - return self._result.FAILURE - def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: if self._shutdown: logger.warning("Exporter already shutdown, ignoring call") diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 8580dbb7386..dbb2a8e1dee 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -99,7 +99,7 @@ def __init__( credentials: ChannelCredentials | None = None, headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str] | None = None, - timeout: int | None = None, + timeout: float | None = None, compression: Compression | None = None, preferred_temporality: dict[type, AggregationTemporality] | None = None, @@ -124,7 +124,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( @@ -172,7 +172,6 @@ def export( if split_export_result is MetricExportResult.FAILURE: export_result = MetricExportResult.FAILURE - return export_result def _split_metrics_data( diff --git 
a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index c78c1b81bb6..7aef65a2ca9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -91,7 +91,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): if insecure is None: @@ -112,7 +112,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt b/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt index e11dad64b40..67a94245243 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt @@ -1,5 +1,4 @@ asgiref==3.7.2 -googleapis-common-protos==1.63.2 grpcio==1.66.2 importlib-metadata==6.11.0 iniconfig==2.0.0 diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 656d9a6cb79..3c752d70e50 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -15,17 +15,11 @@ import threading import time from concurrent.futures import 
ThreadPoolExecutor -from logging import WARNING +from logging import WARNING, getLogger from typing import Any, Optional, Sequence from unittest import TestCase -from unittest.mock import Mock, patch +from unittest.mock import ANY, Mock, patch -from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module - Duration, -) -from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module - RetryInfo, -) from grpc import Compression, StatusCode, server from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( @@ -55,6 +49,8 @@ SpanExportResult, ) +logger = getLogger(__name__) + # The below tests use this test SpanExporter and Spans, but are testing the # underlying behavior in the mixin. A MetricExporter or LogExporter could @@ -89,38 +85,34 @@ def __init__( self, export_result: StatusCode, optional_export_sleep: Optional[float] = None, - optional_export_retry_millis: Optional[float] = None, + optional_first_time_retry_millis: Optional[int] = None, ): self.export_result = export_result self.optional_export_sleep = optional_export_sleep - self.optional_export_retry_millis = optional_export_retry_millis + self.optional_first_time_retry_millis = ( + optional_first_time_retry_millis + ) + self.first_attempt = True + self.num_requests = 0 + self.now = time.time() # pylint: disable=invalid-name,unused-argument def Export(self, request, context): + self.num_requests += 1 if self.optional_export_sleep: time.sleep(self.optional_export_sleep) - if self.optional_export_retry_millis: - context.send_initial_metadata( - ( + if self.export_result != StatusCode.OK: + if self.optional_first_time_retry_millis and self.first_attempt: + self.first_attempt = False + context.set_trailing_metadata( ( - "google.rpc.retryinfo-bin", - RetryInfo().SerializeToString(), - ), + ( + "grpc-retry-pushback-ms", + str(self.optional_first_time_retry_millis), + ), + ) ) - ) - context.set_trailing_metadata( - ( - ( - "google.rpc.retryinfo-bin", - RetryInfo( - 
retry_delay=Duration( - nanos=int(self.optional_export_retry_millis) - ) - ).SerializeToString(), - ), - ) - ) - context.set_code(self.export_result) + context.abort(self.export_result, "") return ExportTraceServiceResponse() @@ -268,7 +260,9 @@ def test_otlp_exporter_otlp_compression_unspecified( """No env or kwarg should be NoCompression""" OTLPSpanExporterForTesting(insecure=True) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression + "localhost:4317", + compression=Compression.NoCompression, + options=ANY, ) # pylint: disable=no-self-use, disable=unused-argument @@ -292,7 +286,7 @@ def test_otlp_exporter_otlp_compression_envvar( """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" OTLPSpanExporterForTesting(insecure=True) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.Gzip + "localhost:4317", compression=Compression.Gzip, options=ANY ) def test_shutdown(self): @@ -372,52 +366,75 @@ def test_export_over_closed_grpc_channel(self): str(err.exception), "Cannot invoke RPC on closed channel!" 
) - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [0.01]}) - + def test_retry_with_server_pushback(self): + mock_trace_service = TraceServiceServicerWithExportParams( + StatusCode.UNAVAILABLE, optional_first_time_retry_millis=200 + ) add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE), + mock_trace_service, self.server, ) - result = self.exporter.export([self.span]) - self.assertEqual(result, SpanExportResult.FAILURE) - mock_sleep.assert_called_with(0.01) - - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep): + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=10) + before = time.time() + self.assertEqual( + exporter.export([self.span]), + SpanExportResult.FAILURE, + ) + after = time.time() + # We set the `grpc-retry-pushback-ms` header to 200 millis on the first server response. + # So we expect the first request at time 0, second at time 0.2, + # third at 1.2 (start of backoff policy), fourth at time 3.2, last at time 7.2. + self.assertEqual(mock_trace_service.num_requests, 5) + # The backoffs have a jitter +- 20%, so we have to put a higher bound than 7.2. + self.assertTrue(after - before < 8.8) + + def test_retry_timeout(self): + mock_trace_service = TraceServiceServicerWithExportParams( + StatusCode.UNAVAILABLE + ) add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams( - StatusCode.UNAVAILABLE, - optional_export_sleep=None, - optional_export_retry_millis=1e7, - ), + mock_trace_service, self.server, ) + # Set timeout to 1.5 seconds. 
+ exporter = OTLPSpanExporterForTesting(insecure=True, timeout=1.5) + before = time.time() + self.assertEqual( + exporter.export([self.span]), + SpanExportResult.FAILURE, + ) + after = time.time() + # Our retry starts with a 1 second backoff then doubles. + # So we expect just two calls: one at time 0, one at time 1. + self.assertEqual(mock_trace_service.num_requests, 2) + # gRPC retry config waits for the timeout (1.5) before cancelling the request. + self.assertTrue(after - before < 1.6) + + def test_timeout_set_correctly(self): + mock_trace_service = TraceServiceServicerWithExportParams( + StatusCode.OK, optional_export_sleep=0.5 + ) + add_TraceServiceServicer_to_server( + mock_trace_service, + self.server, + ) + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.4) + # Should timeout. Deadline should be set to now + timeout. + # That is 400 millis from now, and export sleeps for 500 millis. with self.assertLogs(level=WARNING) as warning: self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.FAILURE + exporter.export([self.span]), + SpanExportResult.FAILURE, ) - mock_sleep.assert_called_with(0.01) - self.assertEqual( - warning.records[0].message, - ( - "Transient error StatusCode.UNAVAILABLE encountered " - "while exporting traces to localhost:4317, retrying in 0.01s." 
- ), + "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", + warning.records[-1].message, ) - - def test_success(self): - add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(StatusCode.OK), - self.server, - ) + self.assertEqual(mock_trace_service.num_requests, 1) + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.8) self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.SUCCESS + exporter.export([self.span]), + SpanExportResult.SUCCESS, ) def test_otlp_headers_from_env(self): @@ -440,6 +457,6 @@ def test_permanent_failure(self): self.exporter.export([self.span]), SpanExportResult.FAILURE ) self.assertEqual( - warning.records[0].message, + warning.records[-1].message, "Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS", ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 2ea12f660fb..ceda6e72a8e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -18,7 +18,7 @@ from os.path import dirname from typing import List from unittest import TestCase -from unittest.mock import patch +from unittest.mock import ANY, patch from grpc import ChannelCredentials, Compression @@ -297,7 +297,9 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): insecure=True, compression=Compression.NoCompression ) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression + "localhost:4317", + compression=Compression.NoCompression, + options=ANY, ) def test_split_metrics_data_many_data_points(self): diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py 
b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index 73d8d6c7a20..5238dc91224 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -16,7 +16,7 @@ import os from unittest import TestCase -from unittest.mock import Mock, PropertyMock, patch +from unittest.mock import ANY, Mock, PropertyMock, patch from grpc import ChannelCredentials, Compression @@ -333,7 +333,9 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): """Specifying kwarg should take precedence over env""" OTLPSpanExporter(insecure=True, compression=Compression.NoCompression) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression + "localhost:4317", + compression=Compression.NoCompression, + options=ANY, ) # pylint: disable=no-self-use @@ -350,7 +352,9 @@ def test_otlp_exporter_otlp_compression_precendence( """ OTLPSpanExporter(insecure=True) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.Gzip + "localhost:4317", + compression=Compression.Gzip, + options=ANY, ) def test_translate_spans(self): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index f86f0113833..9dda3cca5cd 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -14,17 +14,18 @@ import gzip import logging +import random import zlib from io import BytesIO from os import environ -from time import sleep +from time import sleep, time from typing import Dict, Optional, 
Sequence import requests from requests.exceptions import ConnectionError from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, + _is_retryable, ) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( @@ -61,11 +62,10 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_LOGS_EXPORT_PATH = "v1/logs" DEFAULT_TIMEOUT = 10 # in seconds +_MAX_RETRYS = 6 class OTLPLogExporter(LogExporter): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: Optional[str] = None, @@ -73,7 +73,7 @@ def __init__( client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, ): @@ -108,7 +108,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get( OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -124,7 +124,7 @@ def __init__( ) self._shutdown = False - def _export(self, serialized_data: bytes): + def _export(self, serialized_data: bytes, timeout_sec: float): data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -143,7 +143,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) except ConnectionError: @@ -151,53 +151,42 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) return resp - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if 
resp.status_code >= 500 and resp.status_code <= 599: - return True - return False - def export(self, batch: Sequence[LogData]) -> LogExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return LogExportResult.FAILURE serialized_data = encode_logs(batch).SerializeToString() - - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: - return LogExportResult.FAILURE - - resp = self._export(serialized_data) - # pylint: disable=no-else-return + deadline_sec = time() + self._timeout + backoff_seconds = 1 * random.uniform(0.8, 1.2) + for retry_num in range(1, _MAX_RETRYS + 1): + resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: return LogExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: + if ( + not _is_retryable(resp) + or retry_num == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + ): _logger.error( "Failed to export logs batch code: %s, reason: %s", resp.status_code, resp.text, ) return LogExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", + resp.reason, + backoff_seconds, + ) + sleep(backoff_seconds) + backoff_seconds *= 2 * random.uniform(0.8, 1.2) + # Not possible to reach here but the linter is complaining. 
return LogExportResult.FAILURE def force_flush(self, timeout_millis: float = 10_000) -> bool: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 1bdcc13c16a..31710804577 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -14,16 +14,18 @@ import gzip import logging +import random import zlib from io import BytesIO from os import environ -from time import sleep +from time import sleep, time from typing import ( # noqa: F401 Any, Callable, Dict, List, Mapping, + Optional, Sequence, ) @@ -32,8 +34,8 @@ from typing_extensions import deprecated from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, _get_resource_data, + _is_retryable, ) from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( OTLPMetricExporterMixin, @@ -48,7 +50,7 @@ from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 ExportMetricsServiceRequest, ) -from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 # noqa: F401 +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 AnyValue, ArrayValue, InstrumentationScope, @@ -98,11 +100,10 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" DEFAULT_TIMEOUT = 10 # in seconds +_MAX_RETRYS = 6 class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: str | None = None, @@ -110,7 +111,7 @@ def __init__( client_key_file: str | None = None, client_certificate_file: str | None = None, headers: dict[str, str] | None = None, - 
timeout: int | None = None, + timeout: float | None = None, compression: Compression | None = None, session: requests.Session | None = None, preferred_temporality: dict[type, AggregationTemporality] @@ -147,7 +148,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get( OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -165,8 +166,9 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._shutdown = False - def _export(self, serialized_data: bytes): + def _export(self, serialized_data: bytes, timeout_sec: float): data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -185,7 +187,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) except ConnectionError: @@ -193,55 +195,56 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) return resp - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False - def export( self, metrics_data: MetricsData, - timeout_millis: float = 10_000, + timeout_millis: Optional[float] = None, **kwargs, ) -> MetricExportResult: - serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: - return MetricExportResult.FAILURE - - resp = self._export(serialized_data.SerializeToString()) - # pylint: disable=no-else-return + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring 
batch") + return MetricExportResult.FAILURE + serialized_data = encode_metrics(metrics_data).SerializeToString() + deadline_sec = time() + ( + timeout_millis / 1e3 if timeout_millis else self._timeout + ) + backoff_seconds = 1 * random.uniform(0.8, 1.2) + for retry_num in range(1, _MAX_RETRYS + 1): + resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: return MetricExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: + if ( + not _is_retryable(resp) + or retry_num == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + ): _logger.error( - "Failed to export batch code: %s, reason: %s", + "Failed to export metrics batch code: %s, reason: %s", resp.status_code, resp.text, ) return MetricExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", + resp.reason, + backoff_seconds, + ) + sleep(backoff_seconds) + backoff_seconds *= 2 * random.uniform(0.8, 1.2) + # Not possible to reach here but the linter is complaining. 
return MetricExportResult.FAILURE def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - pass + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring call") + return + self._session.close() + self._shutdown = True @property def _exporting(self) -> str: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 1841e5210a4..7bce9494648 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -14,17 +14,18 @@ import gzip import logging +import random import zlib from io import BytesIO from os import environ -from time import sleep -from typing import Dict, Optional +from time import sleep, time +from typing import Dict, Optional, Sequence import requests from requests.exceptions import ConnectionError from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, + _is_retryable, ) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, @@ -49,6 +50,7 @@ OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, ) +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.util.re import parse_env_headers @@ -59,11 +61,10 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_TRACES_EXPORT_PATH = "v1/traces" DEFAULT_TIMEOUT = 10 # in seconds +_MAX_RETRYS = 6 class OTLPSpanExporter(SpanExporter): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: Optional[str] = None, @@ -71,7 +72,7 @@ def __init__( client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = 
None, headers: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, ): @@ -105,7 +106,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get( OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -121,7 +122,7 @@ def __init__( ) self._shutdown = False - def _export(self, serialized_data: bytes): + def _export(self, serialized_data: bytes, timeout_sec: float): data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -140,7 +141,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) except ConnectionError: @@ -148,61 +149,44 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) return resp - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False - - def _serialize_spans(self, spans): - return encode_spans(spans).SerializePartialToString() - - def _export_serialized_spans(self, serialized_data): - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: - return SpanExportResult.FAILURE + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return SpanExportResult.FAILURE - resp = self._export(serialized_data) - # pylint: disable=no-else-return + serialized_data = 
encode_spans(spans).SerializePartialToString() + deadline_sec = time() + self._timeout + backoff_seconds = 1 * random.uniform(0.8, 1.2) + for retry_num in range(1, _MAX_RETRYS + 1): + resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: return SpanExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: + if ( + not _is_retryable(resp) + or retry_num == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + ): _logger.error( - "Failed to export batch code: %s, reason: %s", + "Failed to export span batch code: %s, reason: %s", resp.status_code, resp.text, ) return SpanExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting span batch, retrying in %.2fs.", + resp.reason, + backoff_seconds, + ) + sleep(backoff_seconds) + backoff_seconds *= 2 * random.uniform(0.8, 1.2) + # Not possible to reach here but the linter is complaining. return SpanExportResult.FAILURE - def export(self, spans) -> SpanExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. 
- if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring batch") - return SpanExportResult.FAILURE - - serialized_data = self._serialize_spans(spans) - - return self._export_serialized_spans(serialized_data) - def shutdown(self): if self._shutdown: _logger.warning("Exporter already shutdown, ignoring call") diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 16bb3e54286..85422e2b281 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,14 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from logging import WARNING from os import environ from unittest import TestCase -from unittest.mock import MagicMock, Mock, call, patch +from unittest.mock import ANY, MagicMock, Mock, patch from requests import Session from requests.models import Response -from responses import POST, activate, add from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( encode_metrics, @@ -327,31 +327,10 @@ def test_serialization(self, mock_post): url=exporter._endpoint, data=serialized_data.SerializeToString(), verify=exporter._certificate_file, - timeout=exporter._timeout, + timeout=ANY, # Timeout is a float based on real time, can't put an exact value here. 
cert=exporter._client_cert, ) - @activate - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - add( - POST, - "http://metrics.example.com/export", - json={"error": "something exploded"}, - status=500, - ) - - exporter = OTLPMetricExporter( - endpoint="http://metrics.example.com/export" - ) - metrics_data = self.metrics["sum_int"] - - exporter.export(metrics_data) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) - def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter() @@ -523,3 +502,60 @@ def test_preferred_aggregation_override(self): self.assertEqual( exporter._preferred_aggregation[Histogram], histogram_aggregation ) + + @patch.object(Session, "post") + def test_retry_timeout(self, mock_post): + exporter = OTLPMetricExporter(timeout=3.5) + + resp = Response() + resp.status_code = 503 + resp.reason = "UNAVAILABLE" + mock_post.return_value = resp + with self.assertLogs(level=WARNING) as warning: + before = time.time() + # Set timeout to 1.5 seconds, takes precedence over the 3.5 second class timeout. + self.assertEqual( + exporter.export(self.metrics["sum_int"], 1500), + MetricExportResult.FAILURE, + ) + after = time.time() + + # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. + self.assertEqual(mock_post.call_count, 2) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 1.3) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting metrics batch, retrying in", + warning.records[0].message, + ) + mock_post.reset_mock() + before = time.time() + # This time the class level 3.5s timeout should be used. 
+ self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + after = time.time() + + # First call at time 0, second at time 1, third at time 3. + self.assertEqual(mock_post.call_count, 3) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 3.7) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting metrics batch, retrying in", + warning.records[0].message, + ) + + @patch.object(Session, "post") + def test_timeout_set_correctly(self, mock_post): + resp = Response() + resp.status_code = 200 + + def export_side_effect(*args, **kwargs): + # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed. + self.assertTrue(0.4 - kwargs["timeout"] < 0.0005) + return resp + + mock_post.side_effect = export_side_effect + exporter = OTLPMetricExporter() + exporter.export(self.metrics["sum_int"], 400) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 66b0f890d76..d646968fa74 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -14,13 +14,16 @@ # pylint: disable=protected-access +import time import unittest +from logging import WARNING from typing import List -from unittest.mock import MagicMock, Mock, call, patch +from unittest.mock import MagicMock, Mock, patch import requests -import responses from google.protobuf.json_format import MessageToDict +from requests import Session +from requests.models import Response from opentelemetry._logs import SeverityNumber from opentelemetry.exporter.otlp.proto.http import Compression @@ -267,25 +270,6 @@ def test_exported_log_without_span_id(self): else: self.fail("No log records found") - @responses.activate - 
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - responses.add( - responses.POST, - "http://logs.example.com/export", - json={"error": "something exploded"}, - status=500, - ) - - exporter = OTLPLogExporter(endpoint="http://logs.example.com/export") - logs = self._get_sdk_log_data() - - exporter.export(logs) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) - @staticmethod def _get_sdk_log_data() -> List[LogData]: log1 = LogData( @@ -365,3 +349,42 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): self.assertEqual( OTLPLogExporter().export(MagicMock()), LogExportResult.SUCCESS ) + + @patch.object(Session, "post") + def test_retry_timeout(self, mock_post): + exporter = OTLPLogExporter(timeout=1.5) + + resp = Response() + resp.status_code = 503 + resp.reason = "UNAVAILABLE" + mock_post.return_value = resp + with self.assertLogs(level=WARNING) as warning: + before = time.time() + # Set timeout to 1.5 seconds + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogExportResult.FAILURE, + ) + after = time.time() + # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. + self.assertEqual(mock_post.call_count, 2) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 1.3) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting logs batch, retrying in", + warning.records[0].message, + ) + + @patch.object(Session, "post") + def test_timeout_set_correctly(self, mock_post): + resp = Response() + resp.status_code = 200 + + def export_side_effect(*args, **kwargs): + # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed. 
+ self.assertTrue(0.4 - kwargs["timeout"] < 0.0005) + return resp + + mock_post.side_effect = export_side_effect + exporter = OTLPLogExporter(timeout=0.4) + exporter.export(self._get_sdk_log_data()) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 8d8ff6037aa..0707f7ef73f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -12,11 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import unittest -from unittest.mock import MagicMock, Mock, call, patch +from logging import WARNING +from unittest.mock import MagicMock, Mock, patch import requests -import responses +from requests import Session +from requests.models import Response from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( @@ -52,6 +55,16 @@ OS_ENV_CLIENT_KEY = "os/env/client-key.pem" OS_ENV_HEADERS = "envHeader1=val1,envHeader2=val2" OS_ENV_TIMEOUT = "30" +BASIC_SPAN = _Span( + "abc", + context=Mock( + **{ + "trace_state": {"a": "b", "c": "d"}, + "span_id": 10217189687419569865, + "trace_id": 67545097771067222548457157018666467027, + } + ), +) # pylint: disable=protected-access @@ -227,37 +240,6 @@ def test_headers_parse_from_env(self): ), ) - # pylint: disable=no-self-use - @responses.activate - @patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - responses.add( - responses.POST, - "http://traces.example.com/export", - json={"error": "something exploded"}, - status=500, - ) - - exporter = OTLPSpanExporter( - endpoint="http://traces.example.com/export" - ) - span = _Span( - "abc", - context=Mock( 
- **{ - "trace_state": {"a": "b", "c": "d"}, - "span_id": 10217189687419569865, - "trace_id": 67545097771067222548457157018666467027, - } - ), - ) - - exporter.export([span]) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) - @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True)) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ @@ -267,3 +249,42 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): self.assertEqual( OTLPSpanExporter().export(MagicMock()), SpanExportResult.SUCCESS ) + + @patch.object(Session, "post") + def test_retry_timeout(self, mock_post): + exporter = OTLPSpanExporter(timeout=1.5) + + resp = Response() + resp.status_code = 503 + resp.reason = "UNAVAILABLE" + mock_post.return_value = resp + with self.assertLogs(level=WARNING) as warning: + before = time.time() + # Set timeout to 1.5 seconds + self.assertEqual( + exporter.export([BASIC_SPAN]), + SpanExportResult.FAILURE, + ) + after = time.time() + # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. + self.assertEqual(mock_post.call_count, 2) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 1.3) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting span batch, retrying in", + warning.records[0].message, + ) + + @patch.object(Session, "post") + def test_timeout_set_correctly(self, mock_post): + resp = Response() + resp.status_code = 200 + + def export_side_effect(*args, **kwargs): + # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed. 
+ self.assertTrue(0.4 - kwargs["timeout"] < 0.0005) + return resp + + mock_post.side_effect = export_side_effect + exporter = OTLPSpanExporter(timeout=0.4) + exporter.export([BASIC_SPAN]) diff --git a/uv.lock b/uv.lock index 0e8e7dece68..85ece1b9d77 100644 --- a/uv.lock +++ b/uv.lock @@ -281,10 +281,14 @@ name = "opentelemetry-exporter-otlp-proto-common" source = { editable = "exporter/opentelemetry-exporter-otlp-proto-common" } dependencies = [ { name = "opentelemetry-proto" }, + { name = "requests" }, ] [package.metadata] -requires-dist = [{ name = "opentelemetry-proto", editable = "opentelemetry-proto" }] +requires-dist = [ + { name = "opentelemetry-proto", editable = "opentelemetry-proto" }, + { name = "requests", specifier = "~=2.7" }, +] [[package]] name = "opentelemetry-exporter-otlp-proto-grpc"