From 39864d4caa0f912d274c3a4a3ae7254ddbb35e4d Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Wed, 14 Feb 2024 00:43:41 +0000 Subject: [PATCH] make offline deterministic by frontloading tags. --- .github/workflows/docker-test.yml | 3 ++ docker/Dockerfile.base | 2 +- gamutrf/birdseye_rssi.py | 8 +++- gamutrf/grinferenceoutput.py | 5 ++- gamutrf/grscan.py | 20 +++++---- gamutrf/grsource.py | 69 +++++++++++++++++-------------- gamutrf/offline.py | 3 ++ tests/test_grscan.py | 25 ++++++++--- tests/test_offline_consistent.sh | 23 +++++++++++ 9 files changed, 112 insertions(+), 46 deletions(-) create mode 100755 tests/test_offline_consistent.sh diff --git a/.github/workflows/docker-test.yml b/.github/workflows/docker-test.yml index c37e5bee..a20ddd9c 100644 --- a/.github/workflows/docker-test.yml +++ b/.github/workflows/docker-test.yml @@ -37,3 +37,6 @@ jobs: docker run -t iqtlabs/gamutrf-waterfall:latest gamutrf-waterfall --help sudo apt-get update && sudo apt-get install -qy python3-pip docker compose -f orchestrator.yml -f worker.yml -f docker/monitoring.yml -f torchserve.yml -f specgram.yml build + - name: offline consistency + run: | + tests/test_offline_consistent.sh diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index c1574477..936281d5 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -28,7 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libvulkan-dev \ python3-numpy WORKDIR /root -RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.85 +RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.86 COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs WORKDIR /root/gr-iqtlabs/build COPY --from=sigmf-builder /usr/local /usr/local diff --git a/gamutrf/birdseye_rssi.py b/gamutrf/birdseye_rssi.py index e2a81b7b..faa56711 100644 --- a/gamutrf/birdseye_rssi.py +++ b/gamutrf/birdseye_rssi.py @@ -27,7 +27,13 @@ def __init__(self, args, samp_rate, center_freq, rssi_throttle=10, agc=False): self.mean_window = args.mean_window self.rssi_throttle = rssi_throttle sources, _cmd_port, _workaround_start_hook = get_source( - args.sdr, samp_rate, args.gain, agc, center_freq + sdr=args.sdr, + samp_rate=samp_rate, + gain=args.gain, + nfft=1024, + tune_step_fft=512, + agc=agc, + center_freq=center_freq, ) rssi_blocks = sources + [ diff --git a/gamutrf/grinferenceoutput.py b/gamutrf/grinferenceoutput.py index c10d55bd..0f25cb90 100644 --- a/gamutrf/grinferenceoutput.py +++ b/gamutrf/grinferenceoutput.py @@ -131,7 +131,10 @@ def reporter_thread( external_gps_server_port=external_gps_server_port, ) while self.running: - item = self.q.get() + try: + item = self.q.get(block=True, timeout=1) + except queue.Empty: + continue logging.info("inference output %s", item) if zmq_pub is not None: zmq_pub.send_string(json.dumps(item) + DELIM, flags=zmq.NOBLOCK) diff --git a/gamutrf/grscan.py b/gamutrf/grscan.py index 448159c6..7e6e8700 100644 --- a/gamutrf/grscan.py +++ b/gamutrf/grscan.py @@ -249,7 +249,7 @@ def __init__( [str(c) for c in [wc.blue, wc.green, wc.red]] ) - if inference_model_server and inference_model_name: + if (inference_model_server and inference_model_name) or inference_output_dir: x = 640 y = 640 self.image_inference_block = self.iqtlabs.image_inference( @@ -293,6 +293,7 @@ def __init__( self.inference_blocks.append(self.iq_inference_block) # TODO: provide new block that receives JSON-over-PMT and outputs to MQTT/zmq. + retune_fft_output_block = None if self.inference_blocks: self.inference_output_block = inferenceoutput( "inferencemqtt", @@ -311,15 +312,18 @@ def __init__( self.connect((self.retune_pre_fft, 0), (self.iq_inference_block, 0)) self.connect((self.last_db_block, 0), (self.iq_inference_block, 1)) if self.image_inference_block: - self.connect((retune_fft, 1), (self.image_inference_block, 0)) - else: - self.connect( - (retune_fft, 1), (blocks.null_sink(gr.sizeof_float * nfft)) - ) + if stare: + self.connect( + (self.last_db_block, 0), (self.image_inference_block, 0) + ) + else: + retune_fft_output_block = self.image_inference_block for i, block in enumerate(self.inference_blocks): self.connect((block, 0), (self.inference_output_block, i)) - else: - self.connect((retune_fft, 1), (blocks.null_sink(gr.sizeof_float * nfft))) + + if not retune_fft_output_block: + retune_fft_output_block = blocks.null_sink(gr.sizeof_float * nfft) + self.connect((retune_fft, 1), (retune_fft_output_block, 0)) if pretune: self.msg_connect((self.retune_pre_fft, "tune"), (self.sources[0], cmd_port)) diff --git a/gamutrf/grsource.py b/gamutrf/grsource.py index 15fda51e..79079465 100644 --- a/gamutrf/grsource.py +++ b/gamutrf/grsource.py @@ -58,18 +58,31 @@ def __init__( self.n_samples = len(self.samples) self.nfft = nfft self.tune_step_fft = tune_step_fft + self.cmds_received = 0 + self.tags_sent = -1 self.i = 0 self.message_port_register_in(pmt.intern(cmd_port)) self.set_msg_handler(pmt.intern(cmd_port), self.handle_cmd) - self.need_tags = False - self.tags_sent = 0 + self.set_output_multiple(self.nfft) + self.tagged_interval = self.nfft * self.tune_step_fft + self.n_samples = ( + int(self.n_samples / self.tagged_interval) * self.tagged_interval + ) logging.info("opened %s with %u samples", input_file, self.n_samples) def complete(self): return self.i >= self.n_samples def handle_cmd(self, _msg): - self.need_tags = True + self.cmds_received += 1 + + def make_rx_time(self, sample_time): + sample_sec = int(sample_time) + sample_fsec = sample_time - sample_sec + pmt_sample_time = pmt.make_tuple( + pmt.from_long(sample_sec), pmt.from_double(sample_fsec) + ) + return pmt_sample_time def add_tags(self): # Ideally, We want to add simulated rx_time tags in a consistent way between @@ -78,47 +91,43 @@ def add_tags(self): # a simulated tuning request is received by this block, between runs. This # will result in different timestamps. self.tags_sent += 1 + tag_pos = self.tags_sent * self.tagged_interval + sample_time = self.timestamp + (tag_pos / float(self.sample_rate)) + logging.info( + "tag %u at pos %u (nfft item %u), %.1f%%", + self.tags_sent, + tag_pos, + int(tag_pos / self.nfft), + self.i / self.n_samples * 100, + ) self.add_item_tag( 0, - self.nitems_written(0), + tag_pos, pmt.intern("rx_freq"), pmt.from_double(self.center_freq), ) - sample_time = self.timestamp + ( - (self.tags_sent * self.nfft * self.tune_step_fft) / float(self.sample_rate) - ) - logging.info("%.1f%%", self.i / self.n_samples * 100) - sample_sec = int(sample_time) - sample_fsec = sample_time - sample_sec - pmt_sample_time = pmt.make_tuple( - pmt.from_long(sample_sec), pmt.from_double(sample_fsec) - ) self.add_item_tag( 0, - self.nitems_written(0), + tag_pos, pmt.intern("rx_time"), - pmt_sample_time, + self.make_rx_time(sample_time), ) def general_work(self, input_items, output_items): if self.complete(): - logging.info("100%%") + logging.info("complete") return -1 - if self.need_tags: + n = min(self.nfft, len(output_items[0])) + samples = self.samples[self.i : self.i + n] + c = len(samples) + self.i += c + if c < len(output_items[0]): + zeros = np.zeros(len(output_items[0]) - c, dtype=np.complex64) + samples = np.append(samples, zeros) + while ( + not self.complete() and int(self.i / self.tagged_interval) != self.tags_sent + ): self.add_tags() - self.need_tags = False - if self.tags_sent: - n = min(self.nfft, len(output_items[0])) - samples = self.samples[self.i : self.i + n] - c = len(samples) - self.i += c - if c < len(output_items[0]): - zeros = np.zeros(len(output_items[0]) - c, dtype=np.complex64) - samples = np.append(samples, zeros) - else: - # feed zeros until received first tag request - samples = np.zeros(len(output_items[0]), dtype=np.complex64) - c = len(samples) output_items[0][:] = samples return c diff --git a/gamutrf/offline.py b/gamutrf/offline.py index d6e1bd78..cb1ae858 100644 --- a/gamutrf/offline.py +++ b/gamutrf/offline.py @@ -15,6 +15,7 @@ def main(): parser = argument_parser() parser.add_argument("filename", type=str, help="Recording filename (or glob)") options = parser.parse_args() + outputs = 0 for filename in glob.glob(options.filename): out_dir = os.path.dirname(filename) if out_dir == "": @@ -48,3 +49,5 @@ def main(): tb.start() tb.wait() tb.stop() + outputs += 1 + logging.info("%u filenames processed from %s", outputs, options.filename) diff --git a/tests/test_grscan.py b/tests/test_grscan.py index e173d95f..095b451f 100644 --- a/tests/test_grscan.py +++ b/tests/test_grscan.py @@ -1,4 +1,5 @@ #!/usr/bin/python3 +import logging import glob import os import subprocess @@ -95,11 +96,23 @@ def start(self): class GrscanTestCase(unittest.TestCase): def test_fake_uhd(self): - get_source("ettus", 1e3, 10, 1024, 1024, uhd_lib=FakeUHD()) + get_source( + sdr="ettus", + samp_rate=1e3, + gain=10, + nfft=1024, + tune_step_fft=1024, + uhd_lib=FakeUHD(), + ) def test_fake_soapy(self): sources, _, workaround = get_source( - "SoapyAIRT", 1e3, 10, 1024, 1024, soapy_lib=FakeSoapy() + sdr="SoapyAIRT", + samp_rate=1e3, + gain=10, + nfft=1024, + tune_step_fft=1024, + soapy_lib=FakeSoapy(), ) tb = FakeTb(sources, workaround, 100e6) tb.start() @@ -125,7 +138,7 @@ def run_grscan_smoke(self, pretune, wavelearner, write_samples, test_file): "dd", "if=/dev/urandom", f"of={sdr_file}", - f"bs={samp_rate}", + f"bs={samp_rate*8}", "count=10", ] ) @@ -135,6 +148,7 @@ def run_grscan_smoke(self, pretune, wavelearner, write_samples, test_file): freq_end=freq_end, sdr=sdr, samp_rate=samp_rate, + tune_step_fft=512, write_samples=write_samples, sample_dir=tempdir, iqtlabs=iqtlabs, @@ -152,8 +166,8 @@ def run_grscan_smoke(self, pretune, wavelearner, write_samples, test_file): del tb if not write_samples: return - self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*zst")]) - self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*sigmf-meta")]) + self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*zst")], sdr) + self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*sigmf-meta")], sdr) def test_grscan_smoke(self): for pretune in (True, False): @@ -164,4 +178,5 @@ def test_grscan_smoke(self): if __name__ == "__main__": # pragma: no cover + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s") unittest.main() diff --git a/tests/test_offline_consistent.sh b/tests/test_offline_consistent.sh new file mode 100755 index 00000000..5128f1e4 --- /dev/null +++ b/tests/test_offline_consistent.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +set -e + +TMPDIR=/tmp +TESTFILE=gamutrf_recording_ettus__gain40_1_10000000Hz_1024000sps.s16 +rm -rf "$TMPDIR/input" +mkdir "$TMPDIR/input" +dd if=/dev/urandom of="$TMPDIR/input/$TESTFILE" bs=4096000 count=10 + +for trial in {1..5} ; do + rm -rf "$TMPDIR/$trial" + mkdir "$TMPDIR/$trial" + docker run -v "$TMPDIR:/gamutrf/data" -t iqtlabs/gamutrf gamutrf-offline --tune-step-fft=512 --db_clamp_floor=-150 --nfft=1024 --rotate_secs=0 "--inference_output_dir=/gamutrf/data/$trial" "/gamutrf/data/input/$TESTFILE" +done + +for trial in {2..5} ; do + for image in $TMPDIR/1/image*png ; do + baseimage="$(basename $image)" + testimage="$TMPDIR/$trial/$baseimage" + diff -b "$image" "$testimage" + done +done