|
1 | 1 | import asyncio
|
| 2 | +from datetime import timedelta |
| 3 | + |
2 | 4 | import pytest
|
3 | 5 |
|
4 | 6 | from sentry_sdk import get_client, start_transaction
|
@@ -376,3 +378,48 @@ async def job(ctx):
|
376 | 378 | assert event["contexts"]["trace"]["origin"] == "auto.queue.arq"
|
377 | 379 | assert event["spans"][0]["origin"] == "auto.db.redis"
|
378 | 380 | assert event["spans"][1]["origin"] == "auto.db.redis"
|
| 381 | + |
| 382 | + |
| 383 | +@pytest.mark.asyncio |
| 384 | +async def test_job_concurrency(capture_events, init_arq): |
| 385 | + """ |
| 386 | + 10 - division starts |
| 387 | + 70 - sleepy starts |
| 388 | + 110 - division raises error |
| 389 | + 120 - sleepy finishes |
| 390 | +
|
| 391 | + """ |
| 392 | + |
| 393 | + async def sleepy(_): |
| 394 | + await asyncio.sleep(0.05) |
| 395 | + |
| 396 | + async def division(_): |
| 397 | + await asyncio.sleep(0.1) |
| 398 | + return 1 / 0 |
| 399 | + |
| 400 | + sleepy.__qualname__ = sleepy.__name__ |
| 401 | + division.__qualname__ = division.__name__ |
| 402 | + |
| 403 | + pool, worker = init_arq([sleepy, division]) |
| 404 | + |
| 405 | + events = capture_events() |
| 406 | + |
| 407 | + await pool.enqueue_job( |
| 408 | + "division", _job_id="123", _defer_by=timedelta(milliseconds=10) |
| 409 | + ) |
| 410 | + await pool.enqueue_job( |
| 411 | + "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) |
| 412 | + ) |
| 413 | + |
| 414 | + loop = asyncio.get_event_loop() |
| 415 | + task = loop.create_task(worker.async_run()) |
| 416 | + await asyncio.sleep(1) |
| 417 | + |
| 418 | + task.cancel() |
| 419 | + |
| 420 | + await worker.close() |
| 421 | + |
| 422 | + exception_event = events[1] |
| 423 | + assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" |
| 424 | + assert exception_event["transaction"] == "division" |
| 425 | + assert exception_event["extra"]["arq-job"]["task"] == "division" |
0 commit comments