From 1c55117acab917ab8a91b7a7d0e8447063072e95 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Mon, 19 Aug 2024 01:55:46 +0000 Subject: [PATCH] refactor to pipeline blocks. --- gamutrf/grscan.py | 218 ++++++++++++++++++++++++------------------- tests/test_grscan.py | 36 +++---- 2 files changed, 142 insertions(+), 112 deletions(-) diff --git a/gamutrf/grscan.py b/gamutrf/grscan.py index bda03613..4922f983 100644 --- a/gamutrf/grscan.py +++ b/gamutrf/grscan.py @@ -163,6 +163,10 @@ def __init__( logging.warn(">1s dwell time in stare mode, updates will be slow!") peak_fft_range = min(peak_fft_range, tune_step_fft) + fft_dir = "" + if write_fft_points: + fft_dir = sample_dir + self.sources, cmd_port, self.workaround_start_hook = get_source( sdr, samp_rate, @@ -175,7 +179,14 @@ def __init__( dc_ettus_auto_offset=dc_ettus_auto_offset, ) - fft_batch_size, self.fft_blocks = self.get_fft_blocks( + ( + fft_batch_size, + self.retune_pre_fft, + self.retune_fft, + self.db_block, + self.sample_block, + self.pipeline_blocks, + ) = self.get_pipeline_blocks( samp_rate, tune_jitter_hz, vkfft, @@ -194,15 +205,37 @@ def __init__( dc_block_len, dc_block_long, correct_iq, + scaling, + db_clamp_floor, + db_clamp_ceil, + fft_dir, + write_samples, + bucket_range, + description, + rotate_secs, + peak_fft_range, ) - self.fft_blocks = self.fft_blocks + self.get_db_blocks(nfft, samp_rate, scaling) - self.last_db_block = self.fft_blocks[-1] - fft_dir = "" + fft_zmq_block_addr = f"tcp://{fft_zmq_addr}:{fft_zmq_port}" + self.pduzmq_block = pduzmq(fft_zmq_block_addr) + logging.info("serving FFT on %s", fft_zmq_block_addr) + + if iq_zmq_port: + iq_zmq_block_addr = f"tcp://{iq_zmq_addr}:{iq_zmq_port}" + logging.info("serving I/Q samples and tags on %s", iq_zmq_block_addr) + iq_zmq_block = zeromq.pub_sink( + gr.sizeof_gr_complex, + fft_batch_size * nfft, + iq_zmq_block_addr, + 100, + True, + 65536, + "", + ) + self.connect((self.sample_block, 0), (iq_zmq_block, 0)) + self.samples_blocks = [] self.write_samples_block = None if write_samples: - if write_fft_points: - fft_dir = sample_dir Path(sample_dir).mkdir(parents=True, exist_ok=True) samples_vlen = fft_batch_size * nfft self.samples_blocks.extend( @@ -233,49 +266,6 @@ def __init__( ) self.write_samples_block = self.samples_blocks[-1] - retune_fft = self.iqtlabs.retune_fft( - tag="rx_freq", - nfft=nfft, - samp_rate=int(samp_rate), - tune_jitter_hz=int(tune_jitter_hz), - freq_start=int(freq_start), - freq_end=int(freq_end), - tune_step_hz=tune_step_hz, - tune_step_fft=tune_step_fft, - skip_tune_step_fft=skip_tune_step, - fft_min=db_clamp_floor, - fft_max=db_clamp_ceil, - sdir=fft_dir, - write_step_fft=write_samples, - bucket_range=bucket_range, - tuning_ranges=tuning_ranges, - description=description, - rotate_secs=rotate_secs, - pre_fft=pretune, - tag_now=self.tag_now, - low_power_hold_down=(not pretune and low_power_hold_down), - slew_rx_time=slew_rx_time, - peak_fft_range=peak_fft_range, - ) - self.fft_blocks.append(retune_fft) - fft_zmq_block_addr = f"tcp://{fft_zmq_addr}:{fft_zmq_port}" - self.pduzmq_block = pduzmq(fft_zmq_block_addr) - logging.info("serving FFT on %s", fft_zmq_block_addr) - - if iq_zmq_port: - iq_zmq_block_addr = f"tcp://{iq_zmq_addr}:{iq_zmq_port}" - logging.info("serving I/Q samples and tags on %s", iq_zmq_block_addr) - iq_zmq_block = zeromq.pub_sink( - gr.sizeof_gr_complex, - fft_batch_size * nfft, - iq_zmq_block_addr, - 100, - True, - 65536, - "", - ) - self.connect((self.retune_pre_fft, 0), (iq_zmq_block, 0)) - self.inference_blocks = [] self.inference_output_block = None self.image_inference_block = None @@ -284,20 +274,17 @@ def __init__( if inference_output_dir: Path(inference_output_dir).mkdir(parents=True, exist_ok=True) - if inference_text_color: - wc = webcolors.name_to_rgb(inference_text_color, "css3") - inference_text_color = ",".join( - [str(c) for c in [wc.blue, wc.green, wc.red]] - ) - if (inference_model_server and inference_model_name) or inference_output_dir: - x = 640 - y = 640 + if inference_text_color: + wc = webcolors.name_to_rgb(inference_text_color, "css3") + inference_text_color = ",".join( + [str(c) for c in [wc.blue, wc.green, wc.red]] + ) self.image_inference_block = self.iqtlabs.image_inference( tag="rx_freq", vlen=nfft, - x=x, - y=y, + x=640, + y=640, image_dir=inference_output_dir, convert_alpha=255, norm_alpha=0, @@ -343,7 +330,6 @@ def __init__( ) # TODO: provide new block that receives JSON-over-PMT and outputs to MQTT/zmq. - retune_fft_output_block = None if self.inference_blocks: inference_zmq_addr = f"tcp://{inference_addr}:{inference_port}" self.inference_output_block = inferenceoutput( @@ -359,48 +345,46 @@ def __init__( inference_output_dir, ) if self.iq_inference_block: + iq_inference_blocks = [self.iq_inference_block] if iq_inference_squelch_db is not None: - squelch_blocks = self.wrap_batch( - [ - analog.pwr_squelch_cc( - iq_inference_squelch_db, - iq_inference_squelch_alpha, - 0, - False, - ) - ], - fft_batch_size, - nfft, - ) + [self.iq_inference_block] - self.connect_blocks(self.retune_pre_fft, squelch_blocks) - else: - self.connect((self.retune_pre_fft, 0), (self.iq_inference_block, 0)) - self.connect((self.last_db_block, 0), (self.iq_inference_block, 1)) + self.iq_inference_block = ( + self.wrap_batch( + [ + analog.pwr_squelch_cc( + iq_inference_squelch_db, + iq_inference_squelch_alpha, + 0, + False, + ) + ], + fft_batch_size, + nfft, + ) + + iq_inference_blocks + ) + self.connect_blocks(self.sample_block, iq_inference_blocks) + self.connect((self.db_block, 0), (self.iq_inference_block, 1)) if self.image_inference_block: if stare: - self.connect( - (self.last_db_block, 0), (self.image_inference_block, 0) - ) + self.connect((self.db_block, 0), (self.image_inference_block, 0)) else: - retune_fft_output_block = self.image_inference_block + # need to pass samples through retune_fft if using image inference + self.connect((self.retune_fft, 0), (self.image_inference_block, 0)) for block in self.inference_blocks: self.msg_connect( (block, "inference"), (self.inference_output_block, "inference") ) - if retune_fft_output_block: - self.connect((retune_fft, 0), (retune_fft_output_block, 0)) - if pretune: self.msg_connect((self.retune_pre_fft, "tune"), (self.sources[0], cmd_port)) - self.msg_connect((self.retune_pre_fft, "tune"), (retune_fft, "cmd")) + self.msg_connect((self.retune_pre_fft, "tune"), (self.retune_fft, "cmd")) else: - self.msg_connect((retune_fft, "tune"), (self.sources[0], cmd_port)) - self.msg_connect((retune_fft, "json"), (self.pduzmq_block, "json")) - self.connect_blocks(self.sources[0], self.sources[1:]) + self.msg_connect((self.retune_fft, "tune"), (self.sources[0], cmd_port)) + self.msg_connect((self.retune_fft, "json"), (self.pduzmq_block, "json")) - self.connect_blocks(self.sources[-1], self.fft_blocks) - self.connect_blocks(self.retune_pre_fft, self.samples_blocks) + self.connect_blocks(self.sources[0], self.sources[1:]) + self.connect_blocks(self.sources[-1], self.pipeline_blocks) + self.connect_blocks(self.sample_block, self.samples_blocks) def connect_blocks(self, source, other_blocks, last_block_port=0): last_block = source @@ -535,7 +519,7 @@ def get_dc_blocks( ) return [] - def get_fft_blocks( + def get_pipeline_blocks( self, samp_rate, tune_jitter_hz, @@ -555,6 +539,15 @@ def get_fft_blocks( dc_block_len, dc_block_long, correct_iq, + scaling, + db_clamp_floor, + db_clamp_ceil, + fft_dir, + write_samples, + bucket_range, + description, + rotate_secs, + peak_fft_range, ): fft_batch_size, fft_blocks = self.get_offload_fft_blocks( vkfft, @@ -562,7 +555,7 @@ def get_fft_blocks( nfft, fft_processor_affinity, ) - self.retune_pre_fft = self.get_pretune_block( + retune_pre_fft = self.get_pretune_block( fft_batch_size, nfft, samp_rate, @@ -577,13 +570,46 @@ def get_fft_blocks( low_power_hold_down, slew_rx_time, ) + retune_fft = self.iqtlabs.retune_fft( + tag="rx_freq", + nfft=nfft, + samp_rate=int(samp_rate), + tune_jitter_hz=int(tune_jitter_hz), + freq_start=int(freq_start), + freq_end=int(freq_end), + tune_step_hz=tune_step_hz, + tune_step_fft=tune_step_fft, + skip_tune_step_fft=skip_tune_step, + fft_min=db_clamp_floor, + fft_max=db_clamp_ceil, + sdir=fft_dir, + write_step_fft=write_samples, + bucket_range=bucket_range, + tuning_ranges=tuning_ranges, + description=description, + rotate_secs=rotate_secs, + pre_fft=pretune, + tag_now=self.tag_now, + low_power_hold_down=(not pretune and low_power_hold_down), + slew_rx_time=slew_rx_time, + peak_fft_range=peak_fft_range, + ) + sample_blocks = [retune_pre_fft] + self.get_dc_blocks( + correct_iq, dc_block_len, dc_block_long, fft_batch_size, nfft + ) + pipeline_blocks = ( + sample_blocks + + fft_blocks + + self.get_db_blocks(nfft, samp_rate, scaling) + + [retune_fft] + ) return ( fft_batch_size, - [self.retune_pre_fft] - + self.get_dc_blocks( - correct_iq, dc_block_len, dc_block_long, fft_batch_size, nfft - ) - + fft_blocks, + retune_pre_fft, + retune_fft, + pipeline_blocks[-1], + sample_blocks[-1], + pipeline_blocks, ) def start(self): diff --git a/tests/test_grscan.py b/tests/test_grscan.py index 095b451f..a2db484f 100644 --- a/tests/test_grscan.py +++ b/tests/test_grscan.py @@ -143,22 +143,26 @@ def run_grscan_smoke(self, pretune, wavelearner, write_samples, test_file): ] ) sdr = "file:" + sdr_file - tb = grscan( - freq_start=freq_start, - freq_end=freq_end, - sdr=sdr, - samp_rate=samp_rate, - tune_step_fft=512, - write_samples=write_samples, - sample_dir=tempdir, - iqtlabs=iqtlabs, - wavelearner=wavelearner, - rotate_secs=900, - db_clamp_floor=-1e6, - pretune=pretune, - fft_batch_size=4, - inference_output_dir=str(tempdir), - ) + try: + tb = grscan( + freq_start=freq_start, + freq_end=freq_end, + sdr=sdr, + samp_rate=samp_rate, + tune_step_fft=512, + write_samples=write_samples, + sample_dir=tempdir, + iqtlabs=iqtlabs, + wavelearner=wavelearner, + rotate_secs=900, + db_clamp_floor=-1e6, + pretune=pretune, + fft_batch_size=4, + inference_output_dir=str(tempdir), + ) + except Exception as e: + print(e) + raise tb.start() time.sleep(3) tb.stop()