Skip to content

Commit

Permalink
Execution by subgroups of subjects (#151)
Browse files Browse the repository at this point in the history
* [BUG] inside unit_tests workflow

* [DOC] runner help

* Creating entry-points for the project

* [DOC] command line tools

* [DOC] command line tools

* Adding a tester command line tool

* [DATALAD] change results url

* Runner configuration

* Remove dir func in

* Runner always stops on first crash

* [TEST][helpers] not failing test if correlation under threshold

* [DOC] narps_open.core.common

* [ENH][TEST] narps_open.core.nodes module

* [ENH][DOC] node generators in core module

* [PEP8][SPELL] node generators in core module

* Add a remove parent dir node generator

* [REFAC][DOC] Creators at interface level instead of nodes

* Creating an interface factory

* Remove node modules after merging

* [TEST] iterating over groups f 4 subjects in pipeline_execution_test

* [TEST] iterating over groups f 4 subjects in pipeline_execution_test

* Runner refac
  • Loading branch information
bclenet authored Jan 29, 2024
1 parent 59bb46c commit d4ff702
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 61 deletions.
46 changes: 23 additions & 23 deletions narps_open/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,38 +106,38 @@ def start(self, first_level_only: bool = False, group_level_only: bool = False)
if first_level_only and group_level_only:
raise AttributeError('first_level_only and group_level_only cannot both be True')

# Generate workflow list
workflow_list = []
# Generate workflow lists
first_level_workflows = []
group_level_workflows = []

if not group_level_only:
workflow_list += [
for workflow in [
self._pipeline.get_preprocessing(),
self._pipeline.get_run_level_analysis(),
self._pipeline.get_subject_level_analysis(),
]
if not first_level_only:
workflow_list += [
self._pipeline.get_group_level_analysis()
]
self._pipeline.get_subject_level_analysis()]:

nb_procs = Configuration()['runner']['nb_procs']
if isinstance(workflow, list):
for sub_workflow in workflow:
first_level_workflows.append(sub_workflow)
else:
first_level_workflows.append(workflow)

if not first_level_only:
for workflow in [self._pipeline.get_group_level_analysis()]:
if isinstance(workflow, list):
for sub_workflow in workflow:
group_level_workflows.append(sub_workflow)
else:
group_level_workflows.append(workflow)

# Launch workflows
for workflow in workflow_list:
for workflow in first_level_workflows + group_level_workflows:
if workflow is None:
pass
elif isinstance(workflow, list):
for sub_workflow in workflow:
if not isinstance(sub_workflow, Workflow):
raise AttributeError('Workflow must be of type nipype.Workflow')

if nb_procs > 1:
sub_workflow.run('MultiProc', plugin_args = {'n_procs': nb_procs})
else:
sub_workflow.run()
elif not isinstance(workflow, Workflow):
raise AttributeError('Workflow must be of type nipype.Workflow')
else:
if not isinstance(workflow, Workflow):
raise AttributeError('Workflow must be of type nipype.Workflow')

nb_procs = Configuration()['runner']['nb_procs']
if nb_procs > 1:
workflow.run('MultiProc', plugin_args = {'n_procs': nb_procs})
else:
Expand Down
1 change: 1 addition & 0 deletions narps_open/utils/configuration/testing_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ neurovault_naming = true # true if results files are saved using the neurovault
[testing]

[testing.pipelines]
nb_subjects_per_group = 4 # Compute first level analyses by subgroups of N subjects, to avoid lacking of disk and memory
correlation_thresholds = [0.30, 0.70, 0.79, 0.85, 0.93] # Correlation between reproduced hypotheses files and results, respectively for [20, 40, 60, 80, 108] subjects.
55 changes: 33 additions & 22 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,46 @@ def test_pipeline_execution(
TODO : how to keep intermediate files of the low level for the next numbers of subjects ?
- keep intermediate levels : boolean in PipelineRunner
"""
# A list of number of subject to iterate over
nb_subjects_list = list(range(
Configuration()['testing']['pipelines']['nb_subjects_per_group'],
nb_subjects,
Configuration()['testing']['pipelines']['nb_subjects_per_group'])
)
nb_subjects_list.append(nb_subjects)

# Initialize the pipeline
runner = PipelineRunner(team_id)
runner.nb_subjects = nb_subjects
runner.pipeline.directories.dataset_dir = Configuration()['directories']['dataset']
runner.pipeline.directories.results_dir = Configuration()['directories']['reproduced_results']
runner.pipeline.directories.set_output_dir_with_team_id(team_id)
runner.pipeline.directories.set_working_dir_with_team_id(team_id)

# Run as long as there are missing files after first level (with a max number of trials)
# TODO : this is a workaround
for _ in range(Configuration()['runner']['nb_trials']):

# Get missing subjects
missing_subjects = set()
for file in runner.get_missing_first_level_outputs():
subject_id = get_subject_id(file)
if subject_id is not None:
missing_subjects.add(subject_id)

# Leave if no missing subjects
if not missing_subjects:
break

# Start pipeline
runner.subjects = missing_subjects
try: # This avoids errors in the workflow to make the test fail
runner.start(True, False)
except(RuntimeError) as err:
print('RuntimeError: ', err)
# Run first level by (small) sub-groups of subjects
for subjects in nb_subjects_list:
runner.nb_subjects = subjects

# Run as long as there are missing files after first level (with a max number of trials)
# TODO : this is a workaround
for _ in range(Configuration()['runner']['nb_trials']):

# Get missing subjects
missing_subjects = set()
for file in runner.get_missing_first_level_outputs():
subject_id = get_subject_id(file)
if subject_id is not None:
missing_subjects.add(subject_id)

# Leave if no missing subjects
if not missing_subjects:
break

# Start pipeline
runner.subjects = missing_subjects
try: # This avoids errors in the workflow to make the test fail
runner.start(True, False)
except(RuntimeError) as err:
print('RuntimeError: ', err)

# Check missing files for the last time
missing_files = runner.get_missing_first_level_outputs()
Expand Down
71 changes: 55 additions & 16 deletions tests/test_conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def set_test_directory(scope = 'function'):
yield

# Comment this line for debugging
#rmtree(TEST_DIR, ignore_errors = True)
rmtree(TEST_DIR, ignore_errors = True)

class MockupPipeline(Pipeline):
""" A simple Pipeline class for test purposes """
Expand All @@ -50,11 +50,12 @@ def __init__(self):
with open(self.test_file, 'w', encoding = 'utf-8') as file:
file.write(str(0))

def update_execution_count(_, file_path: str, workflow_name: str):
def update_execution_count(_, file_path: str, workflow_name: str, nb_subjects: int):
""" Method used inside a nipype Node, to update the execution count inside the file.
Arguments:
- file_path:str, path to the execution count file
- workflow_name:str, name of the workflow
- nb_subjects:int, number of subjects in the workflow
Return: the updated number of executions
"""
Expand All @@ -63,13 +64,13 @@ def update_execution_count(_, file_path: str, workflow_name: str):
execution_counter = 0
with open(file_path, 'r', encoding = 'utf-8') as file:
# Get last char of the file
execution_counter = int(file.read()[-1])
execution_counter = int(file.read().split(' ')[-1])

execution_counter += 1

# Write execution count back
with open(file_path, 'a', encoding = 'utf-8') as file:
file.write(f'\n{workflow_name} {execution_counter}')
file.write(f'\n{workflow_name} {nb_subjects} {execution_counter}')

return execution_counter

Expand Down Expand Up @@ -98,7 +99,7 @@ def create_workflow(self, workflow_name: str, file_list: list):
- file_list: list, list of the files that the workflow is supposed to generate
"""
node_count = Node(Function(
input_names = ['_', 'file_path', 'workflow_name'],
input_names = ['_', 'file_path', 'workflow_name', 'nb_subjects'],
output_names = ['execution_counter'],
function = self.update_execution_count),
name = 'node_count'
Expand All @@ -108,6 +109,7 @@ def create_workflow(self, workflow_name: str, file_list: list):
node_count.inputs._ = datetime.now()
node_count.inputs.file_path = self.test_file
node_count.inputs.workflow_name = workflow_name
node_count.inputs.nb_subjects = len(self.subject_list)

node_decide = Node(Function(
input_names = ['execution_counter'],
Expand Down Expand Up @@ -202,6 +204,38 @@ def get_hypotheses_outputs(self):
template = join(TEST_DIR, 'hypothesis_{id}.md')
return [template.format(id = i) for i in range(1,19)]

class MockupResultsCollection():
""" A fake ResultsCollection object for test purposes """

def __init__(self, team_id: str):
self.team_id = team_id
self.uid = self.get_uid()
self.directory = join(
Configuration()['directories']['narps_results'],
'orig',
self.uid + '_' + self.team_id
)
self.files = self.get_file_urls()

def get_uid(self):
""" Return the uid of the collection by browsing the team description """
return 'uid'

def get_file_urls(self):
""" Return a dict containing the download url for each file of the collection.
* dict key is the file base name (with extension)
* dict value is the download url for the file on Neurovault
"""
urls = {}
for file_id in range(1, 19):
urls[f'file_{file_id}'] = 'url'

return urls

def download(self):
""" Download the collection, file by file. """
pass

class TestConftest:
""" A class that contains all the unit tests for the conftest module."""

Expand Down Expand Up @@ -247,16 +281,18 @@ def test_test_correlation_results(mocker):
def test_test_pipeline_execution(mocker, set_test_directory):
""" Test the test_pipeline_execution helper """

# Set subgroups of subjects
Configuration()['testing']['pipelines']['nb_subjects_per_group'] = 4

# Create mocks
mocker.patch('conftest.get_correlation_coefficient', return_value = 1.0)
fake_runner = PipelineRunner('2T6S')
fake_runner._pipeline = MockupPipeline()
mocker.patch('conftest.PipelineRunner', return_value = fake_runner)
fake_collection = ResultsCollection('2T6S')
mocker.patch('conftest.ResultsCollection', return_value = fake_collection)
mocker.patch('conftest.ResultsCollection', return_value = MockupResultsCollection('2T6S'))

# Run pipeline
helpers.test_pipeline_execution('test_conftest', 20)
helpers.test_pipeline_execution('test_conftest', 7)

# Check outputs
assert isdir(join(TEST_DIR, 'TestConftest_preprocessing_workflow'))
Expand All @@ -268,18 +304,21 @@ def test_test_pipeline_execution(mocker, set_test_directory):
with open(join(TEST_DIR, 'test_conftest.txt'), 'r', encoding = 'utf-8') as file:
assert file.readline() == '0\n'
# First exec of preprocessing creates an exception (execution counter == 1)
assert file.readline() == 'TestConftest_preprocessing_workflow 1\n'
assert file.readline() == f'TestConftest_preprocessing_workflow 4 1\n'
# Relaunching the workflow
# Preprocessing files won't be created(execution counter == 2)
assert file.readline() == 'TestConftest_preprocessing_workflow 2\n'
assert file.readline() == 'TestConftest_run_level_workflow 3\n'
assert file.readline() == 'TestConftest_subject_level_workflow 4\n'
assert file.readline() == f'TestConftest_preprocessing_workflow 4 2\n'
assert file.readline() == f'TestConftest_run_level_workflow 4 3\n'
assert file.readline() == f'TestConftest_subject_level_workflow 4 4\n'
# Relaunching the workflow
# Everything's fine
assert file.readline() == 'TestConftest_preprocessing_workflow 5\n'
assert file.readline() == 'TestConftest_run_level_workflow 6\n'
assert file.readline() == 'TestConftest_subject_level_workflow 7\n'
assert file.readline() == 'TestConftest_group_level_workflow 8'
assert file.readline() == f'TestConftest_preprocessing_workflow 4 5\n'
assert file.readline() == f'TestConftest_run_level_workflow 4 6\n'
assert file.readline() == f'TestConftest_subject_level_workflow 4 7\n'
assert file.readline() == f'TestConftest_preprocessing_workflow 3 8\n'
assert file.readline() == f'TestConftest_run_level_workflow 3 9\n'
assert file.readline() == f'TestConftest_subject_level_workflow 3 10\n'
assert file.readline() == f'TestConftest_group_level_workflow 7 11'

@staticmethod
@mark.unit_test
Expand Down

0 comments on commit d4ff702

Please sign in to comment.