-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplay_noise.py
831 lines (723 loc) · 25.5 KB
/
play_noise.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
import moderngl_window
import moderngl
from moderngl_window.conf import settings
import time
import h5py
import numpy as np
from pathlib import Path
import csv
import datetime
from multiprocessing import RawArray
from multiprocessing import sharedctypes
import pyglet
from arduino import Arduino
import threading
class Presenter:
    """
    Present noise stimuli in a pyglet-backed moderngl_window window.

    One instance lives in each presentation process. It idles in
    :meth:`run_empty`, polling ``queue`` for commands from the main process
    (gui), and plays a stimulus whenever a noise dictionary arrives. The
    instance created with ``mode == "lead"`` additionally owns the Arduino
    connection and is the only one that sends colour and trigger signals.
    """

    def __init__(
        self,
        process_idx,
        config_dict,
        queue,
        sync_queue,
        sync_lock,
        lock,
        ard_queue,
        ard_lock,
        mode,
        delay=10,
    ):
        """
        Create the window and, in lead mode, connect to the Arduino.

        Parameters
        ----------
        process_idx : int
            Index of this process; ``str(process_idx)`` selects the entry of
            ``config_dict["windows"]`` that configures this window.
        config_dict : dict
            Configuration. Keys read here:
                "gl_version" : OpenGL version passed to the window settings.
                "fps" : frame rate used to derive ``frame_duration``.
                "windows" : dict keyed by ``str(process_idx)``; each entry
                    provides "channels", "window_size", "fullscreen",
                    "style", "x_shift", "y_shift" and, for the lead window,
                    "arduino_port" and "arduino_baud_rate".
        queue : multiprocessing.Queue
            Queue for commands from the main process (gui).
        sync_queue : multiprocessing.Queue
            Queue used by :meth:`send_array` / :meth:`receive_array`.
        sync_lock
            Stored on the instance; not used by the methods of this class.
        lock
            Lock guarding reads from ``queue``.
        ard_queue : multiprocessing.Queue
            Queue carrying Arduino commands (read on "white_screen").
        ard_lock
            Lock guarding reads from ``ard_queue``.
        mode : str
            ``"lead"`` for the process that talks to the Arduino; any other
            value (e.g. ``"follow"``) disables all Arduino communication.
        delay : float, optional
            Buffer in seconds added to the scheduled frame times before a
            presentation starts.
        """
        self.process_idx = process_idx
        self.queue = queue
        self.sync_queue = sync_queue
        self.sync_lock = sync_lock
        self.lock = lock
        self.mode = mode
        self.ard_queue = ard_queue
        self.ard_lock = ard_lock
        self.nr_followers = len(config_dict["windows"].keys()) - 1
        window_config = config_dict["windows"][str(self.process_idx)]
        self.c_channels = window_config["channels"]
        self.delay = delay

        settings.WINDOW[
            "class"
        ] = "moderngl_window.context.pyglet.Window"  # using a pyglet window
        settings.WINDOW["gl_version"] = config_dict["gl_version"]
        settings.WINDOW["size"] = window_config["window_size"]
        # None sets the aspect ratio to the window's aspect ratio
        settings.WINDOW["aspect_ratio"] = None
        settings.WINDOW["fullscreen"] = window_config["fullscreen"]
        settings.WINDOW["samples"] = 0
        settings.WINDOW["double_buffer"] = True
        settings.WINDOW["vsync"] = True
        settings.WINDOW["resizable"] = False
        settings.WINDOW["title"] = "Noise Presentation"
        settings.WINDOW["style"] = window_config["style"]

        self.frame_duration = 1 / config_dict["fps"]  # Calculate the frame duration
        self.window = moderngl_window.create_window_from_settings()
        # Shift the window
        self.window.position = (window_config["x_shift"], window_config["y_shift"])
        self.window.init_mgl_context()  # Initialize the moderngl context
        self.stop = False  # Flag for stopping the presentation
        # Incremented for every presentation so that an outer presentation
        # can detect that a newer one has run in the meantime.
        self._playback_id = 0
        self.window.set_default_viewport()  # Set the viewport to the window size

        if self.mode == "lead":
            self.arduino = Arduino(
                port=window_config["arduino_port"],
                baud_rate=window_config["arduino_baud_rate"],
                queue=ard_queue,
                queue_lock=ard_lock,
            )

    def __del__(self):
        """Disconnect the Arduino, if this instance ever connected to one."""
        # ``arduino`` only exists on a fully constructed lead instance; guard
        # so that a failed __init__ does not raise a second error here.
        arduino = getattr(self, "arduino", None)
        if arduino is None:
            return
        with self.ard_lock:
            arduino.disconnect()

    def run_empty(self):
        """
        Idle loop.

        Shows a white window and polls for commands from the main process
        (gui) until the window is closing, then closes the window.
        """
        while not self.window.is_closing:
            self.window.use()
            self.window.ctx.clear(1, 1, 1, 1.0)  # white background
            self.window.swap_buffers()  # Swap the buffers (update the window content)
            self.communicate()  # Check for commands from the main process (gui)
            time.sleep(0.001)  # Sleep for 1 ms to avoid busy waiting
        self.window.close()  # Close the window in case it is closed by the user

    def communicate(self):
        """
        Check for a command from the main process (gui) and execute it.

        Recognised commands:
            dict : a noise dictionary; played via :meth:`play_noise`.
            "white_screen" : lead only; forwards one entry of ``ard_queue``
                to the Arduino.
            "stop" : sets the stop flag (checked once per frame by
                :meth:`presentation_loop`) and, in lead mode, sends the
                colour sequence "b", "b", "b", "O" to the Arduino.
            "destroy" : closes the window.
        """
        command = None
        with self.lock:
            if not self.queue.empty():
                command = self.queue.get()
        if not command:
            return

        if isinstance(command, dict):  # This would be an array to play.
            self.play_noise(command)
        elif command == "white_screen":
            if self.mode == "lead":
                # The lock guards only the queue read.
                with self.ard_lock:
                    ard_command = self.ard_queue.get()
                self.send_colour(ard_command)
        elif command == "stop":
            # Only raise the flag here. The caller's loop (run_empty or
            # presentation_loop) keeps running and reacts to it; starting a
            # nested idle loop from here would grow the call stack with every
            # stop and keep the stopped presentation's textures alive.
            self.stop = True
            if self.mode == "lead":
                self.send_colour("b")
                self.send_colour("b")
                self.send_colour("b")
                self.send_colour("O")
        elif command == "destroy":
            self.window.close()  # Close the window

    def send_array(self, array):
        """Send array string of shared memory to other processes."""
        for _ in range(self.nr_followers):
            self.sync_queue.put(array)

    def receive_array(self):
        """Receive array string of shared memory from lead process."""
        return self.sync_queue.get()

    def send_trigger(self):
        """Send a trigger signal ("T") to the Arduino (lead mode only)."""
        if self.mode == "lead":
            self.arduino.send("T")

    def send_colour(self, colour):
        """Send a colour signal to the Arduino (lead mode only)."""
        if self.mode == "lead":
            self.arduino.send(colour)

    def load_and_initialize_data(self, noise_dict):
        """
        Extract the playback settings and extend the frame schedule per loop.

        Parameters
        ----------
        noise_dict : dict
            Must provide "file", "loops", "colours", "change_logic" and
            "s_frames" (scheduled start time of each frame of one loop).

        Returns
        -------
        tuple
            ``(file, loops, colours, change_logic, s_frames)`` where
            ``s_frames`` holds the schedule of all loops back to back.
        """
        file = noise_dict["file"]
        loops = noise_dict["loops"]
        colours = noise_dict["colours"]
        change_logic = noise_dict["change_logic"]
        # asarray is a no-op for arrays and lets a plain list be scheduled too
        s_frames_temp = np.asarray(noise_dict["s_frames"])

        s_frames = s_frames_temp.copy()
        # Gap between the first two frames; used as the gap between the last
        # frame of one loop and the first frame of the next.
        first_frame_dur = np.diff(s_frames[0:2])
        for loop in range(1, loops):
            loop_offset = loop * (
                s_frames_temp[-1] - s_frames_temp[0] + first_frame_dur
            )
            s_frames = np.concatenate((s_frames, s_frames_temp + loop_offset))
        return file, loops, colours, change_logic, s_frames

    def process_arduino_colours(self, colours, change_logic, frames):
        """
        Expand the colour string into a list covering at least ``frames``.

        Parameters
        ----------
        colours : str
            A comma-separated string of colour values.
        change_logic : int
            Number of consecutive entries each colour occupies.
        frames : int
            The total number of frames for the noise.

        Returns
        -------
        list
            Each colour repeated ``change_logic`` times, with that sequence
            tiled often enough to reach at least ``frames`` entries.
        """
        colours = colours.split(",")
        colour_repeats = np.ceil(frames / (len(colours) * change_logic))
        colours = np.repeat(np.asarray(colours), change_logic).tolist()
        return colours * int(colour_repeats)

    def load_noise_data(self, file):
        """
        Load the noise file and create one texture per noise frame.

        Parameters
        ----------
        file : str
            Name of the noise file, passed to ``load_3d_patterns``.

        Returns
        -------
        tuple
            ``(all_patterns_3d, width, height, frames, desired_fps, patterns,
            nr_colours)`` where ``patterns`` is the list of textures.
        """
        (
            all_patterns_3d,
            width,
            height,
            frames,
            desired_fps,
            nr_colours,
        ) = load_3d_patterns(
            file, channels=self.c_channels
        )  # Load the noise data

        # Establish the texture for each noise frame
        patterns = [
            self.window.ctx.texture(
                (width, height),
                nr_colours,
                all_patterns_3d[i, :].tobytes(),
                samples=0,
                alignment=1,
            )
            for i in range(frames)
        ]
        return all_patterns_3d, width, height, frames, desired_fps, patterns, nr_colours

    def setup_shader_program(self, nr_colours=1):
        """
        Build the shader program from the GLSL files in the working directory.

        Parameters
        ----------
        nr_colours : int, optional
            ``1`` selects "fragment_shader.glsl"; any other value selects
            "fragment_shader_colour.glsl".

        Returns
        -------
        moderngl.Program
            The compiled and linked shader program.
        """
        with open("vertex_shader.glsl", "r") as vertex_file:
            vertex_shader_source = vertex_file.read()

        if nr_colours == 1:
            fragment_path = "fragment_shader.glsl"
        else:
            fragment_path = "fragment_shader_colour.glsl"
        with open(fragment_path, "r") as fragment_file:
            fragment_shader_source = fragment_file.read()

        return self.window.ctx.program(
            vertex_shader=vertex_shader_source, fragment_shader=fragment_shader_source
        )

    def calculate_scaling(self, width, height):
        """
        Compute the quad that displays a ``width`` x ``height`` texture.

        The scale is the texture size relative to the window size, except
        that a landscape window whose aspect ratio equals the texture's is
        filled completely.

        Parameters
        ----------
        width : int
            The width of the texture.
        height : int
            The height of the texture.

        Returns
        -------
        tuple
            ``(scale_x, scale_y, quad)`` where ``quad`` is a float32 array of
            six 2D vertices (two triangles).
        """
        window_width, window_height = self.window.size
        window_aspect = window_width / window_height
        texture_aspect = width / height

        scale_x = width / window_width
        scale_y = height / window_height
        if window_aspect == texture_aspect and window_aspect > 1:
            scale_x = scale_y = 1

        quad = np.array(
            [
                -scale_x,
                scale_y,  # top left
                -scale_x,
                -scale_y,  # bottom left
                scale_x,
                scale_y,  # top right
                scale_x,
                scale_y,  # top right
                -scale_x,
                -scale_y,  # bottom left
                scale_x,
                -scale_y,  # bottom right
            ],
            dtype=np.float32,
        )
        return scale_x, scale_y, quad

    def create_buffer_and_vao(self, quad, program):
        """
        Create the vertex buffer and vertex array object for the quad.

        Parameters
        ----------
        quad : np.array
            Array of vertices for the quad.
        program : moderngl.Program
            The shader program used for rendering; the vertices are bound to
            its "in_pos" attribute.

        Returns
        -------
        moderngl.Buffer
            The created vertex buffer object (VBO).
        moderngl.VertexArray
            The created vertex array object (VAO).
        """
        vbo = self.window.ctx.buffer(quad.tobytes())
        vao = self.window.ctx.simple_vertex_array(program, vbo, "in_pos")
        return vbo, vao

    def setup_presentation(self, frames, loops, desired_fps):
        """
        Derive the time per frame and the order in which frames are shown.

        Parameters
        ----------
        frames : int
            The number of frames in the noise pattern.
        loops : int
            The number of times the noise pattern should loop.
        desired_fps : float
            The desired frames per second for the presentation.

        Returns
        -------
        float
            The time allocated per frame (``1 / desired_fps``).
        list
            Frame indices ``0 .. frames - 1`` repeated ``loops`` times.
        """
        time_per_frame = 1 / desired_fps
        pattern_indices = np.tile(np.arange(frames), loops)
        return time_per_frame, pattern_indices.tolist()

    def presentation_loop(
        self,
        pattern_indices,
        s_frames,
        end_times,
        nr_colours,
        arduino_colours,
        change_logic,
        patterns,
        program,
        vao,
    ):
        """
        Main loop for presenting the noise.

        Parameters
        ----------
        pattern_indices : list
            List of indices indicating the order in which to present the patterns.
        s_frames : sequence of float
            ``time.perf_counter()`` value at which each frame should start;
            indexed in step with ``pattern_indices``.
        end_times : np.ndarray
            Buffer that receives, per presented frame, the time spent in the
            buffer swap plus the trigger call.
        nr_colours : int
            Number of colour channels; selects which shader uniforms are set.
        arduino_colours : list
            Colours to send to the Arduino.
        change_logic : int
            A colour is sent whenever the pattern index is a multiple of this.
        patterns : list
            List of textures for each noise frame.
        program : moderngl.Program
            The shader program used for rendering.
        vao : moderngl.VertexArray
            The vertex array object for rendering.

        Returns
        -------
        np.ndarray
            ``end_times``; entries of frames that were not presented (after a
            stop) keep their initial value.
        """
        # Remember which presentation this loop belongs to: communicate() may
        # start (and finish) a newer one, after which this one must not resume.
        playback_id = getattr(self, "_playback_id", None)

        for idx, current_pattern_index in enumerate(pattern_indices):
            self.communicate()
            if self.stop or getattr(self, "_playback_id", None) != playback_id:
                break

            # Sync frame presentation to the scheduled time
            while time.perf_counter() < s_frames[idx]:
                pass  # Busy-wait until the scheduled frame time

            self.window.use()  # Ensure the correct context is being used

            # Handle colour change logic
            # NOTE(review): arduino_colours already repeats each colour
            # change_logic times (see process_arduino_colours), so indexing
            # with ``// change_logic`` may apply the repeat twice - confirm
            # the intended colour schedule.
            if current_pattern_index % change_logic == 0:
                c = arduino_colours[
                    current_pattern_index // change_logic % len(arduino_colours)
                ]
                self.send_colour(c)

            # Clear the window and render the noise
            self.window.ctx.clear(0, 0, 0)
            patterns[current_pattern_index].use(location=0)
            if nr_colours > 1:
                program["pattern"].red = 0
                program["pattern"].green = 0
                program["pattern"].blue = 0
            else:
                program["pattern"].value = 0
            vao.render(moderngl.TRIANGLES)

            # Swap buffers and send trigger signal
            start_time = time.perf_counter()
            self.window.swap_buffers()
            self.send_trigger()
            end_times[idx] = time.perf_counter() - start_time

        # Always hand the buffer back, also for an empty sequence.
        return end_times

    def cleanup_and_finalize(
        self, patterns, vbo, vao, noise_dict, end_times, desired_fps
    ):
        """
        Release GPU resources and write the log after a presentation.

        Control then returns to the caller; the idle loop that started the
        presentation keeps running, so no nested idle loop is started here.

        Parameters
        ----------
        patterns : list
            List of texture objects to be released.
        vbo : moderngl.Buffer
            The vertex buffer object to be released.
        vao : moderngl.VertexArray
            The vertex array object to be released.
        noise_dict : dict
            The dictionary containing noise settings, used for logging purposes.
        end_times : np.ndarray
            Per-frame durations as filled in by :meth:`presentation_loop`.
        desired_fps : float
            The desired frames per second for the presentation.
        """
        self.send_colour("O")  # final colour signal to the Arduino

        for pattern in patterns:
            pattern.release()
        vbo.release()
        vao.release()

        # A frame counts as dropped when it took longer than one frame period
        dropped_frames = np.where(end_times - (1 / desired_fps) > 0)
        wrong_frame_times = end_times[dropped_frames[0]]
        if len(dropped_frames[0]) == 0:
            dropped_frames = None
            wrong_frame_times = None
        else:
            print(f"dropped frames (idx): {dropped_frames[0]}")
            print(f"wrong frame times: {wrong_frame_times}")

        write_log(noise_dict, dropped_frames, wrong_frame_times)

        self.stop = False

    def play_noise(self, noise_dict):
        """
        Play one noise stimulus described by ``noise_dict``.

        Loads the noise file, uploads it as textures, presents every frame at
        its scheduled time and finally releases the resources and writes the
        log.

        Parameters
        ----------
        noise_dict : dict
            Playback settings; see :meth:`load_and_initialize_data` for the
            required keys.
        """
        # A "stop" that arrived while idle must not abort this presentation.
        self.stop = False
        self._playback_id = getattr(self, "_playback_id", 0) + 1

        # Get the data according to the noise_dict
        (
            file,
            loops,
            arduino_colours,
            change_logic,
            s_frames,
        ) = self.load_and_initialize_data(noise_dict)
        arduino_colours = self.process_arduino_colours(
            arduino_colours, change_logic, len(s_frames)
        )

        # Load the noise data
        (
            _,
            width,
            height,
            frames,
            desired_fps,
            patterns,
            nr_colours,
        ) = self.load_noise_data(file)

        # Establish the shader program for presenting the noise
        program = self.setup_shader_program(nr_colours)
        # Calculate the quad that adjusts the noise size to the window
        _, _, quad = self.calculate_scaling(width, height)
        # Create the buffer and vertex array object for the noise
        vbo, vao = self.create_buffer_and_vao(quad, program)
        _, pattern_indices = self.setup_presentation(frames, loops, desired_fps)

        # Synchronize the presentation: push the schedule into the future by
        # the configured buffer. If the first frame is already overdue, also
        # shift by the overdue amount so the start is ``self.delay`` from now.
        delay_needed = s_frames[0] - time.perf_counter()
        if delay_needed > 0:
            delay = self.delay
        else:
            delay = np.abs(delay_needed) + self.delay
        s_frames = s_frames + delay
        print(
            f"stimulus will start in {s_frames[0] - time.perf_counter()} seconds, window_idx: {self.process_idx}"
        )
        print(f"Current time is {datetime.datetime.now()}")

        end_times = np.zeros(len(s_frames))
        # Start the presentation loop
        end_times = self.presentation_loop(
            pattern_indices,
            s_frames,
            end_times,
            nr_colours,
            arduino_colours,
            change_logic,
            patterns,
            program,
            vao,
        )

        # Clean up and finalize the presentation
        self.cleanup_and_finalize(
            patterns, vbo, vao, noise_dict, end_times, desired_fps
        )
        # The program is created per presentation, so release it as well.
        program.release()
def write_log(noise_dict, dropped_frames=None, wrong_frame_times=None):
    """
    Write the log file for one noise presentation.

    The log is a CSV file ``logs/<file>_<YYYY_MM_DD_HH_MM_SS>.csv`` (relative
    to the working directory) with a header row and one data row. The target
    directory is created when it does not exist yet.

    Parameters
    ----------
    noise_dict : dict
        Playback settings; the keys "file", "loops", "colours" and
        "change_logic" are logged.
    dropped_frames : optional
        Dropped-frame information, written as-is into the
        "dropped_frames" column. ``None`` logs empty lists for both this
        and ``wrong_frame_times``.
    wrong_frame_times : optional
        Durations of the dropped frames, written as-is into the
        "wrong_frame_times" column.
    """
    noise_file = noise_dict["file"]
    loops = noise_dict["loops"]
    colours = noise_dict["colours"]
    change_logic = noise_dict["change_logic"]

    log_path = Path(
        f"logs/{noise_file}_{datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S.csv')}"
    )
    # open() does not create missing directories; without this the whole
    # presentation would end in FileNotFoundError on a fresh checkout.
    log_path.parent.mkdir(parents=True, exist_ok=True)

    if dropped_frames is None:
        dropped_frames = []
        wrong_frame_times = []

    header = [
        "noise_file",
        "loops",
        "colours",
        "change_logic",
        "time",
        "dropped_frames",
        "wrong_frame_times",
    ]
    record = [
        noise_file,
        loops,
        colours,
        change_logic,
        time.strftime("%H:%M:%S"),
        dropped_frames,
        wrong_frame_times,
    ]
    with open(log_path, "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerow(record)
def load_3d_patterns(file, channels=None):
    """
    Read a noise .h5 file from the "stimuli" folder.

    Parameters
    ----------
    file : str
        Name of the noise file inside "stimuli/".
    channels : sequence of int, optional
        Colour channels to keep. Only applied when the noise has a fourth
        (colour) axis with more than one entry.

    Returns
    -------
    noise : np.ndarray
        The "Noise" dataset as uint8, reduced to ``channels`` if requested.
    width : int
        Size of axis 2 of the noise.
    height : int
        Size of axis 1 of the noise.
    frames : int
        Size of axis 0 of the noise.
    frame_rate
        Value of the "Frame_Rate" dataset.
    colours : int
        Number of colour channels in the returned noise (1 when the noise
        has no fourth axis).

    Raises
    ------
    IndexError
        If ``channels`` requests a channel the file does not contain.
    """
    with h5py.File(f"stimuli/{file}", "r") as h5_file:
        noise = np.asarray(h5_file["Noise"][:], dtype=np.uint8)
        frame_rate = h5_file["Frame_Rate"][()]

    shape = noise.shape
    width, height, frames = shape[2], shape[1], shape[0]
    # A missing fourth axis means a single (greyscale) channel
    colours = shape[3] if noise.ndim > 3 else 1

    if colours > 1 and channels is not None:
        try:
            noise = noise[:, :, :, channels]
            colours = len(channels)
        except IndexError:
            print("more channels requested than available in the noise file")
            raise

    return noise, width, height, frames, frame_rate, colours
def get_noise_info(file):
    """
    Return the dimensions and frame rate of a noise .h5 file.

    Only the dataset's shape is queried, so the noise data itself is not
    read into memory.

    Parameters
    ----------
    file : str
        Name of the noise file inside "stimuli/".

    Returns
    -------
    width : int
        Size of axis 2 of the "Noise" dataset.
    height : int
        Size of axis 1 of the "Noise" dataset.
    frames : int
        Size of axis 0 of the "Noise" dataset.
    frame_rate
        Value of the "Frame_Rate" dataset.
    """
    with h5py.File(f"stimuli/{file}", "r") as f:
        size = f["Noise"].shape
        frame_rate = f["Frame_Rate"][()]
    width = size[2]
    height = size[1]
    frames = size[0]
    return width, height, frames, frame_rate
def pyglet_app_lead(
    process_idx,
    config,
    queue,
    sync_queue,
    sync_lock,
    lock,
    ard_queue,
    ard_lock,
    delay=10,
):
    """
    Entry point of the lead presentation process.

    Builds a ``Presenter`` in "lead" mode and enters its idle loop. Intended
    to be used as the target of a separate process.

    Parameters
    ----------
    process_idx : int
        Index of the process, forwarded to ``Presenter``.
    config : dict
        Configuration dictionary, forwarded to ``Presenter`` as
        ``config_dict``.
    queue : multiprocessing.Queue
        Queue for communication with the main process (gui).
    sync_queue, sync_lock, lock, ard_queue, ard_lock
        Synchronisation objects, forwarded unchanged to ``Presenter``.
    delay : float, optional
        Forwarded to ``Presenter`` as ``delay``.
    """
    presenter = Presenter(
        process_idx,
        config,
        queue,
        sync_queue,
        sync_lock,
        lock,
        ard_queue,
        ard_lock,
        mode="lead",
        delay=delay,
    )
    # Blocks until the window is closed
    presenter.run_empty()
def pyglet_app_follow(
    process_idx,
    config,
    queue,
    sync_queue,
    sync_lock,
    lock,
    ard_queue,
    ard_lock,
    delay=10,
):
    """
    Entry point of a follower presentation process.

    Builds a ``Presenter`` in "follow" mode and enters its idle loop.
    Intended to be used as the target of a separate process.

    Parameters
    ----------
    process_idx : int
        Index of the process, forwarded to ``Presenter``.
    config : dict
        Configuration dictionary, forwarded to ``Presenter`` as
        ``config_dict``.
    queue : multiprocessing.Queue
        Queue for communication with the main process (gui).
    sync_queue, sync_lock, lock, ard_queue, ard_lock
        Synchronisation objects, forwarded unchanged to ``Presenter``.
    delay : float, optional
        Forwarded to ``Presenter`` as ``delay``.
    """
    presenter = Presenter(
        process_idx,
        config,
        queue,
        sync_queue,
        sync_lock,
        lock,
        ard_queue,
        ard_lock,
        mode="follow",
        delay=delay,
    )
    # Blocks until the window is closed
    presenter.run_empty()
# Can run the pyglet app from here for testing purposes if needed
# if __name__ == '__main__':
# pyglet_app(Queue())