Skip to content

Commit a4b4c45

Browse files
authored
Implement MetricReader default aggregation controls (open-telemetry#2638)
1 parent 7aac852 commit a4b4c45

10 files changed

+211
-23
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21
1111

1212

13+
- Add parameter to MetricReader constructor to select aggregation per instrument kind
14+
([#2638](https://github.com/open-telemetry/opentelemetry-python/pull/2638))
1315
- Add parameter to MetricReader constructor to select temporality per instrument kind
1416
([#2637](https://github.com/open-telemetry/opentelemetry-python/pull/2637))
1517
- Fix unhandled callback exceptions on async instruments

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py

+17-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
from typing import TYPE_CHECKING, Dict, Iterable
1919

2020
from opentelemetry.sdk._metrics.aggregation import (
21+
DefaultAggregation,
2122
_Aggregation,
23+
_AggregationFactory,
2224
_convert_aggregation_temporality,
2325
_PointVarT,
2426
)
@@ -39,13 +41,15 @@ def __init__(
3941
view: View,
4042
instrument: "_Instrument",
4143
sdk_config: SdkConfiguration,
44+
instrument_class_aggregation: Dict[type, _AggregationFactory],
4245
):
4346
self._view = view
4447
self._instrument = instrument
4548
self._sdk_config = sdk_config
4649
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
4750
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
4851
self._lock = Lock()
52+
self._instrument_class_aggregation = instrument_class_aggregation
4953

5054
# pylint: disable=protected-access
5155
def consume_measurement(self, measurement: Measurement) -> None:
@@ -67,11 +71,19 @@ def consume_measurement(self, measurement: Measurement) -> None:
6771
if attributes not in self._attributes_aggregation:
6872
with self._lock:
6973
if attributes not in self._attributes_aggregation:
70-
self._attributes_aggregation[
71-
attributes
72-
] = self._view._aggregation._create_aggregation(
73-
self._instrument
74-
)
74+
if not isinstance(
75+
self._view._aggregation, DefaultAggregation
76+
):
77+
aggregation = (
78+
self._view._aggregation._create_aggregation(
79+
self._instrument
80+
)
81+
)
82+
else:
83+
aggregation = self._instrument_class_aggregation[
84+
self._instrument.__class__
85+
]._create_aggregation(self._instrument)
86+
self._attributes_aggregation[attributes] = aggregation
7587

7688
self._attributes_aggregation[attributes].aggregate(measurement)
7789

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
detach,
2828
set_value,
2929
)
30+
from opentelemetry.sdk._metrics.aggregation import _AggregationFactory
3031
from opentelemetry.sdk._metrics.metric_reader import MetricReader
3132
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
3233
from opentelemetry.util._once import Once
@@ -103,9 +104,14 @@ class InMemoryMetricReader(MetricReader):
103104
"""
104105

105106
def __init__(
106-
self, preferred_temporality: Dict[type, AggregationTemporality] = None
107+
self,
108+
preferred_temporality: Dict[type, AggregationTemporality] = None,
109+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
107110
) -> None:
108-
super().__init__(preferred_temporality=preferred_temporality)
111+
super().__init__(
112+
preferred_temporality=preferred_temporality,
113+
preferred_aggregation=preferred_aggregation,
114+
)
109115
self._lock = RLock()
110116
self._metrics: List[Metric] = []
111117

@@ -135,10 +141,14 @@ def __init__(
135141
self,
136142
exporter: MetricExporter,
137143
preferred_temporality: Dict[type, AggregationTemporality] = None,
144+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
138145
export_interval_millis: Optional[float] = None,
139146
export_timeout_millis: Optional[float] = None,
140147
) -> None:
141-
super().__init__(preferred_temporality=preferred_temporality)
148+
super().__init__(
149+
preferred_temporality=preferred_temporality,
150+
preferred_aggregation=preferred_aggregation,
151+
)
142152
self._exporter = exporter
143153
if export_interval_millis is None:
144154
try:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
# pylint: disable=too-many-ancestors
1616

17-
import logging
17+
from logging import getLogger
1818
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union
1919

2020
from opentelemetry._metrics.instrument import CallbackT
@@ -38,7 +38,8 @@
3838
MeasurementConsumer,
3939
)
4040

41-
_logger = logging.getLogger(__name__)
41+
42+
_logger = getLogger(__name__)
4243

4344

4445
class _Synchronous:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None:
5353
self._sdk_config = sdk_config
5454
# should never be mutated
5555
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
56-
reader: MetricReaderStorage(sdk_config)
56+
reader: MetricReaderStorage(
57+
sdk_config, reader._instrument_class_aggregation
58+
)
5759
for reader in sdk_config.metric_readers
5860
}
5961
self._async_instruments: List["_Asynchronous"] = []

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
from typing_extensions import final
2121

22+
from opentelemetry.sdk._metrics.aggregation import (
23+
DefaultAggregation,
24+
_AggregationFactory,
25+
)
2226
from opentelemetry.sdk._metrics.instrument import (
2327
Counter,
2428
Histogram,
@@ -52,6 +56,19 @@ class MetricReader(ABC):
5256
their association to their default aggregation temporalities.
5357
The value passed here will override the corresponding values set
5458
via the environment variable
59+
preferred_aggregation: A mapping between instrument classes and
60+
aggregation instances. By default maps all instrument classes to an
61+
instance of `DefaultAggregation`. This mapping will be used to
62+
define the default aggregation of every instrument class. If the
63+
user wants to make a change in the default aggregation of an
64+
instrument class, it is enough to pass here a dictionary whose keys
65+
are the instrument classes and the values are the corresponding
66+
desired aggregation for the instrument classes that the user wants
67+
to change, not necessarily all of them. The classes not included in
68+
the passed dictionary will retain their association to their
69+
default aggregations. The aggregation defined here will be
70+
overriden by an aggregation defined by a view that is not
71+
`DefaultAggregation`.
5572
5673
.. document protected _receive_metrics which is a intended to be overriden by subclass
5774
.. automethod:: _receive_metrics
@@ -61,7 +78,9 @@ class MetricReader(ABC):
6178
# to the end of the documentation paragraph above.
6279

6380
def __init__(
64-
self, preferred_temporality: Dict[type, AggregationTemporality] = None
81+
self,
82+
preferred_temporality: Dict[type, AggregationTemporality] = None,
83+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
6584
) -> None:
6685
self._collect: Callable[
6786
["MetricReader", AggregationTemporality], Iterable[Metric]
@@ -106,6 +125,17 @@ def __init__(
106125
)
107126

108127
self._instrument_class_temporality.update(preferred_temporality or {})
128+
self._preferred_temporality = preferred_temporality
129+
self._instrument_class_aggregation = {
130+
Counter: DefaultAggregation(),
131+
UpDownCounter: DefaultAggregation(),
132+
Histogram: DefaultAggregation(),
133+
ObservableCounter: DefaultAggregation(),
134+
ObservableUpDownCounter: DefaultAggregation(),
135+
ObservableGauge: DefaultAggregation(),
136+
}
137+
138+
self._instrument_class_aggregation.update(preferred_aggregation or {})
109139

110140
@final
111141
def collect(self) -> None:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py

+16-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from opentelemetry.sdk._metrics._view_instrument_match import (
2020
_ViewInstrumentMatch,
2121
)
22-
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
22+
from opentelemetry.sdk._metrics.aggregation import (
23+
AggregationTemporality,
24+
_AggregationFactory,
25+
)
2326
from opentelemetry.sdk._metrics.measurement import Measurement
2427
from opentelemetry.sdk._metrics.point import Metric
2528
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
@@ -31,12 +34,17 @@
3134
class MetricReaderStorage:
3235
"""The SDK's storage for a given reader"""
3336

34-
def __init__(self, sdk_config: SdkConfiguration) -> None:
37+
def __init__(
38+
self,
39+
sdk_config: SdkConfiguration,
40+
instrument_class_aggregation: Dict[type, _AggregationFactory],
41+
) -> None:
3542
self._lock = RLock()
3643
self._sdk_config = sdk_config
3744
self._view_instrument_match: Dict[
3845
Instrument, List[_ViewInstrumentMatch]
3946
] = {}
47+
self._instrument_class_aggregation = instrument_class_aggregation
4048

4149
def _get_or_init_view_instrument_match(
4250
self, instrument: Instrument
@@ -62,6 +70,9 @@ def _get_or_init_view_instrument_match(
6270
view=view,
6371
instrument=instrument,
6472
sdk_config=self._sdk_config,
73+
instrument_class_aggregation=(
74+
self._instrument_class_aggregation
75+
),
6576
)
6677
)
6778

@@ -72,6 +83,9 @@ def _get_or_init_view_instrument_match(
7283
view=_DEFAULT_VIEW,
7384
instrument=instrument,
7485
sdk_config=self._sdk_config,
86+
instrument_class_aggregation=(
87+
self._instrument_class_aggregation
88+
),
7589
)
7690
)
7791
self._view_instrument_match[instrument] = view_instrument_matches

opentelemetry-sdk/tests/metrics/test_metric_reader.py

+51-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
from unittest import TestCase
1818
from unittest.mock import patch
1919

20-
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
20+
from opentelemetry.sdk._metrics.aggregation import (
21+
AggregationTemporality,
22+
DefaultAggregation,
23+
LastValueAggregation,
24+
_AggregationFactory,
25+
)
2126
from opentelemetry.sdk._metrics.instrument import (
2227
Counter,
2328
Histogram,
@@ -34,10 +39,13 @@
3439

3540
class DummyMetricReader(MetricReader):
3641
def __init__(
37-
self, preferred_temporality: Dict[type, AggregationTemporality] = None
42+
self,
43+
preferred_temporality: Dict[type, AggregationTemporality] = None,
44+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
3845
) -> None:
3946
super().__init__(
4047
preferred_temporality=preferred_temporality,
48+
preferred_aggregation=preferred_aggregation,
4149
)
4250

4351
def _receive_metrics(self, metrics):
@@ -173,3 +181,44 @@ def test_configure_temporality_parameter(self):
173181
dummy_metric_reader._instrument_class_temporality[ObservableGauge],
174182
AggregationTemporality.DELTA,
175183
)
184+
185+
def test_default_temporality(self):
186+
dummy_metric_reader = DummyMetricReader()
187+
self.assertEqual(
188+
dummy_metric_reader._instrument_class_aggregation.keys(),
189+
set(
190+
[
191+
Counter,
192+
UpDownCounter,
193+
Histogram,
194+
ObservableCounter,
195+
ObservableUpDownCounter,
196+
ObservableGauge,
197+
]
198+
),
199+
)
200+
for (
201+
value
202+
) in dummy_metric_reader._instrument_class_aggregation.values():
203+
self.assertIsInstance(value, DefaultAggregation)
204+
205+
dummy_metric_reader = DummyMetricReader(
206+
preferred_aggregation={Counter: LastValueAggregation()}
207+
)
208+
self.assertEqual(
209+
dummy_metric_reader._instrument_class_aggregation.keys(),
210+
set(
211+
[
212+
Counter,
213+
UpDownCounter,
214+
Histogram,
215+
ObservableCounter,
216+
ObservableUpDownCounter,
217+
ObservableGauge,
218+
]
219+
),
220+
)
221+
self.assertIsInstance(
222+
dummy_metric_reader._instrument_class_aggregation[Counter],
223+
LastValueAggregation,
224+
)

opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py

+15-7
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from unittest.mock import Mock, patch
15+
from unittest.mock import MagicMock, Mock, patch
1616

17-
from opentelemetry.sdk._metrics.aggregation import DropAggregation
17+
from opentelemetry.sdk._metrics.aggregation import (
18+
DefaultAggregation,
19+
DropAggregation,
20+
)
1821
from opentelemetry.sdk._metrics.instrument import Counter
1922
from opentelemetry.sdk._metrics.measurement import Measurement
2023
from opentelemetry.sdk._metrics.metric_reader_storage import (
@@ -56,7 +59,8 @@ def test_creates_view_instrument_matches(
5659
resource=Mock(),
5760
metric_readers=(),
5861
views=(view1, view2),
59-
)
62+
),
63+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
6064
)
6165

6266
# instrument1 matches view1 and view2, so should create two ViewInstrumentMatch objects
@@ -100,7 +104,8 @@ def test_forwards_calls_to_view_instrument_match(
100104
resource=Mock(),
101105
metric_readers=(),
102106
views=(view1, view2),
103-
)
107+
),
108+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
104109
)
105110

106111
# Measurements from an instrument should be passed on to each ViewInstrumentMatch objects
@@ -147,7 +152,8 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock):
147152
resource=Mock(),
148153
metric_readers=(),
149154
views=(view1,),
150-
)
155+
),
156+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
151157
)
152158

153159
def send_measurement():
@@ -172,7 +178,8 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock):
172178
resource=Mock(),
173179
metric_readers=(),
174180
views=(),
175-
)
181+
),
182+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
176183
)
177184

178185
storage.consume_measurement(Measurement(1, instrument1))
@@ -200,7 +207,8 @@ def test_drop_aggregation(self):
200207
instrument_name="name", aggregation=DropAggregation()
201208
),
202209
),
203-
)
210+
),
211+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
204212
)
205213
metric_reader_storage.consume_measurement(Measurement(1, counter))
206214

0 commit comments

Comments
 (0)