Skip to content

Commit

Permalink
Merge pull request #1276 from anarkiwi/v103
Browse files Browse the repository at this point in the history
V103
  • Loading branch information
anarkiwi authored May 15, 2024
2 parents ce03d65 + 6cebf04 commit eb4e143
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 50 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libvulkan-dev \
python3-numpy
WORKDIR /root
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.102
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.103
COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs
WORKDIR /root/gr-iqtlabs/build
COPY --from=iqtlabs/gamutrf-sigmf:latest /usr/local /usr/local
Expand Down
57 changes: 12 additions & 45 deletions gamutrf/grinferenceoutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
import threading
import time
import numpy as np
import pmt
import zmq

try:
Expand All @@ -23,36 +23,6 @@
DELIM = "\n\n"


class jsonmixer:
def __init__(self, inputs):
self.inputs = inputs
self.json_buffer = {}
for i in range(self.inputs):
self.json_buffer[i] = ""

def mix(self, input_items):
items = []
ns = {}
for i, input_item in enumerate(input_items):
raw_input_item = input_item.tobytes().decode("utf8").split("\x00")[0]
ns[i] = len(raw_input_item)
if len(raw_input_item):
self.json_buffer[i] += raw_input_item
for i in self.json_buffer:
while True:
delim_pos = self.json_buffer[i].find(DELIM)
if delim_pos == -1:
break
raw_item = self.json_buffer[i][:delim_pos]
self.json_buffer[i] = self.json_buffer[i][delim_pos + len(DELIM) :]
try:
item = json.loads(raw_item)
items.append(item)
except json.JSONDecodeError as e:
logging.error("cannot decode %s from source %u: %s", raw_item, i, e)
return (ns, items)


class inferenceoutput(gr.basic_block):
def __init__(
self,
Expand All @@ -66,13 +36,12 @@ def __init__(
external_gps_server,
external_gps_server_port,
log_path,
inputs,
):
self.mixer = jsonmixer(inputs)
self.q = queue.Queue()
self.running = True
self.serialno = 0
self.reporter_thread = threading.Thread(
target=self.reporter_thread,
target=self.run_reporter_thread,
args=(
name,
zmq_addr,
Expand All @@ -90,15 +59,20 @@ def __init__(
gr.basic_block.__init__(
self,
name="inferenceoutput",
in_sig=([np.ubyte] * inputs),
in_sig=None,
out_sig=None,
)
self.message_port_register_in(pmt.intern("inference"))
self.set_msg_handler(pmt.intern("inference"), self.receive_pdu)

def receive_pdu(self, pdu):
self.q.put(json.loads(bytes(pmt.to_python(pmt.cdr(pdu))).decode("utf8")))

def stop(self):
self.running = False
self.reporter_thread.join()

def reporter_thread(
def run_reporter_thread(
self,
name,
zmq_addr,
Expand Down Expand Up @@ -137,18 +111,11 @@ def reporter_thread(
item = self.q.get(block=True, timeout=1)
except queue.Empty:
continue
logging.info("inference output %s", item)
logging.info("inference output %u: %s", self.serialno, item)
self.serialno += 1
if zmq_pub is not None:
zmq_pub.send_string(json.dumps(item) + DELIM, flags=zmq.NOBLOCK)
if mqtt_reporter is not None:
mqtt_reporter.publish("gamutrf/inference", item)
mqtt_reporter.log(log_path, "inference", start_time, item)
self.q.task_done()

def general_work(self, input_items, output_items):
ns, items = self.mixer.mix(input_items)
for i, n in ns.items():
self.consume(i, n)
for item in items:
self.q.put(item)
return 0
10 changes: 6 additions & 4 deletions gamutrf/grscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def __init__(
igain,
sigmf,
zstd=True,
rotate=(stare == False),
rotate=False,
),
]
)
Expand Down Expand Up @@ -319,6 +319,7 @@ def __init__(
samp_rate=int(samp_rate),
power_inference=iq_power_inference,
background=iq_inference_background,
batch=0, # TODO: not used yet.
)
self.inference_blocks.append(self.iq_inference_block)
if self.write_samples_block:
Expand All @@ -342,7 +343,6 @@ def __init__(
external_gps_server,
external_gps_server_port,
inference_output_dir,
len(self.inference_blocks),
)
if self.iq_inference_block:
if iq_inference_squelch_db is not None:
Expand Down Expand Up @@ -374,8 +374,10 @@ def __init__(
)
else:
retune_fft_output_block = self.image_inference_block
for i, block in enumerate(self.inference_blocks):
self.connect((block, 0), (self.inference_output_block, i))
for block in self.inference_blocks:
self.msg_connect(
(block, "inference"), (self.inference_output_block, "inference")
)

if not retune_fft_output_block:
retune_fft_output_block = blocks.null_sink(gr.sizeof_float * nfft)
Expand Down

0 comments on commit eb4e143

Please sign in to comment.