Skip to content

Commit 04eaa42

Browse files
committed
Add support for B3 headers
Closes #78
1 parent b5a6930 commit 04eaa42

File tree

5 files changed

+372
-1
lines changed

5 files changed

+372
-1
lines changed
+184
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
"""Demonstrates a Trace distributed across multiple machines.
2+
3+
A SpanContext's text representation is stored in the headers of an HTTP request.
4+
5+
Runs two threads, starts a Trace in the client and passes the SpanContext to the server.
6+
"""
7+
8+
import argparse
9+
import errno
10+
import socket
11+
import sys
12+
import threading
13+
14+
try:
15+
# For Python 3.0 and later
16+
from urllib.request import (
17+
Request,
18+
urlopen,
19+
)
20+
from http.server import BaseHTTPRequestHandler, HTTPServer
21+
except ImportError:
22+
# Fall back to Python 2
23+
from urllib2 import (
24+
Request,
25+
urlopen,
26+
)
27+
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
28+
29+
import opentracing
30+
import opentracing.ext.tags
31+
import lightstep
32+
33+
from lightstep.propagation import LightStepFormat, B3
34+
35+
36+
class RemoteHandler(BaseHTTPRequestHandler):
37+
"""This handler receives the request from the client.
38+
"""
39+
40+
def do_GET(self):
41+
server_span = before_answering_request(self, opentracing.tracer)
42+
with opentracing.tracer.scope_manager.activate(server_span, True):
43+
server_span.log_event('request received', self.path)
44+
45+
self.send_response(200)
46+
self.send_header('Content-type', 'text/html')
47+
self.end_headers()
48+
self.wfile.write("Hello World!".encode("utf-8"))
49+
50+
server_span.log_event('prepared response', self.path)
51+
52+
53+
def before_sending_request(request):
54+
"""Context manager creates Span and encodes the span's SpanContext into request.
55+
"""
56+
span = opentracing.tracer.start_span('Sending request')
57+
span.set_tag('server.http.url', request.get_full_url())
58+
try:
59+
# Python 2
60+
host = request.get_host()
61+
except:
62+
# Python 3
63+
host = request.host
64+
65+
if host:
66+
span.set_tag(opentracing.ext.tags.PEER_HOST_IPV4, host)
67+
68+
carrier_dict = {}
69+
span.tracer.inject(span.context, B3, carrier_dict)
70+
for k, v in carrier_dict.items():
71+
request.add_header(k, v)
72+
return span
73+
74+
75+
def before_answering_request(handler, tracer):
76+
"""Context manager creates a Span, using SpanContext encoded in handler if possible.
77+
"""
78+
operation = 'handle_request:' + handler.path
79+
carrier_dict = {}
80+
for k, v in handler.headers.items():
81+
carrier_dict[k] = v
82+
extracted_context = tracer.extract(B3, carrier_dict)
83+
84+
span = None
85+
if extracted_context:
86+
span = tracer.start_span(
87+
operation_name=operation,
88+
child_of=extracted_context)
89+
else:
90+
print('ERROR: Context missing, starting new trace')
91+
global _exit_code
92+
_exit_code = errno.ENOMSG
93+
span = tracer.start_span(operation_name=operation)
94+
headers = ', '.join({k + '=' + v for k, v in handler.headers.items()})
95+
span.log_event('extract_failed', headers)
96+
print('Could not extract context from http headers: ' + headers)
97+
98+
host, port = handler.client_address
99+
if host:
100+
span.set_tag(opentracing.ext.tags.PEER_HOST_IPV4, host)
101+
if port:
102+
span.set_tag(opentracing.ext.tags.PEER_PORT, str(port))
103+
104+
return span
105+
106+
107+
def pick_unused_port():
108+
""" Since we don't reserve the port, there's a chance it'll get grabed, but that's unlikely.
109+
"""
110+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
111+
s.bind(('localhost', 0))
112+
port = s.getsockname()[1]
113+
s.close()
114+
return port
115+
116+
117+
def lightstep_tracer_from_args():
118+
"""Initializes lightstep from the commandline args.
119+
"""
120+
parser = argparse.ArgumentParser()
121+
parser.add_argument('--token', help='Your LightStep access token.',
122+
default='{your_access_token}')
123+
parser.add_argument('--host', help='The LightStep reporting service host to contact.',
124+
default='collector.lightstep.com')
125+
parser.add_argument('--port', help='The LightStep reporting service port.',
126+
type=int, default=443)
127+
parser.add_argument('--no_tls', help='Disable TLS for reporting',
128+
dest="no_tls", action='store_true')
129+
parser.add_argument('--component_name', help='The LightStep component name',
130+
default='TrivialExample')
131+
args = parser.parse_args()
132+
133+
if args.no_tls:
134+
collector_encryption = 'none'
135+
else:
136+
collector_encryption = 'tls'
137+
138+
return lightstep.Tracer(
139+
component_name=args.component_name,
140+
access_token=args.token,
141+
collector_host=args.host,
142+
collector_port=args.port,
143+
collector_encryption=collector_encryption,
144+
)
145+
146+
147+
if __name__ == '__main__':
148+
with lightstep_tracer_from_args() as tracer:
149+
opentracing.tracer = tracer
150+
global _exit_code
151+
_exit_code = 0
152+
153+
# Create a web server and define the handler to manage the incoming request
154+
port_number = pick_unused_port()
155+
server = HTTPServer(('', port_number), RemoteHandler)
156+
157+
try:
158+
# Run the server in a separate thread.
159+
server_thread = threading.Thread(target=server.serve_forever)
160+
server_thread.start()
161+
print('Started httpserver on port ', port_number)
162+
163+
# Prepare request in the client
164+
url = 'http://localhost:{}'.format(port_number)
165+
request = Request(url)
166+
client_span = before_sending_request(request)
167+
with opentracing.tracer.scope_manager.activate(client_span, True):
168+
client_span.log_event('sending request', url)
169+
170+
# Send request to server
171+
response = urlopen(request)
172+
173+
response_body = response.read()
174+
client_span.log_event('server returned', {
175+
"code": response.code,
176+
"body": response_body,
177+
})
178+
179+
print('Server returned ' + str(response.code) + ': ' + str(response_body))
180+
181+
sys.exit(_exit_code)
182+
183+
finally:
184+
server.shutdown()

lightstep/b3_propagator.py

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
from warnings import warn
2+
from re import compile as re_compile, IGNORECASE
3+
from logging import getLogger
4+
5+
from basictracer.propagator import Propagator
6+
from basictracer.context import SpanContext
7+
from opentracing import span
8+
from opentracing import SpanContext as OTSpanContext
9+
10+
_LOG = getLogger(__name__)
11+
_SINGLE_HEADER = "b3"
12+
# Lower case is used here as the B3 specification recommends
13+
_TRACEID = "x-b3-traceid"
14+
_SPANID = "x-b3-spanid"
15+
_PARENTSPANID = "x-b3-parentspanid"
16+
_SAMPLED = "x-b3-sampled"
17+
_FLAGS = "x-b3-flags"
18+
19+
20+
class B3Propagator(Propagator):
21+
"""
22+
Propagator for the B3 HTTP header format.
23+
24+
See: https://github.com/openzipkin/b3-propagation
25+
"""
26+
27+
def inject(self, span_context, carrier):
28+
29+
traceid = span_context.trace_id
30+
spanid = span_context.span_id
31+
32+
baggage = span_context.baggage
33+
34+
parentspanid = baggage.pop(_PARENTSPANID, None)
35+
if parentspanid is not None:
36+
carrier[_PARENTSPANID] = parentspanid
37+
38+
flags = baggage.pop(_FLAGS, None)
39+
if flags is not None:
40+
carrier[self._FLAGS] = flags
41+
42+
sampled = baggage.pop(_SAMPLED, None)
43+
if sampled is not None:
44+
if flags == 1:
45+
_LOG.warning(
46+
"x-b3-flags: 1 implies x-b3-sampled: 1, not sending "
47+
"the value of x-b3-sampled"
48+
)
49+
else:
50+
if sampled in [True, False]:
51+
warn(
52+
"The value of x-b3-sampled should "
53+
"be {} instead of {}".format(
54+
int(sampled), sampled
55+
)
56+
)
57+
carrier[_SAMPLED] = int(sampled)
58+
59+
if (sampled is None and flags is None) and (
60+
traceid is None or spanid is None
61+
):
62+
warn(
63+
"If not propagating only the sampling state, traceid and "
64+
"spanid must be defined"
65+
)
66+
67+
carrier.update(baggage)
68+
69+
if traceid is not None:
70+
carrier[_TRACEID] = format(traceid, "032x")
71+
if spanid is not None:
72+
carrier[_SPANID] = format(spanid, "016x")
73+
74+
def extract(self, carrier):
75+
76+
case_insensitive_carrier = {}
77+
for key, value in carrier.items():
78+
for b3_key in [
79+
_SINGLE_HEADER,
80+
_TRACEID,
81+
_SPANID,
82+
_PARENTSPANID,
83+
_SAMPLED,
84+
_FLAGS,
85+
]:
86+
if re_compile(key, IGNORECASE).match(b3_key):
87+
case_insensitive_carrier[b3_key] = value
88+
else:
89+
case_insensitive_carrier[key] = value
90+
91+
carrier = case_insensitive_carrier
92+
baggage = {}
93+
94+
if _SINGLE_HEADER in carrier.keys():
95+
fields = carrier[_SINGLE_HEADER].split("-", 4)
96+
len_fields = len(fields)
97+
if len(fields) == 1:
98+
sampled = fields[0]
99+
elif len(fields) == 2:
100+
traceid, spanid = fields
101+
elif len(fields) == 3:
102+
traceid, spanid, sampled = fields
103+
else:
104+
traceid, spanid, sampled, parent_spanid = fields
105+
baggage[_PARENTSPANID] = parent_spanid
106+
if sampled == "d":
107+
baggage[_FLAGS] = 1
108+
else:
109+
baggage[_SAMPLED] = sampled
110+
else:
111+
traceid = carrier.pop(_TRACEID, None)
112+
spanid = carrier.pop(_SPANID, None)
113+
parentspanid = carrier.pop(_PARENTSPANID, None)
114+
sampled = carrier.pop(_SAMPLED, None)
115+
flags = carrier.pop(_FLAGS, None)
116+
117+
if parentspanid is not None:
118+
baggage[_PARENTSPANID] = parentspanid
119+
if flags == 1:
120+
baggage[_FLAGS] = flags
121+
if sampled is not None:
122+
warn(
123+
"x-b3-flags: 1 implies x-b3-sampled: 1, ignoring "
124+
"the received value of x-b3-sampled"
125+
)
126+
elif sampled is not None:
127+
baggage[_SAMPLED] = sampled
128+
baggage.update(carrier)
129+
if baggage == OTSpanContext.EMPTY_BAGGAGE:
130+
baggage = None
131+
return SpanContext(
132+
# traceid and spanid are encoded in hex, so thet must be encoded
133+
trace_id=int(traceid, 16),
134+
span_id=int(spanid, 16),
135+
baggage=baggage
136+
)

lightstep/propagation.py

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
from __future__ import absolute_import
66

7+
# The B3 format represents SpanContexts in B3 format.
8+
# https://github.com/openzipkin/b3-propagation
9+
B3 = "b3"
10+
711

812
class LightStepFormat(object):
913
"""A namespace for lightstep supported carrier formats.

lightstep/tracer.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
from opentracing import Format
1414

1515
from lightstep.lightstep_binary_propagator import LightStepBinaryPropagator
16-
from lightstep.propagation import LightStepFormat
16+
from lightstep.b3_propagator import B3Propagator
17+
from lightstep.propagation import LightStepFormat, B3
1718
from .recorder import Recorder
1819

1920

@@ -75,6 +76,7 @@ def __init__(self, enable_binary_format, recorder, scope_manager):
7576
super(_LightstepTracer, self).__init__(recorder, scope_manager=scope_manager)
7677
self.register_propagator(Format.TEXT_MAP, TextPropagator())
7778
self.register_propagator(Format.HTTP_HEADERS, TextPropagator())
79+
self.register_propagator(B3, B3Propagator())
7880
if enable_binary_format:
7981
# We do this import lazily because protobuf versioning issues
8082
# can cause process-level failure at import time.

tests/b3_propagator_test.py

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import unittest
2+
3+
import lightstep
4+
from lightstep.propagation import B3
5+
6+
7+
class B3PropagatorTest(unittest.TestCase):
8+
def setUp(self):
9+
self._tracer = lightstep.Tracer(
10+
periodic_flush_seconds=0,
11+
collector_host='localhost')
12+
13+
def tracer(self):
14+
return self._tracer
15+
16+
def tearDown(self):
17+
self._tracer.flush()
18+
19+
def testInjectExtract(self):
20+
carrier = {}
21+
span = self.tracer().start_span('Sending request')
22+
23+
span.set_baggage_item('checked', 'baggage')
24+
25+
self.tracer().inject(span.context, B3, carrier)
26+
27+
result = self.tracer().extract(B3, carrier)
28+
self.assertEqual(span.context.span_id, result.span_id)
29+
self.assertEqual(span.context.trace_id, result.trace_id)
30+
self.assertEqual(span.context.baggage, result.baggage)
31+
self.assertEqual(span.context.sampled, result.sampled)
32+
33+
def single_header_extraction(self):
34+
# FIXME complete this test case
35+
input_ = 1
36+
37+
result = self.tracer().extract(B3, bytearray(input, 'utf-8'))
38+
self.assertEqual(6397081719746291766, result.span_id)
39+
self.assertEqual(506100417967962170, result.trace_id)
40+
self.assertEqual(True, result.sampled)
41+
self.assertDictEqual(result.baggage, {"checked" : "baggage"})
42+
43+
44+
if __name__ == '__main__':
45+
unittest.main()

0 commit comments

Comments
 (0)