Skip to content

Commit b9815b5

Browse files
ocelotlsrikanthccvaabmassAlex Boten
authored
Add aggregation temporality conversion algorithm (#2380)
* Implement temporality conversion Fixes #2329 * Add aggregation temporality conversion algorithm Fixes #2329 * Fix start_time_unix_nano * Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py Co-authored-by: Srikanth Chekuri <[email protected]> * Add warning test * Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py Co-authored-by: Aaron Abbott <[email protected]> * Add comments to algorigthm These comments intend to identify the conditional blocks of the algorithm with the sections defined in the design document. * Add check for previous point cumulative aggregation temporality * Added named arguments for gauge * Change to gauge * Fix lint * Remove numbers from comment * Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py Co-authored-by: Srikanth Chekuri <[email protected]> * Revert "Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py" This reverts commit 8bdbe49. Co-authored-by: Srikanth Chekuri <[email protected]> Co-authored-by: Aaron Abbott <[email protected]> Co-authored-by: Alex Boten <[email protected]>
1 parent d00aaea commit b9815b5

File tree

2 files changed

+379
-1
lines changed

2 files changed

+379
-1
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py

+78
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from abc import ABC, abstractmethod
1616
from bisect import bisect_left
17+
from dataclasses import replace
1718
from logging import getLogger
1819
from math import inf
1920
from threading import Lock
@@ -200,3 +201,80 @@ def collect(self) -> Optional[Histogram]:
200201
aggregation_temporality=AggregationTemporality.DELTA,
201202
sum=self._sum,
202203
)
204+
205+
206+
def _convert_aggregation_temporality(
207+
previous_point: Optional[_PointVarT],
208+
current_point: _PointVarT,
209+
aggregation_temporality: AggregationTemporality,
210+
) -> _PointVarT:
211+
"""Converts `current_point` to the requested `aggregation_temporality`
212+
given the `previous_point`.
213+
214+
`previous_point` must have `CUMULATIVE` temporality. `current_point` may
215+
have `DELTA` or `CUMULATIVE` temporality.
216+
217+
The output point will have temporality `aggregation_temporality`. Since
218+
`GAUGE` points have no temporality, they are returned unchanged.
219+
"""
220+
221+
current_point_type = type(current_point)
222+
223+
if current_point_type is Gauge:
224+
return current_point
225+
226+
if previous_point is not None and type(previous_point) is not type(
227+
current_point
228+
):
229+
_logger.warning(
230+
"convert_aggregation_temporality called with mismatched "
231+
"point types: %s and %s",
232+
type(previous_point),
233+
current_point_type,
234+
)
235+
236+
return current_point
237+
238+
if current_point_type is Sum:
239+
if previous_point is None:
240+
# Output CUMULATIVE for a synchronous instrument
241+
# There is no previous value, return the delta point as a
242+
# cumulative
243+
return replace(
244+
current_point, aggregation_temporality=aggregation_temporality
245+
)
246+
if previous_point.aggregation_temporality is not (
247+
AggregationTemporality.CUMULATIVE
248+
):
249+
raise Exception(
250+
"previous_point aggregation temporality must be CUMULATIVE"
251+
)
252+
253+
if current_point.aggregation_temporality is aggregation_temporality:
254+
# Output DELTA for a synchronous instrument
255+
# Output CUMULATIVE for an asynchronous instrument
256+
return current_point
257+
258+
if aggregation_temporality is AggregationTemporality.DELTA:
259+
# Output temporality DELTA for an asynchronous instrument
260+
value = current_point.value - previous_point.value
261+
output_start_time_unix_nano = previous_point.time_unix_nano
262+
263+
else:
264+
# Output CUMULATIVE for a synchronous instrument
265+
value = current_point.value + previous_point.value
266+
output_start_time_unix_nano = previous_point.start_time_unix_nano
267+
268+
is_monotonic = (
269+
previous_point.is_monotonic and current_point.is_monotonic
270+
)
271+
272+
return Sum(
273+
start_time_unix_nano=output_start_time_unix_nano,
274+
time_unix_nano=current_point.time_unix_nano,
275+
value=value,
276+
aggregation_temporality=aggregation_temporality,
277+
is_monotonic=is_monotonic,
278+
)
279+
280+
return None

0 commit comments

Comments
 (0)