diff --git a/augment/augment.py b/augment/augment.py index d0edb679..37e1467b 100755 --- a/augment/augment.py +++ b/augment/augment.py @@ -13,8 +13,7 @@ uniform_discrete_distribution, ) from torchsig.utils.types import SignalData, SignalDescription -from gamutrf.sample_reader import read_recording -from gamutrf.waterfall_samples import parse_filename +from gamutrf.sample_reader import read_recording, parse_filename def make_signal(samples, sample_rate, center_frequency): diff --git a/gamutrf/grsource.py b/gamutrf/grsource.py index f029a7aa..21ce5c3a 100644 --- a/gamutrf/grsource.py +++ b/gamutrf/grsource.py @@ -1,15 +1,17 @@ import logging import os +import random import sys import time from urllib.parse import urlparse +import numpy as np try: import pmt from gnuradio import blocks from gnuradio import soapy from gnuradio import uhd - from gnuradio.gr import sizeof_gr_complex + from gnuradio import gr from gnuradio import iqtlabs except ModuleNotFoundError as err: # pragma: no cover print( @@ -20,6 +22,7 @@ from gamutrf.ettus_source import get_ettus_source from gamutrf.soapy_source import get_soapy_source +from gamutrf.sample_reader import get_samples def null_workaround_start_hook(self): @@ -35,6 +38,52 @@ def airt_workaround_start_hook(self): self.sources[0].set_bandwidth(0, rate) +class file_source_tagger(gr.sync_block): + def __init__( + self, + input_file, + cmd_port, + ): + gr.sync_block.__init__( + self, + name="file_source_tagger", + in_sig=None, + out_sig=[np.complex64], + ) + _, self.samples, self.center_freq = get_samples(input_file) + self.n_samples = len(self.samples) + self.i = 0 + self.message_port_register_in(pmt.intern(cmd_port)) + self.set_msg_handler(pmt.intern(cmd_port), self.handle_cmd) + self.need_tags = True + + def complete(self): + return self.i >= self.n_samples + + def handle_cmd(self, _msg): + self.need_tags = True + + def add_tags(self): + self.add_item_tag( + 0, + self.nitems_written(0), + pmt.intern("rx_freq"), + pmt.from_double(self.center_freq), + ) + + def work(self, input_items, output_items): + if self.complete(): + return 0 + if self.need_tags: + self.add_tags() + self.need_tags = False + n = len(output_items[0]) + samples = self.samples[self.i : self.i + n] + self.i += len(samples) + output_items[0][:] = samples + return len(samples) + + def get_source( sdr, samp_rate, @@ -57,10 +106,9 @@ def get_source( if url.scheme: if url.scheme == "file" and os.path.exists(url.path): sources = [ - blocks.file_source(sizeof_gr_complex, url.path, True, 0, 0), - blocks.throttle(sizeof_gr_complex, samp_rate, True), + file_source_tagger(url.path, cmd_port), + blocks.throttle(gr.sizeof_gr_complex, samp_rate, True), ] - sources[0].message_port_register_in(pmt.intern(cmd_port)) else: raise ValueError("unsupported/missing file location") elif sdr == "tuneable_test_source": @@ -68,7 +116,7 @@ def get_source( cmd_port = "cmd" sources = [ iqtlabs.tuneable_test_source(freq_divisor), - blocks.throttle(sizeof_gr_complex, samp_rate, True), + blocks.throttle(gr.sizeof_gr_complex, samp_rate, True), ] elif sdr == "ettus": sources = get_ettus_source(sdrargs, samp_rate, center_freq, agc, gain, uhd_lib) diff --git a/gamutrf/sample_reader.py b/gamutrf/sample_reader.py index df2ffc43..907ea3d7 100644 --- a/gamutrf/sample_reader.py +++ b/gamutrf/sample_reader.py @@ -1,8 +1,16 @@ #!/usr/bin/env python3 +import re +import os import gzip +import sigmf import zstandard import numpy as np +from gamutrf.utils import SAMPLE_DTYPES, SAMPLE_FILENAME_RE, is_fft + +FFT_FILENAME_RE = re.compile( + r"^.+_([0-9]+)_([0-9]+)points_([0-9]+)Hz_([0-9]+)sps\.(s\d+|raw).*$" +) def get_reader(filename): @@ -27,6 +35,63 @@ def default_reader(x): return default_reader +def parse_filename(filename): + # FFT is always float not matter the original sample type. + if is_fft(filename): + sample_type = "raw" + match = FFT_FILENAME_RE.match(filename) + try: + timestamp = int(match.group(1)) + nfft = int(match.group(2)) + freq_center = int(match.group(3)) + sample_rate = int(match.group(4)) + # sample_type = match.group(3) + except AttributeError: + timestamp = None + nfft = None + freq_center = None + sample_rate = None + sample_type = None + else: + match = SAMPLE_FILENAME_RE.match(filename) + nfft = None + try: + timestamp = int(match.group(1)) + freq_center = int(match.group(2)) + sample_rate = int(match.group(3)) + sample_type = match.group(4) + except AttributeError: + timestamp = None + freq_center = None + sample_rate = None + sample_type = None + + sample_dtype, sample_type = SAMPLE_DTYPES.get(sample_type, (None, None)) + sample_bits = None + sample_len = None + if sample_dtype: + if is_fft(filename): + sample_dtype = np.float32 + sample_bits = 32 + sample_len = 4 + else: + sample_dtype = np.dtype([("i", sample_dtype), ("q", sample_dtype)]) + sample_bits = sample_dtype[0].itemsize * 8 + sample_len = sample_dtype[0].itemsize * 2 + file_info = { + "filename": filename, + "freq_center": freq_center, + "sample_rate": sample_rate, + "sample_dtype": sample_dtype, + "sample_len": sample_len, + "sample_type": sample_type, + "sample_bits": sample_bits, + "nfft": nfft, + "timestamp": timestamp, + } + return file_info + + def read_recording( filename, sample_rate, @@ -74,3 +139,42 @@ def read_recording( sample_buffer, dtype=sample_dtype, count=buffered_samples ) yield x1d["i"] + np.csingle(1j) * x1d["q"] + + +def get_nosigmf_samples(filename): + meta = parse_filename(filename) + sample_rate = meta["sample_rate"] + sample_dtype = meta["sample_dtype"] + sample_len = meta["sample_len"] + center_frequency = meta["freq_center"] + samples = None + for samples_buffer in read_recording( + filename, sample_rate, sample_dtype, sample_len, max_sample_secs=None + ): + if samples is None: + samples = samples_buffer + else: + samples = np.concatenate([samples, samples_buffer]) + return filename, samples, center_frequency + + +def get_samples(filename): + if not os.path.exists(filename): + raise FileNotFoundError(filename) + meta_ext = filename.find(".sigmf-meta") + if meta_ext == -1: + return get_nosigmf_samples(filename) + + meta = sigmf.sigmffile.fromfile(filename) + data_filename = filename[:meta_ext] + meta.set_data_file(data_filename) + # read_samples() always converts to host cf32. + samples = meta.read_samples() + global_meta = meta.get_global_info() + sample_rate = global_meta["core:sample_rate"] + sample_type = global_meta["core:datatype"] + captures_meta = meta.get_captures() + center_frequency = None + if captures_meta: + center_frequency = captures_meta[0].get("core:frequency", None) + return data_filename, samples, center_frequency diff --git a/gamutrf/waterfall_samples.py b/gamutrf/waterfall_samples.py index 47b67905..49e2b2ff 100644 --- a/gamutrf/waterfall_samples.py +++ b/gamutrf/waterfall_samples.py @@ -6,69 +6,7 @@ from scipy import signal import matplotlib.pyplot as plt from gamutrf.utils import is_fft -from gamutrf.sample_reader import get_reader -from gamutrf.utils import SAMPLE_DTYPES, SAMPLE_FILENAME_RE - -FFT_FILENAME_RE = re.compile( - r"^.+_([0-9]+)_([0-9]+)points_([0-9]+)Hz_([0-9]+)sps\.(s\d+|raw).*$" -) - - -def parse_filename(filename): - # FFT is always float not matter the original sample type. - if is_fft(filename): - sample_type = "raw" - match = FFT_FILENAME_RE.match(filename) - try: - timestamp = int(match.group(1)) - nfft = int(match.group(2)) - freq_center = int(match.group(3)) - sample_rate = int(match.group(4)) - # sample_type = match.group(3) - except AttributeError: - timestamp = None - nfft = None - freq_center = None - sample_rate = None - sample_type = None - else: - match = SAMPLE_FILENAME_RE.match(filename) - nfft = None - try: - timestamp = int(match.group(1)) - freq_center = int(match.group(2)) - sample_rate = int(match.group(3)) - sample_type = match.group(4) - except AttributeError: - timestamp = None - freq_center = None - sample_rate = None - sample_type = None - - sample_dtype, sample_type = SAMPLE_DTYPES.get(sample_type, (None, None)) - sample_bits = None - sample_len = None - if sample_dtype: - if is_fft(filename): - sample_dtype = np.float32 - sample_bits = 32 - sample_len = 4 - else: - sample_dtype = np.dtype([("i", sample_dtype), ("q", sample_dtype)]) - sample_bits = sample_dtype[0].itemsize * 8 - sample_len = sample_dtype[0].itemsize * 2 - file_info = { - "filename": filename, - "freq_center": freq_center, - "sample_rate": sample_rate, - "sample_dtype": sample_dtype, - "sample_len": sample_len, - "sample_type": sample_type, - "sample_bits": sample_bits, - "nfft": nfft, - "timestamp": timestamp, - } - return file_info +from gamutrf.sample_reader import get_reader, parse_filename def read_samples(filename, sample_dtype, sample_bytes, seek_bytes=0, nfft=None, n=None): diff --git a/tests/test_birdseye_rssi.py b/tests/test_birdseye_rssi.py index 1fe3ebbc..8680cb40 100644 --- a/tests/test_birdseye_rssi.py +++ b/tests/test_birdseye_rssi.py @@ -9,13 +9,14 @@ import docker import numpy as np import requests +import subprocess from gamutrf.birdseye_rssi import BirdsEyeRSSI class FakeArgs: - def __init__(self): - self.sdr = "file:/dev/zero" + def __init__(self, filename): + self.sdr = "file:" + filename self.threshold = -100 self.mean_window = 4096 self.rssi_threshold = -100 @@ -24,11 +25,14 @@ def __init__(self): class BirdseyeRSSITestCase(unittest.TestCase): def test_birdseye_smoke(self): - tb = BirdsEyeRSSI(FakeArgs(), 1e3, 1e3) - tb.start() - time.sleep(10) - tb.stop() - tb.wait() + with tempfile.TemporaryDirectory() as tmpdir: + filename = os.path.join(tmpdir, "gamutrf_recording1_1000Hz_1000sps.raw") + subprocess.check_call(["dd", "if=/dev/zero", "of=" + filename, "bs=1M", "count=1"]) + tb = BirdsEyeRSSI(FakeArgs(filename), 1e3, 1e3) + tb.start() + time.sleep(10) + tb.stop() + tb.wait() def verify_birdseye_stream(self, gamutdir, freq): sample_rate = 1000000 @@ -61,7 +65,7 @@ def verify_birdseye_stream(self, gamutdir, freq): def test_birdseye_endtoend_rssi(self): test_tag = "iqtlabs/gamutrf:latest" with tempfile.TemporaryDirectory() as tempdir: - testraw = os.path.join(tempdir, "test.raw") + testraw = os.path.join(tempdir, "gamutrf_recording1_1000Hz_1000sps.raw") gamutdir = os.path.join(tempdir, "gamutrf") testdata = ( np.random.random(int(1e6)).astype(np.float32) diff --git a/tests/test_sdr_recorder.py b/tests/test_sdr_recorder.py index 92839015..15ac4e03 100644 --- a/tests/test_sdr_recorder.py +++ b/tests/test_sdr_recorder.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 import os +import subprocess import tempfile import unittest @@ -9,19 +10,21 @@ class SDRRecorderTestCase(unittest.TestCase): SAMPLES = 1e3 * 4 - def get_file_recorder(self): - return get_recorder("file:/dev/zero", 3600) + def get_file_recorder(self, tmpdir): + filename = os.path.join(tmpdir, "gamutrf_recording1_1000Hz_1000sps.raw") + subprocess.check_call(["dd", "if=/dev/zero", "of=" + filename, "bs=1M", "count=1"]) + return get_recorder("file:" + filename, 3600) def test_sdr_recorder(self): with tempfile.TemporaryDirectory() as tmpdir: - sdr_recorder = self.get_file_recorder() + sdr_recorder = self.get_file_recorder(tmpdir) sample_file = os.path.join(tmpdir, "test_file.zst") fft_file = os.path.join(tmpdir, "fft_test_file.zst") with open(fft_file, "wb") as f: f.write(b"\x00" * 4 * 2048 * 10) sdr_recorder.fft_spectrogram(sample_file, fft_file, 2048, 1e6, 1e6, 2048) with tempfile.TemporaryDirectory() as tmpdir: - sdr_recorder = self.get_file_recorder() + sdr_recorder = self.get_file_recorder(tmpdir) record_status, sample_file = sdr_recorder.run_recording( tmpdir, self.SAMPLES, @@ -38,7 +41,7 @@ def test_sdr_recorder(self): self.assertTrue(os.path.exists(sample_file)) sdr_recorder.tmpdir.cleanup() with tempfile.TemporaryDirectory() as tmpdir: - sdr_recorder = self.get_file_recorder() + sdr_recorder = self.get_file_recorder(tmpdir) # TODO: sigmf 1.0.0 can't parse .zst files, but it can write the metadata fine. record_status, sample_file = sdr_recorder.run_recording( tmpdir, @@ -57,11 +60,11 @@ def test_sdr_recorder(self): self.assertTrue(os.path.exists(sample_file + ".sigmf-meta")) sdr_recorder.tmpdir.cleanup() - sdr_recorder = self.get_file_recorder() - self.assertNotEqual(None, sdr_recorder.validate_request([], 1e6, 0, 0)) - self.assertNotEqual(None, sdr_recorder.validate_request([], 1e6, 1, 1)) - self.assertEqual(None, sdr_recorder.validate_request([], 1e6, 1e6, 1e6)) - sdr_recorder.tmpdir.cleanup() + with tempfile.TemporaryDirectory() as tmpdir: + sdr_recorder = self.get_file_recorder(tmpdir) + self.assertNotEqual(None, sdr_recorder.validate_request([], 1e6, 0, 0)) + self.assertNotEqual(None, sdr_recorder.validate_request([], 1e6, 1, 1)) + self.assertEqual(None, sdr_recorder.validate_request([], 1e6, 1e6, 1e6)) if __name__ == "__main__": # pragma: no cover