py-tools/src/tools/schedule.py
Stefan Harmuth 99c4ef2ce6
Some checks failed
Publish to PyPI / Publish to PyPI (push) Failing after 5s
code reformatting and cleanup
2023-11-11 15:37:42 +01:00

33 lines
786 B
Python

import datetime
from typing import Callable, Any
class Scheduler:
def __init__(self):
self.jobs = {}
def schedule(
self,
name: str,
every: datetime.timedelta,
func: Callable[..., None],
*args: list[Any],
):
self.jobs[name] = {
"call": func,
"args": args,
"timedelta": every,
"runat": (datetime.datetime.utcnow() + every),
}
def unschedule(self, name: str):
if name in self.jobs:
del self.jobs[name]
def run_pending(self):
now = datetime.datetime.utcnow()
for job in self.jobs.values():
if job["runat"] <= now:
job["runat"] += job["timedelta"]
job["call"](*job["args"])