-
Notifications
You must be signed in to change notification settings - Fork 1
/
scheduler.py
88 lines (68 loc) · 2.34 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import atexit
import datetime
import logging
from typing import Dict, Callable
from abc import ABC, abstractmethod
import tzlocal
from dateutil import tz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
class Task(ABC):
    """Abstract schedulable unit: a callable plus the rule for when it fires.

    Concrete subclasses describe their schedule by implementing
    :meth:`create_trigger`.

    Attributes:
        function: The callable associated with this task.
        args: Dictionary of arguments kept alongside ``function``; an empty
            dict when the caller supplied none.
    """

    function: Callable
    args: Dict

    def __init__(self, function: Callable, args: Dict | None = None) -> None:
        """Store the callable and its argument dictionary.

        Args:
            function: The callable to associate with this task.
            args: Optional argument dictionary. ``None`` is replaced by a new
                empty dict so that instances never share one default object.
        """
        super().__init__()
        self.function = function
        self.args = args if args is not None else {}

    @abstractmethod
    def create_trigger(self) -> CronTrigger:
        """Build the trigger that describes when this task should run.

        Returns:
            The ``CronTrigger`` produced by a concrete subclass.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        # ``NotImplemented`` is the sentinel for binary-operator dispatch, not
        # a trigger. Returning it would let a subclass that delegates to
        # ``super()`` hand a bogus value onward, so fail loudly instead.
        raise NotImplementedError(
            f"{type(self).__name__} must implement create_trigger()"
        )
class DailyTask(Task):
    """Task scheduled by a time of day.

    Attributes:
        target_time: Time of day as a string accepted by
            ``datetime.time.fromisoformat`` (for example ``"07:30:00"``).
    """

    target_time: str

    def __init__(
        self, target_time: str, function: Callable, args: Dict | None = None
    ) -> None:
        """Remember the time-of-day string and initialise the base task.

        Args:
            target_time: ISO-format time string; it is parsed only when the
                trigger is built, not here.
            function: The callable associated with this task.
            args: Optional argument dictionary passed on to ``Task``.
        """
        super().__init__(function, args)
        self.target_time = target_time

    def create_trigger(self) -> CronTrigger:
        """Return a cron trigger pinned to the stored hour, minute and second.

        Only the time-of-day fields and the timezone are set; the timezone is
        the string form of the machine's local zone as reported by ``tzlocal``.

        Raises:
            ValueError: If ``target_time`` is not a valid ISO time string
                (raised by ``datetime.time.fromisoformat``).
        """
        moment = datetime.time.fromisoformat(self.target_time)
        cron_fields = {
            "hour": moment.hour,
            "minute": moment.minute,
            "second": moment.second,
            "timezone": str(tzlocal.get_localzone()),
        }
        return CronTrigger(**cron_fields)
class SingularTask(Task):
    """Task scheduled for one specific calendar date and time.

    Attributes:
        target_time: The requested moment, converted to the local timezone
            (``dateutil.tz.tzlocal()``) when the task is constructed.
    """

    target_time: datetime.datetime

    def __init__(
        self,
        target_time: datetime.datetime,
        function: Callable,
        args: Dict | None = None,
    ) -> None:
        """Convert the target moment to local time and initialise the base task.

        Args:
            target_time: When the task should run; it is converted with
                ``astimezone(tz.tzlocal())`` before being stored.
            function: The callable associated with this task.
            args: Optional argument dictionary passed on to ``Task``.
        """
        # Convert first so a bad ``target_time`` fails before any state is set.
        localized = target_time.astimezone(tz.tzlocal())
        self.target_time = localized
        super().__init__(function, args)

    def create_trigger(self) -> CronTrigger:
        """Return a cron trigger with every field from year to second fixed.

        The field values come from the stored local ``target_time``; the
        trigger's timezone is the string form of the local zone as reported
        by ``tzlocal``.
        """
        when = self.target_time
        field_names = ("year", "month", "day", "hour", "minute", "second")
        cron_fields = {name: getattr(when, name) for name in field_names}
        return CronTrigger(timezone=str(tzlocal.get_localzone()), **cron_fields)
class TimeScheduler:
    """Collects tasks and registers them on an APScheduler background scheduler.

    Tasks are queued with :meth:`add` and handed to the scheduler in one pass
    by :meth:`start`. A task added after ``start()`` has run is only appended
    to ``task_list``; it is not registered with the running scheduler.

    Attributes:
        task_list: Tasks queued so far, in insertion order.
    """

    def __init__(self):
        """Create a scheduler wrapper with an empty task queue."""
        self.task_list = []

    def start(self):
        """Start a background scheduler and register every queued task.

        Each task contributes one job: its ``function`` runs on the trigger
        returned by ``create_trigger()`` and receives the task's ``args``
        dictionary as keyword arguments. The scheduler is shut down
        automatically at interpreter exit.
        """
        scheduler = BackgroundScheduler()
        scheduler.start()
        # Silence APScheduler's per-job INFO chatter; warnings still surface.
        logging.getLogger("apscheduler.scheduler").setLevel("WARNING")
        for task in self.task_list:
            scheduler.add_job(
                func=task.function,
                trigger=task.create_trigger(),
                # Forward the stored arguments; without this the callable
                # would always be invoked with no arguments at all.
                kwargs=task.args,
            )
        # Shut down the scheduler when exiting the app
        atexit.register(scheduler.shutdown)

    def add(self, task: Task):
        """Queue ``task`` so that the next :meth:`start` call schedules it.

        Args:
            task: The task to append to ``task_list``.
        """
        self.task_list.append(task)