This repository was archived by the owner on Mar 28, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor.py
124 lines (93 loc) · 3.73 KB
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import logging
import traceback
import typing
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from django.utils import timezone
from task_processor.models import (
RecurringTask,
RecurringTaskRun,
Task,
TaskResult,
TaskRun,
)
logger = logging.getLogger(__name__)
UNREGISTERED_RECURRING_TASK_GRACE_PERIOD = timedelta(minutes=30)
def run_tasks(num_tasks: int = 1) -> typing.List[TaskRun]:
    """Execute up to ``num_tasks`` pending one-off tasks and persist the results.

    Returns the list of ``TaskRun`` records created for this batch, or an
    empty list when nothing is due. Raises ``ValueError`` when ``num_tasks``
    is below one.
    """
    if num_tasks < 1:
        raise ValueError("Number of tasks to process must be at least one")

    tasks = Task.objects.get_tasks_to_process(num_tasks)

    # Guard clause: nothing to do.
    if not tasks:
        logger.debug("No tasks to process.")
        return []

    processed = []
    runs = []
    for candidate in tasks:
        executed, run = _run_task(candidate)
        processed.append(executed)
        runs.append(run)

    # Persist outcome flags on the tasks, then the run records themselves.
    if processed:
        Task.objects.bulk_update(
            processed,
            fields=["completed", "num_failures", "is_locked"],
        )
    if runs:
        TaskRun.objects.bulk_create(runs)

    return runs
def run_recurring_tasks() -> typing.List[RecurringTaskRun]:
    """Execute every due recurring task and persist the resulting run records.

    Unregistered tasks are kept for a grace period (to tolerate old processor
    instances still running during a deployment) and deleted afterwards.
    Returns the ``RecurringTaskRun`` records created, or an empty list.
    """
    # NOTE: We will probably see a lot of delay in the execution of recurring tasks
    # if the tasks take longer then `run_every` to execute. This is not
    # a problem for now, but we should be mindful of this limitation
    tasks = RecurringTask.objects.get_tasks_to_process()

    if not tasks:
        logger.debug("No tasks to process.")
        return []

    runs = []
    for recurring_task in tasks:
        if not recurring_task.is_task_registered:
            # This is necessary to ensure that old instances of the task processor,
            # which may still be running during deployment, do not remove tasks added by new instances.
            # Reference: https://github.com/Flagsmith/flagsmith/issues/2551
            age = timezone.now() - recurring_task.created_at
            if age > UNREGISTERED_RECURRING_TASK_GRACE_PERIOD:
                recurring_task.delete()
            continue

        if recurring_task.should_execute:
            recurring_task, run = _run_task(recurring_task)
            runs.append(run)
        else:
            recurring_task.unlock()

    # update all tasks that were not deleted (deleted rows have a null pk)
    survivors = [t for t in tasks if t.id]
    RecurringTask.objects.bulk_update(survivors, fields=["is_locked", "locked_at"])

    if runs:
        RecurringTaskRun.objects.bulk_create(runs)
    return runs
def _run_task(task: typing.Union[Task, RecurringTask]) -> typing.Tuple[Task, TaskRun]:
    """Run a single task in a worker thread, honouring its optional timeout.

    Returns the (mutated) task together with an unsaved run record describing
    the outcome; persisting both is the caller's responsibility.
    """
    run = task.task_runs.model(started_at=timezone.now(), task=task)
    try:
        with ThreadPoolExecutor(max_workers=1) as executor:
            pending = executor.submit(task.run)
            limit = task.timeout.total_seconds() if task.timeout else None
            # Block until the task finishes, or raise on timeout/exception.
            pending.result(timeout=limit)

        run.result = TaskResult.SUCCESS
        run.finished_at = timezone.now()
        task.mark_success()
    except Exception as exc:
        # For errors that don't include a default message (e.g., TimeoutError),
        # fall back to using repr.
        message = str(exc) or repr(exc)
        logger.error(
            "Failed to execute task '%s', with id %d. Exception: %s",
            task.task_identifier,
            task.id,
            message,
            exc_info=True,
        )
        logger.debug("args: %s", str(task.args))
        logger.debug("kwargs: %s", str(task.kwargs))

        task.mark_failure()
        run.result = TaskResult.FAILURE
        run.error_details = str(traceback.format_exc())
    return task, run