|
14 | 14 |
|
15 | 15 | import math
|
16 | 16 | from time import sleep, time_ns
|
17 |
| -from typing import Sequence |
| 17 | +from typing import ( |
| 18 | + Optional, |
| 19 | + Sequence, |
| 20 | +) |
18 | 21 | from unittest.mock import Mock
|
19 | 22 |
|
20 | 23 | from flaky import flaky
|
21 | 24 |
|
22 |
| -from opentelemetry.sdk.metrics import Counter |
| 25 | +from opentelemetry.sdk.metrics import ( |
| 26 | + Counter, |
| 27 | + MetricsTimeoutError, |
| 28 | +) |
23 | 29 | from opentelemetry.sdk.metrics._internal import _Counter
|
24 | 30 | from opentelemetry.sdk.metrics.export import (
|
25 | 31 | AggregationTemporality,
|
@@ -67,6 +73,25 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
|
67 | 73 | return True
|
68 | 74 |
|
69 | 75 |
|
| 76 | +class ExceptionAtCollectionPeriodicExportingMetricReader( |
| 77 | + PeriodicExportingMetricReader |
| 78 | +): |
| 79 | + def __init__( |
| 80 | + self, |
| 81 | + exporter: MetricExporter, |
| 82 | + exception: Exception, |
| 83 | + export_interval_millis: Optional[float] = None, |
| 84 | + export_timeout_millis: Optional[float] = None, |
| 85 | + ) -> None: |
| 86 | + super().__init__( |
| 87 | + exporter, export_interval_millis, export_timeout_millis |
| 88 | + ) |
| 89 | + self._collect_exception = exception |
| 90 | + |
| 91 | + def collect(self, timeout_millis: float = 10_000) -> None: |
| 92 | + raise self._collect_exception |
| 93 | + |
| 94 | + |
70 | 95 | metrics_list = [
|
71 | 96 | Metric(
|
72 | 97 | name="sum_name",
|
@@ -111,11 +136,13 @@ def test_defaults(self):
|
111 | 136 | pmr.shutdown()
|
112 | 137 |
|
113 | 138 | def _create_periodic_reader(
|
114 |
| - self, metrics, exporter, collect_wait=0, interval=60000 |
| 139 | + self, metrics, exporter, collect_wait=0, interval=60000, timeout=30000 |
115 | 140 | ):
|
116 | 141 |
|
117 | 142 | pmr = PeriodicExportingMetricReader(
|
118 |
| - exporter, export_interval_millis=interval |
| 143 | + exporter, |
| 144 | + export_interval_millis=interval, |
| 145 | + export_timeout_millis=timeout, |
119 | 146 | )
|
120 | 147 |
|
121 | 148 | def _collect(reader, timeout_millis):
|
@@ -219,3 +246,27 @@ def test_exporter_aggregation_preference(self):
|
219 | 246 | self.assertTrue(isinstance(value, DefaultAggregation))
|
220 | 247 | else:
|
221 | 248 | self.assertTrue(isinstance(value, LastValueAggregation))
|
| 249 | + |
| 250 | + def test_metric_timeout_does_not_kill_worker_thread(self): |
| 251 | + exporter = FakeMetricsExporter() |
| 252 | + pmr = ExceptionAtCollectionPeriodicExportingMetricReader( |
| 253 | + exporter, |
| 254 | + MetricsTimeoutError("test timeout"), |
| 255 | + export_timeout_millis=1, |
| 256 | + ) |
| 257 | + |
| 258 | + sleep(0.1) |
| 259 | + self.assertTrue(pmr._daemon_thread.is_alive()) |
| 260 | + pmr.shutdown() |
| 261 | + |
| 262 | + def test_non_metric_timeout_error_kills_exporter_thread(self): |
| 263 | + exporter = FakeMetricsExporter() |
| 264 | + pmr = ExceptionAtCollectionPeriodicExportingMetricReader( |
| 265 | + exporter, |
| 266 | + ZeroDivisionError(), |
| 267 | + export_timeout_millis=1, |
| 268 | + ) |
| 269 | + |
| 270 | + sleep(0.1) |
| 271 | + self.assertFalse(pmr._daemon_thread.is_alive()) |
| 272 | + pmr.shutdown() |
0 commit comments