Skip to content

Commit

Permalink
Keep refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
tsalo committed Nov 29, 2023
1 parent bf062c5 commit 620a759
Show file tree
Hide file tree
Showing 2 changed files with 492 additions and 147 deletions.
325 changes: 178 additions & 147 deletions fLoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import pandas as pd
from psychopy import core, event, logging, visual
from psychopy.constants import STARTED, STOPPED
from psychopy.gui import DlgFromDict
from yaml import Loader, load


Expand Down Expand Up @@ -239,8 +238,131 @@ def draw(win, stim, duration, clock):
return response.keys, response.rt


def prepare_trials(
stimulus_categories,
stimuli,
constants,
task,
):
"""Define the order and stimuli for the trials in a single run."""
trial_duration = constants["IMAGE_DURATION"] + constants["TARGET_ISI"]
n_categories = len(stimulus_categories)
n_blocks_per_category = int(np.floor(constants["N_BLOCKS"] / n_categories))

# Determine which trials will be task
# This might be overly convoluted, but it maximizes balance between
# task/non-task instead of just sampling with set probabilities
nontask_rate = 1 - constants["TASK_RATE"]
task_mult = 1 / np.minimum(constants["TASK_RATE"], nontask_rate)
n_task_prop = int(task_mult * constants["TASK_RATE"])
n_nontask_prop = int(task_mult * nontask_rate)
grabber_list = [1] * n_task_prop + [0] * n_nontask_prop

# We want to ensure that tasks are not assigned to baseline blocks
n_nonbaseline_blocks = int(constants["N_BLOCKS"] * (n_categories - 1) / n_categories)
n_dupes = int(np.ceil(n_nonbaseline_blocks / len(grabber_list)))
task_blocks = grabber_list * n_dupes

block_categories = randomize_carefully(stimulus_categories, n_blocks_per_category)
np.random.shuffle(task_blocks)
task_blocks_full = np.zeros(len(block_categories))
task_blocks_full[np.array(block_categories) != "baseline"] = task_blocks

run_config = {
"stimuli": ["n/a"],
"trial_type": ["countdown"],
"category": ["n/a"],
"subcategory": ["n/a"],
"miniblock_number": ["n/a"],
"expected_duration": [constants["COUNTDOWN_DURATION"]],
}
target_trial_idx = None
for j_block, category in enumerate(block_categories):
if category == "baseline":
run_config["stimuli"].append("n/a")
run_config["trial_type"].append("baseline")
run_config["category"].append("n/a")
run_config["subcategory"].append("n/a")
run_config["expected_duration"].append(
constants["N_STIMULI_PER_BLOCK"] * trial_duration
)
run_config["miniblock_number"].append(j_block)
else:
n_trials_in_block = constants["N_STIMULI_PER_BLOCK"]
# Block of stimuli
block_stimuli = list(
np.random.choice(
stimuli[category],
size=n_trials_in_block,
replace=False,
)
)
block_subcategories = [os.path.basename(s).split("-")[0] for s in block_stimuli]
run_config["stimuli"] += block_stimuli
run_config["trial_type"] += ["category"] * n_trials_in_block
run_config["category"] += [category] * n_trials_in_block
run_config["subcategory"] += block_subcategories
run_config["miniblock_number"] += [j_block] * n_trials_in_block
run_config["expected_duration"] += [trial_duration] * n_trials_in_block

if task_blocks_full[j_block] == 1:
start_of_block = len(run_config["trial_type"]) - n_trials_in_block
# Check for last block's target to make sure that two targets don't
# occur within the same response window
if (j_block > 0) and (target_trial_idx is not None):
last_target_onset = np.sum(run_config["expected_duration"][:target_trial_idx])
last_target_rw_offset = last_target_onset + constants["RESPONSE_WINDOW"]

first_viable_trial = None
for k_trial, trial_offset in enumerate(range(n_trials_in_block + 1, 1, -1)):
onset = np.sum(run_config["expected_duration"][:-trial_offset])
if onset > last_target_rw_offset:
first_viable_trial = k_trial
break

else:
first_viable_trial = 0

# Adjust stimuli based on task
if task == "Oddball":
# target is scrambled image
target_trial_idx = np.random.randint(
start_of_block + first_viable_trial,
start_of_block + n_trials_in_block,
)
run_config["stimuli"][target_trial_idx] = np.random.choice(
stimuli["scrambled"]
)
elif task == "OneBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 1)
target_trial_idx = np.random.randint(
start_of_block + first_viable_trial,
start_of_block + n_trials_in_block,
)
run_config["stimuli"][target_trial_idx] = run_config["stimuli"][
target_trial_idx - 1
]
elif task == "TwoBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 2)
target_trial_idx = np.random.randint(
start_of_block + first_viable_trial,
start_of_block + n_trials_in_block,
)
run_config["stimuli"][target_trial_idx] = run_config["stimuli"][
target_trial_idx - 2
]

run_config["trial_type"][target_trial_idx] = task.lower()

return run_config


def main(debug=False):
"""Run the fLoc task."""
from psychopy.gui import DlgFromDict

# Ensure that relative paths start from the same directory as this script
try:
script_dir = os.path.dirname(os.path.abspath(__file__)).decode(sys.getfilesystemencoding())
Expand All @@ -259,7 +381,7 @@ def main(debug=False):
# ------------------
# Remember to turn fullscr to True for the real deal.
window = visual.Window(
fullscr=True,
fullscr=False,
size=(800, 600),
monitor="testMonitor",
units="pix",
Expand Down Expand Up @@ -404,8 +526,6 @@ def main(debug=False):
stimulus_folders = config["category_sets"][exp_info["Image Set"]]

standard_categories = [cat for cat in stimulus_folders.keys() if cat != "scrambled"]
n_categories = len(standard_categories)
n_blocks_per_category = int(np.floor(constants["N_BLOCKS"] / n_categories))

stimuli = {}
for category in stimulus_folders.keys():
Expand All @@ -423,46 +543,27 @@ def main(debug=False):
# TODO: Support stimulus for baseline trials
stimuli[category] = None # baseline trials just have fixation

# Determine which trials will be task
# This might be overly convoluted, but it maximizes balance between
# task/non-task instead of just sampling with set probabilities
nontask_rate = 1 - constants["TASK_RATE"]
task_mult = 1 / np.minimum(constants["TASK_RATE"], nontask_rate)
n_task_prop = int(task_mult * constants["TASK_RATE"])
n_nontask_prop = int(task_mult * nontask_rate)
grabber_list = [1] * n_task_prop + [0] * n_nontask_prop

# We want to ensure that tasks are not assigned to baseline blocks
n_nonbaseline_blocks = int(constants["N_BLOCKS"] * (n_categories - 1) / n_categories)
n_dupes = int(np.ceil(n_nonbaseline_blocks / len(grabber_list)))
task_blocks = grabber_list * n_dupes

# Scanner runtime
# ---------------
global_clock = core.Clock() # to track the time since experiment started
run_clock = core.Clock() # to track time since each run starts (post scanner pulse)
block_clock = core.Clock() # to track duration of each block
trial_clock = core.Clock() # to track duration of each trial
fixation_trial_clock = core.Clock() # to account for fixation time spent loading image

columns = [
"onset",
"duration",
"trial_type",
"miniblock_number",
"category",
"subcategory",
"stim_file",
]
# unnecessary, since run_frame is defined at end of for loop, but satisfies linter
run_frame = None
for i_run in range(n_runs):
run_data = {c: [] for c in columns}
run_label = i_run + 1
events_file = os.path.join(output_dir, f"{base_name}_run-{run_label:02d}_events.tsv")

block_categories = randomize_carefully(standard_categories, n_blocks_per_category)
np.random.shuffle(task_blocks)
run_data = prepare_trials(
stimulus_categories=standard_categories,
stimuli=stimuli,
task=exp_info["Task"],
constants=config["constants"],
)
run_data["onset"] = []
run_data["duration"] = []

# Scanner runtime
# ---------------
Expand All @@ -484,29 +585,24 @@ def main(debug=False):
draw_until_keypress(win=window, stim=performance_screen, debug=debug)

run_clock.reset()
run_response_times = []
for i_trial in range(len(run_data["trial_type"])):
trial_type = run_data["trial_type"][i_trial]
stim_file = run_data["stimuli"][i_trial]
trial_clock.reset()

actual_onset_time = run_clock.getTime()
if trial_type == "countdown":
draw_countdown(
win=window,
stim=countdown_text_box,
duration=constants["COUNTDOWN_DURATION"],
)

# Show countdown at beginning of run
draw_countdown(
win=window,
stim=countdown_text_box,
duration=constants["COUNTDOWN_DURATION"],
)

real_countdown_duration = run_clock.getTime()
run_data["onset"].append(0)
run_data["duration"].append(real_countdown_duration)
run_data["trial_type"].append("countdown")
run_data["stim_file"].append("n/a")
run_data["category"].append("n/a")
run_data["subcategory"].append("n/a")
run_data["miniblock_number"].append("n/a")
run_data["onset"].append(actual_onset_time)
run_data["duration"].append(trial_clock.getTime())

run_response_times = []
nonbaseline_block_counter = 0
for j_block, category in enumerate(block_categories):
block_clock.reset()
if category == "baseline":
onset_time = run_clock.getTime()
elif trial_type == "baseline":
responses, _ = draw(
win=window,
stim=fixation,
Expand All @@ -516,108 +612,43 @@ def main(debug=False):
# The first element of each sublist is the actual button pressed,
# but we don't care about that.
run_response_times += [resp[1] for resp in responses]
target_idx = None

# Log info
run_data["onset"].append(onset_time)
run_data["duration"].append(block_clock.getTime())
run_data["trial_type"].append("baseline")
run_data["stim_file"].append("n/a")
run_data["category"].append("baseline")
run_data["subcategory"].append("baseline")
run_data["miniblock_number"].append(j_block + 1)
else:
# Block of stimuli
block_stimuli = list(
np.random.choice(
stimuli[category],
size=constants["N_STIMULI_PER_BLOCK"],
replace=False,
)
)
if task_blocks[nonbaseline_block_counter] == 1:
# Check for last block's target to make sure that two targets don't
# occur within the same response window
if (j_block > 0) and (target_idx is not None):
last_target_onset = (
((constants["N_STIMULI_PER_BLOCK"] + 1) - target_idx)
* trial_duration
* -1
)
last_target_rw_offset = last_target_onset + constants["RESPONSE_WINDOW"]
first_viable_trial = int(np.ceil(last_target_rw_offset / trial_duration))
first_viable_trial = np.maximum(0, first_viable_trial)
first_viable_trial += 1 # just to give it a one-trial buffer
else:
first_viable_trial = 0

# Adjust stimuli based on task
if exp_info["Task"] == "Oddball":
# target is scrambled image
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
block_stimuli[target_idx] = np.random.choice(stimuli["scrambled"])
elif exp_info["Task"] == "OneBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 1)
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
block_stimuli[target_idx] = block_stimuli[target_idx - 1]
elif exp_info["Task"] == "TwoBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 2)
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
block_stimuli[target_idx] = block_stimuli[target_idx - 2]
else:
target_idx = None

for k_stim, stim_file in enumerate(block_stimuli):
fixation_trial_clock.reset()
stim_image.image = stim_file
trial_clock.reset()
onset_time = run_clock.getTime()

# Draw the stimulus.
# Accept responses during the stimulus presentation.
responses, _ = draw(
win=window,
stim=[stim_image, fixation],
duration=constants["IMAGE_DURATION"],
clock=run_clock,
)
run_response_times += [resp[1] for resp in responses]
run_data["onset"].append(actual_onset_time)
run_data["duration"].append(trial_clock.getTime())

# Log the inter-stimulus interval.
duration = trial_clock.getTime()
loading_plus_stim_duration = fixation_trial_clock.getTime()
isi_dur = np.maximum(trial_duration - loading_plus_stim_duration, 0)

# Draw the post-stimulus fixation.
# Accept responses during this fixation presentation.
responses, _ = draw(
win=window, stim=fixation, duration=isi_dur, clock=run_clock
)
else:
fixation_trial_clock.reset()
stim_image.image = stim_file

run_response_times += [resp[1] for resp in responses]
relative_stim_file = os.path.relpath(stim_file)
subcategory = os.path.basename(relative_stim_file).split("-")[0]
# Draw the stimulus.
# Accept responses during the stimulus presentation.
responses, _ = draw(
win=window,
stim=[stim_image, fixation],
duration=constants["IMAGE_DURATION"],
clock=run_clock,
)
run_response_times += [resp[1] for resp in responses]

if k_stim == target_idx:
trial_type = exp_info["Task"].lower()
else:
trial_type = "category"
# Log the inter-stimulus interval.
actual_duration = trial_clock.getTime()
loading_plus_stim_duration = fixation_trial_clock.getTime()
isi_dur = np.maximum(trial_duration - loading_plus_stim_duration, 0)

# Log info
run_data["onset"].append(onset_time)
run_data["duration"].append(duration)
run_data["trial_type"].append(trial_type)
run_data["stim_file"].append(relative_stim_file)
run_data["category"].append(category)
run_data["subcategory"].append(subcategory)
run_data["miniblock_number"].append(j_block + 1)
# Draw the post-stimulus fixation.
# Accept responses during this fixation presentation.
responses, _ = draw(
win=window,
stim=fixation,
duration=isi_dur,
clock=run_clock,
)

nonbaseline_block_counter += 1
run_response_times += [resp[1] for resp in responses]

# Unused duration
# block_duration = block_clock.getTime()
# Log info
run_data["onset"].append(actual_onset_time)
run_data["duration"].append(actual_duration)

run_frame = pd.DataFrame(run_data)
run_frame = allocate_responses(
Expand Down
Loading

0 comments on commit 620a759

Please sign in to comment.