Skip to content

Commit 83e89d1

Browse files
authored
sdk: Add last_updated_timestamp for metric observers (#522)
Refactors last_updated_timestamp into aggregators instead of bound metric instrument.
1 parent 6edee7f commit 83e89d1

File tree

6 files changed

+128
-33
lines changed

6 files changed

+128
-33
lines changed

ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,9 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor:
147147
def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
148148
# TODO: horrible hack to get original list of keys to then get the bound
149149
# instrument
150-
key = dict(metric_record.labels)
151150
point = metrics_pb2.Point(
152151
timestamp=utils.proto_timestamp_from_time_ns(
153-
metric_record.metric.bind(key).last_update_timestamp
152+
metric_record.aggregator.last_update_timestamp
154153
)
155154
)
156155
if metric_record.metric.value_type == int:

ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,11 @@ def test_translate_to_collector(self):
178178
self.assertEqual(len(output_metrics[0].timeseries[0].points), 1)
179179
self.assertEqual(
180180
output_metrics[0].timeseries[0].points[0].timestamp.seconds,
181-
record.metric.bind(self._labels).last_update_timestamp
182-
// 1000000000,
181+
record.aggregator.last_update_timestamp // 1000000000,
183182
)
184183
self.assertEqual(
185184
output_metrics[0].timeseries[0].points[0].timestamp.nanos,
186-
record.metric.bind(self._labels).last_update_timestamp
187-
% 1000000000,
185+
record.aggregator.last_update_timestamp % 1000000000,
188186
)
189187
self.assertEqual(
190188
output_metrics[0].timeseries[0].points[0].int64_value, 123

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

+3-11
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
2222
from opentelemetry.sdk.resources import Resource
2323
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
24-
from opentelemetry.util import time_ns
2524

2625
logger = logging.getLogger(__name__)
2726

@@ -54,7 +53,6 @@ def __init__(
5453
self.value_type = value_type
5554
self.enabled = enabled
5655
self.aggregator = aggregator
57-
self.last_update_timestamp = time_ns()
5856
self._ref_count = 0
5957
self._ref_count_lock = threading.Lock()
6058

@@ -69,7 +67,6 @@ def _validate_update(self, value: metrics_api.ValueT) -> bool:
6967
return True
7068

7169
def update(self, value: metrics_api.ValueT):
72-
self.last_update_timestamp = time_ns()
7370
self.aggregator.update(value)
7471

7572
def release(self):
@@ -88,10 +85,8 @@ def ref_count(self):
8885
return self._ref_count
8986

9087
def __repr__(self):
91-
return '{}(data="{}", last_update_timestamp={})'.format(
92-
type(self).__name__,
93-
self.aggregator.current,
94-
self.last_update_timestamp,
88+
return '{}(data="{}")'.format(
89+
type(self).__name__, self.aggregator.current
9590
)
9691

9792

@@ -331,7 +326,6 @@ def _collect_observers(self) -> None:
331326
if not observer.enabled:
332327
continue
333328

334-
# TODO: capture timestamp?
335329
if not observer.run():
336330
continue
337331

@@ -404,9 +398,7 @@ def unregister_observer(self, observer: "Observer") -> None:
404398

405399

406400
class MeterProvider(metrics_api.MeterProvider):
407-
def __init__(
408-
self, resource: Resource = Resource.create_empty(),
409-
):
401+
def __init__(self, resource: Resource = Resource.create_empty()):
410402
self.resource = resource
411403

412404
def get_meter(

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py

+29-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import threading
1717
from collections import namedtuple
1818

19+
from opentelemetry.util import time_ns
20+
1921

2022
class Aggregator(abc.ABC):
2123
"""Base class for aggregators.
@@ -49,10 +51,12 @@ def __init__(self):
4951
self.current = 0
5052
self.checkpoint = 0
5153
self._lock = threading.Lock()
54+
self.last_update_timestamp = None
5255

5356
def update(self, value):
5457
with self._lock:
5558
self.current += value
59+
self.last_update_timestamp = time_ns()
5660

5761
def take_checkpoint(self):
5862
with self._lock:
@@ -62,6 +66,9 @@ def take_checkpoint(self):
6266
def merge(self, other):
6367
with self._lock:
6468
self.checkpoint += other.checkpoint
69+
self.last_update_timestamp = get_latest_timestamp(
70+
self.last_update_timestamp, other.last_update_timestamp
71+
)
6572

6673

6774
class MinMaxSumCountAggregator(Aggregator):
@@ -88,6 +95,7 @@ def __init__(self):
8895
self.current = self._EMPTY
8996
self.checkpoint = self._EMPTY
9097
self._lock = threading.Lock()
98+
self.last_update_timestamp = None
9199

92100
def update(self, value):
93101
with self._lock:
@@ -100,6 +108,7 @@ def update(self, value):
100108
self.current.sum + value,
101109
self.current.count + 1,
102110
)
111+
self.last_update_timestamp = time_ns()
103112

104113
def take_checkpoint(self):
105114
with self._lock:
@@ -111,6 +120,9 @@ def merge(self, other):
111120
self.checkpoint = self._merge_checkpoint(
112121
self.checkpoint, other.checkpoint
113122
)
123+
self.last_update_timestamp = get_latest_timestamp(
124+
self.last_update_timestamp, other.last_update_timestamp
125+
)
114126

115127

116128
class ObserverAggregator(Aggregator):
@@ -123,20 +135,32 @@ def __init__(self):
123135
self.mmsc = MinMaxSumCountAggregator()
124136
self.current = None
125137
self.checkpoint = self._TYPE(None, None, None, 0, None)
138+
self.last_update_timestamp = None
126139

127140
def update(self, value):
128141
self.mmsc.update(value)
129142
self.current = value
143+
self.last_update_timestamp = time_ns()
130144

131145
def take_checkpoint(self):
132146
self.mmsc.take_checkpoint()
133147
self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,)))
134148

135149
def merge(self, other):
136150
self.mmsc.merge(other.mmsc)
137-
self.checkpoint = self._TYPE(
138-
*(
139-
self.mmsc.checkpoint
140-
+ (other.checkpoint.last or self.checkpoint.last,)
141-
)
151+
last = self.checkpoint.last
152+
self.last_update_timestamp = get_latest_timestamp(
153+
self.last_update_timestamp, other.last_update_timestamp
142154
)
155+
if self.last_update_timestamp == other.last_update_timestamp:
156+
last = other.checkpoint.last
157+
self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,)))
158+
159+
160+
def get_latest_timestamp(time_stamp, other_timestamp):
161+
if time_stamp is None:
162+
return other_timestamp
163+
if other_timestamp is not None:
164+
if time_stamp < other_timestamp:
165+
return other_timestamp
166+
return time_stamp

opentelemetry-sdk/tests/metrics/export/test_export.py

+91-3
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,14 @@ def call_update(counter):
228228
update_total += val
229229
return update_total
230230

231-
def test_update(self):
231+
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
232+
def test_update(self, time_mock):
233+
time_mock.return_value = 123
232234
counter = CounterAggregator()
233235
counter.update(1.0)
234236
counter.update(2.0)
235237
self.assertEqual(counter.current, 3.0)
238+
self.assertEqual(counter.last_update_timestamp, 123)
236239

237240
def test_checkpoint(self):
238241
counter = CounterAggregator()
@@ -246,8 +249,10 @@ def test_merge(self):
246249
counter2 = CounterAggregator()
247250
counter.checkpoint = 1.0
248251
counter2.checkpoint = 3.0
252+
counter2.last_update_timestamp = 123
249253
counter.merge(counter2)
250254
self.assertEqual(counter.checkpoint, 4.0)
255+
self.assertEqual(counter.last_update_timestamp, 123)
251256

252257
def test_concurrent_update(self):
253258
counter = CounterAggregator()
@@ -296,7 +301,9 @@ def call_update(mmsc):
296301
count_ += 1
297302
return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_)
298303

299-
def test_update(self):
304+
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
305+
def test_update(self, time_mock):
306+
time_mock.return_value = 123
300307
mmsc = MinMaxSumCountAggregator()
301308
# test current values without any update
302309
self.assertEqual(mmsc.current, MinMaxSumCountAggregator._EMPTY)
@@ -309,6 +316,7 @@ def test_update(self):
309316
self.assertEqual(
310317
mmsc.current, (min(values), max(values), sum(values), len(values))
311318
)
319+
self.assertEqual(mmsc.last_update_timestamp, 123)
312320

313321
def test_checkpoint(self):
314322
mmsc = MinMaxSumCountAggregator()
@@ -340,6 +348,9 @@ def test_merge(self):
340348
mmsc1.checkpoint = checkpoint1
341349
mmsc2.checkpoint = checkpoint2
342350

351+
mmsc1.last_update_timestamp = 100
352+
mmsc2.last_update_timestamp = 123
353+
343354
mmsc1.merge(mmsc2)
344355

345356
self.assertEqual(
@@ -348,6 +359,7 @@ def test_merge(self):
348359
checkpoint1, checkpoint2
349360
),
350361
)
362+
self.assertEqual(mmsc1.last_update_timestamp, 123)
351363

352364
def test_merge_checkpoint(self):
353365
func = MinMaxSumCountAggregator._merge_checkpoint
@@ -421,7 +433,9 @@ def test_concurrent_update_and_checkpoint(self):
421433

422434

423435
class TestObserverAggregator(unittest.TestCase):
424-
def test_update(self):
436+
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
437+
def test_update(self, time_mock):
438+
time_mock.return_value = 123
425439
observer = ObserverAggregator()
426440
# test current values without any update
427441
self.assertEqual(observer.mmsc.current, (None, None, None, 0))
@@ -436,6 +450,7 @@ def test_update(self):
436450
observer.mmsc.current,
437451
(min(values), max(values), sum(values), len(values)),
438452
)
453+
self.assertEqual(observer.last_update_timestamp, 123)
439454

440455
self.assertEqual(observer.current, values[-1])
441456

@@ -471,6 +486,77 @@ def test_merge(self):
471486
observer1.mmsc.checkpoint = mmsc_checkpoint1
472487
observer2.mmsc.checkpoint = mmsc_checkpoint2
473488

489+
observer1.last_update_timestamp = 100
490+
observer2.last_update_timestamp = 123
491+
492+
observer1.checkpoint = checkpoint1
493+
observer2.checkpoint = checkpoint2
494+
495+
observer1.merge(observer2)
496+
497+
self.assertEqual(
498+
observer1.checkpoint,
499+
(
500+
min(checkpoint1.min, checkpoint2.min),
501+
max(checkpoint1.max, checkpoint2.max),
502+
checkpoint1.sum + checkpoint2.sum,
503+
checkpoint1.count + checkpoint2.count,
504+
checkpoint2.last,
505+
),
506+
)
507+
self.assertEqual(observer1.last_update_timestamp, 123)
508+
509+
def test_merge_last_updated(self):
510+
observer1 = ObserverAggregator()
511+
observer2 = ObserverAggregator()
512+
513+
mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
514+
mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2)
515+
516+
checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,)))
517+
518+
checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,)))
519+
520+
observer1.mmsc.checkpoint = mmsc_checkpoint1
521+
observer2.mmsc.checkpoint = mmsc_checkpoint2
522+
523+
observer1.last_update_timestamp = 123
524+
observer2.last_update_timestamp = 100
525+
526+
observer1.checkpoint = checkpoint1
527+
observer2.checkpoint = checkpoint2
528+
529+
observer1.merge(observer2)
530+
531+
self.assertEqual(
532+
observer1.checkpoint,
533+
(
534+
min(checkpoint1.min, checkpoint2.min),
535+
max(checkpoint1.max, checkpoint2.max),
536+
checkpoint1.sum + checkpoint2.sum,
537+
checkpoint1.count + checkpoint2.count,
538+
checkpoint1.last,
539+
),
540+
)
541+
self.assertEqual(observer1.last_update_timestamp, 123)
542+
543+
def test_merge_last_updated_none(self):
544+
observer1 = ObserverAggregator()
545+
observer2 = ObserverAggregator()
546+
547+
mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
548+
mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2)
549+
550+
checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,)))
551+
552+
checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,)))
553+
554+
observer1.mmsc.checkpoint = mmsc_checkpoint1
555+
observer2.mmsc.checkpoint = mmsc_checkpoint2
556+
557+
observer1.last_update_timestamp = None
558+
observer2.last_update_timestamp = 100
559+
474560
observer1.checkpoint = checkpoint1
475561
observer2.checkpoint = checkpoint2
476562

@@ -486,6 +572,7 @@ def test_merge(self):
486572
checkpoint2.last,
487573
),
488574
)
575+
self.assertEqual(observer1.last_update_timestamp, 100)
489576

490577
def test_merge_with_empty(self):
491578
observer1 = ObserverAggregator()
@@ -496,6 +583,7 @@ def test_merge_with_empty(self):
496583

497584
observer1.mmsc.checkpoint = mmsc_checkpoint1
498585
observer1.checkpoint = checkpoint1
586+
observer1.last_update_timestamp = 100
499587

500588
observer1.merge(observer2)
501589

opentelemetry-sdk/tests/metrics/test_metrics.py

+2-8
Original file line numberDiff line numberDiff line change
@@ -368,13 +368,10 @@ def test_add_incorrect_type(self, logger_mock):
368368
self.assertEqual(bound_counter.aggregator.current, 0)
369369
self.assertTrue(logger_mock.warning.called)
370370

371-
@mock.patch("opentelemetry.sdk.metrics.time_ns")
372-
def test_update(self, time_mock):
371+
def test_update(self):
373372
aggregator = export.aggregate.CounterAggregator()
374373
bound_counter = metrics.BoundCounter(int, True, aggregator)
375-
time_mock.return_value = 123
376374
bound_counter.update(4.0)
377-
self.assertEqual(bound_counter.last_update_timestamp, 123)
378375
self.assertEqual(bound_counter.aggregator.current, 4.0)
379376

380377

@@ -403,11 +400,8 @@ def test_record_incorrect_type(self, logger_mock):
403400
)
404401
self.assertTrue(logger_mock.warning.called)
405402

406-
@mock.patch("opentelemetry.sdk.metrics.time_ns")
407-
def test_update(self, time_mock):
403+
def test_update(self):
408404
aggregator = export.aggregate.MinMaxSumCountAggregator()
409405
bound_measure = metrics.BoundMeasure(int, True, aggregator)
410-
time_mock.return_value = 123
411406
bound_measure.update(4.0)
412-
self.assertEqual(bound_measure.last_update_timestamp, 123)
413407
self.assertEqual(bound_measure.aggregator.current, (4.0, 4.0, 4.0, 1))

0 commit comments

Comments
 (0)