Skip to content

Commit ed1f7d8

Browse files
committed
Add conversion to TimeSeries methods
1 parent 4b0b438 commit ed1f7d8

File tree

3 files changed

+271
-19
lines changed

3 files changed

+271
-19
lines changed

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@
77
((#180)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/180])
88
- Add Exporter constructor validation methods
99
((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
10+
- Add conversion to TimeSeries methods
11+
((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py

+102-10
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import re
1516
from typing import Dict, Sequence
1617

18+
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
19+
WriteRequest,
20+
)
1721
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
1822
Label,
1923
Sample,
@@ -24,6 +28,13 @@
2428
MetricsExporter,
2529
MetricsExportResult,
2630
)
31+
from opentelemetry.sdk.metrics.export.aggregate import (
32+
HistogramAggregator,
33+
LastValueAggregator,
34+
MinMaxSumCountAggregator,
35+
SumAggregator,
36+
ValueObserverAggregator,
37+
)
2738

2839

2940
class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
@@ -145,31 +156,88 @@ def shutdown(self) -> None:
145156
def convert_to_timeseries(
146157
self, export_records: Sequence[ExportRecord]
147158
) -> Sequence[TimeSeries]:
148-
raise NotImplementedError()
159+
converter_map = {
160+
MinMaxSumCountAggregator: self.convert_from_min_max_sum_count,
161+
SumAggregator: self.convert_from_sum,
162+
HistogramAggregator: self.convert_from_histogram,
163+
LastValueAggregator: self.convert_from_last_value,
164+
ValueObserverAggregator: self.convert_from_last_value,
165+
}
166+
timeseries = []
167+
for export_record in export_records:
168+
aggregator_type = type(export_record.aggregator)
169+
converter = converter_map.get(aggregator_type)
170+
if not converter:
171+
raise ValueError(
172+
str(aggregator_type) + " conversion is not supported"
173+
)
174+
timeseries.extend(converter(export_record))
175+
return timeseries
149176

150177
def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
151-
raise NotImplementedError()
178+
name = sum_record.instrument.name
179+
value = sum_record.aggregator.checkpoint
180+
return [self.create_timeseries(sum_record, name, value)]
152181

153182
def convert_from_min_max_sum_count(
154183
self, min_max_sum_count_record: ExportRecord
155184
) -> TimeSeries:
156-
raise NotImplementedError()
185+
timeseries = []
186+
agg_types = ["min", "max", "sum", "count"]
187+
for agg_type in agg_types:
188+
name = min_max_sum_count_record.instrument.name + "_" + agg_type
189+
value = getattr(
190+
min_max_sum_count_record.aggregator.checkpoint, agg_type
191+
)
192+
timeseries.append(
193+
self.create_timeseries(min_max_sum_count_record, name, value)
194+
)
195+
return timeseries
157196

158197
def convert_from_histogram(
159198
self, histogram_record: ExportRecord
160199
) -> TimeSeries:
161-
raise NotImplementedError()
200+
count = 0
201+
timeseries = []
202+
for bound in histogram_record.aggregator.checkpoint.keys():
203+
bb = "+Inf" if bound == float("inf") else str(bound)
204+
name = (
205+
histogram_record.instrument.name + '_bucket{le="' + bb + '"}'
206+
)
207+
value = histogram_record.aggregator.checkpoint[bound]
208+
timeseries.append(
209+
self.create_timeseries(histogram_record, name, value)
210+
)
211+
count += value
212+
name = histogram_record.instrument.name + "_count"
213+
timeseries.append(
214+
self.create_timeseries(histogram_record, name, float(count))
215+
)
216+
return timeseries
162217

163218
def convert_from_last_value(
164219
self, last_value_record: ExportRecord
165220
) -> TimeSeries:
166-
raise NotImplementedError()
221+
name = last_value_record.instrument.name
222+
value = last_value_record.aggregator.checkpoint
223+
return [self.create_timeseries(last_value_record, name, value)]
167224

168225
def convert_from_value_observer(
169226
self, value_observer_record: ExportRecord
170227
) -> TimeSeries:
171-
raise NotImplementedError()
172-
228+
timeseries = []
229+
agg_types = ["min", "max", "sum", "count", "last"]
230+
for agg_type in agg_types:
231+
name = value_observer_record.instrument.name + "_" + agg_type
232+
value = getattr(
233+
value_observer_record.aggregator.checkpoint, agg_type
234+
)
235+
timeseries.append(
236+
self.create_timeseries(value_observer_record, name, value)
237+
)
238+
return timeseries
239+
240+
# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
173241
def convert_from_quantile(
174242
self, summary_record: ExportRecord
175243
) -> TimeSeries:
@@ -179,13 +247,37 @@ def convert_from_quantile(
179247
def create_timeseries(
180248
self, export_record: ExportRecord, name, value: float
181249
) -> TimeSeries:
182-
raise NotImplementedError()
250+
timeseries = TimeSeries()
251+
# Add name label, record labels and resource labels
252+
timeseries.labels.append(self.create_label("__name__", name))
253+
resource_attributes = export_record.resource.attributes
254+
for label_name, label_value in resource_attributes.items():
255+
timeseries.labels.append(
256+
self.create_label(label_name, label_value)
257+
)
258+
for label in export_record.labels:
259+
if label[0] not in resource_attributes.keys():
260+
timeseries.labels.append(self.create_label(label[0], label[1]))
261+
# Add sample
262+
timeseries.samples.append(
263+
self.create_sample(
264+
export_record.aggregator.last_update_timestamp, value
265+
)
266+
)
267+
return timeseries
183268

184269
def create_sample(self, timestamp: int, value: float) -> Sample:
185-
raise NotImplementedError()
270+
sample = Sample()
271+
sample.timestamp = int(timestamp / 1000000)
272+
sample.value = value
273+
return sample
186274

187275
def create_label(self, name: str, value: str) -> Label:
188-
raise NotImplementedError()
276+
label = Label()
277+
# Label name must contain only alphanumeric characters and underscores
278+
label.name = re.sub("[^0-9a-zA-Z_]+", "_", name)
279+
label.value = value
280+
return label
189281

190282
def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
191283
raise NotImplementedError()

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py

+167-9
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,25 @@
1313
# limitations under the License.
1414

1515
import unittest
16+
from unittest import mock
1617

1718
from opentelemetry.exporter.prometheus_remote_write import (
1819
PrometheusRemoteWriteMetricsExporter,
1920
)
21+
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
22+
TimeSeries,
23+
)
24+
from opentelemetry.sdk.metrics import Counter
25+
from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult
26+
from opentelemetry.sdk.metrics.export.aggregate import (
27+
HistogramAggregator,
28+
LastValueAggregator,
29+
MinMaxSumCountAggregator,
30+
SumAggregator,
31+
ValueObserverAggregator,
32+
)
33+
from opentelemetry.sdk.resources import Resource
34+
from opentelemetry.sdk.util import get_dict_as_key
2035

2136

2237
class TestValidation(unittest.TestCase):
@@ -111,35 +126,151 @@ def test_invalid_tls_config_key_only_param(self):
111126
class TestConversion(unittest.TestCase):
112127
# Initializes test data that is reused across tests
113128
def setUp(self):
114-
pass
129+
self._test_metric = Counter(
130+
"testname", "testdesc", "testunit", int, None
131+
)
132+
self._exporter = PrometheusRemoteWriteMetricsExporter(
133+
endpoint="/prom/test_endpoint"
134+
)
135+
136+
def generate_record(aggregator_type):
137+
return ExportRecord(
138+
self._test_metric, None, aggregator_type(), Resource({}),
139+
)
140+
141+
self._generate_record = generate_record
142+
143+
def converter_method(record, name, value):
144+
return (type(record.aggregator), name, value)
145+
146+
self._converter_mock = mock.MagicMock(return_value=converter_method)
115147

116148
# Ensures conversion to timeseries function works with valid aggregation types
117149
def test_valid_convert_to_timeseries(self):
118-
pass
150+
timeseries_mock_method = mock.Mock(return_value=["test_value"])
151+
self._exporter.convert_from_sum = timeseries_mock_method
152+
self._exporter.convert_from_min_max_sum_count = timeseries_mock_method
153+
self._exporter.convert_from_histogram = timeseries_mock_method
154+
self._exporter.convert_from_last_value = timeseries_mock_method
155+
self._exporter.convert_from_value_observer = timeseries_mock_method
156+
test_records = [
157+
self._generate_record(SumAggregator),
158+
self._generate_record(MinMaxSumCountAggregator),
159+
self._generate_record(HistogramAggregator),
160+
self._generate_record(LastValueAggregator),
161+
self._generate_record(ValueObserverAggregator),
162+
]
163+
data = self._exporter.convert_to_timeseries(test_records)
164+
self.assertEqual(len(data), 5)
165+
for timeseries in data:
166+
self.assertEqual(timeseries, "test_value")
167+
168+
no_type_records = [self._generate_record(lambda: None)]
169+
with self.assertRaises(ValueError):
170+
self._exporter.convert_to_timeseries(no_type_records)
119171

120172
# Ensures conversion to timeseries fails for unsupported aggregation types
121173
def test_invalid_convert_to_timeseries(self):
122-
pass
174+
no_type_records = [self._generate_record(lambda: None)]
175+
with self.assertRaises(ValueError):
176+
self._exporter.convert_to_timeseries(no_type_records)
123177

124178
# Ensures sum aggregator is correctly converted to timeseries
125179
def test_convert_from_sum(self):
126-
pass
180+
sum_record = self._generate_record(SumAggregator)
181+
sum_record.aggregator.update(3)
182+
sum_record.aggregator.update(2)
183+
sum_record.aggregator.take_checkpoint()
184+
185+
self._exporter.create_timeseries = self._converter_mock()
186+
timeseries = self._exporter.convert_from_sum(sum_record)
187+
self.assertEqual(timeseries[0], (SumAggregator, "testname", 5))
127188

128189
# Ensures sum min_max_count aggregator is correctly converted to timeseries
129190
def test_convert_from_min_max_sum_count(self):
130-
pass
191+
min_max_sum_count_record = self._generate_record(
192+
MinMaxSumCountAggregator
193+
)
194+
min_max_sum_count_record.aggregator.update(5)
195+
min_max_sum_count_record.aggregator.update(1)
196+
min_max_sum_count_record.aggregator.take_checkpoint()
197+
198+
self._exporter.create_timeseries = self._converter_mock()
199+
timeseries = self._exporter.convert_from_min_max_sum_count(
200+
min_max_sum_count_record
201+
)
202+
self.assertEqual(
203+
timeseries[0], (MinMaxSumCountAggregator, "testname_min", 1)
204+
)
205+
self.assertEqual(
206+
timeseries[1], (MinMaxSumCountAggregator, "testname_max", 5)
207+
)
208+
self.assertEqual(
209+
timeseries[2], (MinMaxSumCountAggregator, "testname_sum", 6)
210+
)
211+
self.assertEqual(
212+
timeseries[3], (MinMaxSumCountAggregator, "testname_count", 2)
213+
)
131214

132215
# Ensures histogram aggregator is correctly converted to timeseries
133216
def test_convert_from_histogram(self):
134-
pass
217+
histogram_record = self._generate_record(HistogramAggregator)
218+
histogram_record.aggregator.update(5)
219+
histogram_record.aggregator.update(2)
220+
histogram_record.aggregator.update(-1)
221+
histogram_record.aggregator.take_checkpoint()
222+
223+
self._exporter.create_timeseries = self._converter_mock()
224+
timeseries = self._exporter.convert_from_histogram(histogram_record)
225+
self.assertEqual(
226+
timeseries[0], (HistogramAggregator, 'testname_bucket{le="0"}', 1)
227+
)
228+
self.assertEqual(
229+
timeseries[1],
230+
(HistogramAggregator, 'testname_bucket{le="+Inf"}', 2),
231+
)
232+
self.assertEqual(
233+
timeseries[2], (HistogramAggregator, "testname_count", 3)
234+
)
135235

136236
# Ensures last value aggregator is correctly converted to timeseries
137237
def test_convert_from_last_value(self):
138-
pass
238+
last_value_record = self._generate_record(LastValueAggregator)
239+
last_value_record.aggregator.update(1)
240+
last_value_record.aggregator.update(5)
241+
last_value_record.aggregator.take_checkpoint()
242+
243+
self._exporter.create_timeseries = self._converter_mock()
244+
timeseries = self._exporter.convert_from_last_value(last_value_record)
245+
self.assertEqual(timeseries[0], (LastValueAggregator, "testname", 5))
139246

140247
# Ensures value observer aggregator is correctly converted to timeseries
141248
def test_convert_from_value_observer(self):
142-
pass
249+
value_observer_record = self._generate_record(ValueObserverAggregator)
250+
value_observer_record.aggregator.update(5)
251+
value_observer_record.aggregator.update(1)
252+
value_observer_record.aggregator.update(2)
253+
value_observer_record.aggregator.take_checkpoint()
254+
255+
self._exporter.create_timeseries = self._converter_mock()
256+
timeseries = self._exporter.convert_from_value_observer(
257+
value_observer_record
258+
)
259+
self.assertEqual(
260+
timeseries[0], (ValueObserverAggregator, "testname_min", 1)
261+
)
262+
self.assertEqual(
263+
timeseries[1], (ValueObserverAggregator, "testname_max", 5)
264+
)
265+
self.assertEqual(
266+
timeseries[2], (ValueObserverAggregator, "testname_sum", 8)
267+
)
268+
self.assertEqual(
269+
timeseries[3], (ValueObserverAggregator, "testname_count", 3)
270+
)
271+
self.assertEqual(
272+
timeseries[4], (ValueObserverAggregator, "testname_last", 2)
273+
)
143274

144275
# Ensures quantile aggregator is correctly converted to timeseries
145276
# TODO: Add test once method is implemented
@@ -148,7 +279,34 @@ def test_convert_from_quantile(self):
148279

149280
# Ensures timeseries produced contains appropriate sample and labels
150281
def test_create_timeseries(self):
151-
pass
282+
sum_aggregator = SumAggregator()
283+
sum_aggregator.update(5)
284+
sum_aggregator.take_checkpoint()
285+
sum_aggregator.last_update_timestamp = 10
286+
export_record = ExportRecord(
287+
self._test_metric,
288+
get_dict_as_key({"record_name": "record_value"}),
289+
sum_aggregator,
290+
Resource({"resource_name": "resource_value"}),
291+
)
292+
293+
expected_timeseries = TimeSeries()
294+
expected_timeseries.labels.append(
295+
self._exporter.create_label("__name__", "testname")
296+
)
297+
expected_timeseries.labels.append(
298+
self._exporter.create_label("resource_name", "resource_value")
299+
)
300+
expected_timeseries.labels.append(
301+
self._exporter.create_label("record_name", "record_value")
302+
)
303+
expected_timeseries.samples.append(
304+
self._exporter.create_sample(10, 5.0),
305+
)
306+
timeseries = self._exporter.create_timeseries(
307+
export_record, "testname", 5.0,
308+
)
309+
self.assertEqual(timeseries, expected_timeseries)
152310

153311

154312
class TestExport(unittest.TestCase):

0 commit comments

Comments
 (0)