Skip to content

Commit

Permalink
Merge pull request #284 from anarkiwi/nocomp
Browse files Browse the repository at this point in the history
Don't compress JSON PDU.
  • Loading branch information
anarkiwi authored Jun 23, 2024
2 parents 6b234c5 + 5b751ac commit 10e1f36
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 137 deletions.
50 changes: 7 additions & 43 deletions lib/retune_fft_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,8 @@ retune_fft_impl::retune_fft_impl(
: gr::block("retune_fft",
gr::io_signature::make(1 /* min inputs */, 1 /* max inputs */,
nfft * sizeof(input_type)),
gr::io_signature::makev(
2 /* min outputs */, 2 /* max outputs */,
std::vector<int>{(int)sizeof(output_type),
(int)(nfft * sizeof(input_type))})),
gr::io_signature::make(1 /* min outputs */, 1 /* max outputs */,
nfft * sizeof(input_type))),
retuner_impl(samp_rate, tune_jitter_hz, freq_start, freq_end,
tune_step_hz, tune_step_fft, skip_tune_step_fft,
tuning_ranges, tag_now, low_power_hold_down, slew_rx_time),
Expand Down Expand Up @@ -353,7 +351,7 @@ void retune_fft_impl::sum_items_(const input_type *in) {

void retune_fft_impl::add_output_tags_(TIME_T rx_time, FREQ_T rx_freq,
COUNT_T rel) {
OUTPUT_TAGS(apply_rx_time_slew_(rx_time), rx_freq, 1, rel);
OUTPUT_TAGS(apply_rx_time_slew_(rx_time), rx_freq, 0, rel);
}

void retune_fft_impl::process_items_(COUNT_T c, const input_type *&in,
Expand Down Expand Up @@ -402,12 +400,6 @@ void retune_fft_impl::process_items_(COUNT_T c, const input_type *&in,
}
}

void retune_fft_impl::forecast(int noutput_items,
gr_vector_int &ninput_items_required) {
ninput_items_required[0] = 1;
ninput_items_required[1] = noutput_items * nfft_;
}

void retune_fft_impl::output_buckets_(
const std::string &name,
const std::list<std::pair<double, double>> &buckets,
Expand Down Expand Up @@ -469,22 +461,7 @@ void retune_fft_impl::write_buckets_(TIME_T host_now, FREQ_T rx_freq) {
output_buckets_("buckets", buckets, ss);
ss << "}" << std::endl;
const std::string s = ss.str();
out_buf_.insert(out_buf_.end(), s.begin(), s.end());
// TODO: migrate to PMT if/when PMT supports compressed payloads.
// TODO: compressing multiple messages together if latency not a concern.
std::stringstream uncompressed_ss(s);
std::stringstream compressed_ss;
boost::iostreams::filtering_streambuf<boost::iostreams::input> zstd_out;
zstd_out.push(boost::iostreams::zstd_compressor(zstd_params));
zstd_out.push(uncompressed_ss);
uncompressed_ss.flush();
boost::iostreams::copy(zstd_out, compressed_ss);
const std::string compressed_s = compressed_ss.str();
auto pdu =
pmt::cons(pmt::make_dict(),
pmt::init_u8vector(compressed_s.length(),
(const uint8_t *)compressed_s.c_str()));
message_port_pub(JSON_KEY, pdu);
message_port_pub(JSON_KEY, string_to_pmt(s));
}

void retune_fft_impl::process_buckets_(FREQ_T rx_freq, TIME_T rx_time) {
Expand Down Expand Up @@ -540,30 +517,17 @@ void retune_fft_impl::process_tags_(const input_type *in, COUNT_T in_count,
process_items_(in_count - consumed, in, fft_output, consumed, produced);
}
}
produce(1, produced);
produce(0, produced);
}

int retune_fft_impl::general_work(int noutput_items,
gr_vector_int &ninput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items) {
if (!out_buf_.empty()) {
auto out = static_cast<output_type *>(output_items[0]);
const COUNT_T leftover = std::min(out_buf_.size(), (COUNT_T)noutput_items);
auto from = out_buf_.begin();
auto to = from + leftover;
std::copy(from, to, out);
out_buf_.erase(from, to);
produce(0, leftover);
return WORK_CALLED_PRODUCE;
}

const input_type *fft_output =
static_cast<const input_type *>(output_items[1]);
static_cast<const input_type *>(output_items[0]);
const input_type *in = static_cast<const input_type *>(input_items[0]);

float max_input_items = noutput_items;
COUNT_T in_count = std::min(ninput_items[0], int(max_input_items));
COUNT_T in_count = ninput_items[0];
COUNT_T in_first = nitems_read(0);
process_tags_(in, in_count, in_first, fft_output);
consume_each(in_count);
Expand Down
3 changes: 0 additions & 3 deletions lib/retune_fft_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@
namespace gr {
namespace iqtlabs {
using input_type = float;
using output_type = char;

class retune_fft_impl : public retune_fft, base_impl, retuner_impl {
private:
Expand Down Expand Up @@ -255,7 +254,6 @@ class retune_fft_impl : public retune_fft, base_impl, retuner_impl {
float fft_min_;
float fft_max_;

std::deque<output_type> out_buf_;
COUNT_T sample_count_;
COUNT_T write_step_fft_count_;
COUNT_T bucket_offset_;
Expand All @@ -278,7 +276,6 @@ class retune_fft_impl : public retune_fft, base_impl, retuner_impl {
bool pre_fft, bool tag_now, bool low_power_hold_down,
bool slew_rx_time, COUNT_T peak_fft_range);
~retune_fft_impl();
void forecast(int noutput_items, gr_vector_int &ninput_items_required);
int general_work(int noutput_items, gr_vector_int &ninput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items);
Expand Down
160 changes: 69 additions & 91 deletions python/iqtlabs/qa_retune_fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@
import re
import subprocess
import time
import queue
import tempfile
import zstandard
from collections import defaultdict
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -248,18 +248,11 @@ def __init__(self):
)
self.message_port_register_in(pmt.intern("pdu"))
self.set_msg_handler(pmt.intern("pdu"), self.receive_pdu)
self.decompressor = zstandard.ZstdDecompressor()
self.json_output = []
self.compressed_received = 0
self.decompressed_received = 0
self.json_output = queue.Queue()

def receive_pdu(self, pdu):
compressed_payload = bytes(pmt.to_python(pmt.cdr(pdu)))
self.compressed_received += len(compressed_payload)
with self.decompressor.stream_reader(compressed_payload) as reader:
decompressed_payload = reader.read().decode("utf-8")
self.decompressed_received += len(decompressed_payload)
self.json_output.append(json.loads(decompressed_payload))
str_pdu = pmt.to_python(pmt.cdr(pdu)).tobytes().decode("utf8")
self.json_output.put(json.loads(str_pdu))


class qa_retune_fft_base:
Expand Down Expand Up @@ -287,7 +280,6 @@ def retune_fft(self, fft_roll, stare):
fft_batch_size = 1

with tempfile.TemporaryDirectory() as tmpdir:
test_file = os.path.join(tmpdir, "samples.csv")
iqtlabs_tuneable_test_source_0 = tuneable_test_source(0, freq_end)
iqtlabs_retune_pre_fft_0 = retune_pre_fft(
nfft=points,
Expand Down Expand Up @@ -347,8 +339,6 @@ def retune_fft(self, fft_roll, stare):
blocks_throttle_0 = blocks.throttle(
gr.sizeof_gr_complex * 1, samp_rate, True
)
blocks_file_sink_0 = blocks.file_sink(gr.sizeof_char * 1, test_file, False)
blocks_file_sink_0.set_unbuffered(False)
blocks_null_sink_0 = blocks.null_sink(gr.sizeof_float * points)

blocks_complex_to_mag_0 = blocks.complex_to_mag(points)
Expand All @@ -371,8 +361,7 @@ def retune_fft(self, fft_roll, stare):
# double roll, is a no-op
self.tb.connect((fft_vxx_0, 0), (vr1, 0))
self.tb.connect((vr1, 0), (blocks_complex_to_mag_0, 0))
self.tb.connect((iqtlabs_retune_fft_0, 0), (blocks_file_sink_0, 0))
self.tb.connect((iqtlabs_retune_fft_0, 1), (blocks_null_sink_0, 0))
self.tb.connect((iqtlabs_retune_fft_0, 0), (blocks_null_sink_0, 0))
self.tb.connect((iqtlabs_retune_pre_fft_0, 0), (window, 0))
self.tb.connect((window, 0), (fft_vxx_0, 0))
self.tb.connect((blocks_throttle_0, 0), (iqtlabs_retune_pre_fft_0, 0))
Expand All @@ -388,83 +377,77 @@ def retune_fft(self, fft_roll, stare):

startup_timeout = 1
for _ in range(10):
if os.path.exists(test_file):
if pdu_decoder_0.json_output.qsize():
break
time.sleep(startup_timeout)
self.assertTrue(os.path.exists(test_file))

with open(test_file, encoding="utf8") as f:
try:
linebuffer = ""
last_data = time.time()
last_ts = 0
last_buckets = None
last_tuning_range = None
file_poll_timeout = 0.001
while (stare and len(records) < 1000) or (
not stare and tuning_range_changes < 5
):
self.assertLess(time.time() - last_data, 5)
line = f.readline()
linebuffer = linebuffer + line
if not linebuffer.endswith("\n"):
time.sleep(file_poll_timeout)
continue
self.assertTrue(pdu_decoder_0.json_output.qsize())

try:
last_data = time.time()
last_ts = 0
last_buckets = None
last_tuning_range = None
file_poll_timeout = 0.001
while (stare and len(records) < 1000) or (
not stare and tuning_range_changes < 5
):
self.assertLess(time.time() - last_data, 5)
try:
record = pdu_decoder_0.json_output.get_nowait()
last_data = time.time()
line = linebuffer.strip()
linebuffer = ""
record = json.loads(line)
ts = round(record["ts"])
self.assertGreaterEqual(ts, last_ts)
last_ts = ts
config = record["config"]
self.assertEqual("a text description", config["description"]),
tuning_range_freq_start = config["tuning_range_freq_start"]
tuning_range_freq_end = config["tuning_range_freq_end"]
tuning_range = int(config["tuning_range"])
if tuning_range != last_tuning_range:
tuning_range_changes += 1
print("tuning_range_changes:", tuning_range_changes)
last_tuning_range = tuning_range
if not stare:
self.assertTrue(
(
tuning_range_freq_start == freq_start
and tuning_range_freq_end == freq_mid
)
or (
tuning_range_freq_start == freq_mid + samp_rate
and tuning_range_freq_end == freq_end
)
except queue.Empty:
time.sleep(file_poll_timeout)
continue
ts = round(record["ts"])
self.assertGreaterEqual(ts, last_ts)
last_ts = ts
config = record["config"]
self.assertEqual("a text description", config["description"]),
tuning_range_freq_start = config["tuning_range_freq_start"]
tuning_range_freq_end = config["tuning_range_freq_end"]
tuning_range = int(config["tuning_range"])
if tuning_range != last_tuning_range:
tuning_range_changes += 1
print("tuning_range_changes:", tuning_range_changes)
last_tuning_range = tuning_range
if not stare:
self.assertTrue(
(
tuning_range_freq_start == freq_start
and tuning_range_freq_end == freq_mid
)
or (
tuning_range_freq_start == freq_mid + samp_rate
and tuning_range_freq_end == freq_end
)
self.assertEqual(config["freq_start"], freq_start)
self.assertEqual(config["freq_end"], freq_end)
self.assertEqual(config["sample_rate"], samp_rate)
self.assertEqual(config["nfft"], points)
buckets = record["buckets"]
self.assertTrue(buckets, (last_buckets, buckets))
bucket_counts[len(buckets)] += 1
fs = [int(f) for f in buckets.keys()]
self.assertGreaterEqual(min(fs), tuning_range_freq_start)
self.assertLessEqual(max(fs), tuning_range_freq_end)
new_records = [
{
"ts": ts,
"f": int(freq),
"v": float(value),
"t": int(tuning_range),
}
for freq, value in buckets.items()
]
records.extend(new_records)
last_buckets = buckets
except Exception:
self.tb.stop()
raise
)
self.assertEqual(config["freq_start"], freq_start)
self.assertEqual(config["freq_end"], freq_end)
self.assertEqual(config["sample_rate"], samp_rate)
self.assertEqual(config["nfft"], points)
buckets = record["buckets"]
self.assertTrue(buckets, (last_buckets, buckets))
bucket_counts[len(buckets)] += 1
fs = [int(f) for f in buckets.keys()]
self.assertGreaterEqual(min(fs), tuning_range_freq_start)
self.assertLessEqual(max(fs), tuning_range_freq_end)
new_records = [
{
"ts": ts,
"f": int(freq),
"v": float(value),
"t": int(tuning_range),
}
for freq, value in buckets.items()
]
records.extend(new_records)
last_buckets = buckets
except Exception:
self.tb.stop()
raise

self.tb.stop()
self.tb.wait()
os.remove(test_file)

top_count = sorted(bucket_counts.items(), key=lambda x: x[1], reverse=True)[
0
Expand Down Expand Up @@ -532,11 +515,6 @@ def retune_fft(self, fft_roll, stare):
remaining_files = glob.glob(os.path.join(tmpdir, "*/*"))
print(f"remaining {remaining_files}")

self.assertTrue(pdu_decoder_0.json_output)
self.assertGreater(
pdu_decoder_0.decompressed_received, pdu_decoder_0.compressed_received
)


class qa_retune_fft_no_roll(gr_unittest.TestCase, qa_retune_fft_base):
def setUp(self):
Expand Down

0 comments on commit 10e1f36

Please sign in to comment.