[ENH] Add derivatives command and pipeline-catalog submodule #349

Open

wants to merge 52 commits into main
Changes from all commits (52 commits)
9d6bbbc
add examples of nipoppy proc status files
alyssadai Sep 26, 2024
b723c57
use actual allowed pipelines + vers in example TSVs
alyssadai Sep 26, 2024
714755b
add skeleton derivatives command
alyssadai Sep 26, 2024
e4aa01a
add basic smoke test of derivatives cmd
alyssadai Sep 26, 2024
2dc72d8
load TSV and check for missing IDs in derivatives cmd
alyssadai Sep 26, 2024
2f274a4
generalize util to load a tabular file
alyssadai Sep 26, 2024
41b5d58
Test added for load_tabular
alyssadai Sep 26, 2024
3d762a6
Added nipoppy pipeline catalogue as submodule
alyssadai Sep 26, 2024
666be05
Add loading of pipeline names and versions
alyssadai Sep 26, 2024
4d552e3
validate pipeline names & versions and store expected col names in a …
alyssadai Sep 27, 2024
783b57f
update help text & docstrings
alyssadai Sep 27, 2024
5f09328
refactor unique subject check to generic util
alyssadai Sep 27, 2024
611cd58
refactor pipeline name / version validation
alyssadai Sep 27, 2024
826ee2a
add example proc status file w/ subjects not in the synthetic dataset
alyssadai Sep 27, 2024
1699a6b
check that proc status subs are in pheno-containing JSONLD
alyssadai Sep 27, 2024
42b96e7
refactor out jsonld validation & move IO utils into new module
alyssadai Sep 27, 2024
0b7f8c5
switch to typer echo statement for model validation error
alyssadai Sep 27, 2024
58b38e9
factor out context extraction
alyssadai Sep 27, 2024
166aaf1
remove space from pipeline URIs
alyssadai Sep 27, 2024
7c3faed
use fixture for output path in tests
alyssadai Sep 27, 2024
25f9ba5
add logic to add completed pipelines to existing or new imaging sessions
alyssadai Sep 27, 2024
78f1082
create utility for extracting imaging sessions from a JSONLD
alyssadai Sep 28, 2024
e694e8f
create util for creating completed pipelines
alyssadai Sep 28, 2024
f400d62
update example proc status files for readability
alyssadai Oct 1, 2024
f0d50fd
handle missing BIDS sessions
alyssadai Oct 1, 2024
26bdf13
refine smoke test and add test using pheno-bids JSONLD
alyssadai Oct 1, 2024
71932ab
update test data README
alyssadai Oct 1, 2024
aae4b39
refactor out custom session ID
alyssadai Oct 1, 2024
8e95027
switch to using 3.9 syntax
alyssadai Oct 1, 2024
cbef45c
update comments
alyssadai Oct 1, 2024
01d5943
refactor out jsonld subject extraction
alyssadai Oct 1, 2024
0e30b90
Merge branch 'main' into add-derivatives-command
alyssadai Oct 1, 2024
fd80223
create list of namespaces & update tests to catch outdated @context
alyssadai Oct 2, 2024
092a934
regenerate context in each cmd to ensure they are up-to-date
alyssadai Oct 2, 2024
dd632c8
add short option for overwrite to error msg
alyssadai Oct 2, 2024
52f0c56
Merge branch 'main' into add-derivatives-command
alyssadai Oct 2, 2024
6abd911
Merge branch 'add-derivatives-command' of https://github.com/neurobag…
alyssadai Oct 2, 2024
6d18c54
update comments
alyssadai Oct 2, 2024
b239cc9
update test data README
alyssadai Oct 2, 2024
e33e7db
store known pipelines/versions in local var
alyssadai Oct 2, 2024
a0fe97d
rename funcs and vars for clarity
alyssadai Oct 2, 2024
d0ddefc
handle jsonld loading tgt with dataset parsing
alyssadai Oct 3, 2024
6b65f22
update test names and docstrings
alyssadai Oct 3, 2024
46ae71a
make tests more explicit
alyssadai Oct 3, 2024
e603e56
refactor test
alyssadai Oct 3, 2024
0c94522
create global vars for known pipelines + vers
alyssadai Oct 3, 2024
be5847e
handle error for mismatched subs in separate func
alyssadai Oct 3, 2024
d82b199
update print statement
alyssadai Oct 3, 2024
fea6415
make test docstring clearer
alyssadai Oct 3, 2024
4f89e44
Merge branch 'main' into add-derivatives-command
alyssadai Oct 3, 2024
9f6280a
fix NP namespace url to ensure proper expansion in graph
alyssadai Oct 3, 2024
5af6a6d
Merge branch 'main' into add-derivatives-command
alyssadai Oct 3, 2024
3 changes: 3 additions & 0 deletions .gitmodules
@@ -4,3 +4,6 @@
[submodule "neurobagel_examples"]
path = neurobagel_examples
url = https://github.com/neurobagel/neurobagel_examples.git
[submodule "pipeline-catalog"]
path = pipeline-catalog
url = https://github.com/nipoppy/pipeline-catalog.git
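Since the pipeline catalog is pulled in as a git submodule, its contents are not fetched by a plain clone or pull. Assuming the standard submodule workflow, running git submodule update --init pipeline-catalog in an existing clone (or cloning with git clone --recurse-submodules) should make the catalog files available locally before the derivatives command is used.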
13 changes: 0 additions & 13 deletions bagel/bids_utils.py
@@ -24,19 +24,6 @@ def get_bids_subjects_simple(bids_dir: Path) -> list:
return bids_subject_list


def check_unique_bids_subjects(pheno_subjects: list, bids_subjects: list):
"""Raises informative error if subject IDs exist that are found only in the BIDS directory."""
unique_bids_subjects = set(bids_subjects).difference(pheno_subjects)
if len(unique_bids_subjects) > 0:
raise LookupError(
"The specified BIDS directory contains subject IDs not found in "
"the provided phenotypic json-ld file:\n"
f"{unique_bids_subjects}\n"
"Subject IDs are case sensitive. "
"Please check that the specified BIDS and phenotypic datasets match."
)


def create_acquisitions(
layout: BIDSLayout,
bids_sub_id: str,
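The BIDS-specific check removed above is superseded by a generalized confirm_subs_match_pheno_data utility (imported from bagel.utility in cli.py below) that is shared by the bids and derivatives commands. Its implementation is not shown in this excerpt; a minimal sketch, assuming it simply parameterizes the old error message by the subject source, might look like:

from typing import Iterable


def confirm_subs_match_pheno_data(
    subjects: Iterable, subject_source_for_err: str, pheno_subjects: Iterable
):
    """Raise an informative error if subject IDs are found only in the subject source."""
    unique_subjects = set(subjects).difference(pheno_subjects)
    if len(unique_subjects) > 0:
        raise LookupError(
            f"The specified {subject_source_for_err} contains subject IDs not found in "
            "the provided phenotypic json-ld file:\n"
            f"{unique_subjects}\n"
            "Subject IDs are case sensitive. "
            "Please check that the datasets you specified match."
        )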
189 changes: 166 additions & 23 deletions bagel/cli.py
@@ -3,12 +3,22 @@

import typer
from bids import BIDSLayout
from pydantic import ValidationError

import bagel.bids_utils as butil
import bagel.derivatives_utils as dutil
import bagel.file_utils as futil
import bagel.pheno_utils as putil
from bagel import mappings, models
from bagel.utility import check_overwrite, load_json
from bagel.derivatives_utils import PROC_STATUS_COLS
from bagel.utility import (
confirm_subs_match_pheno_data,
extract_and_validate_jsonld_dataset,
generate_context,
get_subject_instances,
)

# TODO: Coordinate with Nipoppy about what we want to name this
CUSTOM_SESSION_ID = "nb01"

bagel = typer.Typer(
help="""
@@ -85,13 +95,15 @@ def pheno(
You can upload this .jsonld file to the Neurobagel graph.
"""
# Check if output file already exists
check_overwrite(output, overwrite)
futil.check_overwrite(output, overwrite)

data_dictionary = load_json(dictionary)
pheno_df = putil.load_pheno(pheno)
data_dictionary = futil.load_json(dictionary)
pheno_df = futil.load_tabular(pheno)
putil.validate_inputs(data_dictionary, pheno_df)

# Display validated input paths to user
# NOTE: `space` determines the amount of padding (in num. characters) before the file paths in the print statement.
# It is currently calculated as = (length of the longer string, including the 3 leading spaces) + (2 extra spaces)
space = 25
print(
"Processing phenotypic annotations:\n"
@@ -119,7 +131,7 @@ def pheno(
for session_row_idx, session_row in _sub_pheno.iterrows():
# If there is no session column, we create a session with a custom label "ses-nb01" to assign each subject's phenotypic data to
if session_column is None:
session_name = "ses-nb01" # TODO: Should we make this more obscure to avoid potential overlap with actual session names?
session_name = f"ses-{CUSTOM_SESSION_ID}"
else:
# NOTE: We take the name from the first session column - we don't know how to handle multiple session columns yet
session_name = session_row[session_column[0]]
@@ -185,7 +197,7 @@ def pheno(
hasSamples=subject_list,
)

context = putil.generate_context()
context = generate_context()
# We can't just exclude_unset here because the identifier and schemaKey
# for each instance are created as default values and so technically are never set
# TODO: we should revisit this because there may be reasons to have None be meaningful in the future
@@ -245,7 +257,7 @@ def bids(
You can upload this .jsonld file to the Neurobagel graph.
"""
# Check if output file already exists
check_overwrite(output, overwrite)
futil.check_overwrite(output, overwrite)

space = 32
print(
@@ -254,25 +266,17 @@
f" {'BIDS dataset directory:' : <{space}} {bids_dir}"
)

jsonld = load_json(jsonld_path)
# Strip and store context to be added back later, since it's not part of
# (and can't be easily added) to the existing data model
context = {"@context": jsonld.pop("@context")}
try:
pheno_dataset = models.Dataset.parse_obj(jsonld)
except ValidationError as err:
print(err)
pheno_dataset = extract_and_validate_jsonld_dataset(jsonld_path)

pheno_subject_dict = {
pheno_subject.hasLabel: pheno_subject
for pheno_subject in getattr(pheno_dataset, "hasSamples")
}
pheno_subject_dict = get_subject_instances(pheno_dataset)

# TODO: Revert to using Layout.get_subjects() to get BIDS subjects once pybids performance is improved
butil.check_unique_bids_subjects(
confirm_subs_match_pheno_data(
subjects=butil.get_bids_subjects_simple(bids_dir),
subject_source_for_err="BIDS directory",
pheno_subjects=pheno_subject_dict.keys(),
bids_subjects=butil.get_bids_subjects_simple(bids_dir),
)

print("Initial checks of inputs passed.\n")

print("Parsing and validating BIDS dataset. This may take a while...")
@@ -310,7 +314,7 @@ def bids(
# so the API can still find the session-level information.
# This should be revisited in the future as for these cases the resulting dataset object is not
# an exact representation of what's on disk.
session_label = "nb01" if session is None else session
session_label = CUSTOM_SESSION_ID if session is None else session
session_path = butil.get_session_path(
layout=layout,
bids_dir=bids_dir,
Expand All @@ -329,9 +333,148 @@ def bids(

pheno_subject.hasSession += session_list

context = generate_context()
merged_dataset = {**context, **pheno_dataset.dict(exclude_none=True)}

with open(output, "w") as f:
f.write(json.dumps(merged_dataset, indent=2))

print(f"Saved output to: {output}")


@bagel.command()
def derivatives(
tabular: Path = typer.Option(
...,
"--tabular",
"-t",
help="The path to a .tsv containing subject-level processing pipeline status info. Expected to comply with the Nipoppy processing status file schema.",
exists=True,
file_okay=True,
dir_okay=False,
resolve_path=True,
),
# TODO: Remove _path?
jsonld_path: Path = typer.Option(
...,
"--jsonld-path",
"-p", # for pheno
help="The path to a .jsonld file containing the phenotypic data for your dataset, created by the bagel pheno command. This JSONLD may optionally also include the BIDS metadata for the dataset (created by the bagel bids command).",
exists=True,
file_okay=True,
dir_okay=False,
resolve_path=True,
),
output: Path = typer.Option(
"pheno_derivatives.jsonld",
"--output",
"-o",
help="The path for the output .jsonld file.",
file_okay=True,
dir_okay=False,
resolve_path=True,
),
overwrite: bool = typer.Option(
False,
"--overwrite",
"-f",
help="Overwrite output file if it already exists.",
),
):
"""
Extract subject processing pipeline and derivative metadata from a tabular processing status file and
integrate them in a single .jsonld with subjects' harmonized phenotypic data (from the bagel pheno command) and optionally,
BIDS metadata (from the bagel bids command).
NOTE: Must be run AFTER the pheno command.

This command will create a valid, subject-level instance of the Neurobagel
graph data model for the combined metadata in the .jsonld format.
You can upload this .jsonld file to the Neurobagel graph.
"""
futil.check_overwrite(output, overwrite)

space = 51
print(
"Processing subject-level derivative metadata...\n"
f" {'Existing subject graph data to augment (.jsonld):' : <{space}}{jsonld_path}\n"
f" {'Processing status file (.tsv):' : <{space}}{tabular}"
)

status_df = futil.load_tabular(tabular, input_type="processing status")

# We don't allow empty values in the participant ID column
if row_indices := putil.get_rows_with_empty_strings(
status_df, [PROC_STATUS_COLS["participant"]]
):
raise LookupError(
f"Your processing status file contains missing values in the column '{PROC_STATUS_COLS['participant']}'. "
"Please ensure that every row has a non-empty participant id. "
f"We found missing values in the following rows (first row is zero): {row_indices}."
)

pipelines = status_df[PROC_STATUS_COLS["pipeline_name"]].unique()
dutil.check_pipelines_are_recognized(pipelines)

# Per pipeline, check that version(s) are from the allowed set
# TODO: Do we need to check all versions across all pipelines first, and report all unrecognized versions together?
for pipeline in pipelines:
versions = status_df[
status_df[PROC_STATUS_COLS["pipeline_name"]] == pipeline
][PROC_STATUS_COLS["pipeline_version"]].unique()

dutil.check_pipeline_versions_are_recognized(pipeline, versions)

jsonld_dataset = extract_and_validate_jsonld_dataset(jsonld_path)

existing_subs_dict = get_subject_instances(jsonld_dataset)

confirm_subs_match_pheno_data(
subjects=status_df[PROC_STATUS_COLS["participant"]].unique(),
subject_source_for_err="processing status file",
pheno_subjects=existing_subs_dict.keys(),
)

# Create sub-dataframes for each subject
for subject, sub_proc_df in status_df.groupby(
PROC_STATUS_COLS["participant"]
):
existing_subject = existing_subs_dict.get(subject)

# Get existing imaging sessions for the subject
# Note: This dictionary can be empty if only bagel pheno was run
existing_sessions_dict = dutil.get_imaging_session_instances(
existing_subject
)

for session_name, sub_ses_proc_df in sub_proc_df.groupby(
PROC_STATUS_COLS["session"]
):
# Create pipeline objects for the subject-session
completed_pipelines = dutil.create_completed_pipelines(
sub_ses_proc_df
)

if not completed_pipelines:
continue
if session_name in existing_sessions_dict:
existing_img_session = existing_sessions_dict.get(session_name)
existing_img_session.hasCompletedPipeline = completed_pipelines
else:
new_session_label = (
f"ses-{CUSTOM_SESSION_ID}"
if session_name == ""
else session_name
)
new_img_session = models.ImagingSession(
hasLabel=new_session_label,
hasCompletedPipeline=completed_pipelines,
)
existing_subject.hasSession.append(new_img_session)

context = generate_context()
merged_dataset = {**context, **jsonld_dataset.dict(exclude_none=True)}

with open(output, "w") as f:
f.write(json.dumps(merged_dataset, indent=2))

print(f"Saved output to: {output}")
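For a quick end-to-end check of the new command, a smoke-test-style invocation via typer's test runner might look like the following sketch (the input file names are hypothetical placeholders, not files shipped with this PR):

from typer.testing import CliRunner

from bagel.cli import bagel

runner = CliRunner()

# Hypothetical inputs: a Nipoppy processing status TSV and a JSONLD produced by `bagel pheno`
result = runner.invoke(
    bagel,
    [
        "derivatives",
        "--tabular", "proc_status.tsv",
        "--jsonld-path", "pheno.jsonld",
        "--output", "pheno_derivatives.jsonld",
        "--overwrite",
    ],
)
# Exit code 0 is expected only if the placeholder input files actually exist and are valid
assert result.exit_code == 0, result.output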
92 changes: 92 additions & 0 deletions bagel/derivatives_utils.py
@@ -0,0 +1,92 @@
from typing import Iterable

import pandas as pd

from bagel import mappings, models

# Shorthands for expected column names in a Nipoppy processing status file
# TODO: While there are multiple session ID columns in a Nipoppy processing status file,
# we only look at `bids_session` right now. We should revisit this after the schema is finalized,
# to see if any other logic is needed to avoid issues with session ID discrepancies across columns.
PROC_STATUS_COLS = {
"participant": "bids_participant",
"session": "bids_session",
"pipeline_name": "pipeline_name",
"pipeline_version": "pipeline_version",
"status": "status",
}


def check_pipelines_are_recognized(pipelines: Iterable[str]):
"""Check that all pipelines in the processing status file are supported by Nipoppy."""
unrecognized_pipelines = list(
set(pipelines).difference(mappings.KNOWN_PIPELINE_URIS)
)
if len(unrecognized_pipelines) > 0:
raise LookupError(
f"The processing status file contains unrecognized pipelines in the column '{PROC_STATUS_COLS['pipeline_name']}': "
f"{unrecognized_pipelines}. "
f"Allowed pipeline names are the following pipelines supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n"
f"{mappings.KNOWN_PIPELINE_URIS}"
)


def check_pipeline_versions_are_recognized(
pipeline: str, versions: Iterable[str]
):
"""
Check that all pipeline versions in the processing status file are supported by Nipoppy.
Assumes that the input pipeline name is recognized.
"""
unrecognized_versions = list(
set(versions).difference(mappings.KNOWN_PIPELINE_VERSIONS[pipeline])
)
if len(unrecognized_versions) > 0:
raise LookupError(
f"The processing status file contains unrecognized {pipeline} versions in the column '{PROC_STATUS_COLS['pipeline_version']}': {unrecognized_versions}. "
f"Allowed {pipeline} versions are the following versions supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n"
f"{mappings.KNOWN_PIPELINE_VERSIONS[pipeline]}"
)


def get_imaging_session_instances(
jsonld_subject: models.Subject,
) -> dict:
"""
Return a dictionary of imaging sessions for a given subject from JSONLD data,
where the keys are the session labels and values are the session objects.
"""
jsonld_sub_sessions_dict = {}
for jsonld_sub_ses in getattr(jsonld_subject, "hasSession"):
if jsonld_sub_ses.schemaKey == "ImagingSession":
jsonld_sub_sessions_dict[jsonld_sub_ses.hasLabel] = jsonld_sub_ses

return jsonld_sub_sessions_dict


def create_completed_pipelines(session_proc_df: pd.DataFrame) -> list:
"""
Create a list of CompletedPipeline objects for a single subject-session based on the completion status
info of pipelines for that session from the processing status dataframe.
"""
completed_pipelines = []
for (pipeline, version), session_pipe_df in session_proc_df.groupby(
[
PROC_STATUS_COLS["pipeline_name"],
PROC_STATUS_COLS["pipeline_version"],
]
):
# Check that all pipeline steps have succeeded
if (
session_pipe_df[PROC_STATUS_COLS["status"]].str.lower()
== "success"
).all():
completed_pipeline = models.CompletedPipeline(
hasPipelineName=models.Pipeline(
identifier=mappings.KNOWN_PIPELINE_URIS[pipeline]
),
hasPipelineVersion=version,
)
completed_pipelines.append(completed_pipeline)

return completed_pipelines
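As a usage sketch of these helpers (independent of the CLI), the following builds a toy single-subject, single-session processing status dataframe and converts it into CompletedPipeline instances. The pipeline name and version shown are illustrative placeholders and must correspond to entries in the pipeline-catalog submodule for the recognition checks to pass:

import pandas as pd

import bagel.derivatives_utils as dutil
from bagel.derivatives_utils import PROC_STATUS_COLS

# Toy processing status data for one subject-session (values are placeholders)
session_proc_df = pd.DataFrame(
    {
        PROC_STATUS_COLS["participant"]: ["sub-01", "sub-01"],
        PROC_STATUS_COLS["session"]: ["ses-01", "ses-01"],
        PROC_STATUS_COLS["pipeline_name"]: ["fmriprep", "fmriprep"],
        PROC_STATUS_COLS["pipeline_version"]: ["20.2.7", "20.2.7"],
        PROC_STATUS_COLS["status"]: ["SUCCESS", "SUCCESS"],
    }
)

# Both checks raise a LookupError if the name or version is not in the catalog
dutil.check_pipelines_are_recognized(["fmriprep"])
dutil.check_pipeline_versions_are_recognized("fmriprep", ["20.2.7"])

# One CompletedPipeline is created per (pipeline, version) group whose rows all have status "SUCCESS"
completed_pipelines = dutil.create_completed_pipelines(session_proc_df)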