diff --git a/docs/examples/basic_meter/observer.py b/docs/examples/basic_meter/observer.py index b61b9e4db80..d3aa02168d4 100644 --- a/docs/examples/basic_meter/observer.py +++ b/docs/examples/basic_meter/observer.py @@ -60,6 +60,7 @@ def get_ram_usage_callback(observer): description="RAM memory usage", unit="1", value_type=float, + observer_type=ValueObserver, label_keys=(), ) diff --git a/opentelemetry-api/CHANGELOG.md b/opentelemetry-api/CHANGELOG.md index b1610ab2c3d..670e3d7a7f5 100644 --- a/opentelemetry-api/CHANGELOG.md +++ b/opentelemetry-api/CHANGELOG.md @@ -10,6 +10,8 @@ ([#552](https://github.com/open-telemetry/opentelemetry-python/pull/552)) - Rename Observer to ValueObserver ([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764)) +- Add SumObserver and UpDownSumObserver in metrics + ([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789)) ## 0.8b0 diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index da47356e058..569930d6f3b 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -189,6 +189,30 @@ def observe(self, value: ValueT, labels: Dict[str, str]) -> None: """ +class SumObserver(Observer): + """No-op implementation of ``SumObserver``.""" + + def observe(self, value: ValueT, labels: Dict[str, str]) -> None: + """Captures ``value`` to the sumobserver. + + Args: + value: The value to capture to this sumobserver metric. + labels: Labels associated to ``value``. + """ + + +class UpDownSumObserver(Observer): + """No-op implementation of ``UpDownSumObserver``.""" + + def observe(self, value: ValueT, labels: Dict[str, str]) -> None: + """Captures ``value`` to the updownsumobserver. + + Args: + value: The value to capture to this updownsumobserver metric. + labels: Labels associated to ``value``. + """ + + class ValueObserver(Observer): """No-op implementation of ``ValueObserver``.""" diff --git a/opentelemetry-api/tests/metrics/test_metrics.py b/opentelemetry-api/tests/metrics/test_metrics.py index 897c7492e42..b3cbdefd15a 100644 --- a/opentelemetry-api/tests/metrics/test_metrics.py +++ b/opentelemetry-api/tests/metrics/test_metrics.py @@ -56,6 +56,18 @@ def test_bound_valuerecorder(self): bound_valuerecorder = metrics.BoundValueRecorder() bound_valuerecorder.record(1) - def test_observer(self): + def test_default_observer(self): observer = metrics.DefaultObserver() observer.observe(1, {}) + + def test_sum_observer(self): + observer = metrics.SumObserver() + observer.observe(1, {}) + + def test_updown_sum_observer(self): + observer = metrics.UpDownSumObserver() + observer.observe(1, {}) + + def test_value_observer(self): + observer = metrics.ValueObserver() + observer.observe(1, {}) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 713580dbd6b..23007dde5a8 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -12,6 +12,8 @@ ([#775](https://github.com/open-telemetry/opentelemetry-python/pull/775)) - Rename Observer to ValueObserver ([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764)) +- Add SumObserver, UpDownSumObserver and LastValueAggregator in metrics + ([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789)) ## 0.8b0 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 507e00d8ead..7156f68c165 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -105,12 +105,16 @@ def record(self, value: metrics_api.ValueT) -> None: class Metric(metrics_api.Metric): - """Base class for all metric types. + """Base class for all synchronous metric types. - Also known as metric instrument. This is the class that is used to - represent a metric that is to be continuously recorded and tracked. Each - metric has a set of bound metrics that are created from the metric. See - `BaseBoundInstrument` for information on bound metric instruments. + This is the class that is used to represent a metric that is to be + synchronously recorded and tracked. Synchronous instruments are called + inside a request, meaning they have an associated distributed context + (i.e. Span context, correlation context). Multiple metric events may occur + for a synchronous instrument within a give collection interval. + + Each metric has a set of bound metrics that are created from the metric. + See `BaseBoundInstrument` for information on bound metric instruments. """ BOUND_INSTR_TYPE = BaseBoundInstrument @@ -190,8 +194,14 @@ def record( UPDATE_FUNCTION = record -class ValueObserver(metrics_api.ValueObserver): - """See `opentelemetry.metrics.ValueObserver`.""" +class Observer(metrics_api.Observer): + """Base class for all asynchronous metric types. + + Also known as Observers, observer metric instruments are asynchronous in + that they are reported by a callback, once per collection interval, and + lack context. They are permitted to report only one value per distinct + label set per period. + """ def __init__( self, @@ -218,15 +228,10 @@ def __init__( def observe( self, value: metrics_api.ValueT, labels: Dict[str, str] ) -> None: - if not self.enabled: - return - if not isinstance(value, self.value_type): - logger.warning( - "Invalid value passed for %s.", self.value_type.__name__ - ) + key = get_labels_as_key(labels) + if not self._validate_observe(value, key): return - key = get_labels_as_key(labels) if key not in self.aggregators: # TODO: how to cleanup aggregators? self.aggregators[key] = self.meter.batcher.aggregator_for( @@ -235,6 +240,20 @@ def observe( aggregator = self.aggregators[key] aggregator.update(value) + # pylint: disable=W0613 + def _validate_observe( + self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]], + ) -> bool: + if not self.enabled: + return False + if not isinstance(value, self.value_type): + logger.warning( + "Invalid value passed for %s.", self.value_type.__name__ + ) + return False + + return True + def run(self) -> bool: try: self.callback(self) @@ -252,6 +271,33 @@ def __repr__(self): ) +class SumObserver(Observer, metrics_api.SumObserver): + """See `opentelemetry.metrics.SumObserver`.""" + + def _validate_observe( + self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]], + ) -> bool: + if not super()._validate_observe(value, key): + return False + # Must be non-decreasing because monotonic + if ( + key in self.aggregators + and self.aggregators[key].current is not None + ): + if value < self.aggregators[key].current: + logger.warning("Value passed must be non-decreasing.") + return False + return True + + +class UpDownSumObserver(Observer, metrics_api.UpDownSumObserver): + """See `opentelemetry.metrics.UpDownSumObserver`.""" + + +class ValueObserver(Observer, metrics_api.ValueObserver): + """See `opentelemetry.metrics.ValueObserver`.""" + + class Record: """Container class used for processing in the `Batcher`""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 1745d854e9d..ad728d8c502 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -125,6 +125,34 @@ def merge(self, other): ) +class LastValueAggregator(Aggregator): + """Aggregator that stores last value results.""" + + def __init__(self): + super().__init__() + self._lock = threading.Lock() + self.last_update_timestamp = None + + def update(self, value): + with self._lock: + self.current = value + self.last_update_timestamp = time_ns() + + def take_checkpoint(self): + with self._lock: + self.checkpoint = self.current + self.current = None + + def merge(self, other): + last = self.checkpoint.last + self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp, other.last_update_timestamp + ) + if self.last_update_timestamp == other.last_update_timestamp: + last = other.checkpoint.last + self.checkpoint = last + + class ValueObserverAggregator(Aggregator): """Same as MinMaxSumCount but also with last value.""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py index db3675ecd61..c0405d1ffb8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py @@ -18,6 +18,8 @@ from opentelemetry.metrics import ( Counter, InstrumentT, + SumObserver, + UpDownSumObserver, ValueObserver, ValueRecorder, ) @@ -25,6 +27,7 @@ from opentelemetry.sdk.metrics.export.aggregate import ( Aggregator, CounterAggregator, + LastValueAggregator, MinMaxSumCountAggregator, ValueObserverAggregator, ) @@ -54,6 +57,8 @@ def aggregator_for(self, instrument_type: Type[InstrumentT]) -> Aggregator: # pylint:disable=R0201 if issubclass(instrument_type, Counter): return CounterAggregator() + if issubclass(instrument_type, (SumObserver, UpDownSumObserver)): + return LastValueAggregator() if issubclass(instrument_type, ValueRecorder): return MinMaxSumCountAggregator() if issubclass(instrument_type, ValueObserver): diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 4c2d691549d..02cf2c93548 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -290,6 +290,138 @@ def test_record(self): ) +class TestSumObserver(unittest.TestCase): + def test_observe(self): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.SumObserver( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + labels = {"key": "value"} + key_labels = tuple(sorted(labels.items())) + values = (37, 42, 60, 100) + for val in values: + observer.observe(val, labels) + + self.assertEqual(observer.aggregators[key_labels].current, values[-1]) + + def test_observe_disabled(self): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.SumObserver( + None, "name", "desc", "unit", int, meter, ("key",), False + ) + labels = {"key": "value"} + observer.observe(37, labels) + self.assertEqual(len(observer.aggregators), 0) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_observe_incorrect_type(self, logger_mock): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.SumObserver( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + labels = {"key": "value"} + observer.observe(37.0, labels) + self.assertEqual(len(observer.aggregators), 0) + self.assertTrue(logger_mock.warning.called) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_observe_non_decreasing_error(self, logger_mock): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.SumObserver( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + labels = {"key": "value"} + observer.observe(37, labels) + observer.observe(14, labels) + self.assertEqual(len(observer.aggregators), 1) + self.assertTrue(logger_mock.warning.called) + + def test_run(self): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = mock.Mock() + observer = metrics.SumObserver( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertTrue(observer.run()) + callback.assert_called_once_with(observer) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_run_exception(self, logger_mock): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = mock.Mock() + callback.side_effect = Exception("We have a problem!") + + observer = metrics.SumObserver( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertFalse(observer.run()) + self.assertTrue(logger_mock.warning.called) + + +class TestUpDownSumObserver(unittest.TestCase): + def test_observe(self): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.UpDownSumObserver( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + labels = {"key": "value"} + key_labels = tuple(sorted(labels.items())) + values = (37, 42, 14, 30) + for val in values: + observer.observe(val, labels) + + self.assertEqual(observer.aggregators[key_labels].current, values[-1]) + + def test_observe_disabled(self): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.UpDownSumObserver( + None, "name", "desc", "unit", int, meter, ("key",), False + ) + labels = {"key": "value"} + observer.observe(37, labels) + self.assertEqual(len(observer.aggregators), 0) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_observe_incorrect_type(self, logger_mock): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.UpDownSumObserver( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + labels = {"key": "value"} + observer.observe(37.0, labels) + self.assertEqual(len(observer.aggregators), 0) + self.assertTrue(logger_mock.warning.called) + + def test_run(self): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = mock.Mock() + observer = metrics.UpDownSumObserver( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertTrue(observer.run()) + callback.assert_called_once_with(observer) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_run_exception(self, logger_mock): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = mock.Mock() + callback.side_effect = Exception("We have a problem!") + + observer = metrics.UpDownSumObserver( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertFalse(observer.run()) + self.assertTrue(logger_mock.warning.called) + + class TestValueObserver(unittest.TestCase): def test_observe(self): meter = metrics.MeterProvider().get_meter(__name__)