diff --git a/gamutrf/grinference2mqtt.py b/gamutrf/grinference2mqtt.py index 82943b23..4990ed23 100644 --- a/gamutrf/grinference2mqtt.py +++ b/gamutrf/grinference2mqtt.py @@ -34,7 +34,7 @@ def __init__( external_gps_server_port, log_path, ): - self.yaml_buffer = "" + self.json_buffer = "" self.mqtt_reporter = None self.q = queue.Queue() self.mqtt_reporter_thread = threading.Thread( @@ -94,14 +94,14 @@ def work(self, input_items, output_items): for input_item in input_items: raw_input_item = input_item.tobytes().decode("utf8") n += len(raw_input_item) - self.yaml_buffer += raw_input_item + self.json_buffer += raw_input_item while True: - delim_pos = self.yaml_buffer.find(DELIM) + delim_pos = self.json_buffer.find(DELIM) if delim_pos == -1: break - raw_item = self.yaml_buffer[:delim_pos] + raw_item = self.json_buffer[:delim_pos] item = json.loads(raw_item) - self.yaml_buffer = self.yaml_buffer[delim_pos + len(DELIM) :] + self.json_buffer = self.json_buffer[delim_pos + len(DELIM) :] self.process_item(item) return n diff --git a/gamutrf/grscan.py b/gamutrf/grscan.py index ec6d6bd6..d9e7f22a 100644 --- a/gamutrf/grscan.py +++ b/gamutrf/grscan.py @@ -41,6 +41,8 @@ def __init__( freq_start=100e6, gps_server="", igain=0, + inference_addr="0.0.0.0", # nosec + inference_port=8002, inference_min_confidence=0.5, inference_min_db=-200, inference_model_server="", @@ -204,9 +206,12 @@ def __init__( not pretune and low_power_hold_down, ) self.fft_blocks.append(retune_fft) - zmq_addr = f"tcp://{logaddr}:{logport}" - logging.info("serving FFT on %s", zmq_addr) - self.fft_blocks.append((zeromq.pub_sink(1, 1, zmq_addr, 100, False, 65536, ""))) + fft_zmq_addr = f"tcp://{logaddr}:{logport}" + inference_zmq_addr = f"tcp://{inference_addr}:{inference_port}" + logging.info("serving FFT on %s", fft_zmq_addr) + self.fft_blocks.append( + (zeromq.pub_sink(1, 1, fft_zmq_addr, 100, False, 65536, "")) + ) self.inference_blocks = [] if inference_output_dir: @@ -255,20 +260,23 @@ def __init__( ) ] ) - else: - self.inference_blocks.extend([blocks.null_sink(1)]) - if not self.inference_blocks: - self.inference_blocks = [blocks.null_sink(gr.sizeof_float * nfft)] - if pretune: self.msg_connect((self.retune_pre_fft, "tune"), (self.sources[0], cmd_port)) self.msg_connect((self.retune_pre_fft, "tune"), (retune_fft, "cmd")) else: self.msg_connect((retune_fft, "tune"), (self.sources[0], cmd_port)) self.connect_blocks(self.sources[0], self.sources[1:]) - self.connect((retune_fft, 1), (self.inference_blocks[0], 0)) - self.connect_blocks(self.inference_blocks[0], self.inference_blocks[1:]) + if self.inference_blocks: + self.connect((retune_fft, 1), (self.inference_blocks[0], 0)) + self.connect_blocks(self.inference_blocks[0], self.inference_blocks[1:]) + self.connect_blocks( + self.inference_blocks[0], + [zeromq.pub_sink(1, 1, inference_zmq_addr, 100, False, 65536, "")], + ) + else: + self.connect((retune_fft, 1), (blocks.null_sink(gr.sizeof_float * nfft))) + for pipeline_blocks in ( self.fft_blocks, self.samples_blocks, diff --git a/gamutrf/scan.py b/gamutrf/scan.py index c64cf468..96d88450 100644 --- a/gamutrf/scan.py +++ b/gamutrf/scan.py @@ -134,6 +134,20 @@ def argument_parser(): default=8001, help="Log FFT results to this port", ) + parser.add_argument( + "--inference_addr", + dest="inference_addr", + type=str, + default="0.0.0.0", # nosec + help="Log inference results to this address", + ) + parser.add_argument( + "--inference_port", + dest="inference_port", + type=int, + default=8002, + help="Log inference results to this port", + ) parser.add_argument( "--promport", dest="promport", diff --git a/orchestrator.yml b/orchestrator.yml index 053f5e4a..6a8e1048 100644 --- a/orchestrator.yml +++ b/orchestrator.yml @@ -34,6 +34,7 @@ services: ports: - '9001:9000' - '10000:10000' + - '10001:10001' cap_add: - SYS_NICE - SYS_RAWIO @@ -73,6 +74,8 @@ services: - --no-compass - --use_external_gps - --use_external_heading + - --inference_addr=0.0.0.0 + - --inference_port=10001 - --inference_min_confidence=0.25 - --inference_min_db=-80 - --inference_model_name=mini2_snr