Skip to content

Commit

Permalink
V03 issue752 2 flowcb/scheduled was not working with the *scheduled_h…
Browse files Browse the repository at this point in the history
…ours* setting. re-written to address that and other issues. (#754)

* Fixing #752 with re-write of flowcb/scheduled plugin.

 * it now works for multiple hours in the day.
 * it is based on building a list of appointments, once per day,
   then waiting until each one.
 * note that housekeeping doesn't run while _waiting_ ... hmm...
   is that desirable or not?
 * output is much clearer now.

* adding support for housekeeping in flowcb/scheduled (as part of #752 fix.)

* The new implementation for #752 fixes all the previous limitations;
remove the documentation of limitations that have been resolved.
  • Loading branch information
petersilva authored Sep 11, 2023
1 parent 671df54 commit cedc037
Showing 1 changed file with 125 additions and 69 deletions.
194 changes: 125 additions & 69 deletions sarracenia/flowcb/scheduled/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from sarracenia.filemetadata import FmdStat
from sarracenia.flowcb import FlowCB

import datetime
import json
import time


Expand All @@ -32,48 +34,68 @@ class Scheduled(FlowCB):
Scheduled_interval takes precedence over the others, making it
easier to specify an interval for testing/debugging purposes.
currently: when using scheduled_hour, only one minute can be given within the hour.
if only using scheduled_minute, then repeated values are fine.
use in code (for subclassing):
from sarracenia.flowcb.scheduled import Scheduled
class hoho(Scheduled):
in the routine...
self.wait_until_next() # comment out for just every hour.
replace the gather() routine...
keep the top lines "until time to run"
replace whatever is below.
will only run when it should.
if self.stop_requested:
return []
"""

# this code works if only invoked at scheduled times...
would there be a more pythonic way of doing this?
https://schedule.readthedocs.io/en/stable/
would probably be dead easy to do multiple minute within hour,
just haven't thought about it. opportunity for improvement.
def update_appointments(self, when):
    """
    Set self.appointments to the list of times at which something needs
    to run during the rest of the day containing *when*.

    Every combination of self.hours x self.minutes that falls at or after
    when.hour:when.minute is turned into a UTC-aware datetime on the same
    calendar date as *when*. Combinations earlier than that have already
    passed for the day and are left out.

    when: a datetime.datetime; only its date, hour and minute are used.
    """
    # An empty setting means "no restriction" on that field: with only
    # minutes configured, run at those minutes of every hour; with only
    # hours configured, run on the hour. Without these fallbacks the nested
    # loop below would produce no appointments at all.
    hours = self.hours if self.hours else list(range(24))
    minutes = self.minutes if self.minutes else [0]

    self.appointments = []
    for h in hours:
        for m in minutes:
            if (h, m) < (when.hour, when.minute):
                continue  # that time has passed for today.
            appointment = datetime.time(h, m, tzinfo=datetime.timezone.utc)
            self.appointments.append(datetime.datetime.combine(when, appointment))

    logger.info( f"for {when}: {json.dumps(list(map( lambda x: str(x), self.appointments))) } ")

def __init__(self, options):
    """
    Declare the scheduled_* options, parse the configured hours and
    minutes, and build the appointment list for the current (UTC) day.

    scheduled_hour and scheduled_minute are lists of strings; each entry
    may itself be a comma separated list, so they are flattened, converted
    to int and sorted into self.hours and self.minutes.
    """
    super().__init__(options, logger)
    self.o.add_option('scheduled_interval', 'duration', 0)
    self.o.add_option('scheduled_hour', 'list', [])
    self.o.add_option('scheduled_minute', 'list', [])

    # state shared between wait_seconds(), gather() and on_housekeeping()
    self.housekeeping_needed = False
    self.interrupted = None
    self.interrupted_when = None

    # blank tokens (e.g. from a trailing comma) are skipped rather than
    # handed to int(), which would raise ValueError.
    self.hours = sorted(
        int(token)
        for entry in self.o.scheduled_hour
        for token in entry.split(',')
        if token.strip())
    logger.debug( f"hours {self.hours}" )

    self.minutes = sorted(
        int(token)
        for entry in self.o.scheduled_minute
        for token in entry.split(',')
        if token.strip())
    logger.debug( f'minutes: {self.minutes}')

    now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
    self.update_appointments(now)

def gather(self):

# for next expected post
self.wait_until_next()

if self.stop_requested:
if self.stop_requested or self.housekeeping_needed:
return []

logger.info('time to post')
logger.info('time to run')

# always post the same file at different time
gathered_messages = []
Expand All @@ -85,68 +107,90 @@ def gather(self):

return gathered_messages

def on_housekeeping(self):
    """
    Clear the housekeeping_needed flag.

    wait_seconds() raises the flag when it cuts a wait short, and gather()
    returns nothing while it is set; resetting it here lets waiting resume.
    """
    self.housekeeping_needed = False


def wait_seconds(self, sleepfor):
    """
    Sleep for the given duration, like time.sleep() but broken into
    shorter naps to be able to honour stop_requested, or to return when
    housekeeping is needed.

    sleepfor: how long to wait, as a datetime.timedelta or as a plain
              number of seconds.

    On return:
      * self.interrupted is None when the whole wait completed.
      * self.interrupted holds the time still to wait, and
        self.housekeeping_needed is True, when the wait was cut short
        because more than self.o.housekeeping seconds were slept in this
        call. The next call resumes that wait and ignores its argument.
      * if self.stop_requested was seen, the routine returns at once
        without touching the above.
    """
    # callers hand over either a timedelta (wait_until) or a number of
    # seconds (scheduled_interval); normalise so the arithmetic below works.
    if not isinstance(sleepfor, datetime.timedelta):
        sleepfor = datetime.timedelta(seconds=sleepfor)

    zero = datetime.timedelta(seconds=0)
    housekeeping = datetime.timedelta(seconds=self.o.housekeeping)
    longest_nap = datetime.timedelta(seconds=10)

    if self.interrupted:
        # resuming an interrupted wait: take what was left, less the time
        # the other processing (housekeeping) took in the meantime.
        now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
        sleepfor = self.interrupted - (now - self.interrupted_when)

    sleptfor = zero

    while sleepfor > zero:
        # never nap past the end of the wait, so the remaining time cannot
        # go negative (a negative timedelta is truthy and would be mistaken
        # for an interrupted wait).
        nap = min(longest_nap, sleepfor)
        time.sleep(nap.total_seconds())
        if self.stop_requested:
            return

        sleepfor -= nap
        sleptfor += nap

        if sleepfor <= zero:
            # the wait is complete; do not report it as interrupted, or the
            # caller would skip the work that is now due.
            break

        # remember how long is left to sleep, and when that was measured.
        self.interrupted = sleepfor
        self.interrupted_when = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)

        if sleptfor > housekeeping:
            self.housekeeping_needed = True
            return

    # got to the end of the interval...
    self.interrupted = None

def wait_until(self, appointment):
    """
    Wait until the given appointment.

    appointment: a timezone-aware datetime.datetime. The time remaining
                 from the current UTC time is logged and handed to
                 wait_seconds(), which may return early (see
                 self.interrupted) when stopping or for housekeeping.
    """
    now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
    sleepfor = appointment - now

    logger.info( f"{appointment} duration: {sleepfor}" )
    self.wait_seconds(sleepfor)


def wait_until_hour(self):
    """
    Wait until the next hour listed in scheduled_hour (UTC).

    The wait is computed in whole hours from the current hour; minutes
    within the hour are not taken into account here.
    """
    now = time.gmtime()

    # make a flat, sorted list from values that are comma separated on a
    # single or multiple lines. 24 is a sentinel for "nothing left today".
    hours = sorted(
        int(token)
        for entry in self.o.scheduled_hour
        for token in entry.split(',')) + [24]

    for target in hours:
        # compare hours with the current hour (not the current minute.)
        if now.tm_hour >= target: continue
        break

    if target >= 24:
        # wrap around to the first scheduled hour of tomorrow.
        target = 24 + hours[0]

    sleepfor = (target - now.tm_hour) * 60 * 60
    logger.info("sleep for %d sec" % sleepfor )
    # wait_seconds() does its arithmetic on timedelta values.
    self.wait_seconds(datetime.timedelta(seconds=sleepfor))

def wait_within_hour(self):
    """
    Wait until the next minute listed in scheduled_minute (UTC).

    If no listed minute is left in the current hour, wait for the first
    listed minute of the following hour. The wait is computed in whole
    minutes from the current minute.
    """
    now = time.gmtime()

    # flat, sorted list of minutes; 60 is a sentinel for "nothing left
    # in this hour".
    minutes = [
        int(token)
        for entry in self.o.scheduled_minute
        for token in entry.split(',')] + [60]
    minutes.sort()
    logger.debug( f'minutes: {minutes}')

    for target in minutes:
        if now.tm_min >= target: continue
        break

    if target >= 60:
        # wrap around to the first scheduled minute of the next hour.
        target = 60 + minutes[0]

    sleepfor = (target - now.tm_min) * 60

    logger.info("sleep for %d sec" % sleepfor )
    # wait_seconds() does its arithmetic on timedelta values.
    self.wait_seconds(datetime.timedelta(seconds=sleepfor))

def wait_until_next(self):
    """
    Wait until the next time something is scheduled to run.

    scheduled_interval, when greater than zero, takes precedence: just
    wait that long. Otherwise, when scheduled_hour or scheduled_minute is
    set, wait for the first entry of self.appointments that is still in
    the future, rebuilding the list for the following UTC day once today's
    appointments are used up.

    The wait can end early (stop requested, or housekeeping needed); in
    the housekeeping case self.interrupted is set and the appointment is
    kept so that the next call resumes waiting for it.
    """
    if self.o.scheduled_interval > 0:
        # wait_seconds() does its arithmetic on timedelta values.
        self.wait_seconds(datetime.timedelta(seconds=self.o.scheduled_interval))
        return

    if not ((len(self.o.scheduled_hour) > 0) or (len(self.o.scheduled_minute) > 0)):
        return

    now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
    next_appointment = next((t for t in self.appointments if now < t), None)

    if next_appointment is None:
        # done for the day... appointments are in UTC, so "tomorrow" has
        # to be taken from the UTC date, not the host's local date.
        tomorrow = now.date() + datetime.timedelta(days=1)
        midnight = datetime.datetime.combine(
            tomorrow, datetime.time(0, 0, tzinfo=datetime.timezone.utc))
        self.update_appointments(midnight)
        next_appointment = self.appointments[0]

    self.wait_until(next_appointment)
    if self.interrupted:
        logger.info( f"sleep interrupted, returning for housekeeping." )
    else:
        self.appointments.remove(next_appointment)
        logger.info( f"ok {len(self.appointments)} appointments left today" )


if __name__ == '__main__':
Expand All @@ -159,10 +203,22 @@ def wait_until_next( self ):
flow.o.scheduled_hour= [ '1','3','5',' 7',' 9',' 13','21','23']
flow.o.scheduled_minute= [ '1,3,5',' 7',' 9',' 13',' 15',' 51','53' ]
logging.basicConfig(level=logging.DEBUG)

when=datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)

me = Scheduled(flow.o)
me.update_appointments(when)

flow.o.scheduled_hour= [ '1' ]
me = Scheduled(flow.o)
me.update_appointments(when)

"""
for unit testing should be able to change when, and self.o.scheduled_x to cover
many different test cases.
"""

while True:
logger.info("hoho!")
me.wait_within_hour()
me.wait_until_next()
logger.info("Do Something!")
#me.wait_until_hour(flow.o)

0 comments on commit cedc037

Please sign in to comment.