Skip to content

Commit 6848cfd

Browse files
committed
Adding conversion logic
1 parent a32026b commit 6848cfd

File tree

2 files changed

+270
-21
lines changed

2 files changed

+270
-21
lines changed

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py

+99-9
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,23 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import re
1516
from typing import Dict, Sequence
1617

1718
from opentelemetry.sdk.metrics.export import (
1819
ExportRecord,
1920
MetricsExporter,
2021
MetricsExportResult,
2122
)
23+
from opentelemetry.sdk.metrics.export.aggregate import (
24+
HistogramAggregator,
25+
LastValueAggregator,
26+
MinMaxSumCountAggregator,
27+
SumAggregator,
28+
ValueObserverAggregator,
29+
)
2230

31+
from .gen.remote_pb2 import WriteRequest
2332
from .gen.types_pb2 import Label, Sample, TimeSeries
2433

2534

@@ -128,31 +137,88 @@ def shutdown(self) -> None:
128137
def convert_to_timeseries(
129138
self, export_records: Sequence[ExportRecord]
130139
) -> Sequence[TimeSeries]:
131-
pass
140+
converter_map = {
141+
MinMaxSumCountAggregator: self.convert_from_min_max_sum_count,
142+
SumAggregator: self.convert_from_sum,
143+
HistogramAggregator: self.convert_from_histogram,
144+
LastValueAggregator: self.convert_from_last_value,
145+
ValueObserverAggregator: self.convert_from_last_value,
146+
}
147+
timeseries = []
148+
for export_record in export_records:
149+
aggregator_type = type(export_record.aggregator)
150+
converter = converter_map.get(aggregator_type)
151+
if not converter:
152+
raise ValueError(
153+
str(aggregator_type) + " conversion is not supported"
154+
)
155+
timeseries.extend(converter(export_record))
156+
return timeseries
132157

133158
def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
134-
pass
159+
name = sum_record.instrument.name
160+
value = sum_record.aggregator.checkpoint
161+
return [self.create_timeseries(sum_record, name, value)]
135162

136163
def convert_from_min_max_sum_count(
137164
self, min_max_sum_count_record: ExportRecord
138165
) -> TimeSeries:
139-
pass
166+
timeseries = []
167+
agg_types = ["min", "max", "sum", "count"]
168+
for agg_type in agg_types:
169+
name = min_max_sum_count_record.instrument.name + "_" + agg_type
170+
value = getattr(
171+
min_max_sum_count_record.aggregator.checkpoint, agg_type
172+
)
173+
timeseries.append(
174+
self.create_timeseries(min_max_sum_count_record, name, value)
175+
)
176+
return timeseries
140177

141178
def convert_from_histogram(
142179
self, histogram_record: ExportRecord
143180
) -> TimeSeries:
144-
pass
181+
count = 0
182+
timeseries = []
183+
for bound in histogram_record.aggregator.checkpoint.keys():
184+
bb = "+Inf" if bound == float("inf") else str(bound)
185+
name = (
186+
histogram_record.instrument.name + '_bucket{le="' + bb + '"}'
187+
)
188+
value = histogram_record.aggregator.checkpoint[bound]
189+
timeseries.append(
190+
self.create_timeseries(histogram_record, name, value)
191+
)
192+
count += value
193+
name = histogram_record.instrument.name + "_count"
194+
timeseries.append(
195+
self.create_timeseries(histogram_record, name, float(count))
196+
)
197+
return timeseries
145198

146199
def convert_from_last_value(
147200
self, last_value_record: ExportRecord
148201
) -> TimeSeries:
149-
pass
202+
name = last_value_record.instrument.name
203+
value = last_value_record.aggregator.checkpoint
204+
return [self.create_timeseries(last_value_record, name, value)]
150205

151206
def convert_from_value_observer(
152207
self, value_observer_record: ExportRecord
153208
) -> TimeSeries:
154-
pass
209+
timeseries = []
210+
agg_types = ["min", "max", "sum", "count", "last"]
211+
for agg_type in agg_types:
212+
name = value_observer_record.instrument.name + "_" + agg_type
213+
value = getattr(
214+
value_observer_record.aggregator.checkpoint, agg_type
215+
)
216+
timeseries.append(
217+
self.create_timeseries(value_observer_record, name, value)
218+
)
219+
return timeseries
155220

221+
# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
156222
def convert_from_quantile(
157223
self, summary_record: ExportRecord
158224
) -> TimeSeries:
@@ -162,13 +228,37 @@ def convert_from_quantile(
162228
def create_timeseries(
163229
self, export_record: ExportRecord, name, value: float
164230
) -> TimeSeries:
165-
pass
231+
timeseries = TimeSeries()
232+
# Add name label, record labels and resource labels
233+
timeseries.labels.append(self.create_label("__name__", name))
234+
resource_attributes = export_record.resource.attributes
235+
for label_name, label_value in resource_attributes.items():
236+
timeseries.labels.append(
237+
self.create_label(label_name, label_value)
238+
)
239+
for label in export_record.labels:
240+
if label[0] not in resource_attributes.keys():
241+
timeseries.labels.append(self.create_label(label[0], label[1]))
242+
# Add sample
243+
timeseries.samples.append(
244+
self.create_sample(
245+
export_record.aggregator.last_update_timestamp, value
246+
)
247+
)
248+
return timeseries
166249

167250
def create_sample(self, timestamp: int, value: float) -> Sample:
168-
pass
251+
sample = Sample()
252+
sample.timestamp = int(timestamp / 1000000)
253+
sample.value = value
254+
return sample
169255

170256
def create_label(self, name: str, value: str) -> Label:
171-
pass
257+
label = Label()
258+
# Label name must contain only alphanumeric characters and underscores
259+
label.name = re.sub("[^0-9a-zA-Z_]+", "_", name)
260+
label.value = value
261+
return label
172262

173263
def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
174264
pass

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py

+171-12
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,25 @@
1313
# limitations under the License.
1414

1515
import unittest
16+
from unittest import mock
1617

1718
from opentelemetry.exporter.prometheus_remote_write import (
1819
PrometheusRemoteWriteMetricsExporter,
1920
)
21+
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
22+
TimeSeries,
23+
)
24+
from opentelemetry.sdk.metrics import Counter
25+
from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult
26+
from opentelemetry.sdk.metrics.export.aggregate import (
27+
HistogramAggregator,
28+
LastValueAggregator,
29+
MinMaxSumCountAggregator,
30+
SumAggregator,
31+
ValueObserverAggregator,
32+
)
33+
from opentelemetry.sdk.resources import Resource
34+
from opentelemetry.sdk.util import get_dict_as_key
2035

2136

2237
class TestValidation(unittest.TestCase):
@@ -102,35 +117,154 @@ def test_invalid_conflicting_auth_param(self):
102117
class TestConversion(unittest.TestCase):
103118
# Initializes test data that is reused across tests
104119
def setUp(self):
105-
pass
120+
self._test_metric = Counter(
121+
"testname", "testdesc", "testunit", int, None
122+
)
123+
self._exporter = PrometheusRemoteWriteMetricsExporter(
124+
endpoint="/prom/test_endpoint"
125+
)
126+
127+
def generate_record(aggregator_type):
128+
return ExportRecord(
129+
self._test_metric,
130+
None,
131+
aggregator_type(),
132+
Resource({}),
133+
)
134+
135+
self._generate_record = generate_record
136+
137+
def converter_method(record, name, value):
138+
return (type(record.aggregator), name, value)
139+
140+
self._converter_mock = mock.MagicMock(return_value=converter_method)
106141

107142
# Ensures conversion to timeseries function works with valid aggregation types
108143
def test_valid_convert_to_timeseries(self):
109-
pass
144+
timeseries_mock_method = mock.Mock(return_value=["test_value"])
145+
self._exporter.convert_from_sum = timeseries_mock_method
146+
self._exporter.convert_from_min_max_sum_count = timeseries_mock_method
147+
self._exporter.convert_from_histogram = timeseries_mock_method
148+
self._exporter.convert_from_last_value = timeseries_mock_method
149+
self._exporter.convert_from_value_observer = timeseries_mock_method
150+
test_records = [
151+
self._generate_record(SumAggregator),
152+
self._generate_record(MinMaxSumCountAggregator),
153+
self._generate_record(HistogramAggregator),
154+
self._generate_record(LastValueAggregator),
155+
self._generate_record(ValueObserverAggregator),
156+
]
157+
data = self._exporter.convert_to_timeseries(test_records)
158+
self.assertEqual(len(data), 5)
159+
for timeseries in data:
160+
self.assertEqual(timeseries, "test_value")
161+
162+
no_type_records = [self._generate_record(lambda: None)]
163+
with self.assertRaises(ValueError):
164+
self._exporter.convert_to_timeseries(no_type_records)
110165

111166
# Ensures conversion to timeseries fails for unsupported aggregation types
112167
def test_invalid_convert_to_timeseries(self):
113-
pass
168+
no_type_records = [self._generate_record(lambda: None)]
169+
with self.assertRaises(ValueError):
170+
self._exporter.convert_to_timeseries(no_type_records)
114171

115172
# Ensures sum aggregator is correctly converted to timeseries
116173
def test_convert_from_sum(self):
117-
pass
174+
sum_record = self._generate_record(SumAggregator)
175+
sum_record.aggregator.update(3)
176+
sum_record.aggregator.update(2)
177+
sum_record.aggregator.take_checkpoint()
178+
179+
self._exporter.create_timeseries = self._converter_mock()
180+
timeseries = self._exporter.convert_from_sum(sum_record)
181+
self.assertEqual(timeseries[0], (SumAggregator, "testname", 5))
118182

119183
# Ensures sum min_max_count aggregator is correctly converted to timeseries
120184
def test_convert_from_min_max_sum_count(self):
121-
pass
185+
min_max_sum_count_record = self._generate_record(
186+
MinMaxSumCountAggregator
187+
)
188+
min_max_sum_count_record.aggregator.update(5)
189+
min_max_sum_count_record.aggregator.update(1)
190+
min_max_sum_count_record.aggregator.take_checkpoint()
191+
192+
self._exporter.create_timeseries = self._converter_mock()
193+
timeseries = self._exporter.convert_from_min_max_sum_count(
194+
min_max_sum_count_record
195+
)
196+
self.assertEqual(
197+
timeseries[0], (MinMaxSumCountAggregator, "testname_min", 1)
198+
)
199+
self.assertEqual(
200+
timeseries[1], (MinMaxSumCountAggregator, "testname_max", 5)
201+
)
202+
self.assertEqual(
203+
timeseries[2], (MinMaxSumCountAggregator, "testname_sum", 6)
204+
)
205+
self.assertEqual(
206+
timeseries[3], (MinMaxSumCountAggregator, "testname_count", 2)
207+
)
122208

123209
# Ensures histogram aggregator is correctly converted to timeseries
124210
def test_convert_from_histogram(self):
125-
pass
211+
histogram_record = self._generate_record(HistogramAggregator)
212+
histogram_record.aggregator.update(5)
213+
histogram_record.aggregator.update(2)
214+
histogram_record.aggregator.update(-1)
215+
histogram_record.aggregator.take_checkpoint()
216+
217+
self._exporter.create_timeseries = self._converter_mock()
218+
timeseries = self._exporter.convert_from_histogram(histogram_record)
219+
self.assertEqual(
220+
timeseries[0], (HistogramAggregator, 'testname_bucket{le="0"}', 1)
221+
)
222+
self.assertEqual(
223+
timeseries[1],
224+
(HistogramAggregator, 'testname_bucket{le="+Inf"}', 2),
225+
)
226+
self.assertEqual(
227+
timeseries[2], (HistogramAggregator, "testname_count", 3)
228+
)
126229

127230
# Ensures last value aggregator is correctly converted to timeseries
128231
def test_convert_from_last_value(self):
129-
pass
232+
last_value_record = self._generate_record(LastValueAggregator)
233+
last_value_record.aggregator.update(1)
234+
last_value_record.aggregator.update(5)
235+
last_value_record.aggregator.take_checkpoint()
236+
237+
self._exporter.create_timeseries = self._converter_mock()
238+
timeseries = self._exporter.convert_from_last_value(last_value_record)
239+
self.assertEqual(timeseries[0], (LastValueAggregator, "testname", 5))
130240

131241
# Ensures value observer aggregator is correctly converted to timeseries
132242
def test_convert_from_value_observer(self):
133-
pass
243+
value_observer_record = self._generate_record(ValueObserverAggregator)
244+
value_observer_record.aggregator.update(5)
245+
value_observer_record.aggregator.update(1)
246+
value_observer_record.aggregator.update(2)
247+
value_observer_record.aggregator.take_checkpoint()
248+
249+
self._exporter.create_timeseries = self._converter_mock()
250+
timeseries = self._exporter.convert_from_value_observer(
251+
value_observer_record
252+
)
253+
self.assertEqual(
254+
timeseries[0], (ValueObserverAggregator, "testname_min", 1)
255+
)
256+
self.assertEqual(
257+
timeseries[1], (ValueObserverAggregator, "testname_max", 5)
258+
)
259+
self.assertEqual(
260+
timeseries[2], (ValueObserverAggregator, "testname_sum", 8)
261+
)
262+
self.assertEqual(
263+
timeseries[3], (ValueObserverAggregator, "testname_count", 3)
264+
)
265+
self.assertEqual(
266+
timeseries[4], (ValueObserverAggregator, "testname_last", 2)
267+
)
134268

135269
# Ensures quantile aggregator is correctly converted to timeseries
136270
# TODO: Add test once method is implemented
@@ -139,11 +273,36 @@ def test_convert_from_quantile(self):
139273

140274
# Ensures timeseries produced contains appropriate sample and labels
141275
def test_create_timeseries(self):
142-
pass
276+
sum_aggregator = SumAggregator()
277+
sum_aggregator.update(5)
278+
sum_aggregator.take_checkpoint()
279+
sum_aggregator.last_update_timestamp = 10
280+
export_record = ExportRecord(
281+
self._test_metric,
282+
get_dict_as_key({"record_name": "record_value"}),
283+
sum_aggregator,
284+
Resource({"resource_name": "resource_value"}),
285+
)
143286

144-
# Ensure correct headers are added when valid config is provided
145-
def test_get_headers(self):
146-
pass
287+
expected_timeseries = TimeSeries()
288+
expected_timeseries.labels.append(
289+
self._exporter.create_label("__name__", "testname")
290+
)
291+
expected_timeseries.labels.append(
292+
self._exporter.create_label("resource_name", "resource_value")
293+
)
294+
expected_timeseries.labels.append(
295+
self._exporter.create_label("record_name", "record_value")
296+
)
297+
expected_timeseries.samples.append(
298+
self._exporter.create_sample(10, 5.0),
299+
)
300+
timeseries = self._exporter.create_timeseries(
301+
export_record,
302+
"testname",
303+
5.0,
304+
)
305+
self.assertEqual(timeseries, expected_timeseries)
147306

148307

149308
class TestExport(unittest.TestCase):

0 commit comments

Comments
 (0)