Skip to content

Commit 3fae967

Browse files
Move encoding of metrics to opentelemetry-exporter-otlp-proto-common
1 parent 9e885f8 commit 3fae967

File tree

4 files changed

+209
-342
lines changed
  • exporter
    • opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/metrics_encoder
    • opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter
    • opentelemetry-exporter-otlp-proto-http

4 files changed

+209
-342
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import logging
15+
from typing import Sequence, Any, Mapping
16+
17+
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
18+
ExportMetricsServiceRequest,
19+
)
20+
from opentelemetry.proto.common.v1.common_pb2 import (
21+
KeyValue,
22+
InstrumentationScope,
23+
AnyValue,
24+
ArrayValue,
25+
KeyValueList,
26+
)
27+
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2
28+
from opentelemetry.proto.resource.v1.resource_pb2 import Resource
29+
from opentelemetry.sdk.metrics._internal.point import (
30+
MetricsData,
31+
Gauge,
32+
Histogram,
33+
Sum,
34+
)
35+
36+
_logger = logging.getLogger(__name__)
37+
38+
39+
def _translate_attributes(attributes) -> Sequence[KeyValue]:
    """Convert an attribute mapping into a list of protobuf ``KeyValue``.

    Attributes whose values cannot be encoded are logged and skipped so a
    single bad attribute does not fail the whole export.
    """
    if not attributes:
        return []
    pb2_attributes = []
    for attr_key, attr_value in attributes.items():
        try:
            pb2_attributes.append(_translate_key_values(attr_key, attr_value))
        except Exception as error:  # pylint: disable=broad-except
            _logger.exception(error)
    return pb2_attributes
49+
50+
51+
def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
    """Encode SDK ``MetricsData`` into an OTLP ``ExportMetricsServiceRequest``.

    Gauge, Histogram and Sum metric data are encoded; metrics carrying any
    other data type are logged with a warning and skipped.

    Args:
        data: the metrics data collected by the SDK.

    Returns:
        A protobuf ``ExportMetricsServiceRequest`` ready for export.
    """
    resource_metrics_dict = {}

    for resource_metrics in data.resource_metrics:

        resource = resource_metrics.resource

        # It is safe to assume that each entry in data.resource_metrics is
        # associated with a unique resource.
        scope_metrics_dict = {}

        resource_metrics_dict[resource] = scope_metrics_dict

        for scope_metrics in resource_metrics.scope_metrics:

            instrumentation_scope = scope_metrics.scope

            # The SDK groups metrics in instrumentation scopes already so
            # there is no need to check for existing instrumentation scopes
            # here.
            pb2_scope_metrics = pb2.ScopeMetrics(
                scope=InstrumentationScope(
                    name=instrumentation_scope.name,
                    version=instrumentation_scope.version,
                )
            )

            scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

            for metric in scope_metrics.metrics:
                pb2_metric = pb2.Metric(
                    name=metric.name,
                    description=metric.description,
                    unit=metric.unit,
                )

                if isinstance(metric.data, Gauge):
                    for data_point in metric.data.data_points:
                        pt = pb2.NumberDataPoint(
                            attributes=_translate_attributes(
                                data_point.attributes
                            ),
                            time_unix_nano=data_point.time_unix_nano,
                        )
                        # NumberDataPoint.value is a protobuf oneof; assign
                        # the variant matching the Python value's type.
                        if isinstance(data_point.value, int):
                            pt.as_int = data_point.value
                        else:
                            pt.as_double = data_point.value
                        pb2_metric.gauge.data_points.append(pt)

                elif isinstance(metric.data, Histogram):
                    for data_point in metric.data.data_points:
                        pt = pb2.HistogramDataPoint(
                            attributes=_translate_attributes(
                                data_point.attributes
                            ),
                            time_unix_nano=data_point.time_unix_nano,
                            start_time_unix_nano=(
                                data_point.start_time_unix_nano
                            ),
                            count=data_point.count,
                            sum=data_point.sum,
                            bucket_counts=data_point.bucket_counts,
                            explicit_bounds=data_point.explicit_bounds,
                            max=data_point.max,
                            min=data_point.min,
                        )
                        # Kept inside the loop: touching the histogram field
                        # materializes the oneof, which must not happen when
                        # there are no data points.
                        pb2_metric.histogram.aggregation_temporality = (
                            metric.data.aggregation_temporality
                        )
                        pb2_metric.histogram.data_points.append(pt)

                elif isinstance(metric.data, Sum):
                    for data_point in metric.data.data_points:
                        pt = pb2.NumberDataPoint(
                            attributes=_translate_attributes(
                                data_point.attributes
                            ),
                            start_time_unix_nano=(
                                data_point.start_time_unix_nano
                            ),
                            time_unix_nano=data_point.time_unix_nano,
                        )
                        if isinstance(data_point.value, int):
                            pt.as_int = data_point.value
                        else:
                            pt.as_double = data_point.value
                        # note that because sum is a message type, the
                        # fields must be set individually rather than
                        # instantiating a pb2.Sum and setting it once
                        pb2_metric.sum.aggregation_temporality = (
                            metric.data.aggregation_temporality
                        )
                        pb2_metric.sum.is_monotonic = metric.data.is_monotonic
                        pb2_metric.sum.data_points.append(pt)
                else:
                    _logger.warning(
                        "unsupported data type %s",
                        metric.data.__class__.__name__,
                    )
                    continue

                pb2_scope_metrics.metrics.append(pb2_metric)

    resource_data = []
    for (
        sdk_resource,
        scope_data,
    ) in resource_metrics_dict.items():
        resource_data.append(
            pb2.ResourceMetrics(
                resource=Resource(
                    attributes=_translate_attributes(sdk_resource.attributes)
                ),
                scope_metrics=scope_data.values(),
            )
        )
    # Return directly: the original reassigned the loop variable
    # ``resource_metrics`` here, which shadowed it for no benefit.
    return ExportMetricsServiceRequest(resource_metrics=resource_data)
170+
171+
172+
def _translate_value(value: Any) -> AnyValue:
    """Convert a Python attribute value into a protobuf ``AnyValue``.

    Supported types: bool, str, int, float, sequences (encoded as an
    ``ArrayValue``) and mappings (encoded as a ``KeyValueList``).

    Raises:
        Exception: if ``value`` is of an unsupported type.
    """
    # bool must be tested before int: bool is a subclass of int and would
    # otherwise be encoded as int_value.
    if isinstance(value, bool):
        return AnyValue(bool_value=value)
    # str must be tested before Sequence: str is itself a Sequence and would
    # otherwise be encoded as an array of characters.
    if isinstance(value, str):
        return AnyValue(string_value=value)
    if isinstance(value, int):
        return AnyValue(int_value=value)
    if isinstance(value, float):
        return AnyValue(double_value=value)
    if isinstance(value, Sequence):
        return AnyValue(
            array_value=ArrayValue(values=[_translate_value(v) for v in value])
        )
    if isinstance(value, Mapping):
        return AnyValue(
            kvlist_value=KeyValueList(
                values=[
                    _translate_key_values(str(k), v) for k, v in value.items()
                ]
            )
        )
    raise Exception(f"Invalid type {type(value)} of value {value}")
194+
195+
196+
def _translate_key_values(key: str, value: Any) -> KeyValue:
    """Build a protobuf ``KeyValue`` pair from an attribute key and value."""
    encoded_value = _translate_value(value)
    return KeyValue(key=key, value=encoded_value)

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

+3-120
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
from os import environ
1717
from typing import Dict, Iterable, List, Optional, Sequence
1818
from grpc import ChannelCredentials, Compression
19+
20+
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import encode_metrics
1921
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
2022
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
2123
OTLPExporterMixin,
22-
get_resource_data,
2324
_get_credentials,
2425
environ_to_compression,
2526
)
@@ -29,8 +30,6 @@
2930
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import (
3031
MetricsServiceStub,
3132
)
32-
from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope
33-
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2
3433
from opentelemetry.sdk.environment_variables import (
3534
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
3635
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
@@ -51,15 +50,12 @@
5150
from opentelemetry.sdk.metrics.export import (
5251
AggregationTemporality,
5352
DataPointT,
54-
Gauge,
55-
Histogram as HistogramType,
5653
Metric,
5754
MetricExporter,
5855
MetricExportResult,
5956
MetricsData,
6057
ResourceMetrics,
6158
ScopeMetrics,
62-
Sum,
6359
)
6460

6561
_logger = getLogger(__name__)
@@ -118,7 +114,6 @@ def __init__(
118114
else compression
119115
)
120116

121-
instrument_class_temporality = {}
122117
if (
123118
environ.get(
124119
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
@@ -169,119 +164,7 @@ def __init__(
169164
def _translate_data(
170165
self, data: MetricsData
171166
) -> ExportMetricsServiceRequest:
172-
173-
resource_metrics_dict = {}
174-
175-
for resource_metrics in data.resource_metrics:
176-
177-
resource = resource_metrics.resource
178-
179-
# It is safe to assume that each entry in data.resource_metrics is
180-
# associated with an unique resource.
181-
scope_metrics_dict = {}
182-
183-
resource_metrics_dict[resource] = scope_metrics_dict
184-
185-
for scope_metrics in resource_metrics.scope_metrics:
186-
187-
instrumentation_scope = scope_metrics.scope
188-
189-
# The SDK groups metrics in instrumentation scopes already so
190-
# there is no need to check for existing instrumentation scopes
191-
# here.
192-
pb2_scope_metrics = pb2.ScopeMetrics(
193-
scope=InstrumentationScope(
194-
name=instrumentation_scope.name,
195-
version=instrumentation_scope.version,
196-
)
197-
)
198-
199-
scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics
200-
201-
for metric in scope_metrics.metrics:
202-
pb2_metric = pb2.Metric(
203-
name=metric.name,
204-
description=metric.description,
205-
unit=metric.unit,
206-
)
207-
208-
if isinstance(metric.data, Gauge):
209-
for data_point in metric.data.data_points:
210-
pt = pb2.NumberDataPoint(
211-
attributes=self._translate_attributes(
212-
data_point.attributes
213-
),
214-
time_unix_nano=data_point.time_unix_nano,
215-
)
216-
if isinstance(data_point.value, int):
217-
pt.as_int = data_point.value
218-
else:
219-
pt.as_double = data_point.value
220-
pb2_metric.gauge.data_points.append(pt)
221-
222-
elif isinstance(metric.data, HistogramType):
223-
for data_point in metric.data.data_points:
224-
pt = pb2.HistogramDataPoint(
225-
attributes=self._translate_attributes(
226-
data_point.attributes
227-
),
228-
time_unix_nano=data_point.time_unix_nano,
229-
start_time_unix_nano=(
230-
data_point.start_time_unix_nano
231-
),
232-
count=data_point.count,
233-
sum=data_point.sum,
234-
bucket_counts=data_point.bucket_counts,
235-
explicit_bounds=data_point.explicit_bounds,
236-
max=data_point.max,
237-
min=data_point.min,
238-
)
239-
pb2_metric.histogram.aggregation_temporality = (
240-
metric.data.aggregation_temporality
241-
)
242-
pb2_metric.histogram.data_points.append(pt)
243-
244-
elif isinstance(metric.data, Sum):
245-
for data_point in metric.data.data_points:
246-
pt = pb2.NumberDataPoint(
247-
attributes=self._translate_attributes(
248-
data_point.attributes
249-
),
250-
start_time_unix_nano=(
251-
data_point.start_time_unix_nano
252-
),
253-
time_unix_nano=data_point.time_unix_nano,
254-
)
255-
if isinstance(data_point.value, int):
256-
pt.as_int = data_point.value
257-
else:
258-
pt.as_double = data_point.value
259-
# note that because sum is a message type, the
260-
# fields must be set individually rather than
261-
# instantiating a pb2.Sum and setting it once
262-
pb2_metric.sum.aggregation_temporality = (
263-
metric.data.aggregation_temporality
264-
)
265-
pb2_metric.sum.is_monotonic = (
266-
metric.data.is_monotonic
267-
)
268-
pb2_metric.sum.data_points.append(pt)
269-
else:
270-
_logger.warning(
271-
"unsupported data type %s",
272-
metric.data.__class__.__name__,
273-
)
274-
continue
275-
276-
pb2_scope_metrics.metrics.append(pb2_metric)
277-
278-
return ExportMetricsServiceRequest(
279-
resource_metrics=get_resource_data(
280-
resource_metrics_dict,
281-
pb2.ResourceMetrics,
282-
"metrics",
283-
)
284-
)
167+
return encode_metrics(data)
285168

286169
def export(
287170
self,

0 commit comments

Comments
 (0)