Skip to content

Commit 5b2d357

Browse files
committed
add test for async jobs + client
1 parent df93f6f commit 5b2d357

File tree

4 files changed

+400
-22
lines changed

4 files changed

+400
-22
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py

+126-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
from typing import Final
1+
import datetime
2+
import logging
3+
from asyncio import CancelledError
4+
from collections.abc import AsyncGenerator, Awaitable
5+
from typing import Any, Final
26

7+
from attr import dataclass
38
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
49
AsyncJobGet,
510
AsyncJobId,
@@ -9,12 +14,25 @@
914
)
1015
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
1116
from pydantic import NonNegativeInt, TypeAdapter
17+
from tenacity import (
18+
AsyncRetrying,
19+
TryAgain,
20+
before_sleep_log,
21+
retry,
22+
retry_if_exception_type,
23+
stop_after_delay,
24+
wait_fixed,
25+
wait_random_exponential,
26+
)
1227

28+
from ....rabbitmq import RemoteMethodNotRegisteredError
1329
from ... import RabbitMQRPCClient
1430

1531
_DEFAULT_TIMEOUT_S: Final[NonNegativeInt] = 30
1632

1733
_RPC_METHOD_NAME_ADAPTER = TypeAdapter(RPCMethodName)
34+
_DEFAULT_POLL_INTERVAL_S: Final[float] = 0.1
35+
_logger = logging.getLogger(__name__)
1836

1937

2038
async def cancel(
@@ -103,3 +121,110 @@ async def submit(
103121
)
104122
assert isinstance(_result, AsyncJobGet) # nosec
105123
return _result
124+
125+
126+
_DEFAULT_RPC_RETRY_POLICY: dict[str, Any] = {
127+
"retry": retry_if_exception_type(RemoteMethodNotRegisteredError),
128+
"wait": wait_random_exponential(max=20),
129+
"stop": stop_after_delay(60),
130+
"reraise": True,
131+
"before_sleep": before_sleep_log(_logger, logging.INFO),
132+
}
133+
134+
135+
@retry(**_DEFAULT_RPC_RETRY_POLICY)
136+
async def _wait_for_completion(
137+
rabbitmq_rpc_client: RabbitMQRPCClient,
138+
*,
139+
rpc_namespace: RPCNamespace,
140+
job_id: AsyncJobId,
141+
job_id_data: AsyncJobNameData,
142+
client_timeout: int,
143+
) -> AsyncGenerator[AsyncJobStatus, None]:
144+
try:
145+
async for attempt in AsyncRetrying(
146+
stop=stop_after_delay(client_timeout),
147+
reraise=True,
148+
retry=retry_if_exception_type(TryAgain),
149+
before_sleep=before_sleep_log(_logger, logging.DEBUG),
150+
wait=wait_fixed(_DEFAULT_POLL_INTERVAL_S),
151+
):
152+
with attempt:
153+
job_status = await status(
154+
rabbitmq_rpc_client,
155+
rpc_namespace=rpc_namespace,
156+
job_id=job_id,
157+
job_id_data=job_id_data,
158+
)
159+
yield job_status
160+
if not job_status.done:
161+
msg = f"{job_status.job_id=}: '{job_status.progress=}'"
162+
raise TryAgain(msg) # noqa: TRY301
163+
164+
except TryAgain as exc:
165+
# this is a timeout
166+
msg = f"Long running task {job_id=}, calling to timed-out after {client_timeout} seconds"
167+
raise TimeoutError(msg) from exc
168+
169+
170+
@dataclass(frozen=True)
171+
class AsyncJobComposedResult:
172+
status: AsyncJobStatus
173+
_result: Awaitable[Any] | None = None
174+
175+
@property
176+
def done(self) -> bool:
177+
return self._result is not None
178+
179+
async def result(self) -> Any:
180+
if not self._result:
181+
msg = "No result ready!"
182+
raise ValueError(msg)
183+
return await self._result
184+
185+
186+
async def submit_and_wait(
187+
rabbitmq_rpc_client: RabbitMQRPCClient,
188+
*,
189+
rpc_namespace: RPCNamespace,
190+
method_name: str,
191+
job_id_data: AsyncJobNameData,
192+
client_timeout: datetime.timedelta,
193+
**kwargs,
194+
) -> AsyncGenerator[AsyncJobComposedResult, None]:
195+
async_job_rpc_get = None
196+
try:
197+
async_job_rpc_get = await submit(
198+
rabbitmq_rpc_client,
199+
rpc_namespace=rpc_namespace,
200+
method_name=method_name,
201+
job_id_data=job_id_data,
202+
**kwargs,
203+
)
204+
async for job_status in _wait_for_completion(
205+
rabbitmq_rpc_client,
206+
rpc_namespace=rpc_namespace,
207+
job_id=async_job_rpc_get.job_id,
208+
job_id_data=job_id_data,
209+
client_timeout=client_timeout,
210+
):
211+
yield AsyncJobComposedResult(job_status)
212+
213+
yield AsyncJobComposedResult(
214+
job_status,
215+
result(
216+
rabbitmq_rpc_client,
217+
rpc_namespace=rpc_namespace,
218+
job_id=async_job_rpc_get.job_id,
219+
job_id_data=job_id_data,
220+
),
221+
)
222+
except (TimeoutError, CancelledError):
223+
if async_job_rpc_get is not None:
224+
await cancel(
225+
rabbitmq_rpc_client,
226+
rpc_namespace=rpc_namespace,
227+
job_id=async_job_rpc_get.job_id,
228+
job_id_data=job_id_data,
229+
)
230+
raise

packages/service-library/tests/rabbitmq/conftest.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,31 @@
1-
from collections.abc import AsyncIterator, Callable, Coroutine
1+
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
22
from typing import cast
33

44
import aiodocker
55
import arrow
66
import pytest
77
from faker import Faker
8+
from models_library.rabbitmq_basic_types import RPCNamespace
9+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
10+
11+
12+
@pytest.fixture
13+
async def rpc_client(
14+
rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
15+
) -> RabbitMQRPCClient:
16+
return await rabbitmq_rpc_client("pytest_rpc_client")
17+
18+
19+
@pytest.fixture
20+
async def rpc_server(
21+
rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
22+
) -> RabbitMQRPCClient:
23+
return await rabbitmq_rpc_client("pytest_rpc_server")
24+
25+
26+
@pytest.fixture
27+
def namespace() -> RPCNamespace:
28+
return RPCNamespace.from_entries({f"test{i}": f"test{i}" for i in range(8)})
829

930

1031
@pytest.fixture(autouse=True)

packages/service-library/tests/rabbitmq/test_rabbitmq_rpc.py

+1-20
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# pylint:disable=unused-argument
33

44
import asyncio
5-
from collections.abc import Awaitable, Callable
5+
from collections.abc import Awaitable
66
from typing import Any, Final
77

88
import pytest
@@ -23,11 +23,6 @@
2323
MULTIPLE_REQUESTS_COUNT: Final[NonNegativeInt] = 100
2424

2525

26-
@pytest.fixture
27-
def namespace() -> RPCNamespace:
28-
return RPCNamespace.from_entries({f"test{i}": f"test{i}" for i in range(8)})
29-
30-
3126
async def add_me(*, x: Any, y: Any) -> Any:
3227
return x + y
3328
# NOTE: types are not enforced
@@ -49,20 +44,6 @@ def __add__(self, other: "CustomClass") -> "CustomClass":
4944
return CustomClass(x=self.x + other.x, y=self.y + other.y)
5045

5146

52-
@pytest.fixture
53-
async def rpc_client(
54-
rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
55-
) -> RabbitMQRPCClient:
56-
return await rabbitmq_rpc_client("pytest_rpc_client")
57-
58-
59-
@pytest.fixture
60-
async def rpc_server(
61-
rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
62-
) -> RabbitMQRPCClient:
63-
return await rabbitmq_rpc_client("pytest_rpc_server")
64-
65-
6647
@pytest.mark.parametrize(
6748
"x,y,expected_result,expected_type",
6849
[

0 commit comments

Comments
 (0)