Skip to content

Commit

Permalink
Merge branch 'IQTLabs:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
lk-iqt authored Nov 13, 2023
2 parents c621fc1 + 6612665 commit 2bb1671
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 94 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ For example, make a recording with a sample rate of 20.48e6, for 2 seconds (4096
$ wget -nv -O- localhost:8000/v1/record/100000000/409600000/20480000
```

If the worker is started with `--rssi`, that same ```record``` call will cause the worker to publish RSSI values instead of recording (the duration argument must be present, but is not used).
To stream RSSI values instead, call:

```
$ wget -nv -O- localhost:8000/v1/rssi/100000000/409600000/20480000
```

If the sample count parameter is 0, the stream will not end
until a new RPC (whether rssi or record) is received.

## Working with worker I/Q recordings

Expand Down
3 changes: 2 additions & 1 deletion gamutrf/sigfinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def on_post(self, req, resp):
signal_hz = int(int(req.media["frequency"]) * 1e6)
record_bps = int(int(req.media["bandwidth"]) * MB)
record_samples = int(record_bps * int(req.media["duration"]))
recorder_args = f"record/{signal_hz}/{record_samples}/{record_bps}"
action = req.media["action"]
recorder_args = f"{action}/{signal_hz}/{record_samples}/{record_bps}"
timeout = int(req.media["duration"])
response = None
if int(req.media["repeat"]) == -1:
Expand Down
59 changes: 42 additions & 17 deletions gamutrf/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,6 @@ def argument_parser():
action=argparse.BooleanOptionalAction,
help="add sigmf meta file",
)
parser.add_argument(
"--rssi",
dest="enable_rssi",
default=False,
action=argparse.BooleanOptionalAction,
help="get RSSI values",
)
parser.add_argument(
"--use_external_gps",
dest="use_external_gps",
Expand Down Expand Up @@ -206,11 +199,12 @@ def on_get(self, _req, resp):
resp.status = falcon.HTTP_200


class Record:
class Action:
def __init__(self, arguments, q, sdr_recorder):
self.arguments = arguments
self.q = q
self.sdr_recorder = sdr_recorder
self.action = None

def on_get(self, _req, resp, center_freq, sample_count, sample_rate):
# TODO check if chosen SDR can do the supplied sample_count
Expand All @@ -228,6 +222,7 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate):
if status is None:
self.q.put(
{
"action": self.action,
"center_freq": int(center_freq),
"sample_count": int(sample_count),
"sample_rate": int(sample_rate),
Expand All @@ -239,6 +234,18 @@ def on_get(self, _req, resp, center_freq, sample_count, sample_rate):
resp.text = json.dumps({"status": status})


class Record(Action):
def __init__(self, arguments, q, sdr_recorder):
super().__init__(arguments, q, sdr_recorder)
self.action = "record"


class Rssi(Action):
def __init__(self, arguments, q, sdr_recorder):
super().__init__(arguments, q, sdr_recorder)
self.action = "record"


class API:
def __init__(self, arguments):
self.arguments = arguments
Expand Down Expand Up @@ -267,10 +274,14 @@ def run_recorder(self, record_func):
logging.info("run recorder")
while True:
logging.info("awaiting request")
if self.arguments.enable_rssi:
self.serve_rssi()
action_args = self.q.get()
action = action_args["action"]
if action == "record":
self.serve_recording(record_func, action_args)
elif action == "rssi":
self.serve_rssi(action_args)
else:
self.serve_recording(record_func)
logging.error("no such action: %s", action)

def record(self, center_freq, sample_count, sample_rate=20e6):
return self.sdr_recorder.run_recording(
Expand All @@ -286,8 +297,7 @@ def record(self, center_freq, sample_count, sample_rate=20e6):
self.arguments.antenna,
)

def serve_recording(self, record_func):
record_args = self.q.get()
def serve_recording(self, record_func, record_args):
logging.info(f"got a request: {record_args}")
record_status = record_func(**record_args)
if record_status == -1:
Expand All @@ -310,6 +320,12 @@ def report_rssi(self, record_args, reported_rssi, reported_time):

def process_rssi(self, record_args, sock):
last_rssi_time = 0
duration = 0
if record_args["sample_count"]:
duration = float(record_args["sample_count"]) / float(
record_args["sample_rate"]
)
start_time = time.time()
while self.q.empty():
rssi_raw, _ = sock.recvfrom(FLOAT_SIZE)
rssi = struct.unpack("f", rssi_raw)[0]
Expand All @@ -318,6 +334,8 @@ def process_rssi(self, record_args, sock):
if rssi > MAX_RSSI:
continue
now = time.time()
if duration and now - start_time > duration:
break
now_diff = now - last_rssi_time
if now_diff < self.arguments.rssi_interval:
continue
Expand All @@ -326,14 +344,15 @@ def process_rssi(self, record_args, sock):

def proxy_rssi(self, rssi_addr, record_args):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
# codeql[py/bind-socket-all-network-interfaces]
sock.bind((rssi_addr, RSSI_UDP_PORT)) # nosec
self.process_rssi(record_args, sock)

def serve_rssi(self):
record_args = self.q.get()
def serve_rssi(self, record_args):
logging.info(f"got request {record_args}")
if self.arguments.rssi_external:
logging.info("proxying external RSSI")
# codeql[py/bind-socket-all-network-interfaces]
self.proxy_rssi("0.0.0.0", record_args) # nosec
else:
center_freq = int(record_args["center_freq"])
Expand All @@ -359,7 +378,12 @@ def serve_rssi(self):

@staticmethod
def paths():
return ["", "/info", "/record/{center_freq}/{sample_count}/{sample_rate}"]
return [
"",
"/info",
"/record/{center_freq}/{sample_count}/{sample_rate}",
"/rssi/{center_freq}/{sample_count}/{sample_rate}",
]

@staticmethod
def version():
Expand All @@ -370,7 +394,8 @@ def routes(self):
endpoints = Endpoints()
info = Info(self.arguments)
record = Record(self.arguments, self.q, self.sdr_recorder)
funcs = [endpoints, info, record]
rssi = Rssi(self.arguments, self.q, self.sdr_recorder)
funcs = [endpoints, info, record, rssi]
return dict(zip(p, funcs))

def run(self):
Expand Down
Loading

0 comments on commit 2bb1671

Please sign in to comment.