Skip to content

Commit cd4c5e7

Browse files
aabmassocelotlAlex Botencodeboten
authored
Implement MetricReaderStorage (open-telemetry#2456)
* Implement MetricReaderStorage * Apply suggestions from code review Co-authored-by: Diego Hurtado <[email protected]> * fix tset * syntax error * move async instrument callback invocation into the metric reader storage * Rename ViewStorage -> ViewInstrumentMatch Tests still need to be fixed. * Implement MetricReaderStorage * Apply suggestions from code review Co-authored-by: Diego Hurtado <[email protected]> * fix tset * syntax error * move async instrument callback invocation into the metric reader storage * Rename ViewStorage -> ViewInstrumentMatch * fix lint * fix lint * Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py Co-authored-by: Diego Hurtado <[email protected]> * refactor to have the measurement consumer handle async callbacks again * remove print * lint Co-authored-by: Diego Hurtado <[email protected]> Co-authored-by: Alex Boten <[email protected]> Co-authored-by: Alex Boten <[email protected]>
1 parent 50413be commit cd4c5e7

11 files changed

+362
-64
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def __init__(
159159
self._meter_lock = Lock()
160160
self._atexit_handler = None
161161
self._sdk_config = SdkConfiguration(
162-
resource=resource, metric_readers=metric_readers
162+
resource=resource, metric_readers=metric_readers, views=()
163163
)
164164
self._measurement_consumer = SynchronousMeasurementConsumer(
165165
sdk_config=self._sdk_config

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def add(
9595
)
9696
return
9797
self._measurement_consumer.consume_measurement(
98-
Measurement(amount, attributes)
98+
Measurement(amount, self, attributes)
9999
)
100100

101101

@@ -104,7 +104,7 @@ def add(
104104
self, amount: Union[int, float], attributes: Dict[str, str] = None
105105
):
106106
self._measurement_consumer.consume_measurement(
107-
Measurement(amount, attributes)
107+
Measurement(amount, self, attributes)
108108
)
109109

110110

@@ -127,7 +127,7 @@ def record(
127127
)
128128
return
129129
self._measurement_consumer.consume_measurement(
130-
Measurement(amount, attributes)
130+
Measurement(amount, self, attributes)
131131
)
132132

133133

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py

+2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
from dataclasses import dataclass
1616
from typing import Union
1717

18+
from opentelemetry._metrics.instrument import Instrument
1819
from opentelemetry.util.types import Attributes
1920

2021

2122
@dataclass(frozen=True)
2223
class Measurement:
2324
value: Union[int, float]
25+
instrument: Instrument
2426
attributes: Attributes = None

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ def collect(
7070
self, metric_reader: MetricReader, temporality: AggregationTemporality
7171
) -> Iterable[Metric]:
7272
with self._lock:
73+
metric_reader_storage = self._reader_storages[metric_reader]
7374
for async_instrument in self._async_instruments:
7475
for measurement in async_instrument.callback():
75-
self.consume_measurement(measurement)
76+
metric_reader_storage.consume_measurement(measurement)
7677
return self._reader_storages[metric_reader].collect(temporality)

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py

+95-6
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,112 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import Iterable
15+
from threading import RLock
16+
from typing import Dict, Iterable, List
1617

17-
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
18+
from opentelemetry._metrics.instrument import Counter, Histogram, Instrument
19+
from opentelemetry.sdk._metrics._view_instrument_match import (
20+
_ViewInstrumentMatch,
21+
)
22+
from opentelemetry.sdk._metrics.aggregation import (
23+
AggregationTemporality,
24+
ExplicitBucketHistogramAggregation,
25+
LastValueAggregation,
26+
SumAggregation,
27+
)
1828
from opentelemetry.sdk._metrics.measurement import Measurement
1929
from opentelemetry.sdk._metrics.point import Metric
2030
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
31+
from opentelemetry.sdk._metrics.view import View
2132

2233

23-
# TODO: #2378
2434
class MetricReaderStorage:
2535
"""The SDK's storage for a given reader"""
2636

2737
def __init__(self, sdk_config: SdkConfiguration) -> None:
28-
pass
38+
self._lock = RLock()
39+
self._sdk_config = sdk_config
40+
self._view_instrument_match: Dict[
41+
Instrument, List[_ViewInstrumentMatch]
42+
] = {}
43+
44+
def _get_or_init_view_instrument_match(
45+
self, instrument: Instrument
46+
) -> List["_ViewInstrumentMatch"]:
47+
# Optimistically get the relevant views for the given instrument. Once set for a given
48+
# instrument, the mapping will never change
49+
if instrument in self._view_instrument_match:
50+
return self._view_instrument_match[instrument]
51+
52+
with self._lock:
53+
# double check if it was set before we held the lock
54+
if instrument in self._view_instrument_match:
55+
return self._view_instrument_match[instrument]
56+
57+
# not present, hold the lock and add a new mapping
58+
matches = []
59+
for view in self._sdk_config.views:
60+
if view.match(instrument):
61+
# Note: if a view matches multiple instruments, this will create a separate
62+
# _ViewInstrumentMatch per instrument. If the user's View configuration includes a
63+
# name, this will cause multiple conflicting output streams.
64+
matches.append(
65+
_ViewInstrumentMatch(
66+
name=view.name or instrument.name,
67+
resource=self._sdk_config.resource,
68+
instrumentation_info=None,
69+
aggregation=view.aggregation,
70+
unit=instrument.unit,
71+
description=view.description,
72+
)
73+
)
74+
75+
# if no view targeted the instrument, use the default
76+
if not matches:
77+
# TODO: the logic to select aggregation could be moved
78+
if isinstance(instrument, Counter):
79+
agg = SumAggregation(True, AggregationTemporality.DELTA)
80+
elif isinstance(instrument, Histogram):
81+
agg = ExplicitBucketHistogramAggregation()
82+
else:
83+
agg = LastValueAggregation()
84+
matches.append(
85+
_ViewInstrumentMatch(
86+
resource=self._sdk_config.resource,
87+
instrumentation_info=None,
88+
aggregation=agg,
89+
unit=instrument.unit,
90+
description=instrument.description,
91+
name=instrument.name,
92+
)
93+
)
94+
self._view_instrument_match[instrument] = matches
95+
return matches
2996

3097
def consume_measurement(self, measurement: Measurement) -> None:
31-
pass
98+
for matches in self._get_or_init_view_instrument_match(
99+
measurement.instrument
100+
):
101+
matches.consume_measurement(measurement)
32102

33103
def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
34-
pass
104+
# use a list instead of yielding to prevent a slow reader from holding SDK locks
105+
metrics: List[Metric] = []
106+
107+
# While holding the lock, new _ViewInstrumentMatch can't be added from another thread (so we are
108+
# sure we collect all existing view). However, instruments can still send measurements
109+
# that will make it into the individual aggregations; collection will acquire those
110+
# locks iteratively to keep locking as fine-grained as possible. One side effect is
111+
# that end times can be slightly skewed among the metric streams produced by the SDK,
112+
# but we still align the output timestamps for a single instrument.
113+
with self._lock:
114+
for matches in self._view_instrument_match.values():
115+
for match in matches:
116+
metrics.extend(match.collect(temporality))
117+
118+
return metrics
119+
120+
121+
def default_view(instrument: Instrument) -> View:
122+
# TODO: #2247
123+
return View()

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
from typing import Sequence
33

44
from opentelemetry.sdk._metrics.metric_reader import MetricReader
5+
from opentelemetry.sdk._metrics.view import View
56
from opentelemetry.sdk.resources import Resource
67

78

89
@dataclass
910
class SdkConfiguration:
1011
resource: Resource
11-
# TODO: once views are added
12-
# views: Sequence[View]
1312
metric_readers: Sequence[MetricReader]
13+
views: Sequence[View]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
# TODO: #2247
17+
# pylint: disable=no-self-use
18+
class View:
19+
def match(self) -> bool:
20+
return False

0 commit comments

Comments
 (0)