From dad2ecaa2185b5060c0c338749d82533364dbd69 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Sat, 13 Jul 2024 03:27:45 +0000 Subject: [PATCH 1/4] remove worker. --- .github/workflows/docker-test.yml | 4 +- .github/workflows/docker.yml | 9 - docker/Dockerfile.base | 1 - docker/Dockerfile.uhd-sr | 24 -- gamutrf/__main__.py | 6 - gamutrf/birdseye_rssi.py | 53 ---- gamutrf/sdr_recorder.py | 334 ----------------------- gamutrf/sigwindows.py | 40 --- gamutrf/worker.py | 428 ------------------------------ templates/scanner_form.html | 14 - tests/test_birdseye_rssi.py | 101 ------- tests/test_main.py | 8 - tests/test_sdr_recorder.py | 66 ----- tests/test_sigwindows.py | 28 -- worker.yml | 44 --- 15 files changed, 1 insertion(+), 1159 deletions(-) delete mode 100644 docker/Dockerfile.uhd-sr delete mode 100644 gamutrf/birdseye_rssi.py delete mode 100644 gamutrf/sdr_recorder.py delete mode 100644 gamutrf/sigwindows.py delete mode 100644 gamutrf/worker.py delete mode 100644 templates/scanner_form.html delete mode 100644 tests/test_birdseye_rssi.py delete mode 100644 tests/test_sdr_recorder.py delete mode 100644 tests/test_sigwindows.py delete mode 100644 worker.yml diff --git a/.github/workflows/docker-test.yml b/.github/workflows/docker-test.yml index a0312b0b..80d2f395 100644 --- a/.github/workflows/docker-test.yml +++ b/.github/workflows/docker-test.yml @@ -33,15 +33,13 @@ jobs: docker build -f docker/Dockerfile.vkfft . -t iqtlabs/gamutrf-vkfft:latest docker build -f docker/Dockerfile.sigmf docker -t iqtlabs/gamutrf-sigmf:latest docker build -f docker/Dockerfile.driver docker -t iqtlabs/gamutrf-driver:latest - docker build -f docker/Dockerfile.uhd-sr . -t iqtlabs/gamutrf-uhd-sr:latest docker build -f docker/Dockerfile.base docker -t iqtlabs/gamutrf-base:latest - docker rmi -f iqtlabs/gamutrf-vkfft:latest iqtlabs/gamutrf-driver:latest iqtlabs/gamutrf-uhd-sr:latest + docker rmi -f iqtlabs/gamutrf-vkfft:latest iqtlabs/gamutrf-driver:latest docker build -f Dockerfile . -t iqtlabs/gamutrf:latest docker build -f docker/Dockerfile.torchsig . -t iqtlabs/gamutrf-torchsig:latest docker run -t iqtlabs/gamutrf:latest gamutrf-compress_dirs --help docker run -t iqtlabs/gamutrf:latest gamutrf-offline --help docker run -t iqtlabs/gamutrf:latest gamutrf-scan --help - docker run -t iqtlabs/gamutrf:latest gamutrf-worker --help - name: offline consistency # VkFFT is more consistent, but still not perfectly consistent. run: | diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 72664280..8f2b5293 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -58,15 +58,6 @@ jobs: push: true tags: iqtlabs/gamutrf-driver:${{ steps.change_version.outputs.VERSION }} if: github.repository == 'iqtlabs/gamutrf' && github.event_name == 'push' - - name: Build and push platforms uhd-sr - uses: docker/build-push-action@v6 - with: - context: . - file: docker/Dockerfile.uhd-sr - platforms: linux/amd64,linux/arm64 - push: true - tags: iqtlabs/gamutrf-uhd-sr:${{ steps.change_version.outputs.VERSION }} - if: github.repository == 'iqtlabs/gamutrf' && github.event_name == 'push' - name: Build and push platforms base uses: docker/build-push-action@v6 with: diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index ae05c95d..ade7242e 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -41,7 +41,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && apt-get -y -q clean && rm -rf /var/lib/apt/lists/* COPY --from=iqtlabs/gnuradio:3.10.10.0 /usr/local /usr/local COPY --from=iqtlabs/gamutrf-driver /usr/local /usr/local -COPY --from=iqtlabs/gamutrf-uhd-sr /usr/local /usr/local COPY --from=gr-iqtlabs-builder /usr/local /usr/local RUN ldconfig -v RUN python3 -c "from gnuradio import soapy, iqtlabs ; from gnuradio.iqtlabs import vkfft" diff --git a/docker/Dockerfile.uhd-sr b/docker/Dockerfile.uhd-sr deleted file mode 100644 index 9a29ee82..00000000 --- a/docker/Dockerfile.uhd-sr +++ /dev/null @@ -1,24 +0,0 @@ -FROM ubuntu:24.04 AS uhd_sample_recorder-builder -ENV DEBIAN_FRONTEND=noninteractive -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential \ - ca-certificates \ - cmake \ - git \ - libboost-all-dev \ - libuhd-dev \ - libvulkan-dev \ - sudo -WORKDIR /root/.config/pip -COPY pip.conf pip.conf -WORKDIR /root -RUN git clone https://github.com/iqtlabs/uhd_sample_recorder -b v1.0.12 -WORKDIR /root/uhd_sample_recorder -RUN sed -i "/git clone/d" bin/install-deps.sh && echo "true" >> bin/install-deps.sh && bin/install-deps.sh -COPY --from=iqtlabs/gamutrf-vkfft:latest /root/VkFFT /root/uhd_sample_recorder/VkFFT -COPY --from=iqtlabs/gamutrf-sigmf:latest /usr/local /usr/local -WORKDIR /root/uhd_sample_recorder/build -RUN cmake ../lib && make -j $(nproc) && cp uhd_sample_recorder /usr/local/bin - -FROM ubuntu:24.04 -COPY --from=uhd_sample_recorder-builder /usr/local /usr/local diff --git a/gamutrf/__main__.py b/gamutrf/__main__.py index f098df45..43ac119e 100644 --- a/gamutrf/__main__.py +++ b/gamutrf/__main__.py @@ -3,7 +3,6 @@ from gamutrf.compress_dirs import main as compress_dirs_main from gamutrf.offline import main as offline_main from gamutrf.scan import main as scan_main -from gamutrf.worker import main as worker_main def compress_dirs(): @@ -19,8 +18,3 @@ def offline(): def scan(): """Entrypoint for scan""" scan_main() - - -def worker(): - """Entrypoint for worker""" - worker_main() diff --git a/gamutrf/birdseye_rssi.py b/gamutrf/birdseye_rssi.py deleted file mode 100644 index faa56711..00000000 --- a/gamutrf/birdseye_rssi.py +++ /dev/null @@ -1,53 +0,0 @@ -import sys - -try: - from gnuradio import blocks - from gnuradio import gr - from gnuradio import network -except ModuleNotFoundError as err: - print( - "Run from outside a supported environment, please run via Docker (https://github.com/IQTLabs/gamutRF#readme): %s" - % err - ) - sys.exit(1) - -from gamutrf.grsource import get_source - -FLOAT_SIZE = 4 -RSSI_UDP_ADDR = "127.0.0.1" -RSSI_UDP_PORT = 2001 -MAX_RSSI = 100 - - -class BirdsEyeRSSI(gr.top_block): - def __init__(self, args, samp_rate, center_freq, rssi_throttle=10, agc=False): - gr.top_block.__init__(self, "BirdsEyeRSSI", catch_exceptions=True) - - self.threshold = args.rssi_threshold - self.mean_window = args.mean_window - self.rssi_throttle = rssi_throttle - sources, _cmd_port, _workaround_start_hook = get_source( - sdr=args.sdr, - samp_rate=samp_rate, - gain=args.gain, - nfft=1024, - tune_step_fft=512, - agc=agc, - center_freq=center_freq, - ) - - rssi_blocks = sources + [ - blocks.complex_to_mag_squared(1), - blocks.moving_average_ff(self.mean_window, 1 / self.mean_window, 2000, 1), - blocks.nlog10_ff(10, 1, 0), - blocks.add_const_ff(-34), - blocks.keep_one_in_n(gr.sizeof_float, int(self.rssi_throttle)), - network.udp_sink( - gr.sizeof_float, 1, RSSI_UDP_ADDR, RSSI_UDP_PORT, 0, 32768, False - ), - ] - - last_block = rssi_blocks[0] - for block in rssi_blocks[1:]: - self.connect((last_block, 0), (block, 0)) - last_block = block diff --git a/gamutrf/sdr_recorder.py b/gamutrf/sdr_recorder.py deleted file mode 100644 index 4f798992..00000000 --- a/gamutrf/sdr_recorder.py +++ /dev/null @@ -1,334 +0,0 @@ -import datetime -import json -import logging -import os -import subprocess -import tempfile -import time -from urllib.parse import urlparse - -import sigmf - -from gamutrf.sigwindows import freq_excluded -from gamutrf.sigwindows import parse_freq_excluded -from gamutrf.utils import ( - ETTUS_ANT, - ETTUS_ARGS, - endianstr, -) - -IMSHOW_INTERPOLATION = os.getenv("IMSHOW_INTERPOLATION", "bilinear") -NFFT_OVERLAP = 512 -SAMPLE_TYPE = "i16" -MIN_SAMPLE_RATE = int(1e6) -MAX_SAMPLE_RATE = int(30 * 1e6) - - -class SDRRecorder: - def __init__(self, sdrargs, rotate_secs): - self.tmpdir = tempfile.TemporaryDirectory() - self.zst_fifo = os.path.join(self.tmpdir.name, "zstfifo") - self.sdrargs = sdrargs - self.rotate_secs = rotate_secs - os.mkfifo(self.zst_fifo) - - @staticmethod - def validate_request(freqs_excluded, center_freq, sample_count, sample_rate): - for arg in [center_freq, sample_count, sample_rate]: - try: - int(float(arg)) - except ValueError: - return "Invalid values in request" - if freq_excluded(int(center_freq), parse_freq_excluded(freqs_excluded)): - return "Requested frequency is excluded" - if int(sample_rate) < MIN_SAMPLE_RATE or int(sample_rate) > MAX_SAMPLE_RATE: - return "sample rate {sample_rate} out of range {MIN_SAMPLE_RATE} to {MAX_SAMPLE_RATE}" - duration_sec = int(sample_count) / int(sample_rate) - if duration_sec < 1: - return "cannot record for less than 1 second" - return None - - @staticmethod - def get_sample_file(path, epoch_time, center_freq, sample_rate, sdr, antenna, gain): - return os.path.join( - path, - f"gamutrf_recording_{sdr}_{antenna}_gain{gain}_{epoch_time}_{int(center_freq)}Hz_{int(sample_rate)}sps.{SAMPLE_TYPE}.zst", - ) - - def record_args( - self, sample_file, sample_rate, sample_count, center_freq, gain, agc, rxb - ): - raise NotImplementedError - - def write_recording( - self, sample_file, sample_rate, sample_count, center_freq, gain, agc, rxb - ): - record_status = -1 - args = self.record_args( - self.zst_fifo, sample_rate, sample_count, center_freq, gain, agc, rxb - ) - dotfile = os.path.join( - os.path.dirname(sample_file), "." + os.path.basename(sample_file) - ) - zstd_args = ["nice", "zstd", "-1", self.zst_fifo, "-o", dotfile] - logging.info("starting recording: %s", args) - with subprocess.Popen(zstd_args, stdin=subprocess.DEVNULL) as zstd_proc: - record_status = subprocess.check_call(args) - zstd_proc.communicate() - if os.path.exists(dotfile): - os.rename(dotfile, sample_file) - return record_status - - def run_recording( - self, - path, - sample_rate, - sample_count, - center_freq, - gain, - agc, - rxb, - sigmf_, - sdr, - antenna, - ): - epoch_time = int(time.time()) - meta_time = datetime.datetime.utcnow().isoformat() + "Z" - if self.rotate_secs: - ts_dir = int(epoch_time / self.rotate_secs) * self.rotate_secs - path = os.path.join(path, str(ts_dir)) - if not os.path.exists(path): - os.makedirs(path) - sample_file = self.get_sample_file( - path, str(epoch_time), center_freq, sample_rate, sdr, antenna, gain - ) - record_status = -1 - try: - record_status = self.write_recording( - sample_file, sample_rate, sample_count, center_freq, gain, agc, rxb - ) - if sigmf_: - sigmf_file = sample_file + ".sigmf-meta" - if os.path.exists(sample_file): - if not os.path.exists(sigmf_file): - meta = sigmf.SigMFFile( - skip_checksum=True, # expensive for large files - # data_file=sample_file, # don't set for ZST, confuses sigmf - global_info={ - sigmf.SigMFFile.DATATYPE_KEY: "_".join( - ("c" + SAMPLE_TYPE, endianstr()) - ), - sigmf.SigMFFile.SAMPLE_RATE_KEY: sample_rate, - sigmf.SigMFFile.VERSION_KEY: sigmf.__version__, - }, - ) - # TODO: add capture_details, source_file and gain when supported. - meta.add_capture( - 0, - metadata={ - sigmf.SigMFFile.FREQUENCY_KEY: center_freq, - sigmf.SigMFFile.DATETIME_KEY: meta_time, - }, - ) - meta.tofile(sigmf_file) - else: - logging.error("{sample_file} missing, cannot write sigmf file") - except subprocess.CalledProcessError as err: - logging.error("record failed: %s", err) - logging.info("record status: %d", record_status) - return (record_status, sample_file) - - -class EttusRecorder(SDRRecorder): - def __init__(self, sdrargs, rotate_secs): - super().__init__(sdrargs, rotate_secs) - self.worker_subprocess = None - self.last_worker_line = None - if not self.sdrargs: - self.sdrargs = ETTUS_ARGS - try: - subprocess.check_call( - [ - "/usr/local/bin/uhd_sample_recorder", - "--rate", - str(1e6), - "--args", - self.sdrargs, - "--ant", - ETTUS_ANT, - "--duration", - "1", - "--null", - "--fftnull", - "--novkfft", - "--nfft", - "0", - ] - ) - except subprocess.CalledProcessError: - raise ValueError - - def record_args( - self, sample_file, sample_rate, sample_count, center_freq, gain, _agc, rxb - ): - # Ettus "nsamps" API has an internal limit, so translate "stream for druation". - duration = round(sample_count / sample_rate) - rxb = min(rxb, sample_rate) - - args = [ - "/usr/local/bin/uhd_sample_recorder", - "--json", - "--rate", - str(sample_rate), - "--bw", - str(sample_rate), - "--gain", - str(gain), - "--args", - self.sdrargs, - "--ant", - ETTUS_ANT, - "--spb", - str(rxb), - ] - - json_args = { - "file": sample_file, - "duration": duration, - "freq": center_freq, - } - return (args, json_args) - - def write_recording( - self, sample_file, sample_rate, sample_count, center_freq, gain, agc, rxb - ): - # Ettus doesn't need a wrapper, it can do its own zst compression. - record_status = -1 - args, json_args = self.record_args( - sample_file, sample_rate, sample_count, center_freq, gain, agc, rxb - ) - - try: - - def worker_read(): - self.last_worker_line = ( - self.worker_subprocess.stdout.readline().decode("utf-8").strip() - ) - logging.info(self.last_worker_line) - - def worker_write(s): - self.worker_subprocess.stdin.write(("%s\n" % s).encode("utf-8")) - self.worker_subprocess.stdin.flush() - - if self.worker_subprocess is None: - logging.info("starting worker subprocess: %s", args) - self.worker_subprocess = subprocess.Popen( - args, stdin=subprocess.PIPE, stdout=subprocess.PIPE - ) - worker_read() - - logging.info("starting recording: %s", json_args) - worker_write(json.dumps(json_args)) - worker_read() - last_error = json.loads(self.last_worker_line).get("last_error") - if not last_error: - record_status = 0 - except ( - subprocess.SubprocessError, - BrokenPipeError, - json.decoder.JSONDecodeError, - ) as e: - logging.error(e) - if self.worker_subprocess: - self.worker_subprocess.kill() - self.worker_subprocess = None - return record_status - - -class BladeRecorder(SDRRecorder): - def record_args( - self, sample_file, sample_rate, sample_count, center_freq, gain, agc, _rxb - ): - gain_args = [ - "-e", - "set agc rx off", - "-e", - f"set gain rx {gain}", - ] - if agc: - gain_args = [ - "-e", - "set agc rx on", - ] - return ( - ["bladeRF-cli"] - + gain_args - + [ - "-e", - f"set samplerate rx {sample_rate}", - "-e", - f"set bandwidth rx {sample_rate}", - "-e", - f"set frequency rx {center_freq}", - "-e", - f"rx config file={sample_file} format=bin n={sample_count}", - "-e", - "rx start", - "-e", - "rx wait", - ] - ) - - -class LimeRecorder(SDRRecorder): - def record_args( - self, sample_file, sample_rate, sample_count, center_freq, gain, agc, _rxb - ): - gain_args = [] - if gain: - gain_args = [ - "-g", - f"{gain}", - ] - return ( - ["/usr/local/bin/LimeStream"] - + gain_args - + [ - "-f", - f"{center_freq}", - "-s", - f"{sample_rate}", - "-C", - f"{sample_count}", - "-r", - f"{sample_file}", - ] - ) - - -class FileTestRecorder(SDRRecorder): - def record_args( - self, sample_file, sample_rate, sample_count, center_freq, gain, agc, rxb - ): - args = [ - "dd", - f"if={urlparse(self.sdrargs).path}", - f"of={sample_file}", - f"count={int(sample_count)}", - f"bs={int(sample_rate)}", - ] - return args - - -RECORDER_MAP = { - "ettus": EttusRecorder, - "bladerf": BladeRecorder, - "lime": LimeRecorder, -} - - -def get_recorder(recorder_name, sdrargs=None, rotate_secs=0): - try: - return RECORDER_MAP[recorder_name](sdrargs, rotate_secs) - except KeyError: - return FileTestRecorder(recorder_name, rotate_secs) diff --git a/gamutrf/sigwindows.py b/gamutrf/sigwindows.py deleted file mode 100644 index d2ad13e7..00000000 --- a/gamutrf/sigwindows.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/python3 - -import numpy as np -from gamutrf.utils import SCAN_FRES, SCAN_FROLL - - -ROLLOVERHZ = 100e6 -CSV = ".csv" -ROLLING_FACTOR = int(SCAN_FROLL / SCAN_FRES) - - -def parse_freq_excluded(freq_exclusions_raw): - freq_exclusions = [] - for pair in freq_exclusions_raw: - freq_min, freq_max = pair.split("-") - if len(freq_min): - freq_min = int(freq_min) - else: - freq_min = None - if len(freq_max): - freq_max = int(freq_max) - else: - freq_max = None - freq_exclusions.append((freq_min, freq_max)) - return tuple(freq_exclusions) - - -def freq_excluded(freq, freq_exclusions): - for freq_min, freq_max in freq_exclusions: - if freq_min is not None and freq_max is not None: - if freq >= freq_min and freq <= freq_max: - return True - continue - if freq_min is None: - if freq <= freq_max: - return True - continue - if freq >= freq_min: - return True - return False diff --git a/gamutrf/worker.py b/gamutrf/worker.py deleted file mode 100644 index 8082a4f1..00000000 --- a/gamutrf/worker.py +++ /dev/null @@ -1,428 +0,0 @@ -import argparse -import json -import logging -import os -import queue -import socket -import struct -import sys -import threading -import time - -import bjoern -import falcon -from falcon_cors import CORS - -from gamutrf.__init__ import __version__ -from gamutrf.birdseye_rssi import BirdsEyeRSSI -from gamutrf.birdseye_rssi import FLOAT_SIZE -from gamutrf.birdseye_rssi import MAX_RSSI -from gamutrf.birdseye_rssi import RSSI_UDP_ADDR -from gamutrf.birdseye_rssi import RSSI_UDP_PORT -from gamutrf.mqtt_reporter import MQTTReporter -from gamutrf.sdr_recorder import get_recorder -from gamutrf.sdr_recorder import RECORDER_MAP - -WORKER_NAME = os.getenv("WORKER_NAME", socket.gethostbyname(socket.gethostname())) -ORCHESTRATOR = os.getenv("ORCHESTRATOR", "orchestrator") -ANTENNA = os.getenv("ANTENNA", "") - - -def argument_parser(): - parser = argparse.ArgumentParser() - parser.add_argument( - "--loglevel", - "-l", - help="Set logging level", - choices=["critical", "error", "warning", "info", "debug"], - default="info", - ) - parser.add_argument( - "--antenna", "-a", help="Antenna make/model", type=str, default=ANTENNA - ) - parser.add_argument( - "--name", "-n", help="Name for the worker", type=str, default=WORKER_NAME - ) - parser.add_argument( - "--path", - "-P", - help="Path prefix for writing out samples to", - type=str, - default="/data/gamutrf", - ) - parser.add_argument( - "--port", "-p", help="Port to run the API webserver on", type=int, default=8000 - ) - parser.add_argument( - "--rotate_secs", - help="If > 0, rotate storage directories every N seconds", - type=int, - default=3600, - ) - parser.add_argument( - "--sdr", - "-s", - help=f"Specify SDR to record with {list(RECORDER_MAP.keys())} or file", - type=str, - default="ettus", - ) - parser.add_argument( - "--sdrargs", - help=f"optional SDR arguments", - type=str, - default="", - ) - parser.add_argument( - "--freq_excluded", - "-e", - help='Freq range to exclude in MHz (e.g. "100-200")', - action="append", - default=[], - ) - parser.add_argument("--gain", "-g", help="Gain in dB", default=30, type=int) - parser.add_argument( - "--mean_window", "-m", help="birdseye mean window size", default=128, type=int - ) - parser.add_argument( - "--rxb", help="Receive buffer size", default=int(1024 * 1024 * 10), type=int - ) - parser.add_argument( - "--qsize", help="Max request queue size", default=int(2), type=int - ) - parser.add_argument( - "--mqtt_server", - help="MQTT server to report RSSI", - default=ORCHESTRATOR, - type=str, - ) - parser.add_argument( - "--gps_server", - help="GPS Server to get lat,long, and heading", - default=ORCHESTRATOR, - type=str, - ) - parser.add_argument( - "--rssi_interval", - help="rate limit in seconds for RSSI updates to MQTT", - default=1.0, - type=float, - ) - parser.add_argument( - "--rssi_throttle", - help="rate limit RSSI calculations to 1 in n", - default=10, - type=int, - ) - parser.add_argument( - "--rssi_threshold", help="RSSI reporting threshold", default=-45, type=float - ) - external_rssi_parser = parser.add_mutually_exclusive_group(required=False) - external_rssi_parser.add_argument( - "--rssi_external", - dest="rssi_external", - action="store_true", - default=True, - help="proxy external RSSI", - ) - external_rssi_parser.add_argument( - "--no-rssi_external", - dest="rssi_external", - action="store_false", - help="do not use proxy external RSSI", - ) - agc_parser = parser.add_mutually_exclusive_group(required=False) - agc_parser.add_argument( - "--agc", dest="agc", action="store_true", default=True, help="use AGC" - ) - agc_parser.add_argument( - "--no-agc", dest="agc", action="store_false", help="do not use AGC" - ) - parser.add_argument( - "--sigmf", - dest="sigmf", - default=True, - action=argparse.BooleanOptionalAction, - help="add sigmf meta file", - ) - parser.add_argument( - "--use_external_gps", - dest="use_external_gps", - default=False, - action=argparse.BooleanOptionalAction, - help="Use external Pixhawk/MAVLINK GPS", - ) - parser.add_argument( - "--use_external_heading", - dest="use_external_heading", - default=False, - action=argparse.BooleanOptionalAction, - help="Use external (Pixhawk/MAVLINK) heading", - ) - parser.add_argument( - "--external_gps_server", - dest="external_gps_server", - default=ORCHESTRATOR, - type=str, - help="server to query for external GPS data", - ) - parser.add_argument( - "--external_gps_server_port", - dest="external_gps_server_port", - default="8888", - type=str, - help="server port to query for external GPS data", - ) - - return parser - - -class Endpoints: - @staticmethod - def on_get(_req, resp): - endpoints = [] - for path in API.paths(): - endpoints.append(API.version() + path) - - resp.text = json.dumps(endpoints) - resp.content_type = falcon.MEDIA_TEXT - resp.status = falcon.HTTP_200 - - -class Info: - def __init__(self, arguments): - self.arguments = arguments - - def on_get(self, _req, resp): - resp.text = json.dumps( - { - "version": __version__, - "sdr": self.arguments.sdr, - "path_prefix": self.arguments.path, - "freq_excluded": self.arguments.freq_excluded, - } - ) - resp.content_type = falcon.MEDIA_TEXT - resp.status = falcon.HTTP_200 - - -class Action: - def __init__(self, arguments, q, sdr_recorder): - self.arguments = arguments - self.q = q - self.sdr_recorder = sdr_recorder - self.action = None - - def on_get(self, _req, resp, center_freq, sample_count, sample_rate): - # TODO check if chosen SDR can do the supplied sample_count - resp.content_type = falcon.MEDIA_JSON - resp.status = falcon.HTTP_400 - - status = None - if self.q.full(): - status = "Request queue is full" - else: - status = self.sdr_recorder.validate_request( - self.arguments.freq_excluded, center_freq, sample_count, sample_rate - ) - - if status is None: - self.q.put( - { - "action": self.action, - "center_freq": int(center_freq), - "sample_count": int(sample_count), - "sample_rate": int(sample_rate), - } - ) - status = "Requsted recording" - resp.status = falcon.HTTP_200 - - resp.text = json.dumps({"status": status}) - - -class Record(Action): - def __init__(self, arguments, q, sdr_recorder): - super().__init__(arguments, q, sdr_recorder) - self.action = "record" - - -class Rssi(Action): - def __init__(self, arguments, q, sdr_recorder): - super().__init__(arguments, q, sdr_recorder) - self.action = "record" - - -class API: - def __init__(self, arguments): - self.arguments = arguments - self.mqtt_reporter = MQTTReporter( - name=self.arguments.name, - mqtt_server=self.arguments.mqtt_server, - gps_server=self.arguments.gps_server, - compass=True, - use_external_gps=self.arguments.use_external_gps, - use_external_heading=self.arguments.use_external_heading, - external_gps_server=self.arguments.external_gps_server, - external_gps_server_port=self.arguments.external_gps_server_port, - ) - self.q = queue.Queue(self.arguments.qsize) - self.sdr_recorder = get_recorder( - self.arguments.sdr, self.arguments.sdrargs, self.arguments.rotate_secs - ) - self.start_time = time.time() - cors = CORS(allow_all_origins=True) - self.app = falcon.App(middleware=[cors.middleware]) - routes = self.routes() - for route, handler in routes.items(): - self.app.add_route(self.version() + route, handler) - - def run_recorder(self, record_func): - logging.info("run recorder") - while True: - logging.info("awaiting request") - action_args = self.q.get() - action = action_args["action"] - if action == "record": - self.serve_recording(record_func, action_args) - elif action == "rssi": - self.serve_rssi(action_args) - else: - logging.error("no such action: %s", action) - - def record(self, center_freq, sample_count, sample_rate=20e6): - return self.sdr_recorder.run_recording( - self.arguments.path, - sample_rate, - sample_count, - center_freq, - self.arguments.gain, - self.arguments.agc, - self.arguments.rxb, - self.arguments.sigmf, - self.arguments.sdr, - self.arguments.antenna, - ) - - def serve_recording(self, record_func, record_args): - logging.info(f"got a request: {record_args}") - record_status = record_func(**record_args) - if record_status == -1: - # TODO this only kills the thread, not the main process - return - record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish("gamutrf/record", record_args) - self.mqtt_reporter.log( - self.arguments.path, "record", self.start_time, record_args - ) - - def report_rssi(self, record_args, reported_rssi, reported_time): - logging.info(f'reporting RSSI {reported_rssi} for {record_args["center_freq"]}') - record_args.update({"rssi": reported_rssi, "time": reported_time}) - record_args.update(vars(self.arguments)) - self.mqtt_reporter.publish("gamutrf/rssi", record_args) - self.mqtt_reporter.log( - self.arguments.path, "rssi", self.start_time, record_args - ) - - def process_rssi(self, record_args, sock): - last_rssi_time = 0 - duration = 0 - if record_args["sample_count"]: - duration = float(record_args["sample_count"]) / float( - record_args["sample_rate"] - ) - start_time = time.time() - while self.q.empty(): - rssi_raw, _ = sock.recvfrom(FLOAT_SIZE) - rssi = struct.unpack("f", rssi_raw)[0] - if rssi < self.arguments.rssi_threshold: - continue - if rssi > MAX_RSSI: - continue - now = time.time() - if duration and now - start_time > duration: - break - now_diff = now - last_rssi_time - if now_diff < self.arguments.rssi_interval: - continue - self.report_rssi(record_args, rssi, now) - last_rssi_time = now - - def proxy_rssi(self, rssi_addr, record_args): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: - # codeql[py/bind-socket-all-network-interfaces] - sock.bind((rssi_addr, RSSI_UDP_PORT)) # nosec - self.process_rssi(record_args, sock) - - def serve_rssi(self, record_args): - logging.info(f"got request {record_args}") - if self.arguments.rssi_external: - logging.info("proxying external RSSI") - # codeql[py/bind-socket-all-network-interfaces] - self.proxy_rssi("0.0.0.0", record_args) # nosec - else: - center_freq = int(record_args["center_freq"]) - try: - rssi_server = BirdsEyeRSSI( - self.arguments, - record_args["sample_rate"], - center_freq, - agc=self.arguments.agc, - rssi_throttle=self.arguments.rssi_throttle, - ) - except RuntimeError as err: - logging.error("could not initialize RSSI server: %s", err) - return - rssi_server.start() - logging.info( - f"serving RSSI for {center_freq}Hz over threshold {self.arguments.rssi_threshold} with AGC {self.arguments.agc}" - ) - self.proxy_rssi(RSSI_UDP_ADDR, record_args) - logging.info("RSSI stream stopped") - rssi_server.stop() - rssi_server.wait() - - @staticmethod - def paths(): - return [ - "", - "/info", - "/record/{center_freq}/{sample_count}/{sample_rate}", - "/rssi/{center_freq}/{sample_count}/{sample_rate}", - ] - - @staticmethod - def version(): - return "/v1" - - def routes(self): - p = self.paths() - endpoints = Endpoints() - info = Info(self.arguments) - record = Record(self.arguments, self.q, self.sdr_recorder) - rssi = Rssi(self.arguments, self.q, self.sdr_recorder) - funcs = [endpoints, info, record, rssi] - return dict(zip(p, funcs)) - - def run(self): - logging.info("starting recorder thread") - recorder_thread = threading.Thread( - target=self.run_recorder, args=(self.record,) - ) - recorder_thread.start() - - logging.info("starting API thread") - bjoern.run(self.app, "0.0.0.0", self.arguments.port) # nosec - recorder_thread.join() - - -def main(): - arguments = argument_parser().parse_args() - level_int = {"CRITICAL": 50, "ERROR": 40, "WARNING": 30, "INFO": 20, "DEBUG": 10} - level = level_int.get(arguments.loglevel.upper(), 0) - logging.basicConfig(level=level, format="%(asctime)s %(message)s") - try: - app = API(arguments) - except ValueError: - sys.exit(1) - app.run() diff --git a/templates/scanner_form.html b/templates/scanner_form.html deleted file mode 100644 index c7d10ca7..00000000 --- a/templates/scanner_form.html +++ /dev/null @@ -1,14 +0,0 @@ -
-

Current signals:

- {% for bin in bins %} -

{{ bin[0] }} ({{ bin[1] }})

- {% endfor %} -

Make a request to record below:

-

Worker IP or name

-

Action (e.g. record, rssi

-

Frequency

-

Bandwidth in Mhz (20 max)

-

Duration in seconds

-

Repeat (-1 to repeat forever)

-

-
diff --git a/tests/test_birdseye_rssi.py b/tests/test_birdseye_rssi.py deleted file mode 100644 index 0f0531c7..00000000 --- a/tests/test_birdseye_rssi.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/python3 -import glob -import json -import os -import subprocess -import tempfile -import time -import unittest - -import docker -import numpy as np -import requests - -from gamutrf.birdseye_rssi import BirdsEyeRSSI - - -class FakeArgs: - def __init__(self, filename): - self.sdr = "file:" + filename - self.threshold = -100 - self.mean_window = 4096 - self.rssi_threshold = -100 - self.gain = 10 - - -class BirdseyeRSSITestCase(unittest.TestCase): - def test_birdseye_smoke(self): - with tempfile.TemporaryDirectory() as tmpdir: - filename = os.path.join(tmpdir, "gamutrf_recording1_1000Hz_1000sps.raw") - subprocess.check_call( - ["dd", "if=/dev/zero", "of=" + filename, "bs=1M", "count=1"] - ) - tb = BirdsEyeRSSI(FakeArgs(filename), 1e3, 1e3) - tb.start() - time.sleep(10) - tb.stop() - tb.wait() - - def verify_birdseye_stream(self, gamutdir, freq): - sample_rate = 1000000 - duration = 10 - sample_count = duration * sample_rate - for _ in range(15): - try: - response = requests.get( - f"http://localhost:8000/v1/rssi/{int(freq)}/{sample_count}/{sample_rate}" - ) - self.assertEqual(200, response.status_code, response) - break - except requests.exceptions.ConnectionError: - time.sleep(1) - mqtt_logs = None - line_json = None - for _ in range(duration): - self.assertTrue(os.path.exists(gamutdir)) - mqtt_logs = glob.glob(os.path.join(gamutdir, "mqtt-rssi-*log")) - for log_name in mqtt_logs: - with open(log_name, "r") as log: - for line in log.readlines(): - line_json = json.loads(line) - self.assertGreater(-10, line_json["rssi"]) - if line_json["center_freq"] == freq: - self.assertEqual(freq, line_json["center_freq"], line_json) - return - time.sleep(1) - - def test_birdseye_endtoend_rssi(self): - test_tag = "iqtlabs/gamutrf:latest" - with tempfile.TemporaryDirectory() as tempdir: - testraw = os.path.join(tempdir, "gamutrf_recording1_1000Hz_1000sps.raw") - gamutdir = os.path.join(tempdir, "gamutrf") - testdata = ( - np.random.random(int(1e6)).astype(np.float32) - + np.random.random(int(1e6)).astype(np.float32) * 1j - ) - with open(testraw, "wb") as testrawfile: - testdata.tofile(testrawfile) - os.mkdir(gamutdir) - client = docker.from_env() - client.images.build(dockerfile="Dockerfile", path=".", tag=test_tag) - container = client.containers.run( - test_tag, - command=[ - "gamutrf-worker", - "--rssi_threshold=-100", - f"--sdr=file:{testraw}", - ], - ports={"8000/tcp": 8000}, - volumes={tempdir: {"bind": "/data", "mode": "rw"}}, - detach=True, - ) - self.verify_birdseye_stream(gamutdir, 10e6) - self.verify_birdseye_stream(gamutdir, 99e6) - try: - container.kill() - except requests.exceptions.HTTPError as err: - print(str(err)) - - -if __name__ == "__main__": # pragma: no cover - unittest.main() diff --git a/tests/test_main.py b/tests/test_main.py index a0da6690..cc9a7ac5 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,20 +1,12 @@ import pytest import sys -from gamutrf.__main__ import worker from gamutrf.__main__ import scan sys.argv.append("-h") -def test_main_worker(): - with pytest.raises(SystemExit) as pytest_wrapped_e: - worker() - assert pytest_wrapped_e.type == SystemExit - assert pytest_wrapped_e.value.code == 0 - - def test_main_scan(): with pytest.raises(SystemExit) as pytest_wrapped_e: scan() diff --git a/tests/test_sdr_recorder.py b/tests/test_sdr_recorder.py deleted file mode 100644 index ca203ca2..00000000 --- a/tests/test_sdr_recorder.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/python3 -import os -import subprocess -import tempfile -import unittest - -from gamutrf.sdr_recorder import get_recorder - - -class SDRRecorderTestCase(unittest.TestCase): - SAMPLES = 1e3 * 4 - - def get_file_recorder(self, tmpdir): - filename = os.path.join(tmpdir, "gamutrf_recording1_1000Hz_1000sps.raw") - subprocess.check_call( - ["dd", "if=/dev/zero", "of=" + filename, "bs=1M", "count=1"] - ) - return get_recorder("file:" + filename, 3600) - - def test_sdr_recorder(self): - with tempfile.TemporaryDirectory() as tmpdir: - sdr_recorder = self.get_file_recorder(tmpdir) - record_status, sample_file = sdr_recorder.run_recording( - tmpdir, - self.SAMPLES, - self.SAMPLES, - self.SAMPLES, - 0, - False, - 0, - sigmf_=False, - sdr="zero", - antenna="omni", - ) - self.assertEqual(0, record_status) - self.assertTrue(os.path.exists(sample_file)) - sdr_recorder.tmpdir.cleanup() - with tempfile.TemporaryDirectory() as tmpdir: - sdr_recorder = self.get_file_recorder(tmpdir) - # TODO: sigmf 1.0.0 can't parse .zst files, but it can write the metadata fine. - record_status, sample_file = sdr_recorder.run_recording( - tmpdir, - self.SAMPLES, - self.SAMPLES, - self.SAMPLES, - 0, - False, - 0, - sigmf_=True, - sdr="zero", - antenna="directional", - ) - self.assertTrue(os.path.exists(sample_file)) - self.assertGreater(os.path.getsize(sample_file), 0) - self.assertTrue(os.path.exists(sample_file + ".sigmf-meta")) - sdr_recorder.tmpdir.cleanup() - - with tempfile.TemporaryDirectory() as tmpdir: - sdr_recorder = self.get_file_recorder(tmpdir) - self.assertNotEqual(None, sdr_recorder.validate_request([], 1e6, 0, 0)) - self.assertNotEqual(None, sdr_recorder.validate_request([], 1e6, 1, 1)) - self.assertEqual(None, sdr_recorder.validate_request([], 1e6, 1e6, 1e6)) - - -if __name__ == "__main__": # pragma: no cover - unittest.main() diff --git a/tests/test_sigwindows.py b/tests/test_sigwindows.py deleted file mode 100644 index b1596706..00000000 --- a/tests/test_sigwindows.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/python3 -import os -import unittest - -from gamutrf.sigwindows import freq_excluded -from gamutrf.sigwindows import parse_freq_excluded - - -TESTDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data") - - -class WindowsTestCase(unittest.TestCase): - def test_freq_excluded(self): - self.assertTrue(freq_excluded(100, ((100, 200),))) - self.assertFalse(freq_excluded(99, ((100, 200),))) - self.assertTrue(freq_excluded(1e9, ((1e6, None),))) - self.assertFalse(freq_excluded(1e6, ((1e9, None),))) - self.assertFalse(freq_excluded(1e9, ((None, 1e6),))) - - def test_parse_excluded(self): - self.assertEqual( - ((100, 200), (200, None), (None, 100)), - parse_freq_excluded(["100-200", "200-", "-100"]), - ) - - -if __name__ == "__main__": # pragma: no cover - unittest.main() diff --git a/worker.yml b/worker.yml deleted file mode 100644 index ccc994d3..00000000 --- a/worker.yml +++ /dev/null @@ -1,44 +0,0 @@ ---- -# On Pi4/Ubuntu, also requires systemd.unified_cgroup_hierarchy=0 added to -# /boot/firmware/cmdline.txt, to fall back to cgroup v1. -version: "3.3" -networks: - gamutrf: - default: - external: true - name: none -services: - worker: - restart: always - image: iqtlabs/gamutrf:latest - networks: - - gamutrf - ports: - - '8000:8000' - cap_add: - - SYS_NICE - - SYS_RAWIO - privileged: true - devices: - - /dev/bus/usb:/dev/bus/usb - - /dev/dri/renderD128:/dev/dri/renderD128 - volumes: - - '${VOL_PREFIX}:/data' - environment: - - 'WORKER_NAME=${WORKER_NAME}' - - 'ORCHESTRATOR=${ORCHESTRATOR}' - - 'ANTENNA=${ANTENNA}' - command: - - nice - - '-n' - - '-19' - - gamutrf-worker - - --no-agc - - --rxb=62914560 - - '--gain=${GAIN}' - - --qsize=5 - # - --rssi_threshold=-110 - # - --rssi_throttle=10 - # - --use_external_gps - # - --use_external_heading - # - --external_gps_server=$ORCHESTRATOR From ff730afa32c4a9d4d0fce1c4bd876fba329f9b6c Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Sat, 13 Jul 2024 03:40:27 +0000 Subject: [PATCH 2/4] remove worker api. --- tests/test_api.py | 65 ----------------------------------------------- 1 file changed, 65 deletions(-) delete mode 100644 tests/test_api.py diff --git a/tests/test_api.py b/tests/test_api.py deleted file mode 100644 index a24a79ce..00000000 --- a/tests/test_api.py +++ /dev/null @@ -1,65 +0,0 @@ -import time -import pytest -from falcon import testing - -from gamutrf import worker - - -class FakeArgs: - def __init__(self): - self.name = "test" - self.mqtt_server = "" - self.gps_server = "" - self.use_external_gps = False - self.use_external_heading = False - self.external_gps_server = "" - self.external_gps_server_port = "" - self.qsize = 2 - self.sdr = "/dev/null" - self.sdrargs = "" - self.path = "" - self.gain = -40 - self.agc = False - self.rxb = 1e6 - self.freq_excluded = "" - self.sigmf = False - self.antenna = "0" - self.rssi_throttle = 1 - self.rssi_threshold = -100 - self.rssi_external = False - self.mean_window = 100 - self.rotate_secs = 3600 - - -@pytest.fixture(scope="module") -def client(): - app = worker.API(FakeArgs()) - return testing.TestClient(app.app) - - -def test_routes(client): - result = client.simulate_get("/v1") - assert result.status_code == 200 - result = client.simulate_get("/v1/info") - assert result.status_code == 200 - result = client.simulate_get("/v1/record/100000000/20000000/20000000") - assert result.status_code == 200 - - -def test_report_rssi(): - app = worker.API(FakeArgs()) - app.report_rssi({"center_freq": 1e6}, -35, time.time()) - - -def test_serve_recording(): - app = worker.API(FakeArgs()) - app.serve_recording(app.record, {"center_freq": 1e6, "sample_count": 1e6}) - - -def test_serve_rssi(): - app = worker.API(FakeArgs()) - app.serve_rssi({"center_freq": 1e6, "sample_count": 1e6, "sample_rate": 1e6}) - - -def test_argument_parse(): - worker.argument_parser() From 4f52b85bbce17b441a21836b44adb16b178a053a Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Sat, 13 Jul 2024 04:09:43 +0000 Subject: [PATCH 3/4] templates. --- Dockerfile | 1 - 1 file changed, 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index c4948f84..d4592c55 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,7 +32,6 @@ RUN for i in bjoern falcon-cors gpsd-py3 ; do poetry run pip install --no-cache- RUN poetry install --no-interaction --no-ansi --no-dev --no-root COPY gamutrf gamutrf/ COPY bin bin/ -COPY templates templates/ RUN poetry install --no-interaction --no-ansi --no-dev # nosemgrep:github.workflows.config.dockerfile-source-not-pinned From 7c809fa23996ba3cdc223d0b80a016725a8bffe2 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Sat, 13 Jul 2024 04:25:12 +0000 Subject: [PATCH 4/4] remove uhd_sample_recorder. --- Dockerfile | 1 - 1 file changed, 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index d4592c55..a6c320b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -85,6 +85,5 @@ COPY --from=installer /root/.local /root/.local RUN ldconfig -v WORKDIR /gamutrf RUN echo "$(find /gamutrf/gamutrf -type f -name \*py -print)"|xargs grep -Eh "^(import|from)\s"|grep -Ev "gamutrf"|sort|uniq|python3 -RUN ldd /usr/local/bin/uhd_sample_recorder # nosemgrep:github.workflows.config.missing-user CMD ["gamutrf-scan", "--help"]