From be90bfdd3d177a6dfe2c59f7f4787fb3f40b31ad Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Sun, 1 May 2022 15:25:58 -0600 Subject: [PATCH 1/5] Add variadic arguments to metric exporter/reader interfaces Fixes #2650 --- .../proto/grpc/_metric_exporter/__init__.py | 6 ++++-- .../exporter/prometheus/__init__.py | 6 ++++-- .../sdk/_metrics/_internal/export/__init__.py | 20 +++++++++++-------- .../_internal/measurement_consumer.py | 18 +++++++++++++---- .../sdk/_metrics/_internal/metric_reader.py | 4 ++-- 5 files changed, 36 insertions(+), 18 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index 1ad5cc4d808..c40ff1ddd21 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -168,8 +168,10 @@ def _translate_data( ) ) - def export(self, metrics: Sequence[Metric]) -> MetricExportResult: + def export( + self, metrics: Sequence[Metric], *args, **kwargs + ) -> MetricExportResult: return self._export(metrics) - def shutdown(self): + def shutdown(self, *args, **kwargs): pass diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 4d506626750..1b81f062c02 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -110,12 +110,14 @@ def __init__(self, prefix: str = "") -> None: REGISTRY.register(self._collector) self._collector._callback = self.collect - def _receive_metrics(self, metrics: Iterable[Metric]) -> None: + def _receive_metrics( + self, metrics: Iterable[Metric], *args, **kwargs + ) -> None: if metrics is None: return self._collector.add_metrics_data(metrics) - def shutdown(self) -> bool: + def shutdown(self, *args, **kwargs) -> bool: REGISTRY.unregister(self._collector) return True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index b99d7f8f03b..a8f849feb55 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -52,7 +52,9 @@ class MetricExporter(ABC): """ @abstractmethod - def export(self, metrics: Sequence[Metric]) -> "MetricExportResult": + def export( + self, metrics: Sequence[Metric], **kwargs + ) -> "MetricExportResult": """Exports a batch of telemetry data. Args: @@ -63,7 +65,7 @@ def export(self, metrics: Sequence[Metric]) -> "MetricExportResult": """ @abstractmethod - def shutdown(self) -> None: + def shutdown(self, **kwargs) -> None: """Shuts down the exporter. Called when the SDK is shut down. @@ -87,13 +89,15 @@ def __init__( self.out = out self.formatter = formatter - def export(self, metrics: Sequence[Metric]) -> MetricExportResult: + def export( + self, metrics: Sequence[Metric], **kwargs + ) -> MetricExportResult: for metric in metrics: self.out.write(self.formatter(metric)) self.out.flush() return MetricExportResult.SUCCESS - def shutdown(self) -> None: + def shutdown(self, **kwargs) -> None: pass @@ -123,11 +127,11 @@ def get_metrics(self) -> List[Metric]: self._metrics = [] return metrics - def _receive_metrics(self, metrics: Iterable[Metric]): + def _receive_metrics(self, metrics: Iterable[Metric], **kwargs): with self._lock: self._metrics = list(metrics) - def shutdown(self): + def shutdown(self, **kwargs): pass @@ -193,7 +197,7 @@ def _ticker(self) -> None: # one last collection below before shutting down completely self.collect() - def _receive_metrics(self, metrics: Iterable[Metric]) -> None: + def _receive_metrics(self, metrics: Iterable[Metric], **kwargs) -> None: if metrics is None: return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) @@ -203,7 +207,7 @@ def _receive_metrics(self, metrics: Iterable[Metric]) -> None: _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) - def shutdown(self): + def shutdown(self, **kwargs): def _shutdown(): self._shutdown = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py index ce9b1f0d5e3..2a9c36e9965 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py @@ -32,11 +32,15 @@ class MeasurementConsumer(ABC): @abstractmethod - def consume_measurement(self, measurement: Measurement) -> None: + def consume_measurement( + self, measurement: Measurement, *args, **kwargs + ) -> None: pass @abstractmethod - def register_asynchronous_instrument(self, instrument: "_Asynchronous"): + def register_asynchronous_instrument( + self, instrument: "_Asynchronous", *args, **kwargs + ): pass @abstractmethod @@ -44,6 +48,8 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], + *args, + **kwargs ) -> Iterable[Metric]: pass @@ -61,12 +67,14 @@ def __init__(self, sdk_config: SdkConfiguration) -> None: } self._async_instruments: List["_Asynchronous"] = [] - def consume_measurement(self, measurement: Measurement) -> None: + def consume_measurement( + self, measurement: Measurement, *args, **kwargs + ) -> None: for reader_storage in self._reader_storages.values(): reader_storage.consume_measurement(measurement) def register_asynchronous_instrument( - self, instrument: "_Asynchronous" + self, instrument: "_Asynchronous", *args, **kwargs ) -> None: with self._lock: self._async_instruments.append(instrument) @@ -75,6 +83,8 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], + *args, + **kwargs ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py index b84b873854f..772a88d3f8e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py @@ -162,11 +162,11 @@ def _set_collect_callback( self._collect = func @abstractmethod - def _receive_metrics(self, metrics: Iterable[Metric]): + def _receive_metrics(self, metrics: Iterable[Metric], **kwargs): """Called by `MetricReader.collect` when it receives a batch of metrics""" @abstractmethod - def shutdown(self): + def shutdown(self, **kwargs): """Shuts down the MetricReader. This method provides a way for the MetricReader to do any cleanup required. A metric reader can only be shutdown once, any subsequent calls are ignored and return From 41581d3fb3b421bf2c2e8315a10e7152a828e740 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Sun, 1 May 2022 18:07:34 -0600 Subject: [PATCH 2/5] Add changelog entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d43389891c4..c98bb873ce5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.11.1-0.30b1...HEAD) +- Add variadic arguments to metric exporter/reader interfaces + ([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654)) - Move Metrics API behind internal package ([#2651](https://github.com/open-telemetry/opentelemetry-python/pull/2651)) From 7eb61349ee0f45aaeb64888bc107027c3deaa2d1 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 2 May 2022 10:14:24 -0600 Subject: [PATCH 3/5] Remove args --- .../otlp/proto/grpc/_metric_exporter/__init__.py | 4 ++-- .../opentelemetry/exporter/prometheus/__init__.py | 6 ++---- .../sdk/_metrics/_internal/measurement_consumer.py | 14 ++++---------- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index c40ff1ddd21..cebfe7b2347 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -169,9 +169,9 @@ def _translate_data( ) def export( - self, metrics: Sequence[Metric], *args, **kwargs + self, metrics: Sequence[Metric], **kwargs ) -> MetricExportResult: return self._export(metrics) - def shutdown(self, *args, **kwargs): + def shutdown(self, **kwargs): pass diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 1b81f062c02..356c11136b2 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -110,14 +110,12 @@ def __init__(self, prefix: str = "") -> None: REGISTRY.register(self._collector) self._collector._callback = self.collect - def _receive_metrics( - self, metrics: Iterable[Metric], *args, **kwargs - ) -> None: + def _receive_metrics(self, metrics: Iterable[Metric], **kwargs) -> None: if metrics is None: return self._collector.add_metrics_data(metrics) - def shutdown(self, *args, **kwargs) -> bool: + def shutdown(self, **kwargs) -> bool: REGISTRY.unregister(self._collector) return True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py index 2a9c36e9965..1400b2922bc 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py @@ -32,14 +32,12 @@ class MeasurementConsumer(ABC): @abstractmethod - def consume_measurement( - self, measurement: Measurement, *args, **kwargs - ) -> None: + def consume_measurement(self, measurement: Measurement, **kwargs) -> None: pass @abstractmethod def register_asynchronous_instrument( - self, instrument: "_Asynchronous", *args, **kwargs + self, instrument: "_Asynchronous", **kwargs ): pass @@ -48,7 +46,6 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], - *args, **kwargs ) -> Iterable[Metric]: pass @@ -67,14 +64,12 @@ def __init__(self, sdk_config: SdkConfiguration) -> None: } self._async_instruments: List["_Asynchronous"] = [] - def consume_measurement( - self, measurement: Measurement, *args, **kwargs - ) -> None: + def consume_measurement(self, measurement: Measurement, **kwargs) -> None: for reader_storage in self._reader_storages.values(): reader_storage.consume_measurement(measurement) def register_asynchronous_instrument( - self, instrument: "_Asynchronous", *args, **kwargs + self, instrument: "_Asynchronous", **kwargs ) -> None: with self._lock: self._async_instruments.append(instrument) @@ -83,7 +78,6 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], - *args, **kwargs ) -> Iterable[Metric]: with self._lock: From e45c7c3ab90491fdf0d34574cc1cfa9157276558 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Tue, 3 May 2022 21:07:56 -0600 Subject: [PATCH 4/5] Add args again This reverts commit 7eb61349ee0f45aaeb64888bc107027c3deaa2d1. --- .../otlp/proto/grpc/_metric_exporter/__init__.py | 4 ++-- .../opentelemetry/exporter/prometheus/__init__.py | 6 ++++-- .../sdk/_metrics/_internal/measurement_consumer.py | 12 ++++-------- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index cebfe7b2347..c40ff1ddd21 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -169,9 +169,9 @@ def _translate_data( ) def export( - self, metrics: Sequence[Metric], **kwargs + self, metrics: Sequence[Metric], *args, **kwargs ) -> MetricExportResult: return self._export(metrics) - def shutdown(self, **kwargs): + def shutdown(self, *args, **kwargs): pass diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 356c11136b2..1b81f062c02 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -110,12 +110,14 @@ def __init__(self, prefix: str = "") -> None: REGISTRY.register(self._collector) self._collector._callback = self.collect - def _receive_metrics(self, metrics: Iterable[Metric], **kwargs) -> None: + def _receive_metrics( + self, metrics: Iterable[Metric], *args, **kwargs + ) -> None: if metrics is None: return self._collector.add_metrics_data(metrics) - def shutdown(self, **kwargs) -> bool: + def shutdown(self, *args, **kwargs) -> bool: REGISTRY.unregister(self._collector) return True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py index 1400b2922bc..ce9b1f0d5e3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py @@ -32,13 +32,11 @@ class MeasurementConsumer(ABC): @abstractmethod - def consume_measurement(self, measurement: Measurement, **kwargs) -> None: + def consume_measurement(self, measurement: Measurement) -> None: pass @abstractmethod - def register_asynchronous_instrument( - self, instrument: "_Asynchronous", **kwargs - ): + def register_asynchronous_instrument(self, instrument: "_Asynchronous"): pass @abstractmethod @@ -46,7 +44,6 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], - **kwargs ) -> Iterable[Metric]: pass @@ -64,12 +61,12 @@ def __init__(self, sdk_config: SdkConfiguration) -> None: } self._async_instruments: List["_Asynchronous"] = [] - def consume_measurement(self, measurement: Measurement, **kwargs) -> None: + def consume_measurement(self, measurement: Measurement) -> None: for reader_storage in self._reader_storages.values(): reader_storage.consume_measurement(measurement) def register_asynchronous_instrument( - self, instrument: "_Asynchronous", **kwargs + self, instrument: "_Asynchronous" ) -> None: with self._lock: self._async_instruments.append(instrument) @@ -78,7 +75,6 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], - **kwargs ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] From c56d133fbe058d853b7f3e7680859fe7d08e0b22 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Wed, 4 May 2022 13:41:12 -0600 Subject: [PATCH 5/5] Add missing args --- .../sdk/_metrics/_internal/export/__init__.py | 18 ++++++++++-------- .../sdk/_metrics/_internal/metric_reader.py | 4 ++-- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index a8f849feb55..959006e2347 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -53,7 +53,7 @@ class MetricExporter(ABC): @abstractmethod def export( - self, metrics: Sequence[Metric], **kwargs + self, metrics: Sequence[Metric], *args, **kwargs ) -> "MetricExportResult": """Exports a batch of telemetry data. @@ -65,7 +65,7 @@ def export( """ @abstractmethod - def shutdown(self, **kwargs) -> None: + def shutdown(self, *args, **kwargs) -> None: """Shuts down the exporter. Called when the SDK is shut down. @@ -90,14 +90,14 @@ def __init__( self.formatter = formatter def export( - self, metrics: Sequence[Metric], **kwargs + self, metrics: Sequence[Metric], *args, **kwargs ) -> MetricExportResult: for metric in metrics: self.out.write(self.formatter(metric)) self.out.flush() return MetricExportResult.SUCCESS - def shutdown(self, **kwargs) -> None: + def shutdown(self, *args, **kwargs) -> None: pass @@ -127,11 +127,11 @@ def get_metrics(self) -> List[Metric]: self._metrics = [] return metrics - def _receive_metrics(self, metrics: Iterable[Metric], **kwargs): + def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): with self._lock: self._metrics = list(metrics) - def shutdown(self, **kwargs): + def shutdown(self, *args, **kwargs): pass @@ -197,7 +197,9 @@ def _ticker(self) -> None: # one last collection below before shutting down completely self.collect() - def _receive_metrics(self, metrics: Iterable[Metric], **kwargs) -> None: + def _receive_metrics( + self, metrics: Iterable[Metric], *args, **kwargs + ) -> None: if metrics is None: return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) @@ -207,7 +209,7 @@ def _receive_metrics(self, metrics: Iterable[Metric], **kwargs) -> None: _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) - def shutdown(self, **kwargs): + def shutdown(self, *args, **kwargs): def _shutdown(): self._shutdown = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py index 772a88d3f8e..afdc7083162 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py @@ -162,11 +162,11 @@ def _set_collect_callback( self._collect = func @abstractmethod - def _receive_metrics(self, metrics: Iterable[Metric], **kwargs): + def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): """Called by `MetricReader.collect` when it receives a batch of metrics""" @abstractmethod - def shutdown(self, **kwargs): + def shutdown(self, *args, **kwargs): """Shuts down the MetricReader. This method provides a way for the MetricReader to do any cleanup required. A metric reader can only be shutdown once, any subsequent calls are ignored and return