Skip to content

Commit

Permalink
Merge pull request #1352 from anarkiwi/zstd
Browse files Browse the repository at this point in the history
Waterfall data is compressed.
  • Loading branch information
anarkiwi authored Jul 17, 2024
2 parents 577e78a + f2313d8 commit 962d767
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 33 deletions.
6 changes: 5 additions & 1 deletion gamutrf/grpduzmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import pmt
import zmq
import zstandard

try:
from gnuradio import gr # pytype: disable=import-error
Expand Down Expand Up @@ -38,13 +39,16 @@ def __init__(
self.zmq_pub.bind(zmq_addr)
self.message_port_register_in(pmt.intern("json"))
self.set_msg_handler(pmt.intern("json"), self.receive_pdu)
self.context = zstandard.ZstdCompressor()

def stop(self):
    """Close the ZMQ PUB socket when this block is stopped."""
    self.zmq_pub.close()

def receive_pdu(self, pdu):
    """Compress an incoming JSON PDU with zstd and publish it over ZMQ.

    Args:
        pdu: GNU Radio PDU whose cdr payload is UTF-8 JSON bytes.

    The payload is stripped, the record delimiter appended, and the result
    zstd-compressed (self.context is a ZstdCompressor) before sending.
    ZMQErrors (e.g. send buffer full with NOBLOCK) are logged, not raised,
    so the flowgraph keeps running.
    """
    item = pmt.to_python(pmt.cdr(pdu)).tobytes().decode("utf8").strip()
    try:
        data = item + DELIM
        # Compress before publishing; subscribers must zstd-decompress.
        data = self.context.compress(data.encode("utf8"))
        # NOBLOCK: drop rather than stall the flowgraph if no peer is ready.
        self.zmq_pub.send(data, flags=zmq.NOBLOCK)
    except zmq.ZMQError as e:
        logging.error(str(e))
56 changes: 24 additions & 32 deletions gamutrflib/gamutrflib/zmqbucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,24 @@ def fft_proxy(
tmp_buff_file = buff_file.replace(tmp_buff_file, "." + tmp_buff_file)
if os.path.exists(tmp_buff_file):
os.remove(tmp_buff_file)
context = zstandard.ZstdCompressor()
compress_context = zstandard.ZstdCompressor()
decompress_context = zstandard.ZstdDecompressor()
shutdown = False
while not shutdown:
with open(tmp_buff_file, "wb") as zbf:
with context.stream_writer(zbf) as bf:
with compress_context.stream_writer(zbf) as bf:
while not shutdown:
shutdown = live_file is not None and not live_file.exists()
try:
sock_txt = socket.recv(flags=zmq.NOBLOCK)
except zmq.error.Again:
time.sleep(poll_timeout)
continue
# gamutrf might send compressed message
try:
sock_txt = decompress_context.decompress(sock_txt)
except zstandard.ZstdError:
pass
bf.write(sock_txt)
now = time.time()
if (
Expand Down Expand Up @@ -122,7 +128,6 @@ def __init__(
self.addr = addr
self.port = port
self.context = zstandard.ZstdDecompressor()
self.txt_buf = ""
self.fftbuffer = None
self.scan_configs = {}
self.proxy_result = executor.submit(
Expand All @@ -138,30 +143,20 @@ def healthy(self):
def __str__(self):
    """Return a human-readable description of this scanner's ZMQ endpoint."""
    return f"ZmqScanner on {self.addr}:{self.port}"

def read_buff_file(self, log):
    """Drain the zstd-compressed FFT buffer file into parsed JSON records.

    Args:
        log: optional writable text stream; when truthy, the raw decompressed
            text is appended to it before parsing (a tee of the scan log).

    Returns:
        list of dicts (one per JSON line) when the buffer file existed and
        parsed cleanly, otherwise None. The buffer file is always removed
        after a read attempt, even when JSON parsing fails.
    """
    lines = None
    if os.path.exists(self.buff_file):
        # info() is provided by the enclosing class — logs the buffer size.
        self.info("read %u bytes of FFT data" % os.stat(self.buff_file).st_size)
        # self.context is a zstandard.ZstdDecompressor; the proxy wrote the
        # buffer file through a ZstdCompressor stream_writer.
        with self.context.stream_reader(open(self.buff_file, "rb")) as bf:
            txt_buf = bf.read().decode("utf8")
        if log:
            log.write(txt_buf)
        try:
            lines = [json.loads(line) for line in txt_buf.splitlines() if line]
        except json.decoder.JSONDecodeError as err:
            # Log and fall through with lines=None; the bad buffer is discarded.
            logging.info("%s: %s", err, txt_buf)
        os.remove(self.buff_file)
    return lines

def read_new_frame_df(self, df, discard_time):
frame_df = None
Expand Down Expand Up @@ -194,9 +189,7 @@ def read_new_frame_df(self, df, discard_time):
def lines_to_df(self, lines):
try:
records = []
for line in lines:
line = line.strip()
json_record = json.loads(line)
for json_record in lines:
ts = float(json_record["ts"])
sweep_start = float(json_record["sweep_start"])
total_tune_count = int(json_record["total_tune_count"])
Expand All @@ -223,12 +216,11 @@ def lines_to_df(self, lines):
def read_buff(self, log, discard_time):
    """Read one batch of scanner output and convert it to a frame DataFrame.

    Args:
        log: optional writable text stream passed to read_buff_file().
        discard_time: passed through to read_new_frame_df() to age out data.

    Returns:
        (scan_config, frame_df) tuple; both are None when no complete
        frame was available.
    """
    scan_config = None
    frame_df = None
    # read_buff_file() now returns parsed JSON records directly (or None).
    lines = self.read_buff_file(log)
    if lines:
        df = self.lines_to_df(lines)
        if df is not None:
            scan_config, frame_df = self.read_new_frame_df(df, discard_time)
    return scan_config, frame_df


Expand Down

0 comments on commit 962d767

Please sign in to comment.