24
24
25
25
from twisted .python .failure import Failure
26
26
27
- from synapse .logging .context import nested_logging_context
27
+ from synapse .logging .context import (
28
+ ContextResourceUsage ,
29
+ LoggingContext ,
30
+ nested_logging_context ,
31
+ set_current_context ,
32
+ )
28
33
from synapse .metrics import LaterGauge
29
34
from synapse .metrics .background_process_metrics import (
30
35
run_as_background_process ,
@@ -81,6 +86,8 @@ class TaskScheduler:
81
86
MAX_CONCURRENT_RUNNING_TASKS = 5
82
87
# Time from the last task update after which we will log a warning
83
88
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
89
+ # Report a running task's status and usage every so often.
90
+ OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes
84
91
85
92
def __init__ (self , hs : "HomeServer" ):
86
93
self ._hs = hs
@@ -346,6 +353,33 @@ async def _clean_scheduled_tasks(self) -> None:
346
353
assert task .id not in self ._running_tasks
347
354
await self ._store .delete_scheduled_task (task .id )
348
355
356
+ @staticmethod
357
+ def _log_task_usage (
358
+ state : str , task : ScheduledTask , usage : ContextResourceUsage , active_time : float
359
+ ) -> None :
360
+ """
361
+ Log a line describing the state and usage of a task.
362
+ The log line is inspired by / a copy of the request log line format,
363
+ but with irrelevant fields removed.
364
+
365
+ active_time: Time that the task has been running for, in seconds.
366
+ """
367
+
368
+ logger .info (
369
+ "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
370
+ " [%d dbevts] %r, %r" ,
371
+ state ,
372
+ active_time ,
373
+ usage .ru_utime ,
374
+ usage .ru_stime ,
375
+ usage .db_sched_duration_sec ,
376
+ usage .db_txn_duration_sec ,
377
+ int (usage .db_txn_count ),
378
+ usage .evt_db_fetch_count ,
379
+ task .resource_id ,
380
+ task .params ,
381
+ )
382
+
349
383
async def _launch_task (self , task : ScheduledTask ) -> None :
350
384
"""Launch a scheduled task now.
351
385
@@ -360,8 +394,32 @@ async def _launch_task(self, task: ScheduledTask) -> None:
360
394
)
361
395
function = self ._actions [task .action ]
362
396
397
+ def _occasional_report (
398
+ task_log_context : LoggingContext , start_time : int
399
+ ) -> None :
400
+ """
401
+ Helper to log a 'Task continuing' line every so often.
402
+ """
403
+
404
+ current_time = int (self ._clock .time ())
405
+ calling_context = set_current_context (task_log_context )
406
+ try :
407
+ usage = task_log_context .get_resource_usage ()
408
+ TaskScheduler ._log_task_usage (
409
+ "continuing" , task , usage , (current_time - start_time ) * 0.001
410
+ )
411
+ finally :
412
+ set_current_context (calling_context )
413
+
363
414
async def wrapper () -> None :
364
- with nested_logging_context (task .id ):
415
+ with nested_logging_context (task .id ) as log_context :
416
+ start_time = int (self ._clock .time ())
417
+ occasional_status_call = self ._clock .looping_call (
418
+ _occasional_report ,
419
+ TaskScheduler .OCCASIONAL_REPORT_INTERVAL_MS ,
420
+ log_context ,
421
+ start_time ,
422
+ )
365
423
try :
366
424
(status , result , error ) = await function (task )
367
425
except Exception :
@@ -383,6 +441,13 @@ async def wrapper() -> None:
383
441
)
384
442
self ._running_tasks .remove (task .id )
385
443
444
+ current_time = int (self ._clock .time ())
445
+ usage = log_context .get_resource_usage ()
446
+ TaskScheduler ._log_task_usage (
447
+ status .value , task , usage , (current_time - start_time ) * 0.001
448
+ )
449
+ occasional_status_call .stop ()
450
+
386
451
# Try launch a new task since we've finished with this one.
387
452
self ._clock .call_later (0.1 , self ._launch_scheduled_tasks )
388
453
0 commit comments