diff --git a/airtigrs/conftest.py b/airtigrs/conftest.py new file mode 100644 index 0000000..6824d87 --- /dev/null +++ b/airtigrs/conftest.py @@ -0,0 +1,41 @@ +"""py.test configuration""" + +from datetime import timedelta +from pathlib import Path +import pytest + +from airflow.models import DAG +from airflow.utils.dates import days_ago + +pytest_plugins = ["helpers_namespace"] + + +data_dir = Path(__file__).parent / "data" / "tests" + +_config_data = { + "main_config": data_dir / "test_config.yaml", + "study_config": data_dir / "TEST_settings.yml", +} + + +@pytest.fixture(scope="session") +def dm_config_data(): + """Create a shared config dictionary across tests to pull data in.""" + return _config_data + + +@pytest.fixture +def test_dag(): + """Airflow DAG for testing.""" + return DAG( + "test_dag", + start_date=days_ago(1), + schedule_interval=timedelta(days=1), + ) + + +@pytest.helpers.register +def run_task(task, dag): + """Run an Airflow task.""" + dag.clear() + task.run(start_date=dag.start_date, end_date=dag.start_date) diff --git a/airtigrs/dags/generate_study_dags.py b/airtigrs/dags/generate_study_dags.py new file mode 100644 index 0000000..30dcd11 --- /dev/null +++ b/airtigrs/dags/generate_study_dags.py @@ -0,0 +1,45 @@ +from datetime import timedelta + +from airflow import DAG +from airflow.utils.dates import days_ago + +import datman.config +from airtigrs.task_groups.xnat_ingest import build_xnat_ingest_taskgroup + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "email": ["tigr.lab@camh.ca"], + "email_on_failure": True, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=10), +} + + +def create_dag(study, config, default_args): + """Generates a DAG for a given study.""" + + dag = DAG( + study, + default_args=default_args, + description="Create end-to-end DAG for a given study", + schedule_interval=timedelta(days=1), + tags=[study], + start_date=days_ago(1), + ) + + with dag: + + xnat_ingest = 
build_xnat_ingest_taskgroup(study, config) + + xnat_ingest + + return dag + + +config = datman.config.config() +projects = config.get_key("Projects") +for project in projects: + config.set_study(project) + globals()[project] = create_dag(project, config, default_args) diff --git a/airtigrs/dags/tests/test_dag_integrity.py b/airtigrs/dags/tests/test_dag_integrity.py new file mode 100644 index 0000000..906a3bf --- /dev/null +++ b/airtigrs/dags/tests/test_dag_integrity.py @@ -0,0 +1,40 @@ +"""Test validity of all DAGs.""" +import os +from pathlib import Path + +import pytest +from airflow import models as airflow_models +from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle + + +DAG_PATHS = Path(__file__).parents[1].glob("*.py") + +@pytest.mark.parametrize("dag_path", DAG_PATHS) +def test_dag_integrity(dag_path, dm_config_data): + """Import DAG files and check for a valid DAG instance.""" + os.environ["DM_CONFIG"] = str(dm_config_data["main_config"]) + os.environ["DM_SYSTEM"] = "test" + + dag_name = dag_path.name + module = _import_file(dag_name, dag_path) + + # Validate if there is at least 1 DAG object in the file + dag_objects = [ + var + for var in vars(module).values() + if isinstance(var, airflow_models.DAG) + ] + assert dag_objects + + # For every DAG object, test for cycles + for dag in dag_objects: + _test_cycle(dag) + + +def _import_file(module_name, module_path): + import importlib.util + + spec = importlib.util.spec_from_file_location(module_name, str(module_path)) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module diff --git a/dags/tests/__init__.py b/airtigrs/data/__init__.py similarity index 100% rename from dags/tests/__init__.py rename to airtigrs/data/__init__.py diff --git a/dags/tests/test_datasources_to_xnat/TEST_settings.yml b/airtigrs/data/tests/TEST_settings.yml similarity index 100% rename from dags/tests/test_datasources_to_xnat/TEST_settings.yml rename to airtigrs/data/tests/TEST_settings.yml 
diff --git a/dags/tests/test_datasources_to_xnat/test_config.yaml b/airtigrs/data/tests/test_config.yaml similarity index 99% rename from dags/tests/test_datasources_to_xnat/test_config.yaml rename to airtigrs/data/tests/test_config.yaml index 45b38eb..fa04a05 100644 --- a/dags/tests/test_datasources_to_xnat/test_config.yaml +++ b/airtigrs/data/tests/test_config.yaml @@ -13,7 +13,7 @@ SystemSettings: test: DatmanProjectsDir: "projectsdir/" DatmanAssetsDir: "assetsdir/" - ConfigDir: "tests/test_datasources_to_xnat/" + ConfigDir: "../../data/tests/" Paths: # paths defined here are relative to the study specific folder diff --git a/airtigrs/task_groups/__init__.py b/airtigrs/task_groups/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airtigrs/task_groups/xnat_ingest.py b/airtigrs/task_groups/xnat_ingest.py new file mode 100644 index 0000000..2094fda --- /dev/null +++ b/airtigrs/task_groups/xnat_ingest.py @@ -0,0 +1,50 @@ +from airflow.operators.bash import BashOperator +from airflow.utils.task_group import TaskGroup + +from airtigrs.operators.sftp import SFTPFetchOperator +import airtigrs.utils.connections as conn +import airtigrs.utils.xnat as xnat + + +def build_xnat_ingest_taskgroup(study, config): + """ + Generate a DAG for a given study + + Will only return a DAG if XNAT is configured or if + SFTP is configured. 
If not returns None + """ + + with TaskGroup("xnat_ingest_taskgroup") as xnat_ingest: + sftp_groups = [] + for site in config.get_sites(): + with TaskGroup(group_id=f"dm_sftp_{site}") as dm_sftp: + conn_id = f"{study}_{site}_sftp" + if conn.conn_id_exists(conn_id): + SFTPFetchOperator( + task_id=f"{study}_{site}_fetcher", + sftp_conn_id=conn_id, + local_path=config.get_path("zips"), + retries=0, + ) + + if xnat.external_xnat_is_configured(config, site): + BashOperator( + task_id="xnat_fetch_session", + bash_command="xnat_fetch_sessions.py {{params.study}}", + params={"study": study}, + ) + sftp_groups.append(dm_sftp) + dm_link = BashOperator( + task_id="link_zips_to_datman", + bash_command="dm_link.py {{ params.study }}", + params={"study": study}, + trigger_rule="all_done", + ) + + dm_xnat_upload = BashOperator( + task_id="xnat_upload", + bash_command="dm_xnat_upload.py {{ params.study }}", + params={"study": study}, + ) + + sftp_groups >> dm_link >> dm_xnat_upload diff --git a/dags/datasources_to_xnat.py b/dags/datasources_to_xnat.py deleted file mode 100644 index 0fc90e5..0000000 --- a/dags/datasources_to_xnat.py +++ /dev/null @@ -1,102 +0,0 @@ -''' -This is an example DAG showing how Airflow can trigger -SFTP data pulls and upload -''' - -from datetime import timedelta -from airflow import DAG -from airflow.utils.dates import days_ago - -from airflow.operators.bash import BashOperator -from airtigrs.operators.sftp import SFTPFetchOperator -import datman.config -import airtigrs.utils.connections as conn - -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email': ['jerrold.jeyachandra@camh.ca'], - 'email_on_failure': True, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=10), -} - - -def make_dag(dag_id, study, config, default_args): - ''' - Generate a DAG for a given study - - Will only return a DAG if XNAT is configured or if - SFTP is configured. 
If not returns None - ''' - - config.set_study(study) - dag = DAG(dag_id, - default_args=default_args, - description='Ingest from external SFTP and XNAT servers' - ' and push to XNAT', - schedule_interval=timedelta(days=1), - tags=['sftp', 'xnat', 'ingest', study], - start_date=days_ago(1)) - - dag.doc_md = """ - This DAG ingests data from external datasources (SFTP, XNAT) - then uploads the files to an XNAT instance after mangling - names to match the desired convention - """ - - # Check if XNAT or SFTP is configured at all for study - sftp_config = [] - fetch_xnat = False - for site in config.get_sites(): - conn_id = f"{study}_{site}_sftp" - if conn.conn_id_exists(conn_id): - sftp_config.append((site, conn_id)) - - if conn.external_xnat_is_configured(config, site): - fetch_xnat = True - - if not fetch_xnat and not sftp_config: - return - - with dag: - - datasource_ops = [] - for site, conn_id in sftp_config: - datasource_ops.append( - SFTPFetchOperator(task_id=f"{study}_{site}_fetcher", - sftp_conn_id=conn_id, - local_path=config.get_path("zips"), - retries=0)) - - if fetch_xnat: - datasource_ops.append( - BashOperator( - task_id='xnat_fetch_session', - bash_command='xnat_fetch_sessions.py {{params.study}}', - params={"study": study})) - - dm_link = BashOperator(task_id='link_zips_to_datman', - bash_command='dm_link.py {{ params.study }}', - params={"study": study}, - trigger_rule="all_done") - - dm_xnat_upload = BashOperator( - task_id='xnat_upload', - bash_command='dm_xnat_upload.py {{ params.study }}', - params={"study": study}) - - datasource_ops >> dm_link >> dm_xnat_upload - - return dag - - -config = datman.config.config() -projects = config.get_key("Projects") - -for project in projects: - dag_id = f"ingest_external_mri_{project}" - dag = make_dag(dag_id, project, config, default_args) - if dag is not None: - globals()[dag_id] = dag diff --git a/dags/tests/test_datasources_to_xnat.py b/dags/tests/test_datasources_to_xnat.py deleted file mode 100644 index 
23d320d..0000000 --- a/dags/tests/test_datasources_to_xnat.py +++ /dev/null @@ -1,65 +0,0 @@ -import os - -import pytest -from airflow.models import DagBag -import datman.config -import importlib -from .utils import assert_dag_dict_equal - -FIXTURE_DIR = "tests/test_datasources_to_xnat" - - -class TestConfiguredDAGBuild: - @classmethod - def setup_class(cls): - ''' - Setup DATMAN environment variables for configuration file to load - properly - ''' - os.environ['DM_CONFIG'] = os.path.join(FIXTURE_DIR, "test_config.yaml") - os.environ['DM_SYSTEM'] = 'test' - - # Initialize DAG object - cls.d2x = importlib.import_module( - 'airtigrs.dags.datasources_to_xnat.datasources_to_xnat') - - cls.config = datman.config.config() - cls.default_args = {'owner': 'test'} - - def test_sftp_task_included_if_connection_configured(self, mocker): - ''' - Check if SFTP is configured that it is included in DAG - ''' - mocker.patch('airtigrs.dags.datasources_to_xnat.utils.conn_id_exists', - return_value=True) - - result = self.d2x.make_dag(dag_id="TEST", - study="TEST", - config=self.config, - default_args=self.default_args) - - # Does task structure match expected? - assert_dag_dict_equal( - { - 'TEST_TESTA_fetcher': ['link_zips_to_datman'], - 'link_zips_to_datman': ['xnat_upload'], - 'xnat_upload': [] - }, result) - - def test_fetch_xnat_included_if_connection_configured(self, mocker): - mocker.patch( - 'airtigrs.dags.datasources_to_xnat.utils.external_xnat_is_configured', - return_value=True) - - result = self.d2x.make_dag(dag_id="TEST", - study="TEST", - config=self.config, - default_args=self.default_args) - - # Does task structure match expected? 
- assert_dag_dict_equal( - { - 'xnat_fetch_session': ['link_zips_to_datman'], - 'link_zips_to_datman': ['xnat_upload'], - 'xnat_upload': [] - }, result) diff --git a/dags/tests/utils.py b/dags/tests/utils.py deleted file mode 100644 index 683c615..0000000 --- a/dags/tests/utils.py +++ /dev/null @@ -1,6 +0,0 @@ -def assert_dag_dict_equal(source, dag): - assert dag.task_dict.keys() == source.keys() - for task_id, downstream_list in source.items(): - assert dag.has_task(task_id) - task = dag.get_task(task_id) - assert task.downstream_task_ids == set(downstream_list) diff --git a/setup.cfg b/setup.cfg index dea03e9..9e179c9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,8 +40,9 @@ lint = yapf >= 0.30.0 test = pytest >= 6.2.4 - pytest-mock pytest-cov + pytest-helpers-namespace + pytest-mock all = %(doc)s %(lint)s