From d4ff7029ddb190594743bbdf80c8677c058ab037 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Boris=20Cl=C3=A9net?= <117362283+bclenet@users.noreply.github.com>
Date: Mon, 29 Jan 2024 16:01:01 +0100
Subject: [PATCH] Execution by subgroups of subjects (#151)

* [BUG] inside unit_tests workflow
* [DOC] runner help
* Creating entry-points for the project
* [DOC] command line tools
* [DOC] command line tools
* Adding a tester command line tool
* [DATALAD] change results url
* Runner configuration
* Remove dir func in
* Runner always stops on first crash
* [TEST][helpers] not failing test if correlation under threshold
* [DOC] narps_open.core.common
* [ENH][TEST] narps_open.core.nodes module
* [ENH][DOC] node generators in core module
* [PEP8][SPELL] node generators in core module
* Add a remove parent dir node generator
* [REFAC][DOC] Creators at interface level instead of nodes
* Creating an interface factory
* Remove node modules after merging
* [TEST] iterating over groups of 4 subjects in pipeline_execution_test
* [TEST] iterating over groups of 4 subjects in pipeline_execution_test
* Runner refac
---
 narps_open/runner.py                          | 46 ++++++------
 .../utils/configuration/testing_config.toml   |  1 +
 tests/conftest.py                             | 55 ++++++++------
 tests/test_conftest.py                        | 71 ++++++++++++++-----
 4 files changed, 112 insertions(+), 61 deletions(-)

diff --git a/narps_open/runner.py b/narps_open/runner.py
index b1832ffe..bf557ba0 100644
--- a/narps_open/runner.py
+++ b/narps_open/runner.py
@@ -106,38 +106,38 @@ def start(self, first_level_only: bool = False, group_level_only: bool = False)
         if first_level_only and group_level_only:
             raise AttributeError('first_level_only and group_level_only cannot both be True')
 
-        # Generate workflow list
-        workflow_list = []
+        # Generate workflow lists
+        first_level_workflows = []
+        group_level_workflows = []
+
         if not group_level_only:
-            workflow_list += [
+            for workflow in [
                 self._pipeline.get_preprocessing(),
                 self._pipeline.get_run_level_analysis(),
-                self._pipeline.get_subject_level_analysis(),
-                ]
-        if not first_level_only:
-            workflow_list += [
-                self._pipeline.get_group_level_analysis()
-                ]
+                self._pipeline.get_subject_level_analysis()]:
 
-        nb_procs = Configuration()['runner']['nb_procs']
+                if isinstance(workflow, list):
+                    for sub_workflow in workflow:
+                        first_level_workflows.append(sub_workflow)
+                else:
+                    first_level_workflows.append(workflow)
+
+        if not first_level_only:
+            for workflow in [self._pipeline.get_group_level_analysis()]:
+                if isinstance(workflow, list):
+                    for sub_workflow in workflow:
+                        group_level_workflows.append(sub_workflow)
+                else:
+                    group_level_workflows.append(workflow)
 
         # Launch workflows
-        for workflow in workflow_list:
+        for workflow in first_level_workflows + group_level_workflows:
             if workflow is None:
                 pass
-            elif isinstance(workflow, list):
-                for sub_workflow in workflow:
-                    if not isinstance(sub_workflow, Workflow):
-                        raise AttributeError('Workflow must be of type nipype.Workflow')
-
-                    if nb_procs > 1:
-                        sub_workflow.run('MultiProc', plugin_args = {'n_procs': nb_procs})
-                    else:
-                        sub_workflow.run()
+            elif not isinstance(workflow, Workflow):
+                raise AttributeError('Workflow must be of type nipype.Workflow')
             else:
-                if not isinstance(workflow, Workflow):
-                    raise AttributeError('Workflow must be of type nipype.Workflow')
-
+                nb_procs = Configuration()['runner']['nb_procs']
                 if nb_procs > 1:
                     workflow.run('MultiProc', plugin_args = {'n_procs': nb_procs})
                 else:
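The refactored start() above flattens whatever the pipeline getters return (a nipype Workflow, a list of Workflows, or None) into two plain lists before launching anything. As a reading aid, here is a minimal standalone sketch of that flattening step, under the same assumptions about the getters' return types; the helper name is hypothetical, and note that the patch itself defers the None check to launch time:

from nipype import Workflow

def flatten_workflows(candidates: list) -> list:
    """ Flatten a mix of Workflow instances, lists of Workflows, and None
        into a single flat list, rejecting anything that is not a Workflow. """
    workflows = []
    for candidate in candidates:
        if candidate is None:
            continue  # a pipeline step may be absent
        items = candidate if isinstance(candidate, list) else [candidate]
        for item in items:
            if not isinstance(item, Workflow):
                raise AttributeError('Workflow must be of type nipype.Workflow')
            workflows.append(item)
    return workflows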
diff --git a/narps_open/utils/configuration/testing_config.toml b/narps_open/utils/configuration/testing_config.toml
index 4d9cb110..b5374183 100644
--- a/narps_open/utils/configuration/testing_config.toml
+++ b/narps_open/utils/configuration/testing_config.toml
@@ -22,4 +22,5 @@ neurovault_naming = true # true if results files are saved using the neurovault
 
 [testing]
 [testing.pipelines]
+nb_subjects_per_group = 4 # Compute first level analyses by subgroups of N subjects, to avoid running out of disk space and memory
 correlation_thresholds = [0.30, 0.70, 0.79, 0.85, 0.93] # Correlation between reproduced hypotheses files and results, respectively for [20, 40, 60, 80, 108] subjects.

diff --git a/tests/conftest.py b/tests/conftest.py
index 14275bec..e01e4a00 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -43,35 +43,46 @@ def test_pipeline_execution(
     TODO : how to keep intermediate files of the low level for the next numbers of subjects ?
         - keep intermediate levels : boolean in PipelineRunner
     """
+    # A list of the numbers of subjects to iterate over
+    nb_subjects_list = list(range(
+        Configuration()['testing']['pipelines']['nb_subjects_per_group'],
+        nb_subjects,
+        Configuration()['testing']['pipelines']['nb_subjects_per_group'])
+        )
+    nb_subjects_list.append(nb_subjects)
+
     # Initialize the pipeline
     runner = PipelineRunner(team_id)
-    runner.nb_subjects = nb_subjects
     runner.pipeline.directories.dataset_dir = Configuration()['directories']['dataset']
     runner.pipeline.directories.results_dir = Configuration()['directories']['reproduced_results']
     runner.pipeline.directories.set_output_dir_with_team_id(team_id)
     runner.pipeline.directories.set_working_dir_with_team_id(team_id)
 
-    # Run as long as there are missing files after first level (with a max number of trials)
-    # TODO : this is a workaround
-    for _ in range(Configuration()['runner']['nb_trials']):
-
-        # Get missing subjects
-        missing_subjects = set()
-        for file in runner.get_missing_first_level_outputs():
-            subject_id = get_subject_id(file)
-            if subject_id is not None:
-                missing_subjects.add(subject_id)
-
-        # Leave if no missing subjects
-        if not missing_subjects:
-            break
-
-        # Start pipeline
-        runner.subjects = missing_subjects
-        try: # This avoids errors in the workflow to make the test fail
-            runner.start(True, False)
-        except(RuntimeError) as err:
-            print('RuntimeError: ', err)
+    # Run the first level by (small) subgroups of subjects
+    for subjects in nb_subjects_list:
+        runner.nb_subjects = subjects
+
+        # Run as long as there are missing files after first level (with a max number of trials)
+        # TODO : this is a workaround
+        for _ in range(Configuration()['runner']['nb_trials']):
+
+            # Get missing subjects
+            missing_subjects = set()
+            for file in runner.get_missing_first_level_outputs():
+                subject_id = get_subject_id(file)
+                if subject_id is not None:
+                    missing_subjects.add(subject_id)
+
+            # Leave if no missing subjects
+            if not missing_subjects:
+                break
+
+            # Start pipeline
+            runner.subjects = missing_subjects
+            try: # Prevent errors in the workflow from making the test fail
+                runner.start(True, False)
+            except RuntimeError as err:
+                print('RuntimeError: ', err)
 
     # Check missing files for the last time
     missing_files = runner.get_missing_first_level_outputs()
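The nb_subjects_list built above holds cumulative subject counts in steps of nb_subjects_per_group, with the full count appended last, so each pass re-runs the first level on a larger subject list while get_missing_first_level_outputs() restricts the work to subjects that still lack outputs. A quick worked example of the values produced (the helper name is hypothetical; the expression is the one from the hunk):

def subgroup_sizes(nb_subjects: int, group_size: int) -> list:
    """ Reproduce the nb_subjects_list computation from tests/conftest.py. """
    sizes = list(range(group_size, nb_subjects, group_size))
    sizes.append(nb_subjects)
    return sizes

assert subgroup_sizes(7, 4) == [4, 7]                # the case exercised by the unit test below
assert subgroup_sizes(20, 4) == [4, 8, 12, 16, 20]
assert subgroup_sizes(4, 4) == [4]                   # no intermediate subgroup

With 7 subjects and groups of 4, the second pass therefore only has 3 subjects left to process, which is exactly the '3' that appears in the assertions at the end of this patch.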
diff --git a/tests/test_conftest.py b/tests/test_conftest.py
index 7a2cc6d9..de7d72cb 100644
--- a/tests/test_conftest.py
+++ b/tests/test_conftest.py
@@ -37,7 +37,7 @@ def set_test_directory(scope = 'function'):
     yield
 
     # Comment this line for debugging
-    #rmtree(TEST_DIR, ignore_errors = True)
+    rmtree(TEST_DIR, ignore_errors = True)
 
 class MockupPipeline(Pipeline):
     """ A simple Pipeline class for test purposes """
@@ -50,11 +50,12 @@ def __init__(self):
         with open(self.test_file, 'w', encoding = 'utf-8') as file:
             file.write(str(0))
 
-    def update_execution_count(_, file_path: str, workflow_name: str):
+    def update_execution_count(_, file_path: str, workflow_name: str, nb_subjects: int):
         """ Method used inside a nipype Node, to update the execution count inside the file.
             Arguments:
             - file_path:str, path to the execution count file
             - workflow_name:str, name of the workflow
+            - nb_subjects:int, number of subjects in the workflow
 
             Return: the updated number of executions
         """
@@ -63,13 +64,13 @@ def update_execution_count(_, file_path: str, workflow_name: str):
         execution_counter = 0
         with open(file_path, 'r', encoding = 'utf-8') as file:
             # Get last char of the file
-            execution_counter = int(file.read()[-1])
+            execution_counter = int(file.read().split(' ')[-1])
 
         execution_counter += 1
 
         # Write execution count back
         with open(file_path, 'a', encoding = 'utf-8') as file:
-            file.write(f'\n{workflow_name} {execution_counter}')
+            file.write(f'\n{workflow_name} {nb_subjects} {execution_counter}')
 
         return execution_counter
@@ -98,7 +99,7 @@ def create_workflow(self, workflow_name: str, file_list: list):
             - file_list: list, list of the files that the workflow is supposed to generate
         """
         node_count = Node(Function(
-            input_names = ['_', 'file_path', 'workflow_name'],
+            input_names = ['_', 'file_path', 'workflow_name', 'nb_subjects'],
             output_names = ['execution_counter'],
             function = self.update_execution_count),
             name = 'node_count'
@@ -108,6 +109,7 @@ def create_workflow(self, workflow_name: str, file_list: list):
         node_count.inputs._ = datetime.now()
         node_count.inputs.file_path = self.test_file
         node_count.inputs.workflow_name = workflow_name
+        node_count.inputs.nb_subjects = len(self.subject_list)
 
         node_decide = Node(Function(
             input_names = ['execution_counter'],
@@ -202,6 +204,38 @@ def get_hypotheses_outputs(self):
         template = join(TEST_DIR, 'hypothesis_{id}.md')
         return [template.format(id = i) for i in range(1,19)]
 
+class MockupResultsCollection():
+    """ A fake ResultsCollection object for test purposes """
+
+    def __init__(self, team_id: str):
+        self.team_id = team_id
+        self.uid = self.get_uid()
+        self.directory = join(
+            Configuration()['directories']['narps_results'],
+            'orig',
+            self.uid + '_' + self.team_id
+            )
+        self.files = self.get_file_urls()
+
+    def get_uid(self):
+        """ Return the uid of the collection by browsing the team description """
+        return 'uid'
+
+    def get_file_urls(self):
+        """ Return a dict containing the download url for each file of the collection.
+            * dict key is the file base name (with extension)
+            * dict value is the download url for the file on Neurovault
+        """
+        urls = {}
+        for file_id in range(1, 19):
+            urls[f'file_{file_id}'] = 'url'
+
+        return urls
+
+    def download(self):
+        """ Download the collection, file by file. """
+        pass
+
 class TestConftest:
     """ A class that contains all the unit tests for the conftest module."""
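With the change to update_execution_count() above, every appended line now has three space-separated fields, '<workflow_name> <nb_subjects> <execution_counter>'. That is also why the patch replaces int(file.read()[-1]) with int(file.read().split(' ')[-1]): reading the last character alone breaks as soon as any field reaches two digits. A minimal sketch of the difference, using a hypothetical file content in the format written above:

content = '0\nTestConftest_preprocessing_workflow 4 12'

assert content[-1] == '2'                   # last character only: loses the tens digit
assert int(content.split(' ')[-1]) == 12    # last space-separated token: the full counter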
""" + pass + class TestConftest: """ A class that contains all the unit tests for the conftest module.""" @@ -247,16 +281,18 @@ def test_test_correlation_results(mocker): def test_test_pipeline_execution(mocker, set_test_directory): """ Test the test_pipeline_execution helper """ + # Set subgroups of subjects + Configuration()['testing']['pipelines']['nb_subjects_per_group'] = 4 + # Create mocks mocker.patch('conftest.get_correlation_coefficient', return_value = 1.0) fake_runner = PipelineRunner('2T6S') fake_runner._pipeline = MockupPipeline() mocker.patch('conftest.PipelineRunner', return_value = fake_runner) - fake_collection = ResultsCollection('2T6S') - mocker.patch('conftest.ResultsCollection', return_value = fake_collection) + mocker.patch('conftest.ResultsCollection', return_value = MockupResultsCollection('2T6S')) # Run pipeline - helpers.test_pipeline_execution('test_conftest', 20) + helpers.test_pipeline_execution('test_conftest', 7) # Check outputs assert isdir(join(TEST_DIR, 'TestConftest_preprocessing_workflow')) @@ -268,18 +304,21 @@ def test_test_pipeline_execution(mocker, set_test_directory): with open(join(TEST_DIR, 'test_conftest.txt'), 'r', encoding = 'utf-8') as file: assert file.readline() == '0\n' # First exec of preprocessing creates an exception (execution counter == 1) - assert file.readline() == 'TestConftest_preprocessing_workflow 1\n' + assert file.readline() == f'TestConftest_preprocessing_workflow 4 1\n' # Relaunching the workflow # Preprocessing files won't be created(execution counter == 2) - assert file.readline() == 'TestConftest_preprocessing_workflow 2\n' - assert file.readline() == 'TestConftest_run_level_workflow 3\n' - assert file.readline() == 'TestConftest_subject_level_workflow 4\n' + assert file.readline() == f'TestConftest_preprocessing_workflow 4 2\n' + assert file.readline() == f'TestConftest_run_level_workflow 4 3\n' + assert file.readline() == f'TestConftest_subject_level_workflow 4 4\n' # Relaunching the workflow # Everything's fine - assert file.readline() == 'TestConftest_preprocessing_workflow 5\n' - assert file.readline() == 'TestConftest_run_level_workflow 6\n' - assert file.readline() == 'TestConftest_subject_level_workflow 7\n' - assert file.readline() == 'TestConftest_group_level_workflow 8' + assert file.readline() == f'TestConftest_preprocessing_workflow 4 5\n' + assert file.readline() == f'TestConftest_run_level_workflow 4 6\n' + assert file.readline() == f'TestConftest_subject_level_workflow 4 7\n' + assert file.readline() == f'TestConftest_preprocessing_workflow 3 8\n' + assert file.readline() == f'TestConftest_run_level_workflow 3 9\n' + assert file.readline() == f'TestConftest_subject_level_workflow 3 10\n' + assert file.readline() == f'TestConftest_group_level_workflow 7 11' @staticmethod @mark.unit_test