Skip to content

Commit ad2a723

Browse files
committed
Add a configurable max_export_batch_size to the gRPC metrics exporter
1 parent e569196 commit ad2a723

File tree

3 files changed

+451
-4
lines changed

3 files changed

+451
-4
lines changed

Diff for: CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
- Add a configurable max_export_batch_size to the gRPC metrics exporter
11+
([#2809](https://github.com/open-telemetry/opentelemetry-python/pull/2809))
1012
- Remove support for 3.6
1113
([#2763](https://github.com/open-telemetry/opentelemetry-python/pull/2763))
1214
- Update PeriodicExportingMetricReader to never call export() concurrently

Diff for: exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

+136-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from logging import getLogger
1515
from os import environ
16-
from typing import Dict, Optional, Sequence
16+
from typing import Dict, Iterable, List, Optional, Sequence
1717
from grpc import ChannelCredentials, Compression
1818
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
1919
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
@@ -42,12 +42,15 @@
4242
)
4343
from opentelemetry.sdk.metrics.export import (
4444
AggregationTemporality,
45+
DataPointT,
4546
Gauge,
4647
Histogram as HistogramType,
4748
Metric,
4849
MetricExporter,
4950
MetricExportResult,
5051
MetricsData,
52+
ResourceMetrics,
53+
ScopeMetrics,
5154
Sum,
5255
)
5356

@@ -71,6 +74,7 @@ def __init__(
7174
compression: Optional[Compression] = None,
7275
preferred_temporality: Dict[type, AggregationTemporality] = None,
7376
preferred_aggregation: Dict[type, Aggregation] = None,
77+
max_export_batch_size: Optional[int] = None,
7478
):
7579

7680
if insecure is None:
@@ -122,6 +126,8 @@ def __init__(
122126
compression=compression,
123127
)
124128

129+
self._max_export_batch_size: Optional[int] = max_export_batch_size
130+
125131
def _translate_data(
126132
self, data: MetricsData
127133
) -> ExportMetricsServiceRequest:
@@ -223,8 +229,9 @@ def _translate_data(
223229
)
224230
pb2_metric.sum.data_points.append(pt)
225231
else:
226-
_logger.warn(
227-
"unsupported datapoint type %s", metric.point
232+
_logger.warning(
233+
"unsupported data type %s",
234+
metric.data.__class__.__name__,
228235
)
229236
continue
230237

@@ -245,7 +252,132 @@ def export(
245252
**kwargs,
246253
) -> MetricExportResult:
247254
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
248-
return self._export(metrics_data)
255+
if self._max_export_batch_size is None:
256+
return self._export(data=metrics_data)
257+
258+
export_result = MetricExportResult.SUCCESS
259+
260+
for split_metrics_data in self._split_metrics_data(metrics_data):
261+
split_export_result = self._export(data=split_metrics_data)
262+
263+
if split_export_result is MetricExportResult.FAILURE:
264+
export_result = MetricExportResult.FAILURE
265+
266+
return export_result
267+
268+
def _split_metrics_data(
269+
self,
270+
metrics_data: MetricsData,
271+
) -> Iterable[MetricsData]:
272+
batch_size: int = 0
273+
split_resource_metrics: List[ResourceMetrics] = []
274+
275+
for resource_metrics in metrics_data.resource_metrics:
276+
split_scope_metrics: List[ScopeMetrics] = []
277+
split_resource_metrics.append(
278+
ResourceMetrics(
279+
resource=resource_metrics.resource,
280+
schema_url=resource_metrics.schema_url,
281+
scope_metrics=split_scope_metrics,
282+
)
283+
)
284+
for scope_metrics in resource_metrics.scope_metrics:
285+
split_metrics: List[Metric] = []
286+
split_scope_metrics.append(
287+
ScopeMetrics(
288+
scope=scope_metrics.scope,
289+
schema_url=scope_metrics.schema_url,
290+
metrics=split_metrics,
291+
)
292+
)
293+
for metric in scope_metrics.metrics:
294+
split_data_points: List[DataPointT] = []
295+
split_metrics.append(
296+
self._create_metric_copy(
297+
metric=metric,
298+
data_points=split_data_points,
299+
)
300+
)
301+
302+
for data_point in metric.data.data_points:
303+
split_data_points.append(data_point)
304+
batch_size += 1
305+
306+
if batch_size >= self._max_export_batch_size:
307+
yield MetricsData(
308+
resource_metrics=split_resource_metrics
309+
)
310+
# Reset all the variables
311+
batch_size = 0
312+
split_data_points = []
313+
split_metrics = [
314+
self._create_metric_copy(
315+
metric=metric,
316+
data_points=split_data_points,
317+
),
318+
]
319+
split_scope_metrics = [
320+
ScopeMetrics(
321+
scope=scope_metrics.scope,
322+
schema_url=scope_metrics.schema_url,
323+
metrics=split_metrics,
324+
)
325+
]
326+
split_resource_metrics = [
327+
ResourceMetrics(
328+
resource=resource_metrics.resource,
329+
schema_url=resource_metrics.schema_url,
330+
scope_metrics=split_scope_metrics,
331+
)
332+
]
333+
334+
if not split_data_points:
335+
# If data_points is empty remove the whole metric
336+
split_metrics.pop()
337+
338+
if not split_metrics:
339+
# If metrics is empty remove the whole scope_metrics
340+
split_scope_metrics.pop()
341+
342+
if not split_scope_metrics:
343+
# If scope_metrics is empty remove the whole resource_metrics
344+
split_resource_metrics.pop()
345+
346+
if batch_size > 0:
347+
yield MetricsData(resource_metrics=split_resource_metrics)
348+
349+
@staticmethod
350+
def _create_metric_copy(
351+
metric: Metric,
352+
data_points: List[DataPointT],
353+
) -> Metric:
354+
if isinstance(metric.data, Sum):
355+
empty_data = Sum(
356+
aggregation_temporality=metric.data.aggregation_temporality,
357+
is_monotonic=metric.data.is_monotonic,
358+
data_points=data_points,
359+
)
360+
elif isinstance(metric.data, Gauge):
361+
empty_data = Gauge(
362+
data_points=data_points,
363+
)
364+
elif isinstance(metric.data, HistogramType):
365+
empty_data = HistogramType(
366+
aggregation_temporality=metric.data.aggregation_temporality,
367+
data_points=data_points,
368+
)
369+
else:
370+
_logger.warning(
371+
"unsupported data type %s", metric.data.__class__.__name__
372+
)
373+
empty_data = None
374+
375+
return Metric(
376+
name=metric.name,
377+
description=metric.description,
378+
unit=metric.unit,
379+
data=empty_data,
380+
)
249381

250382
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
251383
pass

0 commit comments

Comments
 (0)