Skip to content

Commit

Permalink
make offline deterministic by frontloading tags.
Browse files Browse the repository at this point in the history
  • Loading branch information
anarkiwi committed Feb 14, 2024
1 parent 1a4db21 commit 39864d4
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/docker-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ jobs:
docker run -t iqtlabs/gamutrf-waterfall:latest gamutrf-waterfall --help
sudo apt-get update && sudo apt-get install -qy python3-pip
docker compose -f orchestrator.yml -f worker.yml -f docker/monitoring.yml -f torchserve.yml -f specgram.yml build
- name: offline consistency
run: |
tests/test_offline_consistent.sh
2 changes: 1 addition & 1 deletion docker/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libvulkan-dev \
python3-numpy
WORKDIR /root
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.85
RUN git clone https://github.com/iqtlabs/gr-iqtlabs -b 1.0.86
COPY --from=iqtlabs/gamutrf-vkfft:latest /root /root/gr-iqtlabs
WORKDIR /root/gr-iqtlabs/build
COPY --from=sigmf-builder /usr/local /usr/local
Expand Down
8 changes: 7 additions & 1 deletion gamutrf/birdseye_rssi.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ def __init__(self, args, samp_rate, center_freq, rssi_throttle=10, agc=False):
self.mean_window = args.mean_window
self.rssi_throttle = rssi_throttle
sources, _cmd_port, _workaround_start_hook = get_source(
args.sdr, samp_rate, args.gain, agc, center_freq
sdr=args.sdr,
samp_rate=samp_rate,
gain=args.gain,
nfft=1024,
tune_step_fft=512,
agc=agc,
center_freq=center_freq,
)

rssi_blocks = sources + [
Expand Down
5 changes: 4 additions & 1 deletion gamutrf/grinferenceoutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ def reporter_thread(
external_gps_server_port=external_gps_server_port,
)
while self.running:
item = self.q.get()
try:
item = self.q.get(block=True, timeout=1)
except queue.Empty:
continue
logging.info("inference output %s", item)
if zmq_pub is not None:
zmq_pub.send_string(json.dumps(item) + DELIM, flags=zmq.NOBLOCK)
Expand Down
20 changes: 12 additions & 8 deletions gamutrf/grscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def __init__(
[str(c) for c in [wc.blue, wc.green, wc.red]]
)

if inference_model_server and inference_model_name:
if (inference_model_server and inference_model_name) or inference_output_dir:
x = 640
y = 640
self.image_inference_block = self.iqtlabs.image_inference(
Expand Down Expand Up @@ -293,6 +293,7 @@ def __init__(
self.inference_blocks.append(self.iq_inference_block)

# TODO: provide new block that receives JSON-over-PMT and outputs to MQTT/zmq.
retune_fft_output_block = None
if self.inference_blocks:
self.inference_output_block = inferenceoutput(
"inferencemqtt",
Expand All @@ -311,15 +312,18 @@ def __init__(
self.connect((self.retune_pre_fft, 0), (self.iq_inference_block, 0))
self.connect((self.last_db_block, 0), (self.iq_inference_block, 1))
if self.image_inference_block:
self.connect((retune_fft, 1), (self.image_inference_block, 0))
else:
self.connect(
(retune_fft, 1), (blocks.null_sink(gr.sizeof_float * nfft))
)
if stare:
self.connect(
(self.last_db_block, 0), (self.image_inference_block, 0)
)
else:
retune_fft_output_block = self.image_inference_block
for i, block in enumerate(self.inference_blocks):
self.connect((block, 0), (self.inference_output_block, i))
else:
self.connect((retune_fft, 1), (blocks.null_sink(gr.sizeof_float * nfft)))

if not retune_fft_output_block:
retune_fft_output_block = blocks.null_sink(gr.sizeof_float * nfft)
self.connect((retune_fft, 1), (retune_fft_output_block, 0))

if pretune:
self.msg_connect((self.retune_pre_fft, "tune"), (self.sources[0], cmd_port))
Expand Down
69 changes: 39 additions & 30 deletions gamutrf/grsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,31 @@ def __init__(
self.n_samples = len(self.samples)
self.nfft = nfft
self.tune_step_fft = tune_step_fft
self.cmds_received = 0
self.tags_sent = -1
self.i = 0
self.message_port_register_in(pmt.intern(cmd_port))
self.set_msg_handler(pmt.intern(cmd_port), self.handle_cmd)
self.need_tags = False
self.tags_sent = 0
self.set_output_multiple(self.nfft)
self.tagged_interval = self.nfft * self.tune_step_fft
self.n_samples = (
int(self.n_samples / self.tagged_interval) * self.tagged_interval
)
logging.info("opened %s with %u samples", input_file, self.n_samples)

def complete(self):
return self.i >= self.n_samples

def handle_cmd(self, _msg):
self.need_tags = True
self.cmds_received += 1

Check warning on line 77 in gamutrf/grsource.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/grsource.py#L77

Added line #L77 was not covered by tests

def make_rx_time(self, sample_time):
sample_sec = int(sample_time)
sample_fsec = sample_time - sample_sec
pmt_sample_time = pmt.make_tuple(

Check warning on line 82 in gamutrf/grsource.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/grsource.py#L80-L82

Added lines #L80 - L82 were not covered by tests
pmt.from_long(sample_sec), pmt.from_double(sample_fsec)
)
return pmt_sample_time

Check warning on line 85 in gamutrf/grsource.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/grsource.py#L85

Added line #L85 was not covered by tests

def add_tags(self):
# Ideally, We want to add simulated rx_time tags in a consistent way between
Expand All @@ -78,47 +91,43 @@ def add_tags(self):
# a simulated tuning request is received by this block, between runs. This
# will result in different timestamps.
self.tags_sent += 1
tag_pos = self.tags_sent * self.tagged_interval
sample_time = self.timestamp + (tag_pos / float(self.sample_rate))
logging.info(

Check warning on line 96 in gamutrf/grsource.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/grsource.py#L94-L96

Added lines #L94 - L96 were not covered by tests
"tag %u at pos %u (nfft item %u), %.1f%%",
self.tags_sent,
tag_pos,
int(tag_pos / self.nfft),
self.i / self.n_samples * 100,
)
self.add_item_tag(
0,
self.nitems_written(0),
tag_pos,
pmt.intern("rx_freq"),
pmt.from_double(self.center_freq),
)
sample_time = self.timestamp + (
(self.tags_sent * self.nfft * self.tune_step_fft) / float(self.sample_rate)
)
logging.info("%.1f%%", self.i / self.n_samples * 100)
sample_sec = int(sample_time)
sample_fsec = sample_time - sample_sec
pmt_sample_time = pmt.make_tuple(
pmt.from_long(sample_sec), pmt.from_double(sample_fsec)
)
self.add_item_tag(
0,
self.nitems_written(0),
tag_pos,
pmt.intern("rx_time"),
pmt_sample_time,
self.make_rx_time(sample_time),
)

def general_work(self, input_items, output_items):
if self.complete():
logging.info("100%%")
logging.info("complete")

Check warning on line 118 in gamutrf/grsource.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/grsource.py#L118

Added line #L118 was not covered by tests
return -1
if self.need_tags:
n = min(self.nfft, len(output_items[0]))
samples = self.samples[self.i : self.i + n]
c = len(samples)
self.i += c
if c < len(output_items[0]):
zeros = np.zeros(len(output_items[0]) - c, dtype=np.complex64)
samples = np.append(samples, zeros)
while (

Check warning on line 127 in gamutrf/grsource.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/grsource.py#L120-L127

Added lines #L120 - L127 were not covered by tests
not self.complete() and int(self.i / self.tagged_interval) != self.tags_sent
):
self.add_tags()
self.need_tags = False
if self.tags_sent:
n = min(self.nfft, len(output_items[0]))
samples = self.samples[self.i : self.i + n]
c = len(samples)
self.i += c
if c < len(output_items[0]):
zeros = np.zeros(len(output_items[0]) - c, dtype=np.complex64)
samples = np.append(samples, zeros)
else:
# feed zeros until received first tag request
samples = np.zeros(len(output_items[0]), dtype=np.complex64)
c = len(samples)
output_items[0][:] = samples
return c

Expand Down
3 changes: 3 additions & 0 deletions gamutrf/offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def main():
parser = argument_parser()
parser.add_argument("filename", type=str, help="Recording filename (or glob)")
options = parser.parse_args()
outputs = 0

Check warning on line 18 in gamutrf/offline.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/offline.py#L18

Added line #L18 was not covered by tests
for filename in glob.glob(options.filename):
out_dir = os.path.dirname(filename)
if out_dir == "":
Expand Down Expand Up @@ -48,3 +49,5 @@ def main():
tb.start()
tb.wait()
tb.stop()
outputs += 1
logging.info("%u filenames processed from %s", outputs, options.filename)

Check warning on line 53 in gamutrf/offline.py

View check run for this annotation

Codecov / codecov/patch

gamutrf/offline.py#L52-L53

Added lines #L52 - L53 were not covered by tests
25 changes: 20 additions & 5 deletions tests/test_grscan.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/python3
import logging
import glob
import os
import subprocess
Expand Down Expand Up @@ -95,11 +96,23 @@ def start(self):

class GrscanTestCase(unittest.TestCase):
def test_fake_uhd(self):
get_source("ettus", 1e3, 10, 1024, 1024, uhd_lib=FakeUHD())
get_source(
sdr="ettus",
samp_rate=1e3,
gain=10,
nfft=1024,
tune_step_fft=1024,
uhd_lib=FakeUHD(),
)

def test_fake_soapy(self):
sources, _, workaround = get_source(
"SoapyAIRT", 1e3, 10, 1024, 1024, soapy_lib=FakeSoapy()
sdr="SoapyAIRT",
samp_rate=1e3,
gain=10,
nfft=1024,
tune_step_fft=1024,
soapy_lib=FakeSoapy(),
)
tb = FakeTb(sources, workaround, 100e6)
tb.start()
Expand All @@ -125,7 +138,7 @@ def run_grscan_smoke(self, pretune, wavelearner, write_samples, test_file):
"dd",
"if=/dev/urandom",
f"of={sdr_file}",
f"bs={samp_rate}",
f"bs={samp_rate*8}",
"count=10",
]
)
Expand All @@ -135,6 +148,7 @@ def run_grscan_smoke(self, pretune, wavelearner, write_samples, test_file):
freq_end=freq_end,
sdr=sdr,
samp_rate=samp_rate,
tune_step_fft=512,
write_samples=write_samples,
sample_dir=tempdir,
iqtlabs=iqtlabs,
Expand All @@ -152,8 +166,8 @@ def run_grscan_smoke(self, pretune, wavelearner, write_samples, test_file):
del tb
if not write_samples:
return
self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*zst")])
self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*sigmf-meta")])
self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*zst")], sdr)
self.assertTrue([x for x in glob.glob(f"{tempdir}/*/*sigmf-meta")], sdr)

def test_grscan_smoke(self):
for pretune in (True, False):
Expand All @@ -164,4 +178,5 @@ def test_grscan_smoke(self):


if __name__ == "__main__": # pragma: no cover
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s")
unittest.main()
23 changes: 23 additions & 0 deletions tests/test_offline_consistent.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash

set -e

TMPDIR=/tmp
TESTFILE=gamutrf_recording_ettus__gain40_1_10000000Hz_1024000sps.s16
rm -rf "$TMPDIR/input"
mkdir "$TMPDIR/input"
dd if=/dev/urandom of="$TMPDIR/input/$TESTFILE" bs=4096000 count=10

for trial in {1..5} ; do
rm -rf "$TMPDIR/$trial"
mkdir "$TMPDIR/$trial"
docker run -v "$TMPDIR:/gamutrf/data" -t iqtlabs/gamutrf gamutrf-offline --tune-step-fft=512 --db_clamp_floor=-150 --nfft=1024 --rotate_secs=0 "--inference_output_dir=/gamutrf/data/$trial" "/gamutrf/data/input/$TESTFILE"
done

for trial in {2..5} ; do
for image in $TMPDIR/1/image*png ; do
baseimage="$(basename $image)"
testimage="$TMPDIR/$trial/$baseimage"
diff -b "$image" "$testimage"
done
done

0 comments on commit 39864d4

Please sign in to comment.