From b117bc54cb4cc52329fe927cfb6336f6af2f4d26 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Thu, 8 Jun 2023 14:57:13 -0400 Subject: [PATCH] ACU Exercisor mode (#447) * ACUAgent: make monitor process more tolerant to ACU dropout * ACUAgent: exercisor for grid testing * ACU: use absolute imports * ACUAgent: (re-)enable UDP stream automatically This requires latest soaculib -- June 2, 2023. * ACU: Clean up in response to PR * ACU: Update docs for changed acu config flag --------- Co-authored-by: Brian Koopman --- docs/agents/acu_agent.rst | 36 +++- socs/agents/acu/agent.py | 267 ++++++++++++++++++++---- socs/agents/acu/exercisor.py | 394 +++++++++++++++++++++++++++++++++++ 3 files changed, 654 insertions(+), 43 deletions(-) create mode 100644 socs/agents/acu/exercisor.py diff --git a/docs/agents/acu_agent.rst b/docs/agents/acu_agent.rst index c1d31de66..c08f3cbeb 100644 --- a/docs/agents/acu_agent.rst +++ b/docs/agents/acu_agent.rst @@ -38,7 +38,7 @@ file. An example configuration block using all availabile arguments is below:: {'agent-class': 'ACUAgent', 'instance-id': 'acu-satp1', - 'arguments': [['--acu_config', 'satp1']], + 'arguments': [['--acu-config', 'satp1']], } soaculib @@ -88,6 +88,40 @@ example configuration block is below:: } +Exercisor Mode +-------------- + +The agent can run itself through various motion patterns, using the +Process ``exercise``. This process is only visible if the agent is +invoked with the ``--exercise-plan`` argument and a path to the +exercise plan config file. Here is an example config file: + +.. code-block:: yaml + + satp1: + settings: + use_boresight: false + steps: + - type: 'elnod' + repeat: 2 + - type: 'grid' + duration: 3600 + - type: 'schedule' + files: + - /path/to/schedule1.txt + - /path/to/schedule2.txt + duration: 3600 + dwell_time: 600 + - type: 'grease' + duration: 900 + +Note that the root level may contain multiple entries; the key +corresponds to the ACU config block name, which would correspond to +the ACU agent ``--acu-config`` argument. + +The exercisor writes some diagnostic and timing information to a feed +called ``activity``. + Agent API --------- diff --git a/socs/agents/acu/agent.py b/socs/agents/acu/agent.py index 6429cfc2e..79b9aa8cf 100644 --- a/socs/agents/acu/agent.py +++ b/socs/agents/acu/agent.py @@ -1,4 +1,5 @@ import argparse +import random import struct import time from enum import Enum @@ -7,6 +8,7 @@ import soaculib as aculib import soaculib.status_keys as status_keys import twisted.web.client as tclient +import yaml from autobahn.twisted.util import sleep as dsleep from ocs import ocs_agent, site_config from ocs.ocs_twisted import TimeoutLock @@ -14,7 +16,8 @@ from twisted.internet import protocol, reactor from twisted.internet.defer import DeferredList, inlineCallbacks -import socs.agents.acu.drivers as sh +from socs.agents.acu import drivers as sh +from socs.agents.acu import exercisor #: The number of free ProgramTrack positions, when stack is empty. FULL_STACK = 10000 @@ -29,14 +32,18 @@ class ACUAgent: acu_config (str): The configuration for the ACU, as referenced in aculib.configs. Default value is 'guess'. - + exercise_plan (str): + The full path to a scan config file describing motions to cycle + through on the ACU. If this is None, the associated process and + feed will not be registered. """ - def __init__(self, agent, acu_config='guess'): + def __init__(self, agent, acu_config='guess', exercise_plan=None): # Separate locks for exclusive access to az/el, and boresight motions. self.azel_lock = TimeoutLock() self.boresight_lock = TimeoutLock() + self.acu_config_name = acu_config self.acu_config = aculib.guess_config(acu_config) self.sleeptime = self.acu_config['motion_waittime'] self.udp = self.acu_config['streams']['main'] @@ -50,6 +57,8 @@ def __init__(self, agent, acu_config='guess'): self.monitor_fields = status_keys.status_fields[self.acu_config['platform']]['status_fields'] self.motion_limits = self.acu_config['motion_limits'] + self.exercise_plan = exercise_plan + self.log = agent.log # self.data provides a place to reference data from the monitors. @@ -160,6 +169,18 @@ def __init__(self, agent, acu_config='guess'): self.clear_faults, blocking=False) + # Automatic exercise program... + if exercise_plan: + agent.register_process( + 'exercise', self.exercise, self._simple_process_stop) + # Use longer default frame length ... very low volume feed. + self.agent.register_feed('activity', + record=True, + buffer_time=0, + agg_params={ + 'frame_length': 600, + }) + @inlineCallbacks def _simple_task_abort(self, session, params): # Trigger a task abort by updating state to "stopping" @@ -225,7 +246,8 @@ def monitor(self, session, params): ... }, "StatusResponseRate": 19.237531827325963, - "PlatformType": "satp" + "PlatformType": "satp", + "connected": True, } In the case of an SATP, the Status3rdAxis is not populated @@ -236,15 +258,29 @@ def monitor(self, session, params): """ session.set_status('running') - version = yield self.acu_read.http.Version() - self.log.info(version) # Note that session.data will get scanned, to assign data to # feed blocks. Items in session.data that are themselves # dicts will parsed; but items (such as PlatformType and # StatusResponseRate) which are simple strings or floats will # be ignored for feed assignment. - session.data = {'PlatformType': self.acu_config['platform']} + session.data = {'PlatformType': self.acu_config['platform'], + 'connected': False} + + last_complaint = 0 + while True: + try: + version = yield self.acu_read.http.Version() + break + except Exception as e: + if time.time() - last_complaint > 3600: + errormsg = {'aculib_error_message': str(e)} + self.log.error(str(e)) + self.log.error('monitor process failed to query version! Will keep trying.') + last_complaint = time.time() + yield dsleep(10) + self.log.info(version) + session.data['connected'] = True # Numbering as per ICD. mode_key = { @@ -347,17 +383,21 @@ def monitor(self, session, params): j2 = yield self.acu_read.http.Values(self.acu3rdaxis) else: j2 = {} - session.data.update({'StatusDetailed': j, 'Status3rdAxis': j2}) + session.data.update({'StatusDetailed': j, 'Status3rdAxis': j2, + 'connected': True}) n_ok += 1 + last_complaint = 0 except Exception as e: - # Need more error handling here... - errormsg = {'aculib_error_message': str(e)} - self.log.error(str(e)) - acu_error = {'timestamp': time.time(), - 'block_name': 'ACU_error', - 'data': errormsg - } - self.agent.publish_to_feed('acu_error', acu_error) + if now - last_complaint > 3600: + errormsg = {'aculib_error_message': str(e)} + self.log.error(str(e)) + acu_error = {'timestamp': time.time(), + 'block_name': 'ACU_error', + 'data': errormsg + } + self.agent.publish_to_feed('acu_error', acu_error) + last_complaint = time.time() + session.data['connected'] = False yield dsleep(1) continue for k, v in session.data.items(): @@ -505,9 +545,10 @@ def monitor(self, session, params): } return True, 'Acquisition exited cleanly.' + @ocs_agent.param('auto_enable', type=bool, default=True) @inlineCallbacks def broadcast(self, session, params): - """broadcast() + """broadcast(auto_enable=True) **Process** - Read UDP data from the port specified by self.acu_config, decode it, and publish to HK feeds. Full @@ -515,30 +556,36 @@ def broadcast(self, session, params): while 1 Hz decimated are written to "acu_broadcast_influx". The 1 Hz decimated output are also stored in session.data. - The session.data looks like this (this is for a SATP running - with servo details in the UDP output):: + Args: + auto_enable (bool): If True, the Process will try to + configure and (re-)enable the UDP stream if at any point + the stream seems to drop out. - { - "Time": 1679499948.8234625, - "Corrected_Azimuth": -20.00112176010607, - "Corrected_Elevation": 50.011521050839434, - "Corrected_Boresight": 29.998428712246067, - "Raw_Azimuth": -20.00112176010607, - "Raw_Elevation": 50.011521050839434, - "Raw_Boresight": 29.998428712246067, - "Azimuth_Current_1": -0.000384521484375, - "Azimuth_Current_2": -0.0008331298828125, - "Elevation_Current_1": 0.003397979736328125, - "Boresight_Current_1": -0.000483856201171875, - "Boresight_Current_2": -0.000105743408203125, - "Azimuth_Vel_1": -0.000002288818359375, - "Azimuth_Vel_2": 0, - "Az_Vel_Act": -0.0000011444091796875, - "Az_Vel_Des": 0, - "Az_Vffw": 0, - "Az_Pos_Des": -20.00112176010607, - "Az_Pos_Err": 0 - } + Notes: + The session.data looks like this (this is for a SATP running + with servo details in the UDP output):: + + { + "Time": 1679499948.8234625, + "Corrected_Azimuth": -20.00112176010607, + "Corrected_Elevation": 50.011521050839434, + "Corrected_Boresight": 29.998428712246067, + "Raw_Azimuth": -20.00112176010607, + "Raw_Elevation": 50.011521050839434, + "Raw_Boresight": 29.998428712246067, + "Azimuth_Current_1": -0.000384521484375, + "Azimuth_Current_2": -0.0008331298828125, + "Elevation_Current_1": 0.003397979736328125, + "Boresight_Current_1": -0.000483856201171875, + "Boresight_Current_2": -0.000105743408203125, + "Azimuth_Vel_1": -0.000002288818359375, + "Azimuth_Vel_2": 0, + "Az_Vel_Act": -0.0000011444091796875, + "Az_Vel_Des": 0, + "Az_Vffw": 0, + "Az_Pos_Des": -20.00112176010607, + "Az_Pos_Err": 0 + } """ session.set_status('running') @@ -549,6 +596,9 @@ def broadcast(self, session, params): fields = self.udp_schema['fields'] session.data = {} + # BroadcastStreamControl instance. + stream = self.acu_control.streams['main'] + class MonitorUDP(protocol.DatagramProtocol): def datagramReceived(self, data, src_addr): host, port = src_addr @@ -604,10 +654,20 @@ def datagramReceived(self, data, src_addr): sd[ky.split('_bcast_influx')[0]] = influx_means[ky] session.data.update(sd) else: + # Consider logging an outage, attempting reconfig. if active and now - last_packet_time > 3: self.log.info('No UDP packets are being received.') active = False + next_reconfig = time.time() + if not active and params['auto_enable'] and next_reconfig <= time.time(): + self.log.info('Requesting UDP stream enable.') + try: + cfg, raw = yield stream.safe_enable() + except Exception as err: + self.log.info('Exception while trying to enable stream: {err}', err=err) + next_reconfig += 60 yield dsleep(1) + yield dsleep(0.005) handler.stopListening() @@ -1359,12 +1419,135 @@ def _run_track(self, session, point_gen, step_time, azonly=False, return True, 'Track ended cleanly' + @ocs_agent.param('starting_index', type=int, default=0) + def exercise(self, session, params): + """exercise(starting_index=0) + + **Process** - Run telescope platform through some pre-defined motions. + + For historical reasons, this does not command agent functions + internally, but rather instantiates a *client* and calls the + agent as though it were an external entity. + + """ + # Load the exercise plan. + plans = yaml.safe_load(open(self.exercise_plan, 'rb')) + super_plan = exercisor.get_plan(plans[self.acu_config_name]) + + session.data = { + 'timestamp': time.time(), + 'iterations': 0, + 'attempts': 0, + 'errors': 0, + } + session.set_status('running') + + def _publish_activity(activity): + msg = { + 'block_name': 'A', + 'timestamp': time.time(), + 'data': {'activity': activity}, + } + self.agent.publish_to_feed('activity', msg) + + def _publish_error(delta_error=1): + session.data['errors'] += delta_error + msg = { + 'block_name': 'B', + 'timestamp': time.time(), + 'data': {'error_count': session.data['errors']} + } + self.agent.publish_to_feed('activity', msg) + + def _exit_now(ok, msg): + _publish_activity('idle') + self.agent.feeds['activity'].flush_buffer() + return ok, msg + + _publish_activity('idle') + _publish_error(0) + + target_instance_id = self.agent.agent_address.split('.')[-1] + exercisor.set_client(target_instance_id) + settings = super_plan.get('settings', {}) + + plan_idx = 0 + plan_t = None + + for plan in super_plan['steps']: + plan['iter'] = iter(plan['driver']) + + while session.status in ['running']: + time.sleep(1) + session.data['timestamp'] = time.time() + session.data['iterations'] += 1 + + # Fault maintenance + faults = exercisor.get_faults() + if faults['safe_lock']: + self.log.info('SAFE lock detected, exiting') + return _exit_now(False, 'Exiting on SAFE lock.') + + if faults['local_mode']: + self.log.info('LOCAL mode detected, exiting') + return _exit_now(False, 'Exiting on LOCAL mode.') + + if faults['az_summary']: + if session.data['attempts'] > 5: + self.log.info('Too many az summary faults, exiting.') + return _exit_now(False, 'Too many az summary faults.') + session.data['attempts'] += 1 + self.log.info('az summary fault -- trying to clear.') + exercisor.clear_faults() + time.sleep(10) + continue + + session.data['attempts'] = 0 + + # Plan execution + active_plan = super_plan['steps'][plan_idx] + if plan_t is None: + plan_t = time.time() + + now = time.time() + if now - plan_t > active_plan['duration']: + plan_idx = (plan_idx + 1) % len(super_plan['steps']) + plan_t = None + continue + + if settings.get('use_boresight'): + bore_target = random.choice(settings['boresight_opts']) + self.log.info(f'Setting boresight={bore_target}...') + _publish_activity('boresight') + exercisor.set_boresight(bore_target) + + plan, info = next(active_plan['iter']) + + self.log.info(f'Launching next scan. plan={plan}') + + _publish_activity(active_plan['driver'].code) + ok = None + if 'targets' in plan: + exercisor.steps(**plan) + else: + exercisor.scan(**plan) + _publish_activity('idle') + + if ok is None: + self.log.info('Scan completed without error.') + else: + self.log.info(f'Scan exited with error: {ok}') + _publish_error() + + return _exit_now(True, "Stopped run process") + def add_agent_args(parser_in=None): if parser_in is None: parser_in = argparse.ArgumentParser() pgroup = parser_in.add_argument_group('Agent Options') - pgroup.add_argument("--acu_config") + pgroup.add_argument("--acu-config") + pgroup.add_argument("--exercise-plan") return parser_in @@ -1374,7 +1557,7 @@ def main(args=None): parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) - _ = ACUAgent(agent, args.acu_config) + _ = ACUAgent(agent, args.acu_config, args.exercise_plan) runner.run(agent, auto_reconnect=True) diff --git a/socs/agents/acu/exercisor.py b/socs/agents/acu/exercisor.py new file mode 100644 index 000000000..733ca9a34 --- /dev/null +++ b/socs/agents/acu/exercisor.py @@ -0,0 +1,394 @@ +""" +temporary automation for telescope platforms. + +""" + +import argparse +import math +import random +import time + +import ocs +import yaml +from ocs.ocs_client import OCSClient + + +def assert_ok(result): + if result.status != ocs.base.OK: + print('RESULT!=OK!', result) + raise RuntimeError() + + +def wait_verbosely(target, timeout=5, msg=' ... still going '): + last_stop = 0 + while True: + # Watch for process exit + result = target.wait(timeout=timeout) + if result.status == ocs.base.TIMEOUT: + print(msg, get_pos()) + elif result.status == ocs.base.ERROR: + raise RuntimeError('Operation failed.') + else: + break + # Detect az fault + if get_faults()['az_summary'] and time.time() - last_stop > 10: + print(' -- az summary fault detected, stop&clear.') + assert_ok(c.stop_and_clear()) + last_stop = time.time() + + return True + + +def safe_get_status(): + # dodge a race condition + for i in range(10): + try: + return c.monitor.status().session['data']['StatusDetailed'] + except KeyError: + time.sleep(0.01) + raise KeyError("Could not read StatusDetailed!") + + +def get_pos(): + status = safe_get_status() + return (status['Azimuth current position'], + status['Elevation current position']) + + +def get_faults(): + status = safe_get_status() + return { + 'az_summary': status['Azimuth summary fault'], + 'local_mode': not status['ACU in remote mode'], + 'safe_lock': status['Safe'], + } + + +def clear_faults(): + c.clear_faults.start() + c.clear_faults.wait() + + +def set_boresight(angle): + assert_ok(c.set_boresight.start(target=angle)) + wait_verbosely(c.set_boresight, msg=' ... setting boresight...') + + +def steps(targets=[], **kw): + for az, el in targets: + _az, _el = get_pos() + if az is None: + az = _az + if el is None: + el = _el + assert_ok(c.go_to.start(az=az, el=el)) + wait_verbosely(c.go_to, msg=' ... moving') + + +def scan(az, el, throw, + v_az=1., + a_az=1., + num_scans=3, + step_time=1., + wait_to_start=10., + init='mid', + init_az=None, + ramp_up=0, + ): + if init_az is None: + init_az = az + print(f'Going to {init_az}, {el}') + assert_ok(c.go_to.start(az=init_az, el=el)) + wait_verbosely(c.go_to, msg=' ... still go_to-ing') + + # assert(init == 'end') + # ramp_up = az - throw - init_az + # print(ramp_up) + + print('Checking positions ...') + az1, el1 = get_pos() + assert (abs(az1 - init_az) < .1 and abs(el1 - el) < .1) + + print('Stop and clear.') + assert_ok(c.stop_and_clear()) + + assert_ok( + c.generate_scan.start( + az_endpoint1=az - throw, + az_endpoint2=az + throw, + az_speed=v_az, + az_accel=a_az, + el_endpoint1=el, + el_endpoint2=el, + num_scans=num_scans, + az_start=init) + ) + + wait_verbosely(c.generate_scan, msg=' ... still scanning ...') + + print('Finally we are at', get_pos()) + # time.sleep(2) + time.sleep(5) + + print('Stop + clear') + assert_ok(c.stop_and_clear()) + + +def plan_scan(az, el, throw, v_az=1, a_az=1, init='end', + full_ramp=False, num_scans=1): + # Initialize arguments suitable for passing to scan() ... + plan = { + 'az': az, + 'el': el, + 'throw': throw, + 'v_az': v_az, + 'a_az': a_az, + 'init': init, + 'init_az': az, + 'num_scans': num_scans, + } + info = {} + + # Point separation? At least 5 points per leg, preferably 10. + dt = 2 * abs(throw / v_az) / 10 + dt = min(max(dt, 0.1), 1.0) + assert (2 * abs(throw / v_az) / dt >= 5) + plan['step_time'] = dt + + # Turn around prep distance? 5 point periods, times the vel. + az_prep = 5 * dt * v_az + + # Ramp-up distance needed + a0 = 1. # Peak accel of ramp-up... + az_rampup = v_az**2 / a0 + info['az_prep'] = az_prep + info['az_rampup'] = az_rampup + + # Any az ramp-up prep required? + if full_ramp: + ramp_up = az_rampup + elif init == 'mid': + ramp_up = max(az_prep + az_rampup - abs(throw), 0) + elif init == 'end': + ramp_up = max(az_prep + az_rampup - 2 * abs(throw), 0) + else: + raise # init in ['mid', 'end'] + plan['ramp_up'] = ramp_up + + # Set wait time (this comes out a little lower than its supposed to...) + # plan['wait_time'] = v_az / a0 * 2 + info['pre_time'] = v_az / a0 + plan['wait_to_start'] = max(5, info['pre_time'] * 1.2) + + # Fill out some other useful info... + plan['init_az'] = az - math.copysign(ramp_up, throw) + if init == 'end': + plan['init_az'] -= throw + + info['total_time'] = ( + num_scans * (2 * abs(throw) / v_az + 2 * v_az / a_az) + + ramp_up / v_az * 2 + + plan['wait_to_start']) + + return plan, info + + +class _Plan: + code = 'something' + + def __iter__(self): + self.index = 0 + return self + + def __next__(self): + X = self.get_plan(self.index) + self.index += 1 + return X + + +class ScanPlan(_Plan): + pass + + +class PointPlan(_Plan): + pass + + +class GridPlan(ScanPlan): + DEFAULT_DURATION = 1200 + code = 'grid' + + def __init__(self, **kw): + self.config = { + 'els': [50., 55., 60.], + 'azs': [0, 50, 90, 140, 180, 220, 270, 310], + 'az_throw': 20, + 'num_scans': 7, + # 'num_scans': 1, + 'v_az': 1., + 'a_az': 1., + } + self.config.update(kw) + self.index = 0 + + def get_plan(self, index): + c = self.config + superphase = index // len(c['azs']) % 2 + step = 1 - 2 * superphase + + def get_mod(entry): + return entry[index % len(entry)] + + plan, info = plan_scan(get_mod(c['azs'][::step]), get_mod(c['els']), + c['az_throw'] * step, + v_az=c['v_az'], a_az=c['a_az'], + num_scans=c['num_scans']) + return plan, info + + +class ElNod(PointPlan): + DEFAULT_DURATION = 60 + code = 'elnod' + + def __init__(self, **kw): + self.repeat = kw.get('repeat', 1) + + def get_plan(self, index): + t = {'targets': [(None, 20), (None, 90)] * self.repeat} + return t, {} + + +class SchedulePlan(ScanPlan): + DEFAULT_DURATION = 1200 + code = 'mock-sched' + + def __init__(self, sched_files, format='toast3', + dedup=True, dwell_time=None): + if isinstance(sched_files, str): + sched_files = [sched_files] + if dwell_time is None: + dwell_time = 300 + rows = [] + row_track = set() + for filename in sched_files: + skip = 3 + for line in open(filename): + if skip > 0: + skip -= 1 + continue + w = line.split() + row = { + 'mjd0': float(w[4]), # mjd start + 'mjd1': float(w[5]), # mjd end + 'boresight': float(w[6]), # boresight rotation + 'az_min': float(w[8]), # az min + 'az_max': float(w[9]), # az max + 'el': float(w[10]), # el + } + # rebranch + row['az_max'] = row['az_min'] + (row['az_max'] - row['az_min']) % 360 + row['az'] = (row['az_max'] + row['az_min']) / 2 + row['az_throw'] = (row['az_max'] - row['az_min']) / 2 + + # enrich + row['v_az'] = 1. / math.cos(row['el'] * math.pi / 180) + row['a_az'] = 2. + row['num_scans'] = (dwell_time // (row['az_throw'] / row['v_az'])) + 1 + if dedup: + key = tuple([row[k] for k in ['az', 'az_throw', 'el']]) + if key in row_track: + continue + row_track.add(key) + rows.append(row) + self.rows = rows + + def get_plan(self, index): + row = self.rows[index % len(self.rows)] + kw = {k: row[k] for k in ['v_az', 'a_az', 'num_scans']} + return plan_scan(row['az'], row['el'], row['az_throw'], + **kw) + + +class GreasePlan(ScanPlan): + DEFAULT_DURATION = 120 + code = 'grease' + + def __init__(self, phase=None): + self.n = 4 + self.el_n = 9 + if phase is None: + phase = int(random.random() * self.n) + self.phase = phase + + def get_plan(self, index): + phase = (self.phase + index) % self.n + el = 30 + 40 * (index % self.el_n) / (self.el_n - 1) + limits = [-80 + phase * 5, 440 + (phase - self.n - 1) * 5] + print(limits) + return plan_scan((limits[0] + limits[1]) / 2, el, (limits[1] - limits[0]) / 2, + v_az=2, a_az=0.5, num_scans=2, init='end') + + +def get_plan(config): + if isinstance(config, str): + config = yaml.safe_load(open(config, 'rb').read()) + for i, step in enumerate(config['steps']): + if step['type'] == 'grease': + driver = GreasePlan() + elif step['type'] == 'schedule': + driver = SchedulePlan(step['files'], dwell_time=step.get('dwell_time')) + elif step['type'] == 'grid': + driver = GridPlan() + elif step['type'] == 'elnod': + driver = ElNod(**step) + else: + raise ValueError('Invalid step type "%s"' % step['type']) + plan = {'driver': driver, + 'duration': step.get('duration', driver.DEFAULT_DURATION), + } + config['steps'][i] = plan # replace + return config + + +def set_client(instance_id): + global c + c = OCSClient(instance_id) + return c + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--config') + parser.add_argument('instance_id', help="e.g. acu-sat1") + parser.add_argument('--hours', type=float) + args = parser.parse_args() + + if args.config is not None: + # Get the plan. + super_plan = get_plan(args.config) + + c = OCSClient(args.instance_id) + now = time.time() + stop_at = None + if args.hours: + stop_at = now + args.hours * 3600. + + plan_idx = 1 + while stop_at is None or time.time() < stop_at: + t0 = time.time() + t = t0 + active_plan = super_plan['steps'][plan_idx] + print(active_plan['driver']) + for plan, info in active_plan['driver']: + if isinstance(plan, ScanPlan): + scan(**plan) + elif isinstance(plan, PointPlan): + steps(**plan) + else: + raise ValueError + if time.time() - t0 > active_plan['duration']: + break + plan_idx += 1 + if plan_idx >= len(super_plan['steps']): + if super_plan.get('loop', True): + plan_idx = 0