Skip to content

Commit 5380f3c

Browse files
committed
Fix ExplicitBucketHistogramAggregation
Fixes open-telemetry#3407
1 parent fc5aca6 commit 5380f3c

File tree

2 files changed

+169
-91
lines changed

2 files changed

+169
-91
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py

+147-82
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
377377
def __init__(
378378
self,
379379
attributes: Attributes,
380+
instrument_aggregation_temporality: AggregationTemporality,
380381
start_time_unix_nano: int,
381382
boundaries: Sequence[float] = (
382383
0.0,
@@ -398,33 +399,40 @@ def __init__(
398399
record_min_max: bool = True,
399400
):
400401
super().__init__(attributes)
402+
401403
self._boundaries = tuple(boundaries)
402-
self._bucket_counts = self._get_empty_bucket_counts()
404+
self._record_min_max = record_min_max
403405
self._min = inf
404406
self._max = -inf
405407
self._sum = 0
406-
self._record_min_max = record_min_max
408+
407409
self._start_time_unix_nano = start_time_unix_nano
408-
# It is assumed that the "natural" aggregation temporality for a
409-
# Histogram instrument is DELTA, like the "natural" aggregation
410-
# temporality for a Counter is DELTA and the "natural" aggregation
411-
# temporality for an ObservableCounter is CUMULATIVE.
412-
self._instrument_aggregation_temporality = AggregationTemporality.DELTA
410+
self._instrument_aggregation_temporality = (
411+
instrument_aggregation_temporality
412+
)
413+
414+
self._current_value = None
415+
416+
self._previous_collection_start_nano = self._start_time_unix_nano
417+
self._previous_cumulative_value = self._get_empty_bucket_counts()
413418

414419
def _get_empty_bucket_counts(self) -> List[int]:
415420
return [0] * (len(self._boundaries) + 1)
416421

417422
def aggregate(self, measurement: Measurement) -> None:
423+
with self._lock:
424+
if self._current_value is None:
425+
self._current_value = self._get_empty_bucket_counts()
418426

419-
value = measurement.value
427+
value = measurement.value
420428

421-
if self._record_min_max:
422-
self._min = min(self._min, value)
423-
self._max = max(self._max, value)
429+
self._sum += value
424430

425-
self._sum += value
431+
if self._record_min_max:
432+
self._min = min(self._min, value)
433+
self._max = max(self._max, value)
426434

427-
self._bucket_counts[bisect_left(self._boundaries, value)] += 1
435+
self._current_value[bisect_left(self._boundaries, value)] += 1
428436

429437
def collect(
430438
self,
@@ -434,84 +442,125 @@ def collect(
434442
"""
435443
Atomically return a point for the current value of the metric.
436444
"""
437-
with self._lock:
438-
if not any(self._bucket_counts):
439-
return None
440445

441-
bucket_counts = self._bucket_counts
442-
start_time_unix_nano = self._start_time_unix_nano
446+
with self._lock:
447+
current_value = self._current_value
443448
sum_ = self._sum
444-
max_ = self._max
445449
min_ = self._min
450+
max_ = self._max
446451

447-
self._bucket_counts = self._get_empty_bucket_counts()
448-
self._start_time_unix_nano = collection_start_nano
452+
self._current_value = None
449453
self._sum = 0
450454
self._min = inf
451455
self._max = -inf
452456

453-
current_point = HistogramDataPoint(
454-
attributes=self._attributes,
455-
start_time_unix_nano=start_time_unix_nano,
456-
time_unix_nano=collection_start_nano,
457-
count=sum(bucket_counts),
458-
sum=sum_,
459-
bucket_counts=tuple(bucket_counts),
460-
explicit_bounds=self._boundaries,
461-
min=min_,
462-
max=max_,
463-
)
457+
if (
458+
self._instrument_aggregation_temporality
459+
is AggregationTemporality.DELTA
460+
):
461+
# This happens when the corresponding instrument for this
462+
# aggregation is synchronous.
463+
if (
464+
collection_aggregation_temporality
465+
is AggregationTemporality.DELTA
466+
):
464467

465-
if self._previous_point is None or (
466-
self._instrument_aggregation_temporality
467-
is collection_aggregation_temporality
468-
):
469-
self._previous_point = current_point
470-
return current_point
468+
if current_value is None:
469+
return None
471470

472-
max_ = current_point.max
473-
min_ = current_point.min
471+
previous_collection_start_nano = (
472+
self._previous_collection_start_nano
473+
)
474+
self._previous_collection_start_nano = (
475+
collection_start_nano
476+
)
474477

475-
if (
476-
collection_aggregation_temporality
477-
is AggregationTemporality.CUMULATIVE
478-
):
479-
start_time_unix_nano = self._previous_point.start_time_unix_nano
480-
sum_ = current_point.sum + self._previous_point.sum
481-
# Only update min/max on delta -> cumulative
482-
max_ = max(current_point.max, self._previous_point.max)
483-
min_ = min(current_point.min, self._previous_point.min)
484-
bucket_counts = [
485-
curr_count + prev_count
486-
for curr_count, prev_count in zip(
487-
current_point.bucket_counts,
488-
self._previous_point.bucket_counts,
478+
return HistogramDataPoint(
479+
attributes=self._attributes,
480+
start_time_unix_nano=previous_collection_start_nano,
481+
time_unix_nano=collection_start_nano,
482+
count=sum(current_value),
483+
sum=sum_,
484+
bucket_counts=tuple(current_value),
485+
explicit_bounds=self._boundaries,
486+
min=min_,
487+
max=max_,
488+
)
489+
490+
if current_value is None:
491+
current_value = self._get_empty_bucket_counts()
492+
493+
self._previous_cumulative_value = [
494+
current_value_element + previous_cumulative_value_element
495+
for (
496+
current_value_element,
497+
previous_cumulative_value_element,
498+
) in zip(current_value, self._previous_cumulative_value)
499+
]
500+
501+
return HistogramDataPoint(
502+
attributes=self._attributes,
503+
start_time_unix_nano=self._start_time_unix_nano,
504+
time_unix_nano=collection_start_nano,
505+
count=sum(current_value),
506+
sum=sum_,
507+
bucket_counts=tuple(current_value),
508+
explicit_bounds=self._boundaries,
509+
min=min_,
510+
max=max_,
489511
)
490-
]
491-
else:
492-
start_time_unix_nano = self._previous_point.time_unix_nano
493-
sum_ = current_point.sum - self._previous_point.sum
494-
bucket_counts = [
495-
curr_count - prev_count
496-
for curr_count, prev_count in zip(
497-
current_point.bucket_counts,
498-
self._previous_point.bucket_counts,
512+
513+
# This happens when the corresponding instrument for this
514+
# aggregation is asynchronous.
515+
516+
if current_value is None:
517+
# This happens when the corresponding instrument callback
518+
# does not produce measurements.
519+
return None
520+
521+
if (
522+
collection_aggregation_temporality
523+
is AggregationTemporality.DELTA
524+
):
525+
526+
result_value = [
527+
current_value_element - previous_cumulative_value_element
528+
for (
529+
current_value_element,
530+
previous_cumulative_value_element,
531+
) in zip(current_value, self._previous_cumulative_value)
532+
]
533+
534+
self._previous_cumulative_value = current_value
535+
536+
previous_collection_start_nano = (
537+
self._previous_collection_start_nano
499538
)
500-
]
539+
self._previous_collection_start_nano = collection_start_nano
501540

502-
current_point = HistogramDataPoint(
503-
attributes=self._attributes,
504-
start_time_unix_nano=start_time_unix_nano,
505-
time_unix_nano=current_point.time_unix_nano,
506-
count=sum(bucket_counts),
507-
sum=sum_,
508-
bucket_counts=tuple(bucket_counts),
509-
explicit_bounds=current_point.explicit_bounds,
510-
min=min_,
511-
max=max_,
512-
)
513-
self._previous_point = current_point
514-
return current_point
541+
return HistogramDataPoint(
542+
attributes=self._attributes,
543+
start_time_unix_nano=previous_collection_start_nano,
544+
time_unix_nano=collection_start_nano,
545+
count=sum(result_value),
546+
sum=sum_,
547+
bucket_counts=tuple(result_value),
548+
explicit_bounds=self._boundaries,
549+
min=min_,
550+
max=max_,
551+
)
552+
553+
return HistogramDataPoint(
554+
attributes=self._attributes,
555+
start_time_unix_nano=self._start_time_unix_nano,
556+
time_unix_nano=collection_start_nano,
557+
count=sum(current_value),
558+
sum=sum_,
559+
bucket_counts=tuple(current_value),
560+
explicit_bounds=self._boundaries,
561+
min=min_,
562+
max=max_,
563+
)
515564

516565

517566
# pylint: disable=protected-access
@@ -1100,7 +1149,11 @@ def _create_aggregation(
11001149

11011150
if isinstance(instrument, Histogram):
11021151
return _ExplicitBucketHistogramAggregation(
1103-
attributes, start_time_unix_nano
1152+
attributes,
1153+
instrument_aggregation_temporality=(
1154+
AggregationTemporality.CUMULATIVE
1155+
),
1156+
start_time_unix_nano=start_time_unix_nano,
11041157
)
11051158

11061159
if isinstance(instrument, ObservableGauge):
@@ -1179,8 +1232,18 @@ def _create_aggregation(
11791232
attributes: Attributes,
11801233
start_time_unix_nano: int,
11811234
) -> _Aggregation:
1235+
1236+
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
1237+
if isinstance(instrument, Synchronous):
1238+
instrument_aggregation_temporality = AggregationTemporality.DELTA
1239+
elif isinstance(instrument, Asynchronous):
1240+
instrument_aggregation_temporality = (
1241+
AggregationTemporality.CUMULATIVE
1242+
)
1243+
11821244
return _ExplicitBucketHistogramAggregation(
11831245
attributes,
1246+
instrument_aggregation_temporality,
11841247
start_time_unix_nano,
11851248
self._boundaries,
11861249
self._record_min_max,
@@ -1200,16 +1263,18 @@ def _create_aggregation(
12001263
start_time_unix_nano: int,
12011264
) -> _Aggregation:
12021265

1203-
temporality = AggregationTemporality.UNSPECIFIED
1266+
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
12041267
if isinstance(instrument, Synchronous):
1205-
temporality = AggregationTemporality.DELTA
1268+
instrument_aggregation_temporality = AggregationTemporality.DELTA
12061269
elif isinstance(instrument, Asynchronous):
1207-
temporality = AggregationTemporality.CUMULATIVE
1270+
instrument_aggregation_temporality = (
1271+
AggregationTemporality.CUMULATIVE
1272+
)
12081273

12091274
return _SumAggregation(
12101275
attributes,
12111276
isinstance(instrument, (Counter, ObservableCounter)),
1212-
temporality,
1277+
instrument_aggregation_temporality,
12131278
start_time_unix_nano,
12141279
)
12151280

opentelemetry-sdk/tests/metrics/test_aggregation.py

+22-9
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,10 @@ def test_aggregate(self):
270270

271271
explicit_bucket_histogram_aggregation = (
272272
_ExplicitBucketHistogramAggregation(
273-
Mock(), 0, boundaries=[0, 2, 4]
273+
Mock(),
274+
AggregationTemporality.CUMULATIVE,
275+
0,
276+
boundaries=[0, 2, 4],
274277
)
275278
)
276279

@@ -284,22 +287,22 @@ def test_aggregate(self):
284287

285288
# The first bucket keeps count of values between (-inf, 0] (-1 and 0)
286289
self.assertEqual(
287-
explicit_bucket_histogram_aggregation._bucket_counts[0], 2
290+
explicit_bucket_histogram_aggregation._current_value[0], 2
288291
)
289292

290293
# The second bucket keeps count of values between (0, 2] (1 and 2)
291294
self.assertEqual(
292-
explicit_bucket_histogram_aggregation._bucket_counts[1], 2
295+
explicit_bucket_histogram_aggregation._current_value[1], 2
293296
)
294297

295298
# The third bucket keeps count of values between (2, 4] (3 and 4)
296299
self.assertEqual(
297-
explicit_bucket_histogram_aggregation._bucket_counts[2], 2
300+
explicit_bucket_histogram_aggregation._current_value[2], 2
298301
)
299302

300303
# The fourth bucket keeps count of values between (4, inf) (a single value greater than 4)
301304
self.assertEqual(
302-
explicit_bucket_histogram_aggregation._bucket_counts[3], 1
305+
explicit_bucket_histogram_aggregation._current_value[3], 1
303306
)
304307

305308
histo = explicit_bucket_histogram_aggregation.collect(
@@ -314,7 +317,9 @@ def test_min_max(self):
314317
"""
315318

316319
explicit_bucket_histogram_aggregation = (
317-
_ExplicitBucketHistogramAggregation(Mock(), 0)
320+
_ExplicitBucketHistogramAggregation(
321+
Mock(), AggregationTemporality.CUMULATIVE, 0
322+
)
318323
)
319324

320325
explicit_bucket_histogram_aggregation.aggregate(measurement(-1))
@@ -328,7 +333,10 @@ def test_min_max(self):
328333

329334
explicit_bucket_histogram_aggregation = (
330335
_ExplicitBucketHistogramAggregation(
331-
Mock(), 0, record_min_max=False
336+
Mock(),
337+
AggregationTemporality.CUMULATIVE,
338+
0,
339+
record_min_max=False,
332340
)
333341
)
334342

@@ -348,7 +356,10 @@ def test_collect(self):
348356

349357
explicit_bucket_histogram_aggregation = (
350358
_ExplicitBucketHistogramAggregation(
351-
Mock(), 0, boundaries=[0, 1, 2]
359+
Mock(),
360+
AggregationTemporality.CUMULATIVE,
361+
0,
362+
boundaries=[0, 1, 2],
352363
)
353364
)
354365

@@ -381,7 +392,9 @@ def test_collect(self):
381392

382393
def test_boundaries(self):
383394
self.assertEqual(
384-
_ExplicitBucketHistogramAggregation(Mock(), 0)._boundaries,
395+
_ExplicitBucketHistogramAggregation(
396+
Mock(), AggregationTemporality.CUMULATIVE, 0
397+
)._boundaries,
385398
(
386399
0.0,
387400
5.0,

0 commit comments

Comments
 (0)