Skip to content

Commit

Permalink
Allow jobs time to be aligned to specified parameter (start_ts)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anze committed Mar 14, 2020
1 parent 4dae6ad commit 632c68b
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions grafoleancollector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ class MultipleIntervalsTrigger(BaseTrigger):
- multiple intervals, when aligned, cause only a single job invocation
- remembers which intervals have caused the invocation; the list is cleared after
`forget_affecting_after` seconds
- if start_ts is specified, it allows aligning jobs' start time; start_ts should be in the past
"""
__slots__ = 'intervals', 'start_ts', 'affecting_intervals', 'forget_affecting_after'

def __init__(self, intervals, forget_affecting_after=300):
def __init__(self, intervals, forget_affecting_after=300, start_ts=None):
if not intervals:
raise Exception("At least one interval must be specified")
# we only operate in whole seconds, and only care about unique values:
self.intervals = list(set([int(i) for i in intervals]))
self.forget_affecting_after = forget_affecting_after
self.start_ts = int(time.time())
now = int(time.time())
self.start_ts = now if start_ts is None else int(start_ts)
if self.start_ts > now:
logging.warning("Job aligning with start_ts failed! Parameter start_ts must be in the past, never in the future.")
self.start_ts = now
self.affecting_intervals = {}

def get_next_fire_time(self, previous_fire_time, now):
Expand Down Expand Up @@ -308,14 +313,22 @@ def fetch_job_configs(self, protocol):

def refresh_jobs(self):
wanted_jobs = set()
for job_id, intervals, job_func, job_data in self.jobs():
for next_job in self.jobs():
# We didn't anticipate that we might need another parameter (start_ts)... since this is a library, we can't simply
# change it now until all bots change this too. But we need another parameter and it would be ugly to put it elsewhere,
# so we detect different number of returned parameters and act accordingly:
if len(next_job) == 4:
job_id, intervals, job_func, job_data = next_job
start_ts = None
else:
job_id, intervals, job_func, job_data, start_ts = next_job
wanted_jobs.add(job_id)
# if the existing job's configuration is the same, leave it alone, otherwise the trigger will be reset:
if self.known_jobs.get(job_id) == job_data:
continue
self.known_jobs[job_id] = job_data

trigger = MultipleIntervalsTrigger(intervals)
trigger = MultipleIntervalsTrigger(intervals, start_ts=start_ts)
logging.info(f"Adding job: {job_id}")
self.scheduler.add_job(job_func, id=job_id, trigger=trigger, executor='iaexecutor', kwargs=job_data, replace_existing=True)

Expand Down

0 comments on commit 632c68b

Please sign in to comment.