|
| 1 | +# Copyright 2019, OpenTelemetry Authors |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +"""Zipkin Span Exporter for OpenTelemetry.""" |
| 16 | + |
| 17 | +import json |
| 18 | +import logging |
| 19 | +from typing import Optional, Sequence |
| 20 | + |
| 21 | +import requests |
| 22 | + |
| 23 | +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult |
| 24 | +from opentelemetry.trace import Span, SpanContext, SpanKind |
| 25 | + |
| 26 | +DEFAULT_ENDPOINT = "/api/v2/spans" |
| 27 | +DEFAULT_HOST_NAME = "localhost" |
| 28 | +DEFAULT_PORT = 9411 |
| 29 | +DEFAULT_PROTOCOL = "http" |
| 30 | +DEFAULT_RETRY = False |
| 31 | +ZIPKIN_HEADERS = {"Content-Type": "application/json"} |
| 32 | + |
| 33 | +SPAN_KIND_MAP = { |
| 34 | + SpanKind.INTERNAL: None, |
| 35 | + SpanKind.SERVER: "SERVER", |
| 36 | + SpanKind.CLIENT: "CLIENT", |
| 37 | + SpanKind.PRODUCER: "PRODUCER", |
| 38 | + SpanKind.CONSUMER: "CONSUMER", |
| 39 | +} |
| 40 | + |
| 41 | +SUCCESS_STATUS_CODES = (200, 202) |
| 42 | + |
| 43 | +logger = logging.getLogger(__name__) |
| 44 | + |
| 45 | + |
| 46 | +class ZipkinSpanExporter(SpanExporter): |
| 47 | + """Zipkin span exporter for OpenTelemetry. |
| 48 | +
|
| 49 | + Args: |
| 50 | + service_name: Service that logged an annotation in a trace.Classifier |
| 51 | + when query for spans. |
| 52 | + host_name: The host name of the Zipkin server |
| 53 | + port: The port of the Zipkin server |
| 54 | + endpoint: The endpoint of the Zipkin server |
| 55 | + protocol: The protocol used for the request. |
| 56 | + ipv4: Primary IPv4 address associated with this connection. |
| 57 | + ipv6: Primary IPv6 address associated with this connection. |
| 58 | + retry: Set to True to configure the exporter to retry on failure. |
| 59 | + """ |
| 60 | + |
| 61 | + def __init__( |
| 62 | + self, |
| 63 | + service_name: str, |
| 64 | + host_name: str = DEFAULT_HOST_NAME, |
| 65 | + port: int = DEFAULT_PORT, |
| 66 | + endpoint: str = DEFAULT_ENDPOINT, |
| 67 | + protocol: str = DEFAULT_PROTOCOL, |
| 68 | + ipv4: Optional[str] = None, |
| 69 | + ipv6: Optional[str] = None, |
| 70 | + retry: Optional[str] = DEFAULT_RETRY, |
| 71 | + ): |
| 72 | + self.service_name = service_name |
| 73 | + self.host_name = host_name |
| 74 | + self.port = port |
| 75 | + self.endpoint = endpoint |
| 76 | + self.protocol = protocol |
| 77 | + self.url = "{}://{}:{}{}".format( |
| 78 | + self.protocol, self.host_name, self.port, self.endpoint |
| 79 | + ) |
| 80 | + self.ipv4 = ipv4 |
| 81 | + self.ipv6 = ipv6 |
| 82 | + self.retry = retry |
| 83 | + |
| 84 | + def export(self, spans: Sequence[Span]) -> SpanExportResult: |
| 85 | + zipkin_spans = self._translate_to_zipkin(spans) |
| 86 | + result = requests.post( |
| 87 | + url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS |
| 88 | + ) |
| 89 | + |
| 90 | + if result.status_code not in SUCCESS_STATUS_CODES: |
| 91 | + logger.error( |
| 92 | + "Traces cannot be uploaded; status code: %s, message %s", |
| 93 | + result.status_code, |
| 94 | + result.text, |
| 95 | + ) |
| 96 | + |
| 97 | + if self.retry: |
| 98 | + return SpanExportResult.FAILED_RETRYABLE |
| 99 | + return SpanExportResult.FAILED_NOT_RETRYABLE |
| 100 | + return SpanExportResult.SUCCESS |
| 101 | + |
| 102 | + def _translate_to_zipkin(self, spans: Sequence[Span]): |
| 103 | + |
| 104 | + local_endpoint = { |
| 105 | + "serviceName": self.service_name, |
| 106 | + "port": self.port, |
| 107 | + } |
| 108 | + |
| 109 | + if self.ipv4 is not None: |
| 110 | + local_endpoint["ipv4"] = self.ipv4 |
| 111 | + |
| 112 | + if self.ipv6 is not None: |
| 113 | + local_endpoint["ipv6"] = self.ipv6 |
| 114 | + |
| 115 | + zipkin_spans = [] |
| 116 | + for span in spans: |
| 117 | + context = span.get_context() |
| 118 | + trace_id = context.trace_id |
| 119 | + span_id = context.span_id |
| 120 | + |
| 121 | + # Timestamp in zipkin spans is int of microseconds. |
| 122 | + # see: https://zipkin.io/pages/instrumenting.html |
| 123 | + start_timestamp_mus = _nsec_to_usec_round(span.start_time) |
| 124 | + duration_mus = _nsec_to_usec_round(span.end_time - span.start_time) |
| 125 | + |
| 126 | + zipkin_span = { |
| 127 | + "traceId": format(trace_id, "x"), |
| 128 | + "id": format(span_id, "x"), |
| 129 | + "name": span.name, |
| 130 | + "timestamp": start_timestamp_mus, |
| 131 | + "duration": duration_mus, |
| 132 | + "localEndpoint": local_endpoint, |
| 133 | + "kind": SPAN_KIND_MAP[span.kind], |
| 134 | + "tags": _extract_tags_from_span(span.attributes), |
| 135 | + "annotations": _extract_annotations_from_events(span.events), |
| 136 | + } |
| 137 | + |
| 138 | + if context.trace_options.sampled: |
| 139 | + zipkin_span["debug"] = 1 |
| 140 | + |
| 141 | + if isinstance(span.parent, Span): |
| 142 | + zipkin_span["parentId"] = format( |
| 143 | + span.parent.get_context().span_id, "x" |
| 144 | + ) |
| 145 | + elif isinstance(span.parent, SpanContext): |
| 146 | + zipkin_span["parentId"] = format(span.parent.span_id, "x") |
| 147 | + |
| 148 | + zipkin_spans.append(zipkin_span) |
| 149 | + return zipkin_spans |
| 150 | + |
| 151 | + def shutdown(self) -> None: |
| 152 | + pass |
| 153 | + |
| 154 | + |
| 155 | +def _extract_tags_from_span(attr): |
| 156 | + if not attr: |
| 157 | + return None |
| 158 | + tags = {} |
| 159 | + for attribute_key, attribute_value in attr.items(): |
| 160 | + if isinstance(attribute_value, (int, bool, float)): |
| 161 | + value = str(attribute_value) |
| 162 | + elif isinstance(attribute_value, str): |
| 163 | + value = attribute_value[:128] |
| 164 | + else: |
| 165 | + logger.warning("Could not serialize tag %s", attribute_key) |
| 166 | + continue |
| 167 | + tags[attribute_key] = value |
| 168 | + return tags |
| 169 | + |
| 170 | + |
| 171 | +def _extract_annotations_from_events(events): |
| 172 | + return ( |
| 173 | + [ |
| 174 | + {"timestamp": _nsec_to_usec_round(e.timestamp), "value": e.name} |
| 175 | + for e in events |
| 176 | + ] |
| 177 | + if events |
| 178 | + else None |
| 179 | + ) |
| 180 | + |
| 181 | + |
| 182 | +def _nsec_to_usec_round(nsec): |
| 183 | + """Round nanoseconds to microseconds""" |
| 184 | + return (nsec + 500) // 10 ** 3 |
0 commit comments