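"""Query event tracks from an EGI .mff recording.

Parses the Events_*.xml files inside an MFF bundle and prints selected
event columns as a tab-separated table (or writes a CSV), with each
event's time expressed relative to the recording start by default.
"""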
import argparse  # ArgumentParser
import datetime  # datetime
import glob  # glob
import os  # path.join
import xml.etree.ElementTree as ET  # parse

# XML namespace prefixes used by EGI MFF event and info files
PREFIX_EVENT_EGI = '{http://www.egi.com/event_mff}'
PREFIX_INFO_EGI = '{http://www.egi.com/info_mff}'


def harvest_event_files(mffname: str) -> list[str]:
    """Return the event-track XML files inside the MFF bundle."""
    return glob.glob(os.path.join(mffname, 'Events_*'))


def trim_evt_prefix(s: str) -> str:
    """Strip the EGI event namespace prefix from an XML tag."""
    return s[len(PREFIX_EVENT_EGI):]


def trim_info_prefix(s: str) -> str:
    """Strip the EGI info namespace prefix from an XML tag."""
    return s[len(PREFIX_INFO_EGI):]


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--repeating',
        help=(
            'Add a repeating column with a single repeating value. The '
            'first argument should be the column name, the second the '
            'value. Remember to include this column in the final '
            'argument set.'
        ),
        nargs=2,
    )
    parser.add_argument(
        '--unq',
        metavar='COLUMN',
        help=(
            'Count the number of unique values for the given column and '
            'make a new column indicating the index of unique values '
            'for the original column value. To display it, remember to '
            'include COLUMN_unq in the final argument set.'
        ),
    )
    parser.add_argument(
        '-v',
        '--filter',
        help=(
            'Do not print a row if any column contains any of the '
            'provided substrings.'
        ),
        nargs='+',
    )
    parser.add_argument(
        '--datetime',
        help='Do not convert datetimes into relative milliseconds.',
        action='store_true',
    )
    parser.add_argument(
        '--sort_by',
        default='relative_millis',
        help=(
            'The column to sort by. The special key "relative_millis" may '
            'be used to sort by the relative time. Default '
            '"relative_millis", or "beginTime" if the --datetime option '
            'is used.'
        ),
    )
    parser.add_argument(
        '--to_csv',
        default=None,
        help='File to save the output to as a CSV. Default None (print '
             'to stdout).',
    )
    parser.add_argument(
        'mff',
        help='The .mff file to parse.',
    )
    parser.add_argument(
        'columns',
        help=(
            'The columns to write out, in the desired order. '
            'Either relative_millis or beginTime is always written out.'
        ),
        nargs='+',
    )
    args = parser.parse_args()
    time_column = 'relative_millis' if not args.datetime else 'beginTime'
    args.columns.insert(0, time_column)
    # Decide if beginTime or relative_millis should be used for sorting
    if args.datetime and args.sort_by == 'relative_millis':
        sorter = lambda x: x['beginTime']
    else:
        sorter = lambda x: x[args.sort_by]
    event_files = harvest_event_files(args.mff)
    # Events is a dictionary where each key is an event-track name and
    # each value has an "events" key and optionally a "trackType" key
    events = {}
    # Iterate over all event files and collect the events in each
    for f in event_files:
        tree = ET.parse(f)
        root = tree.getroot()
        # First sub-element tag: 'name', text: the name of the event track
        name_element = root[0]
        element_tag = trim_evt_prefix(name_element.tag)
        if element_tag != 'name':
            raise ValueError(
                'Unexpected tree shape: first XML element is not "name"'
            )
        curr_name = name_element.text
        events[curr_name] = {}
        curr_dict = events[curr_name]
        # Second sub-element should be trackType, but it may be absent
        track_element = root[1]
        element_tag = trim_evt_prefix(track_element.tag)
        start = 1
        if element_tag == 'trackType':
            curr_dict['trackType'] = track_element.text
            start += 1
        curr_dict['events'] = []
        # The remaining sub-elements should be the events themselves
        for x in root[start:]:
            if trim_evt_prefix(x.tag) != 'event':
                continue
            curr_dict['events'].append(
                {trim_evt_prefix(y.tag): y.text for y in x}
            )
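
    # At this point `events` maps each track name to a dict with an
    # optional 'trackType' and an 'events' list of {tag: text} dicts.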
    # Harvest all events from every track into a single flat list
    all_events = []
    for v in events.values():
        for evt in v['events']:
            all_events.append(evt)
    # --repeating: stamp every event with a constant extra column
    if args.repeating:
        for evt in all_events:
            evt[args.repeating[0]] = args.repeating[1]
    # --unq: map each distinct value of COLUMN to a 1-based index
    if args.unq:
        vals = sorted({evt[args.unq] for evt in all_events})
        val_mapping = {val: str(i + 1) for i, val in enumerate(vals)}
        unique_col_name = args.unq + '_unq'
        for evt in all_events:
            evt[unique_col_name] = val_mapping[evt[args.unq]]
    if not args.datetime:
        # Get the recording start time from info.xml
        info_xml = os.path.join(args.mff, "info.xml")
        info_tree = ET.parse(info_xml)
        info_root = info_tree.getroot()
        record_start = None
        for x in info_root:
            if trim_info_prefix(x.tag) == 'recordTime':
                record_start = x.text
        if not record_start:
            raise ValueError('Could not find the recording start time.')
        record_dt = datetime.datetime.fromisoformat(record_start)
        # Calculate the time in milliseconds between recording start and
        # each event start. The days component must be included, since
        # timedelta.seconds alone only covers the within-day remainder.
        for evt in all_events:
            evt_dt = datetime.datetime.fromisoformat(evt['beginTime'])
            delta_t = evt_dt - record_dt
            evt['relative_millis'] = (
                delta_t.days * 86_400_000
                + delta_t.seconds * 1000
                + delta_t.microseconds // 1000
            )
    # Sort the events
    all_events.sort(key=sorter)
    # Use a comma when writing a CSV file, otherwise a tab
    delimiter = '\t' if not args.to_csv else ','
    out_lines = [delimiter.join(args.columns)]
    for evt in all_events:
        # Missing columns (or explicit None values) print as empty cells
        out_lines.append(delimiter.join(
            str(evt[column]) if evt.get(column) is not None else ''
            for column in args.columns
        ))
    # Drop any line that contains one of the --filter substrings
    if args.filter:
        filtered_lines = []
        for ol in out_lines:
            append_line = True
            for fil in args.filter:
                if fil in ol:
                    append_line = False
                    break
            if append_line:
                filtered_lines.append(ol)
        out_lines = filtered_lines
    out_str = '\n'.join(out_lines)
    # Write to the output file or stdout
    if args.to_csv:
        with open(args.to_csv, 'w') as f:
            f.write(out_str)
    else:
        print(out_str)


if __name__ == '__main__':
    main()
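
# Example invocations (hypothetical bundle and column names):
#   python mffquery.py recording.mff code label
# prints a tab-separated table of the code and label columns, with each
# event's offset from recording start (relative_millis) prepended, and
#   python mffquery.py --unq label --to_csv out.csv recording.mff label label_unq
# writes a comma-separated file that adds a 1-based index over the
# unique label values.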