Commit

Address style issues.
tsalo committed Nov 28, 2023
1 parent 3dd95b4 commit 01888dd
Showing 2 changed files with 52 additions and 29 deletions.
11 changes: 4 additions & 7 deletions bids/generate_task_description_files.py
@@ -1,3 +1,4 @@
"""Generate BIDS JSON files for the task."""
import json

events_description = {
@@ -53,15 +54,11 @@
for task_type in ["Oddball", "Two-Back", "One-Back"]:
bold_description = {
"CogAtlasID": "trm_553e85265f51e",
"TaskName": "dual functional localizer/{0}".format(task_type.lower()),
"TaskName": f"dual functional localizer/{task_type.lower()}",
}

with open(
"task-localizer{0}_events.json".format(task_type.replace("-", "")), "w"
) as fo:
with open(f"task-localizer{task_type.replace('-', '')}_events.json", "w") as fo:
json.dump(events_description, fo, sort_keys=True, indent=4)

with open(
"task-localizer{0}_bold.json".format(task_type.replace("-", "")), "w"
) as fo:
with open(f"task-localizer{task_type.replace('-', '')}_bold.json", "w") as fo:
json.dump(bold_description, fo, sort_keys=True, indent=4)
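
The change in this file swaps str.format() calls for f-strings without altering the generated filenames. A minimal sketch (not part of the commit) confirming the two forms are equivalent for one of the loop's task types:

# Editor's sketch: the old .format() call and the new f-string build the same name.
task_type = "Two-Back"
old_name = "task-localizer{0}_events.json".format(task_type.replace("-", ""))
new_name = f"task-localizer{task_type.replace('-', '')}_events.json"
assert old_name == new_name == "task-localizerTwoBack_events.json"
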
70 changes: 48 additions & 22 deletions fLoc.py
@@ -31,9 +31,9 @@ def allocate_responses(events_df, response_times, response_window=1.0):
Updated dataframe with columns "response_time", "accuracy", and "classification" added.
"""
# Let's start by locating target trials
TASK_TYPES = ["oddball", "oneback", "twoback"]
task_types = ["oddball", "oneback", "twoback"]
response_times = response_times[:] # copy
target_trial_idx = events_df["trial_type"].isin(TASK_TYPES)
target_trial_idx = events_df["trial_type"].isin(task_types)
nontarget_trial_idx = ~target_trial_idx

events_df["response_time"] = "n/a"
@@ -42,7 +42,9 @@ def allocate_responses(events_df, response_times, response_window=1.0):

# Defaults
events_df.loc[events_df["trial_type"] == "category", "accuracy"] = 1
events_df.loc[events_df["trial_type"] == "category", "classification"] = "true_negative"
events_df.loc[
events_df["trial_type"] == "category", "classification"
] = "true_negative"
events_df.loc[target_trial_idx, "accuracy"] = 0 # default to miss
events_df.loc[target_trial_idx, "classification"] = "false_negative"
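For context, the defaults above implement a signal-detection scheme: non-target ("category") trials start out as correct rejections and target trials as misses, to be upgraded once responses are allocated. A minimal standalone sketch of that initialization, using hypothetical trial types:

import pandas as pd

# Editor's sketch with made-up trials; mirrors the default assignments above.
events_df = pd.DataFrame({"trial_type": ["category", "oddball", "category", "oneback"]})
task_types = ["oddball", "oneback", "twoback"]
target_trial_idx = events_df["trial_type"].isin(task_types)

events_df["response_time"] = "n/a"
events_df["accuracy"] = "n/a"
events_df["classification"] = "n/a"
events_df.loc[~target_trial_idx, "accuracy"] = 1  # correct rejection until proven otherwise
events_df.loc[~target_trial_idx, "classification"] = "true_negative"
events_df.loc[target_trial_idx, "accuracy"] = 0  # miss until a response lands in the window
events_df.loc[target_trial_idx, "classification"] = "false_negative"
print(events_df)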

@@ -156,16 +158,16 @@ def draw_countdown(win, stim, duration):
countdown_sec -= 1


def draw_until_keypress(win, stim, continueKeys=["5"], debug=False):
def draw_until_keypress(win, stim, continue_keys=["5"], debug=False):
"""Draw a screen until a specific key is pressed.
Parameters
----------
win : :obj:`psychopy.visual.Window`
Window in which to draw the stimulus.
stim : :obj:`psychopy.visual.TextStim`
Text stimulus (e.g., instructions) to draw until one of the ``continueKeys`` are pressed.
continueKeys : :obj:`list` of :obj:`str`, optional
Text stimulus (e.g., instructions) to draw until one of the ``continue_keys`` are pressed.
continue_keys : :obj:`list` of :obj:`str`, optional
Keys to accept to stop drawing the stimulus.
Default is ["5"].
debug : :obj:`bool`
@@ -185,8 +187,8 @@ def draw_until_keypress(win, stim, continueKeys=["5"], debug=False):
s.draw()
else:
stim.draw()
keys = event.getKeys(keyList=continueKeys)
if any([ck in keys for ck in continueKeys]):
keys = event.getKeys(keyList=continue_keys)
if any([ck in keys for ck in continue_keys]):
return
close_on_esc(win)
win.flip()
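
A side note on the renamed check: since event.getKeys(keyList=continue_keys) only returns keys named in keyList, the any() test is equivalent to checking that the returned list is non-empty. A small PsychoPy-free sketch of that equivalence (the pressed lists stand in for hypothetical getKeys return values):

continue_keys = ["5"]
for pressed in (["5"], []):  # hypothetical returns of event.getKeys(keyList=continue_keys)
    assert any(ck in pressed for ck in continue_keys) == bool(pressed)
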
@@ -306,7 +308,9 @@ def main(debug=False):
if exp_info["Session"]:
ses_str = f"ses-{exp_info['Session'].zfill(2)}_"

base_name = f"sub-{exp_info['Subject'].zfill(2)}_{ses_str}task-localizer{exp_info['Task']}"
base_name = (
f"sub-{exp_info['Subject'].zfill(2)}_{ses_str}task-localizer{exp_info['Task']}"
)

# save a log file for detail verbose info
filename = os.path.join(output_dir, f"{base_name}_events")
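
The wrapped f-string above builds the BIDS-style file prefix. A minimal sketch with hypothetical dialog entries (exp_info is presumably collected from the experiment GUI) showing the name it produces:

# Editor's sketch; Subject/Session/Task values are made up.
exp_info = {"Subject": "1", "Session": "2", "Task": "Oddball"}
ses_str = f"ses-{exp_info['Session'].zfill(2)}_" if exp_info["Session"] else ""
base_name = f"sub-{exp_info['Subject'].zfill(2)}_{ses_str}task-localizer{exp_info['Task']}"
assert base_name == "sub-01_ses-02_task-localizerOddball"
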
@@ -437,7 +441,9 @@ def main(debug=False):
grabber_list = [1] * n_task_prop + [0] * n_nontask_prop

# We want to ensure that tasks are not assigned to baseline blocks
n_nonbaseline_blocks = int(constants["N_BLOCKS"] * (n_categories - 1) / n_categories)
n_nonbaseline_blocks = int(
constants["N_BLOCKS"] * (n_categories - 1) / n_categories
)
n_dupes = int(np.ceil(n_nonbaseline_blocks / len(grabber_list)))
task_blocks = grabber_list * n_dupes
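
To see what the reformatted arithmetic does, here is a sketch with hypothetical values for N_BLOCKS, the category count, and the task/no-task proportions (the real values come from the task's constants):

import numpy as np

n_blocks = 12                     # hypothetical constants["N_BLOCKS"]
n_categories = 6                  # hypothetical category count, including baseline
grabber_list = [1] * 2 + [0] * 2  # hypothetical task/no-task proportion split

# Tasks are only assigned to non-baseline blocks.
n_nonbaseline_blocks = int(n_blocks * (n_categories - 1) / n_categories)  # 10
# Tile the task/no-task pattern until it covers every non-baseline block.
n_dupes = int(np.ceil(n_nonbaseline_blocks / len(grabber_list)))  # 3
task_blocks = grabber_list * n_dupes  # 12 entries, at least one per non-baseline block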

@@ -447,9 +453,11 @@ def main(debug=False):
run_clock = core.Clock() # to track time since each run starts (post scanner pulse)
block_clock = core.Clock() # to track duration of each block
trial_clock = core.Clock() # to track duration of each trial
fixation_trial_clock = core.Clock() # to account for fixation time spent loading image
fixation_trial_clock = (
core.Clock()
) # to account for fixation time spent loading image

COLUMNS = [
columns = [
"onset",
"duration",
"trial_type",
@@ -461,11 +469,15 @@ def main(debug=False):
# unnecessary, since run_frame is defined at end of for loop, but satisfies linter
run_frame = None
for i_run in range(n_runs):
run_data = {c: [] for c in COLUMNS}
run_data = {c: [] for c in columns}
run_label = i_run + 1
events_file = os.path.join(output_dir, f"{base_name}_run-{run_label:02d}_events.tsv")
events_file = os.path.join(
output_dir, f"{base_name}_run-{run_label:02d}_events.tsv"
)

block_categories = randomize_carefully(standard_categories, n_blocks_per_category)
block_categories = randomize_carefully(
standard_categories, n_blocks_per_category
)
np.random.shuffle(task_blocks)

# Scanner runtime
@@ -553,8 +565,12 @@ def main(debug=False):
* trial_duration
* -1
)
last_target_rw_offset = (last_target_onset + constants["RESPONSE_WINDOW"])
first_viable_trial = int(np.ceil(last_target_rw_offset / trial_duration))
last_target_rw_offset = (
last_target_onset + constants["RESPONSE_WINDOW"]
)
first_viable_trial = int(
np.ceil(last_target_rw_offset / trial_duration)
)
first_viable_trial = np.maximum(0, first_viable_trial)
first_viable_trial += 1 # just to give it a one-trial buffer
else:
@@ -563,17 +579,25 @@ def main(debug=False):
# Adjust stimuli based on task
if exp_info["Task"] == "Oddball":
# target is scrambled image
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
block_stimuli[target_idx] = np.random.choice(stimuli["scrambled"])
target_idx = np.random.randint(
first_viable_trial, len(block_stimuli)
)
block_stimuli[target_idx] = np.random.choice(
stimuli["scrambled"]
)
elif exp_info["Task"] == "OneBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 1)
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
target_idx = np.random.randint(
first_viable_trial, len(block_stimuli)
)
block_stimuli[target_idx] = block_stimuli[target_idx - 1]
elif exp_info["Task"] == "TwoBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 2)
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
target_idx = np.random.randint(
first_viable_trial, len(block_stimuli)
)
block_stimuli[target_idx] = block_stimuli[target_idx - 2]
else:
target_idx = None
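
The three branches above insert exactly one target per block: an oddball block swaps in a scrambled image, while the n-back blocks copy an earlier stimulus forward. A toy sketch of the two-back case (file names are hypothetical):

import numpy as np

block_stimuli = ["face01.png", "face02.png", "face03.png", "face04.png"]  # hypothetical file names
first_viable_trial = 2  # a two-back target needs at least two preceding trials
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))  # 2 or 3
block_stimuli[target_idx] = block_stimuli[target_idx - 2]  # repeat the stimulus from two trials back
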
@@ -652,7 +676,9 @@ def main(debug=False):
# Show the final run's performance
# Scanner is off for this
hit_count = (run_frame["classification"] == "true_positive").sum()
n_probes = run_frame["classification"].isin(["false_negative", "true_positive"]).sum()
n_probes = (
run_frame["classification"].isin(["false_negative", "true_positive"]).sum()
)
hit_rate = hit_count / n_probes
fa_count = (run_frame["classification"] == "false_positive").sum()
performance_str = (
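
The end-of-run summary in the last hunk derives the hit rate and false-alarm count from the classification column; a minimal sketch with a hypothetical run_frame:

import pandas as pd

# Editor's sketch; the real run_frame holds one row per trial of the run.
run_frame = pd.DataFrame(
    {"classification": ["true_positive", "false_negative", "true_negative", "false_positive"]}
)
hit_count = (run_frame["classification"] == "true_positive").sum()  # 1
n_probes = run_frame["classification"].isin(["false_negative", "true_positive"]).sum()  # 2
hit_rate = hit_count / n_probes  # 0.5
fa_count = (run_frame["classification"] == "false_positive").sum()  # 1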
