24
24
25
25
from twisted .python .failure import Failure
26
26
27
- from synapse .logging .context import nested_logging_context
27
+ from synapse .logging .context import (
28
+ ContextResourceUsage ,
29
+ LoggingContext ,
30
+ nested_logging_context ,
31
+ set_current_context ,
32
+ )
28
33
from synapse .metrics import LaterGauge
29
34
from synapse .metrics .background_process_metrics import (
30
35
run_as_background_process ,
@@ -81,6 +86,8 @@ class TaskScheduler:
81
86
MAX_CONCURRENT_RUNNING_TASKS = 5
82
87
# Time from the last task update after which we will log a warning
83
88
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
89
+ # Report a running task's status and usage every so often.
90
+ OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes
84
91
85
92
def __init__ (self , hs : "HomeServer" ):
86
93
self ._hs = hs
@@ -346,6 +353,32 @@ async def _clean_scheduled_tasks(self) -> None:
346
353
assert task .id not in self ._running_tasks
347
354
await self ._store .delete_scheduled_task (task .id )
348
355
356
@staticmethod
def _log_task_usage(
    state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float
) -> None:
    """
    Emit one INFO log line summarising the state and resource usage of a task.

    The layout is borrowed from the request log line format, with the
    fields that are irrelevant for a task left out.

    Args:
        state: label printed directly after "Task" in the log line.
        task: the task being reported on; its `params` end the log line.
        usage: the resource usage counters to report.
        active_time: Time that the task has been running for, in seconds.
    """
    line_format = (
        "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
        " [%d dbevts] %r"
    )

    # Kept in the same order as the placeholders in `line_format`.
    line_args = (
        state,
        active_time,
        usage.ru_utime,
        usage.ru_stime,
        usage.db_sched_duration_sec,
        usage.db_txn_duration_sec,
        int(usage.db_txn_count),
        usage.evt_db_fetch_count,
        task.params,
    )

    logger.info(line_format, *line_args)
381
+
349
382
async def _launch_task (self , task : ScheduledTask ) -> None :
350
383
"""Launch a scheduled task now.
351
384
@@ -360,8 +393,32 @@ async def _launch_task(self, task: ScheduledTask) -> None:
360
393
)
361
394
function = self ._actions [task .action ]
362
395
396
def _occasional_report(
    task_log_context: LoggingContext, start_time: int
) -> None:
    """
    Helper to log a 'Task continuing' line every so often.

    Args:
        task_log_context: the logging context the task is running under;
            its resource usage is what gets reported.
        start_time: clock reading taken when the task was started, as
            returned by `self._clock.time()`.
    """

    # `Clock.time()` reports seconds, which is the unit `_log_task_usage`
    # documents for `active_time`, so the difference is used as-is. It must
    # not be scaled as though it were milliseconds. The reading is kept as a
    # float so the "%.3fsec" field shows real sub-second precision.
    elapsed_sec = self._clock.time() - start_time

    # Temporarily switch to the task's logging context for the report, and
    # always restore the caller's context afterwards.
    calling_context = set_current_context(task_log_context)
    try:
        usage = task_log_context.get_resource_usage()
        TaskScheduler._log_task_usage("continuing", task, usage, elapsed_sec)
    finally:
        set_current_context(calling_context)
412
+
363
413
async def wrapper () -> None :
364
- with nested_logging_context (task .id ):
414
+ with nested_logging_context (task .id ) as log_context :
415
+ start_time = int (self ._clock .time ())
416
+ occasional_status_call = self ._clock .looping_call (
417
+ _occasional_report ,
418
+ TaskScheduler .OCCASIONAL_REPORT_INTERVAL_MS ,
419
+ log_context ,
420
+ start_time ,
421
+ )
365
422
try :
366
423
(status , result , error ) = await function (task )
367
424
except Exception :
@@ -383,6 +440,13 @@ async def wrapper() -> None:
383
440
)
384
441
self ._running_tasks .remove (task .id )
385
442
443
+ current_time = int (self ._clock .time ())
444
+ usage = log_context .get_resource_usage ()
445
+ TaskScheduler ._log_task_usage (
446
+ status .value , task , usage , (current_time - start_time ) * 0.001
447
+ )
448
+ occasional_status_call .stop ()
449
+
386
450
# Try launch a new task since we've finished with this one.
387
451
self ._clock .call_later (0.1 , self ._launch_scheduled_tasks )
388
452
0 commit comments