simple scheduler to be called from daemon processes or similar

This commit is contained in:
Stefan Harmuth 2021-12-05 10:08:38 +01:00
parent 37166532ae
commit f8db937643

22
schedule.py Normal file
View File

@ -0,0 +1,22 @@
import datetime
from typing import Callable, List, Any
class Scheduler:
def __init__(self):
self.jobs = []
def schedule(self, timedelta: datetime.timedelta, func: Callable, *args: List[Any]):
self.jobs.append({
'func': func,
'args': args,
'timedelta': timedelta,
'runat': (datetime.datetime.utcnow() + timedelta)
})
def run_pending(self):
now = datetime.datetime.utcnow()
for job in self.jobs:
if job['runat'] <= now:
job['runat'] += job['timedelta']
job['func'](*job['args'])