Skip to content

Commit

Permalink
Merge pull request #1139 from anarkiwi/mix
Browse files Browse the repository at this point in the history
support simultaneous I/Q and image inference.
  • Loading branch information
anarkiwi authored Feb 2, 2024
2 parents 1d58be6 + 8515761 commit 4846bfd
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 190 deletions.
4 changes: 2 additions & 2 deletions docker/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libvulkan-dev \
python3-numpy
WORKDIR /root
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.77
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.78
COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs
WORKDIR /root/gr-iqtlabs/build
COPY --from=sigmf-builder /usr/local /usr/local
Expand All @@ -51,7 +51,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libvulkan1 \
python3 \
python3-numpy \
&& apt-get -y -q clean && rm -rf /var/lib/apt/lists/*
&& apt-get -y -q clean && rm -rf /var/lib/apt/lists/*
COPY --from=iqtlabs/gnuradio:3.10.9.2 /usr/local /usr/local
COPY --from=iqtlabs/gamutrf-driver /usr/local /usr/local
COPY --from=iqtlabs/gamutrf-uhd-sr /usr/local /usr/local
Expand Down
109 changes: 0 additions & 109 deletions gamutrf/grinference2mqtt.py

This file was deleted.

147 changes: 147 additions & 0 deletions gamutrf/grinferenceoutput.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import queue
import sys
import threading
import time
import numpy as np
import zmq

try:
from gnuradio import gr # pytype: disable=import-error
from gamutrf.mqtt_reporter import MQTTReporter
except ModuleNotFoundError as err: # pragma: no cover
print(
"Run from outside a supported environment, please run via Docker (https://github.com/IQTLabs/gamutRF#readme): %s"
% err
)
sys.exit(1)


DELIM = "\n\n"


class jsonmixer:
def __init__(self, inputs):
self.inputs = inputs
self.json_buffer = {}
for i in range(self.inputs):
self.json_buffer[i] = ""

def mix(self, input_items):
items = []
n = 0
for i, input_item in enumerate(input_items):
raw_input_item = input_item.tobytes().decode("utf8")
n += len(raw_input_item)
self.json_buffer[i] += raw_input_item
while True:
delim_pos = self.json_buffer[i].find(DELIM)
if delim_pos == -1:
break
raw_item = self.json_buffer[i][:delim_pos]
self.json_buffer[i] = self.json_buffer[i][delim_pos + len(DELIM) :]
try:
item = json.loads(raw_item)
items.append(item)
except json.JSONDecodeError as e:
logging.error("cannot decode %s: %s", raw_item, e)
return (n, items)


class inferenceoutput(gr.sync_block):
def __init__(
self,
name,
zmq_addr,
mqtt_server,
compass,
gps_server,
use_external_gps,
use_external_heading,
external_gps_server,
external_gps_server_port,
log_path,
inputs,
):
self.mixer = jsonmixer(inputs)
self.q = queue.Queue()
self.running = True
self.reporter_thread = threading.Thread(
target=self.reporter_thread,
args=(
name,
zmq_addr,
mqtt_server,
gps_server,
compass,
use_external_gps,
use_external_heading,
external_gps_server,
external_gps_server_port,
log_path,
),
)
self.reporter_thread.start()
gr.sync_block.__init__(
self,
name="inferenceoutput",
in_sig=([np.ubyte] * inputs),
out_sig=None,
)

def stop(self):
self.running = False
self.reporter_thread.join()

def reporter_thread(
self,
name,
zmq_addr,
mqtt_server,
gps_server,
compass,
use_external_gps,
use_external_heading,
external_gps_server,
external_gps_server_port,
log_path,
):
start_time = time.time()
zmq_context = None
zmq_pub = None
if zmq_addr:
zmq_context = zmq.Context()
zmq_pub = zmq_context.socket(zmq.PUB)
zmq_pub.setsockopt(zmq.SNDHWM, 100)
zmq_pub.setsockopt(zmq.SNDBUF, 65536)
zmq_pub.bind(zmq_addr)
mqtt_reporter = None
if mqtt_server:
mqtt_reporter = MQTTReporter(
name=name,
mqtt_server=mqtt_server,
gps_server=gps_server,
compass=compass,
use_external_gps=use_external_gps,
use_external_heading=use_external_heading,
external_gps_server=external_gps_server,
external_gps_server_port=external_gps_server_port,
)
while self.running:
item = self.q.get()
logging.info("inference output %s", item)
if zmq_pub is not None:
zmq_pub.send_string(json.dumps(item) + DELIM, flags=zmq.NOBLOCK)
if mqtt_reporter is not None:
mqtt_reporter.publish("gamutrf/inference", item)
mqtt_reporter.log(log_path, "inference", start_time, item)
self.q.task_done()

def work(self, input_items, output_items):
n, items = self.mixer.mix(input_items)
for item in items:
self.q.put(item)
return n
Loading

0 comments on commit 4846bfd

Please sign in to comment.