Skip to content

Commit 46feea4

Browse files
committed
use worker_init/worker_process_init to start event processor threads in celery (#444)
Celery uses a prefork model for its worker processes. The app is loaded in a "worker", which then forks one or more "worker processes". Similiarly to the uwsgi prefork issues we had to work around, this fork happens _after_ we created the event processor thread, so the worker processes don't have an event processor thread of their own, and just fill up the event queue without it ever being processed. To work around this, we use worker_init and worker_process_init signals to start the event processor thread _after_ the fork. This only became an issue with version 4.2.0 of the agent. In earlier versions of the agent, threads were started relatively late, after celery already forked the worker processes. closes #444
1 parent e84f1d9 commit 46feea4

File tree

4 files changed

+28
-5
lines changed

4 files changed

+28
-5
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## Unreleased
4+
[Check the diff](https://github.com/elastic/apm-agent-python/compare/v4.2.1...master)
5+
* fixed an issue with Celery and the prefork worker pool (#444)
6+
37
## v4.2.1
48
[Check the diff](https://github.com/elastic/apm-agent-python/compare/v4.2.0...v4.2.1)
59
* fixed an issue with the certificate pinning feature introduced in 4.2.0 (#433, #434)

elasticapm/contrib/celery/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def process_failure_signal(sender, task_id, exception, args, kwargs, traceback,
5252

5353
signals.task_failure.disconnect(process_failure_signal, dispatch_uid=dispatch_uid)
5454
signals.task_failure.connect(process_failure_signal, weak=False, dispatch_uid=dispatch_uid)
55+
_register_worker_signals(client)
5556

5657

5758
def register_instrumentation(client):
@@ -71,3 +72,20 @@ def end_transaction(task_id, task, *args, **kwargs):
7172
# register for this client
7273
signals.task_prerun.connect(begin_transaction, dispatch_uid=dispatch_uid % "prerun", weak=False)
7374
signals.task_postrun.connect(end_transaction, weak=False, dispatch_uid=dispatch_uid % "postrun")
75+
_register_worker_signals(client)
76+
77+
78+
def _register_worker_signals(client):
79+
def worker_startup(*args, **kwargs):
80+
client._transport._start_event_processor()
81+
82+
def worker_shutdown(*args, **kwargs):
83+
client.close()
84+
85+
def connect_worker_process_init(*args, **kwargs):
86+
signals.worker_process_init.connect(worker_startup, dispatch_uid="elasticapm-start-worker", weak=False)
87+
signals.worker_process_shutdown.connect(worker_shutdown, dispatch_uid="elasticapm-shutdown-worker", weak=False)
88+
89+
signals.worker_init.connect(
90+
connect_worker_process_init, dispatch_uid="elasticapm-connect-start-threads", weak=False
91+
)

elasticapm/transport/base.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ def __init__(
9090
self._queued_data = None
9191
self._event_queue = self._init_event_queue(chill_until=queue_chill_count, max_chill_time=queue_chill_time)
9292
self._is_chilled_queue = isinstance(self._event_queue, ChilledQueue)
93-
self._event_process_thread = threading.Thread(target=self._process_queue, name="eapm event processor thread")
94-
self._event_process_thread.daemon = True
93+
self._event_process_thread = None
9594
self._last_flush = timeit.default_timer()
9695
self._counts = defaultdict(int)
9796
self._flushed = threading.Event()
@@ -212,8 +211,12 @@ def _flush(self, buffer):
212211
self.handle_transport_fail(e)
213212

214213
def _start_event_processor(self):
215-
if not self._event_process_thread.is_alive() and not self._closed:
214+
if (not self._event_process_thread or not self._event_process_thread.is_alive()) and not self._closed:
216215
try:
216+
self._event_process_thread = threading.Thread(
217+
target=self._process_queue, name="eapm event processor thread"
218+
)
219+
self._event_process_thread.daemon = True
217220
self._event_process_thread.start()
218221
except RuntimeError:
219222
pass

tests/contrib/celery/django_tests.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
django = pytest.importorskip("django") # isort:skip
3636
celery = pytest.importorskip("celery") # isort:skip
3737

38-
import mock
39-
4038
from elasticapm.conf.constants import ERROR, TRANSACTION
4139
from elasticapm.contrib.celery import register_exception_tracking, register_instrumentation
4240
from tests.contrib.django.testapp.tasks import failing_task, successful_task

0 commit comments

Comments
 (0)