forked from open-telemetry/opentelemetry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregation.py
380 lines (314 loc) · 11.7 KB
/
aggregation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from bisect import bisect_left
from dataclasses import replace
from logging import getLogger
from math import inf
from threading import Lock
from typing import Generic, List, Optional, Sequence, TypeVar
from opentelemetry._metrics.instrument import (
Asynchronous,
Instrument,
Synchronous,
_Monotonic,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import (
AggregationTemporality,
Gauge,
Histogram,
PointT,
Sum,
)
from opentelemetry.util._time import _time_ns
# Type variable for point types, bound to ``PointT``. It parametrizes
# ``_Aggregation`` and the signature of ``_convert_aggregation_temporality``.
_PointVarT = TypeVar("_PointVarT", bound=PointT)
# Module-level logger named after this module.
_logger = getLogger(__name__)
class _Aggregation(ABC, Generic[_PointVarT]):
    """Abstract base for objects that fold measurements into a point.

    Each instance owns a ``Lock`` (``self._lock``) that the concrete
    aggregations in this module use to guard their mutable state.
    """

    def __init__(self):
        self._lock = Lock()

    @abstractmethod
    def aggregate(self, measurement: Measurement) -> None:
        """Fold ``measurement`` into the state held by this aggregation."""

    @abstractmethod
    def collect(self) -> Optional[_PointVarT]:
        """Return a point built from the aggregated state, or ``None``."""
class _SumAggregation(_Aggregation[Sum]):
    """Aggregation that adds measurement values together into `Sum` points.

    With ``DELTA`` instrument temporality the running value starts at ``0``
    and every `collect` reports the value accumulated since the previous
    collection. With any other temporality the running value starts as
    ``None`` and `collect` reports a ``CUMULATIVE`` point, or ``None`` when
    nothing has been aggregated since the previous collection.
    """

    def __init__(
        self,
        instrument_is_monotonic: bool,
        instrument_temporality: AggregationTemporality,
    ):
        super().__init__()

        self._instrument_is_monotonic = instrument_is_monotonic
        self._instrument_temporality = instrument_temporality
        self._start_time_unix_nano = _time_ns()

        # Only DELTA aggregations begin with a concrete value; for the others
        # ``None`` marks "nothing aggregated yet".
        self._value = (
            0
            if instrument_temporality is AggregationTemporality.DELTA
            else None
        )

    def aggregate(self, measurement: Measurement) -> None:
        """Add ``measurement.value`` to the running value under the lock."""
        with self._lock:
            if self._value is None:
                self._value = 0
            self._value += measurement.value

    def collect(self) -> Optional[Sum]:
        """
        Atomically take the current running value, reset it, and return it
        wrapped in a `Sum` point (or ``None`` if a non-DELTA aggregation has
        no value).
        """
        collected_at = _time_ns()

        if self._instrument_temporality is not AggregationTemporality.DELTA:
            with self._lock:
                total, self._value = self._value, None

            if total is None:
                return None

            return Sum(
                aggregation_temporality=AggregationTemporality.CUMULATIVE,
                is_monotonic=self._instrument_is_monotonic,
                start_time_unix_nano=self._start_time_unix_nano,
                time_unix_nano=collected_at,
                value=total,
            )

        with self._lock:
            total, self._value = self._value, 0
            interval_start = self._start_time_unix_nano
            # The next interval starts right after this collection time.
            self._start_time_unix_nano = collected_at + 1

        return Sum(
            aggregation_temporality=AggregationTemporality.DELTA,
            is_monotonic=self._instrument_is_monotonic,
            start_time_unix_nano=interval_start,
            time_unix_nano=collected_at,
            value=total,
        )
class _LastValueAggregation(_Aggregation[Gauge]):
    """Aggregation that keeps only the most recently aggregated value."""

    def __init__(self):
        super().__init__()
        self._value = None

    def aggregate(self, measurement: Measurement):
        """Overwrite the stored value with ``measurement.value``."""
        with self._lock:
            self._value = measurement.value

    def collect(self) -> Optional[Gauge]:
        """
        Atomically take the stored value and clear it; return it as a
        `Gauge` point, or ``None`` when no value was stored.
        """
        with self._lock:
            latest, self._value = self._value, None

        if latest is None:
            return None

        return Gauge(time_unix_nano=_time_ns(), value=latest)
class _ExplicitBucketHistogramAggregation(_Aggregation[Histogram]):
    """Aggregation that counts measurements into explicitly bounded buckets.

    ``boundaries`` must be sorted in ascending order because bucket lookup
    uses `bisect_left`. Bucket ``i`` counts values ``v`` with
    ``boundaries[i - 1] < v <= boundaries[i]``; the first bucket has no lower
    bound and the last bucket counts values greater than the last boundary,
    so there are ``len(boundaries) + 1`` buckets.

    When ``record_min_max`` is true the smallest and largest value seen in
    the current interval are tracked in ``_min`` / ``_max``.
    """

    def __init__(
        self,
        boundaries: Sequence[float] = (
            0.0,
            5.0,
            10.0,
            25.0,
            50.0,
            75.0,
            100.0,
            250.0,
            500.0,
            1000.0,
        ),
        record_min_max: bool = True,
    ):
        super().__init__()
        # Stored as a tuple so the boundaries cannot change after creation.
        self._boundaries = tuple(boundaries)
        self._bucket_counts = self._get_empty_bucket_counts()
        self._min = inf
        self._max = -inf
        self._sum = 0
        self._record_min_max = record_min_max
        self._start_time_unix_nano = _time_ns()

    def _get_empty_bucket_counts(self) -> List[int]:
        """Return a fresh list of zero counts, one per bucket."""
        return [0] * (len(self._boundaries) + 1)

    def aggregate(self, measurement: Measurement) -> None:
        """Record ``measurement.value`` in the sum, min/max and its bucket.

        All state updates happen under the lock so that a concurrent
        `collect` never observes (or resets) a half-applied measurement.
        """
        value = measurement.value
        # The boundaries are immutable, so the lookup needs no locking.
        bucket_index = bisect_left(self._boundaries, value)

        with self._lock:
            if self._record_min_max:
                self._min = min(self._min, value)
                self._max = max(self._max, value)

            self._sum += value
            self._bucket_counts[bucket_index] += 1

    def collect(self) -> Histogram:
        """
        Atomically return a ``DELTA`` `Histogram` point for the current
        interval and reset the aggregation state for the next interval.
        """
        now = _time_ns()

        with self._lock:
            bucket_counts = self._bucket_counts
            start_time_unix_nano = self._start_time_unix_nano
            histogram_sum = self._sum

            self._bucket_counts = self._get_empty_bucket_counts()
            # The next interval starts right after this collection time.
            self._start_time_unix_nano = now + 1
            self._sum = 0
            # min/max belong to the interval just collected; start over so
            # they do not leak into the next one.
            self._min = inf
            self._max = -inf

        return Histogram(
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=now,
            bucket_counts=tuple(bucket_counts),
            explicit_bounds=self._boundaries,
            aggregation_temporality=AggregationTemporality.DELTA,
            sum=histogram_sum,
        )
# pylint: disable=too-many-return-statements,too-many-branches
def _convert_aggregation_temporality(
    previous_point: Optional[_PointVarT],
    current_point: _PointVarT,
    aggregation_temporality: AggregationTemporality,
) -> _PointVarT:
    """Return `current_point` expressed in `aggregation_temporality`.

    `previous_point`, when given, must have `CUMULATIVE` temporality;
    otherwise an `Exception` is raised. `current_point` may be `DELTA` or
    `CUMULATIVE`.

    `Gauge` points have no temporality and are returned unchanged. If the
    two points are of different types a warning is logged and
    `current_point` is returned unchanged. Point types other than `Gauge`,
    `Sum` and `Histogram` yield ``None``.
    """
    point_type = type(current_point)

    if point_type is Gauge:
        return current_point

    if previous_point is not None and type(previous_point) is not point_type:
        _logger.warning(
            "convert_aggregation_temporality called with mismatched "
            "point types: %s and %s",
            type(previous_point),
            point_type,
        )
        return current_point

    if point_type is not Sum and point_type is not Histogram:
        return None

    # The guards below are shared by Sum and Histogram points.
    if previous_point is None:
        # Nothing to combine with: relabel the current point only.
        return replace(
            current_point, aggregation_temporality=aggregation_temporality
        )

    if (
        previous_point.aggregation_temporality
        is not AggregationTemporality.CUMULATIVE
    ):
        raise Exception(
            "previous_point aggregation temporality must be CUMULATIVE"
        )

    if current_point.aggregation_temporality is aggregation_temporality:
        # Already in the requested temporality.
        return current_point

    if point_type is Sum:
        if aggregation_temporality is not AggregationTemporality.DELTA:
            # Accumulate the current value on top of the previous one.
            converted_value = current_point.value + previous_point.value
            converted_start = previous_point.start_time_unix_nano
        else:
            # Difference between two cumulative values.
            converted_value = current_point.value - previous_point.value
            converted_start = previous_point.time_unix_nano

        return Sum(
            start_time_unix_nano=converted_start,
            time_unix_nano=current_point.time_unix_nano,
            value=converted_value,
            aggregation_temporality=aggregation_temporality,
            is_monotonic=(
                previous_point.is_monotonic and current_point.is_monotonic
            ),
        )

    # Histogram. Unlike the Sum branch, anything that is not CUMULATIVE is
    # handled as a subtraction here.
    count_pairs = zip(current_point.bucket_counts, previous_point.bucket_counts)

    if aggregation_temporality is not AggregationTemporality.CUMULATIVE:
        converted_start = previous_point.time_unix_nano
        converted_sum = current_point.sum - previous_point.sum
        converted_counts = [current - previous for current, previous in count_pairs]
    else:
        converted_start = previous_point.start_time_unix_nano
        converted_sum = current_point.sum + previous_point.sum
        converted_counts = [current + previous for current, previous in count_pairs]

    return Histogram(
        start_time_unix_nano=converted_start,
        time_unix_nano=current_point.time_unix_nano,
        bucket_counts=converted_counts,
        explicit_bounds=current_point.explicit_bounds,
        sum=converted_sum,
        aggregation_temporality=aggregation_temporality,
    )
class _AggregationFactory(ABC):
    """Abstract base for objects that create `_Aggregation` instances."""

    @abstractmethod
    def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
        """Return an `_Aggregation` for ``instrument``."""
class ExplicitBucketHistogramAggregation(_AggregationFactory):
    """Factory for `_ExplicitBucketHistogramAggregation` objects.

    The ``boundaries`` and ``record_min_max`` settings given here are passed
    unchanged to every aggregation this factory creates.
    """

    def __init__(
        self,
        boundaries: Sequence[float] = (
            0.0, 5.0, 10.0, 25.0, 50.0,
            75.0, 100.0, 250.0, 500.0, 1000.0,
        ),
        record_min_max: bool = True,
    ) -> None:
        self._record_min_max = record_min_max
        self._boundaries = boundaries

    def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
        """Return a new histogram aggregation; ``instrument`` is not used."""
        histogram_aggregation = _ExplicitBucketHistogramAggregation(
            boundaries=self._boundaries,
            record_min_max=self._record_min_max,
        )
        return histogram_aggregation
class SumAggregation(_AggregationFactory):
    """Factory for `_SumAggregation` objects."""

    def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
        """Return a `_SumAggregation` configured from ``instrument``'s type.

        `Synchronous` instruments get ``DELTA`` temporality, `Asynchronous`
        ones get ``CUMULATIVE``, anything else ``UNSPECIFIED``. The
        aggregation is monotonic when the instrument is a `_Monotonic`.
        """
        if isinstance(instrument, Synchronous):
            instrument_temporality = AggregationTemporality.DELTA
        elif isinstance(instrument, Asynchronous):
            instrument_temporality = AggregationTemporality.CUMULATIVE
        else:
            instrument_temporality = AggregationTemporality.UNSPECIFIED

        return _SumAggregation(
            instrument_is_monotonic=isinstance(instrument, _Monotonic),
            instrument_temporality=instrument_temporality,
        )
class LastValueAggregation(_AggregationFactory):
    """Factory for `_LastValueAggregation` objects."""

    def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
        """Return a new `_LastValueAggregation`; ``instrument`` is not used."""
        last_value_aggregation = _LastValueAggregation()
        return last_value_aggregation