diff --git a/grc/iqtlabs_iq_inference.block.yml b/grc/iqtlabs_iq_inference.block.yml
index 90110507..a11883a0 100644
--- a/grc/iqtlabs_iq_inference.block.yml
+++ b/grc/iqtlabs_iq_inference.block.yml
@@ -23,19 +23,22 @@ documentation: |-
     parameters:
         tag: received frequency tag name.
         vlen: length of complex/FFT dB vector.
+        sample_buffer: size of sample lookback buffer.
         min_peak_points: Only run inference on buckets with this minimum dB power.
         model_names: if not empty, comma separated list of model names.
         model_server: if not empty, do torchserve inference to this address.
         confidence: Only output inference results where confidence is greater.
         n_inference: if > 0, only run inference on 1/n_inference images.
         samp_rate: sample rate.
+        power_inference: if True, infer on power as well as samples.
 
 templates:
   imports: from gnuradio import iqtlabs
   make: >
     iqtlabs.iq_inference(
         ${tag}, ${vlen}, ${sample_buffer}, ${min_peak_points}, ${model_server},
-        ${model_names}, ${confidence}, ${n_inference}, ${samp_rate})
+        ${model_names}, ${confidence}, ${n_inference}, ${samp_rate},
+        ${power_inference})
 
 cpp_templates:
   includes: ['#include <gnuradio/iqtlabs/iq_inference.h>']
@@ -43,7 +46,8 @@ cpp_templates:
   make: >
     this->${id} = gr::iqtlabs::iq_inference::make(
       ${tag}, ${vlen}, ${sample_buffer}, ${min_peak_points}, ${model_server},
-      ${model_names}, ${confidence}, ${n_inference}, ${samp_rate})
+      ${model_names}, ${confidence}, ${n_inference}, ${samp_rate},
+      ${power_inference});
   link: ['libgnuradio-iqtlabs.so']
 
 parameters:
@@ -66,6 +70,8 @@ parameters:
   dtype: int
 - id: samp_rate
   dtype: int
+- id: power_inference
+  dtype: bool
 
 asserts:
     - ${ tag != "" }
diff --git a/include/gnuradio/iqtlabs/iq_inference.h b/include/gnuradio/iqtlabs/iq_inference.h
index 5a83b357..5ba82a84 100644
--- a/include/gnuradio/iqtlabs/iq_inference.h
+++ b/include/gnuradio/iqtlabs/iq_inference.h
@@ -231,7 +231,7 @@ class IQTLABS_API iq_inference : virtual public gr::block {
   static sptr make(const std::string &tag, size_t vlen, size_t sample_buffer,
                    double min_peak_points, const std::string &model_server,
                    const std::string &model_names, double confidence,
-                   size_t n_inference, int samp_rate);
+                   size_t n_inference, int samp_rate, bool power_inference);
 };
 
 } // namespace iqtlabs
diff --git a/lib/iq_inference_impl.cc b/lib/iq_inference_impl.cc
index 165c7452..cf8a47d2 100644
--- a/lib/iq_inference_impl.cc
+++ b/lib/iq_inference_impl.cc
@@ -215,10 +215,10 @@ iq_inference::sptr iq_inference::make(const std::string &tag, size_t vlen,
                                       size_t sample_buffer, double min_peak_points,
                                       const std::string &model_server,
                                       const std::string &model_names, double confidence,
-                                      size_t n_inference, int samp_rate) {
+                                      size_t n_inference, int samp_rate, bool power_inference) {
   return gnuradio::make_block_sptr<iq_inference_impl>(
       tag, vlen, sample_buffer, min_peak_points, model_server, model_names,
-      confidence, n_inference, samp_rate);
+      confidence, n_inference, samp_rate, power_inference);
 }
 
 /*
@@ -230,19 +230,20 @@ iq_inference_impl::iq_inference_impl(const std::string &tag, size_t vlen,
                                      const std::string &model_server,
                                      const std::string &model_names,
                                      double confidence, size_t n_inference,
-                                     int samp_rate)
+                                     int samp_rate, bool power_inference)
     : gr::block("iq_inference",
                 gr::io_signature::makev(
                     2 /* min inputs */, 2 /* min inputs */,
                     std::vector<int>{(int)(vlen * sizeof(gr_complex)),
                                      (int)(vlen * sizeof(float))}),
-                gr::io_signature::make(1 /* min outputs */, 1 /*max outputs */,
+                gr::io_signature::make(1 /* min outputs */, 1 /* max outputs */,
                                        sizeof(char))),
       tag_(pmt::intern(tag)), vlen_(vlen),
       sample_buffer_(sample_buffer), min_peak_points_(min_peak_points),
       model_server_(model_server), confidence_(confidence),
       n_inference_(n_inference), samp_rate_(samp_rate),
-      inference_count_(0), running_(true), last_rx_freq_(0), last_rx_time_(0),
-      inference_connected_(false), samples_since_tag_(0) {
+      power_inference_(power_inference), inference_count_(0), running_(true),
+      last_rx_freq_(0), last_rx_time_(0), inference_connected_(false),
+      samples_since_tag_(0) {
   samples_lookback_.reset(new gr_complex[vlen * sample_buffer]);
   unsigned int alignment = volk_get_alignment();
   total_.reset((float *)volk_malloc(sizeof(float), alignment));
@@ -324,6 +325,12 @@ void iq_inference_impl::run_inference_() {
     req.set(boost::beast::http::field::content_type,
             "application/octet-stream");
     req.body() = body;
+    if (power_inference_) {
+      const std::string_view power_body(
+          reinterpret_cast<const char *>(output_item.power),
+          output_item.sample_count * sizeof(float));
+      req.body() += power_body;
+    }
     req.prepare_payload();
     std::string results;
     // TODO: troubleshoot test flask server hang after one request.
@@ -436,6 +443,9 @@ void iq_inference_impl::process_items_(size_t power_in_count,
       if (!last_rx_freq_) {
         continue;
       }
+      // TODO: we select one slice in time (samples and power), where at
+      // least one sample exceeded the minimum. We could potentially select
+      // more samples on either side of the slice, for example.
       output_item_type output_item;
       output_item.rx_time =
           last_rx_time_ + (samples_since_tag_ / TIME_T(samp_rate_));
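
Note on the payload layout in run_inference_() above: when power_inference is enabled, the
float32 power vector is appended directly after the complex64 sample vector in the request
body, and both vectors are output_item.sample_count elements long. A minimal sketch of how a
model-server handler could split such a body (the function name and the fft_size argument are
illustrative only, not part of this change):

import numpy as np

def split_iq_inference_payload(body, fft_size):
    # complex64 samples come first: fft_size * 8 bytes.
    samples = np.frombuffer(body, dtype=np.complex64, count=fft_size)
    # float32 power vector follows: fft_size * 4 bytes.
    power_offset = samples.size * samples.itemsize
    power = np.frombuffer(body[power_offset:], dtype=np.float32, count=fft_size)
    return samples, power

This mirrors how simulate_torchserve() in the updated QA test below parses the request body.
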
diff --git a/lib/iq_inference_impl.h b/lib/iq_inference_impl.h
index 46424292..2942177d 100644
--- a/lib/iq_inference_impl.h
+++ b/lib/iq_inference_impl.h
@@ -246,6 +246,7 @@ class iq_inference_impl : public iq_inference, base_impl {
   double confidence_;
   size_t n_inference_;
   int samp_rate_;
+  bool power_inference_;
   size_t inference_count_;
   size_t samples_since_tag_;
   boost::lockfree::spsc_queue<output_item_type> inference_q_{MAX_INFERENCE};
@@ -271,7 +272,7 @@ class iq_inference_impl : public iq_inference, base_impl {
   iq_inference_impl(const std::string &tag, size_t vlen, size_t sample_buffer,
                     double min_peak_points, const std::string &model_server,
                     const std::string &model_names, double confidence,
-                    size_t n_inference, int samp_rate);
+                    size_t n_inference, int samp_rate, bool power_inference);
   void forecast(int noutput_items, gr_vector_int &ninput_items_required);
   int general_work(int noutput_items, gr_vector_int &ninput_items,
                    gr_vector_const_void_star &input_items,
diff --git a/python/iqtlabs/bindings/iq_inference_python.cc b/python/iqtlabs/bindings/iq_inference_python.cc
index fc2ad887..503d5aca 100644
--- a/python/iqtlabs/bindings/iq_inference_python.cc
+++ b/python/iqtlabs/bindings/iq_inference_python.cc
@@ -14,7 +14,7 @@
 /* BINDTOOL_GEN_AUTOMATIC(0) */
 /* BINDTOOL_USE_PYGCCXML(0) */
 /* BINDTOOL_HEADER_FILE(iq_inference.h) */
-/* BINDTOOL_HEADER_FILE_HASH(bb19467225c84c83a744c71e2cfd0472) */
+/* BINDTOOL_HEADER_FILE_HASH(368e033e2f2df720e22de85a2e3f6fa7) */
 /***********************************************************************************/
 
 #include <pybind11/complex.h>
@@ -46,6 +46,7 @@ void bind_iq_inference(py::module& m)
             py::arg("confidence"),
             py::arg("n_inference"),
             py::arg("samp_rate"),
+            py::arg("power_inference"),
             D(iq_inference, make))
 
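
With the extra py::arg above, power_inference is exposed as a keyword argument on the Python
side. A usage sketch (the parameter values are illustrative, and the server address is assumed
to be a reachable torchserve instance):

from gnuradio import iqtlabs

# Illustrative instantiation; the QA test below uses the same keyword names.
iq_inf = iqtlabs.iq_inference(
    tag="rx_freq",
    vlen=1024,
    sample_buffer=512,
    min_peak_points=-100,
    model_server="localhost:8080",
    model_names="testmodel",
    confidence=0.8,
    n_inference=100,
    samp_rate=int(4e6),
    power_inference=True,
)
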
diff --git a/python/iqtlabs/qa_iq_inference.py b/python/iqtlabs/qa_iq_inference.py
index 4316b795..80926d9d 100755
--- a/python/iqtlabs/qa_iq_inference.py
+++ b/python/iqtlabs/qa_iq_inference.py
@@ -212,6 +212,7 @@
 from flask import Flask, request
 from gnuradio import gr, gr_unittest
 from gnuradio import analog, blocks
+import numpy as np
 
 try:
     from gnuradio.iqtlabs import iq_inference, retune_pre_fft, tuneable_test_source
@@ -234,13 +235,33 @@ def tearDown(self):
         if self.pid:
             os.kill(self.pid, 15)
 
-    def simulate_torchserve(self, port, model_name, result):
+    def simulate_torchserve(self, port, model_name, result, fft_size):
         app = Flask(__name__)
 
         # nosemgrep:github.workflows.config.useless-inner-function
         @app.route(f"/predictions/{model_name}", methods=["POST"])
         def predictions_test():
             print("got %s, count %u" % (type(request.data), len(request.data)))
+            samples = np.frombuffer(request.data, dtype=np.complex64, count=fft_size)
+            power_offset = samples.size * samples.itemsize
+            power = np.frombuffer(
+                request.data[power_offset:], dtype=np.float32, count=fft_size
+            )
+            unique_samples = np.unique(samples)
+            unique_power = np.unique(power)
+            print(unique_samples, unique_power)
+            if unique_samples.size != 1:
+                print("not unique samples: ", unique_samples)
+                raise ValueError
+            if unique_power.size != 1:
+                print("not unique power: ", unique_power)
+                raise ValueError
+            if unique_samples[0].real != unique_samples[0].imag:
+                print("not equal sample: ", unique_samples)
+                raise ValueError
+            if unique_samples[0].real != unique_power[0]:
+                print("not equal power to sample", unique_power, unique_samples)
+                raise ValueError
             return json.dumps(result, indent=2), 200
 
         try:
@@ -252,7 +273,7 @@ def run_flowgraph(self, tmpdir, fft_size, samp_rate, port, model_name):
         test_file = os.path.join(tmpdir, "samples")
         freq_divisor = 1e9
         new_freq = 1e9 / 2
-        delay = 500
+        delay = 300
 
         source = tuneable_test_source(freq_divisor)
         strobe = blocks.message_strobe(pmt.to_pmt({"freq": new_freq}), delay)
@@ -263,7 +284,16 @@ def run_flowgraph(self, tmpdir, fft_size, samp_rate, port, model_name):
         stream2vector_samples = blocks.stream_to_vector(gr.sizeof_gr_complex, fft_size)
 
         iq_inf = iq_inference(
-            "rx_freq", fft_size, 512, -1e9, f"localhost:{port}", model_name, 0.8, 10001, int(samp_rate)
+            tag="rx_freq",
+            vlen=fft_size,
+            sample_buffer=512,
+            min_peak_points=-1e9,
+            model_server=f"localhost:{port}",
+            model_names=model_name,
+            confidence=0.8,
+            n_inference=5001,
+            samp_rate=int(samp_rate),
+            power_inference=True,
         )
 
         self.tb.msg_connect((strobe, "strobe"), (source, "cmd"))
@@ -276,8 +306,10 @@ def run_flowgraph(self, tmpdir, fft_size, samp_rate, port, model_name):
         self.tb.connect((iq_inf, 0), (fs, 0))
 
         self.tb.start()
-        test_time = 10
-        time.sleep(test_time)
+        for i in range(10):
+            new_freq *= 1.1
+            strobe.set_msg(pmt.to_pmt({"freq": int(new_freq)}))
+            time.sleep(1)
         self.tb.stop()
         self.tb.wait()
         return test_file
@@ -286,10 +318,10 @@ def test_bad_instance(self):
         port = 11002
         model_name = "testmodel"
         predictions_result = ["cant", "parse", {"this": 0}]
+        fft_size = 1024
         if self.pid == 0:
-            self.simulate_torchserve(port, model_name, predictions_result)
+            self.simulate_torchserve(port, model_name, predictions_result, fft_size)
             return
-        fft_size = 1024
         samp_rate = 4e6
         with tempfile.TemporaryDirectory() as tmpdir:
             self.run_flowgraph(tmpdir, fft_size, samp_rate, port, model_name)
@@ -299,10 +331,10 @@ def test_instance(self):
         model_name = "testmodel"
         px = 100
         predictions_result = {"modulation": [{"conf": 0.9}]}
+        fft_size = 1024
         if self.pid == 0:
-            self.simulate_torchserve(port, model_name, predictions_result)
+            self.simulate_torchserve(port, model_name, predictions_result, fft_size)
             return
-        fft_size = 1024
         samp_rate = 4e6
         with tempfile.TemporaryDirectory() as tmpdir:
             test_file = self.run_flowgraph(
@@ -313,11 +345,21 @@ def test_instance(self):
             content = f.read()
         json_raw_all = content.split("\n\n")
         self.assertTrue(json_raw_all)
+        last_ts = 0
+        last_rx_freq = 0
         for json_raw in json_raw_all:
             if not json_raw:
                 continue
             result = json.loads(json_raw)
             print(result)
+            self.assertEqual(result.get("error", None), None)
+            rx_freq = float(result["metadata"]["rx_freq"])
+            ts = float(result["metadata"]["rx_freq"])
+            self.assertGreater(rx_freq, last_rx_freq)
+            self.assertGreater(ts, last_ts)
+            last_ts = ts
+            last_rx_freq = rx_freq
+
 
 if __name__ == "__main__":
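
The assertions added to test_instance() amount to the following standalone check over the
block's recorded output (a sketch only; it assumes the output format the test already relies
on: JSON records separated by blank lines, each carrying a metadata.rx_freq field):

import json

def check_inference_output(path):
    last_rx_freq = 0
    with open(path, encoding="utf8") as f:
        records = f.read().split("\n\n")
    for raw in records:
        if not raw:
            continue
        result = json.loads(raw)
        # Every record must be error-free, and rx_freq must increase as the
        # test retunes the source upwards on each strobe message.
        assert result.get("error", None) is None
        rx_freq = float(result["metadata"]["rx_freq"])
        assert rx_freq > last_rx_freq
        last_rx_freq = rx_freq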