
asyncio duplicated instrumentation cause memory leak #3383

Open
yonathan-wolloch-lendbuzz opened this issue Mar 24, 2025 · 16 comments · May be fixed by #3408
Labels
bug Something isn't working

Comments

@yonathan-wolloch-lendbuzz

Describe your environment

OS: Ubuntu
Python version: 3.13
Package version: 0.51b0
Aiokafka[lz4] version: 0.12.0

What happened?

We have a memory leak caused by the asyncio instrumentation when using AIOKafkaConsumer in our FastAPI app, with the consumer written exactly as documented in the aiokafka documentation.
We think that using getone() within a while loop instead of anext (i.e., async for) works around the issue, but we want to follow aiokafka best practices.

Steps to Reproduce

Add the following code as part of the FastAPI app startup lifespan:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
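For reference, one way the snippet above could be wired into the FastAPI startup lifespan (a sketch with illustrative names; asyncio.run() cannot be called from inside FastAPI's already-running event loop, so the consumer is started as a background task instead):

import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start consume() (defined above) as a background task for the app's lifetime.
    task = asyncio.create_task(consume())
    try:
        yield
    finally:
        task.cancel()


app = FastAPI(lifespan=lifespan)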

Then trigger the consumer once with a message, and the memory will scale up exponentially.

You can inspect the heap using guppy3 or tracemalloc; the easiest way is simply to measure the memory utilization of the process.
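For example, a minimal tracemalloc measurement could look like the sketch below (guppy3 offers a similar view via hpy().heap()):

import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()
# ... let the consumer run for a while ...
after = tracemalloc.take_snapshot()
for stat in after.compare_to(before, "lineno")[:10]:
    print(stat)  # top allocation sites with size and count deltas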

Expected Result

Stable memory utilization when following aiokafka's best practices.

Actual Result

Exponentially increasing memory utilization.

Additional context

No response

Would you like to implement a fix?

No

@yonathan-wolloch-lendbuzz added the bug label on Mar 24, 2025
@xrmx
Contributor

xrmx commented Mar 24, 2025

@yonathan-wolloch-lendbuzz I guess we are still holding references to something; are you able to track down the culprit with the tools you suggested?

@yonathan-wolloch-lendbuzz
Author

yonathan-wolloch-lendbuzz commented Mar 24, 2025

@xrmx I don't want to mistakenly focus the discussion on it, but it seems there is an endless increase in objects allocated from /python3.13/site-packages/opentelemetry/instrumentation/asyncio/__init__.py

@yonathan-wolloch-lendbuzz
Author

@xrmx it also seems that using getone() within a while loop instead of anext causes a constant linear increase in memory utilization.
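For clarity, this is the getone() variant being referred to (a sketch, assuming the same consumer setup as in the reproduction above; consume_with_getone is an illustrative name):

async def consume_with_getone(consumer):
    # Pull messages explicitly instead of iterating with `async for`.
    while True:
        msg = await consumer.getone()
        print("consumed: ", msg.topic, msg.partition, msg.offset)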

@yonathan-wolloch-lendbuzz
Author

Setting the env var OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=asyncio also solves the issue, but we would prefer not to disable the instrumentation entirely.

@yonathan-wolloch-lendbuzz
Author

@xrmx are there any additional checks I can run or information I can provide to help solve this issue?

@yonathan-wolloch-lendbuzz
Author

@bourbonkk @dimastbk tagging you as it's probably related to the asyncio and aiokafka instrumentation 🙏

@bourbonkk
Contributor

@yonathan-wolloch-lendbuzz
Hi, thank you for raising this issue.

I'm currently investigating this from the instrumentation side (@bourbonkk here 👋), and I wanted to add a few technical points that may help with narrowing things down:

The AsyncioInstrumentor in opentelemetry-instrumentation-asyncio uses a whitelist-based approach to determine whether a coroutine, future, or to_thread function should be traced. This means that if no relevant environment variables are set, no span should be created at all, and only metrics will be emitted.

The relevant environment variables are:

  • OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE
  • OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE
  • OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED

So I’m curious if in your setup, any of these environment variables were set, even unintentionally (perhaps via Docker ENV, .env file, or shell profile)?
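For illustration, here is a minimal sketch of how these opt-in variables are meant to be used (an assumption-laden example, not the reporter's setup; the variable is set before importing and instrumenting so the whitelist is picked up regardless of when it is read; with none of these variables set, only metrics should be recorded):

import os

# Illustrative only: opt a coroutine named "consume" into span creation.
os.environ["OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE"] = "consume"

import asyncio

from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor

AsyncioInstrumentor().instrument()


async def consume():
    await asyncio.sleep(0)


asyncio.run(consume())  # a span for the "consume" coroutine is expected only because of the whitelist entry above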

Also, since the memory increase seems tied to span or metric creation, it would be helpful to know:

  • What tracer or meter providers are being used?
  • Whether you observe the same issue with metrics-only tracing (no span generation)?

We’ve run stress tests using AIOKafkaConsumer under instrumented conditions, and couldn’t reproduce exponential memory growth when no whitelist env vars were set — even with thousands of mocked messages.

Any more insight into the environment would be very helpful 🙏

@yonathan-wolloch-lendbuzz
Author

yonathan-wolloch-lendbuzz commented Mar 30, 2025

@bourbonkk
Hi, thank you for your help!

None of these env vars are defined.
I used the AWS memory utilization metric, as we deploy the service on ECS Fargate.
There are no custom metrics in that service, so no provider is used for metrics. Regarding spans, we use auto-instrumentation based on opentelemetry-contrib.
We don't observe the issue with metrics-only tracing.

Regarding the stress testing, we didn't see a correlation between the number of requests and memory usage, just a constant linear increase in memory over time.

Here are the OTEL env vars we do use:

  • OTEL_METRICS_EXPORTER=otlp
  • OTEL_LOGS_EXPORTER=console
  • OTEL_PYTHON_EXCLUDED_URLS=/healthcheck
  • OTEL_EXPORTER_OTLP_TRACES_INSECURE=true
  • OTEL_EXPORTER_OTLP_INSECURE=true
  • OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=localhost:4317
  • OTEL_EXPORTER_OTLP_ENDPOINT=localhost:4317

and the packages we use:

"opentelemetry-distro==0.51b0",
"opentelemetry-exporter-otlp==1.30.*",
"opentelemetry-instrumentation==0.51b0",
"aiokafka[lz4]~=0.12",
"fastapi[standard]~=0.115",

Please let me know if there is anything else I can do or provide in order to solve this. 🙏

@bourbonkk
Contributor

@yonathan-wolloch-lendbuzz

👋 I tested this issue by setting up a real FastAPI application using AIOKafkaConsumer and applying OpenTelemetry instrumentation with AsyncioInstrumentor and FastAPIInstrumentor. My goal was to verify if memory growth occurs when consuming a large number of messages from Kafka using best practices (i.e., async for msg in consumer).

✅ Test Setup

  • Framework: FastAPI (run via uvicorn)
  • Kafka Client: aiokafka==0.12.0
  • OpenTelemetry:
    • opentelemetry-instrumentation-asyncio
    • opentelemetry-instrumentation-fastapi
  • Instrumentation:
    • Both instrumentors enabled
    • A root span was created using tracer.start_as_current_span("root-kafka-span") (same result when it is removed)
  • Memory Tracking:
    • tracemalloc used to take memory snapshots before and after consumption
    • Memory diff logged and total increase reported
  • Kafka Messages: Consumed 1,000,000 messages from topic spans

🧪 Test Code Snippet

import asyncio
import gc
import tracemalloc

from aiokafka import AIOKafkaConsumer
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor

# App and tracer setup inferred from the test description above.
app = FastAPI()
tracer = trace.get_tracer(__name__)

@app.get("/")
async def root():
    tracemalloc.start()
    AsyncioInstrumentor().instrument()
    asyncio.create_task(kafka_consumer_task())
    return {"message": "Kafka consumer running with FastAPI"}

async def kafka_consumer_task():
    snapshot1 = tracemalloc.take_snapshot()
    consumer = AIOKafkaConsumer(...)
    await consumer.start()
    try:
        count = 0
        with tracer.start_as_current_span("root-kafka-span"):
            async for msg in consumer:
                count += 1
                if count >= 1_000_000:
                    break
    finally:
        await consumer.stop()
        gc.collect()
        snapshot2 = tracemalloc.take_snapshot()
        # Compare and log memory diff
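For reference, the final comparison step could look like the following (a sketch using tracemalloc's documented API, continuing from snapshot1/snapshot2 above; the original test's exact logging may differ):

import logging

top_stats = snapshot2.compare_to(snapshot1, "lineno")
logging.info("📊 Top memory diffs:")
for stat in top_stats[:10]:
    logging.info(stat)
total_mb = sum(stat.size_diff for stat in top_stats) / 1024 / 1024
logging.info("📈 Total increased memory: %.2f MB", total_mb)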

📈 Test Result
Here is the actual output from uvicorn main:app after the consumer finished processing 1,000,000 messages:

INFO:     📊 Top memory diffs:
INFO:     aiokafka/protocol/types.py:161: +2048 KiB
INFO:     aiokafka/conn.py:281: +30.4 KiB
INFO:     aiokafka/conn.py:526: +5.2 KiB
INFO:     asyncio/streams.py:132: +5.2 KiB
...
INFO:     📈 Total increased memory: 2.25 MB

✅ Conclusion

  • No exponential or abnormal memory growth was observed
  • Memory increase remained stable and under 3MB even after consuming 1M Kafka messages
  • The presence of a root span likely helped manage context propagation and GC more effectively
  • tracemalloc showed predictable object growth from expected areas (aiokafka, asyncio)

Let me know if you'd like the full source or logs. I'd be happy to share more!
Or if anyone else has a test case with the same result, I'd love to see it.

@yonathan-wolloch-lendbuzz
Author

yonathan-wolloch-lendbuzz commented Mar 31, 2025

@bourbonkk

👋 Thank you so much for taking the time to thoroughly test and investigate this issue—I greatly appreciate your detailed response and the effort you've invested.

Based on your results, it seems the memory growth you're seeing is minimal, which is reassuring. In our case, the memory leak seems unrelated to the number of messages consumed; rather, it tracks the total runtime of the service, with memory usage gradually increasing over time. aiokafka itself has also seen similar issues reported.

We are still actively investigating the exact cause of the leak on our end. It would be very helpful if you could share the full source code and logs from your testing. Having your detailed setup might help us pinpoint differences that contribute to our ongoing memory issues.

Thanks again for your invaluable assistance! 🙏

@yonathan-wolloch-lendbuzz
Author

📌 Memory Diff Results (After 4 Hours, 2 Kafka Messages)

We've observed significant memory growth over the 4 hours since deployment, with just 2 messages consumed. Below is the detailed memory diff captured using tracemalloc:

Top 10 Memory Differences:

| Location | Memory Size Increase | Count Increase | Average per Object |
|---|---|---|---|
| .venv/.../opentelemetry/instrumentation/asyncio/__init__.py:306 | +9536 KiB (Total: 12.7 MiB) | +87184 (Total: 119215) | 112 B |
| .venv/.../opentelemetry/instrumentation/asyncio/__init__.py:262 | +5108 KiB (Total: 6987 KiB) | +130776 (Total: 178860) | 40 B |
| .venv/.../opentelemetry/instrumentation/asyncio/__init__.py:319 | +5029 KiB (Total: 6894 KiB) | +85729 (Total: 117550) | 60 B |
| .venv/.../opentelemetry/instrumentation/asyncio/__init__.py:299 | +1022 KiB (Total: 1397 KiB) | +43587 (Total: 59600) | 24 B |
| /usr/local/lib/python3.13/asyncio/tasks.py:534 | +345 KiB (Total: 476 KiB) | 0 (Total: 31) | 15.3 KiB |
| /usr/local/lib/python3.13/tracemalloc.py:558 | +68.4 KiB (Total: 68.5 KiB) | +1193 (Total: 1194) | 59 B |
| <frozen importlib._bootstrap>:488 | -41.3 KiB (Total: 185 KiB) | -484 (Total: 1556) | 122 B |
| .venv/.../aiokafka/conn.py:281 | +40.6 KiB (Total: 214 KiB) | +8 (Total: 1876) | 117 B |
| /usr/local/lib/python3.13/email/utils.py:275 | +12.5 KiB (Total: 33.7 KiB) | +265 (Total: 719) | 48 B |
| .venv/.../aiokafka/protocol/types.py:204 | -11.7 KiB (Total: 10.4 KiB) | -181 (Total: 148) | 72 B |

⚠️ Conclusion:

  • Significant memory growth detected, particularly within opentelemetry-instrumentation-asyncio.

  • Memory usage increased significantly despite only 2 Kafka messages consumed.

Happy to provide additional details or logs if needed! We are also trying to reproduce this issue with minimal code.

@yonathan-wolloch-lendbuzz
Author

@xrmx @bourbonkk
👋

Final Issue Description

After extensive investigation, we've identified a memory leak involving the async def __coordination_routine(self) method in aiokafka.consumer.group_coordinator. This routine creates six futures, of which the following three:

  • self._closing
  • subscription.unsubscribe_future
  • self._rejoin_needed_fut

continuously accumulate callbacks due to the interaction with AsyncioInstrumentor.trace_future from opentelemetry.instrumentation.asyncio. Each invocation of __coordination_routine adds three new callbacks to these futures indefinitely, causing unbounded memory growth.
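To make the mechanism concrete, here is a minimal illustration (not the aiokafka code itself): a long-lived pending future passed repeatedly to asyncio.wait() gains one more trace_future done-callback on every iteration, and none of them can run until the future finally completes.

import asyncio

from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor


async def main():
    AsyncioInstrumentor().instrument()
    # Stand-in for a long-lived future such as self._closing or self._rejoin_needed_fut.
    closing = asyncio.get_running_loop().create_future()
    for _ in range(3):
        await asyncio.wait(
            [closing, asyncio.create_task(asyncio.sleep(0))],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # The future's repr shows the growing list of pending done-callbacks.
        print(closing)


asyncio.run(main())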

Debugging Snippet

Here's a sanitized excerpt from our debug session highlighting the accumulated callbacks:

[
    <Future pending cb=[AsyncioInstrumentor.trace_future.<locals>.callback() at /path/to/project/.venv/lib/python3.13/site-packages/opentelemetry/instrumentation/asyncio/__init__.py:306, <70066 more callbacks>]>,
    <Future finished result=None>,
    <Future pending cb=[AsyncioInstrumentor.trace_future.<locals>.callback() at /path/to/project/.venv/lib/python3.13/site-packages/opentelemetry/instrumentation/asyncio/__init__.py:306, <70066 more callbacks>]>,
    <Future pending cb=[AsyncioInstrumentor.trace_future.<locals>.callback() at /path/to/project/.venv/lib/python3.13/site-packages/opentelemetry/instrumentation/asyncio/__init__.py:306, <70066 more callbacks>]>,
    <Task pending coro=<GroupCoordinator._heartbeat_routine() running at /path/to/project/.venv/lib/python3.13/site-packages/aiokafka/consumer/group_coordinator.py:773> wait_for=<Future pending cb=[Task.task_wakeup()]>>,
    <Task pending coro=<GroupCoordinator._commit_refresh_routine() running at /path/to/project/.venv/lib/python3.13/site-packages/aiokafka/consumer/group_coordinator.py:910> wait_for=<Future pending cb=[Task.task_wakeup()]>>
]

We'd greatly appreciate assistance or suggestions on resolving this issue.

@dimastbk
Contributor

dimastbk commented Apr 4, 2025

MRE (run with pytest --memray):

import asyncio

from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor


async def test_instrument():
    # Needs an async-capable test runner (e.g. pytest-asyncio) alongside pytest-memray.
    AsyncioInstrumentor().instrument()

    fut = asyncio.Future()

    for _ in range(10000):
        await asyncio.wait(
            [fut, asyncio.create_task(asyncio.sleep(0))], return_when=asyncio.FIRST_COMPLETED
        )

Result:

         📦 Total memory allocated: 5.2MiB
         📏 Total allocations: 9
         📊 Histogram of allocation sizes: |▅  ▅█|
         🥇 Biggest allocating functions:
                - wait:/usr/lib/python3.12/asyncio/tasks.py:460 -> 2.0MiB
                - _run_once:/usr/lib/python3.12/asyncio/base_events.py:1971 -> 1.0MiB
                - wrap_coro_or_future:/home/virtualenv/lib/python3.12/site-packages/opentelemetry/instrumentation/asyncio/__init__.py:183 -> 1.0MiB
                - __init__:/usr/lib/python3.12/asyncio/events.py:45 -> 1.0MiB
                - _wait:/usr/lib/python3.12/asyncio/tasks.py:555 -> 156.2KiB

Patch:

diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py
index 9905d91d..950c7d5a 100644
--- a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py
+++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py
@@ -90,6 +90,7 @@ import sys
 from asyncio import futures
 from timeit import default_timer
 from typing import Collection
+import weakref
 
 from wrapt import wrap_function_wrapper as _wrap
 
@@ -125,6 +126,8 @@ class AsyncioInstrumentor(BaseInstrumentor):
         "run_coroutine_threadsafe",
     ]
 
+    _future_callbacks = weakref.WeakValueDictionary()
+
     def instrumentation_dependencies(self) -> Collection[str]:
         return _instruments
 
@@ -303,6 +306,9 @@ class AsyncioInstrumentor(BaseInstrumentor):
             self.record_process(start, attr, span, exception)
 
     def trace_future(self, future):
+        if future in self._future_callbacks:
+            return future
+
         start = default_timer()
         span = (
             self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
@@ -324,6 +330,7 @@ class AsyncioInstrumentor(BaseInstrumentor):
             )
 
         future.add_done_callback(callback)
+        self._future_callbacks[future] = callback
         return future
 
     def record_process(

Result:

         📦 Total memory allocated: 64.0B
         📏 Total allocations: 2
         📊 Histogram of allocation sizes: |█ |
         🥇 Biggest allocating functions:
                - __init__:/home/virtualenv/lib/python3.12/site-packages/opentelemetry/metrics/_internal/__init__.py:199 -> 32.0B
                - __init__:/home/virtualenv/lib/python3.12/site-packages/opentelemetry/metrics/_internal/__init__.py:458 -> 32.0B

bourbonkk pushed a commit to bourbonkk/opentelemetry-python-contrib that referenced this issue Apr 4, 2025
@bourbonkk bourbonkk linked a pull request Apr 4, 2025 that will close this issue
bourbonkk added a commit to bourbonkk/opentelemetry-python-contrib that referenced this issue Apr 4, 2025
@bourbonkk
Contributor

@yonathan-wolloch-lendbuzz
Thank you for your report.
I have updated the code to align with the approach taken in PR #3408.
I hope you find my suggestion helpful and aligned with the direction of the project.

@dani-shemesh-lendbuzz

@yonathan-wolloch-lendbuzz Respect!

@aabmass
Member

aabmass commented Apr 10, 2025

Is the issue title still accurate, or is it actually related to aiokafka as well? It sounds like the leak is only in the asyncio instrumentation.

@yonathan-wolloch-lendbuzz changed the title from "aiokafka instrumentation cause memory leak" to "asyncio duplicated instrumentation cause memory leak" on Apr 14, 2025