From 529cb874a342e7dce454b9fa2a5956054feb54bd Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 21 Nov 2023 12:05:48 -0500 Subject: [PATCH] formatting --- gamutrf/mqtt_reporter.py | 87 ++++++++------- gamutrf/worker.py | 232 +++++++++++++++++++-------------------- 2 files changed, 159 insertions(+), 160 deletions(-) diff --git a/gamutrf/mqtt_reporter.py b/gamutrf/mqtt_reporter.py index c2262342..c364ba06 100644 --- a/gamutrf/mqtt_reporter.py +++ b/gamutrf/mqtt_reporter.py @@ -26,7 +26,7 @@ def __init__( self.compass = compass self.gps_server = gps_server self.mqttc = None - self.heading = 'no heading' + self.heading = "no heading" self.use_external_gps = use_external_gps self.use_external_heading = use_external_heading self.external_gps_server = external_gps_server @@ -37,16 +37,16 @@ def __init__( def log(path, prefix, start_time, record_args): try: with open( - os.path.join(path, f'mqtt-{prefix}-{start_time}.log'), - 'a+', - encoding='utf-8', + os.path.join(path, f"mqtt-{prefix}-{start_time}.log"), + "a+", + encoding="utf-8", ) as f: - f.write(f'{json.dumps(record_args)}\n') + f.write(f"{json.dumps(record_args)}\n") except FileNotFoundError as err: - logging.error(f'could not write to mqtt rssi log: {err}') + logging.error(f"could not write to mqtt rssi log: {err}") def connect(self): - logging.info(f'connecting to {self.mqtt_server}') + logging.info(f"connecting to {self.mqtt_server}") self.mqttc = mqtt.Client() self.mqttc.connect(self.mqtt_server) self.mqttc.loop_start() @@ -57,37 +57,41 @@ def get_heading(self): self.heading = float( json.loads( httpx.get( - f'http://{self.external_gps_server}:{self.external_gps_server_port}/heading' + f"http://{self.external_gps_server}:{self.external_gps_server_port}/heading" ).text - )['heading'] + )["heading"] ) except Exception as err: - logging.error('could not update external heading: %s', err) + logging.error("could not update external heading: %s", err) else: try: self.heading = str( - float( - httpx.get(f'http://{self.gps_server}:8000/v1/heading').text) + float(httpx.get(f"http://{self.gps_server}:8000/v1/heading").text) ) except Exception as err: - logging.error('could not update heading: %s', err) + logging.error("could not update heading: %s", err) def add_gps(self, publish_args): if not self.gps_server and not self.external_gps_server: - logging.error('no gps_server or external_gps_server found') + logging.error("no gps_server or external_gps_server found") return publish_args - if self.external_gps_server and not self.use_external_gps and not self.gps_server: + if ( + self.external_gps_server + and not self.use_external_gps + and not self.gps_server + ): logging.error( - 'only external_gps_server found, but no use_external_gps flag') + "only external_gps_server found, but no use_external_gps flag" + ) return publish_args publish_args.update( { - 'position': [0, 0], - 'altitude': None, - 'gps_time': None, - 'map_url': None, - 'heading': self.heading, - 'gps': 'no fix', + "position": [0, 0], + "altitude": None, + "gps_time": None, + "map_url": None, + "heading": self.heading, + "gps": "no fix", } ) @@ -96,26 +100,26 @@ def add_gps(self, publish_args): try: self.external_gps_msg = json.loads( httpx.get( - f'http://{self.external_gps_server}:{self.external_gps_server_port}/gps-data' + f"http://{self.external_gps_server}:{self.external_gps_server_port}/gps-data" ).text ) publish_args.update( { - 'position': ( - self.external_gps_msg['latitude'], - self.external_gps_msg['longitude'], + "position": ( + self.external_gps_msg["latitude"], + self.external_gps_msg["longitude"], ), - 'altitude': self.external_gps_msg['altitude'], - 'gps_time': self.external_gps_msg['time_usec'], - 'map_url': None, - 'heading': self.heading, - 'gps': 'fix', + "altitude": self.external_gps_msg["altitude"], + "gps_time": self.external_gps_msg["time_usec"], + "map_url": None, + "heading": self.heading, + "gps": "fix", } ) except Exception as err: - logging.error('could not update with external GPS: %s', err) + logging.error("could not update with external GPS: %s", err) # Use internal GPIO GPS else: @@ -127,16 +131,16 @@ def add_gps(self, publish_args): packet = gpsd.get_current() publish_args.update( { - 'position': packet.position(), - 'altitude': packet.altitude(), - 'gps_time': packet.get_time().timestamp(), - 'map_url': packet.map_url(), - 'heading': self.heading, - 'gps': 'fix', + "position": packet.position(), + "altitude": packet.altitude(), + "gps_time": packet.get_time().timestamp(), + "map_url": packet.map_url(), + "heading": self.heading, + "gps": "fix", } ) except (BrokenPipeError, gpsd.NoFixError, AttributeError) as err: - logging.error('could not update with GPS: %s', err) + logging.error("could not update with GPS: %s", err) return publish_args def publish(self, publish_path, publish_args): @@ -146,7 +150,7 @@ def publish(self, publish_path, publish_args): if self.mqttc is None: self.connect() publish_args = self.add_gps(publish_args) - publish_args['name'] = self.name + publish_args["name"] = self.name self.mqttc.publish(publish_path, json.dumps(publish_args)) except ( socket.gaierror, @@ -154,5 +158,4 @@ def publish(self, publish_path, publish_args): mqtt.WebsocketConnectionError, ValueError, ) as err: - logging.error( - f'failed to publish to MQTT {self.mqtt_server}: {err}') + logging.error(f"failed to publish to MQTT {self.mqtt_server}: {err}") diff --git a/gamutrf/worker.py b/gamutrf/worker.py index 8237105b..8082a4f1 100644 --- a/gamutrf/worker.py +++ b/gamutrf/worker.py @@ -23,156 +23,154 @@ from gamutrf.sdr_recorder import get_recorder from gamutrf.sdr_recorder import RECORDER_MAP -WORKER_NAME = os.getenv( - 'WORKER_NAME', socket.gethostbyname(socket.gethostname())) -ORCHESTRATOR = os.getenv('ORCHESTRATOR', 'orchestrator') -ANTENNA = os.getenv('ANTENNA', '') +WORKER_NAME = os.getenv("WORKER_NAME", socket.gethostbyname(socket.gethostname())) +ORCHESTRATOR = os.getenv("ORCHESTRATOR", "orchestrator") +ANTENNA = os.getenv("ANTENNA", "") def argument_parser(): parser = argparse.ArgumentParser() parser.add_argument( - '--loglevel', - '-l', - help='Set logging level', - choices=['critical', 'error', 'warning', 'info', 'debug'], - default='info', + "--loglevel", + "-l", + help="Set logging level", + choices=["critical", "error", "warning", "info", "debug"], + default="info", ) parser.add_argument( - '--antenna', '-a', help='Antenna make/model', type=str, default=ANTENNA + "--antenna", "-a", help="Antenna make/model", type=str, default=ANTENNA ) parser.add_argument( - '--name', '-n', help='Name for the worker', type=str, default=WORKER_NAME + "--name", "-n", help="Name for the worker", type=str, default=WORKER_NAME ) parser.add_argument( - '--path', - '-P', - help='Path prefix for writing out samples to', + "--path", + "-P", + help="Path prefix for writing out samples to", type=str, - default='/data/gamutrf', + default="/data/gamutrf", ) parser.add_argument( - '--port', '-p', help='Port to run the API webserver on', type=int, default=8000 + "--port", "-p", help="Port to run the API webserver on", type=int, default=8000 ) parser.add_argument( - '--rotate_secs', - help='If > 0, rotate storage directories every N seconds', + "--rotate_secs", + help="If > 0, rotate storage directories every N seconds", type=int, default=3600, ) parser.add_argument( - '--sdr', - '-s', - help=f'Specify SDR to record with {list(RECORDER_MAP.keys())} or file', + "--sdr", + "-s", + help=f"Specify SDR to record with {list(RECORDER_MAP.keys())} or file", type=str, - default='ettus', + default="ettus", ) parser.add_argument( - '--sdrargs', - help=f'optional SDR arguments', + "--sdrargs", + help=f"optional SDR arguments", type=str, - default='', + default="", ) parser.add_argument( - '--freq_excluded', - '-e', + "--freq_excluded", + "-e", help='Freq range to exclude in MHz (e.g. "100-200")', - action='append', + action="append", default=[], ) - parser.add_argument('--gain', '-g', help='Gain in dB', - default=30, type=int) + parser.add_argument("--gain", "-g", help="Gain in dB", default=30, type=int) parser.add_argument( - '--mean_window', '-m', help='birdseye mean window size', default=128, type=int + "--mean_window", "-m", help="birdseye mean window size", default=128, type=int ) parser.add_argument( - '--rxb', help='Receive buffer size', default=int(1024 * 1024 * 10), type=int + "--rxb", help="Receive buffer size", default=int(1024 * 1024 * 10), type=int ) parser.add_argument( - '--qsize', help='Max request queue size', default=int(2), type=int + "--qsize", help="Max request queue size", default=int(2), type=int ) parser.add_argument( - '--mqtt_server', - help='MQTT server to report RSSI', + "--mqtt_server", + help="MQTT server to report RSSI", default=ORCHESTRATOR, type=str, ) parser.add_argument( - '--gps_server', - help='GPS Server to get lat,long, and heading', + "--gps_server", + help="GPS Server to get lat,long, and heading", default=ORCHESTRATOR, type=str, ) parser.add_argument( - '--rssi_interval', - help='rate limit in seconds for RSSI updates to MQTT', + "--rssi_interval", + help="rate limit in seconds for RSSI updates to MQTT", default=1.0, type=float, ) parser.add_argument( - '--rssi_throttle', - help='rate limit RSSI calculations to 1 in n', + "--rssi_throttle", + help="rate limit RSSI calculations to 1 in n", default=10, type=int, ) parser.add_argument( - '--rssi_threshold', help='RSSI reporting threshold', default=-45, type=float + "--rssi_threshold", help="RSSI reporting threshold", default=-45, type=float ) external_rssi_parser = parser.add_mutually_exclusive_group(required=False) external_rssi_parser.add_argument( - '--rssi_external', - dest='rssi_external', - action='store_true', + "--rssi_external", + dest="rssi_external", + action="store_true", default=True, - help='proxy external RSSI', + help="proxy external RSSI", ) external_rssi_parser.add_argument( - '--no-rssi_external', - dest='rssi_external', - action='store_false', - help='do not use proxy external RSSI', + "--no-rssi_external", + dest="rssi_external", + action="store_false", + help="do not use proxy external RSSI", ) agc_parser = parser.add_mutually_exclusive_group(required=False) agc_parser.add_argument( - '--agc', dest='agc', action='store_true', default=True, help='use AGC' + "--agc", dest="agc", action="store_true", default=True, help="use AGC" ) agc_parser.add_argument( - '--no-agc', dest='agc', action='store_false', help='do not use AGC' + "--no-agc", dest="agc", action="store_false", help="do not use AGC" ) parser.add_argument( - '--sigmf', - dest='sigmf', + "--sigmf", + dest="sigmf", default=True, action=argparse.BooleanOptionalAction, - help='add sigmf meta file', + help="add sigmf meta file", ) parser.add_argument( - '--use_external_gps', - dest='use_external_gps', + "--use_external_gps", + dest="use_external_gps", default=False, action=argparse.BooleanOptionalAction, - help='Use external Pixhawk/MAVLINK GPS', + help="Use external Pixhawk/MAVLINK GPS", ) parser.add_argument( - '--use_external_heading', - dest='use_external_heading', + "--use_external_heading", + dest="use_external_heading", default=False, action=argparse.BooleanOptionalAction, - help='Use external (Pixhawk/MAVLINK) heading', + help="Use external (Pixhawk/MAVLINK) heading", ) parser.add_argument( - '--external_gps_server', - dest='external_gps_server', + "--external_gps_server", + dest="external_gps_server", default=ORCHESTRATOR, type=str, - help='server to query for external GPS data', + help="server to query for external GPS data", ) parser.add_argument( - '--external_gps_server_port', - dest='external_gps_server_port', - default='8888', + "--external_gps_server_port", + dest="external_gps_server_port", + default="8888", type=str, - help='server port to query for external GPS data', + help="server port to query for external GPS data", ) return parser @@ -197,10 +195,10 @@ def __init__(self, arguments): def on_get(self, _req, resp): resp.text = json.dumps( { - 'version': __version__, - 'sdr': self.arguments.sdr, - 'path_prefix': self.arguments.path, - 'freq_excluded': self.arguments.freq_excluded, + "version": __version__, + "sdr": self.arguments.sdr, + "path_prefix": self.arguments.path, + "freq_excluded": self.arguments.freq_excluded, } ) resp.content_type = falcon.MEDIA_TEXT @@ -221,7 +219,7 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate): status = None if self.q.full(): - status = 'Request queue is full' + status = "Request queue is full" else: status = self.sdr_recorder.validate_request( self.arguments.freq_excluded, center_freq, sample_count, sample_rate @@ -230,28 +228,28 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate): if status is None: self.q.put( { - 'action': self.action, - 'center_freq': int(center_freq), - 'sample_count': int(sample_count), - 'sample_rate': int(sample_rate), + "action": self.action, + "center_freq": int(center_freq), + "sample_count": int(sample_count), + "sample_rate": int(sample_rate), } ) - status = 'Requsted recording' + status = "Requsted recording" resp.status = falcon.HTTP_200 - resp.text = json.dumps({'status': status}) + resp.text = json.dumps({"status": status}) class Record(Action): def __init__(self, arguments, q, sdr_recorder): super().__init__(arguments, q, sdr_recorder) - self.action = 'record' + self.action = "record" class Rssi(Action): def __init__(self, arguments, q, sdr_recorder): super().__init__(arguments, q, sdr_recorder) - self.action = 'record' + self.action = "record" class API: @@ -279,17 +277,17 @@ def __init__(self, arguments): self.app.add_route(self.version() + route, handler) def run_recorder(self, record_func): - logging.info('run recorder') + logging.info("run recorder") while True: - logging.info('awaiting request') + logging.info("awaiting request") action_args = self.q.get() - action = action_args['action'] - if action == 'record': + action = action_args["action"] + if action == "record": self.serve_recording(record_func, action_args) - elif action == 'rssi': + elif action == "rssi": self.serve_rssi(action_args) else: - logging.error('no such action: %s', action) + logging.error("no such action: %s", action) def record(self, center_freq, sample_count, sample_rate=20e6): return self.sdr_recorder.run_recording( @@ -306,38 +304,37 @@ def record(self, center_freq, sample_count, sample_rate=20e6): ) def serve_recording(self, record_func, record_args): - logging.info(f'got a request: {record_args}') + logging.info(f"got a request: {record_args}") record_status = record_func(**record_args) if record_status == -1: # TODO this only kills the thread, not the main process return record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish('gamutrf/record', record_args) + self.mqtt_reporter.publish("gamutrf/record", record_args) self.mqtt_reporter.log( - self.arguments.path, 'record', self.start_time, record_args + self.arguments.path, "record", self.start_time, record_args ) def report_rssi(self, record_args, reported_rssi, reported_time): - logging.info( - f'reporting RSSI {reported_rssi} for {record_args["center_freq"]}') - record_args.update({'rssi': reported_rssi, 'time': reported_time}) + logging.info(f'reporting RSSI {reported_rssi} for {record_args["center_freq"]}') + record_args.update({"rssi": reported_rssi, "time": reported_time}) record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish('gamutrf/rssi', record_args) + self.mqtt_reporter.publish("gamutrf/rssi", record_args) self.mqtt_reporter.log( - self.arguments.path, 'rssi', self.start_time, record_args + self.arguments.path, "rssi", self.start_time, record_args ) def process_rssi(self, record_args, sock): last_rssi_time = 0 duration = 0 - if record_args['sample_count']: - duration = float(record_args['sample_count']) / float( - record_args['sample_rate'] + if record_args["sample_count"]: + duration = float(record_args["sample_count"]) / float( + record_args["sample_rate"] ) start_time = time.time() while self.q.empty(): rssi_raw, _ = sock.recvfrom(FLOAT_SIZE) - rssi = struct.unpack('f', rssi_raw)[0] + rssi = struct.unpack("f", rssi_raw)[0] if rssi < self.arguments.rssi_threshold: continue if rssi > MAX_RSSI: @@ -358,45 +355,45 @@ def proxy_rssi(self, rssi_addr, record_args): self.process_rssi(record_args, sock) def serve_rssi(self, record_args): - logging.info(f'got request {record_args}') + logging.info(f"got request {record_args}") if self.arguments.rssi_external: - logging.info('proxying external RSSI') + logging.info("proxying external RSSI") # codeql[py/bind-socket-all-network-interfaces] - self.proxy_rssi('0.0.0.0', record_args) # nosec + self.proxy_rssi("0.0.0.0", record_args) # nosec else: - center_freq = int(record_args['center_freq']) + center_freq = int(record_args["center_freq"]) try: rssi_server = BirdsEyeRSSI( self.arguments, - record_args['sample_rate'], + record_args["sample_rate"], center_freq, agc=self.arguments.agc, rssi_throttle=self.arguments.rssi_throttle, ) except RuntimeError as err: - logging.error('could not initialize RSSI server: %s', err) + logging.error("could not initialize RSSI server: %s", err) return rssi_server.start() logging.info( - f'serving RSSI for {center_freq}Hz over threshold {self.arguments.rssi_threshold} with AGC {self.arguments.agc}' + f"serving RSSI for {center_freq}Hz over threshold {self.arguments.rssi_threshold} with AGC {self.arguments.agc}" ) self.proxy_rssi(RSSI_UDP_ADDR, record_args) - logging.info('RSSI stream stopped') + logging.info("RSSI stream stopped") rssi_server.stop() rssi_server.wait() @staticmethod def paths(): return [ - '', - '/info', - '/record/{center_freq}/{sample_count}/{sample_rate}', - '/rssi/{center_freq}/{sample_count}/{sample_rate}', + "", + "/info", + "/record/{center_freq}/{sample_count}/{sample_rate}", + "/rssi/{center_freq}/{sample_count}/{sample_rate}", ] @staticmethod def version(): - return '/v1' + return "/v1" def routes(self): p = self.paths() @@ -408,23 +405,22 @@ def routes(self): return dict(zip(p, funcs)) def run(self): - logging.info('starting recorder thread') + logging.info("starting recorder thread") recorder_thread = threading.Thread( target=self.run_recorder, args=(self.record,) ) recorder_thread.start() - logging.info('starting API thread') - bjoern.run(self.app, '0.0.0.0', self.arguments.port) # nosec + logging.info("starting API thread") + bjoern.run(self.app, "0.0.0.0", self.arguments.port) # nosec recorder_thread.join() def main(): arguments = argument_parser().parse_args() - level_int = {'CRITICAL': 50, 'ERROR': 40, - 'WARNING': 30, 'INFO': 20, 'DEBUG': 10} + level_int = {"CRITICAL": 50, "ERROR": 40, "WARNING": 30, "INFO": 20, "DEBUG": 10} level = level_int.get(arguments.loglevel.upper(), 0) - logging.basicConfig(level=level, format='%(asctime)s %(message)s') + logging.basicConfig(level=level, format="%(asctime)s %(message)s") try: app = API(arguments) except ValueError: