diff --git a/lib/image_inference_impl.cc b/lib/image_inference_impl.cc index 538c4a87..4e9c90fa 100644 --- a/lib/image_inference_impl.cc +++ b/lib/image_inference_impl.cc @@ -251,7 +251,7 @@ image_inference_impl::image_inference_impl( flip_(flip), max_rows_(max_rows), vlen_(vlen), rotate_secs_(rotate_secs), n_image_(n_image), n_inference_(n_inference), image_count_(0), inference_count_(0), samp_rate_(samp_rate), last_image_start_item_(0), - convert_alpha_(convert_alpha), norm_alpha_(norm_alpha), + serial_(0), convert_alpha_(convert_alpha), norm_alpha_(norm_alpha), norm_beta_(norm_beta), min_peak_points_(min_peak_points), confidence_(confidence), running_(true), last_rx_freq_(0), last_rx_time_(0) { @@ -371,6 +371,7 @@ void image_inference_impl::create_image_(bool discard) { output_item.points_buffer = points_buffer_; output_item.points_mean = points_mean; output_item.image_buffer = NULL; + output_item.serial = ++serial_; if (!inference_q_.push(output_item)) { d_logger->error("inference request queue full, size {}", MAX_INFERENCE); @@ -527,6 +528,7 @@ void image_inference_impl::run_inference_() { std::to_string(output_item.start_item * vlen_); metadata_json["orig_rows"] = output_item.points_buffer->rows; metadata_json["sample_rate"] = std::to_string(samp_rate_); + metadata_json["serial"] = std::to_string(output_item.serial); const std::string secs_image_dir = secs_dir(image_dir_, rotate_secs_); ++image_count_; diff --git a/lib/image_inference_impl.h b/lib/image_inference_impl.h index 4855c9e9..055060c0 100644 --- a/lib/image_inference_impl.h +++ b/lib/image_inference_impl.h @@ -233,6 +233,7 @@ typedef struct output_item { double points_mean; double points_max; COUNT_T start_item; + COUNT_T serial; } output_item_type; class image_inference_impl : public image_inference, base_impl { @@ -241,7 +242,7 @@ class image_inference_impl : public image_inference, base_impl { std::string image_dir_; int x_, y_, norm_type_, colormap_, interpolation_, flip_, max_rows_; COUNT_T 
vlen_, rotate_secs_, n_image_, n_inference_, image_count_, - inference_count_, samp_rate_, last_image_start_item_; + inference_count_, samp_rate_, last_image_start_item_, serial_; double convert_alpha_, norm_alpha_, norm_beta_, min_peak_points_, confidence_; bool running_; FREQ_T last_rx_freq_; diff --git a/lib/iq_inference_impl.cc b/lib/iq_inference_impl.cc index 9ecb2bab..fcf64475 100644 --- a/lib/iq_inference_impl.cc +++ b/lib/iq_inference_impl.cc @@ -240,10 +240,11 @@ iq_inference_impl::iq_inference_impl( batch_(vlen_ * n_vlen_), sample_buffer_(sample_buffer), sample_clock_(0), last_rx_freq_sample_clock_(0), n_inference_(n_inference), inference_count_(n_inference), samples_since_tag_(0), predictions_(0), - batch_inference_(batch), samp_rate_(samp_rate), last_full_time_(0), - min_peak_points_(min_peak_points), confidence_(confidence), - power_inference_(power_inference), background_(background), - running_(true), last_rx_time_(0), last_rx_freq_(0) { + batch_inference_(batch), serial_(0), samp_rate_(samp_rate), + last_full_time_(0), min_peak_points_(min_peak_points), + confidence_(confidence), power_inference_(power_inference), + background_(background), running_(true), last_rx_time_(0), + last_rx_freq_(0) { samples_lookback_.reset(new gr_complex[batch_ * sample_buffer]); unsigned int alignment = volk_get_alignment(); samples_total_.reset((float *)volk_malloc(sizeof(float), alignment)); @@ -290,7 +291,7 @@ bool iq_inference_impl::stop() { running_ = false; io_service_->stop(); threadpool_.join_all(); - d_logger->info("published {} predictions", predictions_); + d_logger->info("iq_inference sent {} predictions", predictions_); return true; } @@ -306,6 +307,7 @@ void iq_inference_impl::run_inference_(torchserve_client *client) { metadata_json["sample_rate"] = std::to_string(samp_rate_); metadata_json["rx_freq_sample_clock"] = std::to_string(output_item.rx_freq_sample_clock); + metadata_json["serial"] = std::to_string(output_item.serial); nlohmann::json output_json, 
results_json; COUNT_T signal_predictions = 0; std::string error; @@ -360,15 +362,15 @@ void iq_inference_impl::run_inference_(torchserve_client *client) { } output_json["predictions"] = results_json; - // double new line to facilitate json parsing, since prediction may - // contain new lines. - if (signal_predictions) { - output_json["metadata"] = metadata_json; - const std::string output_json_str = output_json.dump(); - json_result_type output_json; + output_json["metadata"] = metadata_json; + const std::string output_json_str = output_json.dump(); + if (output_json_str.size() < MAX_JSON_SIZE) { + json_result_type output_json_chars; std::copy(output_json_str.begin(), output_json_str.end(), - output_json.data()); - json_q_.push(output_json); + output_json_chars.data()); + json_q_.push(output_json_chars); + } else { + d_logger->error("output json size too large"); } delete_output_item_(output_item); } @@ -396,6 +398,9 @@ void iq_inference_impl::process_items_(COUNT_T power_in_count, if (n_inference_ > 0 && --inference_count_) { continue; } + if (!last_rx_freq_) { + continue; + } inference_count_ = n_inference_; if (!model_names_.size()) { continue; @@ -410,16 +415,13 @@ void iq_inference_impl::process_items_(COUNT_T power_in_count, output_item.rx_freq = last_rx_freq_; output_item.rx_freq_sample_clock = last_rx_freq_sample_clock_; output_item.sample_count = batch_; + output_item.serial = ++serial_; output_item.samples = new gr_complex[output_item.sample_count]; output_item.power = new float[output_item.sample_count]; memcpy(output_item.samples, (void *)&samples_lookback_[j * batch_], batch_ * sizeof(gr_complex)); memcpy(output_item.power, (void *)power_in, batch_ * sizeof(float)); if (background_) { - if (!last_rx_freq_) { - - continue; - } if (!inference_q_.push(output_item)) { delete_output_item_(output_item); if (host_now_() - last_full_time_ > 5) { @@ -477,6 +479,10 @@ int iq_inference_impl::general_work(int noutput_items, gr_vector_int &ninput_items, 
gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { + if (!background_ && !inference_q_.empty()) { + return 0; + } + const gr_complex *samples_in = static_cast<const gr_complex *>(input_items[0]); const float *power_in = static_cast<const float *>(input_items[1]); diff --git a/lib/iq_inference_impl.h b/lib/iq_inference_impl.h index f9a15dcf..33de594f 100644 --- a/lib/iq_inference_impl.h +++ b/lib/iq_inference_impl.h @@ -222,6 +222,7 @@ namespace gr { namespace iqtlabs { const COUNT_T MAX_INFERENCE = 5; +const COUNT_T MAX_JSON_SIZE = 8192; typedef struct output_item { FREQ_T rx_freq; @@ -232,16 +233,17 @@ typedef struct output_item { gr_complex *samples; float *power; COUNT_T rx_freq_sample_clock; + COUNT_T serial; } output_item_type; -typedef std::array json_result_type; +typedef std::array<char, MAX_JSON_SIZE> json_result_type; class iq_inference_impl : public iq_inference, base_impl { private: pmt::pmt_t tag_; COUNT_T vlen_, n_vlen_, batch_, sample_buffer_, sample_clock_, last_rx_freq_sample_clock_, n_inference_, inference_count_, - samples_since_tag_, predictions_, batch_inference_; + samples_since_tag_, predictions_, batch_inference_, serial_; int samp_rate_; TIME_T last_full_time_; double min_peak_points_, confidence_; diff --git a/python/iqtlabs/qa_image_inference.py b/python/iqtlabs/qa_image_inference.py index 4d48f731..d68a1910 100755 --- a/python/iqtlabs/qa_image_inference.py +++ b/python/iqtlabs/qa_image_inference.py @@ -336,7 +336,7 @@ def test_instance(self): self.assertTrue(stat.st_size) self.assertEqual(imghdr.what(image_file), "png") self.assertTrue(json_raw_all) - for json_raw in json_raw_all: + for i, json_raw in enumerate(json_raw_all, start=1): result = json.loads(json_raw) print(result) metadata_result = result["metadata"] @@ -350,6 +350,7 @@ def test_instance(self): "sample_rate", ) ] + self.assertEqual(int(metadata_result["serial"]), i) self.assertGreaterEqual(rssi_mean, rssi_min, metadata_result) self.assertGreaterEqual(rssi_max, rssi_mean, metadata_result)
self.assertEqual(samp_rate, meta_samp_rate, metadata_result) diff --git a/python/iqtlabs/qa_iq_inference_standalone.py b/python/iqtlabs/qa_iq_inference_standalone.py index f2fa9ea5..798506b8 100755 --- a/python/iqtlabs/qa_iq_inference_standalone.py +++ b/python/iqtlabs/qa_iq_inference_standalone.py @@ -285,7 +285,8 @@ def test_instance(self): self.tb.stop() self.tb.wait() self.assertTrue(pdu_logger.pdu_log) - for result in pdu_logger.pdu_log: + for i, result in enumerate(pdu_logger.pdu_log, start=1): + predictions_result["metadata"]["serial"] = str(i) self.assertEqual(predictions_result, json.loads(result))