From cedc03748e0569274352ef0b082d288116aa9bf2 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 11 Sep 2023 11:17:53 -0400 Subject: [PATCH] V03 issue752 2 flowcb/scheduled was not working with the *scheduled_hours* setting. re-written to address that and other issues. (#754) * Fixing #752 with re-write of flowcb/scheduled plugin. * it now works for multiple hours in the day. * it is based on building a list of appointments, once per day, then waiting until each one. * notice that housekeeping doesn't run when _waiting_ ... hmm... desirable or not? * output much clearer now. * adding support for housekeeping in flowcb/scheduled (as part of #752 fix.) * new implementation for #752 fixes all the previous limitations. remove documentation of limitations that have been resolved. --- sarracenia/flowcb/scheduled/__init__.py | 194 +++++++++++++++--------- 1 file changed, 125 insertions(+), 69 deletions(-) diff --git a/sarracenia/flowcb/scheduled/__init__.py b/sarracenia/flowcb/scheduled/__init__.py index abdfa68a8..0103b78a2 100644 --- a/sarracenia/flowcb/scheduled/__init__.py +++ b/sarracenia/flowcb/scheduled/__init__.py @@ -6,6 +6,8 @@ from sarracenia.filemetadata import FmdStat from sarracenia.flowcb import FlowCB +import datetime +import json import time @@ -32,31 +34,35 @@ class Scheduled(FlowCB): Scheduled_interval takes precedence over the others, making it easier to specify an interval for testing/debugging purposes. - currently: when using scheduled_hour, only one minute can be given within the hour. - if only using scheduled_minute, then repeated values are fine. - use in code (for subclassing): from sarracenia.scheduled import Scheduled class hoho(Scheduled): - in the routine... - self.wait_until_next() # comment out for just every hour. + replace the gather() routine... + keep the top lines "until time to run" + replace whatever is below. + will only run when it should. 
def update_appointments(self, when):
    """
    Rebuild self.appointments for the rest of the day that contains *when*.

    Every combination of self.hours x self.minutes that is not earlier than
    *when* (compared by hour and minute) becomes a timezone-aware UTC
    datetime on the same calendar date as *when*.  Because both lists are
    iterated in order, the result is chronological when they are sorted.

    When only one of the two lists is configured, the other one is filled in
    so that the schedule is still usable:

      * minutes only -> those minutes of every hour (0..23)
      * hours only   -> minute 0 of each listed hour

    When neither list is configured, no appointments are created.

    :param when: datetime giving the date to schedule for and the earliest
                 hour/minute that should still be included.
    """
    hours = self.hours
    minutes = self.minutes
    if hours or minutes:
        # a plain hours x minutes product is empty as soon as either list is
        # empty, which would leave nothing to wait for.
        hours = hours or range(24)
        minutes = minutes or [0]

    earliest = (when.hour, when.minute)
    self.appointments = [
        datetime.datetime.combine(
            when, datetime.time(h, m, tzinfo=datetime.timezone.utc))
        for h in hours
        for m in minutes
        if (h, m) >= earliest  # anything earlier has already passed today.
    ]

    logger.info( f"for {when}: {json.dumps(list(map( lambda x: str(x), self.appointments))) } ")
def on_housekeeping(self):
    """Housekeeping has just run: clear the flag that made gather() yield for it."""
    self.housekeeping_needed = False


def wait_seconds(self, sleepfor):
    """
    Sleep for *sleepfor*, like time.sleep(), but broken into naps of at most
    10 seconds so that stop_requested and housekeeping can be honoured.

    :param sleepfor: how long to wait; either a datetime.timedelta or a
                     plain number of seconds.

    Returns early, without finishing the wait:

      * when self.stop_requested is set after a nap;
      * when more than self.o.housekeeping seconds have been slept in this
        call and time still remains.  In that case
        self.housekeeping_needed is set, and self.interrupted /
        self.interrupted_when record how much was left and when, so that
        the next call resumes the same wait (minus the time spent away)
        instead of starting over from *sleepfor*.

    When the full wait completes, self.interrupted is reset to None.
    """
    if not isinstance(sleepfor, datetime.timedelta):
        # callers may pass a bare number of seconds.
        sleepfor = datetime.timedelta(seconds=sleepfor)

    zero = datetime.timedelta(seconds=0)
    housekeeping = datetime.timedelta(seconds=self.o.housekeeping)
    longest_nap = datetime.timedelta(seconds=10)

    if self.interrupted is not None:
        # resuming an interrupted wait: subtract the time other processing took.
        now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
        sleepfor = self.interrupted - (now - self.interrupted_when)

    sleptfor = zero

    while sleepfor > zero:
        # never nap past the end of the interval.
        nap = min(longest_nap, sleepfor)
        time.sleep(nap.total_seconds())
        if self.stop_requested:
            return

        # how long is left to sleep.
        sleepfor -= nap
        self.interrupted = sleepfor
        self.interrupted_when = datetime.datetime.fromtimestamp(
            time.time(), datetime.timezone.utc)

        sleptfor += nap
        # only break off for housekeeping while there is still time left;
        # otherwise the wait is simply complete.
        if sleepfor > zero and sleptfor > housekeeping:
            self.housekeeping_needed = True
            return

    # got to the end of the interval...
    self.interrupted = None


def wait_until(self, appointment):
    """
    Wait, via wait_seconds(), until the timezone-aware datetime *appointment*.

    An appointment already in the past gives a negative duration, for which
    wait_seconds() does not sleep.
    """
    now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
    sleepfor = appointment - now

    logger.info( f"{appointment} duration: {sleepfor}" )
    self.wait_seconds(sleepfor)
def wait_until_next(self):
    """
    Block until the next time the flow is supposed to run.

    * scheduled_interval > 0 takes precedence: wait that many seconds.
    * otherwise, if scheduled_hour and/or scheduled_minute are set, wait for
      the first entry of self.appointments that is still in the future.
      When none is left, the list is rebuilt for the following UTC day.
    * with nothing configured, return immediately.

    The wait may return early (stop requested, or housekeeping due).  When
    self.interrupted shows the wait was cut short, the appointment is kept
    so that the next call continues waiting for it; otherwise it is removed
    from self.appointments.
    """
    if self.o.scheduled_interval > 0:
        # wait_seconds() does timedelta arithmetic; hand it a timedelta.
        self.wait_seconds(datetime.timedelta(seconds=self.o.scheduled_interval))
        return

    if not (self.o.scheduled_hour or self.o.scheduled_minute):
        return

    now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
    next_appointment = next((t for t in self.appointments if now < t), None)

    if next_appointment is None:
        # done for the day...
        # appointments are expressed in UTC, so "tomorrow" has to be derived
        # from the UTC date, not from the host's local date.
        tomorrow = now.date() + datetime.timedelta(days=1)
        midnight = datetime.datetime.combine(
            tomorrow, datetime.time(0, 0, tzinfo=datetime.timezone.utc))
        self.update_appointments(midnight)
        # NOTE(review): assumes update_appointments() yields at least one
        # appointment for a whole day -- confirm for configurations that set
        # only one of scheduled_hour / scheduled_minute.
        next_appointment = self.appointments[0]

    self.wait_until(next_appointment)
    # compare against None: a zero-length remainder is still an interruption.
    if self.interrupted is not None:
        logger.info("sleep interrupted, returning for housekeeping.")
    else:
        self.appointments.remove(next_appointment)
        logger.info( f"ok {len(self.appointments)} appointments left today" )