Skip to content

Commit

Permalink
[TEST] temporary directory fixture
Browse files Browse the repository at this point in the history
  • Loading branch information
bclenet committed Feb 23, 2024
1 parent 1c6f034 commit 8fdcb26
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 71 deletions.
65 changes: 29 additions & 36 deletions tests/core/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,25 @@
pytest -q test_common.py
pytest -q test_common.py -k <selected_test>
"""
from os import mkdir, makedirs
from os import makedirs
from os.path import join, exists, abspath
from shutil import rmtree
from pathlib import Path

from pytest import mark, fixture
from pytest import mark
from nipype import Node, Function, Workflow

from narps_open.utils.configuration import Configuration
import narps_open.core.common as co

TEMPORARY_DIR = join(Configuration()['directories']['test_runs'], 'test_common')

@fixture
def remove_test_dir():
""" A fixture to remove temporary directory created by tests """

rmtree(TEMPORARY_DIR, ignore_errors = True)
mkdir(TEMPORARY_DIR)
yield # test runs here
rmtree(TEMPORARY_DIR, ignore_errors = True)

class TestCoreCommon:
""" A class that contains all the unit tests for the common module."""

@staticmethod
@mark.unit_test
def test_remove_file(remove_test_dir):
def test_remove_file(temporary_data_dir):
""" Test the remove_file function """

# Create a single file
test_file_path = abspath(join(TEMPORARY_DIR, 'file1.txt'))
test_file_path = abspath(join(temporary_data_dir, 'file1.txt'))
Path(test_file_path).touch()

# Check file exist
Expand All @@ -62,15 +49,15 @@ def test_remove_file(remove_test_dir):

@staticmethod
@mark.unit_test
def test_remove_directory(remove_test_dir):
def test_remove_directory(temporary_data_dir):
""" Test the remove_directory function """

# Create a single inside dir tree
dir_path = abspath(join(TEMPORARY_DIR, 'dir_1', 'dir_2'))
dir_path = abspath(join(temporary_data_dir, 'dir_1', 'dir_2'))
makedirs(dir_path)
file_path = abspath(join(TEMPORARY_DIR, 'dir_1', 'dir_2', 'file1.txt'))
file_path = abspath(join(temporary_data_dir, 'dir_1', 'dir_2', 'file1.txt'))
Path(file_path).touch()
test_dir_path = abspath(join(TEMPORARY_DIR, 'dir_1'))
test_dir_path = abspath(join(temporary_data_dir, 'dir_1'))

# Check file exist
assert exists(file_path)
Expand All @@ -90,13 +77,13 @@ def test_remove_directory(remove_test_dir):

@staticmethod
@mark.unit_test
def test_remove_parent_directory(remove_test_dir):
def test_remove_parent_directory(temporary_data_dir):
""" Test the remove_parent_directory function """

# Create a single inside dir tree
dir_path = abspath(join(TEMPORARY_DIR, 'dir_1', 'dir_2'))
dir_path = abspath(join(temporary_data_dir, 'dir_1', 'dir_2'))
makedirs(dir_path)
file_path = abspath(join(TEMPORARY_DIR, 'dir_1', 'dir_2', 'file1.txt'))
file_path = abspath(join(temporary_data_dir, 'dir_1', 'dir_2', 'file1.txt'))
Path(file_path).touch()

# Check file exist
Expand Down Expand Up @@ -151,7 +138,7 @@ def test_node_elements_in_string():

@staticmethod
@mark.unit_test
def test_connect_elements_in_string(remove_test_dir):
def test_connect_elements_in_string(temporary_data_dir):
""" Test the elements_in_string function as evaluated in a connect """

# Inputs
Expand Down Expand Up @@ -180,7 +167,7 @@ def test_connect_elements_in_string(remove_test_dir):

# Create Workflow
test_workflow = Workflow(
base_dir = TEMPORARY_DIR,
base_dir = temporary_data_dir,
name = 'test_workflow'
)
test_workflow.connect([
Expand All @@ -193,11 +180,13 @@ def test_connect_elements_in_string(remove_test_dir):

test_workflow.run()

test_file_t = join(TEMPORARY_DIR, 'test_workflow', 'node_true', '_report', 'report.rst')
test_file_t = join(temporary_data_dir,
'test_workflow', 'node_true', '_report', 'report.rst')
with open(test_file_t, 'r', encoding = 'utf-8') as file:
assert '* out_value : test_string' in file.read()

test_file_f = join(TEMPORARY_DIR, 'test_workflow', 'node_false', '_report', 'report.rst')
test_file_f = join(temporary_data_dir,
'test_workflow', 'node_false', '_report', 'report.rst')
with open(test_file_f, 'r', encoding = 'utf-8') as file:
assert '* out_value : None' in file.read()

Expand Down Expand Up @@ -238,7 +227,7 @@ def test_node_clean_list():

@staticmethod
@mark.unit_test
def test_connect_clean_list(remove_test_dir):
def test_connect_clean_list(temporary_data_dir):
""" Test the clean_list function as evaluated in a connect """

# Inputs
Expand Down Expand Up @@ -269,7 +258,7 @@ def test_connect_clean_list(remove_test_dir):

# Create Workflow
test_workflow = Workflow(
base_dir = TEMPORARY_DIR,
base_dir = temporary_data_dir,
name = 'test_workflow'
)
test_workflow.connect([
Expand All @@ -279,11 +268,13 @@ def test_connect_clean_list(remove_test_dir):
])
test_workflow.run()

test_file_1 = join(TEMPORARY_DIR, 'test_workflow', 'node_1', '_report', 'report.rst')
test_file_1 = join(temporary_data_dir,
'test_workflow', 'node_1', '_report', 'report.rst')
with open(test_file_1, 'r', encoding = 'utf-8') as file:
assert f'* out_value : {output_list_1}' in file.read()

test_file_2 = join(TEMPORARY_DIR, 'test_workflow', 'node_2', '_report', 'report.rst')
test_file_2 = join(temporary_data_dir,
'test_workflow', 'node_2', '_report', 'report.rst')
with open(test_file_2, 'r', encoding = 'utf-8') as file:
assert f'* out_value : {output_list_2}' in file.read()

Expand Down Expand Up @@ -324,7 +315,7 @@ def test_node_list_intersection():

@staticmethod
@mark.unit_test
def test_connect_list_intersection(remove_test_dir):
def test_connect_list_intersection(temporary_data_dir):
""" Test the list_intersection function as evaluated in a connect """

# Inputs / outputs
Expand Down Expand Up @@ -355,7 +346,7 @@ def test_connect_list_intersection(remove_test_dir):

# Create Workflow
test_workflow = Workflow(
base_dir = TEMPORARY_DIR,
base_dir = temporary_data_dir,
name = 'test_workflow'
)
test_workflow.connect([
Expand All @@ -365,11 +356,13 @@ def test_connect_list_intersection(remove_test_dir):
])
test_workflow.run()

test_file_1 = join(TEMPORARY_DIR, 'test_workflow', 'node_1', '_report', 'report.rst')
test_file_1 = join(temporary_data_dir,
'test_workflow', 'node_1', '_report', 'report.rst')
with open(test_file_1, 'r', encoding = 'utf-8') as file:
assert f'* out_value : {output_list_1}' in file.read()

test_file_2 = join(TEMPORARY_DIR, 'test_workflow', 'node_2', '_report', 'report.rst')
test_file_2 = join(temporary_data_dir,
'test_workflow', 'node_2', '_report', 'report.rst')
with open(test_file_2, 'r', encoding = 'utf-8') as file:
assert f'* out_value : {output_list_2}' in file.read()

Expand Down
58 changes: 23 additions & 35 deletions tests/test_conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
pytest -q test_conftest.py -k <selected_test>
"""

from os import makedirs, remove
from os.path import join, abspath, isdir, isfile
from shutil import rmtree
from os import remove
from os.path import join, isdir, isfile

from datetime import datetime

from pytest import mark, helpers, fixture, raises
from pytest import mark, helpers, raises

from nipype import Node, Workflow
from nipype.interfaces.utility import Function
Expand All @@ -26,24 +25,13 @@
from narps_open.runner import PipelineRunner
from narps_open.pipelines import Pipeline

TEST_DIR = abspath(join(Configuration()['directories']['test_runs'], 'test_conftest'))

@fixture
def set_test_directory(scope = 'function'):
""" A fixture to remove temporary directory created by tests """

rmtree(TEST_DIR, ignore_errors = True)
makedirs(TEST_DIR, exist_ok = True)
yield
# Comment this line for debugging
rmtree(TEST_DIR, ignore_errors = True)

class MockupPipeline(Pipeline):
""" A simple Pipeline class for test purposes """

def __init__(self):
def __init__(self, base_dir: str):
super().__init__()
self.test_file = join(TEST_DIR, 'test_conftest.txt')
self.base_dir = base_dir
self.test_file = join(base_dir, 'test_conftest.txt')

# Init the test_file : write a number of execution set to zero
with open(self.test_file, 'w', encoding = 'utf-8') as file:
Expand Down Expand Up @@ -126,7 +114,7 @@ def create_workflow(self, workflow_name: str, file_list: list):
node_files.inputs.file_list = file_list

workflow = Workflow(
base_dir = TEST_DIR,
base_dir = self.base_dir,
name = workflow_name
)
workflow.add_nodes([node_count, node_decide, node_files])
Expand Down Expand Up @@ -166,41 +154,41 @@ def get_group_level_analysis(self):

def get_preprocessing_outputs(self):
""" Return a list of templates of the output files generated by the preprocessing """
template = join(TEST_DIR, 'subject_id_{subject_id}_output_preprocessing_1.md')
template = join(self.base_dir, 'subject_id_{subject_id}_output_preprocessing_1.md')
return [template.format(subject_id = s) for s in self.subject_list]

def get_run_level_outputs(self):
""" Return a list of templates of the output files generated by the run level analysis.
Templates are expressed relatively to the self.directories.output_dir.
"""
template = join(TEST_DIR, 'subject_id_{subject_id}_output_run_1.md')
template = join(self.base_dir, 'subject_id_{subject_id}_output_run_1.md')
return [template.format(subject_id = s) for s in self.subject_list]

def get_subject_level_outputs(self):
""" Return a list of templates of the output files generated by the subject level analysis.
Templates are expressed relatively to the self.directories.output_dir.
"""
template = join(TEST_DIR, 'subject_id_{subject_id}_output_analysis_1.md')
template = join(self.base_dir, 'subject_id_{subject_id}_output_analysis_1.md')
return [template.format(subject_id = s) for s in self.subject_list]

def get_group_level_outputs(self):
""" Return a list of templates of the output files generated by the group level analysis.
Templates are expressed relatively to the self.directories.output_dir.
"""
templates = [
join(TEST_DIR, 'group_{nb_subjects}_output_a.md'),
join(TEST_DIR, 'group_{nb_subjects}_output_b.md')
join(self.base_dir, 'group_{nb_subjects}_output_a.md'),
join(self.base_dir, 'group_{nb_subjects}_output_b.md')
]
return_list = [t.format(nb_subjects = len(self.subject_list)) for t in templates]

template = join(TEST_DIR, 'hypothesis_{id}.md')
template = join(self.base_dir, 'hypothesis_{id}.md')
return_list += [template.format(id = i) for i in range(1,19)]

return return_list

def get_hypotheses_outputs(self):
""" Return the names of the files used by the team to answer the hypotheses of NARPS. """
template = join(TEST_DIR, 'hypothesis_{id}.md')
template = join(self.base_dir, 'hypothesis_{id}.md')
return [template.format(id = i) for i in range(1,19)]

class MockupResultsCollection():
Expand Down Expand Up @@ -263,11 +251,11 @@ def test_compare_float_2d_arrays():

@staticmethod
@mark.unit_test
def test_test_outputs(set_test_directory):
def test_test_outputs(temporary_data_dir):
""" Test the test_pipeline_outputs helper """

# Test pipeline
pipeline = MockupPipeline()
pipeline = MockupPipeline(temporary_data_dir)
pipeline.subject_list = ['001', '002']

# Wrong length for nb_of_outputs
Expand Down Expand Up @@ -343,7 +331,7 @@ def test_test_correlation_results(mocker):

@staticmethod
@mark.unit_test
def test_test_pipeline_execution(mocker, set_test_directory):
def test_test_pipeline_execution(mocker, temporary_data_dir):
""" Test the test_pipeline_execution helper """

# Set subgroups of subjects
Expand All @@ -352,21 +340,21 @@ def test_test_pipeline_execution(mocker, set_test_directory):
# Create mocks
mocker.patch('conftest.get_correlation_coefficient', return_value = 1.0)
fake_runner = PipelineRunner('2T6S')
fake_runner._pipeline = MockupPipeline()
fake_runner._pipeline = MockupPipeline(temporary_data_dir)
mocker.patch('conftest.PipelineRunner', return_value = fake_runner)
mocker.patch('conftest.ResultsCollection', return_value = MockupResultsCollection('2T6S'))

# Run pipeline
helpers.test_pipeline_execution('test_conftest', 7)

# Check outputs
assert isdir(join(TEST_DIR, 'TestConftest_preprocessing_workflow'))
assert isdir(join(TEST_DIR, 'TestConftest_run_level_workflow'))
assert isdir(join(TEST_DIR, 'TestConftest_subject_level_workflow'))
assert isdir(join(TEST_DIR, 'TestConftest_group_level_workflow'))
assert isdir(join(temporary_data_dir, 'TestConftest_preprocessing_workflow'))
assert isdir(join(temporary_data_dir, 'TestConftest_run_level_workflow'))
assert isdir(join(temporary_data_dir, 'TestConftest_subject_level_workflow'))
assert isdir(join(temporary_data_dir, 'TestConftest_group_level_workflow'))

# Check executions
with open(join(TEST_DIR, 'test_conftest.txt'), 'r', encoding = 'utf-8') as file:
with open(join(temporary_data_dir, 'test_conftest.txt'), 'r', encoding = 'utf-8') as file:
assert file.readline() == '0\n'
# First exec of preprocessing creates an exception (execution counter == 1)
assert file.readline() == 'TestConftest_preprocessing_workflow 4 1\n'
Expand Down

0 comments on commit 8fdcb26

Please sign in to comment.