Skip to content

Commit

Permalink
Merge pull request #1299 from anarkiwi/wfsplit
Browse files Browse the repository at this point in the history
Move arg processing and waterfall drawing to own modules.
  • Loading branch information
anarkiwi authored Jun 6, 2024
2 parents faaa87b + 689becc commit c7969f1
Show file tree
Hide file tree
Showing 5 changed files with 1,155 additions and 1,128 deletions.
111 changes: 111 additions & 0 deletions gamutrfwaterfall/gamutrfwaterfall/argparser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import argparse


def argument_parser():
parser = argparse.ArgumentParser(description="waterfall plotter from scan data")
parser.add_argument(
"--min_freq",
default=0,
type=float,
help="Minimum frequency for plot (or 0 for automatic).",
)
parser.add_argument(
"--max_freq",
default=0,
type=float,
help="Maximum frequency for plot (or 0 for automatic).",
)
parser.add_argument(
"--n_detect", default=0, type=int, help="Number of detected signals to plot."
)
parser.add_argument(
"--plot_snr", action="store_true", help="Plot SNR rather than power."
)
parser.add_argument(
"--detection_type",
default="",
type=str,
help="Detection type to plot (wideband, narrowband).",
)
parser.add_argument(
"--save_path", default="", type=str, help="Path to save screenshots."
)
parser.add_argument(
"--save_time",
default=1,
type=int,
help="Save screenshot every save_time minutes. Only used if save_path also defined.",
)
parser.add_argument(
"--scanners",
default="127.0.0.1:8001",
type=str,
help="Scanner FFT endpoints to use.",
)
parser.add_argument(
"--port",
default=0,
type=int,
help="If set, serve waterfall on this port.",
)
parser.add_argument(
"--rotate_secs",
default=900,
type=int,
help="If > 0, rotate save directories every N seconds",
)
parser.add_argument(
"--width",
default=28,
type=float,
help="Waterfall image width",
)
parser.add_argument(
"--height",
default=10,
type=float,
help="Waterfall image height",
)
parser.add_argument(
"--waterfall_height",
default=100,
type=int,
help="Waterfall height",
)
parser.add_argument(
"--waterfall_width",
default=5000,
type=int,
help="Waterfall width (maximum)",
)
parser.add_argument(
"--refresh",
default=5,
type=int,
help="Waterfall refresh time",
)
parser.add_argument(
"--predictions",
default=3,
type=int,
help="If set, render N recent predictions",
)
parser.add_argument(
"--inference_server",
default="",
type=str,
help="Address of scanner for inference feed",
)
parser.add_argument(
"--inference_port",
default=10002,
type=int,
help="Port on scanner to connect to inference feed",
)
parser.add_argument(
"--api_endpoint",
default="127.0.0.1:9001",
type=str,
help="Scanner API endpoints to use.",
)
return parser
224 changes: 224 additions & 0 deletions gamutrfwaterfall/gamutrfwaterfall/flask_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import datetime
import json
import logging
import multiprocessing
import os
import time
import requests
import zmq
from flask import (
Flask,
send_file,
render_template,
send_from_directory,
request,
redirect,
)


def get_scanner_args(api_endpoint, config_vars):
try:
response = requests.get(f"http://{api_endpoint}/getconf", timeout=30)
response.raise_for_status()

for name, val in json.loads(response.text).items():
config_vars[name] = val

except (
requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
requests.exceptions.ConnectTimeout,
) as err:
logging.error(err)


def write_scanner_args(config_vars_path, config_vars):
tmpfile = os.path.join(
os.path.dirname(config_vars_path),
"." + os.path.basename(config_vars_path),
)
with open(tmpfile, "w", encoding="utf8") as f:
json.dump(config_vars, f)
os.rename(tmpfile, config_vars_path)


class FlaskHandler:
def __init__(
self,
savefig_path,
tempdir,
predictions,
port,
refresh,
inference_server,
inference_port,
api_endpoint,
config_vars,
config_vars_path,
):
self.inference_addr = f"tcp://{inference_server}:{inference_port}"
self.savefig_path = savefig_path
self.config_vars = config_vars
self.config_vars_path = config_vars_path
self.tempdir = tempdir
self.predictions_file = "predictions.html"
self.refresh = refresh
self.predictions = predictions
self.api_endpoint = api_endpoint
self.app = Flask(__name__, template_folder="templates", static_folder="static")
self.savefig_file = os.path.basename(self.savefig_path)
self.app.add_url_rule("/", "index", self.serve_waterfall_page)
self.app.add_url_rule(
"/waterfall", "serve_waterfall_page", self.serve_waterfall_page
)
self.app.add_url_rule(
"/waterfall_img", "serve_waterfall_img", self.serve_waterfall_img
)
self.app.add_url_rule(
"/config_form", "config_form", self.config_form, methods=["POST", "GET"]
)
self.app.add_url_rule(
"/predictions", "serve_predictions_page", self.serve_predictions_page
)
self.app.add_url_rule(
"/predictions_content",
"serve_predictions_content",
self.serve_predictions_content,
)
self.app.add_url_rule("/<path:path>", "", self.serve)
self.process = multiprocessing.Process(
target=self.app.run,
kwargs={"host": "0.0.0.0", "port": port}, # nosec
)
self.zmq_process = multiprocessing.Process(target=self.poll_zmq)
self.write_predictions_content("no predictions yet")
self.read_config_vars()

def start(self):
self.process.start()
self.zmq_process.start()

def write_predictions_content(self, content):
tmpfile = os.path.join(self.tempdir, "." + self.predictions_file)
with open(tmpfile, "w", encoding="utf8") as f:
f.write(f"{content}")
os.rename(tmpfile, os.path.join(self.tempdir, self.predictions_file))

def poll_zmq(self):
zmq_context = zmq.Context()
socket = zmq_context.socket(zmq.SUB)
socket.connect(self.inference_addr)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
DELIM = "\n\n"
json_buffer = ""
item_buffer = []

while True:
try:
while True:
sock_txt = socket.recv(flags=zmq.NOBLOCK).decode("utf8")
json_buffer += sock_txt
except zmq.error.Again:
pass
while True:
delim_pos = json_buffer.find(DELIM)
if delim_pos == -1:
break
raw_item = json_buffer[:delim_pos]
json_buffer = json_buffer[delim_pos + len(DELIM) :]
try:
item = json.loads(raw_item)
except json.decoder.JSONDecodeError:
continue
ts = float(item["metadata"]["ts"])
if "predictions_image_path" not in item["metadata"]:
continue
ts = float(item["metadata"]["ts"])
item_buffer.append((ts, item))
item_buffer = item_buffer[-self.predictions :]
predictions = sorted(item_buffer, key=lambda x: x[0], reverse=True)
images = []
now = time.time()
for ts, item in predictions:
image = item["metadata"]["predictions_image_path"]
age = now - ts
style = ""
if age > 3 * self.refresh:
style = 'style="color:red;"'
images.append(
"<p %s>%s (age %.1fs)</p><p %s><img src=%s></img></p>"
% (style, image, age, style, image)
)
if images:
self.write_predictions_content(
f"<p>{datetime.datetime.now().isoformat()}</p>" + "".join(images)
)
time.sleep(0.1)

def serve(self, path):
if path:
full_path = os.path.realpath(os.path.join("/", path))
if os.path.exists(full_path):
return send_file(full_path, mimetype="image/png")
return "%s: not found" % full_path, 404
if os.path.exists(self.savefig_path):
return (
'<html><head><meta http-equiv="refresh" content="%u"></head><body><img src="%s"></img></body></html>'
% (self.refresh, self.savefig_file),
200,
)
return (
'<html><head><meta http-equiv="refresh" content="%u"></head><body>waterfall initializing, please wait or reload...</body></html>'
% self.refresh,
200,
)

def serve_predictions_content(self):
return send_from_directory(self.tempdir, self.predictions_file)

def serve_predictions_page(self):
# return send_from_directory(self.tempdir, self.predictions_file)
return render_template("predictions.html")

def read_config_vars(self):
try:
with open(self.config_vars_path, encoding="utf8") as f:
self.config_vars = json.load(f)
except FileNotFoundError:
pass

def serve_waterfall_page(self):
self.read_config_vars()
return render_template("waterfall.html", config_vars=self.config_vars)

# @app.route("/file/<path:filename>")
# def serve_file(self, filename):
# return send_from_directory(self.tempdir, filename)

def serve_waterfall_img(self):
return send_from_directory(self.tempdir, self.savefig_file)

def config_form(self):
for var in self.config_vars:
self.config_vars[var] = request.form.get(var, self.config_vars[var])
write_scanner_args(self.config_vars_path, self.config_vars)
reset = request.form.get("reset", None)
if reset == "reset":
reconf_query_str = "&".join(
[f"{k}={v}" for k, v in self.config_vars.items()]
)
logging.info(f"\n\n{reconf_query_str=}\n\n")
try:
response = requests.get(
f"http://{self.api_endpoint}/reconf?{reconf_query_str}",
timeout=30,
)
logging.info(f"\n\n{response=}\n\n")
logging.info(f"\n\n{response.text=}\n\n")
except (
requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
requests.exceptions.ConnectTimeout,
) as err:
logging.error(str(err))
return redirect("/", code=302)
Loading

0 comments on commit c7969f1

Please sign in to comment.