Skip to content

Commit 1c7a161

Browse files
♻️ Adding additional checks to RUT + 🐛 Setting RabbitMQ message TTL (#5150)
1 parent 3f92d39 commit 1c7a161

File tree

6 files changed

+83
-7
lines changed

6 files changed

+83
-7
lines changed

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44
from typing import Final
55

66
import aio_pika
7+
from pydantic import NonNegativeInt
78

89
from ..logging_utils import log_context
910
from ._client_base import RabbitMQClientBase
1011
from ._models import MessageHandler, RabbitMessage
11-
from ._utils import declare_queue, get_rabbitmq_client_unique_name
12+
from ._utils import (
13+
RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
14+
declare_queue,
15+
get_rabbitmq_client_unique_name,
16+
)
1217

1318
_logger = logging.getLogger(__name__)
1419

@@ -73,6 +78,7 @@ async def subscribe(
7378
*,
7479
exclusive_queue: bool = True,
7580
topics: list[str] | None = None,
81+
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
7682
) -> str:
7783
"""subscribe to exchange_name calling message_handler for every incoming message
7884
- exclusive_queue: True means that every instance of this application will receive the incoming messages
@@ -114,6 +120,7 @@ async def subscribe(
114120
self.client_name,
115121
exchange_name,
116122
exclusive_queue=exclusive_queue,
123+
message_ttl=message_ttl,
117124
)
118125
if topics is None:
119126
await queue.bind(exchange, routing_key="")

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Final
55

66
import aio_pika
7+
from pydantic import NonNegativeInt
78
from tenacity import retry
89
from tenacity.before_sleep import before_sleep_log
910
from tenacity.stop import stop_after_delay
@@ -16,8 +17,7 @@
1617

1718
_MINUTE: Final[int] = 60
1819

19-
20-
_RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S: Final[int] = 15 * _MINUTE
20+
RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS: Final[int] = 15 * _MINUTE * 1000
2121

2222

2323
class RabbitMQRetryPolicyUponInitialization:
@@ -55,11 +55,12 @@ async def declare_queue(
5555
exchange_name: str,
5656
*,
5757
exclusive_queue: bool,
58+
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
5859
) -> aio_pika.abc.AbstractRobustQueue:
5960
queue_parameters = {
6061
"durable": True,
6162
"exclusive": exclusive_queue,
62-
"arguments": {"x-message-ttl": _RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S},
63+
"arguments": {"x-message-ttl": message_ttl},
6364
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{exchange_name}_exclusive",
6465
}
6566
if not exclusive_queue:

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,19 @@ async def update_service_run_stopped_at(
176176
return None
177177
return ServiceRunDB.from_orm(row)
178178

179+
async def get_service_run_by_id(
180+
self, service_run_id: ServiceRunId
181+
) -> ServiceRunDB | None:
182+
async with self.db_engine.begin() as conn:
183+
stmt = sa.select(resource_tracker_service_runs).where(
184+
resource_tracker_service_runs.c.service_run_id == service_run_id
185+
)
186+
result = await conn.execute(stmt)
187+
row = result.first()
188+
if row is None:
189+
return None
190+
return ServiceRunDB.from_orm(row)
191+
179192
async def list_service_runs_by_product_and_user_and_wallet(
180193
self,
181194
product_name: ProductName,

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
_TASK_NAME_START_PERIODIC_TASK = "start_background_task"
2424
_TASK_NAME_PERIODICALY_CHECK_RUNNING_SERVICES = "periodic_check_of_running_services"
2525

26+
_RUT_MESSAGE_TTL_IN_MS = 2 * 60 * 60 * 1000 # 2 hours
27+
2628

2729
async def _subscribe_to_rabbitmq(app) -> str:
2830
with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"):
@@ -31,6 +33,7 @@ async def _subscribe_to_rabbitmq(app) -> str:
3133
RabbitResourceTrackingBaseMessage.get_channel_name(),
3234
message_handler=functools.partial(process_message, app),
3335
exclusive_queue=False,
36+
message_ttl=_RUT_MESSAGE_TTL_IN_MS,
3437
)
3538
return subscribed_queue
3639

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_process_messages.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,25 @@ async def _process_start_event(
6363
msg: RabbitResourceTrackingStartedMessage,
6464
rabbitmq_client: RabbitMQClient,
6565
):
66+
service_run_db = await resource_tracker_repo.get_service_run_by_id(
67+
service_run_id=msg.service_run_id
68+
)
69+
if service_run_db:
70+
# NOTE: After we find out why sometimes RUT recieves multiple start events and fix it, we can change it to log level `error`
71+
_logger.warning(
72+
"On process start event the service run id %s already exists in DB, INVESTIGATE! Current msg created_at: %s, already stored msg created_at: %s",
73+
msg.service_run_id,
74+
msg.created_at,
75+
service_run_db.started_at,
76+
)
77+
return
78+
79+
# Prepare `service run` record (if billable `credit transaction`) in the DB
6680
service_type = (
6781
ResourceTrackerServiceType.COMPUTATIONAL_SERVICE
6882
if msg.service_type == ServiceType.COMPUTATIONAL
6983
else ResourceTrackerServiceType.DYNAMIC_SERVICE
7084
)
71-
7285
pricing_unit_cost = None
7386
if msg.pricing_unit_cost_id:
7487
pricing_unit_cost_db = await resource_tracker_repo.get_pricing_unit_cost_by_id(
@@ -134,10 +147,29 @@ async def _process_heartbeat_event(
134147
msg: RabbitResourceTrackingHeartbeatMessage,
135148
rabbitmq_client: RabbitMQClient,
136149
):
150+
service_run_db = await resource_tracker_repo.get_service_run_by_id(
151+
service_run_id=msg.service_run_id
152+
)
153+
if not service_run_db:
154+
_logger.error(
155+
"Recieved process heartbeat event for service_run_id: %s, but we do not have the started record in the DB, INVESTIGATE!",
156+
msg.service_run_id,
157+
)
158+
return
159+
if service_run_db.service_run_status in {
160+
ServiceRunStatus.SUCCESS,
161+
ServiceRunStatus.ERROR,
162+
}:
163+
_logger.error(
164+
"Recieved process heartbeat event for service_run_id: %s, but it was already closed, INVESTIGATE!",
165+
msg.service_run_id,
166+
)
167+
return
168+
169+
# Update `service run` record (if billable `credit transaction`) in the DB
137170
update_service_run_last_heartbeat = ServiceRunLastHeartbeatUpdate(
138171
service_run_id=msg.service_run_id, last_heartbeat_at=msg.created_at
139172
)
140-
141173
running_service = await resource_tracker_repo.update_service_run_last_heartbeat(
142174
update_service_run_last_heartbeat
143175
)
@@ -184,6 +216,26 @@ async def _process_stop_event(
184216
msg: RabbitResourceTrackingStoppedMessage,
185217
rabbitmq_client: RabbitMQClient,
186218
):
219+
service_run_db = await resource_tracker_repo.get_service_run_by_id(
220+
service_run_id=msg.service_run_id
221+
)
222+
if not service_run_db:
223+
_logger.error(
224+
"Recieved stop event for service_run_id: %s, but we do not have the started record in the DB, INVESTIGATE!",
225+
msg.service_run_id,
226+
)
227+
return
228+
if service_run_db.service_run_status in {
229+
ServiceRunStatus.SUCCESS,
230+
ServiceRunStatus.ERROR,
231+
}:
232+
_logger.error(
233+
"Recieved stop event for service_run_id: %s, but it was already closed, INVESTIGATE!",
234+
msg.service_run_id,
235+
)
236+
return
237+
238+
# Update `service run` record (if billable `credit transaction`) in the DB
187239
_run_status, _run_status_msg = ServiceRunStatus.SUCCESS, None
188240
if msg.simcore_platform_status is SimcorePlatformStatus.BAD:
189241
_run_status, _run_status_msg = (

services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async def test_process_event_functions(
5959
output = await assert_service_runs_db_row(postgres_db, msg.service_run_id)
6060
assert output.stopped_at is None
6161
assert output.service_run_status == "RUNNING"
62-
first_occurence_of_last_heartbeat_at < output.last_heartbeat_at
62+
assert first_occurence_of_last_heartbeat_at < output.last_heartbeat_at
6363

6464
stopped_msg = RabbitResourceTrackingStoppedMessage(
6565
service_run_id=msg.service_run_id,

0 commit comments

Comments
 (0)