ACU: feeds for Sun info and UDP stream quality #570

Merged 3 commits on Dec 4, 2023.
100 changes: 92 additions & 8 deletions socs/agents/acu/agent.py
@@ -11,7 +11,7 @@
import yaml
from autobahn.twisted.util import sleep as dsleep
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from soaculib.twisted_backend import TwistedHttpBackend
from twisted.internet import protocol, reactor, threads
from twisted.internet.defer import DeferredList, inlineCallbacks
@@ -176,6 +176,14 @@ def __init__(self, agent, acu_config='guess', exercise_plan=None,

tclient._HTTP11ClientFactory.noisy = False

# Structure for the broadcast process to communicate state to
# the monitor process, for a data quality feed.
self._broadcast_qual = {
'timestamp': time.time(),
'active': False,
'time_offset': 0,
}

self.acu_control = aculib.AcuControl(
acu_config, backend=TwistedHttpBackend(persistent=False))
self.acu_read = aculib.AcuControl(
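
A condensed sketch of the handshake this dict supports: the broadcast process writes a fresh assessment each pass through its loop, and the monitor process reads it to derive a health flag. The helper function names below are hypothetical; the 5 s staleness and 1 s offset thresholds are the ones used in monitor() further down.

import time

def update_broadcast_qual(agent_obj, active, best_dt):
    # Producer side (broadcast task): refresh the shared dict once per pass.
    agent_obj._broadcast_qual = {
        'timestamp': time.time(),   # when this assessment was made
        'active': active,           # True while UDP packets are arriving
        'time_offset': best_dt,     # best (recv_time - data_time) seen, or None
    }

def broadcast_stream_ok(agent_obj, now=None):
    # Consumer side (monitor task): collapse the dict to a single ok / not-ok flag.
    bq = agent_obj._broadcast_qual
    now = time.time() if now is None else now
    offset = bq['time_offset'] if bq['time_offset'] is not None else 0.
    return bq['active'] and (now - bq['timestamp'] < 5) and abs(offset) < 1.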
@@ -243,6 +251,14 @@ def __init__(self, agent, acu_config='guess', exercise_plan=None,
record=True,
agg_params=basic_agg_params,
buffer_time=1)
self.agent.register_feed('sun',
record=True,
agg_params=basic_agg_params,
buffer_time=0)
self.agent.register_feed('data_qual',
record=True,
agg_params=basic_agg_params,
buffer_time=0)
agent.register_task('go_to',
self.go_to,
blocking=False,
@@ -493,6 +509,8 @@ def monitor(self, session, params):
'Status3rdAxis': j2,
'StatusResponseRate': n_ok / (query_t - report_t)})

qual_pacer = Pacemaker(.1)

was_remote = False
last_resp_rate = None
data_blocks = {}
@@ -515,6 +533,25 @@
n_ok = 0
session.data.update({'StatusResponseRate': resp_rate})

if qual_pacer.next_sample <= time.time():
# Publish UDP data health feed
qual_pacer.sleep() # should be instantaneous, just update counters
bq = self._broadcast_qual
bq_offset = bq['time_offset']
if bq_offset is None:
bq_offset = 0.
bq_ok = (bq['active'] and (now - bq['timestamp'] < 5)
and abs(bq_offset) < 1.)
block = {
'timestamp': time.time(),
'block_name': 'qual0',
'data': {
'Broadcast_stream_ok': int(bq_ok),
'Broadcast_recv_offset': bq_offset,
}
}
self.agent.publish_to_feed('data_qual', block)

try:
j = yield self.acu_read.http.Values(self.acu8100)
if self.acu3rdaxis:
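
The pacing pattern above, checking next_sample before calling sleep() so the call never blocks the monitor loop, can be summarized as follows. This is a sketch that assumes ocs.ocs_twisted.Pacemaker exposes next_sample and sleep() as used in the diff; publish_qual is a hypothetical stand-in for the feed publication.

import time
from ocs.ocs_twisted import Pacemaker

pacer = Pacemaker(0.1)  # 0.1 Hz: at most one sample every 10 seconds

def maybe_publish(publish_qual):
    """Publish only when a sample is due; never block the caller."""
    if pacer.next_sample <= time.time():
        # Already past the deadline, so sleep() returns immediately and
        # just advances pacer.next_sample by one sample period.
        pacer.sleep()
        publish_qual()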
@@ -736,6 +773,10 @@ def broadcast(self, session, params):
FMT = self.udp_schema['format']
FMT_LEN = struct.calcsize(FMT)
UDP_PORT = self.udp['port']

# The udp_data list is used as a queue; it contains
# struct-unpacked samples from the UDP stream in the form
# (time_received, data).
udp_data = []
fields = self.udp_schema['fields']
session.data = {}
@@ -745,11 +786,12 @@

class MonitorUDP(protocol.DatagramProtocol):
def datagramReceived(self, data, src_addr):
now = time.time()
host, port = src_addr
offset = 0
while len(data) - offset >= FMT_LEN:
d = struct.unpack(FMT, data[offset:offset + FMT_LEN])
udp_data.append(d)
udp_data.append((now, d))
offset += FMT_LEN

handler = reactor.listenUDP(int(UDP_PORT), MonitorUDP())
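
For illustration, unpacking a datagram that carries several fixed-size records works roughly like this; the '<iddd' format string is a made-up stand-in for udp_schema['format'], which the agent reads from its configuration.

import struct
import time

FMT = '<iddd'                   # hypothetical schema: day, time-of-day, az, el
FMT_LEN = struct.calcsize(FMT)  # bytes per record

def split_datagram(data):
    """Yield (time_received, record) for each complete record in a datagram."""
    now = time.time()
    offset = 0
    while len(data) - offset >= FMT_LEN:
        yield now, struct.unpack(FMT, data[offset:offset + FMT_LEN])
        offset += FMT_LEN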
@@ -758,21 +800,28 @@ def datagramReceived(self, data, src_addr):
for i in range(2, len(fields)):
influx_data[fields[i].replace(' ', '_') + '_bcast_influx'] = []

best_dt = None

active = True
last_packet_time = time.time()

while session.status in ['running']:
now = time.time()

if len(udp_data) >= 200:
if not active:
self.log.info('UDP packets are being received.')
active = True
last_packet_time = now
best_dt = None

process_data = udp_data[:200]
udp_data = udp_data[200:]
for d in process_data:
for recv_time, d in process_data:
data_ctime = sh.timecode(d[0] + d[1] / sh.DAY)
if best_dt is None or abs(recv_time - data_ctime) < best_dt:
best_dt = recv_time - data_ctime

self.data['broadcast']['Time'] = data_ctime
influx_data['Time_bcast_influx'].append(data_ctime)
for i in range(2, len(d)):
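
The intent of the best_dt bookkeeping is to keep, over each batch, the signed receive-time offset with the smallest magnitude, which is later reported as time_offset. Schematically (to_ctime stands in for the sh.timecode conversion from the packet's day and time-of-day fields to a unix timestamp):

def best_offset(samples, to_ctime):
    """Return the signed (recv_time - data_time) with the smallest magnitude.

    samples: iterable of (recv_time, record) pairs, where record[0] is the
    day number and record[1] the time of day, as in the UDP schema.
    """
    best_dt = None
    for recv_time, d in samples:
        data_ctime = to_ctime(d[0], d[1])
        if best_dt is None or abs(recv_time - data_ctime) < abs(best_dt):
            best_dt = recv_time - data_ctime
    return best_dt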
@@ -782,8 +831,7 @@ def datagramReceived(self, data, src_addr):
'block_name': 'ACU_broadcast',
'data': self.data['broadcast']
}
self.agent.publish_to_feed('acu_udp_stream',
acu_udp_stream, from_reactor=True)
self.agent.publish_to_feed('acu_udp_stream', acu_udp_stream)
influx_means = {}
for key in influx_data.keys():
influx_means[key] = np.mean(influx_data[key])
@@ -792,7 +840,7 @@ def datagramReceived(self, data, src_addr):
'block_name': 'ACU_bcast_influx',
'data': influx_means,
}
self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx, from_reactor=True)
self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx)
sd = {}
for ky in influx_means:
sd[ky.split('_bcast_influx')[0]] = influx_means[ky]
@@ -810,9 +858,13 @@ def datagramReceived(self, data, src_addr):
except Exception as err:
self.log.info('Exception while trying to enable stream: {err}', err=err)
next_reconfig += 60
yield dsleep(1)

yield dsleep(0.005)
self._broadcast_qual = {
'timestamp': now,
'active': active,
'time_offset': best_dt,
}
yield dsleep(.01)

handler.stopListening()
return True, 'Acquisition exited cleanly.'
@@ -1984,6 +2036,28 @@ def _notify_recomputed(result):
self.sun = new_sun
req_out = False

def lookup(keys, tree):
if isinstance(keys, str):
keys = [keys]
if len(keys) == 0:
if isinstance(tree, (bool, np.bool_)):
return int(tree)
return tree
return lookup(keys[1:], tree[keys[0]])

# Feed -- unpack some elements of session.data
feed_keys = {
'sun_avoidance': ('active_avoidance', int),
'sun_az': (('sun_pos', 'sun_azel', 0), float),
'sun_el': (('sun_pos', 'sun_azel', 1), float),
'sun_dist': (('sun_pos', 'sun_dist'), float),
'sun_safe_time': (('sun_pos', 'sun_safe_time'), float),
}
for k in ['warning_zone', 'danger_zone',
'escape_triggered', 'escape_active']:
feed_keys[f'sun_{k}'] = (('avoidance', k), int)
feed_pacer = Pacemaker(.1)

req_out = False
self.sun = None
last_panic = 0
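
A hypothetical example of how lookup() and feed_keys combine, assuming the definitions from the hunk above are in scope; the new_data values here are made up for illustration.

new_data = {
    'active_avoidance': True,
    'sun_pos': {
        'sun_azel': (91.2, 34.5),   # (az, el), degrees
        'sun_dist': 41.0,           # degrees
        'sun_safe_time': 12345.0,   # seconds
    },
    'avoidance': {'warning_zone': False, 'danger_zone': False,
                  'escape_triggered': False, 'escape_active': False},
}

# lookup() walks the key path; booleans come back as ints for the feed.
assert lookup(('sun_pos', 'sun_azel', 0), new_data) == 91.2
assert lookup('active_avoidance', new_data) == 1

# Assembling the 'sun0' block data from feed_keys:
data = {kshort: cast(lookup(keys, new_data))
        for kshort, (keys, cast) in feed_keys.items()}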
@@ -2078,6 +2152,16 @@ def _notify_recomputed(result):
# Update session.
session.data.update(new_data)

# Publish -- only if we have the sun pos though..
if sun_is_real and safety_known and feed_pacer.next_sample <= time.time():
feed_pacer.sleep() # should be instantaneous, just update counters
block = {'timestamp': time.time(),
'block_name': 'sun0',
'data': {}}
for kshort, (keys, cast) in feed_keys.items():
block['data'][kshort] = cast(lookup(keys, new_data))
self.agent.publish_to_feed('sun', block)

yield dsleep(1)

@ocs_agent.param('reset', type=bool, default=None)
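
For reference, a published 'sun' block then looks roughly like this; the values are illustrative only.

block = {
    'timestamp': 1701662400.0,     # publication time (made-up value)
    'block_name': 'sun0',
    'data': {
        'sun_avoidance': 1,
        'sun_az': 91.2,            # deg
        'sun_el': 34.5,            # deg
        'sun_dist': 41.0,          # deg
        'sun_safe_time': 12345.0,  # s
        'sun_warning_zone': 0,
        'sun_danger_zone': 0,
        'sun_escape_triggered': 0,
        'sun_escape_active': 0,
    },
}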