You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
"""Sample RabbiMQ consumer application demonstrating tracing capabilities of using opentelemetry instrumentation libraries."""
import argparse
import logging
import pika
import subprocess
import sys
from opentelemetry import trace
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.trace import TracerProvider
LoggingInstrumentor().instrument(set_logging_format=False)
logging.basicConfig(filename='consumer.log',
filemode='w',
format='%(asctime)s - %(levelname)s - %(threadName)s - %(filename)s:%(lineno)s - %(funcName)s - [trace_id=%(otelTraceID)s span_id=%(otelSpanID)s resource.service.name=%(otelServiceName)s] - %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO)
trace.set_tracer_provider(TracerProvider())
logger = logging.getLogger("Consumer_App")
# Callback function which is called on incoming messages
def callback(ch, method, properties, body):
logger.info(" [x] Received Message: " + str(body.decode('utf-8')))
logger.info(" [x] Header " + str(properties.headers))
logger.info(" Exiting the process")
sys.exit(0)
def main():
logger.info("[START]: Consumer Application")
parser = argparse.ArgumentParser()
parser.add_argument("queue", help="queue name", type=str, default="testqueue")
args = parser.parse_args()
# Connect to rabbitmq.
url = 'amqp://admin:password@localhost:31302'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
mychannel = connection.channel() # start a channel
pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=mychannel)
mychannel.queue_declare(queue=args.queue, ) # Declare a queue
mychannel.basic_publish("", args.queue, "This is a test message published to RabbitMQ")
# Below code with basic_consume and callback function - it does have trace id and span ids for log statements in callback function.
# set up subscription on the queue
# mychannel.basic_consume(args.queue, callback, auto_ack=True)
# # start consuming (blocks)
# mychannel.start_consuming()
# I'm using channel.consume method to get the messages from the queue. And expecting the logs should have trace ids and span ids.
for method, properties, body in mychannel.consume(queue='testqueue', auto_ack=True, inactivity_timeout=10):
logger.info(" [x] Received Message: " + str(body.decode('utf-8')))
logger.info(" [x] Header " + str(properties.headers))
logger.info(" Exiting the process")
sys.exit(0)
connection.close()
if __name__ == '__main__':
main()
What is the expected behavior?
I'm using channel.consume method to get the messages from the queue. And expecting the logs should have trace ids and span ids.
If you see below, trace_id and span_id is available if I use basic_consume with callback.
02:49:21 - INFO - MainThread - TracingConsumer.py:27 - callback - [trace_id=0f0e9af13cd7618bddf6ecaf8bc8663a span_id=32921be241a75b82 resource.service.name=] - [x] Received Message: Hi_from_publisher
02:49:21 - INFO - MainThread - TracingConsumer.py:28 - callback - [trace_id=0f0e9af13cd7618bddf6ecaf8bc8663a span_id=32921be241a75b82 resource.service.name=] - [x] Header {'traceparent': '00-0f0e9af13cd7618bddf6ecaf8bc8663a-27c540a6ee6fe32f-01'}
02:49:21 - INFO - MainThread - TracingConsumer.py:29 - callback - [trace_id=0f0e9af13cd7618bddf6ecaf8bc8663a span_id=32921be241a75b82 resource.service.name=] - Exiting the process
What is the actual behavior?
Actual behavior with channel.consume(...) where trace_id and span_id are 0. Although we can clearly see that the message header contains "traceparent" id.
15:58:43 - INFO - MainThread - TracingConsumerTest.py:60 - main - [trace_id=0 span_id=0 resource.service.name=] - [x] Received Message: This is a test message published to RabbitMQ
15:58:43 - INFO - MainThread - TracingConsumerTest.py:61 - main - [trace_id=0 span_id=0 resource.service.name=] - [x] Header {'traceparent': '00-9f016203a363df635b9f3d3c76775c92-d5afde2a271cfe41-01'}
15:58:43 - INFO - MainThread - TracingConsumerTest.py:62 - main - [trace_id=0 span_id=0 resource.service.name=] - Exiting the process
Additional context
None
The text was updated successfully, but these errors were encountered:
Hi all, i am facing the same issue and would like to take a stab at fixing it. From my quick look it seems like a consumer group is only attached when next is called on the generator and not when the actual consume function is called. That's why I think i want to add a hook on the first call to the next method to instrument the consumer. let me know if that makes sense
Describe your environment
Steps to reproduce
What is the expected behavior?
I'm using channel.consume method to get the messages from the queue. And expecting the logs should have trace ids and span ids.
If you see below, trace_id and span_id is available if I use basic_consume with callback.
What is the actual behavior?
Actual behavior with channel.consume(...) where trace_id and span_id are 0. Although we can clearly see that the message header contains "traceparent" id.
Additional context
None
The text was updated successfully, but these errors were encountered: