diff --git a/socs/agents/acu/agent.py b/socs/agents/acu/agent.py index 5ed66a231..78806b3ce 100644 --- a/socs/agents/acu/agent.py +++ b/socs/agents/acu/agent.py @@ -11,7 +11,7 @@ import yaml from autobahn.twisted.util import sleep as dsleep from ocs import ocs_agent, site_config -from ocs.ocs_twisted import TimeoutLock +from ocs.ocs_twisted import Pacemaker, TimeoutLock from soaculib.twisted_backend import TwistedHttpBackend from twisted.internet import protocol, reactor, threads from twisted.internet.defer import DeferredList, inlineCallbacks @@ -176,6 +176,14 @@ def __init__(self, agent, acu_config='guess', exercise_plan=None, tclient._HTTP11ClientFactory.noisy = False + # Structure for the broadcast process to communicate state to + # the monitor process, for a data quality feed. + self._broadcast_qual = { + 'timestamp': time.time(), + 'active': False, + 'time_offset': 0, + } + self.acu_control = aculib.AcuControl( acu_config, backend=TwistedHttpBackend(persistent=False)) self.acu_read = aculib.AcuControl( @@ -243,6 +251,14 @@ def __init__(self, agent, acu_config='guess', exercise_plan=None, record=True, agg_params=basic_agg_params, buffer_time=1) + self.agent.register_feed('sun', + record=True, + agg_params=basic_agg_params, + buffer_time=0) + self.agent.register_feed('data_qual', + record=True, + agg_params=basic_agg_params, + buffer_time=0) agent.register_task('go_to', self.go_to, blocking=False, @@ -493,6 +509,8 @@ def monitor(self, session, params): 'Status3rdAxis': j2, 'StatusResponseRate': n_ok / (query_t - report_t)}) + qual_pacer = Pacemaker(.1) + was_remote = False last_resp_rate = None data_blocks = {} @@ -515,6 +533,25 @@ def monitor(self, session, params): n_ok = 0 session.data.update({'StatusResponseRate': resp_rate}) + if qual_pacer.next_sample <= time.time(): + # Publish UDP data health feed + qual_pacer.sleep() # should be instantaneous, just update counters + bq = self._broadcast_qual + bq_offset = bq['time_offset'] + if bq_offset is None: + bq_offset = 0. + bq_ok = (bq['active'] and (now - bq['timestamp'] < 5) + and abs(bq_offset) < 1.) + block = { + 'timestamp': time.time(), + 'block_name': 'qual0', + 'data': { + 'Broadcast_stream_ok': int(bq_ok), + 'Broadcast_recv_offset': bq_offset, + } + } + self.agent.publish_to_feed('data_qual', block) + try: j = yield self.acu_read.http.Values(self.acu8100) if self.acu3rdaxis: @@ -736,6 +773,10 @@ def broadcast(self, session, params): FMT = self.udp_schema['format'] FMT_LEN = struct.calcsize(FMT) UDP_PORT = self.udp['port'] + + # The udp_data list is used as a queue; it contains + # struct-unpacked samples from the UDP stream in the form + # (time_received, data). udp_data = [] fields = self.udp_schema['fields'] session.data = {} @@ -745,11 +786,12 @@ def broadcast(self, session, params): class MonitorUDP(protocol.DatagramProtocol): def datagramReceived(self, data, src_addr): + now = time.time() host, port = src_addr offset = 0 while len(data) - offset >= FMT_LEN: d = struct.unpack(FMT, data[offset:offset + FMT_LEN]) - udp_data.append(d) + udp_data.append((now, d)) offset += FMT_LEN handler = reactor.listenUDP(int(UDP_PORT), MonitorUDP()) @@ -758,21 +800,28 @@ def datagramReceived(self, data, src_addr): for i in range(2, len(fields)): influx_data[fields[i].replace(' ', '_') + '_bcast_influx'] = [] + best_dt = None + active = True last_packet_time = time.time() while session.status in ['running']: now = time.time() + if len(udp_data) >= 200: if not active: self.log.info('UDP packets are being received.') active = True last_packet_time = now + best_dt = None process_data = udp_data[:200] udp_data = udp_data[200:] - for d in process_data: + for recv_time, d in process_data: data_ctime = sh.timecode(d[0] + d[1] / sh.DAY) + if best_dt is None or abs(recv_time - data_ctime) < best_dt: + best_dt = recv_time - data_ctime + self.data['broadcast']['Time'] = data_ctime influx_data['Time_bcast_influx'].append(data_ctime) for i in range(2, len(d)): @@ -782,8 +831,7 @@ def datagramReceived(self, data, src_addr): 'block_name': 'ACU_broadcast', 'data': self.data['broadcast'] } - self.agent.publish_to_feed('acu_udp_stream', - acu_udp_stream, from_reactor=True) + self.agent.publish_to_feed('acu_udp_stream', acu_udp_stream) influx_means = {} for key in influx_data.keys(): influx_means[key] = np.mean(influx_data[key]) @@ -792,7 +840,7 @@ def datagramReceived(self, data, src_addr): 'block_name': 'ACU_bcast_influx', 'data': influx_means, } - self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx, from_reactor=True) + self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx) sd = {} for ky in influx_means: sd[ky.split('_bcast_influx')[0]] = influx_means[ky] @@ -810,9 +858,13 @@ def datagramReceived(self, data, src_addr): except Exception as err: self.log.info('Exception while trying to enable stream: {err}', err=err) next_reconfig += 60 - yield dsleep(1) - yield dsleep(0.005) + self._broadcast_qual = { + 'timestamp': now, + 'active': active, + 'time_offset': best_dt, + } + yield dsleep(.01) handler.stopListening() return True, 'Acquisition exited cleanly.' @@ -1984,6 +2036,28 @@ def _notify_recomputed(result): self.sun = new_sun req_out = False + def lookup(keys, tree): + if isinstance(keys, str): + keys = [keys] + if len(keys) == 0: + if isinstance(tree, (bool, np.bool_)): + return int(tree) + return tree + return lookup(keys[1:], tree[keys[0]]) + + # Feed -- unpack some elements of session.data + feed_keys = { + 'sun_avoidance': ('active_avoidance', int), + 'sun_az': (('sun_pos', 'sun_azel', 0), float), + 'sun_el': (('sun_pos', 'sun_azel', 1), float), + 'sun_dist': (('sun_pos', 'sun_dist'), float), + 'sun_safe_time': (('sun_pos', 'sun_safe_time'), float), + } + for k in ['warning_zone', 'danger_zone', + 'escape_triggered', 'escape_active']: + feed_keys[f'sun_{k}'] = (('avoidance', k), int) + feed_pacer = Pacemaker(.1) + req_out = False self.sun = None last_panic = 0 @@ -2078,6 +2152,16 @@ def _notify_recomputed(result): # Update session. session.data.update(new_data) + # Publish -- only if we have the sun pos though.. + if sun_is_real and safety_known and feed_pacer.next_sample <= time.time(): + feed_pacer.sleep() # should be instantaneous, just update counters + block = {'timestamp': time.time(), + 'block_name': 'sun0', + 'data': {}} + for kshort, (keys, cast) in feed_keys.items(): + block['data'][kshort] = cast(lookup(keys, new_data)) + self.agent.publish_to_feed('sun', block) + yield dsleep(1) @ocs_agent.param('reset', type=bool, default=None)