diff --git a/socs/agents/acu/agent.py b/socs/agents/acu/agent.py index d21bb817a..6429cfc2e 100644 --- a/socs/agents/acu/agent.py +++ b/socs/agents/acu/agent.py @@ -1,9 +1,9 @@ import argparse -import calendar -import datetime import struct import time +from enum import Enum +import numpy as np import soaculib as aculib import soaculib.status_keys as status_keys import twisted.web.client as tclient @@ -12,56 +12,12 @@ from ocs.ocs_twisted import TimeoutLock from soaculib.twisted_backend import TwistedHttpBackend from twisted.internet import protocol, reactor -from twisted.internet.defer import inlineCallbacks +from twisted.internet.defer import DeferredList, inlineCallbacks import socs.agents.acu.drivers as sh - -def timecode(acutime): - """ - Takes the time code produced by the ACU status stream and returns - a ctime. - - Parameters: - acutime (float): The time recorded by the ACU status stream, - corresponding to the fractional day of the year - """ - sec_of_day = (acutime - 1) * 60 * 60 * 24 - year = datetime.datetime.now().year - gyear = calendar.timegm(time.strptime(str(year), '%Y')) - comptime = gyear + sec_of_day - return comptime - - -def uploadtime_to_ctime(ptstack_time, upload_year): - year = int(upload_year) - gyear = calendar.timegm(time.strptime(str(year), '%Y')) - day_of_year = float(ptstack_time.split(',')[0]) - 1.0 - hour = float(ptstack_time.split(',')[1].split(':')[0]) - minute = float(ptstack_time.split(',')[1].split(':')[1]) - second = float(ptstack_time.split(',')[1].split(':')[2]) - comptime = gyear + day_of_year * 60 * 60 * 24 + hour * 60 * 60 + minute * 60 + second - return comptime - - -def pop_first_vals(data_dict, group_size): - new_data_dict = {} - for key in data_dict.keys(): - if len(data_dict[key]): - new_data_dict[key] = data_dict[key][group_size:] - else: - print('no more data') - return new_data_dict - - -def front_group(data_dict, group_size): - new_data_dict = {} - for key in data_dict.keys(): - if len(data_dict[key]) > group_size: - new_data_dict[key] = data_dict[key][:group_size] - else: - new_data_dict[key] = data_dict[key] - return new_data_dict +#: The number of free ProgramTrack positions, when stack is empty. +FULL_STACK = 10000 class ACUAgent: @@ -77,13 +33,9 @@ class ACUAgent: """ def __init__(self, agent, acu_config='guess'): - self.lock = TimeoutLock() - self.jobs = { - 'monitor': 'idle', - 'broadcast': 'idle', - 'control': 'idle', # shared by all motion tasks/processes - 'scanspec': 'idle', - } + # Separate locks for exclusive access to az/el, and boresight motions. + self.azel_lock = TimeoutLock() + self.boresight_lock = TimeoutLock() self.acu_config = aculib.guess_config(acu_config) self.sleeptime = self.acu_config['motion_waittime'] @@ -91,6 +43,10 @@ def __init__(self, agent, acu_config='guess'): self.udp_schema = aculib.get_stream_schema(self.udp['schema']) self.udp_ext = self.acu_config['streams']['ext'] self.acu8100 = self.acu_config['status']['status_name'] + + # There may or may not be a special 3rd axis dataset that + # needs to be probed. 
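The removed timecode()/uploadtime_to_ctime() helpers are superseded by equivalents in socs.agents.acu.drivers (referenced below as sh.timecode and sh.DAY). For reference, here is a minimal standalone sketch of the same day-of-year-to-unix-time conversion, assuming the drivers version mirrors the deleted implementation; the function name is illustrative only.

    import calendar
    import time

    DAY = 86400.  # seconds per day (assumed to be what sh.DAY provides)

    def acu_timecode_to_ctime(acu_day, year=None):
        # The ACU reports time as a fractional day of the year, with
        # January 1, 00:00 UTC corresponding to 1.0.  Convert to a unix
        # timestamp, defaulting to the current year.
        if year is None:
            year = time.gmtime().tm_year
        year_start = calendar.timegm(time.strptime(str(year), '%Y'))
        return year_start + (acu_day - 1) * DAY

    # e.g. day 81.661... of 2023 falls on 2023-03-22 UTC:
    print(acu_timecode_to_ctime(81.661170959322, year=2023))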
+ self.acu3rdaxis = self.acu_config['status'].get('3rdaxis_name') self.monitor_fields = status_keys.status_fields[self.acu_config['platform']]['status_fields'] self.motion_limits = self.acu_config['motion_limits'] @@ -112,31 +68,15 @@ def __init__(self, agent, acu_config='guess'): 'ACU_failures_errors': {}, 'platform_status': {}, 'ACU_emergency': {}, + 'third_axis': {}, }, 'broadcast': {}, - 'uploads': {'Start_Azimuth': 0.0, - 'Start_Elevation': 0.0, - 'Start_Boresight': 0.0, - 'Command_Type': 0, - 'Preset_Azimuth': 0.0, - 'Preset_Elevation': 0.0, - 'Preset_Boresight': 0.0, - 'PtStack_Lines': 'False', - 'PtStack_Time': '000, 00:00:00.000000', - 'PtStack_Azimuth': 0.0, - 'PtStack_Elevation': 0.0, - 'PtStack_AzVelocity': 0.0, - 'PtStack_ElVelocity': 0.0, - 'PtStack_AzFlag': 0, - 'PtStack_ElFlag': 0}, - 'scanspec': {}, } self.agent = agent self.take_data = False - # self.web_agent = tclient.Agent(reactor) tclient._HTTP11ClientFactory.noisy = False self.acu_control = aculib.AcuControl( @@ -146,17 +86,22 @@ def __init__(self, agent, acu_config='guess'): agent.register_process('monitor', self.monitor, - lambda session, params: self._set_job_stop('monitor'), + self._simple_process_stop, blocking=False, startup=True) agent.register_process('broadcast', self.broadcast, - lambda session, params: self._set_job_stop('broadcast'), + self._simple_process_stop, blocking=False, startup=True) agent.register_process('generate_scan', self.generate_scan, - lambda session, params: self._set_job_stop('control'), + self._simple_process_stop, + blocking=False, + startup=False) + agent.register_process('restart_idle', + self.restart_idle, + self._simple_process_stop, blocking=False, startup=False) basic_agg_params = {'frame_length': 60} @@ -192,122 +137,166 @@ def __init__(self, agent, acu_config='guess'): record=True, agg_params=influx_agg_params, buffer_time=1) - self.agent.register_feed('acu_upload', - record=True, - agg_params=basic_agg_params, - buffer_time=1) self.agent.register_feed('acu_error', record=True, agg_params=basic_agg_params, buffer_time=1) - agent.register_task('go_to', self.go_to, blocking=False) - agent.register_task('constant_velocity_scan', - self.constant_velocity_scan, - blocking=False) + agent.register_task('go_to', + self.go_to, + blocking=False, + aborter=self._simple_task_abort) agent.register_task('fromfile_scan', self.fromfile_scan, - blocking=False) + blocking=False, + aborter=self._simple_task_abort) agent.register_task('set_boresight', self.set_boresight, - blocking=False) + blocking=False, + aborter=self._simple_task_abort) agent.register_task('stop_and_clear', self.stop_and_clear, blocking=False) + agent.register_task('clear_faults', + self.clear_faults, + blocking=False) - # Operation management. This agent has several Processes that - # must be able to alone or simultaneously. The state of each is - # registered in self.jobs, protected by self.lock (though this is - # probably not necessary as long as we don't thread). Any logic - # to assess conflicts should probably be in _try_set_job. + @inlineCallbacks + def _simple_task_abort(self, session, params): + # Trigger a task abort by updating state to "stopping" + yield session.set_status('stopping') - def _try_set_job(self, job_name): - """ - Set a job status to 'run'. + @inlineCallbacks + def _simple_process_stop(self, session, params): + # Trigger a process stop by updating state to "stopping" + yield session.set_status('stopping') - Parameters: - job_name (str): Name of the task/process you are trying to start. 
- """ - with self.lock.acquire_timeout(timeout=1.0, job=job_name) as acquired: - if not acquired: - self.log.warn("Lock could not be acquired because it is held" - f" by {self.lock.job}") - return False - # Set running. - self.jobs[job_name] = 'run' - return (True, 'ok') - - def _set_job_stop(self, job_name): - """ - Set a job status to 'stop'. + @ocs_agent.param('_') + @inlineCallbacks + def restart_idle(self, session, params): + """restart_idle() - Parameters: - job_name (str): Name of the process you are trying to stop. - """ - print('try to acquire stop') - # return (False, 'Could not stop') - with self.lock.acquire_timeout(timeout=1.0, job=job_name) as acquired: - if not acquired: - self.log.warn("Lock could not be acquired because it is" - f" held by {self.lock.job}") - return False - try: - self.jobs[job_name] = 'stop' - # state = self.jobs.get(job_name, 'idle') - # if state == 'idle': - # return False, 'Job not running.' - # if state == 'stop': - # return False, 'Stop already requested.' - # self.jobs[job_name] = 'stop' - return True, 'Requested Process stop.' - except Exception as e: - print(str(e)) + **Process** - Issue the 'RestartIdleTime' command at regular intervals. - def _set_job_done(self, job_name): - """ - Set a job status to 'idle'. + In some ACU versions, this is required to prevent the antenna + from reverting to survival mode. - Parameters: - job_name (str): Name of the task/process you are trying to idle. """ - with self.lock.acquire_timeout(timeout=1.0, job=job_name) as acquired: - if not acquired: - self.log.warn("Lock could not be acquried because it is held" - f" by {self.lock.job}") - return False - self.jobs[job_name] = 'idle' - - # - # The Operations - # + session.set_status('running') + while session.status in ['running']: + resp = yield self.acu_control.http.Command('DataSets.CmdModeTransfer', + 'RestartIdleTime') + self.log.info('Sent RestartIdleTime') + self.log.info(resp) + yield dsleep(1. * 60.) + return True, 'Process "restart_idle" exited cleanly.' @inlineCallbacks def monitor(self, session, params): """monitor() **Process** - Refresh the cache of SATP ACU status information and - report it on the 'acu_status_summary' and 'acu_status_full' HK feeds. + report it on the 'acu_status' and 'acu_status_influx' HK feeds. Summary parameters are ACU-provided time code, Azimuth mode, Azimuth position, Azimuth velocity, Elevation mode, Elevation position, Elevation velocity, Boresight mode, and Boresight position. + The session.data of this process is a nested dictionary. + Here's an example:: + + { + "StatusDetailed": { + "Time": 81.661170959322, + "Year": 2023, + "Azimuth mode": "Stop", + "Azimuth commanded position": -20.0012, + "Azimuth current position": -20.0012, + "Azimuth current velocity": 0.0002, + "Azimuth average position error": 0, + "Azimuth peak position error": 0, + "Azimuth computer disabled": false, + ... + }, + "Status3rdAxis": { + "3rd axis Mode": "Stop", + "3rd axis commanded position": 77, + "3rd axis current position": 77, + "3rd axis computer disabled": "No Fault", + ... + }, + "StatusResponseRate": 19.237531827325963, + "PlatformType": "satp" + } + + In the case of an SATP, the Status3rdAxis is not populated + (the Boresight info can be found in StatusDetailed). In the + case of the LAT, the corotator info is queried separately and + stored under Status3rdAxis. 
+ """ - ok, msg = self._try_set_job('monitor') - if not ok: - return ok, msg session.set_status('running') version = yield self.acu_read.http.Version() self.log.info(version) - mode_key = {'Stop': 0, - 'Preset': 1, - 'ProgramTrack': 2, - 'Stow': 3, - 'SurvivalMode': 4, - 'Rate': 5, - 'StarTrack': 6, - } + # Note that session.data will get scanned, to assign data to + # feed blocks. Items in session.data that are themselves + # dicts will parsed; but items (such as PlatformType and + # StatusResponseRate) which are simple strings or floats will + # be ignored for feed assignment. + session.data = {'PlatformType': self.acu_config['platform']} + + # Numbering as per ICD. + mode_key = { + 'Stop': 0, + 'Preset': 1, + 'ProgramTrack': 2, + 'Rate': 3, + 'SectorScan': 4, + 'SearchSpiral': 5, + 'SurvivalMode': 6, + 'StepTrack': 7, + 'GeoSync': 8, + 'OPT': 9, + 'TLE': 10, + 'Stow': 11, + 'StarTrack': 12, + 'SunTrack': 13, + 'MoonTrack': 14, + 'I11P': 15, + 'AutoTrack/Preset': 16, + 'AutoTrack/PositionMemory': 17, + 'AutoTrack/PT': 18, + 'AutoTrack/OPT': 19, + 'AutoTrack/PT/Search': 20, + 'AutoTrack/TLE': 21, + 'AutoTrack/TLE/Search': 22, + + # Currently we do not have ICD values for these, but they + # are included in the output of Meta. ElSync, at least, + # is a known third axis mode for the LAT. + 'ElSync': 100, + 'UnStow': 101, + 'MaintenanceStow': 102, + } + + # fault_key digital values taken from ICD (correspond to byte-encoding) + fault_key = { + 'No Fault': 0, + 'Warning': 1, + 'Fault': 2, + 'Critical': 3, + 'No Data': 4, + 'Latched Fault': 5, + 'Latched Critical Fault': 6, + } + pin_key = { + # Capitalization matches strings in ACU binary, not ICD. + 'Any Moving': 0, + 'All Inserted': 1, + 'All Retracted': 2, + 'Failure': 3, + } tfn_key = {'None': float('nan'), 'False': 0, 'True': 1, @@ -317,23 +306,49 @@ def monitor(self, session, params): n_ok = 0 min_query_period = 0.05 # Seconds query_t = 0 - while self.jobs['monitor'] == 'run': - now = time.time() + prev_checkdata = {'ctime': time.time(), + 'Azimuth_mode': None, + 'Elevation_mode': None, + 'Boresight_mode': None, + } - if now > report_t + report_period: - self.log.info('Responses ok at %.3f Hz' - % (n_ok / (now - report_t))) - report_t = now - n_ok = 0 + j = yield self.acu_read.http.Values(self.acu8100) + if self.acu3rdaxis: + j2 = yield self.acu_read.http.Values(self.acu3rdaxis) + else: + j2 = {} + session.data.update({'StatusDetailed': j, + 'Status3rdAxis': j2, + 'StatusResponseRate': n_ok / (query_t - report_t)}) + + was_remote = False + last_resp_rate = None + while session.status in ['running']: + + now = time.time() if now - query_t < min_query_period: yield dsleep(min_query_period - (now - query_t)) query_t = time.time() + if query_t > report_t + report_period: + resp_rate = n_ok / (query_t - report_t) + if last_resp_rate is None or (abs(resp_rate - last_resp_rate) + > max(0.1, last_resp_rate * .01)): + self.log.info('Data rate for "monitor" stream is now %.3f Hz' % (resp_rate)) + last_resp_rate = resp_rate + report_t = query_t + n_ok = 0 + session.data.update({'StatusResponseRate': resp_rate}) + try: j = yield self.acu_read.http.Values(self.acu8100) + if self.acu3rdaxis: + j2 = yield self.acu_read.http.Values(self.acu3rdaxis) + else: + j2 = {} + session.data.update({'StatusDetailed': j, 'Status3rdAxis': j2}) n_ok += 1 - session.data = j except Exception as e: # Need more error handling here... 
errormsg = {'aculib_error_message': str(e)} @@ -345,20 +360,33 @@ def monitor(self, session, params): self.agent.publish_to_feed('acu_error', acu_error) yield dsleep(1) continue - - for (key, value) in session.data.items(): - for category in self.monitor_fields: - if key in self.monitor_fields[category]: - if isinstance(value, bool): - self.data['status'][category][self.monitor_fields[category][key]] = int(value) - elif isinstance(value, int) or isinstance(value, float): - self.data['status'][category][self.monitor_fields[category][key]] = value - elif value is None: - self.data['status'][category][self.monitor_fields[category][key]] = float('nan') - else: - self.data['status'][category][self.monitor_fields[category][key]] = str(value) + for k, v in session.data.items(): + if isinstance(v, dict): + for (key, value) in v.items(): + for category in self.monitor_fields: + if key in self.monitor_fields[category]: + if isinstance(value, bool): + self.data['status'][category][self.monitor_fields[category][key]] = int(value) + elif isinstance(value, int) or isinstance(value, float): + self.data['status'][category][self.monitor_fields[category][key]] = value + elif value is None: + self.data['status'][category][self.monitor_fields[category][key]] = float('nan') + else: + self.data['status'][category][self.monitor_fields[category][key]] = str(value) self.data['status']['summary']['ctime'] =\ - timecode(self.data['status']['summary']['Time']) + sh.timecode(self.data['status']['summary']['Time']) + if self.data['status']['platform_status']['Remote_mode'] == 0: + if was_remote: + was_remote = False + self.log.warn('ACU in local mode!') + elif not was_remote: + was_remote = True + self.log.warn('ACU now in remote mode.') + if self.data['status']['summary']['ctime'] == prev_checkdata['ctime']: + self.log.warn('ACU time has not changed from previous data point!') + for axis_mode in ['Azimuth_mode', 'Elevation_mode', 'Boresight_mode']: + if self.data['status']['summary'][axis_mode] != prev_checkdata[axis_mode]: + self.log.info(axis_mode + ' has changed to ' + self.data['status']['summary'][axis_mode]) # influx_status refers to all other self.data['status'] keys. 
Do not add # more keys to any self.data['status'] categories beyond this point @@ -373,8 +401,15 @@ def monitor(self, session, params): influx_status[statkey + '_influx'] = float('nan') elif statval in ['True', 'False']: influx_status[statkey + '_influx'] = tfn_key[statval] - else: + elif statval in mode_key: influx_status[statkey + '_influx'] = mode_key[statval] + elif statval in fault_key: + influx_status[statkey + '_influx'] = fault_key[statval] + elif statval in pin_key: + influx_status[statkey + '_influx'] = pin_key[statval] + else: + raise ValueError('Could not convert value for %s="%s"' % + (statkey, statval)) elif isinstance(statval, int): if statkey in ['Year', 'Free_upload_positions']: influx_status[statkey + '_influx'] = float(statval) @@ -393,14 +428,13 @@ def monitor(self, session, params): 'data': {'Elevation_commanded_position_influx': self.data['status']['commands']['Elevation_commanded_position']} } self.agent.publish_to_feed('acu_commands_influx', acucommand_el) - if str(self.data['status']['commands']['Boresight_commanded_position']) != 'nan': - acucommand_bs = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_commanded_positions_boresight', - 'data': {'Boresight_commanded_position_influx': self.data['status']['commands']['Boresight_commanded_position']} - } - self.agent.publish_to_feed('acu_commands_influx', acucommand_bs) - if self.data['uploads']['PtStack_Time'] == '000, 00:00:00.000000': - self.data['uploads']['PtStack_ctime'] = self.data['status']['summary']['ctime'] + if self.acu_config['platform'] == 'satp': + if str(self.data['status']['commands']['Boresight_commanded_position']) != 'nan': + acucommand_bs = {'timestamp': self.data['status']['summary']['ctime'], + 'block_name': 'ACU_commanded_positions_boresight', + 'data': {'Boresight_commanded_position_influx': self.data['status']['commands']['Boresight_commanded_position']} + } + self.agent.publish_to_feed('acu_commands_influx', acucommand_bs) acustatus_summary = {'timestamp': self.data['status']['summary']['ctime'], @@ -464,31 +498,58 @@ def monitor(self, session, params): self.agent.publish_to_feed('acu_status', block) self.agent.publish_to_feed('acu_status_influx', acustatus_influx, from_reactor=True) - # self._set_job_stop('monitor') - # yield dsleep(1) - # self._set_job_done('monitor') + prev_checkdata = {'ctime': self.data['status']['summary']['ctime'], + 'Azimuth_mode': self.data['status']['summary']['Azimuth_mode'], + 'Elevation_mode': self.data['status']['summary']['Elevation_mode'], + 'Boresight_mode': self.data['status']['summary']['Boresight_mode'], + } return True, 'Acquisition exited cleanly.' @inlineCallbacks def broadcast(self, session, params): """broadcast() - **Process** - Read UDP data from the port specified by self.acu_config, - decode it, and publish to an HK feed. + **Process** - Read UDP data from the port specified by + self.acu_config, decode it, and publish to HK feeds. Full + resolution (200 Hz) data are written to feed "acu_udp_stream" + while 1 Hz decimated are written to "acu_broadcast_influx". + The 1 Hz decimated output are also stored in session.data. 
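The decimation described here is done by averaging each full block of ~200 UDP samples before publishing to the influx feed; those per-second means are also what populate session.data, shown in the example that follows. A stripped-down sketch of the averaging step, assuming samples are grouped as equal-length lists per field:

    import numpy as np

    def decimate_block(block):
        """Average one block of full-rate samples down to a single point.

        block: dict mapping field name -> list of ~200 samples.
        Returns a dict of means suitable for a 1 Hz feed or session.data.
        """
        return {field: float(np.mean(values)) for field, values in block.items()}

    block = {'Time': [1679499948.0 + i / 200. for i in range(200)],
             'Corrected_Azimuth': [-20.001] * 200}
    print(decimate_block(block))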
+ + The session.data looks like this (this is for a SATP running + with servo details in the UDP output):: + + { + "Time": 1679499948.8234625, + "Corrected_Azimuth": -20.00112176010607, + "Corrected_Elevation": 50.011521050839434, + "Corrected_Boresight": 29.998428712246067, + "Raw_Azimuth": -20.00112176010607, + "Raw_Elevation": 50.011521050839434, + "Raw_Boresight": 29.998428712246067, + "Azimuth_Current_1": -0.000384521484375, + "Azimuth_Current_2": -0.0008331298828125, + "Elevation_Current_1": 0.003397979736328125, + "Boresight_Current_1": -0.000483856201171875, + "Boresight_Current_2": -0.000105743408203125, + "Azimuth_Vel_1": -0.000002288818359375, + "Azimuth_Vel_2": 0, + "Az_Vel_Act": -0.0000011444091796875, + "Az_Vel_Des": 0, + "Az_Vffw": 0, + "Az_Pos_Des": -20.00112176010607, + "Az_Pos_Err": 0 + } """ - ok, msg = self._try_set_job('broadcast') - if not ok: - return ok, msg session.set_status('running') FMT = self.udp_schema['format'] FMT_LEN = struct.calcsize(FMT) UDP_PORT = self.udp['port'] udp_data = [] fields = self.udp_schema['fields'] + session.data = {} class MonitorUDP(protocol.DatagramProtocol): - def datagramReceived(self, data, src_addr): host, port = src_addr offset = 0 @@ -496,72 +557,327 @@ def datagramReceived(self, data, src_addr): d = struct.unpack(FMT, data[offset:offset + FMT_LEN]) udp_data.append(d) offset += FMT_LEN + handler = reactor.listenUDP(int(UDP_PORT), MonitorUDP()) - while self.jobs['broadcast'] == 'run': - if udp_data: + influx_data = {} + influx_data['Time_bcast_influx'] = [] + for i in range(2, len(fields)): + influx_data[fields[i].replace(' ', '_') + '_bcast_influx'] = [] + + active = True + last_packet_time = time.time() + + while session.status in ['running']: + now = time.time() + if len(udp_data) >= 200: + if not active: + self.log.info('UDP packets are being received.') + active = True + last_packet_time = now + process_data = udp_data[:200] udp_data = udp_data[200:] - year = datetime.datetime.now().year - gyear = calendar.timegm(time.strptime(str(year), '%Y')) - if len(process_data): - sample_rate = (len(process_data) - / ((process_data[-1][0] - process_data[0][0]) * 86400 - + process_data[-1][1] - process_data[0][1])) - else: - sample_rate = 0.0 - latest_az = process_data[2] - latest_el = process_data[3] - latest_az_raw = process_data[4] - latest_el_raw = process_data[5] - session.data = {'sample_rate': sample_rate, - 'latest_az': latest_az, - 'latest_el': latest_el, - 'latest_az_raw': latest_az_raw, - 'latest_el_raw': latest_el_raw - } - bcast_first = {} - pd0 = process_data[0] - pd0_gday = (pd0[0] - 1) * 86400 - pd0_sec = pd0[1] - pd0_data_ctime = gyear + pd0_gday + pd0_sec - bcast_first['Time_bcast_influx'] = pd0_data_ctime - for i in range(2, len(pd0)): - bcast_first[fields[i].replace(' ', '_') + '_bcast_influx'] = pd0[i] - acu_broadcast_influx = {'timestamp': bcast_first['Time_bcast_influx'], - 'block_name': 'ACU_position_bcast_influx', - 'data': bcast_first, - } - self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx, from_reactor=True) for d in process_data: - gday = (d[0] - 1) * 86400 - sec = d[1] - data_ctime = gyear + gday + sec + data_ctime = sh.timecode(d[0] + d[1] / sh.DAY) self.data['broadcast']['Time'] = data_ctime + influx_data['Time_bcast_influx'].append(data_ctime) for i in range(2, len(d)): self.data['broadcast'][fields[i].replace(' ', '_')] = d[i] + influx_data[fields[i].replace(' ', '_') + '_bcast_influx'].append(d[i]) acu_udp_stream = {'timestamp': self.data['broadcast']['Time'], 'block_name': 
'ACU_broadcast', 'data': self.data['broadcast'] } - # print(acu_udp_stream) self.agent.publish_to_feed('acu_udp_stream', acu_udp_stream, from_reactor=True) + influx_means = {} + for key in influx_data.keys(): + influx_means[key] = np.mean(influx_data[key]) + influx_data[key] = [] + acu_broadcast_influx = {'timestamp': influx_means['Time_bcast_influx'], + 'block_name': 'ACU_bcast_influx', + 'data': influx_means, + } + self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx, from_reactor=True) + sd = {} + for ky in influx_means: + sd[ky.split('_bcast_influx')[0]] = influx_means[ky] + session.data.update(sd) else: + if active and now - last_packet_time > 3: + self.log.info('No UDP packets are being received.') + active = False yield dsleep(1) yield dsleep(0.005) handler.stopListening() - self._set_job_done('broadcast') return True, 'Acquisition exited cleanly.' + @inlineCallbacks + def _check_daq_streams(self, stream): + yield + session = self.agent.sessions[stream] + if session.status != 'running': + self.log.warn("Process '%s' is not running" % stream) + return False + if stream == 'broadcast': + timestamp = self.data['broadcast'].get('Time') + else: + timestamp = self.data['status']['summary'].get('ctime') + if timestamp is None: + self.log.warn('%s daq stream has no data yet.' % stream) + return False + delta = time.time() - timestamp + if delta > 2: + self.log.warn(f'{stream} daq stream has old data ({delta} seconds)') + return False + return True + + @inlineCallbacks + def _check_ready_motion(self, session): + bcast_check = yield self._check_daq_streams('broadcast') + if not bcast_check: + return False, 'Motion blocked; problem with "broadcast" data acq process.' + + monitor_check = yield self._check_daq_streams('monitor') + if not monitor_check: + return False, 'Motion blocked; problem with "monitor" data acq process.' + + if self.data['status']['platform_status']['Remote_mode'] == 0: + self.log.warn('ACU in local mode, cannot perform motion with OCS.') + return False, 'ACU not in remote mode.' + + return True, 'Agent state ok for motion.' + + def _get_limit_func(self, axis): + """Construct a function limit(x) that will enforce that x is within + the configured limits for axis. Returns the funcion and the + tuple of limits (lower, upper). + + """ + limits = self.motion_limits[axis.lower()] + limits = limits['lower'], limits['upper'] + + def limit_func(target): + return max(min(target, limits[1]), limits[0]) + return limit_func, limits + + @inlineCallbacks + def _go_to_axis(self, session, axis, target): + """Execute a movement, using "Preset" mode, on a specific axis. + + Args: + session: session object variable of the parent operation. + axis (str): one of 'Azimuth', 'Elevation', 'Boresight'. + target (float): target position. + + Returns: + ok (bool): True if the motion completed successfully and + arrived at target position. + msg (str): success/error message. + + """ + # Step time in event loop. + TICK_TIME = 0.1 + + # Time for which to sample distance for "still" and "moving" + # conditions. + PROFILE_TIME = 1. + + # When aborting, how many seconds to use to project a good + # stopping position (d = v*t) + ABORT_TIME = 2. + + # Threshold (deg) for declaring that we've reached + # destination. + THERE_YET = 0.01 + + # How long to wait after initiation for signs of motion, + # before giving up. This is normally within 2 or 3 seconds + # (SATP), but in "cold" cases where siren needs to sound, this + # can be as long as 12 seconds. + MAX_STARTUP_TIME = 13. 
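The clamp returned by _get_limit_func serves two purposes in the motion code: rejecting out-of-range targets up front, and projecting a safe stopping point when an abort is requested. A small sketch of that pattern follows; the numeric limits are placeholders, since the real ones come from motion_limits in the ACU config.

    def make_limit_func(lower, upper):
        # Same construction as _get_limit_func: clamp a target into range.
        def limit_func(target):
            return max(min(target, upper), lower)
        return limit_func

    clamp = make_limit_func(-90., 450.)      # placeholder azimuth limits
    target = 460.
    if target != clamp(target):
        print('reject: target outside [-90, 450]')

    # Abort case: project pos + vel * ABORT_TIME, clamped back into range.
    print(clamp(449.5 + 2.0 * 2.0))          # -> 450.0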
+ + # Velocity to assume when computing maximum time a move should take (to bail + # out in unforeseen circumstances). + UNREASONABLE_VEL = 0.5 + + # Positive acknowledgment of AcuControl.go_to + OK_RESPONSE = b'OK, Command executed.' + + # Enum for the motion states + State = Enum(f'{axis}State', + ['INIT', 'WAIT_MOVING', 'WAIT_STILL', 'FAIL', 'DONE']) + + # Specialization for different axis types. + # pos/mode are common: + def get_pos(): + return self.data['status']['summary'][f'{axis}_current_position'] + + def get_mode(): + return self.data['status']['summary'][f'{axis}_mode'] + + # vel/goto are different: + if axis in ['Azimuth', 'Elevation']: + def get_vel(): + return self.data['status']['summary'][f'{axis}_current_velocity'] + + if axis == 'Azimuth': + @inlineCallbacks + def goto(target): + result = yield self.acu_control.go_to(az=target) + return result + else: + @inlineCallbacks + def goto(target): + result = yield self.acu_control.go_to(el=target) + return result + + elif axis in ['Boresight']: + def get_vel(): + return 0. + + @inlineCallbacks + def goto(target): + result = yield self.acu_control.go_3rd_axis(target) + return result + + else: + return False, f"No configuration for axis={axis}" + + limit_func, _ = self._get_limit_func(axis) + + # History of recent distances from target. + history = [] + + def get_history(t): + # Returns (ok, hist) where hist is roughly the past t + # seconds of position data and ok is whether or not + # that much history was actually available. + n = int(t // TICK_TIME) + 1 + return (n <= len(history)), history[-n:] + + last_state = None + state = State.INIT + start_time = None + motion_aborted = False + assumption_fail = False + motion_completed = False + give_up_time = None + has_never_moved = True + + while session.status in ['starting', 'running', 'stopping']: + # Time ... + now = time.time() + if start_time is None: + start_time = now + time_since_start = now - start_time + motion_expected = time_since_start > MAX_STARTUP_TIME + + # Space ... + current_pos, current_vel = get_pos(), get_vel() + distance = abs(target - current_pos) + history.append(distance) + if give_up_time is None: + give_up_time = now + distance / UNREASONABLE_VEL \ + + MAX_STARTUP_TIME + 2 * PROFILE_TIME + + # Do we seem to be moving / not moving? + ok, _d = get_history(PROFILE_TIME) + still = ok and (np.std(_d) < 0.01) + moving = ok and (np.std(_d) >= 0.01) + has_never_moved = (has_never_moved and not moving) + + near_destination = distance < THERE_YET + mode_ok = (get_mode() == 'Preset') + + # Log only on state changes + if state != last_state: + _state = f'{axis}.state={state.name}' + self.log.info( + f'{_state:<30} dt={now-start_time:7.3f} dist={distance:8.3f}') + last_state = state + + # Handle task abort + if session.status == 'stopping' and not motion_aborted: + target = limit_func(current_pos + current_vel * ABORT_TIME) + state = State.INIT + motion_aborted = True + + # Turn "too long" into an immediate exit. + if now > give_up_time: + self.log.error('Motion did not complete in a timely fashion; exiting.') + assumption_fail = True + break + + # Main state machine + if state == State.INIT: + # Set target position and change mode to Preset. + result = yield goto(target) + if result == OK_RESPONSE: + state = State.WAIT_MOVING + else: + self.log.error(f'ACU rejected go_to with message: {result}') + state = State.FAIL + # Reset the clock for tracking "still" / "moving". 
+ history = [] + start_time = time.time() + + elif state == State.WAIT_MOVING: + # Position and mode change requested, now wait for + # either mode change or clear failure of motion. + if mode_ok: + state = state.WAIT_STILL + elif still and motion_expected: + self.log.error(f'Motion did not start within {MAX_STARTUP_TIME:.1f} s.') + state = state.FAIL + + elif state == State.WAIT_STILL: + # Once moving, watch for end of motion. + if not mode_ok: + self.log.error('Unexpected axis mode transition; exiting.') + state = State.FAIL + elif still: + if near_destination: + state = State.DONE + elif has_never_moved and motion_expected: + # The settling time, near a soft limit, can be + # a bit long ... so only timeout on + # motion_expected if we've never moved at all. + self.log.error(f'Motion did not start within {MAX_STARTUP_TIME:.1f} s.') + state = State.FAIL + + elif state == State.FAIL: + # Move did not complete as planned. + assumption_fail = True + break + + elif state == State.DONE: + # We seem to have arrived at destination. + motion_completed = True + break + + # Keep only ~20 seconds of history ... + _, history = get_history(20.) + + yield dsleep(TICK_TIME) + + success = motion_completed and not (motion_aborted or assumption_fail) + + if success: + msg = 'Move complete.' + elif motion_aborted: + msg = 'Move aborted!' + else: + msg = 'Irregularity during motion!' + return success, msg + @ocs_agent.param('az', type=float) @ocs_agent.param('el', type=float) - @ocs_agent.param('wait', default=1., type=float) @ocs_agent.param('end_stop', default=False, type=bool) - @ocs_agent.param('rounding', default=1, type=int) @inlineCallbacks def go_to(self, session, params): - """go_to(az=None, el=None, wait=1., end_stop=False, rounding=1) + """go_to(az=None, el=None, end_stop=False) **Task** - Move the telescope to a particular point (azimuth, elevation) in Preset mode. When motion has ended and the telescope @@ -570,124 +886,56 @@ def go_to(self, session, params): Parameters: az (float): destination angle for the azimuthal axis el (float): destination angle for the elevation axis - wait (float): amount of time to wait for motion to end end_stop (bool): put the telescope in Stop mode at the end of the motion - rounding (int): number of decimal places to round to """ - ok, msg = self._try_set_job('control') - if not ok: - return ok, msg - az = params['az'] - el = params['el'] - if az <= self.motion_limits['azimuth']['lower'] or az >= self.motion_limits['azimuth']['upper']: - raise ocs_agent.ParamError("Azimuth out of range! Must be " - + f"{self.motion_limits['azimuth']['lower']} < az " - + f"< {self.motion_limits['azimuth']['upper']}") - if el <= self.motion_limits['elevation']['lower'] or el >= self.motion_limits['elevation']['upper']: - raise ocs_agent.ParamError("Elevation out of range! 
Must be " - + f"{self.motion_limits['elevation']['lower']} < el " - + f"< {self.motion_limits['elevation']['upper']}") - end_stop = params['end_stop'] - wait_for_motion = params['wait'] - round_int = params['rounding'] - self.log.info('Azimuth commanded position: ' + str(az)) - self.log.info('Elevation commanded position: ' + str(el)) - current_az = round(self.data['broadcast']['Corrected_Azimuth'], 4) - current_el = round(self.data['broadcast']['Corrected_Elevation'], 4) - self.data['uploads']['Start_Azimuth'] = current_az - self.data['uploads']['Start_Elevation'] = current_el - self.data['uploads']['Command_Type'] = 1 - self.data['uploads']['Preset_Azimuth'] = az - self.data['uploads']['Preset_Elevation'] = el - - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload) - - # Check whether the telescope is already at the point - # self.log.info('Checking current position') - yield self.acu_control.mode('Preset') - if round(current_az, round_int) == az and \ - round(current_el, round_int) == el: - yield self.acu_control.go_to(az, el, wait=0.1) - self.log.info('Already at commanded position.') - self._set_job_done('control') - return True, 'Preset at commanded position' - # yield self.acu.stop() - # yield self.acu_control.mode('Stop') - # self.log.info('Stopped') - # yield dsleep(0.5) - yield self.acu_control.go_to(az, el, wait=0.1) - mdata = self.data['status']['summary'] - # Wait for telescope to start moving - self.log.info('Moving to commanded position') - wait_for_motion_start = time.time() - elapsed_wait_for_motion = 0.0 - while mdata['Azimuth_current_velocity'] == 0.0 and\ - mdata['Elevation_current_velocity'] == 0.0: - if elapsed_wait_for_motion < 30.: - yield dsleep(wait_for_motion) - elapsed_wait_for_motion = time.time() - wait_for_motion_start - mdata = self.data['status']['summary'] - else: - if round(mdata['Azimuth_current_position'] - az, round_int) == 0. and \ - round(mdata['Elevation_current_position'] - el, round_int) == 0.: - if end_stop: - yield self.acu_control.stop() - self.log.info('Az and el in Stop mode') - self._set_job_done('control') - return True, 'Pointing completed' + with self.azel_lock.acquire_timeout(0, job='go_to') as acquired: + if not acquired: + return False, f"Operation failed: {self.azel_lock.job} is running." + + ok, msg = yield self._check_ready_motion(session) + if not ok: + return False, msg + + target_az = params['az'] + target_el = params['el'] + + for axis, target in {'azimuth': target_az, 'elevation': target_el}.items(): + limit_func, limits = self._get_limit_func(axis) + if target != limit_func(target): + raise ocs_agent.ParamError( + f'{axis}={target} not in accepted range, ' + f'[{limits[0]}, {limits[1]}].') + + self.log.info(f'Commanded position: az={target_az}, el={target_el}') + session.set_status('running') + + moves = yield DeferredList([ + self._go_to_axis(session, 'Azimuth', target_az), + self._go_to_axis(session, 'Elevation', target_el), + ]) + all_ok, msgs = True, [] + for _ok, result in moves: + if _ok: + all_ok = all_ok and result[0] + msgs.append(result[1]) else: - yield self.acu_control.stop() - self._set_job_done('control') - return False, 'Motion never occurred! 
Stop activated' - yield dsleep(wait_for_motion) - mdata = self.data['status']['summary'] - moving = True - while moving: - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload) - mdata = self.data['status']['summary'] - ve = round(mdata['Elevation_current_velocity'], 2) - va = round(mdata['Azimuth_current_velocity'], 2) - if (ve != 0.0) or (va != 0.0): - moving = True - yield dsleep(wait_for_motion) + all_ok = False + msgs.append(f'Crash! {result}') + + if all_ok: + msg = msgs[0] else: - moving = False - mdata = self.data['status']['summary'] - pe = round(mdata['Elevation_current_position'], round_int) - pa = round(mdata['Azimuth_current_position'], round_int) - if pe != el or pa != az: - yield self.acu_control.stop() - self.log.warn('Stopped before reaching commanded point!') - return False, 'Something went wrong!' - modes = (mdata['Azimuth_mode'], mdata['Elevation_mode']) - if modes != ('Preset', 'Preset'): - return False, 'Fault triggered!' - if end_stop: - yield self.acu_control.stop() - self.log.info('Stop mode activated') - self.data['uploads']['Start_Azimuth'] = 0.0 - self.data['uploads']['Start_Elevation'] = 0.0 - self.data['uploads']['Command_Type'] = 0 - self.data['uploads']['Preset_Azimuth'] = 0.0 - self.data['uploads']['Preset_Elevation'] = 0.0 - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload, from_reactor=True) - self._set_job_done('control') - return True, 'Pointing completed' + msg = f'az: {msgs[0]} el: {msgs[1]}' + + if all_ok and params['end_stop']: + yield self.acu_control.mode('Stop') + return all_ok, msg + + @ocs_agent.param('target', type=float) + @ocs_agent.param('end_stop', default=False, type=bool) @inlineCallbacks def set_boresight(self, session, params): """set_boresight(b=None, end_stop=False) @@ -695,262 +943,200 @@ def set_boresight(self, session, params): **Task** - Move the telescope to a particular third-axis angle. 
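go_to drives both axes concurrently by firing two _go_to_axis calls and gathering them with DeferredList, whose result is a list of (success, value) pairs, value being each axis's (ok, msg) tuple. A self-contained toy illustrating that pattern; move_axis here is a stand-in, not the agent's method.

    from twisted.internet import reactor, task
    from twisted.internet.defer import DeferredList, inlineCallbacks

    @inlineCallbacks
    def move_axis(name, seconds):
        # Stand-in for _go_to_axis: pretend to move, then report success.
        yield task.deferLater(reactor, seconds, lambda: None)
        return True, '%s move complete.' % name

    @inlineCallbacks
    def main(reactor):
        moves = yield DeferredList([move_axis('Azimuth', 0.2),
                                    move_axis('Elevation', 0.1)])
        for ok, result in moves:
            # ok is False only if that Deferred failed outright;
            # result is the (success, message) tuple from the move.
            print(ok, result)

    task.react(main)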
Parameters: - b (float): destination angle for boresight rotation + target (float): destination angle for boresight rotation end_stop (bool): put axes in Stop mode after motion """ - ok, msg = self._try_set_job('control') - if not ok: - return ok, msg - bs_destination = params.get('b') - # yield self.acu_control.stop() - yield dsleep(5) - self.data['uploads']['Start_Boresight'] = self.data['status']['summary']['Boresight_current_position'] - self.data['uploads']['Command_Type'] = 1 - self.data['uploads']['Preset_Boresight'] = bs_destination - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload) - yield self.acu_control.go_3rd_axis(bs_destination) - current_position = self.data['status']['summary']['Boresight_current_position'] - while current_position != bs_destination: - yield dsleep(1) - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload) - current_position = self.data['status']['summary']['Boresight_current_position'] - if params.get('end_stop'): - yield self.acu_control.stop() - self.data['uploads']['Start_Boresight'] = 0.0 - self.data['uploads']['Command_Type'] = 0 - self.data['uploads']['Preset_Boresight'] = 0.0 - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload) - self._set_job_done('control') - return True, 'Moved to new 3rd axis position' + with self.boresight_lock.acquire_timeout(0, job='set_boresight') as acquired: + if not acquired: + return False, f"Operation failed: {self.boresight_lock.job} is running." + + ok, msg = yield self._check_ready_motion(session) + if not ok: + return False, msg + + target = params['target'] + + for axis, target in {'boresight': target}.items(): + limit_func, limits = self._get_limit_func(axis) + if target != limit_func(target): + raise ocs_agent.ParamError( + f'{axis}={target} not in accepted range, ' + f'[{limits[0]}, {limits[1]}].') + + self.log.info(f'Commanded position: boresight={target}') + session.set_status('running') + + ok, msg = yield self._go_to_axis(session, 'Boresight', target) + + if ok and params['end_stop']: + yield self.acu_control.http.Command('DataSets.CmdModeTransfer', + 'Set3rdAxisMode', 'Stop') + + return ok, msg + + @inlineCallbacks + def clear_faults(self, session, params): + """clear_faults() + + **Task** - Clear any axis faults. + + """ + + session.set_status('running') + yield self.acu_control.clear_faults() + session.set_status('stopping') + return True, 'Job completed.' @inlineCallbacks def stop_and_clear(self, session, params): """stop_and_clear() - **Task** - Change the azimuth and elevation modes to Stop and clear - points uploaded to the stack. + **Task** - Change the azimuth, elevation, and 3rd axis modes + to Stop; also clear the ProgramTrack stack. 
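The motion entry points guard against concurrent commanding with a non-blocking acquire on a TimeoutLock: a timeout of 0 fails immediately if another operation holds the lock, and the lock's job attribute names the current holder. A minimal sketch of the same pattern outside the agent (names are illustrative):

    from ocs.ocs_twisted import TimeoutLock

    azel_lock = TimeoutLock()

    def try_motion(job_name):
        # timeout=0: fail right away rather than waiting for the holder.
        with azel_lock.acquire_timeout(0, job=job_name) as acquired:
            if not acquired:
                return False, "Operation failed: %s is running." % azel_lock.job
            # ... issue motion commands while holding the lock ...
            return True, 'Motion complete.'

    print(try_motion('go_to'))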
""" - ok, msg = self._try_set_job('control') - if not ok: - self._set_job_done('control') - yield dsleep(0.1) - self._try_set_job('control') - self.log.info('_try_set_job ok') - # yield self.acu.stop() - yield self.acu_control.mode('Stop') - self.log.info('Stop called') - yield dsleep(5) - yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer', - 'Clear Stack') - yield dsleep(0.1) - self.log.info('Cleared stack.') - self._set_job_done('control') + + session.set_status('running') + i = 0 + while i < 5: + modes = [self.data['status']['summary']['Azimuth_mode'], + self.data['status']['summary']['Elevation_mode'], + ] + if self.acu_config['platform'] == 'satp': + modes.append(self.data['status']['summary']['Boresight_mode']) + elif self.acu_config['platform'] in ['ccat', 'lat']: + modes.append(self.data['status']['third_axis']['Axis3_mode']) + if modes != ['Stop', 'Stop', 'Stop']: + yield self.acu_control.stop() + self.log.info('Stop called (iteration %i)' % (i + 1)) + yield dsleep(0.1) + i += 1 + else: + self.log.info('All axes in Stop mode') + i = 5 + modes = [self.data['status']['summary']['Azimuth_mode'], + self.data['status']['summary']['Elevation_mode'], + ] + if self.acu_config['platform'] == 'satp': + modes.append(self.data['status']['summary']['Boresight_mode']) + elif self.acu_config['platform'] in ['ccat', 'lat']: + modes.append(self.data['status']['third_axis']['Axis3_mode']) + if modes != ['Stop', 'Stop', 'Stop']: + self.log.error('Axes could not be set to Stop!') + return False, 'Could not set axes to Stop mode' + j = 0 + while j < 5: + free_stack = self.data['status']['summary']['Free_upload_positions'] + if free_stack < FULL_STACK: + yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer', + 'Clear Stack') + self.log.info('Clear Stack called (iteration %i)' % (j + 1)) + yield dsleep(0.1) + j += 1 + else: + self.log.info('Stack cleared') + j = 5 + free_stack = self.data['status']['summary']['Free_upload_positions'] + if free_stack < FULL_STACK: + self.log.warn('Stack not fully cleared!') + return False, 'Could not clear stack' + + session.set_status('stopping') return True, 'Job completed' + @ocs_agent.param('filename', type=str) + @ocs_agent.param('adjust_times', type=bool, default=True) + @ocs_agent.param('azonly', type=bool, default=True) @inlineCallbacks def fromfile_scan(self, session, params=None): - """fromfile_scan(filename=None, simulator=None) + """fromfile_scan(filename=None, adjust_times=True, azonly=True) **Task** - Upload and execute a scan pattern from numpy file. Parameters: - filename (str): full path to desired numpy file. File contains - an array of three lists ([list(times), list(azimuths), - list(elevations)]). Times begin from 0.0. - simulator (bool): toggle for using the ACU simulator. + filename (str): full path to desired numpy file. File should + contain an array of shape (5, nsamp) or (7, nsamp). See Note. + adjust_times (bool): If True (the default), the track + timestamps are interpreted as relative times, only, + and the track will be formatted so the first point + happens a few seconds in the future. If False, the + track times will be taken at face value (even if the + first one is, like, 0). + azonly (bool): If True, the elevation part of the track + will be uploaded but the el axis won't be put in + ProgramTrack mode. It might be put in Stop mode + though. + + Notes: + The columns in the numpy array are: + + - 0: timestamps, in seconds. + - 1: azimuth, in degrees. + - 2: elevation, in degrees. 
+ - 3: az_vel, in deg/s. + - 4: el_vel, in deg/s. + - 5: az_flags (2 if last point in a leg; 1 otherwise.) + - 6: el_flags (2 if last point in a leg; 1 otherwise.) + + It is acceptable to omit columns 5 and 6. """ - filename = params.get('filename') - simulator = params.get('simulator') - times, azs, els, vas, ves, azflags, elflags = sh.from_file(filename) - if min(azs) <= self.motion_limits['azimuth']['lower'] or max(azs) >= self.motion_limits['azimith']['upper']: + session.set_status('running') + + times, azs, els, vas, ves, azflags, elflags = sh.from_file(params['filename']) + if min(azs) <= self.motion_limits['azimuth']['lower'] \ + or max(azs) >= self.motion_limits['azimuth']['upper']: return False, 'Azimuth location out of range!' - if min(els) <= self.motion_limits['elevation']['lower'] or max(els) >= self.motion_limits['elevation']['upper']: + if min(els) <= self.motion_limits['elevation']['lower'] \ + or max(els) >= self.motion_limits['elevation']['upper']: return False, 'Elevation location out of range!' - yield self._run_specified_scan(session, times, azs, els, vas, ves, azflags, elflags, azonly=False, simulator=simulator) - yield True, 'Track completed' - - @ocs_agent.param('azpts', type=tuple) - @ocs_agent.param('el', type=float) - @ocs_agent.param('azvel', type=float) - @ocs_agent.param('acc', type=float) - @ocs_agent.param('ntimes', type=int) - @ocs_agent.param('azonly', type=bool) - @ocs_agent.param('simulator', default=False, type=bool) - @inlineCallbacks - def constant_velocity_scan(self, session, params=None): - """constant_velocity_scan(azpts=None, el=None, azvel=None, acc=None, \ - ntimes=None, azonly=None, simulator=False) - - **Task** - Run a constant velocity scan. - - Parameters: - azpts (tuple): spatial endpoints of the azimuth scan - el (float): elevation (constant) throughout the scan - azvel (float): velocity of the azimuth axis - acc (float): acceleration of the turnaround - ntimes (int): number of times the platform traverses - between azimuth endpoints - azonly (bool): option for scans with azimuth motion and - elevation in Stop - simulator (bool): toggle option for ACU simulator - - """ - azpts = params['azpts'] - el = params['el'] - azvel = params['azvel'] - acc = params['acc'] - ntimes = params['ntimes'] - azonly = params['azonly'] - simulator = params['simulator'] - if abs(acc) > self.motion_limits['acc']: - raise ocs_agent.ParamError('Acceleration too great!') - if min(azpts) <= self.motion_limits['azimuth']['lower'] or max(azpts) >= self.motion_limits['azimuth']['upper']: - raise ocs_agent.ParamError('Azimuth location out of range!') - if el <= self.motion_limits['elevation']['lower'] or el >= self.motion_limits['elevation']['upper']: - raise ocs_agent.ParamError('Elevation location out of range!') - times, azs, els, vas, ves, azflags, elflags = sh.constant_velocity_scanpoints(azpts, el, azvel, acc, ntimes) - yield self._run_specified_scan(session, times, azs, els, vas, ves, azflags, elflags, azonly, simulator) - return True, 'Track completed.' 
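A compliant input file for fromfile_scan can be generated with numpy directly. The sketch below writes one constant-velocity azimuth leg as a (7, nsamp) array, assuming sh.from_file accepts a standard .npy file laid out as in the Notes above; the numeric values are illustrative.

    import numpy as np

    dt = 0.1                      # seconds between track points
    az0, az1, v = -10., 10., 1.   # deg, deg, deg/s
    el = 50.

    t = np.arange(0., (az1 - az0) / v + dt, dt)
    az = az0 + v * t
    n = len(t)

    track = np.vstack([
        t,                          # 0: timestamps (relative; adjust_times=True shifts them)
        az,                         # 1: azimuth
        np.full(n, el),             # 2: elevation (constant)
        np.full(n, v),              # 3: az velocity
        np.zeros(n),                # 4: el velocity
        np.r_[np.ones(n - 1), 2.],  # 5: az flags (2 marks the last point of the leg)
        np.r_[np.ones(n - 1), 2.],  # 6: el flags (mirroring az here)
    ])
    np.save('one_leg_track.npy', track)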
- - @inlineCallbacks - def _run_specified_scan(self, session, times, azs, els, vas, ves, azflags, elflags, azonly, simulator): - ok, msg = self._try_set_job('control') - if not ok: - return ok, msg - self.log.info('_try_set_job ok') - - start_az = azs[0] - start_el = els[0] - end_az = azs[-1] - end_el = els[-1] - - self.data['uploads']['Start_Azimuth'] = start_az - self.data['uploads']['Start_Elevation'] = start_el - self.data['uploads']['Command_Type'] = 2 - - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload) - - # Follow the scan in ProgramTrack mode, then switch to Stop mode - all_lines = sh.ptstack_format(times, azs, els, vas, ves, azflags, elflags) - self.log.info('all_lines generated') - self.data['uploads']['PtStack_Lines'] = 'True' - if azonly: - yield self.acu_control.azmode('ProgramTrack') - else: - yield self.acu_control.mode('ProgramTrack') - m = yield self.acu_control.mode() - print(m) - self.log.info('mode is now ProgramTrack') - if simulator: - group_size = len(all_lines) - else: - group_size = 120 - spec = {'times': times, - 'azs': azs, - 'els': els, - 'vas': vas, - 'ves': ves, - 'azflags': azflags, - 'elflags': elflags, - } - while len(all_lines): - upload_lines = all_lines[:group_size] - all_lines = all_lines[group_size:] - upload_vals = front_group(spec, group_size) - spec = pop_first_vals(spec, group_size) - - for u in range(len(upload_vals['azs'])): - self.data['uploads']['PtStack_Time'] = upload_lines[u].split(';')[0] - self.data['uploads']['PtStack_Azimuth'] = upload_vals['azs'][u] - self.data['uploads']['PtStack_Elevation'] = upload_vals['els'][u] - self.data['uploads']['PtStack_AzVelocity'] = upload_vals['vas'][u] - self.data['uploads']['PtStack_ElVelocity'] = upload_vals['ves'][u] - self.data['uploads']['PtStack_AzFlag'] = upload_vals['azflags'][u] - self.data['uploads']['PtStack_ElFlag'] = upload_vals['elflags'][u] - self.data['uploads']['PtStack_ctime'] = uploadtime_to_ctime(self.data['uploads']['PtStack_Time'], int(self.data['status']['summary']['Year'])) - acu_upload = {'timestamp': self.data['uploads']['PtStack_ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload, from_reactor=True) - text = ''.join(upload_lines) - free_positions = self.data['status']['summary']['Free_upload_positions'] - while free_positions < 9899: - free_positions = self.data['status']['summary']['Free_upload_positions'] - yield dsleep(0.1) - yield self.acu_control.http.UploadPtStack(text) - self.log.info('Uploaded a group') - self.log.info('No more lines to upload') - free_positions = self.data['status']['summary']['Free_upload_positions'] - while free_positions < 9999: - yield dsleep(0.1) - modes = (self.data['status']['summary']['Azimuth_mode'], - self.data['status']['summary']['Elevation_mode']) - if azonly: - if modes[0] != 'ProgramTrack': - return False, 'Azimuth mode no longer ProgramTrack!!' - else: - if modes != ('ProgramTrack', 'ProgramTrack'): - return False, 'Fault triggered (not ProgramTrack)!' 
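The legacy upload code above carried ProgramTrack time strings through the old uploads block. Judging from the deleted uploadtime_to_ctime parser, those strings follow a 'DOY, HH:MM:SS.ffffff' convention with day-of-year starting at 1; the sketch below is the inverse conversion under that assumption, noting that sh.ptstack_format remains the authoritative formatter.

    import calendar
    import time

    def ctime_to_ptstack_time(ctime):
        """Format a unix timestamp as 'DOY, HH:MM:SS.ffffff' (day-of-year from 1),
        the layout the deleted uploadtime_to_ctime expected to parse."""
        tm = time.gmtime(ctime)
        year_start = calendar.timegm(time.strptime(str(tm.tm_year), '%Y'))
        seconds_into_year = ctime - year_start
        day = int(seconds_into_year // 86400) + 1
        frac = seconds_into_year - (day - 1) * 86400
        hh = int(frac // 3600)
        mm = int((frac % 3600) // 60)
        ss = frac - hh * 3600 - mm * 60
        return '%03d, %02d:%02d:%09.6f' % (day, hh, mm, ss)

    print(ctime_to_ptstack_time(time.time()))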
- free_positions = self.data['status']['summary']['Free_upload_positions'] - self.log.info('No more points in the queue') - current_az = self.data['broadcast']['Corrected_Azimuth'] - current_el = self.data['broadcast']['Corrected_Elevation'] - while round(current_az - end_az, 1) != 0.: - self.log.info('Waiting to settle at azimuth position') - yield dsleep(0.1) - current_az = self.data['broadcast']['Corrected_Azimuth'] - if not azonly: - while round(current_el - end_el, 1) != 0.: - self.log.info('Waiting to settle at elevation position') - yield dsleep(0.1) - current_el = self.data['broadcast']['Corrected_Elevation'] - yield dsleep(self.sleeptime) - yield self.acu_control.stop() - self.data['uploads']['Start_Azimuth'] = 0.0 - self.data['uploads']['Start_Elevation'] = 0.0 - self.data['uploads']['Command_Type'] = 0 - self.data['uploads']['PtStack_Lines'] = 'False' - self.data['uploads']['PtStack_Time'] = '000, 00:00:00.000000' - self.data['uploads']['PtStack_Azimuth'] = 0.0 - self.data['uploads']['PtStack_Elevation'] = 0.0 - self.data['uploads']['PtStack_AzVelocity'] = 0.0 - self.data['uploads']['PtStack_ElVelocity'] = 0.0 - self.data['uploads']['PtStack_AzFlag'] = 0 - self.data['uploads']['PtStack_ElFlag'] = 0 - acu_upload = {'timestamp': self.data['status']['summary']['ctime'], - 'block_name': 'ACU_upload', - 'data': self.data['uploads'] - } - self.agent.publish_to_feed('acu_upload', acu_upload) - self._set_job_done('control') - return True + # Modify times? + if params['adjust_times']: + times = times + time.time() - times[0] + 5. + + # Turn those lines into a generator. + all_lines = sh.ptstack_format(times, azs, els, vas, ves, azflags, elflags, + absolute=True) + + def line_batcher(lines, n=10): + while len(lines): + some, lines = lines[:n], lines[n:] + yield some + + point_gen = line_batcher(all_lines) + step_time = np.median(np.diff(times)) + + ok, err = yield self._run_track(session, point_gen, step_time, + azonly=params['azonly']) + return ok, err + + @ocs_agent.param('az_endpoint1', type=float) + @ocs_agent.param('az_endpoint2', type=float) + @ocs_agent.param('az_speed', type=float) + @ocs_agent.param('az_accel', type=float) + @ocs_agent.param('el_endpoint1', type=float, default=None) + @ocs_agent.param('el_endpoint2', type=float, default=None) + @ocs_agent.param('el_speed', type=float, default=0.) + @ocs_agent.param('num_scans', type=float, default=None) + @ocs_agent.param('start_time', type=float, default=None) + @ocs_agent.param('wait_to_start', type=float, default=None) + @ocs_agent.param('step_time', type=float, default=None) + @ocs_agent.param('az_start', default='end', + choices=['end', 'mid', 'az_endpoint1', 'az_endpoint2', + 'mid_inc', 'mid_dec']) + @ocs_agent.param('az_only', type=bool, default=True) + @ocs_agent.param('scan_upload_length', type=float, default=None) @inlineCallbacks def generate_scan(self, session, params): - """generate_scan(az_endpoint1=None, az_endpoint2=None, az_speed=None, \ - acc=None, el_endpoint1=None, el_endpoint2=None, \ - el_speed=None, num_batches=None, start_time=None, \ - wait_to_start=None, step_time=None, batch_size=None, \ - az_start=None) + """generate_scan(az_endpoint1, az_endpoint2, \ + az_speed, az_accel, \ + el_endpoint1=None, el_endpoint2=None, \ + el_speed=None, \ + num_scans=None, start_time=None, \ + wait_to_start=None, step_time=None, \ + az_start='end', az_only=True, \ + scan_upload_length=None) **Process** - Scan generator, currently only works for constant-velocity az scans with fixed elevation. 
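From an OCS client, the reworked process is driven with the new parameter names (az_accel rather than acc, num_scans rather than num_batches, and so on). A hedged usage sketch; the instance id and numeric values are placeholders.

    from ocs.ocs_client import OCSClient

    acu = OCSClient('acu-sat1')                 # placeholder instance-id

    # Run 10 constant-velocity legs between az=-15 and az=+15 at el=50.
    acu.generate_scan.start(az_endpoint1=-15., az_endpoint2=15.,
                            az_speed=1.0, az_accel=1.0,
                            el_endpoint1=50., num_scans=10,
                            az_start='mid_inc', az_only=True)
    status, msg, session = acu.generate_scan.wait()   # blocks until the process exits
    print(status, msg)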
@@ -959,69 +1145,219 @@ def generate_scan(self, session, params): az_endpoint1 (float): first endpoint of a linear azimuth scan az_endpoint2 (float): second endpoint of a linear azimuth scan az_speed (float): azimuth speed for constant-velocity scan - acc (float): turnaround acceleration for a constant-velocity scan - el_endpoint1 (float): first endpoint of elevation motion - el_endpoint2 (float): second endpoint of elevation motion. For dev, - currently both el endpoints should be equal - el_speed (float): speed of motion for a scan with changing - elevation. For dev, currently set to 0.0 - num_batches (int or None): sets the number of batches for the - generator to create. Default value is None (interpreted as infinite - batches). - start_time (float or None): a ctime at which to start the scan. - Default is None, interpreted as now - wait_to_start (float): number of seconds to wait before starting a - scan. Default is 3 seconds - step_time (float): time between points on the constant-velocity - parts of the motion. Default is 0.1 s. Minimum 0.05 s - batch_size (int): number of values to produce in each iteration. - Default is 500. Batch size is reset to the length of one leg of the - motion if num_batches is not None. - az_start (str): part of the scan to start at. Options are: - 'az_endpoint1', 'az_endpoint2', 'mid_inc' (start in the middle of - the scan and start with increasing azimuth), 'mid_dec' (start in - the middle of the scan and start with decreasing azimuth). + az_accel (float): turnaround acceleration for a constant-velocity scan + el_endpoint1 (float): first endpoint of elevation motion. + In the present implementation, this will be the + constant elevation declared at every point in the + track. + el_endpoint2 (float): this is ignored. + el_speed (float): this is ignored. + num_scans (int or None): if not None, limits the scan to + the specified number of constant velocity legs. The + process will exit without error once that has + completed. + start_time (float or None): a unix timestamp giving the + time at which the scan should begin. The default is + None, which means the scan will start immediately (but + taking into account the value of wait_to_start). + wait_to_start (float): number of seconds to wait before + starting a scan, in the case that start_time is None. + The default is to compute a minimum time based on the + scan parameters and the ACU ramp-up algorithm; this is + typically 5-10 seconds. + step_time (float): time, in seconds, between points on the + constant-velocity parts of the motion. The default is + None, which will cause an appropriate value to be + chosen automatically (typically 0.1 to 1.0). + az_start (str): part of the scan to start at. To start at one + of the extremes, use 'az_endpoint1', 'az_endpoint2', or + 'end' (same as 'az_endpoint1'). To start in the midpoint + of the scan use 'mid_inc' (for first half-leg to have + positive az velocity), 'mid_dec' (negative az velocity), + or 'mid' (velocity oriented towards endpoint2). + az_only (bool): if True (the default), then only the + Azimuth axis is put in ProgramTrack mode, and the El axis + is put in Stop mode. + scan_upload_length (float): number of seconds for each set + of uploaded points. If this is not specified, the + track manager will try to use as short a time as is + reasonable. 
+        """
-        ok, msg = self._try_set_job('control')
-        if not ok:
-            return ok, msg
-        self.log.info('_try_set_job ok')
         az_endpoint1 = params.get('az_endpoint1')
         az_endpoint2 = params.get('az_endpoint2')
         az_speed = params.get('az_speed')
-        acc = params.get('acc')
+        az_accel = params.get('az_accel')
         el_endpoint1 = params.get('el_endpoint1')
-        scan_params = {k: params.get(k) for k in ['num_batches', 'start_time',
-                       'wait_to_start', 'step_time', 'batch_size', 'az_start']
-                       if params.get(k) is not None}
+        azonly = params.get('az_only', True)
+        scan_upload_len = params.get('scan_upload_length')
+        scan_params = {k: params.get(k) for k in [
+            'num_scans', 'num_batches', 'start_time',
+            'wait_to_start', 'step_time', 'batch_size', 'az_start']
+            if params.get(k) is not None}
         el_endpoint2 = params.get('el_endpoint2', el_endpoint1)
         el_speed = params.get('el_speed', 0.0)

-        yield self.acu_control.stop()
+        plan = sh.plan_scan(az_endpoint1, az_endpoint2,
+                            el=el_endpoint1, v_az=az_speed, a_az=az_accel,
+                            az_start=scan_params.get('az_start'))
+
+        # Use the plan to set scan upload parameters.
+        if scan_params.get('step_time') is None:
+            scan_params['step_time'] = plan['step_time']
+        if scan_params.get('wait_to_start') is None:
+            scan_params['wait_to_start'] = plan['wait_to_start']
+
+        step_time = scan_params['step_time']
+        point_batch_count = None
+        if scan_upload_len:
+            point_batch_count = scan_upload_len / step_time
+
+        session.set_status('running')
+
+        # Verify we're good to move
+        ok, msg = yield self._check_ready_motion(session)
+        if not ok:
+            return False, msg
+
+        # Seek to starting position
+        self.log.info(f'Moving to start position, az={plan["init_az"]}')
+        ok, msg = yield self._go_to_axis(session, 'Azimuth', plan['init_az'])
+        if not ok:
+            return False, f'Start position seek failed with message: {msg}'
+
+        # Prepare the point generator.
         g = sh.generate_constant_velocity_scan(az_endpoint1=az_endpoint1,
                                                az_endpoint2=az_endpoint2,
-                                               az_speed=az_speed, acc=acc,
+                                               az_speed=az_speed, acc=az_accel,
                                                el_endpoint1=el_endpoint1,
                                                el_endpoint2=el_endpoint2,
                                                el_speed=el_speed,
+                                               az_first_pos=plan['init_az'],
                                                **scan_params)
-        self.acu_control.mode('ProgramTrack')
-        while self.jobs['control'] == 'run':
-            lines = next(g)
-            current_lines = lines
-            group_size = 250
-            while len(current_lines):
-                upload_lines = current_lines[:group_size]
-                text = ''.join(upload_lines)
-                current_lines = current_lines[group_size:]
+
+        return (yield self._run_track(
+            session=session, point_gen=g, step_time=step_time,
+            azonly=azonly, point_batch_count=point_batch_count))
+
+    @inlineCallbacks
+    def _run_track(self, session, point_gen, step_time, azonly=False,
+                   point_batch_count=None):
+        """Run a ProgramTrack track scan, with points provided by a
+        generator.
+
+        Args:
+          session: session object for the parent operation.
+          point_gen: generator that yields track points.
+          step_time: the minimum time between track points.
+            This is used to guarantee that points are uploaded
+            sufficiently in advance for the servo unit to process
+            them.
+          azonly: set to True to leave the el axis locked.
+          point_batch_count: number of points to include in batch
+            uploads.  This parameter can be used to increase the value
+            beyond the minimum set internally based on step_time.
+
+        Returns:
+          Tuple (success, msg) where success is a bool.
+
+        """
+        # The approximate loop time
+        LOOP_STEP = 0.1  # seconds
+
+        # Minimum number of points to have in the stack.
While the + # docs strictly require 4, this number should be at least 1 + # more than that to allow for rounding when we are setting the + # refill threshold. + MIN_STACK_POP = 6 # points + + if point_batch_count is None: + point_batch_count = 0 + + STACK_REFILL_THRESHOLD = FULL_STACK - \ + max(MIN_STACK_POP + LOOP_STEP / step_time, point_batch_count) + STACK_TARGET = FULL_STACK - \ + max(MIN_STACK_POP * 2 + LOOP_STEP / step_time, point_batch_count * 2) + + with self.azel_lock.acquire_timeout(0, job='generate_scan') as acquired: + + if not acquired: + return False, f"Operation failed: {self.azel_lock.job} is running." + if session.status not in ['starting', 'running']: + return False, "Operation aborted before motion began." + + yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer', + 'Clear Stack') + yield dsleep(0.2) + + if azonly: + yield self.acu_control.azmode('ProgramTrack') + else: + yield self.acu_control.mode('ProgramTrack') + + yield dsleep(0.5) + + # Values for mode are: + # - 'go' -- keep uploading points until trajectory is complete + # - 'stop' -- do not request more points from generator; finish the ones you have. + # - 'abort' -- do not upload more points; exit loop and clear stack. + mode = 'go' + + lines = [] + last_mode = None + + while True: + current_modes = {'Az': self.data['status']['summary']['Azimuth_mode'], + 'El': self.data['status']['summary']['Elevation_mode'], + 'Remote': self.data['status']['platform_status']['Remote_mode']} free_positions = self.data['status']['summary']['Free_upload_positions'] - while free_positions < 5099: - yield dsleep(0.1) - free_positions = self.data['status']['summary']['Free_upload_positions'] - yield self.acu_control.http.UploadPtStack(text) - yield self.acu_control.stop() - self._set_job_done('control') - return True, 'Track generation ended cleanly' + + if last_mode != mode: + self.log.info(f'scan mode={mode}, line_buffer={len(lines)}, track_free={free_positions}') + last_mode = mode + + if mode != 'abort': + # Reasons we might decide to abort ... + if current_modes['Az'] != 'ProgramTrack': + self.log.warn('Unexpected mode transition!') + mode = 'abort' + if current_modes['Remote'] == 0: + self.log.warn('ACU no longer in remote mode!') + mode = 'abort' + if session.status == 'stopping': + mode = 'abort' + + if mode == 'abort': + lines = [] + + while mode == 'go' and len(lines) < 100: + try: + lines.extend(next(point_gen)) + except StopIteration: + mode = 'stop' + + if len(lines) and free_positions >= STACK_REFILL_THRESHOLD: + + group_size = max(int(free_positions - STACK_TARGET), 1) + lines, upload_lines = lines[group_size:], lines[:group_size] + text = ''.join(upload_lines) + yield self.acu_control.http.UploadPtStack(text) + + if len(lines) == 0 and free_positions >= FULL_STACK - 1: + break + + yield dsleep(LOOP_STEP) + + # Go to Stop mode? + # yield self.acu_control.stop() + + # Clear the stack, but wait a bit or it can cause a fault. + # Yes, sometimes you have to wait a very long time ... 
+ yield dsleep(10) + yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer', + 'Clear Stack') + + return True, 'Track ended cleanly' def add_agent_args(parser_in=None): @@ -1037,7 +1373,6 @@ def main(args=None): args = site_config.parse_args(agent_class='ACUAgent', parser=parser, args=args) - agent, runner = ocs_agent.init_site_agent(args) _ = ACUAgent(agent, args.acu_config) diff --git a/socs/agents/acu/drivers.py b/socs/agents/acu/drivers.py index 513f5b5d2..df5d76a4a 100644 --- a/socs/agents/acu/drivers.py +++ b/socs/agents/acu/drivers.py @@ -1,7 +1,13 @@ +import calendar +import datetime +import math import time import numpy as np +#: The number of seconds in a day. +DAY = 86400 + def constant_velocity_scanpoints(azpts, el, azvel, acc, ntimes): """ @@ -106,28 +112,24 @@ def from_file(filename): the file, all flags are set to 0 to accommodate non-linear scans """ info = np.load(filename) - if len(info) < 5: - raise TypeError('Not enough fields in numpy file! Expected ' - '5 fields.') + if len(info) not in [5, 7]: + raise ValueError(f'Unexpected field count ({len(info)}) in {filename}') conctimes = info[0] concaz = info[1] concel = info[2] concva = info[3] concve = info[4] if len(info) == 5: - az_flags = np.array([0 for x in range(len(conctimes))]) + az_flags = np.zeros(len(conctimes), int) el_flags = az_flags elif len(info) == 7: - az_flags = info[5] - el_flags = info[6] - else: - print('File has too many parameters!') - return False + az_flags = info[5].astype('int') + el_flags = info[6].astype('int') return conctimes, concaz, concel, concva, concve, az_flags, el_flags def ptstack_format(conctimes, concaz, concel, concva, concve, az_flags, - el_flags, start_offset=3., generator=False): + el_flags, start_offset=0, absolute=False): """ Produces a list of lines in the format necessary to upload to the ACU to complete a scan. Params are the outputs of from_file, @@ -144,20 +146,21 @@ def ptstack_format(conctimes, concaz, concel, concva, concve, az_flags, conctimes el_flags (list): Flags associated with elevation motions at conctimes - start_offset (float): Seconds to wait before starting the scan - generator (bool): Toggles the start time. When true, start time is - start_offset, otherwise start time is time.time() + start_offset + start_offset (float): Offset, in seconds, to apply to all + timestamps. + absolute (bool): If true, timestamps are taken at face value, + and only start_offset is added. If false, then the current + time is also added (but note that if the first timestamp + is 0, then you will need to also pass start_offset > 0). Returns: list: Lines in the correct format to upload to the ACU """ fmt = '%j, %H:%M:%S' - if generator: - start_time = start_offset - else: - start_time = time.time() + start_offset - true_times = [start_time + i for i in conctimes] + if not absolute: + start_offset = time.time() + start_offset + true_times = [start_offset + i for i in conctimes] fmt_times = [time.strftime(fmt, time.gmtime(t)) + ('{tt:.6f}'.format(tt=t % 1.))[1:] for t in true_times] @@ -171,14 +174,47 @@ def ptstack_format(conctimes, concaz, concel, concva, concve, az_flags, return all_lines +def timecode(acutime, now=None): + """Takes the time code produced by the ACU status stream and returns + a unix timestamp. + + Parameters: + acutime (float): The time recorded by the ACU status stream, + corresponding to the fractional day of the year. + now (float): The time, as unix timestamp, to assume it is now. + This is for testing, it defaults to time.time(). 
+ + """ + sec_of_day = (acutime - 1) * DAY + if now is None: + now = time.time() # testing + + # This guard protects us at end of year, when time.time() and + # acutime might correspond to different years. + if acutime > 180: + context = datetime.datetime.utcfromtimestamp(now - 30 * DAY) + else: + context = datetime.datetime.utcfromtimestamp(now + 30 * DAY) + + year = context.year + gyear = calendar.timegm(time.strptime(str(year), '%Y')) + comptime = gyear + sec_of_day + return comptime + + def generate_constant_velocity_scan(az_endpoint1, az_endpoint2, az_speed, acc, el_endpoint1, el_endpoint2, - el_speed, num_batches=None, - start_time=None, wait_to_start=3., - step_time=0.1, batch_size=500, - az_start='mid_inc', ptstack_fmt=True): - """ - Python generator to produce times, azimuth and elevation positions, + el_speed=0, + num_batches=None, + num_scans=None, + start_time=None, + wait_to_start=10., + step_time=1., + batch_size=500, + az_start='mid_inc', + az_first_pos=None, + ptstack_fmt=True): + """Python generator to produce times, azimuth and elevation positions, azimuth and elevation velocities, azimuth and elevation flags for arbitrarily long constant-velocity azimuth scans. @@ -196,53 +232,62 @@ def generate_constant_velocity_scan(az_endpoint1, az_endpoint2, az_speed, num_batches (int or None): sets the number of batches for the generator to create. Default value is None (interpreted as infinite batches). + num_scans (int or None): if not None, limits the points + returned to the specified number of constant velocity legs. start_time (float or None): a ctime at which to start the scan. Default is None, which is interpreted as starting now + wait_to_start. wait_to_start (float): number of seconds to wait between - start_time and when the scan actually starts. Default is 3 seconds. + start_time and when the scan actually starts. Default is 10 seconds. step_time (float): time between points on the constant-velocity - parts of the motion. Default value is 0.1 seconds. Minimum value is + parts of the motion. Default value is 1.0 seconds. Minimum value is 0.05 seconds. batch_size (int): number of values to produce in each iteration. Default is 500. Batch size is reset to the length of one leg of the motion if num_batches is not None. - az_start (str): part of the scan to start at. Options are: - 'az_endpoint1', 'az_endpoint2', 'mid_inc' (start in the middle of - the scan and start with increasing azimuth), 'mid_dec' (start in - the middle of the scan and start with decreasing azimuth). + az_start (str): part of the scan to start at. To start at one + of the extremes, use 'az_endpoint1', 'az_endpoint2', or + 'end' (same as 'az_endpoint1'). To start in the midpoint + of the scan use 'mid_inc' (for first half-leg to have + positive az velocity), 'mid_dec' (negative az velocity), + or 'mid' (velocity oriented towards endpoint2). + az_first_pos (float): If not None, the first az scan will + start at this position (but otherwise proceed in the same + starting direction). ptstack_fmt (bool): determine whether values are produced with the necessary format to upload to the ACU. If False, this function will produce lists of time, azimuth, elevation, azimuth velocity, elevation velocity, azimuth flags, and elevation flags. Default is True. 
+
     """
     az_min = min(az_endpoint1, az_endpoint2)
     az_max = max(az_endpoint1, az_endpoint2)
     if az_max == az_min:
         raise ValueError('Generator requires two different az endpoints!')
-    if az_start in ['az_endpoint1', 'az_endpoint2']:
-        if az_start == 'az_endpoint1':
+    if az_start in ['az_endpoint1', 'az_endpoint2', 'end']:
+        if az_start in ['az_endpoint1', 'end']:
             az = az_endpoint1
         else:
             az = az_endpoint2
-        if az == az_min:
-            increasing = True
-            az_vel = az_speed
-        elif az == az_max:
-            increasing = False
-            az_vel = -1 * az_speed
-    elif az_start in ['mid_inc', 'mid_dec']:
+        increasing = (az == az_min)
+    elif az_start in ['mid_inc', 'mid_dec', 'mid']:
         az = (az_endpoint1 + az_endpoint2) / 2
-        if az_start == 'mid_inc':
+        if az_start == 'mid':
+            increasing = az_endpoint2 > az_endpoint1
+        elif az_start == 'mid_inc':
             increasing = True
-            az_vel = az_speed
         else:
             increasing = False
-            az_vel = -1 * az_speed
     else:
         raise ValueError('az_start value not supported. Choose from '
                          'az_endpoint1, az_endpoint2, mid_inc, mid_dec')
+    az_vel = az_speed if increasing else -az_speed
+
+    # Bias the starting point for the first leg?
+    if az_first_pos is not None:
+        az = az_first_pos
+
     if start_time is None:
         t0 = time.time() + wait_to_start
     else:
@@ -262,8 +307,17 @@
         else:
             stop_iter = num_batches
             batch_size = int(np.ceil((az_max - az_min) / daz))
+
+    def dec_num_scans():
+        nonlocal num_scans
+        if num_scans is not None:
+            num_scans -= 1
+
+    def check_num_scans():
+        return num_scans is None or num_scans > 0
+
     i = 0
-    while i < stop_iter:
+    while i < stop_iter and check_num_scans():
         i += 1
         point_block = [[], [], [], [], [], [], []]
         for j in range(batch_size):
@@ -275,6 +329,7 @@
             point_block[5].append(az_flag)
             point_block[6].append(el_flag)
             t += step_time
+
             if increasing:
                 if az <= (az_max - 2 * daz):
                     az += daz
@@ -290,6 +345,7 @@
                     az_flag = 1
                     el_flag = 0
                     increasing = False
+                    dec_num_scans()
                 else:
                     az_remaining = az_max - az
                     time_remaining = az_remaining / az_speed
@@ -315,6 +371,7 @@
                     az_flag = 1
                     el_flag = 0
                     increasing = True
+                    dec_num_scans()
                 else:
                     az_remaining = az - az_min
                     time_remaining = az_remaining / az_speed
@@ -325,18 +382,121 @@
                     az_flag = 2
                     el_flag = 0
                     increasing = False
+
+        if not check_num_scans():
+            # Kill the velocity on the last point and exit -- this
+            # was recommended at LAT FAT for smoothly stopping the
+            # motion at end of program.
+            point_block[3][-1] = 0
+            point_block[4][-1] = 0
+            break
+
         if ptstack_fmt:
             yield ptstack_format(point_block[0], point_block[1],
                                  point_block[2], point_block[3],
                                  point_block[4], point_block[5],
-                                 point_block[6], generator=True)
+                                 point_block[6],
+                                 start_offset=3, absolute=True)
         else:
             yield (point_block[0], point_block[1], point_block[2],
                    point_block[3], point_block[4], point_block[5],
                    point_block[6])


-# if __name__ == "__main__":
-#     print(time.time())
-#     times, azs, els, vas, ves, azf, elf = linear_turnaround_scanpoints((120., 130.), 55., 1., 4, 2000)
-#     print(time.time())
+def plan_scan(az_end1, az_end2, el, v_az=1, a_az=1, az_start=None):
+    """Determine some important parameters for running a ProgramTrack
+    scan with the desired end points, velocity, and mean turn-around
+    acceleration.
+
+    These get complicated in the limit of high velocity and narrow
+    scans.
+
+    Returns:
+
+      A dict with outputs of the calculations.  The following items
+      must be considered when generating and posting the track points:
+
+      - 'step_time': The recommended track point separation, in
+        seconds.
+      - 'wait_to_start': The minimum time (s) between initiating
+        ProgramTrack mode and the first uploaded point's timestamp.
+      - 'init_az': The az (deg) at which to position the telescope
+        before beginning the scan.  This takes into account any "ramp
+        up" that needs to occur and the fact that such ramp up needs
+        to be finished before the ACU starts profiling the first
+        turn-around.
+
+      The following dict items provide additional detail /
+      intermediate results:
+
+      - 'scan_start_buffer': Minimum amount (deg of az) by which to
+        shift the start of the first scan leg in order to satisfy the
+        requirements for az_prep and az_rampup.  This ultimately is
+        what can make init_az different from the natural first leg
+        starting point.  This parameter is always non-negative.
+      - 'turnprep_buffer': Minimum azimuth travel required for
+        ProgramTrack to prepare a turn-around.
+      - 'rampup_buffer': Minimum azimuth travel required for
+        ProgramTrack to ramp up to the first leg velocity.  Degrees,
+        positive.
+      - 'rampup_time': Number of seconds before the first track point
+        where the platform could start moving (as part of smooth
+        acceleration into the initial velocity).
+
+    """
+    # Convert Agent-friendly arguments to az/throw/init
+    az = (az_end1 + az_end2) / 2
+    throw = (az_end2 - az_end1) / 2
+
+    if az_start in [None, 'mid']:
+        init = 'mid'
+    elif az_start == 'mid_inc':
+        init = 'mid'
+        throw = abs(throw)
+    elif az_start == 'mid_dec':
+        init = 'mid'
+        throw = -abs(throw)
+    elif az_start in ['az_endpoint1', 'end']:
+        init = 'end'
+    elif az_start in ['az_endpoint2']:
+        init = 'end'
+        throw = -throw
+    else:
+        raise ValueError(f'Unexpected az_start={az_start}')
+
+    # Info to pass back.
+    plan = {}
+
+    # Point time separation: at least 5 points per leg, preferably 10.
+    dt = 2 * abs(throw / v_az) / 10
+    dt = min(max(dt, 0.1), 1.0)
+    assert (2 * abs(throw / v_az) / dt >= 5)
+    plan['step_time'] = dt
+
+    # Turn-around prep distance (deg): 5 point periods, times the vel.
+    turnprep_buffer = 5 * dt * v_az
+
+    # Ramp-up distance needed
+    a0 = 1.  # Peak accel of ramp-up...
+    rampup_buffer = v_az**2 / a0
+    plan['turnprep_buffer'] = turnprep_buffer
+    plan['rampup_buffer'] = rampup_buffer
+
+    # Any az ramp-up prep required?
+    if init == 'mid':
+        scan_start_buffer = max(turnprep_buffer + rampup_buffer - abs(throw), 0)
+    elif init == 'end':
+        scan_start_buffer = max(turnprep_buffer + rampup_buffer - 2 * abs(throw), 0)
+    plan['scan_start_buffer'] = scan_start_buffer
+
+    # Set wait time (this comes out a little lower than it's supposed to...)
+    # plan['wait_time'] = v_az / a0 * 2
+    plan['rampup_time'] = v_az / a0
+    plan['wait_to_start'] = max(5, plan['rampup_time'] * 1.2)
+
+    # Fill out some other useful info...
+    plan['init_az'] = az - math.copysign(scan_start_buffer, throw)
+    if init == 'end':
+        plan['init_az'] -= throw
+
+    return plan
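For reference, a minimal usage sketch (not part of the patch; the endpoint, speed, and elevation values are invented) of how plan_scan's outputs are intended to feed the point generator, mirroring what generate_scan does above: the recommended step_time and wait_to_start fill in unspecified scan parameters, and init_az seeds both the pre-scan seek and the generator's az_first_pos.

    import socs.agents.acu.drivers as sh

    # Hypothetical scan: 120-130 deg az legs at 1 deg/s, 1 deg/s^2 turnarounds, el=55.
    plan = sh.plan_scan(120., 130., el=55., v_az=1., a_az=1., az_start='end')

    g = sh.generate_constant_velocity_scan(
        az_endpoint1=120., az_endpoint2=130., az_speed=1., acc=1.,
        el_endpoint1=55., el_endpoint2=55.,
        step_time=plan['step_time'],          # recommended point spacing
        wait_to_start=plan['wait_to_start'],  # allowance for ACU ramp-up
        az_start='end',
        az_first_pos=plan['init_az'],         # biased start, leaving room for ramp-up
    )
    first_batch = next(g)  # ProgramTrack-formatted lines, ready for UploadPtStack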