Skip to content

Commit

Permalink
adds --upload_method option to xnat-put to control how files are uplo…
Browse files Browse the repository at this point in the history
…aded
  • Loading branch information
adsouza committed Jul 25, 2024
1 parent d66cca3 commit bce664a
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 92 deletions.
12 changes: 12 additions & 0 deletions scripts/put_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from xnatutils import put

put(
"ses-01",
"T1w_3",
"/Users/arkievdsouza/Desktop/FastSurferTesting/data/sub-100307_ses-01_task-T1w.nii.gz",
create_session=True,
project_id="OPENNEURO_T1W",
subject_id="100307",
overwrite=True,
upload_method="per_file",
)
251 changes: 159 additions & 92 deletions xnatutils/put_.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,44 @@
import hashlib
from xnat.exceptions import XNATResponseError
from .base import (
sanitize_re, illegal_scan_chars_re, get_resource_name,
session_modality_re, connect, base_parser, add_default_args,
print_response_error, print_usage_error, print_info_message, set_logger)
sanitize_re,
illegal_scan_chars_re,
get_resource_name,
session_modality_re,
connect,
base_parser,
add_default_args,
print_response_error,
print_usage_error,
print_info_message,
set_logger,
)
from .exceptions import (
XnatUtilsUsageError, XnatUtilsError, XnatUtilsDigestCheckFailedError,
XnatUtilsDigestCheckError, XnatUtilsException,
XnatUtilsNoMatchingSessionsException)
XnatUtilsUsageError,
XnatUtilsError,
XnatUtilsDigestCheckFailedError,
XnatUtilsDigestCheckError,
XnatUtilsException,
XnatUtilsNoMatchingSessionsException,
)

HASH_CHUNK_SIZE = 2 ** 20
HASH_CHUNK_SIZE = 2**20


def put(session, scan, *filenames, **kwargs):
def put(
session,
scan,
*filenames,
overwrite=False,
create_session=False,
resource_name=None,
project_id=None,
subject_id=None,
scan_id=None,
modality=None,
upload_method="tgz_file",
**kwargs,
):
"""
Uploads datasets to an XNAT instance project (requires manager privileges for the
project).
Expand Down Expand Up @@ -62,6 +88,8 @@ def put(session, scan, *filenames, **kwargs):
The ID for the scan (defaults to the scan type)
modality : str
The modality of the session to upload
upload_method: str
the method used to upload the files (see XnatPy for options)
user : str
The user to connect to the server with
loglevel : str
Expand All @@ -84,13 +112,7 @@ def put(session, scan, *filenames, **kwargs):
located at $HOME/.netrc
"""
# Set defaults for kwargs
overwrite = kwargs.pop('overwrite', False)
create_session = kwargs.pop('create_session', False,)
resource_name = kwargs.pop('resource_name', None)
project_id = kwargs.pop('project_id', None)
subject_id = kwargs.pop('subject_id', None)
scan_id = kwargs.pop('scan_id', None)
modality = kwargs.pop('modality', None)

# If a single directory is provided, upload all files in it that
# don't start with '.'
if len(filenames) == 1 and os.path.isdir(filenames[0]):
Expand All @@ -100,46 +122,50 @@ def put(session, scan, *filenames, **kwargs):
if len(filenames) == 1 and isinstance(filenames[0], (list, tuple)):
filenames = filenames[0]
if not filenames:
raise XnatUtilsUsageError(
"No filenames provided to upload")
raise XnatUtilsUsageError("No filenames provided to upload")
for fname in filenames:
if not os.path.exists(fname):
raise XnatUtilsUsageError(
"The file to upload, '{}', does not exist"
.format(fname))
"The file to upload, '{}', does not exist".format(fname)
)
local_dir_path = os.path.join(local_dir, os.path.basename(fname))
if os.path.exists(local_dir_path):
raise XnatUtilsUsageError(
"Name clash between filename paths '{}'".format(
os.path.basename(fname)))
os.path.basename(fname)
)
)
# Symlink file into local temp directory before uploading the
# whole directory (much faster for many files)
os.symlink(fname, local_dir_path)
if sanitize_re.match(session):
raise XnatUtilsUsageError(
"Session '{}' is not a valid session name (must only contain "
"alpha-numeric characters and underscores)".format(session))
"alpha-numeric characters and underscores)".format(session)
)
if illegal_scan_chars_re.search(scan) is not None:
raise XnatUtilsUsageError(
"Scan name '{}' contains illegal characters".format(scan))
"Scan name '{}' contains illegal characters".format(scan)
)

if resource_name is None:
if len(filenames) == 1:
resource_name = get_resource_name(filenames[0])
else:
raise XnatUtilsUsageError(
"'resource_name' option needs to be provided when uploading "
"multiple files")
"multiple files"
)
else:
resource_name = resource_name.upper()
with connect(**kwargs) as login:
if modality is None:
match = session_modality_re.match(session)
if match is None:
modality = 'MR' # The default
modality = "MR" # The default
else:
modality = match.group(1)
if modality == 'MRPT':
if modality == "MRPT":
session_cls = login.classes.PetmrSessionData
scan_cls = login.classes.MrScanData
else:
Expand All @@ -164,80 +190,82 @@ def put(session, scan, *filenames, **kwargs):
if create_session:
if project_id is None and subject_id is None:
try:
project_id, subject_id, _ = session.split('_')
project_id, subject_id, _ = session.split("_")
except ValueError:
raise XnatUtilsUsageError(
"Must explicitly provide project and subject IDs "
"if session ID ({}) scheme doesn't match "
"<project>_<subject>_<visit> convention, i.e. "
"have exactly 2 underscores".format(session))
"have exactly 2 underscores".format(session)
)
if project_id is None:
project_id = session.split('_')[0]
project_id = session.split("_")[0]
if subject_id is None:
subject_id = '_'.join(session.split('_')[:2])
subject_id = "_".join(session.split("_")[:2])
try:
xproject = login.projects[project_id]
except KeyError:
raise XnatUtilsUsageError(
"Cannot create session '{}' as '{}' does not exist "
"(or you don't have access to it)".format(session,
project_id))
"(or you don't have access to it)".format(session, project_id)
)
# Creates a corresponding subject and session if they don't
# exist
xsubject = login.classes.SubjectData(label=subject_id,
parent=xproject)
xsession = session_cls(
label=session, parent=xsubject)
print("{} session successfully created."
.format(xsession.label))
xsubject = login.classes.SubjectData(label=subject_id, parent=xproject)
xsession = session_cls(label=session, parent=xsubject)
print("{} session successfully created.".format(xsession.label))
else:
raise XnatUtilsNoMatchingSessionsException(
"'{}' session does not exist, to automatically create it "
"please use '--create_session' option."
.format(session))
xdataset = scan_cls(id=(scan_id if scan_id is not None else scan),
type=scan, parent=xsession)
"please use '--create_session' option.".format(session)
)
xdataset = scan_cls(
id=(scan_id if scan_id is not None else scan), type=scan, parent=xsession
)
if overwrite:
try:
xdataset.resources[resource_name].delete()
print("Deleted existing resource at {}:{}/{}".format(
session, scan, resource_name))
print(
"Deleted existing resource at {}:{}/{}".format(
session, scan, resource_name
)
)
except KeyError:
pass
resource = xdataset.create_resource(resource_name)
# TODO: use folder upload where possible
resource.upload_dir(local_dir, method='tar_file')
print("{} uploaded to {}:{}".format(
fname, session, scan))
resource.upload_dir(local_dir, method=upload_method)
print("{} uploaded to {}:{}".format(fname, session, scan))
print("Uploaded files, checking digests...")
# Check uploaded files checksums
remote_digests = get_digests(resource)
for fname in filenames:
remote_digest = remote_digests[
os.path.basename(fname).replace(' ', '%20')]
remote_digest = remote_digests[os.path.basename(fname).replace(" ", "%20")]
local_digest = calculate_checksum(fname)
if local_digest != remote_digest:
raise XnatUtilsDigestCheckError(
"Remote digest does not match local ({} vs {}) "
"for {}. Please upload your datasets again"
.format(remote_digest, local_digest, fname))
print("Successfully checked digest for {}".format(
fname, session, scan))
if resource_name == 'DICOM':
"for {}. Please upload your datasets again".format(
remote_digest, local_digest, fname
)
)
print("Successfully checked digest for {}".format(fname, session, scan))
if resource_name == "DICOM":
print("pulling data from headers")
login.put(f'/data/experiments/{xsession.id}?pullDataFromHeaders=true')
login.put(f"/data/experiments/{xsession.id}?pullDataFromHeaders=true")


def calculate_checksum(fname):
try:
file_hash = hashlib.md5()
with open(fname, 'rb') as f:
for chunk in iter(lambda: f.read(HASH_CHUNK_SIZE), b''):
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(HASH_CHUNK_SIZE), b""):
file_hash.update(chunk)
return file_hash.hexdigest()
except OSError:
raise XnatUtilsDigestCheckFailedError(
"Could not check digest of '{}' ".format(fname))
"Could not check digest of '{}' ".format(fname)
)


def get_digests(resource):
Expand All @@ -246,14 +274,13 @@ def get_digests(resource):
These are saved with the downloaded files in the cache and used to
check if the files have been updated on the server
"""
result = resource.xnat_session.get(resource.uri + '/files')
result = resource.xnat_session.get(resource.uri + "/files")
if result.status_code != 200:
raise XnatUtilsError(
"Could not download metadata for resource {}. Files "
"may have been uploaded but cannot check checksums"
.format(resource.id))
return dict((r['Name'], r['digest'])
for r in result.json()['ResultSet']['Result'])
"may have been uploaded but cannot check checksums".format(resource.id)
)
return dict((r["Name"], r["digest"]) for r in result.json()["ResultSet"]["Result"])


description = """
Expand All @@ -279,31 +306,61 @@ def get_digests(resource):

def parser():
parser = base_parser(description)
parser.add_argument('session', type=str,
help="Name of the session to upload the dataset to")
parser.add_argument('scan', type=str,
help="Name for the dataset on XNAT")
parser.add_argument('filenames', type=str, nargs='+',
help="Filename(s) of the dataset to upload to XNAT")
parser.add_argument('--overwrite', '-o', action='store_true',
default=False,
help="Allow overwrite of existing dataset")
parser.add_argument('--create_session', '-c', action='store_true',
default=False, help=(
"Create the required session on XNAT to upload "
"the the dataset to"))
parser.add_argument('--resource_name', '-r', type=str, default=None,
help=("The name of the resource (the data format) to "
"upload the dataset to. If not provided the "
"format will be determined from the file "
"extension (i.e. in most cases it won't be "
"necessary to specify"))
parser.add_argument('--project_id', '-p',
help="Provide the project ID if session doesn't exist")
parser.add_argument('--subject_id', '-b',
help="Provide the subject ID if session doesn't exist")
parser.add_argument('--scan_id', type=str,
help="Provide the scan ID (defaults to the scan type)")
parser.add_argument(
"session", type=str, help="Name of the session to upload the dataset to"
)
parser.add_argument("scan", type=str, help="Name for the dataset on XNAT")
parser.add_argument(
"filenames",
type=str,
nargs="+",
help="Filename(s) of the dataset to upload to XNAT",
)
parser.add_argument(
"--overwrite",
"-o",
action="store_true",
default=False,
help="Allow overwrite of existing dataset",
)
parser.add_argument(
"--create_session",
"-c",
action="store_true",
default=False,
help=("Create the required session on XNAT to upload " "the the dataset to"),
)
parser.add_argument(
"--resource_name",
"-r",
type=str,
default=None,
help=(
"The name of the resource (the data format) to "
"upload the dataset to. If not provided the "
"format will be determined from the file "
"extension (i.e. in most cases it won't be "
"necessary to specify"
),
)
parser.add_argument(
"--project_id", "-p", help="Provide the project ID if session doesn't exist"
)
parser.add_argument(
"--subject_id", "-b", help="Provide the subject ID if session doesn't exist"
)
parser.add_argument(
"--scan_id", type=str, help="Provide the scan ID (defaults to the scan type)"
)
parser.add_argument(
"--upload_method",
type=str,
help=(
"The XnatPy method used to upload the files. Can be one of "
"'tgz_file', 'tar_file', 'per_file', 'tar_memory', or 'tgz_memory'"
),
)

add_default_args(parser)
return parser

Expand All @@ -315,11 +372,21 @@ def cmd(argv=sys.argv[1:]):
set_logger(args.loglevel)

try:
put(args.session, args.scan, *args.filenames, overwrite=args.overwrite,
create_session=args.create_session, resource_name=args.resource_name,
project_id=args.project_id, subject_id=args.subject_id,
scan_id=args.scan_id, user=args.user, server=args.server,
use_netrc=(not args.no_netrc))
put(
args.session,
args.scan,
*args.filenames,
overwrite=args.overwrite,
create_session=args.create_session,
resource_name=args.resource_name,
project_id=args.project_id,
subject_id=args.subject_id,
scan_id=args.scan_id,
user=args.user,
server=args.server,
use_netrc=(not args.no_netrc),
upload_method=args.upload_method,
)
except XnatUtilsUsageError as e:
print_usage_error(e)
except XNATResponseError as e:
Expand Down

0 comments on commit bce664a

Please sign in to comment.