Skip to content

Commit

Permalink
Don't use json output.
Browse files Browse the repository at this point in the history
  • Loading branch information
anarkiwi committed May 15, 2024
1 parent ce03d65 commit 4041622
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 47 deletions.
53 changes: 9 additions & 44 deletions gamutrf/grinferenceoutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
import threading
import time
import numpy as np
import pmt
import zmq

try:
Expand All @@ -23,36 +23,6 @@
DELIM = "\n\n"


class jsonmixer:
def __init__(self, inputs):
self.inputs = inputs
self.json_buffer = {}
for i in range(self.inputs):
self.json_buffer[i] = ""

def mix(self, input_items):
items = []
ns = {}
for i, input_item in enumerate(input_items):
raw_input_item = input_item.tobytes().decode("utf8").split("\x00")[0]
ns[i] = len(raw_input_item)
if len(raw_input_item):
self.json_buffer[i] += raw_input_item
for i in self.json_buffer:
while True:
delim_pos = self.json_buffer[i].find(DELIM)
if delim_pos == -1:
break
raw_item = self.json_buffer[i][:delim_pos]
self.json_buffer[i] = self.json_buffer[i][delim_pos + len(DELIM) :]
try:
item = json.loads(raw_item)
items.append(item)
except json.JSONDecodeError as e:
logging.error("cannot decode %s from source %u: %s", raw_item, i, e)
return (ns, items)


class inferenceoutput(gr.basic_block):
def __init__(
self,
Expand All @@ -66,13 +36,11 @@ def __init__(
external_gps_server,
external_gps_server_port,
log_path,
inputs,
):
self.mixer = jsonmixer(inputs)
self.q = queue.Queue()
self.running = True
self.reporter_thread = threading.Thread(
target=self.reporter_thread,
target=self.run_reporter_thread,
args=(
name,
zmq_addr,
Expand All @@ -90,15 +58,20 @@ def __init__(
gr.basic_block.__init__(
self,
name="inferenceoutput",
in_sig=([np.ubyte] * inputs),
in_sig=None,
out_sig=None,
)
self.message_port_register_in(pmt.intern("inference"))
self.set_msg_handler(pmt.intern("inference"), self.receive_pdu)

def receive_pdu(self, pdu):
self.q.put((bytes(pmt.to_python(pmt.cdr(pdu)))))

def stop(self):
self.running = False
self.reporter_thread.join()

def reporter_thread(
def run_reporter_thread(
self,
name,
zmq_addr,
Expand Down Expand Up @@ -144,11 +117,3 @@ def reporter_thread(
mqtt_reporter.publish("gamutrf/inference", item)
mqtt_reporter.log(log_path, "inference", start_time, item)
self.q.task_done()

def general_work(self, input_items, output_items):
ns, items = self.mixer.mix(input_items)
for i, n in ns.items():
self.consume(i, n)
for item in items:
self.q.put(item)
return 0
6 changes: 3 additions & 3 deletions gamutrf/grscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ def __init__(
samp_rate=int(samp_rate),
power_inference=iq_power_inference,
background=iq_inference_background,
batch=0, # TODO: not used yet.
)
self.inference_blocks.append(self.iq_inference_block)
if self.write_samples_block:
Expand All @@ -342,7 +343,6 @@ def __init__(
external_gps_server,
external_gps_server_port,
inference_output_dir,
len(self.inference_blocks),
)
if self.iq_inference_block:
if iq_inference_squelch_db is not None:
Expand Down Expand Up @@ -374,8 +374,8 @@ def __init__(
)
else:
retune_fft_output_block = self.image_inference_block
for i, block in enumerate(self.inference_blocks):
self.connect((block, 0), (self.inference_output_block, i))
for block in self.inference_blocks:
self.msg_connect((block, "inference"), (self.inference_output_block, "inference"))

if not retune_fft_output_block:
retune_fft_output_block = blocks.null_sink(gr.sizeof_float * nfft)
Expand Down

0 comments on commit 4041622

Please sign in to comment.