Skip to content

Commit 267544e

Browse files
committed
Add possibility to split oversized udp batches
If we use the BatchExportSpanProcessor combined with the JaegerSpanExporter and use instrumentations that add a lot of metadata to the spans like sqlalchemy, then we run occationally into the "Data exceeds the max UDP packet size" warning causing dropped spans and incomplete data. The option to reduce the general batch-size to a very small number (in my case >30) may cause a performance issue as the worker thread of the batch exporter gets very busy. Instead this change allows the user to ask the exporter to split oversized batches when they get detected and send the splits separately instead of dropping them. Depending on the usecase this is a better option than reducing the batch-size to a very small value because every now and then they contain a couple of large spans.
1 parent 99128b3 commit 267544e

File tree

5 files changed

+75
-6
lines changed

5 files changed

+75
-6
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v0.18b0...HEAD)
88

9+
### Added
10+
- Add `udp_split_oversized_batches` support to jaeger exporter
11+
([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500))
12+
913
## [0.18b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v0.18b0) - 2021-02-16
1014

1115
### Added

exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
from opentelemetry.sdk.environment_variables import (
8686
OTEL_EXPORTER_JAEGER_AGENT_HOST,
8787
OTEL_EXPORTER_JAEGER_AGENT_PORT,
88+
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES,
8889
OTEL_EXPORTER_JAEGER_ENDPOINT,
8990
OTEL_EXPORTER_JAEGER_PASSWORD,
9091
OTEL_EXPORTER_JAEGER_USER,
@@ -120,6 +121,7 @@ class JaegerSpanExporter(SpanExporter):
120121
insecure: True if collector has no encryption or authentication
121122
credentials: Credentials for server authentication.
122123
transport_format: Transport format for exporting spans to collector.
124+
udp_split_oversized_batches: Re-emit oversized batches in smaller chunks.
123125
"""
124126

125127
def __init__(
@@ -133,6 +135,7 @@ def __init__(
133135
insecure: Optional[bool] = None,
134136
credentials: Optional[ChannelCredentials] = None,
135137
transport_format: Optional[str] = None,
138+
udp_split_oversized_batches: bool = None,
136139
):
137140
self.service_name = service_name
138141
self.agent_host_name = _parameter_setter(
@@ -151,8 +154,17 @@ def __init__(
151154
env_variable=environ_agent_port,
152155
default=DEFAULT_AGENT_PORT,
153156
)
157+
self.udp_split_oversized_batches = _parameter_setter(
158+
param=udp_split_oversized_batches,
159+
env_variable=environ.get(
160+
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES
161+
),
162+
default=False,
163+
)
154164
self._agent_client = AgentClientUDP(
155-
host_name=self.agent_host_name, port=self.agent_port
165+
host_name=self.agent_host_name,
166+
port=self.agent_port,
167+
split_oversized_batches= self.udp_split_oversized_batches,
156168
)
157169
self.collector_endpoint = _parameter_setter(
158170
param=collector_endpoint,

exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py

+21-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import base64
1616
import logging
17+
import math
1718
import socket
1819

1920
from thrift.protocol import TBinaryProtocol, TCompactProtocol
@@ -36,6 +37,7 @@ class AgentClientUDP:
3637
port: The port of the Jaeger server.
3738
max_packet_size: Maximum size of UDP packet.
3839
client: Class for creating new client objects for agencies.
40+
split_oversized_batches: Re-emit oversized batches in smaller chunks.
3941
"""
4042

4143
def __init__(
@@ -44,13 +46,15 @@ def __init__(
4446
port,
4547
max_packet_size=UDP_PACKET_MAX_LENGTH,
4648
client=agent.Client,
49+
split_oversized_batches=False,
4750
):
4851
self.address = (host_name, port)
4952
self.max_packet_size = max_packet_size
5053
self.buffer = TTransport.TMemoryBuffer()
5154
self.client = client(
5255
iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer)
5356
)
57+
self.split_oversized_batches = split_oversized_batches
5458

5559
def emit(self, batch: jaeger.Batch):
5660
"""
@@ -66,11 +70,23 @@ def emit(self, batch: jaeger.Batch):
6670
self.client.emitBatch(batch)
6771
buff = self.buffer.getvalue()
6872
if len(buff) > self.max_packet_size:
69-
logger.warning(
70-
"Data exceeds the max UDP packet size; size %r, max %r",
71-
len(buff),
72-
self.max_packet_size,
73-
)
73+
if self.split_oversized_batches and len(batch.spans) > 1:
74+
packets = math.ceil(len(buff) / self.max_packet_size)
75+
div = math.ceil(len(batch.spans) / packets)
76+
for packet in range(packets):
77+
start = packet * div
78+
end = (packet + 1) * div
79+
self.emit(
80+
jaeger.Batch(
81+
process=batch.process, spans=batch.spans[start:end]
82+
)
83+
)
84+
else:
85+
logger.warning(
86+
"Data exceeds the max UDP packet size; size %r, max %r",
87+
len(buff),
88+
self.max_packet_size,
89+
)
7490
return
7591

7692
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:

exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py

+34
Original file line numberDiff line numberDiff line change
@@ -465,3 +465,37 @@ def test_agent_client(self):
465465
)
466466

467467
agent_client.emit(batch)
468+
469+
def test_agent_client_split(self):
470+
agent_client = jaeger_exporter.AgentClientUDP(
471+
host_name="localhost",
472+
port=6354,
473+
max_packet_size=250,
474+
split_oversized_batches=True,
475+
)
476+
477+
translator = jaeger_exporter.Translate((self._test_span,))
478+
small_batch = jaeger.Batch(
479+
# pylint: disable=protected-access
480+
spans=translator._translate(jaeger_exporter.ThriftTranslator()),
481+
process=jaeger.Process(serviceName="xxx"),
482+
)
483+
484+
with unittest.mock.patch(
485+
"socket.socket.sendto", autospec=True
486+
) as fake_sendto:
487+
agent_client.emit(small_batch)
488+
self.assertEqual(fake_sendto.call_count, 1)
489+
490+
translator = jaeger_exporter.Translate([self._test_span,] * 2)
491+
large_batch = jaeger.Batch(
492+
# pylint: disable=protected-access
493+
spans=translator._translate(jaeger_exporter.ThriftTranslator()),
494+
process=jaeger.Process(serviceName="xxx"),
495+
)
496+
497+
with unittest.mock.patch(
498+
"socket.socket.sendto", autospec=True
499+
) as fake_sendto:
500+
agent_client.emit(large_batch)
501+
self.assertEqual(fake_sendto.call_count, 2)

opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,6 @@
4646
OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE"
4747
OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE"
4848
OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE"
49+
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = (
50+
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES"
51+
)

0 commit comments

Comments
 (0)