diff --git a/gamutrf/sigfinder.py b/gamutrf/sigfinder.py index e444d5a1..b6ea2e14 100755 --- a/gamutrf/sigfinder.py +++ b/gamutrf/sigfinder.py @@ -3,8 +3,10 @@ import json import logging import os +import pathlib import subprocess import sys +import tempfile import threading import time @@ -328,7 +330,12 @@ def zstd_file(uncompressed_file): def process_fft_lines( - args, prom_vars, buff_file, executor, proxy_result, runonce=False + args, + prom_vars, + buff_file, + executor, + proxy_result, + live_file, ): lastfreq = 0 fftbuffer = [] @@ -354,11 +361,13 @@ def process_fft_lines( openlogts = int(time.time()) with open(args.log, mode=mode, encoding="utf-8") as l: while True: + if not live_file.exists(): + return if not proxy_result.running(): logging.error( "FFT proxy stopped running: %s", proxy_result.result() ) - sys.exit(-1) + return now = int(time.time()) if now - last_fft_report > FFT_BUFFER_TIME * 2: logging.info( @@ -448,8 +457,6 @@ def process_fft_lines( rotate_age = now - openlogts if rotate_age > args.rotatesecs: rotatelognow = True - if runonce: - return fftbuffer.append((row.ts, row.freq, row.pw)) if rotatelognow: break @@ -459,11 +466,13 @@ def process_fft_lines( executor.submit(zstd_file, new_log) -def fft_proxy(args, buff_file, buffer_time=FFT_BUFFER_TIME, shutdown_str=None): +def fft_proxy( + args, buff_file, buffer_time=FFT_BUFFER_TIME, live_file=None, poll_timeout=1 +): zmq_addr = f"tcp://{args.logaddr}:{args.logport}" logging.info("connecting to %s", zmq_addr) - context = zmq.Context() - socket = context.socket(zmq.SUB) + zmq_context = zmq.Context() + socket = zmq_context.socket(zmq.SUB) socket.connect(zmq_addr) socket.setsockopt_string(zmq.SUBSCRIBE, "") packets_sent = 0 @@ -472,17 +481,19 @@ def fft_proxy(args, buff_file, buffer_time=FFT_BUFFER_TIME, shutdown_str=None): tmp_buff_file = buff_file.replace(tmp_buff_file, "." + tmp_buff_file) if os.path.exists(tmp_buff_file): os.remove(tmp_buff_file) - shutdown = False context = zstandard.ZstdCompressor() + shutdown = False while not shutdown: with open(tmp_buff_file, "wb") as zbf: with context.stream_writer(zbf) as bf: while not shutdown: - sock_txt = socket.recv() + shutdown = live_file is not None and not live_file.exists() + try: + sock_txt = socket.recv(flags=zmq.NOBLOCK) + except zmq.error.Again: + time.sleep(poll_timeout) + continue bf.write(sock_txt) - shutdown = ( - shutdown_str is not None and sock_txt.find(shutdown_str) != -1 - ) now = time.time() if ( shutdown or now - last_packet_sent_time > buffer_time @@ -495,13 +506,12 @@ def fft_proxy(args, buff_file, buffer_time=FFT_BUFFER_TIME, shutdown_str=None): os.rename(tmp_buff_file, buff_file) -def find_signals(args, prom_vars): +def find_signals(args, prom_vars, executor, live_file): buff_file = os.path.join(args.buff_path, BUFF_FILE) if os.path.exists(buff_file): os.remove(buff_file) - with concurrent.futures.ProcessPoolExecutor(2) as executor: - proxy_result = executor.submit(fft_proxy, args, buff_file) - process_fft_lines(args, prom_vars, buff_file, executor, proxy_result) + proxy_result = executor.submit(fft_proxy, args, buff_file, live_file=live_file) + process_fft_lines(args, prom_vars, buff_file, executor, proxy_result, live_file) def argument_parser(): @@ -645,15 +655,6 @@ def main(): logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s") prom_vars = init_prom_vars() - start_http_server(args.promport) - x = threading.Thread( - target=find_signals, - args=( - args, - prom_vars, - ), - ) - x.start() app = falcon.App() scanner_form = ScannerForm() result = Result() @@ -661,4 +662,27 @@ def main(): app.add_route("/", scanner_form) app.add_route("/result", result) app.add_route("/requests", active_requests) - bjoern.run(app, "0.0.0.0", args.port) # nosec + start_http_server(args.promport) + + with tempfile.TemporaryDirectory() as tmpdir: + live_file = pathlib.Path(os.path.join(tmpdir, "live_file")) + live_file.touch() + + with concurrent.futures.ProcessPoolExecutor(2) as executor: + x = threading.Thread( + target=find_signals, + args=( + args, + prom_vars, + executor, + live_file, + ), + ) + x.start() + try: + bjoern.run(app, "0.0.0.0", args.port) # nosec + except KeyboardInterrupt: + logging.info("interrupt!") + live_file.unlink() + executor.shutdown() + x.join() diff --git a/tests/test_sigfinder.py b/tests/test_sigfinder.py index 7f4125f9..2eedb3b2 100644 --- a/tests/test_sigfinder.py +++ b/tests/test_sigfinder.py @@ -2,7 +2,9 @@ import json import os +import pathlib import tempfile +import threading import time import unittest import concurrent.futures @@ -145,13 +147,14 @@ def test_argument_parser(self): argument_parser() def test_process_fft_lines(self): - with concurrent.futures.ProcessPoolExecutor(1) as executor: + with concurrent.futures.ProcessPoolExecutor() as executor: proxy_result = executor.submit(null_proxy) with tempfile.TemporaryDirectory() as tempdir: test_log = os.path.join(str(tempdir), "test.csv") test_fftlog = os.path.join(str(tempdir), "fft.csv") test_fftgraph = os.path.join(str(tempdir), "fft.png") buff_file = os.path.join(str(tempdir), "buff_file") + live_file = pathlib.Path(os.path.join(str(tempdir), "live_file")) args = FakeArgs( test_log, 60, @@ -197,14 +200,25 @@ def test_process_fft_lines(self): output["buckets"][str(freq)] = -50 freq += 1e5 bf.write(bytes(json.dumps(output) + "\n", encoding="utf8")) - process_fft_lines( - args, - prom_vars, - buff_file, - executor, - proxy_result, - runonce=True, + live_file.touch() + process_thread = threading.Thread( + target=process_fft_lines, + args=( + args, + prom_vars, + buff_file, + executor, + proxy_result, + live_file, + ), ) + process_thread.start() + for i in range(10): + if os.path.exists(test_fftlog) and os.path.exists(test_fftgraph): + break + time.sleep(1) + live_file.unlink() + process_thread.join() self.assertTrue(os.path.exists(test_fftlog)) self.assertTrue(os.path.exists(test_fftgraph)) @@ -233,6 +247,8 @@ def test_fft_proxy(self): zstd_context = zstandard.ZstdDecompressor() with tempfile.TemporaryDirectory() as tempdir: + live_file = pathlib.Path(os.path.join(tempdir, "live_file")) + live_file.touch() buff_file = os.path.join(tempdir, "buff_file") test_bytes = b"1, 2, 3\n4, 5, 6\n" shutdown_str = b"shutdown\n" @@ -241,13 +257,13 @@ def test_fft_proxy(self): socket.bind(f"tcp://{args.logaddr}:{args.logport}") with concurrent.futures.ProcessPoolExecutor(1) as executor: - executor.submit(fft_proxy, args, buff_file, 1, shutdown_str) + executor.submit(fft_proxy, args, buff_file, 1, live_file=live_file) for _ in range(5): socket.send(test_bytes) if os.path.exists(buff_file): break time.sleep(1) - socket.send(shutdown_str) + live_file.unlink() with open(buff_file, "rb") as zbf: with zstd_context.stream_reader(zbf) as bf: content = bf.read()