diff --git a/lib/retune_fft_impl.cc b/lib/retune_fft_impl.cc index 4e7078e6..6bf7d8f2 100644 --- a/lib/retune_fft_impl.cc +++ b/lib/retune_fft_impl.cc @@ -248,10 +248,8 @@ retune_fft_impl::retune_fft_impl( : gr::block("retune_fft", gr::io_signature::make(1 /* min inputs */, 1 /* max inputs */, nfft * sizeof(input_type)), - gr::io_signature::makev( - 2 /* min outputs */, 2 /* max outputs */, - std::vector{(int)sizeof(output_type), - (int)(nfft * sizeof(input_type))})), + gr::io_signature::make(1 /* min outputs */, 1 /* max outputs */, + nfft * sizeof(input_type))), retuner_impl(samp_rate, tune_jitter_hz, freq_start, freq_end, tune_step_hz, tune_step_fft, skip_tune_step_fft, tuning_ranges, tag_now, low_power_hold_down, slew_rx_time), @@ -353,7 +351,7 @@ void retune_fft_impl::sum_items_(const input_type *in) { void retune_fft_impl::add_output_tags_(TIME_T rx_time, FREQ_T rx_freq, COUNT_T rel) { - OUTPUT_TAGS(apply_rx_time_slew_(rx_time), rx_freq, 1, rel); + OUTPUT_TAGS(apply_rx_time_slew_(rx_time), rx_freq, 0, rel); } void retune_fft_impl::process_items_(COUNT_T c, const input_type *&in, @@ -402,12 +400,6 @@ void retune_fft_impl::process_items_(COUNT_T c, const input_type *&in, } } -void retune_fft_impl::forecast(int noutput_items, - gr_vector_int &ninput_items_required) { - ninput_items_required[0] = 1; - ninput_items_required[1] = noutput_items * nfft_; -} - void retune_fft_impl::output_buckets_( const std::string &name, const std::list> &buckets, @@ -469,22 +461,7 @@ void retune_fft_impl::write_buckets_(TIME_T host_now, FREQ_T rx_freq) { output_buckets_("buckets", buckets, ss); ss << "}" << std::endl; const std::string s = ss.str(); - out_buf_.insert(out_buf_.end(), s.begin(), s.end()); - // TODO: migrate to PMT if/when PMT supports compressed payloads. - // TODO: compressing multiple messages together if latency not a concern. - std::stringstream uncompressed_ss(s); - std::stringstream compressed_ss; - boost::iostreams::filtering_streambuf zstd_out; - zstd_out.push(boost::iostreams::zstd_compressor(zstd_params)); - zstd_out.push(uncompressed_ss); - uncompressed_ss.flush(); - boost::iostreams::copy(zstd_out, compressed_ss); - const std::string compressed_s = compressed_ss.str(); - auto pdu = - pmt::cons(pmt::make_dict(), - pmt::init_u8vector(compressed_s.length(), - (const uint8_t *)compressed_s.c_str())); - message_port_pub(JSON_KEY, pdu); + message_port_pub(JSON_KEY, string_to_pmt(s)); } void retune_fft_impl::process_buckets_(FREQ_T rx_freq, TIME_T rx_time) { @@ -540,30 +517,17 @@ void retune_fft_impl::process_tags_(const input_type *in, COUNT_T in_count, process_items_(in_count - consumed, in, fft_output, consumed, produced); } } - produce(1, produced); + produce(0, produced); } int retune_fft_impl::general_work(int noutput_items, gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - if (!out_buf_.empty()) { - auto out = static_cast(output_items[0]); - const COUNT_T leftover = std::min(out_buf_.size(), (COUNT_T)noutput_items); - auto from = out_buf_.begin(); - auto to = from + leftover; - std::copy(from, to, out); - out_buf_.erase(from, to); - produce(0, leftover); - return WORK_CALLED_PRODUCE; - } - const input_type *fft_output = - static_cast(output_items[1]); + static_cast(output_items[0]); const input_type *in = static_cast(input_items[0]); - - float max_input_items = noutput_items; - COUNT_T in_count = std::min(ninput_items[0], int(max_input_items)); + COUNT_T in_count = ninput_items[0]; COUNT_T in_first = nitems_read(0); process_tags_(in, in_count, in_first, fft_output); consume_each(in_count); diff --git a/lib/retune_fft_impl.h b/lib/retune_fft_impl.h index f72c7e54..d7ebee69 100644 --- a/lib/retune_fft_impl.h +++ b/lib/retune_fft_impl.h @@ -218,7 +218,6 @@ namespace gr { namespace iqtlabs { using input_type = float; -using output_type = char; class retune_fft_impl : public retune_fft, base_impl, retuner_impl { private: @@ -255,7 +254,6 @@ class retune_fft_impl : public retune_fft, base_impl, retuner_impl { float fft_min_; float fft_max_; - std::deque out_buf_; COUNT_T sample_count_; COUNT_T write_step_fft_count_; COUNT_T bucket_offset_; @@ -278,7 +276,6 @@ class retune_fft_impl : public retune_fft, base_impl, retuner_impl { bool pre_fft, bool tag_now, bool low_power_hold_down, bool slew_rx_time, COUNT_T peak_fft_range); ~retune_fft_impl(); - void forecast(int noutput_items, gr_vector_int &ninput_items_required); int general_work(int noutput_items, gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items); diff --git a/python/iqtlabs/qa_retune_fft.py b/python/iqtlabs/qa_retune_fft.py index 3b4977ea..9f50cea8 100755 --- a/python/iqtlabs/qa_retune_fft.py +++ b/python/iqtlabs/qa_retune_fft.py @@ -209,8 +209,8 @@ import re import subprocess import time +import queue import tempfile -import zstandard from collections import defaultdict import numpy as np import pandas as pd @@ -248,18 +248,11 @@ def __init__(self): ) self.message_port_register_in(pmt.intern("pdu")) self.set_msg_handler(pmt.intern("pdu"), self.receive_pdu) - self.decompressor = zstandard.ZstdDecompressor() - self.json_output = [] - self.compressed_received = 0 - self.decompressed_received = 0 + self.json_output = queue.Queue() def receive_pdu(self, pdu): - compressed_payload = bytes(pmt.to_python(pmt.cdr(pdu))) - self.compressed_received += len(compressed_payload) - with self.decompressor.stream_reader(compressed_payload) as reader: - decompressed_payload = reader.read().decode("utf-8") - self.decompressed_received += len(decompressed_payload) - self.json_output.append(json.loads(decompressed_payload)) + str_pdu = pmt.to_python(pmt.cdr(pdu)).tobytes().decode("utf8") + self.json_output.put(json.loads(str_pdu)) class qa_retune_fft_base: @@ -287,7 +280,6 @@ def retune_fft(self, fft_roll, stare): fft_batch_size = 1 with tempfile.TemporaryDirectory() as tmpdir: - test_file = os.path.join(tmpdir, "samples.csv") iqtlabs_tuneable_test_source_0 = tuneable_test_source(0, freq_end) iqtlabs_retune_pre_fft_0 = retune_pre_fft( nfft=points, @@ -347,8 +339,6 @@ def retune_fft(self, fft_roll, stare): blocks_throttle_0 = blocks.throttle( gr.sizeof_gr_complex * 1, samp_rate, True ) - blocks_file_sink_0 = blocks.file_sink(gr.sizeof_char * 1, test_file, False) - blocks_file_sink_0.set_unbuffered(False) blocks_null_sink_0 = blocks.null_sink(gr.sizeof_float * points) blocks_complex_to_mag_0 = blocks.complex_to_mag(points) @@ -371,8 +361,7 @@ def retune_fft(self, fft_roll, stare): # double roll, is a no-op self.tb.connect((fft_vxx_0, 0), (vr1, 0)) self.tb.connect((vr1, 0), (blocks_complex_to_mag_0, 0)) - self.tb.connect((iqtlabs_retune_fft_0, 0), (blocks_file_sink_0, 0)) - self.tb.connect((iqtlabs_retune_fft_0, 1), (blocks_null_sink_0, 0)) + self.tb.connect((iqtlabs_retune_fft_0, 0), (blocks_null_sink_0, 0)) self.tb.connect((iqtlabs_retune_pre_fft_0, 0), (window, 0)) self.tb.connect((window, 0), (fft_vxx_0, 0)) self.tb.connect((blocks_throttle_0, 0), (iqtlabs_retune_pre_fft_0, 0)) @@ -388,83 +377,77 @@ def retune_fft(self, fft_roll, stare): startup_timeout = 1 for _ in range(10): - if os.path.exists(test_file): + if pdu_decoder_0.json_output.qsize(): break time.sleep(startup_timeout) - self.assertTrue(os.path.exists(test_file)) - - with open(test_file, encoding="utf8") as f: - try: - linebuffer = "" - last_data = time.time() - last_ts = 0 - last_buckets = None - last_tuning_range = None - file_poll_timeout = 0.001 - while (stare and len(records) < 1000) or ( - not stare and tuning_range_changes < 5 - ): - self.assertLess(time.time() - last_data, 5) - line = f.readline() - linebuffer = linebuffer + line - if not linebuffer.endswith("\n"): - time.sleep(file_poll_timeout) - continue + self.assertTrue(pdu_decoder_0.json_output.qsize()) + + try: + last_data = time.time() + last_ts = 0 + last_buckets = None + last_tuning_range = None + file_poll_timeout = 0.001 + while (stare and len(records) < 1000) or ( + not stare and tuning_range_changes < 5 + ): + self.assertLess(time.time() - last_data, 5) + try: + record = pdu_decoder_0.json_output.get_nowait() last_data = time.time() - line = linebuffer.strip() - linebuffer = "" - record = json.loads(line) - ts = round(record["ts"]) - self.assertGreaterEqual(ts, last_ts) - last_ts = ts - config = record["config"] - self.assertEqual("a text description", config["description"]), - tuning_range_freq_start = config["tuning_range_freq_start"] - tuning_range_freq_end = config["tuning_range_freq_end"] - tuning_range = int(config["tuning_range"]) - if tuning_range != last_tuning_range: - tuning_range_changes += 1 - print("tuning_range_changes:", tuning_range_changes) - last_tuning_range = tuning_range - if not stare: - self.assertTrue( - ( - tuning_range_freq_start == freq_start - and tuning_range_freq_end == freq_mid - ) - or ( - tuning_range_freq_start == freq_mid + samp_rate - and tuning_range_freq_end == freq_end - ) + except queue.Empty: + time.sleep(file_poll_timeout) + continue + ts = round(record["ts"]) + self.assertGreaterEqual(ts, last_ts) + last_ts = ts + config = record["config"] + self.assertEqual("a text description", config["description"]), + tuning_range_freq_start = config["tuning_range_freq_start"] + tuning_range_freq_end = config["tuning_range_freq_end"] + tuning_range = int(config["tuning_range"]) + if tuning_range != last_tuning_range: + tuning_range_changes += 1 + print("tuning_range_changes:", tuning_range_changes) + last_tuning_range = tuning_range + if not stare: + self.assertTrue( + ( + tuning_range_freq_start == freq_start + and tuning_range_freq_end == freq_mid + ) + or ( + tuning_range_freq_start == freq_mid + samp_rate + and tuning_range_freq_end == freq_end ) - self.assertEqual(config["freq_start"], freq_start) - self.assertEqual(config["freq_end"], freq_end) - self.assertEqual(config["sample_rate"], samp_rate) - self.assertEqual(config["nfft"], points) - buckets = record["buckets"] - self.assertTrue(buckets, (last_buckets, buckets)) - bucket_counts[len(buckets)] += 1 - fs = [int(f) for f in buckets.keys()] - self.assertGreaterEqual(min(fs), tuning_range_freq_start) - self.assertLessEqual(max(fs), tuning_range_freq_end) - new_records = [ - { - "ts": ts, - "f": int(freq), - "v": float(value), - "t": int(tuning_range), - } - for freq, value in buckets.items() - ] - records.extend(new_records) - last_buckets = buckets - except Exception: - self.tb.stop() - raise + ) + self.assertEqual(config["freq_start"], freq_start) + self.assertEqual(config["freq_end"], freq_end) + self.assertEqual(config["sample_rate"], samp_rate) + self.assertEqual(config["nfft"], points) + buckets = record["buckets"] + self.assertTrue(buckets, (last_buckets, buckets)) + bucket_counts[len(buckets)] += 1 + fs = [int(f) for f in buckets.keys()] + self.assertGreaterEqual(min(fs), tuning_range_freq_start) + self.assertLessEqual(max(fs), tuning_range_freq_end) + new_records = [ + { + "ts": ts, + "f": int(freq), + "v": float(value), + "t": int(tuning_range), + } + for freq, value in buckets.items() + ] + records.extend(new_records) + last_buckets = buckets + except Exception: + self.tb.stop() + raise self.tb.stop() self.tb.wait() - os.remove(test_file) top_count = sorted(bucket_counts.items(), key=lambda x: x[1], reverse=True)[ 0 @@ -532,11 +515,6 @@ def retune_fft(self, fft_roll, stare): remaining_files = glob.glob(os.path.join(tmpdir, "*/*")) print(f"remaining {remaining_files}") - self.assertTrue(pdu_decoder_0.json_output) - self.assertGreater( - pdu_decoder_0.decompressed_received, pdu_decoder_0.compressed_received - ) - class qa_retune_fft_no_roll(gr_unittest.TestCase, qa_retune_fft_base): def setUp(self):