Skip to content

Commit 1e654bd

Browse files
committed
Add possibility to split oversized udp batches
If we use the BatchExportSpanProcessor combined with the JaegerSpanExporter and use instrumentations that add a lot of metadata to the spans, like sqlalchemy, then we occasionally run into the "Data exceeds the max UDP packet size" warning, causing dropped spans and incomplete data. The option to reduce the general batch size to a very small number (in my case >30) may cause a performance issue, as the worker thread of the batch exporter gets very busy. Instead, this change allows the user to ask the exporter to split oversized batches when they are detected and send the splits separately instead of dropping them. Depending on the use case, this is a better option than reducing the batch size to a very small value just because, every now and then, a batch contains a couple of large spans.
1 parent 99128b3 commit 1e654bd

File tree

3 files changed

+328
-1
lines changed

3 files changed

+328
-1
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v0.18b0...HEAD)
88

9+
### Added
10+
- Add `udp_split_oversized_batches` support to jaeger exporter
11+
([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500))
12+
913
## [0.18b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v0.18b0) - 2021-02-16
1014

1115
### Added

exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py

+292-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
# pylint: disable=protected-access
6767

6868
import logging
69+
import socket
6970
from os import environ
7071
from typing import Optional
7172

@@ -120,6 +121,7 @@ class JaegerSpanExporter(SpanExporter):
120121
insecure: True if collector has no encryption or authentication
121122
credentials: Credentials for server authentication.
122123
transport_format: Transport format for exporting spans to collector.
124+
udp_split_oversized_batches: Re-emit oversized batches in smaller chunks.
123125
"""
124126

125127
def __init__(
@@ -133,6 +135,7 @@ def __init__(
133135
insecure: Optional[bool] = None,
134136
credentials: Optional[ChannelCredentials] = None,
135137
transport_format: Optional[str] = None,
138+
udp_split_oversized_batches: bool = None,
136139
):
137140
self.service_name = service_name
138141
self.agent_host_name = _parameter_setter(
@@ -151,8 +154,15 @@ def __init__(
151154
env_variable=environ_agent_port,
152155
default=DEFAULT_AGENT_PORT,
153156
)
157+
self.udp_split_oversized_batches = _parameter_setter(
158+
param=udp_split_oversized_batches,
159+
env_variable=Configuration().OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES,
160+
default=False,
161+
)
154162
self._agent_client = AgentClientUDP(
155-
host_name=self.agent_host_name, port=self.agent_port
163+
host_name=self.agent_host_name,
164+
port=self.agent_port,
165+
udp_split_oversized_batches=self.udp_split_oversized_batches,
156166
)
157167
self.collector_endpoint = _parameter_setter(
158168
param=collector_endpoint,
@@ -257,3 +267,284 @@ def _parameter_setter(param, env_variable, default):
257267
res = param
258268

259269
return res
270+
271+
272+
def _nsec_to_usec_round(nsec):
273+
"""Round nanoseconds to microseconds"""
274+
return (nsec + 500) // 10 ** 3
275+
276+
277+
def _translate_to_jaeger(spans: Span):
    """Build the Jaeger thrift representation of every span in ``spans``.

    Args:
        spans: Iterable of spans to convert.

    Returns:
        A list with one ``jaeger.Span`` per input span, in input order.
    """
    translated = []

    for span in spans:
        context = span.get_span_context()
        full_trace_id = context.trace_id

        # Jaeger timestamps and durations are expressed in microseconds.
        started_us = _nsec_to_usec_round(span.start_time)
        elapsed_us = _nsec_to_usec_round(span.end_time - span.start_time)

        span_status = span.status

        # A span without a parent is reported with parent id 0.
        parent_span_id = 0 if not span.parent else span.parent.span_id

        # Span attributes first, then resource attributes, then status/kind.
        tags = [
            *_extract_tags(span.attributes),
            *_extract_tags(span.resource.attributes),
            _get_long_tag("status.code", span_status.status_code.value),
            _get_string_tag("status.message", span_status.description),
            _get_string_tag("span.kind", OTLP_JAEGER_SPAN_KIND[span.kind]),
        ]

        library = span.instrumentation_info
        if library is not None:
            tags.append(
                _get_string_tag(
                    "otel.instrumentation_library.name", library.name
                )
            )
            tags.append(
                _get_string_tag(
                    "otel.instrumentation_library.version", library.version
                )
            )

        # Ensure that if Status.Code is not OK, that we set the "error" tag
        # on the Jaeger span.
        if not span_status.is_ok:
            tags.append(_get_bool_tag("error", True))

        references = _extract_refs_from_span(span)
        span_logs = _extract_logs_from_span(span)

        translated.append(
            jaeger.Span(
                traceIdHigh=_get_trace_id_high(full_trace_id),
                traceIdLow=_get_trace_id_low(full_trace_id),
                # generated code expects i64
                spanId=_convert_int_to_i64(context.span_id),
                operationName=span.name,
                startTime=started_us,
                duration=elapsed_us,
                tags=tags,
                logs=span_logs,
                references=references,
                flags=int(context.trace_flags),
                parentSpanId=_convert_int_to_i64(parent_span_id),
            )
        )

    return translated
350+
351+
352+
def _extract_refs_from_span(span):
353+
if not span.links:
354+
return None
355+
356+
refs = []
357+
for link in span.links:
358+
trace_id = link.context.trace_id
359+
span_id = link.context.span_id
360+
refs.append(
361+
jaeger.SpanRef(
362+
refType=jaeger.SpanRefType.FOLLOWS_FROM,
363+
traceIdHigh=_get_trace_id_high(trace_id),
364+
traceIdLow=_get_trace_id_low(trace_id),
365+
spanId=_convert_int_to_i64(span_id),
366+
)
367+
)
368+
return refs
369+
370+
371+
def _convert_int_to_i64(val):
372+
"""Convert integer to signed int64 (i64)"""
373+
if val > 0x7FFFFFFFFFFFFFFF:
374+
val -= 0x10000000000000000
375+
return val
376+
377+
378+
def _get_trace_id_low(trace_id):
    """Return the low 64 bits of ``trace_id`` as a signed i64."""
    low_bits = trace_id & 0xFFFFFFFFFFFFFFFF
    return _convert_int_to_i64(low_bits)
380+
381+
382+
def _get_trace_id_high(trace_id):
    """Return bits 64-127 of ``trace_id`` as a signed i64."""
    high_bits = (trace_id >> 64) & 0xFFFFFFFFFFFFFFFF
    return _convert_int_to_i64(high_bits)
384+
385+
386+
def _extract_logs_from_span(span):
387+
if not span.events:
388+
return None
389+
390+
logs = []
391+
392+
for event in span.events:
393+
fields = _extract_tags(event.attributes)
394+
395+
fields.append(
396+
jaeger.Tag(
397+
key="message", vType=jaeger.TagType.STRING, vStr=event.name
398+
)
399+
)
400+
401+
event_timestamp_us = _nsec_to_usec_round(event.timestamp)
402+
logs.append(
403+
jaeger.Log(timestamp=int(event_timestamp_us), fields=fields)
404+
)
405+
return logs
406+
407+
408+
def _extract_tags(attr):
409+
if not attr:
410+
return []
411+
tags = []
412+
for attribute_key, attribute_value in attr.items():
413+
tag = _convert_attribute_to_tag(attribute_key, attribute_value)
414+
if tag is None:
415+
continue
416+
tags.append(tag)
417+
return tags
418+
419+
420+
def _convert_attribute_to_tag(key, attr):
    """Convert a single attribute value into a Jaeger tag.

    Returns:
        A ``jaeger.Tag`` for bool, str, int, float and tuple values (tuples
        are stored as their ``str()`` form), or ``None`` after logging a
        warning for any other type.
    """
    # bool is checked before int because bool is a subclass of int.
    if isinstance(attr, bool):
        payload = {"vBool": attr, "vType": jaeger.TagType.BOOL}
    elif isinstance(attr, str):
        payload = {"vStr": attr, "vType": jaeger.TagType.STRING}
    elif isinstance(attr, int):
        payload = {"vLong": attr, "vType": jaeger.TagType.LONG}
    elif isinstance(attr, float):
        payload = {"vDouble": attr, "vType": jaeger.TagType.DOUBLE}
    elif isinstance(attr, tuple):
        payload = {"vStr": str(attr), "vType": jaeger.TagType.STRING}
    else:
        logger.warning("Could not serialize attribute %s:%r to tag", key, attr)
        return None

    return jaeger.Tag(key=key, **payload)
434+
435+
436+
def _get_long_tag(key, val):
    """Build a Jaeger tag of type LONG holding ``val`` under ``key``."""
    tag_type = jaeger.TagType.LONG
    return jaeger.Tag(key=key, vType=tag_type, vLong=val)
438+
439+
440+
def _get_string_tag(key, val):
    """Build a Jaeger tag of type STRING holding ``val`` under ``key``."""
    tag_type = jaeger.TagType.STRING
    return jaeger.Tag(key=key, vType=tag_type, vStr=val)
442+
443+
444+
def _get_bool_tag(key, val):
    """Build a Jaeger tag of type BOOL holding ``val`` under ``key``."""
    tag_type = jaeger.TagType.BOOL
    return jaeger.Tag(key=key, vType=tag_type, vBool=val)
446+
447+
448+
class AgentClientUDP:
    """UDP client that emits Jaeger batches to an agent.

    Every batch is serialized through the Thrift client into an in-memory
    buffer and the resulting bytes are sent as one UDP datagram.

    Args:
        host_name: The host name of the Jaeger server.
        port: The port of the Jaeger server.
        max_packet_size: Maximum size of UDP packet.
        client: Callable that creates the Thrift agent client; it is invoked
            with an ``iprot`` compact protocol bound to the in-memory buffer.
        split_oversized_batches: Re-emit oversized batches in smaller chunks.
    """

    # NOTE(review): JaegerSpanExporter.__init__ constructs this client with
    # the keyword ``udp_split_oversized_batches``, which is not a parameter
    # of this constructor - confirm which keyword name is intended.

    def __init__(
        self,
        host_name,
        port,
        max_packet_size=UDP_PACKET_MAX_LENGTH,
        client=agent.Client,
        split_oversized_batches=False,
    ):
        self.address = (host_name, port)
        self.max_packet_size = max_packet_size
        self.buffer = TTransport.TMemoryBuffer()
        self.client = client(
            iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer)
        )
        self.split_oversized_batches = split_oversized_batches

    def emit(self, batch: jaeger.Batch):
        """Serialize ``batch`` and send it to the agent over UDP.

        When the serialized batch is larger than ``max_packet_size``:

        * if splitting is enabled and the batch holds more than one span,
          the spans are divided into smaller batches that are emitted
          recursively (each may be split again);
        * otherwise a warning is logged and the batch is dropped.

        Args:
            batch: Object to emit Jaeger spans.
        """
        # Imported locally so that splitting does not depend on ``math``
        # being part of the module-level imports.
        import math  # pylint: disable=import-outside-toplevel

        # pylint: disable=protected-access
        self.client._seqid = 0
        # truncate and reset the position of BytesIO object
        self.buffer._buffer.truncate(0)
        self.buffer._buffer.seek(0)
        self.client.emitBatch(batch)
        buff = self.buffer.getvalue()

        if len(buff) > self.max_packet_size:
            if self.split_oversized_batches and len(batch.spans) > 1:
                span_count = len(batch.spans)
                # Estimate the number of packets from the byte size, then
                # spread the spans evenly over that many chunks.
                packets = math.ceil(len(buff) / self.max_packet_size)
                chunk_size = math.ceil(span_count / packets)
                # Stepping over the span list (rather than over the packet
                # estimate) never produces an empty chunk, so no datagram
                # is sent for a batch without spans.
                for start in range(0, span_count, chunk_size):
                    self.emit(
                        jaeger.Batch(
                            process=batch.process,
                            spans=batch.spans[start : start + chunk_size],
                        )
                    )
            else:
                logger.warning(
                    "Data exceeds the max UDP packet size; size %r, max %r",
                    len(buff),
                    self.max_packet_size,
                )
            return

        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
            udp_socket.sendto(buff, self.address)
510+
511+
512+
class Collector:
    """Client that submits span batches to a Jaeger Thrift HTTP endpoint.

    Args:
        thrift_url: URL of the Jaeger HTTP Thrift.
        auth: Auth tuple that contains username and password for Basic Auth.
    """

    def __init__(self, thrift_url="", auth=None):
        self.thrift_url = thrift_url
        self.auth = auth
        self.http_transport = THttpClient.THttpClient(
            uri_or_host=self.thrift_url
        )
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.http_transport)

        if auth is None:
            return

        # Basic auth: base64 of "username:password" in the Authorization
        # header of every request made through the transport.
        credentials = "{}:{}".format(*auth).encode()
        token = base64.b64encode(credentials).decode("ascii")
        self.http_transport.setCustomHeaders(
            {"Authorization": "Basic {}".format(token)}
        )

    def submit(self, batch: jaeger.Batch):
        """Write ``batch`` with the binary protocol and flush it over HTTP.

        An error is logged when the response status is outside the 2xx range.

        Args:
            batch: Object to emit Jaeger spans.
        """
        batch.write(self.protocol)
        self.http_transport.flush()

        status = self.http_transport.code
        reason = self.http_transport.message
        if 200 <= status < 300:
            return

        logger.error(
            "Traces cannot be uploaded; HTTP status code: %s, message: %s",
            status,
            reason,
        )

exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py

+32
Original file line numberDiff line numberDiff line change
@@ -465,3 +465,35 @@ def test_agent_client(self):
465465
)
466466

467467
agent_client.emit(batch)
468+
469+
def test_agent_client_split(self):
    """Check how many datagrams a splitting agent client sends.

    With a 250-byte packet limit, the test expects one ``sendto`` call for
    a batch of one span and two ``sendto`` calls for a batch of two spans.
    """
    client = jaeger_exporter.AgentClientUDP(
        host_name="localhost",
        port=6354,
        max_packet_size=250,
        split_oversized_batches=True,
    )

    def make_batch(spans):
        # pylint: disable=protected-access
        return jaeger.Batch(
            spans=jaeger_exporter._translate_to_jaeger(spans),
            process=jaeger.Process(serviceName="xxx"),
        )

    single_span_batch = make_batch((self._test_span,))
    with unittest.mock.patch(
        "socket.socket.sendto", autospec=True
    ) as sendto_mock:
        client.emit(single_span_batch)
        self.assertEqual(sendto_mock.call_count, 1)

    two_span_batch = make_batch([self._test_span] * 2)
    with unittest.mock.patch(
        "socket.socket.sendto", autospec=True
    ) as sendto_mock:
        client.emit(two_span_batch)
        self.assertEqual(sendto_mock.call_count, 2)

0 commit comments

Comments
 (0)