Skip to content

Commit d550a5f

Browse files
committed
Add conversion to timeseries methods
1 parent 18a25f4 commit d550a5f

File tree

2 files changed

+275
-20
lines changed

2 files changed

+275
-20
lines changed

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py

+101-10
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import re
1515
from typing import Dict, Sequence
1616

17+
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
18+
WriteRequest,
19+
)
1720
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
1821
Label,
1922
Sample,
@@ -24,6 +27,13 @@
2427
MetricsExporter,
2528
MetricsExportResult,
2629
)
30+
from opentelemetry.sdk.metrics.export.aggregate import (
31+
HistogramAggregator,
32+
LastValueAggregator,
33+
MinMaxSumCountAggregator,
34+
SumAggregator,
35+
ValueObserverAggregator,
36+
)
2737

2838

2939
class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
@@ -131,31 +141,88 @@ def shutdown(self) -> None:
131141
def convert_to_timeseries(
132142
self, export_records: Sequence[ExportRecord]
133143
) -> Sequence[TimeSeries]:
134-
raise NotImplementedError()
144+
converter_map = {
145+
MinMaxSumCountAggregator: self.convert_from_min_max_sum_count,
146+
SumAggregator: self.convert_from_sum,
147+
HistogramAggregator: self.convert_from_histogram,
148+
LastValueAggregator: self.convert_from_last_value,
149+
ValueObserverAggregator: self.convert_from_last_value,
150+
}
151+
timeseries = []
152+
for export_record in export_records:
153+
aggregator_type = type(export_record.aggregator)
154+
converter = converter_map.get(aggregator_type)
155+
if not converter:
156+
raise ValueError(
157+
str(aggregator_type) + " conversion is not supported"
158+
)
159+
timeseries.extend(converter(export_record))
160+
return timeseries
135161

136162
def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
137-
raise NotImplementedError()
163+
name = sum_record.instrument.name
164+
value = sum_record.aggregator.checkpoint
165+
return [self.create_timeseries(sum_record, name, value)]
138166

139167
def convert_from_min_max_sum_count(
140168
self, min_max_sum_count_record: ExportRecord
141169
) -> TimeSeries:
142-
raise NotImplementedError()
170+
timeseries = []
171+
agg_types = ["min", "max", "sum", "count"]
172+
for agg_type in agg_types:
173+
name = min_max_sum_count_record.instrument.name + "_" + agg_type
174+
value = getattr(
175+
min_max_sum_count_record.aggregator.checkpoint, agg_type
176+
)
177+
timeseries.append(
178+
self.create_timeseries(min_max_sum_count_record, name, value)
179+
)
180+
return timeseries
143181

144182
def convert_from_histogram(
145183
self, histogram_record: ExportRecord
146184
) -> TimeSeries:
147-
raise NotImplementedError()
185+
count = 0
186+
timeseries = []
187+
for bound in histogram_record.aggregator.checkpoint.keys():
188+
bb = "+Inf" if bound == float("inf") else str(bound)
189+
name = (
190+
histogram_record.instrument.name + '_bucket{le="' + bb + '"}'
191+
)
192+
value = histogram_record.aggregator.checkpoint[bound]
193+
timeseries.append(
194+
self.create_timeseries(histogram_record, name, value)
195+
)
196+
count += value
197+
name = histogram_record.instrument.name + "_count"
198+
timeseries.append(
199+
self.create_timeseries(histogram_record, name, float(count))
200+
)
201+
return timeseries
148202

149203
def convert_from_last_value(
150204
self, last_value_record: ExportRecord
151205
) -> TimeSeries:
152-
raise NotImplementedError()
206+
name = last_value_record.instrument.name
207+
value = last_value_record.aggregator.checkpoint
208+
return [self.create_timeseries(last_value_record, name, value)]
153209

154210
def convert_from_value_observer(
155211
self, value_observer_record: ExportRecord
156212
) -> TimeSeries:
157-
raise NotImplementedError()
213+
timeseries = []
214+
agg_types = ["min", "max", "sum", "count", "last"]
215+
for agg_type in agg_types:
216+
name = value_observer_record.instrument.name + "_" + agg_type
217+
value = getattr(
218+
value_observer_record.aggregator.checkpoint, agg_type
219+
)
220+
timeseries.append(
221+
self.create_timeseries(value_observer_record, name, value)
222+
)
223+
return timeseries
158224

225+
# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
159226
def convert_from_quantile(
160227
self, summary_record: ExportRecord
161228
) -> TimeSeries:
@@ -165,13 +232,37 @@ def convert_from_quantile(
165232
def create_timeseries(
166233
self, export_record: ExportRecord, name, value: float
167234
) -> TimeSeries:
168-
raise NotImplementedError()
235+
timeseries = TimeSeries()
236+
# Add name label, record labels and resource labels
237+
timeseries.labels.append(self.create_label("__name__", name))
238+
resource_attributes = export_record.resource.attributes
239+
for label_name, label_value in resource_attributes.items():
240+
timeseries.labels.append(
241+
self.create_label(label_name, label_value)
242+
)
243+
for label in export_record.labels:
244+
if label[0] not in resource_attributes.keys():
245+
timeseries.labels.append(self.create_label(label[0], label[1]))
246+
# Add sample
247+
timeseries.samples.append(
248+
self.create_sample(
249+
export_record.aggregator.last_update_timestamp, value
250+
)
251+
)
252+
return timeseries
169253

170254
def create_sample(self, timestamp: int, value: float) -> Sample:
171-
raise NotImplementedError()
255+
sample = Sample()
256+
sample.timestamp = int(timestamp / 1000000)
257+
sample.value = value
258+
return sample
172259

173260
def create_label(self, name: str, value: str) -> Label:
174-
raise NotImplementedError()
261+
label = Label()
262+
# Label name must contain only alphanumeric characters and underscores
263+
label.name = re.sub("[^0-9a-zA-Z_]+", "_", name)
264+
label.value = value
265+
return label
175266

176267
def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
177268
raise NotImplementedError()

0 commit comments

Comments
 (0)