|
| 1 | +import pika |
| 2 | +from logging import getLogger |
| 3 | +from opentelemetry import trace |
| 4 | +from typing import Dict, Callable |
| 5 | +from typing import Collection, Any |
| 6 | +from pika.adapters import BaseConnection |
| 7 | +from opentelemetry.propagate import inject |
| 8 | +from opentelemetry.instrumentation.pika import utils |
| 9 | +from opentelemetry.trace import Tracer, TracerProvider |
| 10 | +from opentelemetry.semconv.trace import MessagingOperationValues |
| 11 | +from opentelemetry.instrumentation.pika.package import _instruments |
| 12 | +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor |
| 13 | + |
| 14 | + |
| 15 | +_LOG = getLogger(__name__) |
| 16 | +CTX_KEY = "__otel_task_span" |
| 17 | + |
| 18 | + |
| 19 | +class PikaInstrumentation(BaseInstrumentor): |
| 20 | + @staticmethod |
| 21 | + def _instrument_consumers( |
| 22 | + consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer |
| 23 | + ) -> Any: |
| 24 | + for key, callback in consumers_dict.items(): |
| 25 | + |
| 26 | + def decorated_callback( |
| 27 | + channel: pika.channel.Channel, |
| 28 | + method: pika.spec.Basic.Deliver, |
| 29 | + properties: pika.spec.BasicProperties, |
| 30 | + body: bytes, |
| 31 | + ) -> Any: |
| 32 | + if not properties: |
| 33 | + properties = pika.spec.BasicProperties() |
| 34 | + span = utils.get_span( |
| 35 | + tracer, |
| 36 | + channel, |
| 37 | + properties, |
| 38 | + task_name=key, |
| 39 | + operation=MessagingOperationValues.RECEIVE, |
| 40 | + ) |
| 41 | + with trace.use_span(span, end_on_exit=True): |
| 42 | + inject(properties.headers) |
| 43 | + retval = callback(channel, method, properties, body) |
| 44 | + return retval |
| 45 | + |
| 46 | + decorated_callback.__setattr__("_original_callback", callback) |
| 47 | + consumers_dict[key] = decorated_callback |
| 48 | + |
| 49 | + @staticmethod |
| 50 | + def _instrument_publish(channel: Any, tracer: Tracer) -> None: |
| 51 | + original_basic_publish = channel.basic_publish |
| 52 | + |
| 53 | + def decorated_basic_publish( |
| 54 | + exchange, routing_key, body, properties=None, mandatory=False |
| 55 | + ): |
| 56 | + if not properties: |
| 57 | + properties = pika.spec.BasicProperties() |
| 58 | + span = utils.get_span( |
| 59 | + tracer, |
| 60 | + channel, |
| 61 | + properties, |
| 62 | + task_name="(temporary)", |
| 63 | + operation=None, |
| 64 | + ) |
| 65 | + with trace.use_span(span, end_on_exit=True): |
| 66 | + inject(properties.headers) |
| 67 | + retval = original_basic_publish( |
| 68 | + exchange, routing_key, body, properties, mandatory |
| 69 | + ) |
| 70 | + return retval |
| 71 | + |
| 72 | + decorated_basic_publish.__setattr__( |
| 73 | + "_original_function", original_basic_publish |
| 74 | + ) |
| 75 | + channel.basic_publish = decorated_basic_publish |
| 76 | + |
| 77 | + @staticmethod |
| 78 | + def instrument_channel( |
| 79 | + channel: Any, tracer_provider: TracerProvider |
| 80 | + ) -> None: |
| 81 | + if not hasattr(channel, "_impl") or not isinstance( |
| 82 | + channel._impl, pika.channel.Channel |
| 83 | + ): |
| 84 | + _LOG.error("Could not find implementation for provided channel!") |
| 85 | + return |
| 86 | + tracer = trace.get_tracer(__name__, pika.__version__, tracer_provider) |
| 87 | + if channel._impl._consumers: |
| 88 | + PikaInstrumentation._instrument_consumers( |
| 89 | + channel._impl._consumers, tracer |
| 90 | + ) |
| 91 | + PikaInstrumentation._instrument_publish(channel, tracer) |
| 92 | + |
| 93 | + def _uninstrument(self, connection: Any, **kwargs: Dict[str, Any]) -> None: |
| 94 | + if not hasattr(connection, "_impl") or not isinstance( |
| 95 | + connection._impl, BaseConnection |
| 96 | + ): |
| 97 | + _LOG.error("Could not find implementation for provided channel!") |
| 98 | + return |
| 99 | + for key, callback in connection._impl._consumers: |
| 100 | + if hasattr(callback, "_original_callback"): |
| 101 | + connection._consumers[key] = callback._original_callback |
| 102 | + if hasattr(connection.basic_publish, "_original_function"): |
| 103 | + connection.basic_publish = ( |
| 104 | + connection.basic_publish._original_function |
| 105 | + ) |
| 106 | + |
| 107 | + def instrumentation_dependencies(self) -> Collection[str]: |
| 108 | + return _instruments |
0 commit comments