From de42587c1e0b197d8fe3434bc8f5eebfadc5f2c4 Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 21 Nov 2023 11:41:02 -0500 Subject: [PATCH 1/3] Added checks for GPS source --- gamutrf/mqtt_reporter.py | 83 +++++++------- gamutrf/worker.py | 236 ++++++++++++++++++++------------------- 2 files changed, 168 insertions(+), 151 deletions(-) diff --git a/gamutrf/mqtt_reporter.py b/gamutrf/mqtt_reporter.py index 1e21e7aa..c2262342 100644 --- a/gamutrf/mqtt_reporter.py +++ b/gamutrf/mqtt_reporter.py @@ -26,7 +26,7 @@ def __init__( self.compass = compass self.gps_server = gps_server self.mqttc = None - self.heading = "no heading" + self.heading = 'no heading' self.use_external_gps = use_external_gps self.use_external_heading = use_external_heading self.external_gps_server = external_gps_server @@ -37,16 +37,16 @@ def __init__( def log(path, prefix, start_time, record_args): try: with open( - os.path.join(path, f"mqtt-{prefix}-{start_time}.log"), - "a+", - encoding="utf-8", + os.path.join(path, f'mqtt-{prefix}-{start_time}.log'), + 'a+', + encoding='utf-8', ) as f: - f.write(f"{json.dumps(record_args)}\n") + f.write(f'{json.dumps(record_args)}\n') except FileNotFoundError as err: - logging.error(f"could not write to mqtt rssi log: {err}") + logging.error(f'could not write to mqtt rssi log: {err}') def connect(self): - logging.info(f"connecting to {self.mqtt_server}") + logging.info(f'connecting to {self.mqtt_server}') self.mqttc = mqtt.Client() self.mqttc.connect(self.mqtt_server) self.mqttc.loop_start() @@ -57,31 +57,37 @@ def get_heading(self): self.heading = float( json.loads( httpx.get( - f"http://{self.external_gps_server}:{self.external_gps_server_port}/heading" + f'http://{self.external_gps_server}:{self.external_gps_server_port}/heading' ).text - )["heading"] + )['heading'] ) except Exception as err: - logging.error("could not update external heading: %s", err) + logging.error('could not update external heading: %s', err) else: try: self.heading = str( - float(httpx.get(f"http://{self.gps_server}:8000/v1/heading").text) + float( + httpx.get(f'http://{self.gps_server}:8000/v1/heading').text) ) except Exception as err: - logging.error("could not update heading: %s", err) + logging.error('could not update heading: %s', err) def add_gps(self, publish_args): - if not self.gps_server or not self.use_external_gps: + if not self.gps_server and not self.external_gps_server: + logging.error('no gps_server or external_gps_server found') + return publish_args + if self.external_gps_server and not self.use_external_gps and not self.gps_server: + logging.error( + 'only external_gps_server found, but no use_external_gps flag') return publish_args publish_args.update( { - "position": [0, 0], - "altitude": None, - "gps_time": None, - "map_url": None, - "heading": self.heading, - "gps": "no fix", + 'position': [0, 0], + 'altitude': None, + 'gps_time': None, + 'map_url': None, + 'heading': self.heading, + 'gps': 'no fix', } ) @@ -90,26 +96,26 @@ def add_gps(self, publish_args): try: self.external_gps_msg = json.loads( httpx.get( - f"http://{self.external_gps_server}:{self.external_gps_server_port}/gps-data" + f'http://{self.external_gps_server}:{self.external_gps_server_port}/gps-data' ).text ) publish_args.update( { - "position": ( - self.external_gps_msg["latitude"], - self.external_gps_msg["longitude"], + 'position': ( + self.external_gps_msg['latitude'], + self.external_gps_msg['longitude'], ), - "altitude": self.external_gps_msg["altitude"], - "gps_time": self.external_gps_msg["time_usec"], - "map_url": None, - "heading": self.heading, - "gps": "fix", + 'altitude': self.external_gps_msg['altitude'], + 'gps_time': self.external_gps_msg['time_usec'], + 'map_url': None, + 'heading': self.heading, + 'gps': 'fix', } ) except Exception as err: - logging.error("could not update with external GPS: %s", err) + logging.error('could not update with external GPS: %s', err) # Use internal GPIO GPS else: @@ -121,16 +127,16 @@ def add_gps(self, publish_args): packet = gpsd.get_current() publish_args.update( { - "position": packet.position(), - "altitude": packet.altitude(), - "gps_time": packet.get_time().timestamp(), - "map_url": packet.map_url(), - "heading": self.heading, - "gps": "fix", + 'position': packet.position(), + 'altitude': packet.altitude(), + 'gps_time': packet.get_time().timestamp(), + 'map_url': packet.map_url(), + 'heading': self.heading, + 'gps': 'fix', } ) except (BrokenPipeError, gpsd.NoFixError, AttributeError) as err: - logging.error("could not update with GPS: %s", err) + logging.error('could not update with GPS: %s', err) return publish_args def publish(self, publish_path, publish_args): @@ -140,7 +146,7 @@ def publish(self, publish_path, publish_args): if self.mqttc is None: self.connect() publish_args = self.add_gps(publish_args) - publish_args["name"] = self.name + publish_args['name'] = self.name self.mqttc.publish(publish_path, json.dumps(publish_args)) except ( socket.gaierror, @@ -148,4 +154,5 @@ def publish(self, publish_path, publish_args): mqtt.WebsocketConnectionError, ValueError, ) as err: - logging.error(f"failed to publish to MQTT {self.mqtt_server}: {err}") + logging.error( + f'failed to publish to MQTT {self.mqtt_server}: {err}') diff --git a/gamutrf/worker.py b/gamutrf/worker.py index b53e9b49..8237105b 100644 --- a/gamutrf/worker.py +++ b/gamutrf/worker.py @@ -23,148 +23,156 @@ from gamutrf.sdr_recorder import get_recorder from gamutrf.sdr_recorder import RECORDER_MAP -WORKER_NAME = os.getenv("WORKER_NAME", socket.gethostbyname(socket.gethostname())) -ORCHESTRATOR = os.getenv("ORCHESTRATOR", "orchestrator") -ANTENNA = os.getenv("ANTENNA", "") +WORKER_NAME = os.getenv( + 'WORKER_NAME', socket.gethostbyname(socket.gethostname())) +ORCHESTRATOR = os.getenv('ORCHESTRATOR', 'orchestrator') +ANTENNA = os.getenv('ANTENNA', '') def argument_parser(): parser = argparse.ArgumentParser() parser.add_argument( - "--loglevel", - "-l", - help="Set logging level", - choices=["critical", "error", "warning", "info", "debug"], - default="info", + '--loglevel', + '-l', + help='Set logging level', + choices=['critical', 'error', 'warning', 'info', 'debug'], + default='info', ) parser.add_argument( - "--antenna", "-a", help="Antenna make/model", type=str, default=ANTENNA + '--antenna', '-a', help='Antenna make/model', type=str, default=ANTENNA ) parser.add_argument( - "--name", "-n", help="Name for the worker", type=str, default=WORKER_NAME + '--name', '-n', help='Name for the worker', type=str, default=WORKER_NAME ) parser.add_argument( - "--path", - "-P", - help="Path prefix for writing out samples to", + '--path', + '-P', + help='Path prefix for writing out samples to', type=str, - default="/data/gamutrf", + default='/data/gamutrf', ) parser.add_argument( - "--port", "-p", help="Port to run the API webserver on", type=int, default=8000 + '--port', '-p', help='Port to run the API webserver on', type=int, default=8000 ) parser.add_argument( - "--rotate_secs", - help="If > 0, rotate storage directories every N seconds", + '--rotate_secs', + help='If > 0, rotate storage directories every N seconds', type=int, default=3600, ) parser.add_argument( - "--sdr", - "-s", - help=f"Specify SDR to record with {list(RECORDER_MAP.keys())} or file", + '--sdr', + '-s', + help=f'Specify SDR to record with {list(RECORDER_MAP.keys())} or file', type=str, - default="ettus", + default='ettus', ) parser.add_argument( - "--sdrargs", - help=f"optional SDR arguments", + '--sdrargs', + help=f'optional SDR arguments', type=str, - default="", + default='', ) parser.add_argument( - "--freq_excluded", - "-e", + '--freq_excluded', + '-e', help='Freq range to exclude in MHz (e.g. "100-200")', - action="append", + action='append', default=[], ) - parser.add_argument("--gain", "-g", help="Gain in dB", default=30, type=int) + parser.add_argument('--gain', '-g', help='Gain in dB', + default=30, type=int) parser.add_argument( - "--mean_window", "-m", help="birdseye mean window size", default=128, type=int + '--mean_window', '-m', help='birdseye mean window size', default=128, type=int ) parser.add_argument( - "--rxb", help="Receive buffer size", default=int(1024 * 1024 * 10), type=int + '--rxb', help='Receive buffer size', default=int(1024 * 1024 * 10), type=int ) parser.add_argument( - "--qsize", help="Max request queue size", default=int(2), type=int + '--qsize', help='Max request queue size', default=int(2), type=int ) parser.add_argument( - "--mqtt_server", - help="MQTT server to report RSSI", + '--mqtt_server', + help='MQTT server to report RSSI', default=ORCHESTRATOR, type=str, ) parser.add_argument( - "--rssi_interval", - help="rate limit in seconds for RSSI updates to MQTT", + '--gps_server', + help='GPS Server to get lat,long, and heading', + default=ORCHESTRATOR, + type=str, + ) + parser.add_argument( + '--rssi_interval', + help='rate limit in seconds for RSSI updates to MQTT', default=1.0, type=float, ) parser.add_argument( - "--rssi_throttle", - help="rate limit RSSI calculations to 1 in n", + '--rssi_throttle', + help='rate limit RSSI calculations to 1 in n', default=10, type=int, ) parser.add_argument( - "--rssi_threshold", help="RSSI reporting threshold", default=-45, type=float + '--rssi_threshold', help='RSSI reporting threshold', default=-45, type=float ) external_rssi_parser = parser.add_mutually_exclusive_group(required=False) external_rssi_parser.add_argument( - "--rssi_external", - dest="rssi_external", - action="store_true", + '--rssi_external', + dest='rssi_external', + action='store_true', default=True, - help="proxy external RSSI", + help='proxy external RSSI', ) external_rssi_parser.add_argument( - "--no-rssi_external", - dest="rssi_external", - action="store_false", - help="do not use proxy external RSSI", + '--no-rssi_external', + dest='rssi_external', + action='store_false', + help='do not use proxy external RSSI', ) agc_parser = parser.add_mutually_exclusive_group(required=False) agc_parser.add_argument( - "--agc", dest="agc", action="store_true", default=True, help="use AGC" + '--agc', dest='agc', action='store_true', default=True, help='use AGC' ) agc_parser.add_argument( - "--no-agc", dest="agc", action="store_false", help="do not use AGC" + '--no-agc', dest='agc', action='store_false', help='do not use AGC' ) parser.add_argument( - "--sigmf", - dest="sigmf", + '--sigmf', + dest='sigmf', default=True, action=argparse.BooleanOptionalAction, - help="add sigmf meta file", + help='add sigmf meta file', ) parser.add_argument( - "--use_external_gps", - dest="use_external_gps", + '--use_external_gps', + dest='use_external_gps', default=False, action=argparse.BooleanOptionalAction, - help="Use external Pixhawk/MAVLINK GPS", + help='Use external Pixhawk/MAVLINK GPS', ) parser.add_argument( - "--use_external_heading", - dest="use_external_heading", + '--use_external_heading', + dest='use_external_heading', default=False, action=argparse.BooleanOptionalAction, - help="Use external (Pixhawk/MAVLINK) heading", + help='Use external (Pixhawk/MAVLINK) heading', ) parser.add_argument( - "--external_gps_server", - dest="external_gps_server", + '--external_gps_server', + dest='external_gps_server', default=ORCHESTRATOR, type=str, - help="server to query for external GPS data", + help='server to query for external GPS data', ) parser.add_argument( - "--external_gps_server_port", - dest="external_gps_server_port", - default="8888", + '--external_gps_server_port', + dest='external_gps_server_port', + default='8888', type=str, - help="server port to query for external GPS data", + help='server port to query for external GPS data', ) return parser @@ -189,10 +197,10 @@ def __init__(self, arguments): def on_get(self, _req, resp): resp.text = json.dumps( { - "version": __version__, - "sdr": self.arguments.sdr, - "path_prefix": self.arguments.path, - "freq_excluded": self.arguments.freq_excluded, + 'version': __version__, + 'sdr': self.arguments.sdr, + 'path_prefix': self.arguments.path, + 'freq_excluded': self.arguments.freq_excluded, } ) resp.content_type = falcon.MEDIA_TEXT @@ -213,7 +221,7 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate): status = None if self.q.full(): - status = "Request queue is full" + status = 'Request queue is full' else: status = self.sdr_recorder.validate_request( self.arguments.freq_excluded, center_freq, sample_count, sample_rate @@ -222,28 +230,28 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate): if status is None: self.q.put( { - "action": self.action, - "center_freq": int(center_freq), - "sample_count": int(sample_count), - "sample_rate": int(sample_rate), + 'action': self.action, + 'center_freq': int(center_freq), + 'sample_count': int(sample_count), + 'sample_rate': int(sample_rate), } ) - status = "Requsted recording" + status = 'Requsted recording' resp.status = falcon.HTTP_200 - resp.text = json.dumps({"status": status}) + resp.text = json.dumps({'status': status}) class Record(Action): def __init__(self, arguments, q, sdr_recorder): super().__init__(arguments, q, sdr_recorder) - self.action = "record" + self.action = 'record' class Rssi(Action): def __init__(self, arguments, q, sdr_recorder): super().__init__(arguments, q, sdr_recorder) - self.action = "record" + self.action = 'record' class API: @@ -252,7 +260,7 @@ def __init__(self, arguments): self.mqtt_reporter = MQTTReporter( name=self.arguments.name, mqtt_server=self.arguments.mqtt_server, - gps_server=ORCHESTRATOR, + gps_server=self.arguments.gps_server, compass=True, use_external_gps=self.arguments.use_external_gps, use_external_heading=self.arguments.use_external_heading, @@ -271,17 +279,17 @@ def __init__(self, arguments): self.app.add_route(self.version() + route, handler) def run_recorder(self, record_func): - logging.info("run recorder") + logging.info('run recorder') while True: - logging.info("awaiting request") + logging.info('awaiting request') action_args = self.q.get() - action = action_args["action"] - if action == "record": + action = action_args['action'] + if action == 'record': self.serve_recording(record_func, action_args) - elif action == "rssi": + elif action == 'rssi': self.serve_rssi(action_args) else: - logging.error("no such action: %s", action) + logging.error('no such action: %s', action) def record(self, center_freq, sample_count, sample_rate=20e6): return self.sdr_recorder.run_recording( @@ -298,37 +306,38 @@ def record(self, center_freq, sample_count, sample_rate=20e6): ) def serve_recording(self, record_func, record_args): - logging.info(f"got a request: {record_args}") + logging.info(f'got a request: {record_args}') record_status = record_func(**record_args) if record_status == -1: # TODO this only kills the thread, not the main process return record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish("gamutrf/record", record_args) + self.mqtt_reporter.publish('gamutrf/record', record_args) self.mqtt_reporter.log( - self.arguments.path, "record", self.start_time, record_args + self.arguments.path, 'record', self.start_time, record_args ) def report_rssi(self, record_args, reported_rssi, reported_time): - logging.info(f'reporting RSSI {reported_rssi} for {record_args["center_freq"]}') - record_args.update({"rssi": reported_rssi, "time": reported_time}) + logging.info( + f'reporting RSSI {reported_rssi} for {record_args["center_freq"]}') + record_args.update({'rssi': reported_rssi, 'time': reported_time}) record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish("gamutrf/rssi", record_args) + self.mqtt_reporter.publish('gamutrf/rssi', record_args) self.mqtt_reporter.log( - self.arguments.path, "rssi", self.start_time, record_args + self.arguments.path, 'rssi', self.start_time, record_args ) def process_rssi(self, record_args, sock): last_rssi_time = 0 duration = 0 - if record_args["sample_count"]: - duration = float(record_args["sample_count"]) / float( - record_args["sample_rate"] + if record_args['sample_count']: + duration = float(record_args['sample_count']) / float( + record_args['sample_rate'] ) start_time = time.time() while self.q.empty(): rssi_raw, _ = sock.recvfrom(FLOAT_SIZE) - rssi = struct.unpack("f", rssi_raw)[0] + rssi = struct.unpack('f', rssi_raw)[0] if rssi < self.arguments.rssi_threshold: continue if rssi > MAX_RSSI: @@ -349,45 +358,45 @@ def proxy_rssi(self, rssi_addr, record_args): self.process_rssi(record_args, sock) def serve_rssi(self, record_args): - logging.info(f"got request {record_args}") + logging.info(f'got request {record_args}') if self.arguments.rssi_external: - logging.info("proxying external RSSI") + logging.info('proxying external RSSI') # codeql[py/bind-socket-all-network-interfaces] - self.proxy_rssi("0.0.0.0", record_args) # nosec + self.proxy_rssi('0.0.0.0', record_args) # nosec else: - center_freq = int(record_args["center_freq"]) + center_freq = int(record_args['center_freq']) try: rssi_server = BirdsEyeRSSI( self.arguments, - record_args["sample_rate"], + record_args['sample_rate'], center_freq, agc=self.arguments.agc, rssi_throttle=self.arguments.rssi_throttle, ) except RuntimeError as err: - logging.error("could not initialize RSSI server: %s", err) + logging.error('could not initialize RSSI server: %s', err) return rssi_server.start() logging.info( - f"serving RSSI for {center_freq}Hz over threshold {self.arguments.rssi_threshold} with AGC {self.arguments.agc}" + f'serving RSSI for {center_freq}Hz over threshold {self.arguments.rssi_threshold} with AGC {self.arguments.agc}' ) self.proxy_rssi(RSSI_UDP_ADDR, record_args) - logging.info("RSSI stream stopped") + logging.info('RSSI stream stopped') rssi_server.stop() rssi_server.wait() @staticmethod def paths(): return [ - "", - "/info", - "/record/{center_freq}/{sample_count}/{sample_rate}", - "/rssi/{center_freq}/{sample_count}/{sample_rate}", + '', + '/info', + '/record/{center_freq}/{sample_count}/{sample_rate}', + '/rssi/{center_freq}/{sample_count}/{sample_rate}', ] @staticmethod def version(): - return "/v1" + return '/v1' def routes(self): p = self.paths() @@ -399,22 +408,23 @@ def routes(self): return dict(zip(p, funcs)) def run(self): - logging.info("starting recorder thread") + logging.info('starting recorder thread') recorder_thread = threading.Thread( target=self.run_recorder, args=(self.record,) ) recorder_thread.start() - logging.info("starting API thread") - bjoern.run(self.app, "0.0.0.0", self.arguments.port) # nosec + logging.info('starting API thread') + bjoern.run(self.app, '0.0.0.0', self.arguments.port) # nosec recorder_thread.join() def main(): arguments = argument_parser().parse_args() - level_int = {"CRITICAL": 50, "ERROR": 40, "WARNING": 30, "INFO": 20, "DEBUG": 10} + level_int = {'CRITICAL': 50, 'ERROR': 40, + 'WARNING': 30, 'INFO': 20, 'DEBUG': 10} level = level_int.get(arguments.loglevel.upper(), 0) - logging.basicConfig(level=level, format="%(asctime)s %(message)s") + logging.basicConfig(level=level, format='%(asctime)s %(message)s') try: app = API(arguments) except ValueError: From 529cb874a342e7dce454b9fa2a5956054feb54bd Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 21 Nov 2023 12:05:48 -0500 Subject: [PATCH 2/3] formatting --- gamutrf/mqtt_reporter.py | 87 ++++++++------- gamutrf/worker.py | 232 +++++++++++++++++++-------------------- 2 files changed, 159 insertions(+), 160 deletions(-) diff --git a/gamutrf/mqtt_reporter.py b/gamutrf/mqtt_reporter.py index c2262342..c364ba06 100644 --- a/gamutrf/mqtt_reporter.py +++ b/gamutrf/mqtt_reporter.py @@ -26,7 +26,7 @@ def __init__( self.compass = compass self.gps_server = gps_server self.mqttc = None - self.heading = 'no heading' + self.heading = "no heading" self.use_external_gps = use_external_gps self.use_external_heading = use_external_heading self.external_gps_server = external_gps_server @@ -37,16 +37,16 @@ def __init__( def log(path, prefix, start_time, record_args): try: with open( - os.path.join(path, f'mqtt-{prefix}-{start_time}.log'), - 'a+', - encoding='utf-8', + os.path.join(path, f"mqtt-{prefix}-{start_time}.log"), + "a+", + encoding="utf-8", ) as f: - f.write(f'{json.dumps(record_args)}\n') + f.write(f"{json.dumps(record_args)}\n") except FileNotFoundError as err: - logging.error(f'could not write to mqtt rssi log: {err}') + logging.error(f"could not write to mqtt rssi log: {err}") def connect(self): - logging.info(f'connecting to {self.mqtt_server}') + logging.info(f"connecting to {self.mqtt_server}") self.mqttc = mqtt.Client() self.mqttc.connect(self.mqtt_server) self.mqttc.loop_start() @@ -57,37 +57,41 @@ def get_heading(self): self.heading = float( json.loads( httpx.get( - f'http://{self.external_gps_server}:{self.external_gps_server_port}/heading' + f"http://{self.external_gps_server}:{self.external_gps_server_port}/heading" ).text - )['heading'] + )["heading"] ) except Exception as err: - logging.error('could not update external heading: %s', err) + logging.error("could not update external heading: %s", err) else: try: self.heading = str( - float( - httpx.get(f'http://{self.gps_server}:8000/v1/heading').text) + float(httpx.get(f"http://{self.gps_server}:8000/v1/heading").text) ) except Exception as err: - logging.error('could not update heading: %s', err) + logging.error("could not update heading: %s", err) def add_gps(self, publish_args): if not self.gps_server and not self.external_gps_server: - logging.error('no gps_server or external_gps_server found') + logging.error("no gps_server or external_gps_server found") return publish_args - if self.external_gps_server and not self.use_external_gps and not self.gps_server: + if ( + self.external_gps_server + and not self.use_external_gps + and not self.gps_server + ): logging.error( - 'only external_gps_server found, but no use_external_gps flag') + "only external_gps_server found, but no use_external_gps flag" + ) return publish_args publish_args.update( { - 'position': [0, 0], - 'altitude': None, - 'gps_time': None, - 'map_url': None, - 'heading': self.heading, - 'gps': 'no fix', + "position": [0, 0], + "altitude": None, + "gps_time": None, + "map_url": None, + "heading": self.heading, + "gps": "no fix", } ) @@ -96,26 +100,26 @@ def add_gps(self, publish_args): try: self.external_gps_msg = json.loads( httpx.get( - f'http://{self.external_gps_server}:{self.external_gps_server_port}/gps-data' + f"http://{self.external_gps_server}:{self.external_gps_server_port}/gps-data" ).text ) publish_args.update( { - 'position': ( - self.external_gps_msg['latitude'], - self.external_gps_msg['longitude'], + "position": ( + self.external_gps_msg["latitude"], + self.external_gps_msg["longitude"], ), - 'altitude': self.external_gps_msg['altitude'], - 'gps_time': self.external_gps_msg['time_usec'], - 'map_url': None, - 'heading': self.heading, - 'gps': 'fix', + "altitude": self.external_gps_msg["altitude"], + "gps_time": self.external_gps_msg["time_usec"], + "map_url": None, + "heading": self.heading, + "gps": "fix", } ) except Exception as err: - logging.error('could not update with external GPS: %s', err) + logging.error("could not update with external GPS: %s", err) # Use internal GPIO GPS else: @@ -127,16 +131,16 @@ def add_gps(self, publish_args): packet = gpsd.get_current() publish_args.update( { - 'position': packet.position(), - 'altitude': packet.altitude(), - 'gps_time': packet.get_time().timestamp(), - 'map_url': packet.map_url(), - 'heading': self.heading, - 'gps': 'fix', + "position": packet.position(), + "altitude": packet.altitude(), + "gps_time": packet.get_time().timestamp(), + "map_url": packet.map_url(), + "heading": self.heading, + "gps": "fix", } ) except (BrokenPipeError, gpsd.NoFixError, AttributeError) as err: - logging.error('could not update with GPS: %s', err) + logging.error("could not update with GPS: %s", err) return publish_args def publish(self, publish_path, publish_args): @@ -146,7 +150,7 @@ def publish(self, publish_path, publish_args): if self.mqttc is None: self.connect() publish_args = self.add_gps(publish_args) - publish_args['name'] = self.name + publish_args["name"] = self.name self.mqttc.publish(publish_path, json.dumps(publish_args)) except ( socket.gaierror, @@ -154,5 +158,4 @@ def publish(self, publish_path, publish_args): mqtt.WebsocketConnectionError, ValueError, ) as err: - logging.error( - f'failed to publish to MQTT {self.mqtt_server}: {err}') + logging.error(f"failed to publish to MQTT {self.mqtt_server}: {err}") diff --git a/gamutrf/worker.py b/gamutrf/worker.py index 8237105b..8082a4f1 100644 --- a/gamutrf/worker.py +++ b/gamutrf/worker.py @@ -23,156 +23,154 @@ from gamutrf.sdr_recorder import get_recorder from gamutrf.sdr_recorder import RECORDER_MAP -WORKER_NAME = os.getenv( - 'WORKER_NAME', socket.gethostbyname(socket.gethostname())) -ORCHESTRATOR = os.getenv('ORCHESTRATOR', 'orchestrator') -ANTENNA = os.getenv('ANTENNA', '') +WORKER_NAME = os.getenv("WORKER_NAME", socket.gethostbyname(socket.gethostname())) +ORCHESTRATOR = os.getenv("ORCHESTRATOR", "orchestrator") +ANTENNA = os.getenv("ANTENNA", "") def argument_parser(): parser = argparse.ArgumentParser() parser.add_argument( - '--loglevel', - '-l', - help='Set logging level', - choices=['critical', 'error', 'warning', 'info', 'debug'], - default='info', + "--loglevel", + "-l", + help="Set logging level", + choices=["critical", "error", "warning", "info", "debug"], + default="info", ) parser.add_argument( - '--antenna', '-a', help='Antenna make/model', type=str, default=ANTENNA + "--antenna", "-a", help="Antenna make/model", type=str, default=ANTENNA ) parser.add_argument( - '--name', '-n', help='Name for the worker', type=str, default=WORKER_NAME + "--name", "-n", help="Name for the worker", type=str, default=WORKER_NAME ) parser.add_argument( - '--path', - '-P', - help='Path prefix for writing out samples to', + "--path", + "-P", + help="Path prefix for writing out samples to", type=str, - default='/data/gamutrf', + default="/data/gamutrf", ) parser.add_argument( - '--port', '-p', help='Port to run the API webserver on', type=int, default=8000 + "--port", "-p", help="Port to run the API webserver on", type=int, default=8000 ) parser.add_argument( - '--rotate_secs', - help='If > 0, rotate storage directories every N seconds', + "--rotate_secs", + help="If > 0, rotate storage directories every N seconds", type=int, default=3600, ) parser.add_argument( - '--sdr', - '-s', - help=f'Specify SDR to record with {list(RECORDER_MAP.keys())} or file', + "--sdr", + "-s", + help=f"Specify SDR to record with {list(RECORDER_MAP.keys())} or file", type=str, - default='ettus', + default="ettus", ) parser.add_argument( - '--sdrargs', - help=f'optional SDR arguments', + "--sdrargs", + help=f"optional SDR arguments", type=str, - default='', + default="", ) parser.add_argument( - '--freq_excluded', - '-e', + "--freq_excluded", + "-e", help='Freq range to exclude in MHz (e.g. "100-200")', - action='append', + action="append", default=[], ) - parser.add_argument('--gain', '-g', help='Gain in dB', - default=30, type=int) + parser.add_argument("--gain", "-g", help="Gain in dB", default=30, type=int) parser.add_argument( - '--mean_window', '-m', help='birdseye mean window size', default=128, type=int + "--mean_window", "-m", help="birdseye mean window size", default=128, type=int ) parser.add_argument( - '--rxb', help='Receive buffer size', default=int(1024 * 1024 * 10), type=int + "--rxb", help="Receive buffer size", default=int(1024 * 1024 * 10), type=int ) parser.add_argument( - '--qsize', help='Max request queue size', default=int(2), type=int + "--qsize", help="Max request queue size", default=int(2), type=int ) parser.add_argument( - '--mqtt_server', - help='MQTT server to report RSSI', + "--mqtt_server", + help="MQTT server to report RSSI", default=ORCHESTRATOR, type=str, ) parser.add_argument( - '--gps_server', - help='GPS Server to get lat,long, and heading', + "--gps_server", + help="GPS Server to get lat,long, and heading", default=ORCHESTRATOR, type=str, ) parser.add_argument( - '--rssi_interval', - help='rate limit in seconds for RSSI updates to MQTT', + "--rssi_interval", + help="rate limit in seconds for RSSI updates to MQTT", default=1.0, type=float, ) parser.add_argument( - '--rssi_throttle', - help='rate limit RSSI calculations to 1 in n', + "--rssi_throttle", + help="rate limit RSSI calculations to 1 in n", default=10, type=int, ) parser.add_argument( - '--rssi_threshold', help='RSSI reporting threshold', default=-45, type=float + "--rssi_threshold", help="RSSI reporting threshold", default=-45, type=float ) external_rssi_parser = parser.add_mutually_exclusive_group(required=False) external_rssi_parser.add_argument( - '--rssi_external', - dest='rssi_external', - action='store_true', + "--rssi_external", + dest="rssi_external", + action="store_true", default=True, - help='proxy external RSSI', + help="proxy external RSSI", ) external_rssi_parser.add_argument( - '--no-rssi_external', - dest='rssi_external', - action='store_false', - help='do not use proxy external RSSI', + "--no-rssi_external", + dest="rssi_external", + action="store_false", + help="do not use proxy external RSSI", ) agc_parser = parser.add_mutually_exclusive_group(required=False) agc_parser.add_argument( - '--agc', dest='agc', action='store_true', default=True, help='use AGC' + "--agc", dest="agc", action="store_true", default=True, help="use AGC" ) agc_parser.add_argument( - '--no-agc', dest='agc', action='store_false', help='do not use AGC' + "--no-agc", dest="agc", action="store_false", help="do not use AGC" ) parser.add_argument( - '--sigmf', - dest='sigmf', + "--sigmf", + dest="sigmf", default=True, action=argparse.BooleanOptionalAction, - help='add sigmf meta file', + help="add sigmf meta file", ) parser.add_argument( - '--use_external_gps', - dest='use_external_gps', + "--use_external_gps", + dest="use_external_gps", default=False, action=argparse.BooleanOptionalAction, - help='Use external Pixhawk/MAVLINK GPS', + help="Use external Pixhawk/MAVLINK GPS", ) parser.add_argument( - '--use_external_heading', - dest='use_external_heading', + "--use_external_heading", + dest="use_external_heading", default=False, action=argparse.BooleanOptionalAction, - help='Use external (Pixhawk/MAVLINK) heading', + help="Use external (Pixhawk/MAVLINK) heading", ) parser.add_argument( - '--external_gps_server', - dest='external_gps_server', + "--external_gps_server", + dest="external_gps_server", default=ORCHESTRATOR, type=str, - help='server to query for external GPS data', + help="server to query for external GPS data", ) parser.add_argument( - '--external_gps_server_port', - dest='external_gps_server_port', - default='8888', + "--external_gps_server_port", + dest="external_gps_server_port", + default="8888", type=str, - help='server port to query for external GPS data', + help="server port to query for external GPS data", ) return parser @@ -197,10 +195,10 @@ def __init__(self, arguments): def on_get(self, _req, resp): resp.text = json.dumps( { - 'version': __version__, - 'sdr': self.arguments.sdr, - 'path_prefix': self.arguments.path, - 'freq_excluded': self.arguments.freq_excluded, + "version": __version__, + "sdr": self.arguments.sdr, + "path_prefix": self.arguments.path, + "freq_excluded": self.arguments.freq_excluded, } ) resp.content_type = falcon.MEDIA_TEXT @@ -221,7 +219,7 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate): status = None if self.q.full(): - status = 'Request queue is full' + status = "Request queue is full" else: status = self.sdr_recorder.validate_request( self.arguments.freq_excluded, center_freq, sample_count, sample_rate @@ -230,28 +228,28 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate): if status is None: self.q.put( { - 'action': self.action, - 'center_freq': int(center_freq), - 'sample_count': int(sample_count), - 'sample_rate': int(sample_rate), + "action": self.action, + "center_freq": int(center_freq), + "sample_count": int(sample_count), + "sample_rate": int(sample_rate), } ) - status = 'Requsted recording' + status = "Requsted recording" resp.status = falcon.HTTP_200 - resp.text = json.dumps({'status': status}) + resp.text = json.dumps({"status": status}) class Record(Action): def __init__(self, arguments, q, sdr_recorder): super().__init__(arguments, q, sdr_recorder) - self.action = 'record' + self.action = "record" class Rssi(Action): def __init__(self, arguments, q, sdr_recorder): super().__init__(arguments, q, sdr_recorder) - self.action = 'record' + self.action = "record" class API: @@ -279,17 +277,17 @@ def __init__(self, arguments): self.app.add_route(self.version() + route, handler) def run_recorder(self, record_func): - logging.info('run recorder') + logging.info("run recorder") while True: - logging.info('awaiting request') + logging.info("awaiting request") action_args = self.q.get() - action = action_args['action'] - if action == 'record': + action = action_args["action"] + if action == "record": self.serve_recording(record_func, action_args) - elif action == 'rssi': + elif action == "rssi": self.serve_rssi(action_args) else: - logging.error('no such action: %s', action) + logging.error("no such action: %s", action) def record(self, center_freq, sample_count, sample_rate=20e6): return self.sdr_recorder.run_recording( @@ -306,38 +304,37 @@ def record(self, center_freq, sample_count, sample_rate=20e6): ) def serve_recording(self, record_func, record_args): - logging.info(f'got a request: {record_args}') + logging.info(f"got a request: {record_args}") record_status = record_func(**record_args) if record_status == -1: # TODO this only kills the thread, not the main process return record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish('gamutrf/record', record_args) + self.mqtt_reporter.publish("gamutrf/record", record_args) self.mqtt_reporter.log( - self.arguments.path, 'record', self.start_time, record_args + self.arguments.path, "record", self.start_time, record_args ) def report_rssi(self, record_args, reported_rssi, reported_time): - logging.info( - f'reporting RSSI {reported_rssi} for {record_args["center_freq"]}') - record_args.update({'rssi': reported_rssi, 'time': reported_time}) + logging.info(f'reporting RSSI {reported_rssi} for {record_args["center_freq"]}') + record_args.update({"rssi": reported_rssi, "time": reported_time}) record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish('gamutrf/rssi', record_args) + self.mqtt_reporter.publish("gamutrf/rssi", record_args) self.mqtt_reporter.log( - self.arguments.path, 'rssi', self.start_time, record_args + self.arguments.path, "rssi", self.start_time, record_args ) def process_rssi(self, record_args, sock): last_rssi_time = 0 duration = 0 - if record_args['sample_count']: - duration = float(record_args['sample_count']) / float( - record_args['sample_rate'] + if record_args["sample_count"]: + duration = float(record_args["sample_count"]) / float( + record_args["sample_rate"] ) start_time = time.time() while self.q.empty(): rssi_raw, _ = sock.recvfrom(FLOAT_SIZE) - rssi = struct.unpack('f', rssi_raw)[0] + rssi = struct.unpack("f", rssi_raw)[0] if rssi < self.arguments.rssi_threshold: continue if rssi > MAX_RSSI: @@ -358,45 +355,45 @@ def proxy_rssi(self, rssi_addr, record_args): self.process_rssi(record_args, sock) def serve_rssi(self, record_args): - logging.info(f'got request {record_args}') + logging.info(f"got request {record_args}") if self.arguments.rssi_external: - logging.info('proxying external RSSI') + logging.info("proxying external RSSI") # codeql[py/bind-socket-all-network-interfaces] - self.proxy_rssi('0.0.0.0', record_args) # nosec + self.proxy_rssi("0.0.0.0", record_args) # nosec else: - center_freq = int(record_args['center_freq']) + center_freq = int(record_args["center_freq"]) try: rssi_server = BirdsEyeRSSI( self.arguments, - record_args['sample_rate'], + record_args["sample_rate"], center_freq, agc=self.arguments.agc, rssi_throttle=self.arguments.rssi_throttle, ) except RuntimeError as err: - logging.error('could not initialize RSSI server: %s', err) + logging.error("could not initialize RSSI server: %s", err) return rssi_server.start() logging.info( - f'serving RSSI for {center_freq}Hz over threshold {self.arguments.rssi_threshold} with AGC {self.arguments.agc}' + f"serving RSSI for {center_freq}Hz over threshold {self.arguments.rssi_threshold} with AGC {self.arguments.agc}" ) self.proxy_rssi(RSSI_UDP_ADDR, record_args) - logging.info('RSSI stream stopped') + logging.info("RSSI stream stopped") rssi_server.stop() rssi_server.wait() @staticmethod def paths(): return [ - '', - '/info', - '/record/{center_freq}/{sample_count}/{sample_rate}', - '/rssi/{center_freq}/{sample_count}/{sample_rate}', + "", + "/info", + "/record/{center_freq}/{sample_count}/{sample_rate}", + "/rssi/{center_freq}/{sample_count}/{sample_rate}", ] @staticmethod def version(): - return '/v1' + return "/v1" def routes(self): p = self.paths() @@ -408,23 +405,22 @@ def routes(self): return dict(zip(p, funcs)) def run(self): - logging.info('starting recorder thread') + logging.info("starting recorder thread") recorder_thread = threading.Thread( target=self.run_recorder, args=(self.record,) ) recorder_thread.start() - logging.info('starting API thread') - bjoern.run(self.app, '0.0.0.0', self.arguments.port) # nosec + logging.info("starting API thread") + bjoern.run(self.app, "0.0.0.0", self.arguments.port) # nosec recorder_thread.join() def main(): arguments = argument_parser().parse_args() - level_int = {'CRITICAL': 50, 'ERROR': 40, - 'WARNING': 30, 'INFO': 20, 'DEBUG': 10} + level_int = {"CRITICAL": 50, "ERROR": 40, "WARNING": 30, "INFO": 20, "DEBUG": 10} level = level_int.get(arguments.loglevel.upper(), 0) - logging.basicConfig(level=level, format='%(asctime)s %(message)s') + logging.basicConfig(level=level, format="%(asctime)s %(message)s") try: app = API(arguments) except ValueError: From 9c47a1db648f6aa9db7f56451b19c18b764a2c9f Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 21 Nov 2023 12:56:39 -0500 Subject: [PATCH 3/3] Updated api tests --- tests/test_api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_api.py b/tests/test_api.py index 04c8385b..a24a79ce 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -9,6 +9,7 @@ class FakeArgs: def __init__(self): self.name = "test" self.mqtt_server = "" + self.gps_server = "" self.use_external_gps = False self.use_external_heading = False self.external_gps_server = ""