From 4041622ed58a61caee1a6ab65d38ee7e8374971e Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Wed, 15 May 2024 02:40:35 +0000 Subject: [PATCH 1/5] Don't use json output. --- gamutrf/grinferenceoutput.py | 53 ++++++------------------------------ gamutrf/grscan.py | 6 ++-- 2 files changed, 12 insertions(+), 47 deletions(-) diff --git a/gamutrf/grinferenceoutput.py b/gamutrf/grinferenceoutput.py index 6669e37a..bf2ea1a7 100644 --- a/gamutrf/grinferenceoutput.py +++ b/gamutrf/grinferenceoutput.py @@ -6,7 +6,7 @@ import sys import threading import time -import numpy as np +import pmt import zmq try: @@ -23,36 +23,6 @@ DELIM = "\n\n" -class jsonmixer: - def __init__(self, inputs): - self.inputs = inputs - self.json_buffer = {} - for i in range(self.inputs): - self.json_buffer[i] = "" - - def mix(self, input_items): - items = [] - ns = {} - for i, input_item in enumerate(input_items): - raw_input_item = input_item.tobytes().decode("utf8").split("\x00")[0] - ns[i] = len(raw_input_item) - if len(raw_input_item): - self.json_buffer[i] += raw_input_item - for i in self.json_buffer: - while True: - delim_pos = self.json_buffer[i].find(DELIM) - if delim_pos == -1: - break - raw_item = self.json_buffer[i][:delim_pos] - self.json_buffer[i] = self.json_buffer[i][delim_pos + len(DELIM) :] - try: - item = json.loads(raw_item) - items.append(item) - except json.JSONDecodeError as e: - logging.error("cannot decode %s from source %u: %s", raw_item, i, e) - return (ns, items) - - class inferenceoutput(gr.basic_block): def __init__( self, @@ -66,13 +36,11 @@ def __init__( external_gps_server, external_gps_server_port, log_path, - inputs, ): - self.mixer = jsonmixer(inputs) self.q = queue.Queue() self.running = True self.reporter_thread = threading.Thread( - target=self.reporter_thread, + target=self.run_reporter_thread, args=( name, zmq_addr, @@ -90,15 +58,20 @@ def __init__( gr.basic_block.__init__( self, name="inferenceoutput", - in_sig=([np.ubyte] * inputs), + in_sig=None, out_sig=None, ) + self.message_port_register_in(pmt.intern("inference")) + self.set_msg_handler(pmt.intern("inference"), self.receive_pdu) + + def receive_pdu(self, pdu): + self.q.put((bytes(pmt.to_python(pmt.cdr(pdu))))) def stop(self): self.running = False self.reporter_thread.join() - def reporter_thread( + def run_reporter_thread( self, name, zmq_addr, @@ -144,11 +117,3 @@ def reporter_thread( mqtt_reporter.publish("gamutrf/inference", item) mqtt_reporter.log(log_path, "inference", start_time, item) self.q.task_done() - - def general_work(self, input_items, output_items): - ns, items = self.mixer.mix(input_items) - for i, n in ns.items(): - self.consume(i, n) - for item in items: - self.q.put(item) - return 0 diff --git a/gamutrf/grscan.py b/gamutrf/grscan.py index 80198081..6a15113d 100644 --- a/gamutrf/grscan.py +++ b/gamutrf/grscan.py @@ -319,6 +319,7 @@ def __init__( samp_rate=int(samp_rate), power_inference=iq_power_inference, background=iq_inference_background, + batch=0, # TODO: not used yet. ) self.inference_blocks.append(self.iq_inference_block) if self.write_samples_block: @@ -342,7 +343,6 @@ def __init__( external_gps_server, external_gps_server_port, inference_output_dir, - len(self.inference_blocks), ) if self.iq_inference_block: if iq_inference_squelch_db is not None: @@ -374,8 +374,8 @@ def __init__( ) else: retune_fft_output_block = self.image_inference_block - for i, block in enumerate(self.inference_blocks): - self.connect((block, 0), (self.inference_output_block, i)) + for block in self.inference_blocks: + self.msg_connect((block, "inference"), (self.inference_output_block, "inference")) if not retune_fft_output_block: retune_fft_output_block = blocks.null_sink(gr.sizeof_float * nfft) From cf18033b71812ed49c3b088c381a1fa1aa140ff6 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Wed, 15 May 2024 02:40:54 +0000 Subject: [PATCH 2/5] v103. --- docker/Dockerfile.base | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index 222dcf99..4dbeb80f 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libvulkan-dev \ python3-numpy WORKDIR /root -RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.102 +RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.103 COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs WORKDIR /root/gr-iqtlabs/build COPY --from=iqtlabs/gamutrf-sigmf:latest /usr/local /usr/local From 4361a1b6a4ff0b96a0f5356bc67e1744564f9a71 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Wed, 15 May 2024 02:47:58 +0000 Subject: [PATCH 3/5] serialno --- gamutrf/grinferenceoutput.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gamutrf/grinferenceoutput.py b/gamutrf/grinferenceoutput.py index bf2ea1a7..eb998fd7 100644 --- a/gamutrf/grinferenceoutput.py +++ b/gamutrf/grinferenceoutput.py @@ -39,6 +39,7 @@ def __init__( ): self.q = queue.Queue() self.running = True + self.serialno = 0 self.reporter_thread = threading.Thread( target=self.run_reporter_thread, args=( @@ -65,7 +66,7 @@ def __init__( self.set_msg_handler(pmt.intern("inference"), self.receive_pdu) def receive_pdu(self, pdu): - self.q.put((bytes(pmt.to_python(pmt.cdr(pdu))))) + self.q.put(json.loads(bytes(pmt.to_python(pmt.cdr(pdu))).decode("utf8"))) def stop(self): self.running = False @@ -110,7 +111,8 @@ def run_reporter_thread( item = self.q.get(block=True, timeout=1) except queue.Empty: continue - logging.info("inference output %s", item) + logging.info("inference output %u: %s", self.serialno, item) + self.serialno += 1 if zmq_pub is not None: zmq_pub.send_string(json.dumps(item) + DELIM, flags=zmq.NOBLOCK) if mqtt_reporter is not None: From fd76a36fb05b24be14740636a12f9a5254faf8df Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Wed, 15 May 2024 02:55:45 +0000 Subject: [PATCH 4/5] never rotate samples. --- gamutrf/grscan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gamutrf/grscan.py b/gamutrf/grscan.py index 6a15113d..d8db314b 100644 --- a/gamutrf/grscan.py +++ b/gamutrf/grscan.py @@ -211,7 +211,7 @@ def __init__( igain, sigmf, zstd=True, - rotate=(stare == False), + rotate=False, ), ] ) From 6cebf04071e9b053bc5efebd7526e4d09bce0b11 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Wed, 15 May 2024 02:56:06 +0000 Subject: [PATCH 5/5] black. --- gamutrf/grscan.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gamutrf/grscan.py b/gamutrf/grscan.py index d8db314b..2241c64a 100644 --- a/gamutrf/grscan.py +++ b/gamutrf/grscan.py @@ -375,7 +375,9 @@ def __init__( else: retune_fft_output_block = self.image_inference_block for block in self.inference_blocks: - self.msg_connect((block, "inference"), (self.inference_output_block, "inference")) + self.msg_connect( + (block, "inference"), (self.inference_output_block, "inference") + ) if not retune_fft_output_block: retune_fft_output_block = blocks.null_sink(gr.sizeof_float * nfft)