16
16
import re
17
17
from typing import Dict , Sequence
18
18
19
+ from collections import defaultdict
20
+ from itertools import chain
19
21
import requests
20
22
import snappy
21
23
30
32
from opentelemetry .sdk .metrics .export import (
31
33
MetricExporter ,
32
34
MetricExportResult ,
33
- AggregationTemporality ,
34
35
Gauge ,
35
36
Sum ,
36
37
Histogram ,
38
+ MetricExportResult ,
39
+ MetricsData ,
40
+ Metric ,
37
41
)
38
42
#from opentelemetry.sdk.metrics.export.aggregate import (
39
43
# HistogramAggregator,
@@ -162,8 +166,8 @@ def headers(self, headers: Dict):
162
166
self ._headers = headers
163
167
164
168
def export (
165
- self , export_records : Sequence [ ExportRecord ]
166
- ) -> MetricsExportResult :
169
+ self , export_records
170
+ ) -> MetricExportResult :
167
171
if not export_records :
168
172
return MetricsExportResult .SUCCESS
169
173
timeseries = self ._convert_to_timeseries (export_records )
@@ -181,9 +185,82 @@ def shutdown(self) -> None:
181
185
182
186
def _translate_data (self , data : MetricsData ):
183
187
rw_timeseries = []
184
-
188
+
189
+ for resource_metrics in data .resource_metrics :
190
+ resource = resource_metrics .resource
191
+ # OTLP Data model suggests combining some attrs into job/instance
192
+ # Should we do that here?
193
+ resource_labels = self ._get_resource_labels (resource .attributes )
194
+ # Scope name/version probably not too useful from a labeling perspective
195
+ for scope_metrics in resource_metrics .scope_metrics :
196
+ for metric in scope_metrics .metrics :
197
+ rw_timeseries .extend ( self ._parse_metric (metric ,resource_labels ) )
198
+
199
+ def _get_resource_labels (self ,attrs ):
200
+ """ Converts Resource Attributes to Prometheus Labels based on
201
+ OTLP Metric Data Model's recommendations on Resource Attributes
202
+ """
203
+ return [ (n ,str (v )) for n ,v in resource .attributes .items () ]
204
+
205
+ def _parse_metric (self , metric : Metric , resource_labels : Sequence ) -> Sequence [TimeSeries ]:
206
+ """
207
+ Parses the Metric & lower objects, then converts the output into
208
+ OM TimeSeries. Returns a List of TimeSeries objects based on one Metric
209
+ """
210
+ # datapoints have attributes associated with them. these would be sent
211
+ # to RW as different metrics: name & labels is a unique time series
212
+ sample_sets = defaultdict (list )
213
+ if isinstance (metric .data ,(Gauge ,Sum )):
214
+ for dp in metric .data .data_points :
215
+ attrs ,sample = self ._parse_data_point (dp )
216
+ sample_sets [attrs ].append (sample )
217
+ elif isinstance (metric .data ,(HistogramType )):
218
+ raise NotImplementedError ("Coming sooN!" )
219
+ else :
220
+ logger .warn ("Unsupported Metric Type: %s" ,type (metric .data ))
221
+ return []
222
+
223
+ # Create the metric name, will be a label later
224
+ if metric .unit :
225
+ #Prom. naming guidelines add unit to the name
226
+ name = f"{ metric .name } _{ metric .unit } "
227
+ else :
228
+ name = metric .name
229
+
230
+ timeseries = []
231
+ for labels , samples in sample_sets .items ():
232
+ ts = TimeSeries ()
233
+ ts .labels .append (self ._label ("__name__" ,name ))
234
+ for label_name ,label_value in chain (resource_labels ,labels ):
235
+ # Previous implementation did not str() the names...
236
+ ts .labels .append (self ._label (label_name ,str (label_value )))
237
+ for value ,timestamp in samples :
238
+ ts .samples .append (self ._sample (value ,timestamp ))
239
+ timeseries .append (ts )
240
+ return timeseries
241
+
242
+ def _sample (self ,value ,timestamp :int ):
243
+ sample = Sample ()
244
+ sample .value = value
245
+ sample .timestamp = timestamp
246
+ return sample
247
+
248
+ def _label (self ,name :str ,value :str ):
249
+ label = Label ()
250
+ label .name = name
251
+ label .value = value
252
+ return label
253
+
254
+ def _parse_data_point (self , data_point ):
255
+
256
+ attrs = tuple (data_point .attributes .items ())
257
+ #TODO: Optimize? create Sample here
258
+ # remote write time is in milliseconds
259
+ sample = (data_point .value ,(data_point .time_unix_nano // 1_000_000 ))
260
+ return attrs ,sample
261
+
185
262
def _convert_to_timeseries (
186
- self , export_records : Sequence [ ExportRecord ]
263
+ self , export_records
187
264
) -> Sequence [TimeSeries ]:
188
265
timeseries = []
189
266
for export_record in export_records :
@@ -199,7 +276,7 @@ def _convert_to_timeseries(
199
276
return timeseries
200
277
201
278
def _convert_from_sum (
202
- self , sum_record : ExportRecord
279
+ self , sum_record
203
280
) -> Sequence [TimeSeries ]:
204
281
return [
205
282
self ._create_timeseries (
@@ -211,22 +288,9 @@ def _convert_from_sum(
211
288
212
289
def _convert_from_gauge (self , gauge_record ):
213
290
raise NotImplementedError ("Do this" )
214
- def _convert_from_min_max_sum_count (
215
- self , min_max_sum_count_record : ExportRecord
216
- ) -> Sequence [TimeSeries ]:
217
- timeseries = []
218
- for agg_type in ["min" , "max" , "sum" , "count" ]:
219
- name = min_max_sum_count_record .instrument .name + "_" + agg_type
220
- value = getattr (
221
- min_max_sum_count_record .aggregator .checkpoint , agg_type
222
- )
223
- timeseries .append (
224
- self ._create_timeseries (min_max_sum_count_record , name , value )
225
- )
226
- return timeseries
227
291
228
292
def _convert_from_histogram (
229
- self , histogram_record : ExportRecord
293
+ self , histogram_record
230
294
) -> Sequence [TimeSeries ]:
231
295
timeseries = []
232
296
for bound in histogram_record .aggregator .checkpoint .keys ():
@@ -242,43 +306,10 @@ def _convert_from_histogram(
242
306
)
243
307
return timeseries
244
308
245
- def _convert_from_last_value (
246
- self , last_value_record : ExportRecord
247
- ) -> Sequence [TimeSeries ]:
248
- return [
249
- self ._create_timeseries (
250
- last_value_record ,
251
- last_value_record .instrument .name + "_last" ,
252
- last_value_record .aggregator .checkpoint ,
253
- )
254
- ]
255
-
256
- def _convert_from_value_observer (
257
- self , value_observer_record : ExportRecord
258
- ) -> Sequence [TimeSeries ]:
259
- timeseries = []
260
- for agg_type in ["min" , "max" , "sum" , "count" , "last" ]:
261
- timeseries .append (
262
- self ._create_timeseries (
263
- value_observer_record ,
264
- value_observer_record .instrument .name + "_" + agg_type ,
265
- getattr (
266
- value_observer_record .aggregator .checkpoint , agg_type
267
- ),
268
- )
269
- )
270
- return timeseries
271
-
272
- # TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
273
- def _convert_from_quantile (
274
- self , summary_record : ExportRecord
275
- ) -> Sequence [TimeSeries ]:
276
- raise NotImplementedError ()
277
-
278
309
# pylint: disable=no-member,no-self-use
279
310
def _create_timeseries (
280
311
self ,
281
- export_record : ExportRecord ,
312
+ export_record ,
282
313
name : str ,
283
314
value : float ,
284
315
extra_label : (str , str ) = None ,
@@ -344,7 +375,7 @@ def _build_headers(self) -> Dict:
344
375
345
376
def _send_message (
346
377
self , message : bytes , headers : Dict
347
- ) -> MetricsExportResult :
378
+ ) -> MetricExportResult :
348
379
auth = None
349
380
if self .basic_auth :
350
381
auth = (self .basic_auth ["username" ], self .basic_auth ["password" ])
0 commit comments