Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Separate XNAT DAG into task groups #27

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions airtigrs/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""py.test configuration"""

from datetime import timedelta
from pathlib import Path
import pytest

from airflow.models import DAG
from airflow.utils.dates import days_ago

pytest_plugins = ["helpers_namespace"]


data_dir = Path(__file__).parent / "data" / "tests"

_config_data = {
"main_config": data_dir / "test_config.yaml",
"study_config": data_dir / "TEST_settings.yaml",
}


@pytest.fixture
def dm_config_data(scope="session"):
"""Create a shared config dictionary across tests to pull data in."""
return _config_data


@pytest.fixture
def test_dag():
"""Airflow DAG for testing."""
return DAG(
"test_dag",
start_date=days_ago(1),
schedule_interval=timedelta(days=1),
)


@pytest.helpers.register
def run_task(task, dag):
"""Run an Airflow task."""
dag.clear()
task.run(start_date=dag.start_date, end_date=dag.start_date)
45 changes: 45 additions & 0 deletions airtigrs/dags/generate_study_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

import datman.config
from airtigrs.task_groups.xnat_ingest import build_xnat_ingest_taskgroup

default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=10),
}


def create_dag(study, config, default_args):
"""Generates a DAG for a given study."""

dag = DAG(
study,
default_args=default_args,
description="Create end-to-end DAG for a given study",
schedule_interval=timedelta(days=1),
tags=[study],
start_date=days_ago(1),
)

with dag:

xnat_ingest = build_xnat_ingest_taskgroup(study, config)

xnat_ingest

return dag


config = datman.config.config()
projects = config.get_key("Projects")
for project in projects:
config.set_study(project)
dag = create_dag(project, config, default_args)
40 changes: 40 additions & 0 deletions airtigrs/dags/tests/test_dag_integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Test validity of all DAGs."""
import os
from pathlib import Path

import pytest
from airflow import models as airflow_models
from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle


DAG_PATHS = Path(__file__).parents[1].glob("*.py")

@pytest.mark.parametrize("dag_path", DAG_PATHS)
def test_dag_integrity(dag_path, dm_config_data):
"""Import DAG files and check for a valid DAG instance."""
os.environ["DM_CONFIG"] = str(dm_config_data["main_config"])
os.environ["DM_SYSTEM"] = "test"

dag_name = dag_path.name
module = _import_file(dag_name, dag_path)

# Validate if there is at least 1 DAG object in the file
dag_objects = [
var
for var in vars(module).values()
if isinstance(var, airflow_models.DAG)
]
assert dag_objects

# For every DAG object, test for cycles
for dag in dag_objects:
_test_cycle(dag)


def _import_file(module_name, module_path):
import importlib.util

spec = importlib.util.spec_from_file_location(module_name, str(module_path))
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ SystemSettings:
test:
DatmanProjectsDir: "projectsdir/"
DatmanAssetsDir: "assetsdir/"
ConfigDir: "tests/test_datasources_to_xnat/"
ConfigDir: "../../data/tests/"

Paths:
# paths defined here are relative to the study specific folder
Expand Down
Empty file.
50 changes: 50 additions & 0 deletions airtigrs/task_groups/xnat_ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

from airtigrs.operators.sftp import SFTPFetchOperator
import airtigrs.utils.connections as conn
import airtigrs.utils.xnat as xnat


def build_xnat_ingest_taskgroup(study, config):
"""
Generate a DAG for a given study

Will only return a DAG if XNAT is configured or if
SFTP is configured. If not returns None
"""

with TaskGroup("xnat_ingest_taskgroup") as xnat_ingest:

for site in config.get_sites():
with TaskGroup(group_id=f"dm_sftp_{site}") as dm_sftp:
conn_id = f"{study}_{site}_sftp"
if conn.conn_id_exists(conn_id):
SFTPFetchOperator(
task_id=f"{study}_{site}_fetcher",
sftp_conn_id=conn_id,
local_path=config.get_path("zips"),
retries=0,
)

if xnat.external_xnat_is_configured(config, site):
BashOperator(
task_id="xnat_fetch_session",
bash_command="xnat_fetch_sessions.py {{params.study}}",
params={"study": study},
)

dm_link = BashOperator(
task_id="link_zips_to_datman",
bash_command="dm_link.py {{ params.study }}",
params={"study": study},
trigger_rule="all_done",
)

dm_xnat_upload = BashOperator(
task_id="xnat_upload",
bash_command="dm_xnat_upload.py {{ params.study }}",
params={"study": study},
)

dm_sftp >> dm_link >> dm_xnat_upload
102 changes: 0 additions & 102 deletions dags/datasources_to_xnat.py

This file was deleted.

65 changes: 0 additions & 65 deletions dags/tests/test_datasources_to_xnat.py

This file was deleted.

6 changes: 0 additions & 6 deletions dags/tests/utils.py

This file was deleted.

3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ lint =
yapf >= 0.30.0
test =
pytest >= 6.2.4
pytest-mock
pytest-cov
pytest-helpers-namespace
pytest-mock
all =
%(doc)s
%(lint)s
Expand Down