11
11
# See the License for the specific language governing permissions and
12
12
# limitations under the License.
13
13
14
+ import dataclasses
14
15
from logging import getLogger
15
16
from os import environ
16
- from typing import Dict , Optional , Sequence
17
+ from typing import Dict , Iterable , List , Optional , Sequence
17
18
from grpc import ChannelCredentials , Compression
18
19
from opentelemetry .sdk .metrics ._internal .aggregation import Aggregation
19
20
from opentelemetry .exporter .otlp .proto .grpc .exporter import (
42
43
)
43
44
from opentelemetry .sdk .metrics .export import (
44
45
AggregationTemporality ,
46
+ DataPointT ,
45
47
Gauge ,
46
48
Histogram as HistogramType ,
47
49
Metric ,
48
50
MetricExporter ,
49
51
MetricExportResult ,
50
52
MetricsData ,
53
+ ResourceMetrics ,
54
+ ScopeMetrics ,
51
55
Sum ,
52
56
)
53
57
@@ -58,6 +62,14 @@ class OTLPMetricExporter(
58
62
MetricExporter ,
59
63
OTLPExporterMixin [Metric , ExportMetricsServiceRequest , MetricExportResult ],
60
64
):
65
+ """OTLP metric exporter
66
+
67
+ Args:
68
+ max_export_batch_size: Maximum number of data points to export in a single request. This is to deal with
69
+ gRPC's 4MB message size limit. If not set there is no limit to the number of data points in a request.
70
+ If it is set and the number of data points exceeds the max, the request will be split.
71
+ """
72
+
61
73
_result = MetricExportResult
62
74
_stub = MetricsServiceStub
63
75
@@ -71,6 +83,7 @@ def __init__(
71
83
compression : Optional [Compression ] = None ,
72
84
preferred_temporality : Dict [type , AggregationTemporality ] = None ,
73
85
preferred_aggregation : Dict [type , Aggregation ] = None ,
86
+ max_export_batch_size : Optional [int ] = None ,
74
87
):
75
88
76
89
if insecure is None :
@@ -122,6 +135,8 @@ def __init__(
122
135
compression = compression ,
123
136
)
124
137
138
+ self ._max_export_batch_size : Optional [int ] = max_export_batch_size
139
+
125
140
def _translate_data (
126
141
self , data : MetricsData
127
142
) -> ExportMetricsServiceRequest :
@@ -223,8 +238,9 @@ def _translate_data(
223
238
)
224
239
pb2_metric .sum .data_points .append (pt )
225
240
else :
226
- _logger .warn (
227
- "unsupported datapoint type %s" , metric .point
241
+ _logger .warning (
242
+ "unsupported data type %s" ,
243
+ metric .data .__class__ .__name__ ,
228
244
)
229
245
continue
230
246
@@ -245,7 +261,101 @@ def export(
245
261
** kwargs ,
246
262
) -> MetricExportResult :
247
263
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
248
- return self ._export (metrics_data )
264
+ if self ._max_export_batch_size is None :
265
+ return self ._export (data = metrics_data )
266
+
267
+ export_result = MetricExportResult .SUCCESS
268
+
269
+ for split_metrics_data in self ._split_metrics_data (metrics_data ):
270
+ split_export_result = self ._export (data = split_metrics_data )
271
+
272
+ if split_export_result is MetricExportResult .FAILURE :
273
+ export_result = MetricExportResult .FAILURE
274
+
275
+ return export_result
276
+
277
def _split_metrics_data(
    self,
    metrics_data: MetricsData,
) -> Iterable[MetricsData]:
    """Split *metrics_data* into batches of at most
    ``self._max_export_batch_size`` data points each.

    Each yielded ``MetricsData`` mirrors the original
    resource_metrics -> scope_metrics -> metric nesting, but the leaf
    ``data_points`` lists are cut so that no batch exceeds the configured
    maximum.  Containers that would end up empty are removed from the
    batch before moving on.

    Args:
        metrics_data: The full metrics payload to split.

    Yields:
        ``MetricsData`` instances, each holding at most
        ``self._max_export_batch_size`` data points in total.
    """
    # Count of data points accumulated in the batch under construction.
    batch_size: int = 0
    split_resource_metrics: List[ResourceMetrics] = []

    for resource_metrics in metrics_data.resource_metrics:
        split_scope_metrics: List[ScopeMetrics] = []
        # Shallow-copy the container via dataclasses.replace, pointing it
        # at the (still empty) list that the loops below will fill.  The
        # alias is intentional: appending to split_scope_metrics mutates
        # the copy already stored in split_resource_metrics.
        split_resource_metrics.append(
            dataclasses.replace(
                resource_metrics,
                scope_metrics=split_scope_metrics,
            )
        )
        for scope_metrics in resource_metrics.scope_metrics:
            split_metrics: List[Metric] = []
            split_scope_metrics.append(
                dataclasses.replace(
                    scope_metrics,
                    metrics=split_metrics,
                )
            )
            for metric in scope_metrics.metrics:
                split_data_points: List[DataPointT] = []
                # Copy the metric and its data wrapper so the original
                # metrics_data is never mutated.
                split_metrics.append(
                    dataclasses.replace(
                        metric,
                        data=dataclasses.replace(
                            metric.data,
                            data_points=split_data_points,
                        ),
                    )
                )

                for data_point in metric.data.data_points:
                    split_data_points.append(data_point)
                    batch_size += 1

                    if batch_size >= self._max_export_batch_size:
                        yield MetricsData(
                            resource_metrics=split_resource_metrics
                        )
                        # Reset all the variables.  The new batch is
                        # seeded with fresh copies of the containers
                        # currently being iterated so that the remaining
                        # data points of this metric land in the right
                        # place in the next batch.
                        batch_size = 0
                        split_data_points = []
                        split_metrics = [
                            dataclasses.replace(
                                metric,
                                data=dataclasses.replace(
                                    metric.data,
                                    data_points=split_data_points,
                                ),
                            )
                        ]
                        split_scope_metrics = [
                            dataclasses.replace(
                                scope_metrics,
                                metrics=split_metrics,
                            )
                        ]
                        split_resource_metrics = [
                            dataclasses.replace(
                                resource_metrics,
                                scope_metrics=split_scope_metrics,
                            )
                        ]

                if not split_data_points:
                    # If data_points is empty remove the whole metric
                    # (happens when a yield consumed the metric's last
                    # data point, leaving the seeded copy empty).
                    split_metrics.pop()

            if not split_metrics:
                # If metrics is empty remove the whole scope_metrics
                split_scope_metrics.pop()

        if not split_scope_metrics:
            # If scope_metrics is empty remove the whole resource_metrics
            split_resource_metrics.pop()

    # Flush the final, partially-filled batch (if any data points remain).
    if batch_size > 0:
        yield MetricsData(resource_metrics=split_resource_metrics)
249
359
250
360
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
    """Shut down the exporter.

    No-op: this exporter keeps no background workers or buffered state,
    so there is nothing to flush or release here.

    Args:
        timeout_millis: Ignored; accepted for interface compatibility
            with the ``MetricExporter`` contract.
    """
0 commit comments