Skip to content

Commit 860d2a9

Browse files
committed
Add a configurable max_export_batch_size to the gRPC metrics exporter
1 parent c9222bf commit 860d2a9

File tree

3 files changed

+389
-4
lines changed

3 files changed

+389
-4
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
- Add a configurable max_export_batch_size to the gRPC metrics exporter
11+
([#2809](https://github.com/open-telemetry/opentelemetry-python/pull/2809))
1012
- Change tracing to use `Resource.to_json()`
1113
([#2784](https://github.com/open-telemetry/opentelemetry-python/pull/2784))
1214
- Fix get_log_emitter instrumenting_module_version args typo

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

+133-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from logging import getLogger
1515
from os import environ
16-
from typing import Optional, Sequence
16+
from typing import Iterable, List, Optional, Sequence
1717
from grpc import ChannelCredentials, Compression
1818
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
1919
OTLPExporterMixin,
@@ -31,6 +31,7 @@
3131
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
3232
)
3333
from opentelemetry.sdk.metrics.export import (
34+
DataPointT,
3435
Gauge,
3536
Histogram,
3637
Metric,
@@ -41,6 +42,8 @@
4142
MetricExporter,
4243
MetricExportResult,
4344
MetricsData,
45+
ResourceMetrics,
46+
ScopeMetrics,
4447
)
4548

4649
_logger = getLogger(__name__)
@@ -61,6 +64,7 @@ def __init__(
6164
headers: Optional[Sequence] = None,
6265
timeout: Optional[int] = None,
6366
compression: Optional[Compression] = None,
67+
max_export_batch_size: Optional[int] = None,
6468
):
6569

6670
if insecure is None:
@@ -79,6 +83,8 @@ def __init__(
7983
}
8084
)
8185

86+
self._max_export_batch_size: Optional[int] = max_export_batch_size
87+
8288
def _translate_data(
8389
self, data: MetricsData
8490
) -> ExportMetricsServiceRequest:
@@ -180,8 +186,8 @@ def _translate_data(
180186
)
181187
pb2_metric.sum.data_points.append(pt)
182188
else:
183-
_logger.warn(
184-
"unsupported datapoint type %s", metric.point
189+
_logger.warning(
190+
"unsupported data type %s", metric.data.__class__.__name__
185191
)
186192
continue
187193

@@ -202,7 +208,130 @@ def export(
202208
**kwargs,
203209
) -> MetricExportResult:
204210
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
205-
return self._export(metrics_data)
211+
if self._max_export_batch_size is None:
212+
return self._export(data=metrics_data)
213+
214+
export_result = MetricExportResult.SUCCESS
215+
216+
for split_metrics_data in self._split_metrics_data(metrics_data):
217+
split_export_result = self._export(data=split_metrics_data)
218+
219+
if split_export_result is MetricExportResult.FAILURE:
220+
export_result = MetricExportResult.FAILURE
221+
222+
return export_result
223+
224+
def _split_metrics_data(
225+
self,
226+
metrics_data: MetricsData,
227+
) -> Iterable[MetricsData]:
228+
batch_size: int = 0
229+
split_resource_metrics: List[ResourceMetrics] = []
230+
231+
for resource_metrics in metrics_data.resource_metrics:
232+
split_scope_metrics: List[ScopeMetrics] = []
233+
split_resource_metrics.append(ResourceMetrics(
234+
resource=resource_metrics.resource,
235+
schema_url=resource_metrics.schema_url,
236+
scope_metrics=split_scope_metrics,
237+
))
238+
for scope_metrics in resource_metrics.scope_metrics:
239+
split_metrics: List[Metric] = []
240+
split_scope_metrics.append(ScopeMetrics(
241+
scope=scope_metrics.scope,
242+
schema_url=scope_metrics.schema_url,
243+
metrics=split_metrics,
244+
))
245+
for metric in scope_metrics.metrics:
246+
split_data_points: List[DataPointT] = []
247+
split_metrics.append(
248+
self._create_metric_copy(
249+
metric=metric,
250+
data_points=split_data_points,
251+
)
252+
)
253+
254+
for data_point in metric.data.data_points:
255+
split_data_points.append(data_point)
256+
batch_size += 1
257+
258+
if batch_size >= self._max_export_batch_size:
259+
yield MetricsData(
260+
resource_metrics=split_resource_metrics
261+
)
262+
# Reset all the variables
263+
batch_size = 0
264+
split_data_points = []
265+
split_metrics = [
266+
self._create_metric_copy(
267+
metric=metric,
268+
data_points=split_data_points,
269+
),
270+
]
271+
split_scope_metrics = [
272+
ScopeMetrics(
273+
scope=scope_metrics.scope,
274+
schema_url=scope_metrics.schema_url,
275+
metrics=split_metrics,
276+
)
277+
]
278+
split_resource_metrics = [
279+
ResourceMetrics(
280+
resource=resource_metrics.resource,
281+
schema_url=resource_metrics.schema_url,
282+
scope_metrics=split_scope_metrics,
283+
)
284+
]
285+
286+
if not split_data_points:
287+
# If data_points is empty remove the whole metric
288+
split_metrics.pop()
289+
290+
if not split_metrics:
291+
# If metrics is empty remove the whole scope_metrics
292+
split_scope_metrics.pop()
293+
294+
if not split_scope_metrics:
295+
# If scope_metrics is empty remove the whole resource_metrics
296+
split_resource_metrics.pop()
297+
298+
if batch_size > 0:
299+
yield MetricsData(
300+
resource_metrics=split_resource_metrics
301+
)
302+
303+
@staticmethod
304+
def _create_metric_copy(
305+
metric: Metric,
306+
data_points: List[DataPointT],
307+
) -> Metric:
308+
if isinstance(metric.data, Sum):
309+
empty_data = Sum(
310+
aggregation_temporality=metric.data.aggregation_temporality,
311+
is_monotonic=metric.data.is_monotonic,
312+
data_points=data_points,
313+
)
314+
elif isinstance(metric.data, Gauge):
315+
empty_data = Gauge(
316+
data_points=data_points,
317+
)
318+
elif isinstance(metric.data, Histogram):
319+
empty_data = Histogram(
320+
aggregation_temporality=metric.data.aggregation_temporality,
321+
data_points=data_points,
322+
)
323+
else:
324+
_logger.warning(
325+
"unsupported data type %s", metric.data.__class__.__name__
326+
)
327+
empty_data = None
328+
329+
return Metric(
330+
name=metric.name,
331+
description=metric.description,
332+
unit=metric.unit,
333+
data=empty_data,
334+
)
206335

207336
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
208337
pass

0 commit comments

Comments
 (0)