Skip to content

Commit 1e6e5d8

Browse files
lzchentoumorokoshi
authored andcommitted
Add SumObserver and UpDownSumObserver instruments (open-telemetry#789)
1 parent 6f7e875 commit 1e6e5d8

File tree

9 files changed

+267
-15
lines changed

9 files changed

+267
-15
lines changed

docs/examples/basic_meter/observer.py

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def get_ram_usage_callback(observer):
6060
description="RAM memory usage",
6161
unit="1",
6262
value_type=float,
63+
observer_type=ValueObserver,
6364
label_keys=(),
6465
)
6566

opentelemetry-api/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
([#552](https://github.com/open-telemetry/opentelemetry-python/pull/552))
1111
- Rename Observer to ValueObserver
1212
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
13+
- Add SumObserver and UpDownSumObserver in metrics
14+
([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789))
1315

1416
## 0.8b0
1517

opentelemetry-api/src/opentelemetry/metrics/__init__.py

+24
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,30 @@ def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
189189
"""
190190

191191

192+
class SumObserver(Observer):
193+
"""No-op implementation of ``SumObserver``."""
194+
195+
def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
196+
"""Captures ``value`` to the sumobserver.
197+
198+
Args:
199+
value: The value to capture to this sumobserver metric.
200+
labels: Labels associated to ``value``.
201+
"""
202+
203+
204+
class UpDownSumObserver(Observer):
205+
"""No-op implementation of ``UpDownSumObserver``."""
206+
207+
def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
208+
"""Captures ``value`` to the updownsumobserver.
209+
210+
Args:
211+
value: The value to capture to this updownsumobserver metric.
212+
labels: Labels associated to ``value``.
213+
"""
214+
215+
192216
class ValueObserver(Observer):
193217
"""No-op implementation of ``ValueObserver``."""
194218

opentelemetry-api/tests/metrics/test_metrics.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@ def test_bound_valuerecorder(self):
5656
bound_valuerecorder = metrics.BoundValueRecorder()
5757
bound_valuerecorder.record(1)
5858

59-
def test_observer(self):
59+
def test_default_observer(self):
6060
observer = metrics.DefaultObserver()
6161
observer.observe(1, {})
62+
63+
def test_sum_observer(self):
64+
observer = metrics.SumObserver()
65+
observer.observe(1, {})
66+
67+
def test_updown_sum_observer(self):
68+
observer = metrics.UpDownSumObserver()
69+
observer.observe(1, {})
70+
71+
def test_value_observer(self):
72+
observer = metrics.ValueObserver()
73+
observer.observe(1, {})

opentelemetry-sdk/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
([#775](https://github.com/open-telemetry/opentelemetry-python/pull/775))
1313
- Rename Observer to ValueObserver
1414
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
15+
- Add SumObserver, UpDownSumObserver and LastValueAggregator in metrics
16+
([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789))
1517

1618
## 0.8b0
1719

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

+60-14
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,16 @@ def record(self, value: metrics_api.ValueT) -> None:
105105

106106

107107
class Metric(metrics_api.Metric):
108-
"""Base class for all metric types.
108+
"""Base class for all synchronous metric types.
109109
110-
Also known as metric instrument. This is the class that is used to
111-
represent a metric that is to be continuously recorded and tracked. Each
112-
metric has a set of bound metrics that are created from the metric. See
113-
`BaseBoundInstrument` for information on bound metric instruments.
110+
This is the class that is used to represent a metric that is to be
111+
synchronously recorded and tracked. Synchronous instruments are called
112+
inside a request, meaning they have an associated distributed context
113+
(i.e. Span context, correlation context). Multiple metric events may occur
114+
for a synchronous instrument within a give collection interval.
115+
116+
Each metric has a set of bound metrics that are created from the metric.
117+
See `BaseBoundInstrument` for information on bound metric instruments.
114118
"""
115119

116120
BOUND_INSTR_TYPE = BaseBoundInstrument
@@ -190,8 +194,14 @@ def record(
190194
UPDATE_FUNCTION = record
191195

192196

193-
class ValueObserver(metrics_api.ValueObserver):
194-
"""See `opentelemetry.metrics.ValueObserver`."""
197+
class Observer(metrics_api.Observer):
198+
"""Base class for all asynchronous metric types.
199+
200+
Also known as Observers, observer metric instruments are asynchronous in
201+
that they are reported by a callback, once per collection interval, and
202+
lack context. They are permitted to report only one value per distinct
203+
label set per period.
204+
"""
195205

196206
def __init__(
197207
self,
@@ -218,15 +228,10 @@ def __init__(
218228
def observe(
219229
self, value: metrics_api.ValueT, labels: Dict[str, str]
220230
) -> None:
221-
if not self.enabled:
222-
return
223-
if not isinstance(value, self.value_type):
224-
logger.warning(
225-
"Invalid value passed for %s.", self.value_type.__name__
226-
)
231+
key = get_labels_as_key(labels)
232+
if not self._validate_observe(value, key):
227233
return
228234

229-
key = get_labels_as_key(labels)
230235
if key not in self.aggregators:
231236
# TODO: how to cleanup aggregators?
232237
self.aggregators[key] = self.meter.batcher.aggregator_for(
@@ -235,6 +240,20 @@ def observe(
235240
aggregator = self.aggregators[key]
236241
aggregator.update(value)
237242

243+
# pylint: disable=W0613
244+
def _validate_observe(
245+
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]],
246+
) -> bool:
247+
if not self.enabled:
248+
return False
249+
if not isinstance(value, self.value_type):
250+
logger.warning(
251+
"Invalid value passed for %s.", self.value_type.__name__
252+
)
253+
return False
254+
255+
return True
256+
238257
def run(self) -> bool:
239258
try:
240259
self.callback(self)
@@ -252,6 +271,33 @@ def __repr__(self):
252271
)
253272

254273

274+
class SumObserver(Observer, metrics_api.SumObserver):
275+
"""See `opentelemetry.metrics.SumObserver`."""
276+
277+
def _validate_observe(
278+
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]],
279+
) -> bool:
280+
if not super()._validate_observe(value, key):
281+
return False
282+
# Must be non-decreasing because monotonic
283+
if (
284+
key in self.aggregators
285+
and self.aggregators[key].current is not None
286+
):
287+
if value < self.aggregators[key].current:
288+
logger.warning("Value passed must be non-decreasing.")
289+
return False
290+
return True
291+
292+
293+
class UpDownSumObserver(Observer, metrics_api.UpDownSumObserver):
294+
"""See `opentelemetry.metrics.UpDownSumObserver`."""
295+
296+
297+
class ValueObserver(Observer, metrics_api.ValueObserver):
298+
"""See `opentelemetry.metrics.ValueObserver`."""
299+
300+
255301
class Record:
256302
"""Container class used for processing in the `Batcher`"""
257303

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py

+28
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,34 @@ def merge(self, other):
125125
)
126126

127127

128+
class LastValueAggregator(Aggregator):
129+
"""Aggregator that stores last value results."""
130+
131+
def __init__(self):
132+
super().__init__()
133+
self._lock = threading.Lock()
134+
self.last_update_timestamp = None
135+
136+
def update(self, value):
137+
with self._lock:
138+
self.current = value
139+
self.last_update_timestamp = time_ns()
140+
141+
def take_checkpoint(self):
142+
with self._lock:
143+
self.checkpoint = self.current
144+
self.current = None
145+
146+
def merge(self, other):
147+
last = self.checkpoint.last
148+
self.last_update_timestamp = get_latest_timestamp(
149+
self.last_update_timestamp, other.last_update_timestamp
150+
)
151+
if self.last_update_timestamp == other.last_update_timestamp:
152+
last = other.checkpoint.last
153+
self.checkpoint = last
154+
155+
128156
class ValueObserverAggregator(Aggregator):
129157
"""Same as MinMaxSumCount but also with last value."""
130158

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py

+5
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818
from opentelemetry.metrics import (
1919
Counter,
2020
InstrumentT,
21+
SumObserver,
22+
UpDownSumObserver,
2123
ValueObserver,
2224
ValueRecorder,
2325
)
2426
from opentelemetry.sdk.metrics.export import MetricRecord
2527
from opentelemetry.sdk.metrics.export.aggregate import (
2628
Aggregator,
2729
CounterAggregator,
30+
LastValueAggregator,
2831
MinMaxSumCountAggregator,
2932
ValueObserverAggregator,
3033
)
@@ -54,6 +57,8 @@ def aggregator_for(self, instrument_type: Type[InstrumentT]) -> Aggregator:
5457
# pylint:disable=R0201
5558
if issubclass(instrument_type, Counter):
5659
return CounterAggregator()
60+
if issubclass(instrument_type, (SumObserver, UpDownSumObserver)):
61+
return LastValueAggregator()
5762
if issubclass(instrument_type, ValueRecorder):
5863
return MinMaxSumCountAggregator()
5964
if issubclass(instrument_type, ValueObserver):

opentelemetry-sdk/tests/metrics/test_metrics.py

+132
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,138 @@ def test_record(self):
290290
)
291291

292292

293+
class TestSumObserver(unittest.TestCase):
294+
def test_observe(self):
295+
meter = metrics.MeterProvider().get_meter(__name__)
296+
observer = metrics.SumObserver(
297+
None, "name", "desc", "unit", int, meter, ("key",), True
298+
)
299+
labels = {"key": "value"}
300+
key_labels = tuple(sorted(labels.items()))
301+
values = (37, 42, 60, 100)
302+
for val in values:
303+
observer.observe(val, labels)
304+
305+
self.assertEqual(observer.aggregators[key_labels].current, values[-1])
306+
307+
def test_observe_disabled(self):
308+
meter = metrics.MeterProvider().get_meter(__name__)
309+
observer = metrics.SumObserver(
310+
None, "name", "desc", "unit", int, meter, ("key",), False
311+
)
312+
labels = {"key": "value"}
313+
observer.observe(37, labels)
314+
self.assertEqual(len(observer.aggregators), 0)
315+
316+
@mock.patch("opentelemetry.sdk.metrics.logger")
317+
def test_observe_incorrect_type(self, logger_mock):
318+
meter = metrics.MeterProvider().get_meter(__name__)
319+
observer = metrics.SumObserver(
320+
None, "name", "desc", "unit", int, meter, ("key",), True
321+
)
322+
labels = {"key": "value"}
323+
observer.observe(37.0, labels)
324+
self.assertEqual(len(observer.aggregators), 0)
325+
self.assertTrue(logger_mock.warning.called)
326+
327+
@mock.patch("opentelemetry.sdk.metrics.logger")
328+
def test_observe_non_decreasing_error(self, logger_mock):
329+
meter = metrics.MeterProvider().get_meter(__name__)
330+
observer = metrics.SumObserver(
331+
None, "name", "desc", "unit", int, meter, ("key",), True
332+
)
333+
labels = {"key": "value"}
334+
observer.observe(37, labels)
335+
observer.observe(14, labels)
336+
self.assertEqual(len(observer.aggregators), 1)
337+
self.assertTrue(logger_mock.warning.called)
338+
339+
def test_run(self):
340+
meter = metrics.MeterProvider().get_meter(__name__)
341+
342+
callback = mock.Mock()
343+
observer = metrics.SumObserver(
344+
callback, "name", "desc", "unit", int, meter, (), True
345+
)
346+
347+
self.assertTrue(observer.run())
348+
callback.assert_called_once_with(observer)
349+
350+
@mock.patch("opentelemetry.sdk.metrics.logger")
351+
def test_run_exception(self, logger_mock):
352+
meter = metrics.MeterProvider().get_meter(__name__)
353+
354+
callback = mock.Mock()
355+
callback.side_effect = Exception("We have a problem!")
356+
357+
observer = metrics.SumObserver(
358+
callback, "name", "desc", "unit", int, meter, (), True
359+
)
360+
361+
self.assertFalse(observer.run())
362+
self.assertTrue(logger_mock.warning.called)
363+
364+
365+
class TestUpDownSumObserver(unittest.TestCase):
366+
def test_observe(self):
367+
meter = metrics.MeterProvider().get_meter(__name__)
368+
observer = metrics.UpDownSumObserver(
369+
None, "name", "desc", "unit", int, meter, ("key",), True
370+
)
371+
labels = {"key": "value"}
372+
key_labels = tuple(sorted(labels.items()))
373+
values = (37, 42, 14, 30)
374+
for val in values:
375+
observer.observe(val, labels)
376+
377+
self.assertEqual(observer.aggregators[key_labels].current, values[-1])
378+
379+
def test_observe_disabled(self):
380+
meter = metrics.MeterProvider().get_meter(__name__)
381+
observer = metrics.UpDownSumObserver(
382+
None, "name", "desc", "unit", int, meter, ("key",), False
383+
)
384+
labels = {"key": "value"}
385+
observer.observe(37, labels)
386+
self.assertEqual(len(observer.aggregators), 0)
387+
388+
@mock.patch("opentelemetry.sdk.metrics.logger")
389+
def test_observe_incorrect_type(self, logger_mock):
390+
meter = metrics.MeterProvider().get_meter(__name__)
391+
observer = metrics.UpDownSumObserver(
392+
None, "name", "desc", "unit", int, meter, ("key",), True
393+
)
394+
labels = {"key": "value"}
395+
observer.observe(37.0, labels)
396+
self.assertEqual(len(observer.aggregators), 0)
397+
self.assertTrue(logger_mock.warning.called)
398+
399+
def test_run(self):
400+
meter = metrics.MeterProvider().get_meter(__name__)
401+
402+
callback = mock.Mock()
403+
observer = metrics.UpDownSumObserver(
404+
callback, "name", "desc", "unit", int, meter, (), True
405+
)
406+
407+
self.assertTrue(observer.run())
408+
callback.assert_called_once_with(observer)
409+
410+
@mock.patch("opentelemetry.sdk.metrics.logger")
411+
def test_run_exception(self, logger_mock):
412+
meter = metrics.MeterProvider().get_meter(__name__)
413+
414+
callback = mock.Mock()
415+
callback.side_effect = Exception("We have a problem!")
416+
417+
observer = metrics.UpDownSumObserver(
418+
callback, "name", "desc", "unit", int, meter, (), True
419+
)
420+
421+
self.assertFalse(observer.run())
422+
self.assertTrue(logger_mock.warning.called)
423+
424+
293425
class TestValueObserver(unittest.TestCase):
294426
def test_observe(self):
295427
meter = metrics.MeterProvider().get_meter(__name__)

0 commit comments

Comments
 (0)