diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f1cd0c79dd..f5a7a983987 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Fix handling of empty metric collection cycles + ([#3335](https://github.com/open-telemetry/opentelemetry-python/pull/3335)) - Fix error when no LoggerProvider configured for LoggingHandler ([#3423](https://github.com/open-telemetry/opentelemetry-python/pull/3423)) - + + ## Version 1.20.0/0.41b0 (2023-09-04) - Modify Prometheus exporter to translate non-monotonic Sums into Gauges diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py index ab4645c82f3..110f963a486 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py @@ -16,7 +16,7 @@ from logging import getLogger from threading import Lock from time import time_ns -from typing import Dict, List, Sequence +from typing import Dict, List, Optional, Sequence from opentelemetry.metrics import Instrument from opentelemetry.sdk.metrics._internal.aggregation import ( @@ -126,7 +126,7 @@ def collect( self, aggregation_temporality: AggregationTemporality, collection_start_nanos: int, - ) -> Sequence[DataPointT]: + ) -> Optional[Sequence[DataPointT]]: data_points: List[DataPointT] = [] with self._lock: @@ -136,4 +136,8 @@ def collect( ) if data_point is not None: data_points.append(data_point) - return data_points + + # Returning here None instead of an empty list because the caller + # does not consume a sequence and to be consistent with the rest of + # collect methods that also return None. + return data_points or None diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 5bd94d5aacc..0568270ae6b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -322,10 +322,14 @@ def collect(self, timeout_millis: float = 10_000) -> None: ) return - self._receive_metrics( - self._collect(self, timeout_millis=timeout_millis), - timeout_millis=timeout_millis, - ) + metrics = self._collect(self, timeout_millis=timeout_millis) + + if metrics is not None: + + self._receive_metrics( + metrics, + timeout_millis=timeout_millis, + ) @final def _set_collect_callback( @@ -515,8 +519,7 @@ def _receive_metrics( timeout_millis: float = 10_000, **kwargs, ) -> None: - if metrics_data is None: - return + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: with self._export_lock: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 9daf1eff461..c5e81678dcb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -17,7 +17,7 @@ from abc import ABC, abstractmethod from threading import Lock from time import time_ns -from typing import Iterable, List, Mapping +from typing import Iterable, List, Mapping, Optional # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics @@ -51,7 +51,7 @@ def collect( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, - ) -> Iterable[Metric]: + ) -> Optional[Iterable[Metric]]: pass @@ -94,7 +94,7 @@ def collect( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, - ) -> Iterable[Metric]: + ) -> Optional[Iterable[Metric]]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] @@ -123,4 +123,6 @@ def collect( for measurement in measurements: metric_reader_storage.consume_measurement(measurement) - return self._reader_storages[metric_reader].collect() + result = self._reader_storages[metric_reader].collect() + + return result diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index bef57eaab09..700ace87204 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -15,7 +15,7 @@ from logging import getLogger from threading import RLock from time import time_ns -from typing import Dict, List +from typing import Dict, List, Optional from opentelemetry.metrics import ( Asynchronous, @@ -119,7 +119,7 @@ def consume_measurement(self, measurement: Measurement) -> None: ): view_instrument_match.consume_measurement(measurement) - def collect(self) -> MetricsData: + def collect(self) -> Optional[MetricsData]: # Use a list instead of yielding to prevent a slow reader from holding # SDK locks @@ -152,6 +152,13 @@ def collect(self) -> MetricsData: for view_instrument_match in view_instrument_matches: + data_points = view_instrument_match.collect( + aggregation_temporality, collection_start_nanos + ) + + if data_points is None: + continue + if isinstance( # pylint: disable=protected-access view_instrument_match._aggregation, @@ -159,9 +166,7 @@ def collect(self) -> MetricsData: ): data = Sum( aggregation_temporality=aggregation_temporality, - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ), + data_points=data_points, is_monotonic=isinstance( instrument, (Counter, ObservableCounter) ), @@ -171,20 +176,14 @@ def collect(self) -> MetricsData: view_instrument_match._aggregation, _LastValueAggregation, ): - data = Gauge( - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ) - ) + data = Gauge(data_points=data_points) elif isinstance( # pylint: disable=protected-access view_instrument_match._aggregation, _ExplicitBucketHistogramAggregation, ): data = Histogram( - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ), + data_points=data_points, aggregation_temporality=aggregation_temporality, ) elif isinstance( @@ -200,9 +199,7 @@ def collect(self) -> MetricsData: _ExponentialBucketHistogramAggregation, ): data = ExponentialHistogram( - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ), + data_points=data_points, aggregation_temporality=aggregation_temporality, ) @@ -216,32 +213,38 @@ def collect(self) -> MetricsData: ) ) - if instrument.instrumentation_scope not in ( - instrumentation_scope_scope_metrics - ): - instrumentation_scope_scope_metrics[ - instrument.instrumentation_scope - ] = ScopeMetrics( - scope=instrument.instrumentation_scope, - metrics=metrics, - schema_url=instrument.instrumentation_scope.schema_url, - ) - else: - instrumentation_scope_scope_metrics[ - instrument.instrumentation_scope - ].metrics.extend(metrics) - - return MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=self._sdk_config.resource, - scope_metrics=list( - instrumentation_scope_scope_metrics.values() - ), - schema_url=self._sdk_config.resource.schema_url, + if metrics: + + if instrument.instrumentation_scope not in ( + instrumentation_scope_scope_metrics + ): + instrumentation_scope_scope_metrics[ + instrument.instrumentation_scope + ] = ScopeMetrics( + scope=instrument.instrumentation_scope, + metrics=metrics, + schema_url=instrument.instrumentation_scope.schema_url, + ) + else: + instrumentation_scope_scope_metrics[ + instrument.instrumentation_scope + ].metrics.extend(metrics) + + if instrumentation_scope_scope_metrics: + + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=self._sdk_config.resource, + scope_metrics=list( + instrumentation_scope_scope_metrics.values() + ), + schema_url=self._sdk_config.resource.schema_url, + ) + ] ) - ] - ) + + return None def _handle_view_instrument_match( self, diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py b/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py index 60c4227c3b2..1b3283717ae 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py @@ -72,3 +72,19 @@ def test_console_exporter(self): self.assertEqual(metrics["attributes"], {"a": "b"}) self.assertEqual(metrics["value"], 1) + + def test_console_exporter_no_export(self): + + output = StringIO() + exporter = ConsoleMetricExporter(out=output) + reader = PeriodicExportingMetricReader( + exporter, export_interval_millis=100 + ) + provider = MeterProvider(metric_readers=[reader]) + provider.shutdown() + + output.seek(0) + actual = "".join(output.readlines()) + expected = "" + + self.assertEqual(actual, expected) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py index ad90fe9a298..d022456415b 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py @@ -31,15 +31,7 @@ def test_disable_default_views(self): counter.add(10, {"label": "value1"}) counter.add(10, {"label": "value2"}) counter.add(10, {"label": "value3"}) - self.assertEqual( - ( - reader.get_metrics_data() - .resource_metrics[0] - .scope_metrics[0] - .metrics - ), - [], - ) + self.assertIsNone(reader.get_metrics_data()) def test_disable_default_views_add_custom(self): reader = InMemoryMetricReader() diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py b/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py index 045afe0b298..bbc67eac309 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py @@ -74,6 +74,15 @@ class TestExporterConcurrency(ConcurrencyTestBase): > be called again only after the current call returns. https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch + + This test also tests that a thread that calls the a + ``MetricReader.collect`` method using an asynchronous instrument is able + to perform two actions in the same thread lock space (without it being + interrupted by another thread): + + 1. Consume the measurement produced by the callback associated to the + asynchronous instrument. + 2. Export the measurement mentioned in the step above. """ def test_exporter_not_called_concurrently(self): @@ -84,7 +93,11 @@ def test_exporter_not_called_concurrently(self): ) meter_provider = MeterProvider(metric_readers=[reader]) + counter_cb_counter = 0 + def counter_cb(options: CallbackOptions): + nonlocal counter_cb_counter + counter_cb_counter += 1 yield Observation(2) meter_provider.get_meter(__name__).create_observable_counter( @@ -97,6 +110,7 @@ def test_many_threads(): self.run_with_many_threads(test_many_threads, num_threads=100) + self.assertEqual(counter_cb_counter, 100) # no thread should be in export() now self.assertEqual(exporter.count_in_export, 0) # should be one call for each thread diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py new file mode 100644 index 00000000000..81d419819a4 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py @@ -0,0 +1,82 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.resources import SERVICE_NAME, Resource + + +class TestHistogramExport(TestCase): + def test_histogram_counter_collection(self): + + in_memory_metric_reader = InMemoryMetricReader() + + provider = MeterProvider( + resource=Resource.create({SERVICE_NAME: "otel-test"}), + metric_readers=[in_memory_metric_reader], + ) + + meter = provider.get_meter("my-meter") + + histogram = meter.create_histogram("my_histogram") + counter = meter.create_counter("my_counter") + histogram.record(5, {"attribute": "value"}) + counter.add(1, {"attribute": "value_counter"}) + + metric_data = in_memory_metric_reader.get_metrics_data() + + self.assertEqual( + len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 2 + ) + + self.assertEqual( + ( + metric_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .bucket_counts + ), + (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), + ) + self.assertEqual( + ( + metric_data.resource_metrics[0] + .scope_metrics[0] + .metrics[1] + .data.data_points[0] + .value + ), + 1, + ) + + metric_data = in_memory_metric_reader.get_metrics_data() + + # FIXME ExplicitBucketHistogramAggregation is resetting counts to zero + # even if aggregation temporality is cumulative. + self.assertEqual( + len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 1 + ) + self.assertEqual( + ( + metric_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .value + ), + 1, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index 97b5532feae..1da6d5bcf60 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -314,15 +314,7 @@ def test_drop_aggregation(self): ) metric_reader_storage.consume_measurement(Measurement(1, counter)) - self.assertEqual( - [], - ( - metric_reader_storage.collect() - .resource_metrics[0] - .scope_metrics[0] - .metrics - ), - ) + self.assertIsNone(metric_reader_storage.collect()) def test_same_collection_start(self):