Skip to content

Commit fe1d1f6

Browse files
committed
Add timeouts to metric SDK
1 parent 7397605 commit fe1d1f6

File tree

10 files changed

+183
-36
lines changed

10 files changed

+183
-36
lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,13 @@ def _translate_data(
169169
)
170170

171171
def export(
172-
self, metrics: Sequence[Metric], *args, **kwargs
172+
self,
173+
metrics: Sequence[Metric],
174+
timeout_millis: float = 10_000,
175+
**kwargs,
173176
) -> MetricExportResult:
177+
# TODO(): OTLPExporterMixin should pass timeout to gRPC
174178
return self._export(metrics)
175179

176-
def shutdown(self, *args, **kwargs):
180+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
177181
pass

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,15 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
277277
logger.exception(error)
278278
return output
279279

280-
def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT:
280+
def _export(
281+
self,
282+
data: TypingSequence[SDKDataT],
283+
timeout_millis: Optional[float] = None,
284+
) -> ExportResultT:
285+
if timeout_millis is not None:
286+
timeout_seconds = timeout_millis / 10**3
287+
else:
288+
timeout_seconds = self._timeout
281289

282290
max_value = 64
283291
# expo returns a generator that yields delay values which grow
@@ -292,7 +300,7 @@ def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT:
292300
self._client.Export(
293301
request=self._translate_data(data),
294302
metadata=self._headers,
295-
timeout=self._timeout,
303+
timeout=timeout_seconds,
296304
)
297305

298306
return self._result.SUCCESS

exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,17 @@ def __init__(self, prefix: str = "") -> None:
111111
self._collector._callback = self.collect
112112

113113
def _receive_metrics(
114-
self, metrics: Iterable[Metric], *args, **kwargs
114+
self,
115+
metrics: Iterable[Metric],
116+
timeout_millis: float = 10_000,
117+
**kwargs,
115118
) -> None:
116119
if metrics is None:
117120
return
118121
self._collector.add_metrics_data(metrics)
119122

120-
def shutdown(self, *args, **kwargs) -> bool:
123+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
121124
REGISTRY.unregister(self._collector)
122-
return True
123125

124126

125127
class _CustomCollector:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py

+13-5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from opentelemetry.sdk.resources import Resource
4949
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
5050
from opentelemetry.util._once import Once
51+
from opentelemetry.util._time import time_ns
5152

5253
_logger = getLogger(__name__)
5354

@@ -369,16 +370,16 @@ def __init__(
369370
self._shutdown_once = Once()
370371
self._shutdown = False
371372

372-
def force_flush(self) -> bool:
373+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
373374

374375
# FIXME implement a timeout
375376

376377
for metric_reader in self._sdk_config.metric_readers:
377-
metric_reader.collect()
378+
metric_reader.collect(timeout_millis=timeout_millis)
378379
return True
379380

380-
def shutdown(self):
381-
# FIXME implement a timeout
381+
def shutdown(self, timeout_millis: float = 10_000):
382+
deadline_ns = time_ns() + timeout_millis * 10**6
382383

383384
def _shutdown():
384385
self._shutdown = True
@@ -392,8 +393,15 @@ def _shutdown():
392393
metric_reader_error = {}
393394

394395
for metric_reader in self._sdk_config.metric_readers:
396+
current_ts = time_ns()
395397
try:
396-
metric_reader.shutdown()
398+
if current_ts >= deadline_ns:
399+
raise TimeoutError(
400+
"Didn't get to execute, deadline already exceeded"
401+
)
402+
metric_reader.shutdown(
403+
timeout_millis=(deadline_ns - current_ts) / 10**6
404+
)
397405

398406
# pylint: disable=broad-except
399407
except Exception as error:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py

+32-14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from abc import ABC, abstractmethod
1818
from enum import Enum
1919
from os import environ, linesep
20+
from socket import timeout
2021
from sys import stdout
2122
from threading import Event, RLock, Thread
2223
from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence
@@ -31,6 +32,7 @@
3132
from opentelemetry.sdk._metrics.metric_reader import MetricReader
3233
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
3334
from opentelemetry.util._once import Once
35+
from opentelemetry.util._time import time_ns
3436

3537
_logger = logging.getLogger(__name__)
3638

@@ -53,8 +55,11 @@ class MetricExporter(ABC):
5355

5456
@abstractmethod
5557
def export(
56-
self, metrics: Sequence[Metric], *args, **kwargs
57-
) -> "MetricExportResult":
58+
self,
59+
metrics: Sequence[Metric],
60+
timeout_millis: float = 10_000,
61+
**kwargs,
62+
) -> MetricExportResult:
5863
"""Exports a batch of telemetry data.
5964
6065
Args:
@@ -65,7 +70,7 @@ def export(
6570
"""
6671

6772
@abstractmethod
68-
def shutdown(self, *args, **kwargs) -> None:
73+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
6974
"""Shuts down the exporter.
7075
7176
Called when the SDK is shut down.
@@ -90,14 +95,17 @@ def __init__(
9095
self.formatter = formatter
9196

9297
def export(
93-
self, metrics: Sequence[Metric], *args, **kwargs
98+
self,
99+
metrics: Sequence[Metric],
100+
timeout_millis: float = 10_000,
101+
**kwargs,
94102
) -> MetricExportResult:
95103
for metric in metrics:
96104
self.out.write(self.formatter(metric))
97105
self.out.flush()
98106
return MetricExportResult.SUCCESS
99107

100-
def shutdown(self, *args, **kwargs) -> None:
108+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
101109
pass
102110

103111

@@ -127,11 +135,16 @@ def get_metrics(self) -> List[Metric]:
127135
self._metrics = []
128136
return metrics
129137

130-
def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
138+
def _receive_metrics(
139+
self,
140+
metrics: Iterable[Metric],
141+
timeout_millis: float = 10_000,
142+
**kwargs,
143+
) -> None:
131144
with self._lock:
132145
self._metrics = list(metrics)
133146

134-
def shutdown(self, *args, **kwargs):
147+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
135148
pass
136149

137150

@@ -193,23 +206,28 @@ def _at_fork_reinit(self):
193206
def _ticker(self) -> None:
194207
interval_secs = self._export_interval_millis / 1e3
195208
while not self._shutdown_event.wait(interval_secs):
196-
self.collect()
209+
self.collect(timeout_millis=self._export_timeout_millis)
197210
# one last collection below before shutting down completely
198-
self.collect()
211+
self.collect(timeout_millis=self._export_interval_millis)
199212

200213
def _receive_metrics(
201-
self, metrics: Iterable[Metric], *args, **kwargs
214+
self,
215+
metrics: Iterable[Metric],
216+
timeout_millis: float = 10_000,
217+
**kwargs,
202218
) -> None:
203219
if metrics is None:
204220
return
205221
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
206222
try:
207-
self._exporter.export(metrics)
223+
self._exporter.export(metrics, timeout_millis=timeout_millis)
208224
except Exception as e: # pylint: disable=broad-except,invalid-name
209225
_logger.exception("Exception while exporting metrics %s", str(e))
210226
detach(token)
211227

212-
def shutdown(self, *args, **kwargs):
228+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
229+
deadline_ns = time_ns() + timeout_millis * 10**6
230+
213231
def _shutdown():
214232
self._shutdown = True
215233

@@ -219,5 +237,5 @@ def _shutdown():
219237
return
220238

221239
self._shutdown_event.set()
222-
self._daemon_thread.join()
223-
self._exporter.shutdown()
240+
self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9)
241+
self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6)

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def __init__(
138138
self._instrument_class_aggregation.update(preferred_aggregation or {})
139139

140140
@final
141-
def collect(self) -> None:
141+
def collect(self, timeout_millis: float = 10_000) -> None:
142142
"""Collects the metrics from the internal SDK state and
143143
invokes the `_receive_metrics` with the collection.
144144
"""
@@ -148,7 +148,8 @@ def collect(self) -> None:
148148
)
149149
return
150150
self._receive_metrics(
151-
self._collect(self, self._instrument_class_temporality)
151+
self._collect(self, self._instrument_class_temporality),
152+
timeout_millis=timeout_millis,
152153
)
153154

154155
@final
@@ -162,11 +163,16 @@ def _set_collect_callback(
162163
self._collect = func
163164

164165
@abstractmethod
165-
def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
166+
def _receive_metrics(
167+
self,
168+
metrics: Iterable[Metric],
169+
timeout_millis: float = 10_000,
170+
**kwargs,
171+
) -> None:
166172
"""Called by `MetricReader.collect` when it receives a batch of metrics"""
167173

168174
@abstractmethod
169-
def shutdown(self, *args, **kwargs):
175+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
170176
"""Shuts down the MetricReader. This method provides a way
171177
for the MetricReader to do any cleanup required. A metric reader can
172178
only be shutdown once, any subsequent calls are ignored and return
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
The purpose of this test is to test for backward compatibility with any user-implementable
17+
interfaces as they were originally defined. For example, changes to the MetricExporter ABC must
18+
be made in such a way that existing implementations (outside of this repo) continue to work
19+
when *called* by the SDK.
20+
21+
This does not apply to classes which are not intended to be overriden by the user e.g. Meter
22+
and PeriodicExportingMetricReader concrete class. Those may freely be modified in a
23+
backward-compatible way for *callers*.
24+
25+
Ideally, we could use mypy for this as well, but SDK is not type checked atm.
26+
"""
27+
28+
from typing import Iterable, Sequence
29+
from unittest import TestCase
30+
31+
from opentelemetry.sdk._metrics import MeterProvider
32+
from opentelemetry.sdk._metrics.export import (
33+
MetricExporter,
34+
MetricExportResult,
35+
PeriodicExportingMetricReader,
36+
)
37+
from opentelemetry.sdk._metrics.metric_reader import MetricReader
38+
from opentelemetry.sdk._metrics.point import Metric
39+
40+
41+
# Do not change these classes until after major version 1
42+
class OrigMetricExporter(MetricExporter):
43+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
44+
"""Shuts down the exporter.
45+
46+
Called when the SDK is shut down.
47+
"""
48+
49+
def export(
50+
self,
51+
metrics: Sequence[Metric],
52+
timeout_millis: float = 10_000,
53+
**kwargs,
54+
) -> MetricExportResult:
55+
pass
56+
57+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
58+
pass
59+
60+
61+
class OrigMetricReader(MetricReader):
62+
def _receive_metrics(
63+
self,
64+
metrics: Iterable[Metric],
65+
timeout_millis: float = 10_000,
66+
**kwargs,
67+
) -> None:
68+
pass
69+
70+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
71+
self.collect()
72+
73+
74+
class TestBackwardCompat(TestCase):
75+
def test_metric_exporter(self):
76+
exporter = OrigMetricExporter()
77+
meter_provider = MeterProvider(
78+
metric_readers=[PeriodicExportingMetricReader(exporter)]
79+
)
80+
# produce some data
81+
meter_provider.get_meter("foo").create_counter("mycounter").add(12)
82+
meter_provider.shutdown()
83+
84+
def test_metric_reader(self):
85+
reader = OrigMetricReader()
86+
meter_provider = MeterProvider(metric_readers=[reader])
87+
# produce some data
88+
meter_provider.get_meter("foo").create_counter("mycounter").add(12)
89+
meter_provider.shutdown()

opentelemetry-sdk/tests/metrics/test_metric_reader.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def __init__(
5151
def _receive_metrics(self, metrics):
5252
pass
5353

54-
def shutdown(self):
54+
def shutdown(self, *args, **kwargs):
5555
return True
5656

5757

opentelemetry-sdk/tests/metrics/test_metrics.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(self):
4949
def _receive_metrics(self, metrics):
5050
pass
5151

52-
def shutdown(self):
52+
def shutdown(self, *args, **kwargs):
5353
return True
5454

5555

@@ -433,12 +433,17 @@ def __init__(self):
433433
self.metrics = {}
434434
self._counter = 0
435435

436-
def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
436+
def export(
437+
self,
438+
metrics: Sequence[Metric],
439+
timeout_millis: float = 10_000,
440+
**kwargs,
441+
) -> MetricExportResult:
437442
self.metrics[self._counter] = metrics
438443
self._counter += 1
439444
return MetricExportResult.SUCCESS
440445

441-
def shutdown(self) -> None:
446+
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
442447
pass
443448

444449

0 commit comments

Comments
 (0)