-
Notifications
You must be signed in to change notification settings - Fork 0
/
simrtPupilPhase_human.py
260 lines (211 loc) · 16.2 KB
/
simrtPupilPhase_human.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import os
import argparse
import numpy as np
import pickle
import rtPupil_config
from EventCollector import EventCollector
from EventCollector import get_data, find_string_time, pull_pupil_sample, plot_mean_timecourses
from StimulusDecider import StimulusDecider
def main(subject_list, plot_timecourses, baseline_duration_ms, max_search_window_duration_ms,
num_random_events, IEI_duration_ms, pupil_sample_duration_ms, peak_pupil_quantile,
trough_pupil_quantile, dilation_quantile, constriction_quantile, half_epoch_duration_ms):
assert len(subject_list) > 0, "Please provide at least one subject"
# number of eyelink samples in pupil sample
samples_in_pupil_sample = np.round(pupil_sample_duration_ms/rtPupil_config.ms_per_sample)
random_IEI = rtPupil_config.block_duration_ms/num_random_events # duration of IEI in milliseconds
# Inter-event interval for pupil phase events
samples_in_IEI = np.round(IEI_duration_ms/rtPupil_config.ms_per_sample) # in samples
# calculate number of samples in baseline window
samples_in_baseline_window = np.round(baseline_duration_ms/rtPupil_config.ms_per_sample)
# define paths
root_path = os.getcwd()
data_base = os.path.join(root_path,rtPupil_config.data_fname)
results_base = os.path.join(root_path,rtPupil_config.results_fname)
for subjID in subject_list:
print('running subject: '+subjID +"...")
# make subject specific directories
sub_dir = os.path.join(data_base,subjID, "EyeLink")
results_dir = os.path.join(results_base, subjID)
# get data
# time, pupil size, x coord, y coord
gaze_data, event_data = get_data(sub_dir)
# if output directory doesn't exist, create it
if not os.path.isdir(results_dir):
os.makedirs(results_dir)
# create stimulus decider object
sd = StimulusDecider(rtPupil_config.block_duration_ms/1000, baseline_duration_ms,
max_search_window_duration_ms, pupil_sample_duration_ms, num_random_events,
IEI_duration_ms/1000, peak_pupil_quantile, trough_pupil_quantile,
dilation_quantile, constriction_quantile)
# identify events across block - this matters because we care about the ends of blocks.
# event epochs are concatenated across blocks at end of loop
for block in range(1,rtPupil_config.num_blocks+1):
print("Starting block "+str(block))
# initialize Event Collectors
random_events = EventCollector("random")
trough_events = EventCollector("trough")
peak_events = EventCollector("peak")
constriction_events = EventCollector("constriction")
dilation_events = EventCollector("dilation")
accepted_pupil_event_times = []
# pull the raw data for a block
start_time = find_string_time(time_array = event_data['sttime'][0], message_array = event_data['message'][0],
match_string="Starting Perception Rate Block "+str(block))
end_time = find_string_time(time_array = event_data['sttime'][0], message_array = event_data['message'][0],
match_string="Finished Perception Rate Block "+str(block))
block_data = gaze_data[:,np.where(gaze_data[0]==start_time)[0][0]:np.where(gaze_data[0]==end_time)[0][0]]
# Downsampling matches th Eyelink real-time sampling in a live testing session
downsampled_block_data = block_data[:, 0::rtPupil_config.ms_per_sample]
# reset StimulusDecider for new block
sd.reset_baseline_window()
sd.reset_search_window()
# ensures we only look at complete pupil samples
for pupil_sample_num in range(int(np.shape(downsampled_block_data)[1]/6)-1):
# stage 1: get pupil sample
current_pupil_sample = pull_pupil_sample(downsampled_block_data, pupil_sample_num, samples_in_pupil_sample)
# stage 2: create search window
sd.update_windows(current_pupil_sample)
# if the baseline window is long enough (and not all nans), update thresholds so we account for drift
if len(sd.get_baseline_window()) > samples_in_baseline_window:
if not np.isnan(sd.get_baseline_window()).all():
sd.update_pupil_phase_thresholds()
else:
sd.reset_baseline_window()
# validate search window - not too long, no blinks
if not sd.validate_search_window(max_search_window_duration_ms//rtPupil_config.ms_per_sample):
sd.reset_search_window()
continue
# demean search window
demeaned_search_window = sd.get_search_window() - np.mean(sd.get_search_window())
# Fit demeaned data in search window with polynomial function
sd.fit_polynomial(demeaned_search_window)
# stage 4: find pupil events
# check if random event happened
current_time = sd.get_search_window_times()[-1]
if len(random_events.get_times()) >= 1:
time_from_last_event = current_time - random_events.get_times()[-1]
else:
# ensures that the first stimulus always triggers accepted random event
time_from_last_event = random_IEI
# if it's been enough time, log a random event
if time_from_last_event >= random_IEI:
random_events.update_data(((pupil_sample_num+1)*samples_in_pupil_sample), current_time,
sd.get_search_window()[-1])
# find pupil phase events
if len(sd.get_search_window()) > 1:
# check if there was an event
found_event = sd.find_pupil_phase_event(pupil_sample_num=pupil_sample_num, current_time=current_time,
peak_events=peak_events, trough_events=trough_events,
constriction_events=constriction_events, dilation_events=dilation_events)
# update internal tracking of events
sd.set_current_event_idx(found_event)
# get times for all events (regardless of type)
event_times = peak_events.get_times() + trough_events.get_times() + constriction_events.get_times() + dilation_events.get_times()
event_times.sort() # make sure they're in order
# if there is an event, check to see whether enough time has passed to consider it an accepted event
# events get logged in respective EventCollector objects
if found_event != 0:
peak_events, trough_events, dilation_events, constriction_events = sd.validate_event_offline(event_times, accepted_pupil_event_times, IEI_duration_ms,
peak_events, trough_events, dilation_events, constriction_events)
# Note that it may be recommended to perform additional blink and microsaccade detection using your method of choice here.
# In the MATLAB simulations, we interpolate over blinks and use a method to identify microsaccades based off of Engbert & Kliegl (2003)
# Here, we do not perform this data processing and simply plot the data.
# pull epoch data for each event and concatentate across block
if block == 1:
accepted_constriction_epoch_data, all_constriction_epoch_data = constriction_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
accepted_dilation_epoch_data, all_dilation_epoch_data = dilation_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
accepted_peak_epoch_data, all_peak_epoch_data = peak_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
accepted_trough_epoch_data, all_trough_epoch_data = trough_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
all_random_epoch_data, _ = random_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
else:
block_accepted_constriction_epoch_data, block_all_constriction_epoch_data = constriction_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
if block_accepted_constriction_epoch_data is not None:
accepted_constriction_epoch_data = np.append(accepted_constriction_epoch_data, block_accepted_constriction_epoch_data, axis=1)
else:
print("No accepted constriction events in block # "+block)
if block_all_constriction_epoch_data is not None:
all_constriction_epoch_data = np.append(all_constriction_epoch_data, block_all_constriction_epoch_data, axis=1)
else:
print("No constriction events in block # "+block)
block_accepted_dilation_epoch_data, block_all_dilation_epoch_data = dilation_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
if block_accepted_dilation_epoch_data is not None:
accepted_dilation_epoch_data = np.append(accepted_dilation_epoch_data, block_accepted_dilation_epoch_data, axis=1)
else:
print("No accepted dilation events in block # "+block)
if block_all_dilation_epoch_data is not None:
all_dilation_epoch_data = np.append(all_dilation_epoch_data, block_all_dilation_epoch_data, axis=1)
else:
print("No dilation events in block # "+block)
block_accepted_peak_epoch_data, block_all_peak_epoch_data = peak_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
if block_accepted_peak_epoch_data is not None:
accepted_peak_epoch_data = np.append(accepted_peak_epoch_data, block_accepted_peak_epoch_data, axis=1)
else:
print("No accepted peak events in block # "+block)
if block_all_peak_epoch_data is not None:
all_peak_epoch_data = np.append(all_peak_epoch_data, block_all_peak_epoch_data, axis=1)
else:
print("No peak events in block # "+block)
block_accepted_trough_epoch_data, block_all_trough_epoch_data = trough_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
if block_accepted_trough_epoch_data is not None:
accepted_trough_epoch_data = np.append(accepted_trough_epoch_data, block_accepted_trough_epoch_data, axis=1)
else:
print("No accepted trough events in block # "+block)
if block_all_trough_epoch_data is not None:
all_trough_epoch_data = np.append(all_trough_epoch_data, block_all_trough_epoch_data, axis=1)
else:
print("No trough events in block # "+block)
block_all_random_epoch_data, _ = random_events.pull_valid_epochs(block_data[1,:], half_epoch_duration_ms, rtPupil_config.ms_per_sample)
all_random_epoch_data = np.append(all_random_epoch_data, block_all_random_epoch_data, axis=1)
# save subject level data across all blocks
save_dict = {"accepted_peak_epochs": accepted_peak_epoch_data,
"accepted_trough_epochs": accepted_trough_epoch_data,
"accepted_dilation_epochs": accepted_dilation_epoch_data,
"accepted_constriction_epochs": accepted_constriction_epoch_data,
"all_peak_epochs": all_peak_epoch_data,
"all_trough_epochs": all_trough_epoch_data,
"all_dilation_epochs": all_dilation_epoch_data,
"all_constriction_epochs": all_constriction_epoch_data,
"random_epochs": all_random_epoch_data}
outfile = os.path.join(results_dir, "event_statistics.pckl")
with open(outfile, 'wb') as pickle_file:
pickle.dump(save_dict, pickle_file)
if plot_timecourses:
plot_mean_timecourses(half_epoch_duration=half_epoch_duration_ms, title = "Accepted events - subject "+subjID,
peak_epoch = accepted_peak_epoch_data, trough_epoch = accepted_trough_epoch_data,
constriction_epoch = accepted_constriction_epoch_data, dilation_epoch = accepted_dilation_epoch_data,
random_epoch = all_random_epoch_data, save_dir= os.path.join(results_dir, "accepted_trace.png"))
plot_mean_timecourses(half_epoch_duration=half_epoch_duration_ms, title = "All events - subject "+subjID,
peak_epoch = all_peak_epoch_data, trough_epoch = all_trough_epoch_data,
constriction_epoch = all_constriction_epoch_data, dilation_epoch = all_dilation_epoch_data,
random_epoch = all_random_epoch_data, save_dir= os.path.join(results_dir, "all_trace.png"))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description= 'Simulated rtPupilPhase: simulating real-time pupillometry', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("subs", help="Subjects to simulate.", nargs='+')
parser.add_argument("--plot_timecourses", help="Whether or not to display mean timecourses.",
action="store_true")
parser.add_argument("--baseline_duration_ms", help="Duration of baseline window in milliseconds.",
type=int, default=5000)
parser.add_argument("--max_search_window_duration_ms", help=
"Maximum duration of search window before resetting, in milliseconds.",
type=int, default=5000)
parser.add_argument("--num_random_events", help="Number of random events to occur per block. Note that this value will have implications on the number of pupil phase events.",
type=int, default=20)
parser.add_argument("--IEI_duration_ms", help="Inter-event interval - how long to wait between valid events in milliseconds.",
type=int, default=3000)
parser.add_argument("--pupil_sample_duration_ms", help="Duration of pupil sample.",
type = int, default=100)
parser.add_argument("--peak_pupil_quantile", help="Quantile value a peak must be bigger than to accept.",
type=float, default=0.75)
parser.add_argument("--trough_pupil_quantile", help="Quantile value a trough must be smaller than to accept.",
type=float, default=0.25)
parser.add_argument("--dilation_quantile", help="Quantile value a dilation must be bigger than to accept.",
type=float, default=0.99)
parser.add_argument("--constriction_quantile", help="Quantile value a constriction must be smaller than to accept.",
type=float, default=0.01)
parser.add_argument("--half_epoch_duration_ms", help="Half epoch duration - how far before and after event to plot.",
type=int, default=2500)
args = parser.parse_args()
main(args.subs, args.plot_timecourses, args.baseline_duration_ms,
args.max_search_window_duration_ms,args.num_random_events, args.IEI_duration_ms,
args.pupil_sample_duration_ms, args.peak_pupil_quantile, args.trough_pupil_quantile,
args.dilation_quantile, args.constriction_quantile, args.half_epoch_duration_ms)