Skip to content

Commit c51d38b

Browse files
authored
Merge branch 'main' into issue_2142
2 parents 5f8ec9b + d112814 commit c51d38b

File tree

2 files changed

+53
-15
lines changed

2 files changed

+53
-15
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py

+13-14
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
4949
from opentelemetry.sdk.resources import Resource
5050
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
51+
from opentelemetry.util._once import Once
5152

5253
_logger = getLogger(__name__)
5354

@@ -185,30 +186,30 @@ def __init__(
185186
self._metric_readers = metric_readers
186187

187188
for metric_reader in self._sdk_config.metric_readers:
188-
metric_reader._register_measurement_consumer(self)
189+
metric_reader._set_collect_callback(
190+
self._measurement_consumer.collect
191+
)
189192

193+
self._shutdown_once = Once()
190194
self._shutdown = False
191195

192196
def force_flush(self) -> bool:
193197

194198
# FIXME implement a timeout
195199

196-
metric_reader_result = True
197-
198200
for metric_reader in self._sdk_config.metric_readers:
199-
metric_reader_result = (
200-
metric_reader_result and metric_reader.force_flush()
201-
)
202-
203-
if not metric_reader_result:
204-
_logger.warning("Unable to force flush all metric readers")
205-
206-
return metric_reader_result
201+
metric_reader.collect()
202+
return True
207203

208204
def shutdown(self):
209205
# FIXME implement a timeout
210206

211-
if self._shutdown:
207+
def _shutdown():
208+
self._shutdown = True
209+
210+
did_shutdown = self._shutdown_once.do_once(_shutdown)
211+
212+
if not did_shutdown:
212213
_logger.warning("shutdown can only be called once")
213214
return False
214215

@@ -224,8 +225,6 @@ def shutdown(self):
224225

225226
overall_result = overall_result and metric_reader_result
226227

227-
self._shutdown = True
228-
229228
if self._atexit_handler is not None:
230229
unregister(self._atexit_handler)
231230
self._atexit_handler = None

opentelemetry-sdk/tests/metrics/test_metrics.py

+40-1
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,24 @@
2626
ObservableUpDownCounter,
2727
UpDownCounter,
2828
)
29+
from opentelemetry.sdk._metrics.metric_reader import MetricReader
30+
from opentelemetry.sdk._metrics.point import AggregationTemporality
2931
from opentelemetry.sdk.resources import Resource
32+
from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc
3033

3134

32-
class TestMeterProvider(TestCase):
35+
class DummyMetricReader(MetricReader):
36+
def __init__(self):
37+
super().__init__(AggregationTemporality.CUMULATIVE)
38+
39+
def _receive_metrics(self, metrics):
40+
pass
41+
42+
def shutdown(self):
43+
return True
44+
45+
46+
class TestMeterProvider(ConcurrencyTestBase):
3347
def test_resource(self):
3448
"""
3549
`MeterProvider` provides a way to allow a `Resource` to be specified.
@@ -139,6 +153,31 @@ def test_shutdown_subsequent_calls(self):
139153
with self.assertLogs(level=WARNING):
140154
meter_provider.shutdown()
141155

156+
@patch("opentelemetry.sdk._metrics._logger")
157+
def test_shutdown_race(self, mock_logger):
158+
mock_logger.warning = MockFunc()
159+
meter_provider = MeterProvider()
160+
num_threads = 70
161+
self.run_with_many_threads(
162+
meter_provider.shutdown, num_threads=num_threads
163+
)
164+
self.assertEqual(mock_logger.warning.call_count, num_threads - 1)
165+
166+
@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
167+
def test_measurement_collect_callback(
168+
self, mock_sync_measurement_consumer
169+
):
170+
metric_readers = [DummyMetricReader()] * 5
171+
sync_consumer_instance = mock_sync_measurement_consumer()
172+
sync_consumer_instance.collect = MockFunc()
173+
MeterProvider(metric_readers=metric_readers)
174+
175+
for reader in metric_readers:
176+
reader.collect()
177+
self.assertEqual(
178+
sync_consumer_instance.collect.call_count, len(metric_readers)
179+
)
180+
142181
@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
143182
def test_creates_sync_measurement_consumer(
144183
self, mock_sync_measurement_consumer

0 commit comments

Comments
 (0)