-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathupdatable_timer_lib.py
39 lines (31 loc) · 1.29 KB
/
updatable_timer_lib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio
from datetime import datetime, timedelta
from temporalio import workflow
class UpdatableTimer:
    """Workflow timer whose wake-up time can be changed while it is sleeping.

    ``sleep`` blocks until ``workflow.now()`` reaches ``wake_up_time``.
    Calling ``update_wake_up_time`` during that wait interrupts it so the
    remaining interval is recomputed against the new target.
    """

    def __init__(self, wake_up_time: datetime) -> None:
        """Create a timer that targets *wake_up_time*."""
        self.wake_up_time_updated = False
        self.wake_up_time = wake_up_time

    def _time_remaining(self) -> timedelta:
        """Return the target wake-up time minus the current workflow time."""
        current_time = workflow.now()
        return self.wake_up_time - current_time

    async def sleep(self) -> None:
        """Wait until the current wake-up time has been reached.

        Returns immediately (after logging) when the wake-up time is not in
        the future. Otherwise waits in rounds: each round ends either because
        the timeout elapsed or because the wake-up time was updated, and in
        both cases the remaining interval is recomputed before deciding
        whether to wait again.
        """
        workflow.logger.info(f"sleep_until: {self.wake_up_time}")

        remaining = self._time_remaining()
        while remaining > timedelta(0):
            workflow.logger.info(f"Going to sleep for {remaining}")

            # Clear the flag before waiting so that only an update arriving
            # during this wait can satisfy the condition below.
            self.wake_up_time_updated = False
            try:
                await workflow.wait_condition(
                    lambda: self.wake_up_time_updated,
                    timeout=remaining,
                )
            except asyncio.TimeoutError:
                # The timeout expiring is the normal path; whether the target
                # has actually been reached is decided by the recomputation.
                pass

            remaining = self._time_remaining()

        workflow.logger.info("sleep_until completed")

    def update_wake_up_time(self, wake_up_time: datetime) -> None:
        """Replace the target wake-up time and flag the change.

        Setting the flag satisfies the condition an in-progress ``sleep`` is
        waiting on, so it re-evaluates against the new target.
        """
        workflow.logger.info(f"update_wake_up_time: {wake_up_time}")
        self.wake_up_time = wake_up_time
        self.wake_up_time_updated = True

    def get_wake_up_time(self) -> datetime:
        """Return the wake-up time currently targeted by the timer."""
        return self.wake_up_time