diff --git a/examples/http/context_in_headers.py b/examples/http/context_in_headers.py index 86df9f0..fc35676 100644 --- a/examples/http/context_in_headers.py +++ b/examples/http/context_in_headers.py @@ -29,6 +29,8 @@ import opentracing import opentracing.ext.tags import lightstep +from opentracing import Format +from lightstep.b3_propagator import B3Propagator class RemoteHandler(BaseHTTPRequestHandler): @@ -145,6 +147,12 @@ def lightstep_tracer_from_args(): if __name__ == '__main__': with lightstep_tracer_from_args() as tracer: opentracing.tracer = tracer + + opentracing.tracer.register_propagator(Format.TEXT_MAP, B3Propagator()) + opentracing.tracer.register_propagator( + Format.HTTP_HEADERS, B3Propagator() + ) + global _exit_code _exit_code = 0 diff --git a/lightstep/b3_propagator.py b/lightstep/b3_propagator.py new file mode 100644 index 0000000..887967c --- /dev/null +++ b/lightstep/b3_propagator.py @@ -0,0 +1,141 @@ +from warnings import warn +from logging import getLogger + +from basictracer.propagator import Propagator +from basictracer.context import SpanContext +from opentracing import SpanContext as OTSpanContext +from opentracing import SpanContextCorruptedException + +_LOG = getLogger(__name__) +_SINGLE_HEADER = "b3" +# Lower case is used here as the B3 specification recommends +_TRACEID = "x-b3-traceid" +_SPANID = "x-b3-spanid" +_PARENTSPANID = "x-b3-parentspanid" +_SAMPLED = "x-b3-sampled" +_FLAGS = "x-b3-flags" + + +class B3Propagator(Propagator): + """ + Propagator for the B3 HTTP header format. + + See: https://github.com/openzipkin/b3-propagation + """ + + def inject(self, span_context, carrier): + + traceid = span_context.trace_id + spanid = span_context.span_id + + baggage = span_context.baggage + + parentspanid = baggage.pop(_PARENTSPANID, None) + if parentspanid is not None: + carrier[_PARENTSPANID] = parentspanid + + flags = baggage.pop(_FLAGS, None) + if flags is not None: + carrier[_FLAGS] = flags + + sampled = baggage.pop(_SAMPLED, None) + if sampled is not None: + if flags == 1: + _LOG.warning( + "x-b3-flags: 1 implies x-b3-sampled: 1, not sending " + "the value of x-b3-sampled" + ) + else: + if sampled in [True, False]: + warn( + "The value of x-b3-sampled should " + "be {} instead of {}".format( + int(sampled), sampled + ) + ) + carrier[_SAMPLED] = int(sampled) + + if sampled is flags is (traceid and spanid) is None: + warn( + "If not propagating only the sampling state, traceid and " + "spanid must be defined" + ) + + carrier.update(baggage) + + if traceid is not None: + carrier[_TRACEID] = format(traceid, "x").ljust(32, "0") + if spanid is not None: + carrier[_SPANID] = format(spanid, "016x") + + def extract(self, carrier): + + case_insensitive_carrier = {} + for key, value in carrier.items(): + for b3_key in [ + _SINGLE_HEADER, + _TRACEID, + _SPANID, + _PARENTSPANID, + _SAMPLED, + _FLAGS, + ]: + if key.lower() == b3_key: + case_insensitive_carrier[b3_key] = value + else: + case_insensitive_carrier[key] = value + + carrier = case_insensitive_carrier + baggage = {} + + if _SINGLE_HEADER in carrier.keys(): + fields = carrier.pop(_SINGLE_HEADER).split("-", 4) + baggage.update(carrier) + len_fields = len(fields) + if len_fields == 1: + sampled = fields[0] + elif len_fields == 2: + traceid, spanid = fields + elif len_fields == 3: + traceid, spanid, sampled = fields + else: + traceid, spanid, sampled, parent_spanid = fields + baggage[_PARENTSPANID] = int(parent_spanid, 16) + if sampled == "d": + baggage[_FLAGS] = 1 + else: + baggage[_SAMPLED] = int(sampled, 16) + else: + traceid = carrier.pop(_TRACEID, None) + spanid = carrier.pop(_SPANID, None) + parentspanid = carrier.pop(_PARENTSPANID, None) + sampled = carrier.pop(_SAMPLED, None) + flags = carrier.pop(_FLAGS, None) + + if sampled is flags is (traceid and spanid) is None: + + raise SpanContextCorruptedException() + + if parentspanid is not None: + baggage[_PARENTSPANID] = int(parentspanid, 16) + + if flags == 1: + baggage[_FLAGS] = flags + if sampled is not None: + warn( + "x-b3-flags: 1 implies x-b3-sampled: 1, ignoring " + "the received value of x-b3-sampled" + ) + elif sampled is not None: + baggage[_SAMPLED] = int(sampled, 16) + + baggage.update(carrier) + + if baggage == OTSpanContext.EMPTY_BAGGAGE: + baggage = None + + return SpanContext( + trace_id=int(traceid, 16), + span_id=int(spanid, 16), + baggage=baggage + ) diff --git a/tests/b3_propagator_test.py b/tests/b3_propagator_test.py new file mode 100644 index 0000000..39fff0d --- /dev/null +++ b/tests/b3_propagator_test.py @@ -0,0 +1,141 @@ +from unittest import TestCase + +from pytest import raises +from opentracing import SpanContextCorruptedException + +from opentracing import Format +from lightstep import Tracer +from lightstep.b3_propagator import B3Propagator + + +class B3PropagatorTest(TestCase): + def setUp(self): + self._tracer = Tracer( + periodic_flush_seconds=0, + collector_host="localhost" + ) + self._tracer.register_propagator(Format.HTTP_HEADERS, B3Propagator()) + + def tracer(self): + return self._tracer + + def tearDown(self): + self._tracer.flush() + + def test_inject(self): + carrier = {} + span = self.tracer().start_span("test_inject") + span.set_baggage_item("checked", "baggage") + self.tracer().inject(span.context, Format.HTTP_HEADERS, carrier) + self.assertEqual( + carrier, + { + "x-b3-traceid": ( + format(span.context.trace_id, "x").ljust(32, "0") + ), + "x-b3-spanid": format(span.context.span_id, "016x"), + "checked": "baggage" + } + ) + + carrier = {} + span = self.tracer().start_span("test_inject") + span.set_baggage_item("x-b3-flags", 1) + span.set_baggage_item("x-b3-sampled", 0) + self.tracer().inject(span.context, Format.HTTP_HEADERS, carrier) + self.assertEqual( + carrier, + { + "x-b3-traceid": ( + format(span.context.trace_id, "x").ljust(32, "0") + ), + "x-b3-spanid": format(span.context.span_id, "016x"), + "x-b3-flags": 1, + } + ) + + def test_extract_multiple_headers(self): + + result = self.tracer().extract( + Format.HTTP_HEADERS, + { + "x-b3-traceid": format(12, "032x"), + "x-b3-spanid": format(345, "016x"), + "checked": "baggage" + } + ) + + self.assertEqual(12, result.trace_id) + self.assertEqual(345, result.span_id) + self.assertEqual({"checked": "baggage"}, result.baggage) + + result = self.tracer().extract( + Format.HTTP_HEADERS, + { + "x-b3-traceid": format(12, "032x"), + "x-b3-spanid": format(345, "016x"), + "x-b3-flags": 1, + "x-b3-sampled": 0 + } + ) + + self.assertEqual(12, result.trace_id) + self.assertEqual(345, result.span_id) + self.assertEqual({"x-b3-flags": 1}, result.baggage) + + def test_extract_single_header(self): + result = self.tracer().extract( + Format.HTTP_HEADERS, + { + "b3": "a12-b34-1-c56", + "checked": "baggage" + } + ) + self.assertEqual(2578, result.trace_id) + self.assertEqual(2868, result.span_id) + self.assertDictEqual( + { + "x-b3-sampled": 1, + "x-b3-parentspanid": 3158, + "checked": "baggage" + }, + result.baggage + ) + + result = self.tracer().extract( + Format.HTTP_HEADERS, + { + "b3": "a12-b34-d-c56", + "checked": "baggage" + } + ) + self.assertEqual(2578, result.trace_id) + self.assertEqual(2868, result.span_id) + self.assertDictEqual( + { + "x-b3-flags": 1, + "x-b3-parentspanid": 3158, + "checked": "baggage" + }, + result.baggage + ) + + def test_invalid_traceid_spanid(self): + + with raises(SpanContextCorruptedException): + self.tracer().extract( + Format.HTTP_HEADERS, + { + "x-b3-spanid": format(345, "016x"), + "checked": "baggage" + } + ) + + with raises(SpanContextCorruptedException): + self.tracer().extract( + Format.HTTP_HEADERS, + { + "x-b3-traceid": format(345, "032x"), + "checked": "baggage" + } + )