-
Notifications
You must be signed in to change notification settings - Fork 117
/
Copy pathmeta-manager
executable file
·139 lines (101 loc) · 4.21 KB
/
meta-manager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
import os
import sys
import argparse
import json
import datetime
def Annotation(**kwargs):
t = {}
if 'width' in kwargs:
t['freq_upper_edge'] = round(kwargs['freq']+kwargs['width']/2)
t['freq_lower_edge'] = round(kwargs['freq']-kwargs['width']/2)
del kwargs['width']
del kwargs['freq']
if 'count' in kwargs:
t['sample_count'] = kwargs['count']
del kwargs['count']
if 'start' in kwargs:
t['sample_start'] = kwargs['start']
del kwargs['start']
for a in kwargs:
t[a] = kwargs[a]
return dict({"core:"+k: v for k, v in t.items()})
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", action="store_true",
help="increase output verbosity")
parser.add_argument("-d", "--debug", action="store_true")
parser.add_argument("-w", "--write", help="write new file")
parser.add_argument("-p", "--peaks", help="add peaks from file")
parser.add_argument("--numbers", action="store_true", help="add numbers to annotations")
filters = parser.add_argument_group('filters')
filters.add_argument("--f-min", metavar="freq", type=int, help="minmum frequency")
filters.add_argument("--f-max", metavar="freq", type=int, help="maximum frequency")
filters.add_argument("--t-max", metavar="seconds", type=int, help="maxiumum time")
filters.add_argument("--c-max", metavar="count", type=int, help="maxiumum number of annotations")
parser.add_argument("metafile", help="filename")
args = parser.parse_args()
with open(args.metafile) as f:
data = json.load(f)
assert(len(data["captures"]) == 1)
assert(data["captures"][0]["core:sample_start"] == 0)
assert(data["captures"])
rate = int(data["global"]["core:sample_rate"])
dt = data["captures"][0]["core:datetime"]
bfreq = int(data["captures"][0]["core:frequency"])
print(f'{args.metafile}: [{data["global"]["core:datatype"]}] {rate/1e6:.9g}Msps @{bfreq/1e9:.9g}GHz')
if "annotations" not in data:
data["annotations"] = []
# Remove empty annotations
ann = [a for a in data["annotations"] if len(a) > 0]
print(f'in {len(ann)} annotations')
ann_ctr = 0
if args.peaks is not None:
with open(args.peaks) as f:
for line in f:
try:
sample, fftbin, t = line.strip().split(",")
sample = int(sample)
fftbin = int(fftbin)
except main:
pass
bins = 8192
freq = bfreq + rate * (fftbin / bins - 0.5)
width = rate / bins
if t == "x": # Peak
a = Annotation(comment="Peak", description="", freq=freq, width=width, start=sample, count=bins)
elif t == "y": # Possible Frame
a = Annotation(comment="Frame", description="", freq=freq, width=1e7/(30*8), start=sample, count=8.28 * rate / 1e3)
else:
print("Unknown peak type:", t, file=sys.stderr)
if args.numbers:
ann_ctr += 1
a["core:comment"] += " [" + str(ann_ctr) + "]"
ann.append(a)
if args.f_max is not None:
ann = [a for a in ann if a["core:freq_upper_edge"] > args.f_max]
if args.f_min is not None:
ann = [a for a in ann if a["core:freq_lower_edge"] < args.f_min]
if args.t_max is not None:
ann = [a for a in ann if a["core:sample_start"] < args.t_max*rate]
if args.c_max is not None:
ann = sorted(ann, key=lambda v: v["core:sample_start"])[:args.c_max]
data["annotations"] = ann
try:
fmin = min([a["core:freq_lower_edge"] for a in data["annotations"]])
fmax = max([a["core:freq_upper_edge"] for a in data["annotations"]])
tmin = min([a["core:sample_start"] for a in data["annotations"]])
tmax = max([a["core:sample_start"] for a in data["annotations"]])
except ValueError:
fmin = fmax = 0
tmin = tmax = 0
if False:
for a in data["annotations"]:
print(f'{a["core:sample_start"]}')
pass
def s2t(samples): # convert samples to time
return samples / rate
print(f'out {len(data["annotations"])} annotations')
print(f' {fmin/1e9:.6f} - {fmax/1e9:.6f} GHz / {s2t(tmin):.3f} - {s2t(tmax):.3f} sec')
if args.write:
with open(args.write, "w") as f:
json.dump(data, f, indent=0)