Skip to content

Commit

Permalink
Merge pull request #1087 from anarkiwi/cheapsig2
Browse files · Browse the repository at this point in the history
faster predictions UI
  • Loading branch information
anarkiwi authored Jan 9, 2024
2 parents e3b9446 + c7b0ab7 commit 62a9f33
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 40 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libvulkan-dev \
python3-numpy
WORKDIR /root
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.64
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.65
COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs
WORKDIR /root/gr-iqtlabs/build
COPY --from=sigmf-builder /usr/local /usr/local
Expand Down
120 changes: 84 additions & 36 deletions gamutrfwaterfall/gamutrfwaterfall/waterfall.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import argparse
import csv
import datetime
import glob
import json
import logging
import multiprocessing
Expand All @@ -16,6 +15,7 @@
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import zmq
from flask import Flask, current_app, send_file
from matplotlib.collections import LineCollection
from matplotlib.ticker import MultipleLocator, AutoMinorLocator
Expand Down Expand Up @@ -54,7 +54,7 @@ def draw_title(
freq_resolution,
):
title_text = {
"Time": str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
"Time": str(datetime.datetime.now().isoformat()),
"Scan time": "%.2fs" % scan_duration,
"Step FFTs": "%u" % tune_step_fft,
"Step size": "%.2fMHz" % (tune_step_hz / 1e6),
Expand Down Expand Up @@ -279,16 +279,22 @@ def argument_parser():
help="Waterfall refresh time",
)
parser.add_argument(
"--predictions_path",
default="/logs/inference/**/predictions*png",
"--predictions",
default=3,
type=int,
help="If set, render N recent predictions",
)
parser.add_argument(
"--inference_server",
default="",
type=str,
help="If set, render recent predictions",
help="Address of scanner for inference feed",
)
parser.add_argument(
"--predictions",
default=1,
"--inference_port",
default=10002,
type=int,
help="If set, render N recent predictions",
help="Port on scanner to connect to inference feed",
)
return parser

Expand Down Expand Up @@ -1056,12 +1062,16 @@ def sig_handler(_sig=None, _frame=None):


class FlaskHandler:
def __init__(self, savefig_path, predictions_path, predictions, port, refresh):
def __init__(
self, savefig_path, predictions, port, refresh, inference_server, inference_port
):
self.inference_addr = f"tcp://{inference_server}:{inference_port}"
self.savefig_path = savefig_path
self.static_folder = os.path.dirname(savefig_path)
self.predictions_file = "predictions.html"
self.refresh = refresh
self.predictions_path = predictions_path
self.predictions = predictions
self.app = Flask(__name__, static_folder=os.path.dirname(savefig_path))
self.app = Flask(__name__, static_folder=self.static_folder)
self.savefig_file = os.path.basename(self.savefig_path)
self.app.add_url_rule("/predictions", "predictions", self.serve_predictions)
self.app.add_url_rule(
Expand All @@ -1073,17 +1083,75 @@ def __init__(self, savefig_path, predictions_path, predictions, port, refresh):
target=self.app.run,
kwargs={"host": "0.0.0.0", "port": port}, # nosec
)
self.zmq_process = multiprocessing.Process(target=self.poll_zmq)

def start(self):
    """Launch the Flask web server and the ZMQ polling loop, each in its own process."""
    for proc in (self.process, self.zmq_process):
        proc.start()

def poll_zmq(self):
    """Consume inference results from the scanner's ZMQ feed and render the
    most recent predictions into a static HTML page.

    Runs forever in its own process (started by start()). The page is
    rewritten atomically so serve_predictions() never serves a partial file.
    """
    zmq_context = zmq.Context()
    socket = zmq_context.socket(zmq.SUB)
    socket.connect(self.inference_addr)
    # Subscribe to all messages on the feed.
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    DELIM = "\n\n"
    json_buffer = ""
    item_buffer = []

    def write_content(content):
        # Write to a hidden temp file in the same directory, then rename:
        # rename within a filesystem is atomic, so readers always see a
        # complete page.
        tmpfile = os.path.join(self.static_folder, "." + self.predictions_file)
        with open(tmpfile, "w", encoding="utf8") as f:
            f.write(
                '<html><head><meta http-equiv="refresh" content="%u"></head><body>%s</body></html>'
                % (self.refresh, content)
            )
        os.rename(tmpfile, os.path.join(self.static_folder, self.predictions_file))

    write_content("no predictions yet")

    while True:
        # Drain everything currently queued on the socket without blocking.
        try:
            while True:
                sock_txt = socket.recv(flags=zmq.NOBLOCK).decode("utf8")
                json_buffer += sock_txt
        except zmq.error.Again:
            pass
        # Split the accumulated stream on the item delimiter and parse each.
        while True:
            delim_pos = json_buffer.find(DELIM)
            if delim_pos == -1:
                break
            raw_item = json_buffer[:delim_pos]
            json_buffer = json_buffer[delim_pos + len(DELIM) :]
            try:
                item = json.loads(raw_item)
            except json.decoder.JSONDecodeError:
                continue
            # Only items referencing a rendered prediction image are shown.
            # Fix: the original read item["metadata"]["ts"] unconditionally
            # *before* this guard (and then again after it), so an item
            # without "ts" could raise KeyError instead of being skipped;
            # read "ts" once, after the guard.
            if "predictions_image_path" not in item["metadata"]:
                continue
            ts = float(item["metadata"]["ts"])
            item_buffer.append((ts, item))
            # Keep only the N most recent predictions.
            item_buffer = item_buffer[-self.predictions :]
        # Render newest-first.
        predictions = sorted(item_buffer, key=lambda x: x[0], reverse=True)
        images = []
        now = time.time()
        for ts, item in predictions:
            image = item["metadata"]["predictions_image_path"]
            images.append(
                "%s (age %.1fs)<p><img src=%s></img></p>" % (image, now - ts, image)
            )
        if images:
            write_content(
                f"<p>{datetime.datetime.now().isoformat()}</p>" + "".join(images)
            )
        time.sleep(0.1)

def serve(self, path):
if path:
full_path = os.path.realpath(os.path.join("/", path))
if os.path.exists(full_path):
return send_file(full_path, mimetype="image/png")
else:
return "%s: not found" % full_path, 404
return "%s: not found" % full_path, 404
if os.path.exists(self.savefig_path):
return (
'<html><head><meta http-equiv="refresh" content="%u"></head><body><img src="%s"></img></body></html>'
Expand All @@ -1097,28 +1165,7 @@ def serve(self, path):
)

def serve_predictions(self):
    """Serve the predictions page pre-rendered by poll_zmq().

    The page is a static file regenerated continuously by the ZMQ polling
    process, so each request is a cheap static-file send instead of the old
    per-request filesystem glob over prediction images.
    """
    # NOTE: the span previously interleaved the removed glob-based
    # implementation with this replacement; only the new implementation
    # is valid source.
    return current_app.send_static_file(self.predictions_file)

def serve_waterfall(self):
    """Serve the current waterfall image as a static file."""
    waterfall_file = self.savefig_file
    return current_app.send_static_file(waterfall_file)
Expand Down Expand Up @@ -1147,10 +1194,11 @@ def main():
savefig_path = os.path.join(tempdir, "waterfall.png")
flask = FlaskHandler(
savefig_path,
args.predictions_path,
args.predictions,
args.port,
args.refresh,
args.inference_server,
args.inference_port,
)
flask.start()

Expand Down
8 changes: 5 additions & 3 deletions orchestrator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,16 @@ services:
- --inference_addr=0.0.0.0
- --inference_port=10001
- --inference_min_confidence=0.25
- --inference_min_db=-80
- --inference_min_db=-1e9
- --inference_model_name=mini2_snr
- --n_inference=10
- --n_image=10
- --no-vkfft
- --rotate_secs=300
# - --external_gps_server=1.2.3.4
# - --external_gps_server_port=8888
# - --inference_output_dir=/logs/inference
# - --inference_model_server=torchserve:8080
- --inference_output_dir=/logs/inference
- --inference_model_server=torchserve:8080
healthcheck:
test: [CMD, "/gamutrf/bin/scanhc.sh", "9000"]
interval: 10s
Expand Down Expand Up @@ -127,6 +127,8 @@ services:
command:
- gamutrf-waterfall
- --scanners=gamutrf:10000
- --inference_server=gamutrf
- --inference_port=10001
- --save_path=/logs/waterfall
- --port=9003
- --detection_type=narrowband
Expand Down

0 comments on commit 62a9f33

Please sign in to comment.