Skip to content

Commit

Permalink
Merge pull request #1406 from anarkiwi/buf
Browse files Browse the repository at this point in the history
last data time, buffer.
  • Loading branch information
anarkiwi authored Sep 15, 2024
2 parents e1ebabe + 4894a6f commit 827a935
Show file tree
Hide file tree
Showing 13 changed files with 631 additions and 657 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ COPY --from=iqtlabs/gamutrf-base:latest /usr/local /usr/local
WORKDIR /gamutrf
COPY poetry.lock pyproject.toml README.md /gamutrf/
# dependency install is cached for faster rebuild, if only gamutrf source changed.
RUN if [ "${POETRY_CACHE}" != "" ] ; then echo using cache "${POETRY_CACHE}" ; poetry source add --priority=default local "${POETRY_CACHE}" ; fi
RUN if [ "${POETRY_CACHE}" != "" ] ; then echo using cache "${POETRY_CACHE}" ; poetry source add --priority=default local "${POETRY_CACHE}" ; poetry lock ; fi
# TODO: handle caching
RUN for i in bjoern falcon-cors gpsd-py3 ; do poetry run pip install --no-cache-dir "$i"=="$(grep $i pyproject.toml | grep -Eo '\"[0-9\.]+' | sed 's/\"//g')" || exit 1 ; done
RUN poetry install --no-interaction --no-ansi --no-dev --no-root
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libvulkan-dev \
pybind11-dev
WORKDIR /root
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.120
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.121
COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs
WORKDIR /root/gr-iqtlabs/build
COPY --from=iqtlabs/gamutrf-sigmf:latest /usr/local /usr/local
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile.waterfall
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ RUN wget -Obootstrap.zip https://github.com/twbs/bootstrap/releases/download/v5.
rm bootstrap.zip
WORKDIR /gamutrflib
COPY gamutrflib /gamutrflib/
RUN if [ "${POETRY_CACHE}" != "" ] ; then echo using cache "${POETRY_CACHE}" ; poetry source add --priority=default local "${POETRY_CACHE}" ; fi
RUN if [ "${POETRY_CACHE}" != "" ] ; then echo using cache "${POETRY_CACHE}" ; poetry source add --priority=default local "${POETRY_CACHE}" ; poetry lock ; fi
RUN poetry install --no-interaction --no-ansi --no-dev
WORKDIR /gamutrfwaterfall
COPY gamutrfwaterfall /gamutrfwaterfall/
# TODO: handle caching for pycairo
RUN poetry run pip install --no-cache-dir pycairo=="$(grep pycairo pyproject.toml | grep -Eo '[0-9\.]+')"
RUN if [ "${POETRY_CACHE}" != "" ] ; then echo using cache "${POETRY_CACHE}" ; poetry source add --priority=default local "${POETRY_CACHE}" ; fi
RUN if [ "${POETRY_CACHE}" != "" ] ; then echo using cache "${POETRY_CACHE}" ; poetry source add --priority=default local "${POETRY_CACHE}" ; poetry lock ; fi
RUN apt-get purge -yq python3-packaging
RUN poetry install --no-interaction --no-ansi --no-dev

Expand Down
106 changes: 36 additions & 70 deletions gamutrf/grinferenceoutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
# -*- coding: utf-8 -*-
import json
import logging
import queue
import sys
import threading
import time
import pmt
import zmq

NO_SIGNAL = "No signal"

try:
from gnuradio import gr # pytype: disable=import-error
from gamutrf.mqtt_reporter import MQTTReporter
Expand All @@ -20,9 +20,6 @@
sys.exit(1)


DELIM = "\n\n"


class inferenceoutput(gr.basic_block):
def __init__(
self,
Expand All @@ -37,66 +34,26 @@ def __init__(
external_gps_server_port,
log_path,
):
self.q = queue.Queue()
self.running = True
self.serialno = 0
self.reporter_thread = threading.Thread(
target=self.run_reporter_thread,
args=(
name,
zmq_addr,
mqtt_server,
gps_server,
compass,
use_external_gps,
use_external_heading,
external_gps_server,
external_gps_server_port,
log_path,
),
)
self.reporter_thread.start()
gr.basic_block.__init__(
self,
name="inferenceoutput",
in_sig=None,
out_sig=None,
)
self.message_port_register_in(pmt.intern("inference"))
self.set_msg_handler(pmt.intern("inference"), self.receive_pdu)

def receive_pdu(self, pdu):
self.q.put(json.loads(bytes(pmt.to_python(pmt.cdr(pdu))).decode("utf8")))

def stop(self):
self.running = False
self.reporter_thread.join()

def run_reporter_thread(
self,
name,
zmq_addr,
mqtt_server,
gps_server,
compass,
use_external_gps,
use_external_heading,
external_gps_server,
external_gps_server_port,
log_path,
):
start_time = time.time()
zmq_context = None
zmq_pub = None
self.serialno = 0
self.start_time = time.time()
self.zmq_context = None
self.zmq_pub = None
if zmq_addr:
zmq_context = zmq.Context()
zmq_pub = zmq_context.socket(zmq.PUB)
zmq_pub.setsockopt(zmq.SNDHWM, 100)
zmq_pub.setsockopt(zmq.SNDBUF, 65536)
zmq_pub.bind(zmq_addr)
mqtt_reporter = None
self.zmq_context = zmq.Context()
self.zmq_pub = self.zmq_context.socket(zmq.PUB)
self.zmq_pub.setsockopt(zmq.SNDHWM, 100)
self.zmq_pub.setsockopt(zmq.SNDBUF, 65536)
self.zmq_pub.bind(zmq_addr)
self.mqtt_reporter = None
self.log_path = log_path
if mqtt_server:
mqtt_reporter = MQTTReporter(
self.mqtt_reporter = MQTTReporter(
name=name,
mqtt_server=mqtt_server,
gps_server=gps_server,
Expand All @@ -106,16 +63,25 @@ def run_reporter_thread(
external_gps_server=external_gps_server,
external_gps_server_port=external_gps_server_port,
)
while self.running:
try:
item = self.q.get(block=True, timeout=1)
except queue.Empty:
continue
self.serialno += 1
logging.info("inference output %u: %s", self.serialno, item)
if zmq_pub is not None:
zmq_pub.send_string(json.dumps(item) + DELIM, flags=zmq.NOBLOCK)
if mqtt_reporter is not None:
mqtt_reporter.publish("gamutrf/inference", item)
mqtt_reporter.log(log_path, "inference", start_time, item)
self.q.task_done()
self.message_port_register_in(pmt.intern("inference"))
self.set_msg_handler(pmt.intern("inference"), self.receive_pdu)

def stop(self):
if self.zmq_pub is not None:
self.zmq_pub.close()

def receive_pdu(self, pdu):
item = json.loads(bytes(pmt.to_python(pmt.cdr(pdu))).decode("utf8"))
self.serialno += 1
try:
predictions = set(item["predictions"].keys())
if NO_SIGNAL in predictions:
return
except KeyError:
pass
logging.info("inference output %u: %s", self.serialno, item)
if self.zmq_pub is not None:
self.zmq_pub.send_string(json.dumps(item), flags=zmq.NOBLOCK)
if self.mqtt_reporter is not None:
self.mqtt_reporter.publish("gamutrf/inference", item)
self.mqtt_reporter.log(self.log_path, "inference", self.start_time, item)
Loading

0 comments on commit 827a935

Please sign in to comment.