diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index a48c6464..620fd71e 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -28,7 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libvulkan-dev \ python3-numpy WORKDIR /root -RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.64 +RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.65 COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs WORKDIR /root/gr-iqtlabs/build COPY --from=sigmf-builder /usr/local /usr/local diff --git a/gamutrfwaterfall/gamutrfwaterfall/waterfall.py b/gamutrfwaterfall/gamutrfwaterfall/waterfall.py index 5ee1b7f8..302ef759 100644 --- a/gamutrfwaterfall/gamutrfwaterfall/waterfall.py +++ b/gamutrfwaterfall/gamutrfwaterfall/waterfall.py @@ -1,7 +1,6 @@ import argparse import csv import datetime -import glob import json import logging import multiprocessing @@ -16,6 +15,7 @@ import matplotlib import matplotlib.pyplot as plt import numpy as np +import zmq from flask import Flask, current_app, send_file from matplotlib.collections import LineCollection from matplotlib.ticker import MultipleLocator, AutoMinorLocator @@ -54,7 +54,7 @@ def draw_title( freq_resolution, ): title_text = { - "Time": str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")), + "Time": str(datetime.datetime.now().isoformat()), "Scan time": "%.2fs" % scan_duration, "Step FFTs": "%u" % tune_step_fft, "Step size": "%.2fMHz" % (tune_step_hz / 1e6), @@ -279,16 +279,22 @@ def argument_parser(): help="Waterfall refresh time", ) parser.add_argument( - "--predictions_path", - default="/logs/inference/**/predictions*png", + "--predictions", + default=3, + type=int, + help="If set, render N recent predictions", + ) + parser.add_argument( + "--inference_server", + default="", type=str, - help="If set, render recent predictions", + help="Address of scanner for inference feed", ) parser.add_argument( - "--predictions", - default=1, + "--inference_port", + default=10002, type=int, - help="If set, render N recent predictions", + help="Port on scanner to connect to inference feed", ) return parser @@ -1056,12 +1062,16 @@ def sig_handler(_sig=None, _frame=None): class FlaskHandler: - def __init__(self, savefig_path, predictions_path, predictions, port, refresh): + def __init__( + self, savefig_path, predictions, port, refresh, inference_server, inference_port + ): + self.inference_addr = f"tcp://{inference_server}:{inference_port}" self.savefig_path = savefig_path + self.static_folder = os.path.dirname(savefig_path) + self.predictions_file = "predictions.html" self.refresh = refresh - self.predictions_path = predictions_path self.predictions = predictions - self.app = Flask(__name__, static_folder=os.path.dirname(savefig_path)) + self.app = Flask(__name__, static_folder=self.static_folder) self.savefig_file = os.path.basename(self.savefig_path) self.app.add_url_rule("/predictions", "predictions", self.serve_predictions) self.app.add_url_rule( @@ -1073,17 +1083,75 @@ def __init__(self, savefig_path, predictions_path, predictions, port, refresh): target=self.app.run, kwargs={"host": "0.0.0.0", "port": port}, # nosec ) + self.zmq_process = multiprocessing.Process(target=self.poll_zmq) def start(self): self.process.start() + self.zmq_process.start() + + def poll_zmq(self): + zmq_context = zmq.Context() + socket = zmq_context.socket(zmq.SUB) + socket.connect(self.inference_addr) + socket.setsockopt_string(zmq.SUBSCRIBE, "") + DELIM = "\n\n" + json_buffer = "" + item_buffer = [] + + def write_content(content): + tmpfile = os.path.join(self.static_folder, "." + self.predictions_file) + with open(tmpfile, "w", encoding="utf8") as f: + f.write( + '%s' + % (self.refresh, content) + ) + os.rename(tmpfile, os.path.join(self.static_folder, self.predictions_file)) + + write_content("no predictions yet") + + while True: + try: + while True: + sock_txt = socket.recv(flags=zmq.NOBLOCK).decode("utf8") + json_buffer += sock_txt + except zmq.error.Again: + pass + while True: + delim_pos = json_buffer.find(DELIM) + if delim_pos == -1: + break + raw_item = json_buffer[:delim_pos] + json_buffer = json_buffer[delim_pos + len(DELIM) :] + try: + item = json.loads(raw_item) + except json.decoder.JSONDecodeError: + continue + ts = float(item["metadata"]["ts"]) + if "predictions_image_path" not in item["metadata"]: + continue + ts = float(item["metadata"]["ts"]) + item_buffer.append((ts, item)) + item_buffer = item_buffer[-self.predictions :] + predictions = sorted(item_buffer, key=lambda x: x[0], reverse=True) + images = [] + now = time.time() + for ts, item in predictions: + image = item["metadata"]["predictions_image_path"] + images.append( + "%s (age %.1fs)

" % (image, now - ts, image) + ) + if images: + write_content( + f"

{datetime.datetime.now().isoformat()}

" + "".join(images) + ) + time.sleep(0.1) def serve(self, path): if path: full_path = os.path.realpath(os.path.join("/", path)) if os.path.exists(full_path): return send_file(full_path, mimetype="image/png") - else: - return "%s: not found" % full_path, 404 + return "%s: not found" % full_path, 404 if os.path.exists(self.savefig_path): return ( '' @@ -1097,28 +1165,7 @@ def serve(self, path): ) def serve_predictions(self): - if self.predictions_path and self.predictions: - predictions = sorted( - [ - (os.stat(prediction).st_ctime, prediction) - for prediction in glob.glob(self.predictions_path, recursive=True) - ] - )[-self.predictions :] - if not predictions: - return "no recent predictions at %s" % self.predictions_path, 200 - images = [] - now = time.time() - for ctime, prediction in sorted(predictions, reverse=True): - images.append( - "%s (age %.1fs)

" - % (prediction, now - ctime, prediction) - ) - content = ( - '%s' - % (self.refresh, "".join(images)) - ) - return content, 200 - return "prediction serving is disabled", 200 + return current_app.send_static_file(self.predictions_file) def serve_waterfall(self): return current_app.send_static_file(self.savefig_file) @@ -1147,10 +1194,11 @@ def main(): savefig_path = os.path.join(tempdir, "waterfall.png") flask = FlaskHandler( savefig_path, - args.predictions_path, args.predictions, args.port, args.refresh, + args.inference_server, + args.inference_port, ) flask.start() diff --git a/orchestrator.yml b/orchestrator.yml index 6a8e1048..9fd8c0b7 100644 --- a/orchestrator.yml +++ b/orchestrator.yml @@ -77,7 +77,7 @@ services: - --inference_addr=0.0.0.0 - --inference_port=10001 - --inference_min_confidence=0.25 - - --inference_min_db=-80 + - --inference_min_db=-1e9 - --inference_model_name=mini2_snr - --n_inference=10 - --n_image=10 @@ -85,8 +85,8 @@ services: - --rotate_secs=300 # - --external_gps_server=1.2.3.4 # - --external_gps_server_port=8888 - # - --inference_output_dir=/logs/inference - # - --inference_model_server=torchserve:8080 + - --inference_output_dir=/logs/inference + - --inference_model_server=torchserve:8080 healthcheck: test: [CMD, "/gamutrf/bin/scanhc.sh", "9000"] interval: 10s @@ -127,6 +127,8 @@ services: command: - gamutrf-waterfall - --scanners=gamutrf:10000 + - --inference_server=gamutrf + - --inference_port=10001 - --save_path=/logs/waterfall - --port=9003 - --detection_type=narrowband