Skip to content

Commit 5ed1b5c

Browse files
committed
Add a configurable max_export_batch_size to the gRPC metrics exporter
1 parent b9a6358 commit 5ed1b5c

File tree

4 files changed

+305
-4
lines changed

4 files changed

+305
-4
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc2-0.32b0...HEAD)
99

10+
- Add a configurable max_export_batch_size to the gRPC metrics exporter
11+
([#2809](https://github.com/open-telemetry/opentelemetry-python/pull/2809))
12+
1013
## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-04
1114

1215

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

+87-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from logging import getLogger
1515
from os import environ
16-
from typing import Optional, Sequence
16+
from typing import Iterable, List, Optional, Sequence
1717
from grpc import ChannelCredentials, Compression
1818
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
1919
OTLPExporterMixin,
@@ -41,6 +41,8 @@
4141
MetricExporter,
4242
MetricExportResult,
4343
MetricsData,
44+
ResourceMetrics,
45+
ScopeMetrics,
4446
)
4547

4648
_logger = getLogger(__name__)
@@ -61,6 +63,7 @@ def __init__(
6163
headers: Optional[Sequence] = None,
6264
timeout: Optional[int] = None,
6365
compression: Optional[Compression] = None,
66+
max_export_batch_size: Optional[int] = None,
6467
):
6568

6669
if insecure is None:
@@ -79,6 +82,8 @@ def __init__(
7982
}
8083
)
8184

85+
self.max_export_batch_size: Optional[int] = max_export_batch_size
86+
8287
def _translate_data(
8388
self, data: MetricsData
8489
) -> ExportMetricsServiceRequest:
@@ -180,8 +185,8 @@ def _translate_data(
180185
)
181186
pb2_metric.sum.data_points.append(pt)
182187
else:
183-
_logger.warn(
184-
"unsupported datapoint type %s", metric.point
188+
_logger.warning(
189+
"unsupported datapoint type %s", metric.data
185190
)
186191
continue
187192

@@ -202,7 +207,85 @@ def export(
202207
**kwargs,
203208
) -> MetricExportResult:
204209
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
205-
return self._export(metrics_data)
210+
if not self.max_export_batch_size:
211+
return self._export(data=metrics_data)
212+
213+
if len(metrics_data) <= self.max_export_batch_size:
214+
return self._export(data=metrics_data)
215+
216+
for split_metrics_data in self._split_metrics_data(metrics_data):
217+
export_result: MetricExportResult = self._export(
218+
data=split_metrics_data
219+
)
220+
if export_result is MetricExportResult.FAILURE:
221+
return export_result
222+
223+
return MetricExportResult.SUCCESS
224+
225+
def _split_metrics_data(
226+
self,
227+
metrics_data: MetricsData,
228+
) -> Iterable[MetricsData]:
229+
for index in range(0, len(metrics_data), self.max_export_batch_size):
230+
yield self.__split_metrics_data(
231+
start_metric_index=index,
232+
stop_metric_index=index + self.max_export_batch_size,
233+
metrics_data=metrics_data,
234+
)
235+
236+
def __split_metrics_data(
237+
self,
238+
start_metric_index: int,
239+
stop_metric_index: int,
240+
metrics_data: MetricsData,
241+
) -> MetricsData:
242+
metric_index: int = 0
243+
split_resource_metrics: List[ResourceMetrics] = []
244+
245+
for resource_metrics in metrics_data.resource_metrics:
246+
247+
if metric_index + len(resource_metrics) <= start_metric_index:
248+
metric_index += len(resource_metrics)
249+
continue
250+
251+
if metric_index >= stop_metric_index:
252+
break
253+
254+
split_scope_metrics: List[ScopeMetrics] = []
255+
256+
split_resource_metrics.append(
257+
ResourceMetrics(
258+
resource=resource_metrics.resource,
259+
schema_url=resource_metrics.schema_url,
260+
scope_metrics=split_scope_metrics,
261+
)
262+
)
263+
264+
for scope_metrics in resource_metrics.scope_metrics:
265+
266+
if metric_index + len(scope_metrics) <= start_metric_index:
267+
metric_index += len(scope_metrics)
268+
continue
269+
270+
if metric_index >= stop_metric_index:
271+
break
272+
273+
split_metrics: Sequence[Metric] = scope_metrics.metrics[
274+
max(start_metric_index - metric_index, 0) : (
275+
stop_metric_index - metric_index
276+
)
277+
]
278+
metric_index += len(split_metrics)
279+
280+
split_scope_metrics.append(
281+
ScopeMetrics(
282+
scope=scope_metrics.scope,
283+
schema_url=scope_metrics.schema_url,
284+
metrics=split_metrics,
285+
)
286+
)
287+
288+
return MetricsData(resource_metrics=split_resource_metrics)
206289

207290
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
208291
pass

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py

+199
Original file line numberDiff line numberDiff line change
@@ -922,3 +922,202 @@ def test_translate_multiple_scope_histogram(self):
922922
# pylint: disable=protected-access
923923
actual = self.exporter._translate_data(self.multiple_scope_histogram)
924924
self.assertEqual(expected, actual)
925+
926+
def test_split_metrics_data(self):
927+
metrics_data = MetricsData(
928+
resource_metrics=[
929+
ResourceMetrics(
930+
resource=Resource(
931+
attributes={"a": 1, "b": False},
932+
schema_url="resource_schema_url_1",
933+
),
934+
scope_metrics=[
935+
ScopeMetrics(
936+
scope=SDKInstrumentationScope(
937+
name="first_name",
938+
version="first_version",
939+
schema_url="insrumentation_scope_schema_url_1",
940+
),
941+
metrics=[_generate_sum("sum_int", 1)],
942+
schema_url="instrumentation_scope_schema_url_1",
943+
),
944+
ScopeMetrics(
945+
scope=SDKInstrumentationScope(
946+
name="first_name",
947+
version="first_version",
948+
schema_url="insrumentation_scope_schema_url_2",
949+
),
950+
metrics=[
951+
_generate_sum("sum_int", 2),
952+
_generate_sum("sum_int", 3),
953+
],
954+
schema_url="instrumentation_scope_schema_url_2",
955+
),
956+
],
957+
schema_url="resource_schema_url_1",
958+
),
959+
ResourceMetrics(
960+
resource=Resource(
961+
attributes={"a": 1, "b": False},
962+
schema_url="resource_schema_url_2",
963+
),
964+
scope_metrics=[
965+
ScopeMetrics(
966+
scope=SDKInstrumentationScope(
967+
name="first_name",
968+
version="first_version",
969+
schema_url="insrumentation_scope_schema_url_3",
970+
),
971+
metrics=[_generate_sum("sum_int", 4)],
972+
schema_url="instrumentation_scope_schema_url_3",
973+
)
974+
],
975+
schema_url="resource_schema_url_2",
976+
),
977+
]
978+
)
979+
self.assertEqual(
980+
[
981+
MetricsData(
982+
resource_metrics=[
983+
ResourceMetrics(
984+
resource=Resource(
985+
attributes={"a": 1, "b": False},
986+
schema_url="resource_schema_url_1",
987+
),
988+
scope_metrics=[
989+
ScopeMetrics(
990+
scope=SDKInstrumentationScope(
991+
name="first_name",
992+
version="first_version",
993+
schema_url="insrumentation_scope_schema_url_1",
994+
),
995+
metrics=[_generate_sum("sum_int", 1)],
996+
schema_url="instrumentation_scope_schema_url_1",
997+
),
998+
ScopeMetrics(
999+
scope=SDKInstrumentationScope(
1000+
name="first_name",
1001+
version="first_version",
1002+
schema_url="insrumentation_scope_schema_url_2",
1003+
),
1004+
metrics=[_generate_sum("sum_int", 2)],
1005+
schema_url="instrumentation_scope_schema_url_2",
1006+
),
1007+
],
1008+
schema_url="resource_schema_url_1",
1009+
)
1010+
]
1011+
),
1012+
MetricsData(
1013+
resource_metrics=[
1014+
ResourceMetrics(
1015+
resource=Resource(
1016+
attributes={"a": 1, "b": False},
1017+
schema_url="resource_schema_url_1",
1018+
),
1019+
scope_metrics=[
1020+
ScopeMetrics(
1021+
scope=SDKInstrumentationScope(
1022+
name="first_name",
1023+
version="first_version",
1024+
schema_url="insrumentation_scope_schema_url_2",
1025+
),
1026+
metrics=[_generate_sum("sum_int", 3)],
1027+
schema_url="instrumentation_scope_schema_url_2",
1028+
),
1029+
],
1030+
schema_url="resource_schema_url_1",
1031+
),
1032+
ResourceMetrics(
1033+
resource=Resource(
1034+
attributes={"a": 1, "b": False},
1035+
schema_url="resource_schema_url_2",
1036+
),
1037+
scope_metrics=[
1038+
ScopeMetrics(
1039+
scope=SDKInstrumentationScope(
1040+
name="first_name",
1041+
version="first_version",
1042+
schema_url="insrumentation_scope_schema_url_3",
1043+
),
1044+
metrics=[_generate_sum("sum_int", 4)],
1045+
schema_url="instrumentation_scope_schema_url_3",
1046+
)
1047+
],
1048+
schema_url="resource_schema_url_2",
1049+
),
1050+
]
1051+
),
1052+
],
1053+
list(
1054+
OTLPMetricExporter(
1055+
max_export_batch_size=2
1056+
)._split_metrics_data(metrics_data)
1057+
),
1058+
)
1059+
self.assertEqual(
1060+
[
1061+
MetricsData(
1062+
resource_metrics=[
1063+
ResourceMetrics(
1064+
resource=Resource(
1065+
attributes={"a": 1, "b": False},
1066+
schema_url="resource_schema_url_1",
1067+
),
1068+
scope_metrics=[
1069+
ScopeMetrics(
1070+
scope=SDKInstrumentationScope(
1071+
name="first_name",
1072+
version="first_version",
1073+
schema_url="insrumentation_scope_schema_url_1",
1074+
),
1075+
metrics=[_generate_sum("sum_int", 1)],
1076+
schema_url="instrumentation_scope_schema_url_1",
1077+
),
1078+
ScopeMetrics(
1079+
scope=SDKInstrumentationScope(
1080+
name="first_name",
1081+
version="first_version",
1082+
schema_url="insrumentation_scope_schema_url_2",
1083+
),
1084+
metrics=[
1085+
_generate_sum("sum_int", 2),
1086+
_generate_sum("sum_int", 3),
1087+
],
1088+
schema_url="instrumentation_scope_schema_url_2",
1089+
),
1090+
],
1091+
schema_url="resource_schema_url_1",
1092+
)
1093+
]
1094+
),
1095+
MetricsData(
1096+
resource_metrics=[
1097+
ResourceMetrics(
1098+
resource=Resource(
1099+
attributes={"a": 1, "b": False},
1100+
schema_url="resource_schema_url_2",
1101+
),
1102+
scope_metrics=[
1103+
ScopeMetrics(
1104+
scope=SDKInstrumentationScope(
1105+
name="first_name",
1106+
version="first_version",
1107+
schema_url="insrumentation_scope_schema_url_3",
1108+
),
1109+
metrics=[_generate_sum("sum_int", 4)],
1110+
schema_url="instrumentation_scope_schema_url_3",
1111+
)
1112+
],
1113+
schema_url="resource_schema_url_2",
1114+
)
1115+
]
1116+
),
1117+
],
1118+
list(
1119+
OTLPMetricExporter(
1120+
max_export_batch_size=3
1121+
)._split_metrics_data(metrics_data)
1122+
),
1123+
)

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/point.py

+16
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ class ScopeMetrics:
162162
metrics: Sequence[Metric]
163163
schema_url: str
164164

165+
def __len__(self):
166+
return len(self.metrics)
167+
165168
def to_json(self, indent=4) -> str:
166169
return dumps(
167170
{
@@ -184,6 +187,11 @@ class ResourceMetrics:
184187
scope_metrics: Sequence[ScopeMetrics]
185188
schema_url: str
186189

190+
def __len__(self):
191+
return sum(
192+
[len(scope_metrics) for scope_metrics in self.scope_metrics]
193+
)
194+
187195
def to_json(self, indent=4) -> str:
188196
return dumps(
189197
{
@@ -204,6 +212,14 @@ class MetricsData:
204212

205213
resource_metrics: Sequence[ResourceMetrics]
206214

215+
def __len__(self):
216+
return sum(
217+
[
218+
len(resource_metrics)
219+
for resource_metrics in self.resource_metrics
220+
]
221+
)
222+
207223
def to_json(self, indent=4) -> str:
208224
return dumps(
209225
{

0 commit comments

Comments
 (0)