# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform"
#
# Copyright © 2023 United States Government as represented by the Administrator of
# the National Aeronautics and Space Administration. No copyright is claimed in the
# United States under Title 17, U.S. Code. All Other Rights Reserved.
#
# Licensed under the NASA Open Source Agreement version 1.3
# See "NOSA GSC-19165-1 OnAIR.pdf"
"""
SBN_Adapter class
Receives messages from SBN, serves as a data source for sim.py
"""
import threading
import time
import datetime
import os
import json
from onair.data_handling.on_air_data_source import OnAirDataSource
from onair.data_handling.on_air_data_source import ConfigKeyError
from ctypes import *
import sbn_python_client as sbn
import message_headers as msg_hdr
from onair.data_handling.parser_util import *
# Note: The double buffer does not clear between switching. If fresh data doesn't come in, stale data is returned (delayed by 1 frame)
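# (The listener thread fills the buffer the reader is not using; get_next() flips
# double_buffer_read_index so each call exposes the most recently written buffer.)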


class DataSource(OnAirDataSource):
    def __init__(self, data_file, meta_file, ss_breakdown=False):
        super().__init__(data_file, meta_file, ss_breakdown)
        self.new_data_lock = threading.Lock()
        self.new_data = False
        self.double_buffer_read_index = 0
        self.connect()

    def connect(self):
        """Establish connection to SBN and launch listener thread."""
        time.sleep(2)
        os.chdir("cf")
        sbn.sbn_load_and_init()
        os.chdir("../")
        print("SBN_Adapter Running")

        # Launch thread to listen for messages
        self.listener_thread = threading.Thread(target=self.message_listener_thread)
        self.listener_thread.start()

        # subscribe to message IDs
        for msgID in self.msgID_lookup_table.keys():
            sbn.subscribe(msgID)

    def gather_field_names(self, field_name, field_type):
        # recursively find field names in DFS manner
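        # Illustrative example (hypothetical types, not from this repo): if
        # battery_t were defined in message_headers with _fields_
        # [("cell_a", c_float), ("cell_b", c_float)], then
        # gather_field_names("power.battery", battery_t) would return
        # ["power.battery.cell_a", "power.battery.cell_b"], while a plain ctypes
        # type such as c_float yields just the dotted name passed in.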
        def gather_field_names_helper(field_name: str, field_type, field_names: list):
            if "message_headers" in str(field_type) and hasattr(field_type, "_fields_"):
                for sub_field_name, sub_field_type in field_type._fields_:
                    gather_field_names_helper(
                        field_name + "." + sub_field_name, sub_field_type, field_names
                    )
            else:
                field_names.append(field_name)

        field_names = []
        gather_field_names_helper(field_name, field_type, field_names)
        return field_names

    def parse_meta_data_file(self, meta_data_file, ss_breakdown):
        self.msgID_lookup_table = {}
        self.currentData = []

        # pull out message ids
        with open(meta_data_file, "rb") as file:
            meta_config = json.loads(file.read())

        if "channels" not in meta_config.keys():
            raise ConfigKeyError(
                f"Config file: '{meta_data_file}' missing required key 'channels'"
            )
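
        # Illustrative (hypothetical) meta file snippet matching the parsing
        # below; real message IDs, app names, and struct names depend on the
        # deployed cFS apps:
        #   {
        #     "channels": {
        #       "0x0885": ["sample_app", "sample_app_tlm_t"]
        #     }
        #   }
        # Each key is a hex StreamId string; each value is [app_name, struct_name],
        # where struct_name must name a ctypes structure in message_headers.py.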

        # Copy the message ID table from the .json, converting the hex string keys to ints
        for key in meta_config["channels"]:
            self.msgID_lookup_table[int(key, 16)] = meta_config["channels"][key]

        # Use eval() to resolve each struct name from the .json to the
        # corresponding class in message_headers.py
        for key in self.msgID_lookup_table:
            msg_struct_name = self.msgID_lookup_table[key][1]
            self.msgID_lookup_table[key][1] = eval("msg_hdr." + msg_struct_name)

        # populate headers and reserve space for data
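        # The result is a two-element double buffer; each element is a dict
        # {"headers": [...], "data": [...]} whose "data" list is index-aligned
        # with "headers" (header names are dotted, e.g. "app.field.subfield")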
        for x in range(0, 2):
            self.currentData.append({"headers": [], "data": []})

            for msgID in self.msgID_lookup_table.keys():
                app_name, data_struct = self.msgID_lookup_table[msgID]
                struct_name = data_struct.__name__

                # Skip the header, walk through the struct
                for field_name, field_type in data_struct._fields_[1:]:
                    field_names = self.gather_field_names(
                        app_name + "." + field_name, field_type
                    )

                    for field_name in field_names:
                        self.currentData[x]["headers"].append(field_name)
                        # initialize all the data arrays with zero
                        self.currentData[x]["data"].append([0])

        return extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown)

    def process_data_file(self, data_file):
        print("SBN Adapter ignoring data file (telemetry should be live)")

    def get_vehicle_metadata(self):
        return self.all_headers, self.binning_configs["test_assignments"]

    def get_next(self):
        """Return the latest data from SBN as a list of values, index-aligned
        with the adapter's headers. The returned data is safe to use until the
        next get_next call. Blocks until new data is available."""
        data_available = False

        while not data_available:
            with self.new_data_lock:
                data_available = self.has_data()

            if not data_available:
                time.sleep(0.01)

        read_index = 0
        with self.new_data_lock:
            self.new_data = False
            self.double_buffer_read_index = (self.double_buffer_read_index + 1) % 2
            read_index = self.double_buffer_read_index

        return self.currentData[read_index]["data"]
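
    # Rough usage sketch (hedged; the actual consumer is sim.py, not shown here):
    #   adapter = DataSource(data_file, meta_file)
    #   while adapter.has_more():
    #       frame = adapter.get_next()  # blocks until the listener flags new data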

    def has_more(self):
        """Returns true if the adapter has more data.
        For now always true: the connection should be live as long as cFS is running.
        TODO: add a way to detect whether cFS/the connection has died"""
        return True

    def message_listener_thread(self):
        """Thread to listen for incoming messages from SBN"""
        while True:
            generic_recv_msg_p = POINTER(sbn.sbn_data_generic_t)()
            sbn.recv_msg(generic_recv_msg_p)

            msgID = generic_recv_msg_p.contents.TlmHeader.Primary.StreamId
            app_name, data_struct = self.msgID_lookup_table[msgID]
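
            # Reinterpret the generic SBN buffer as the concrete ctypes struct
            # registered for this message ID so its fields can be read by name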
            recv_msg_p = POINTER(data_struct)()
            recv_msg_p.contents = generic_recv_msg_p.contents
            recv_msg = recv_msg_p.contents

            # prints out the data from the message to the terminal
            print(
                ", ".join(
                    [
                        field_name + ": " + str(getattr(recv_msg, field_name))
                        for field_name, field_type in recv_msg._fields_[1:]
                    ]
                )
            )

            # TODO: Lock needed here?
            self.get_current_data(recv_msg, data_struct, app_name)

    def get_current_data(self, recv_msg, data_struct, app_name):
        # TODO: Lock needed here?
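        # Write into the buffer that readers are NOT currently using; get_next()
        # flips double_buffer_read_index to expose it on the next call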
        current_buffer = self.currentData[(self.double_buffer_read_index + 1) % 2]

        # Skip the header, walk through the struct
        for field_name, field_type in recv_msg._fields_[1:]:
            field_names = self.gather_field_names(field_name, field_type)

            for name in field_names:
                idx = current_buffer["headers"].index(app_name + "." + name)

                # Pull the data out of the message by walking down the nested types
                data = ""
                current_object = recv_msg
                for sub_type in name.split("."):
                    current_object = getattr(current_object, sub_type)
                data = str(current_object)  # NOTE: may not work for array fields?
                current_buffer["data"][idx] = data

        with self.new_data_lock:
            self.new_data = True

    def has_data(self):
        return self.new_data