diff --git a/docs/how-to-wrap-your-project.md b/docs/how-to-wrap-your-project.md
index 31188a83..0a044059 100644
--- a/docs/how-to-wrap-your-project.md
+++ b/docs/how-to-wrap-your-project.md
@@ -113,7 +113,7 @@ RT-Cloud uses RPC (Remote Procedure Calls) to send command requests from the res
 
 The rpyc timeout can be set when the ClientInterface is created in the experiment script, such as in the sample.py project. Simply include the rpyc_timeout= parameter (e.g. ClientInterface(rpyc_timeout=60)), the default is 60 seconds. Rpyc also has a timed() function which can be used to adjust the timeout of individual rpc calls.
 
-The websocket timeout can be set using the setRPCTimeout() of remoteable objects. For example to increase the timeout of the dataInterface in the experiment script, call dataInterface.setRPCTimeout(5). The default websocket timeout is 5 seconds.
+The websocket timeout can be set in one of two ways. Method 1 is to set a larger timeout for all calls using the setRPCTimeout() method of remoteable objects. For example, to increase the timeout of the dataInterface in the experiment script, call dataInterface.setRPCTimeout(10); the default websocket timeout is 5 seconds. Method 2 is to set a larger timeout for one specific call by including an "rpc_timeout" kwarg in that call. For example, dataInterface.getFile("BigFile", rpc_timeout=60). Note that before setting an RPC timeout you should check that the interface you are using is actually running over RPC, because sometimes interfaces run locally. To check, use the isRunningRemote() command, e.g. dataInterface.isRunningRemote(); see the openNeuroClient project for an example of this usage.
 
 ## **Some Alternate Configurations For Your Experiment**
 ### **Running everything on the same computer**
diff --git a/environment.yml b/environment.yml
index 74316471..41d38563 100644
--- a/environment.yml
+++ b/environment.yml
@@ -7,6 +7,7 @@ dependencies:
   - awscli
   - bcrypt
   - brainiak
+  - boto3
   - dcm2niix
   - flake8
   - indexed_gzip # for efficient random access of gzipped files with Nibabel
diff --git a/projects/openNeuroClient/openNeuroClient.py b/projects/openNeuroClient/openNeuroClient.py
index f4b6232e..96bda829 100644
--- a/projects/openNeuroClient/openNeuroClient.py
+++ b/projects/openNeuroClient/openNeuroClient.py
@@ -1,6 +1,5 @@
 # import important modules
 import os
-from rtCommon.bidsRun import BidsRun
 import sys
 import numpy
 import uuid
@@ -15,6 +14,7 @@
 from rtCommon.utils import loadConfigFile, stringPartialFormat
 from rtCommon.clientInterface import ClientInterface
 from rtCommon.bidsArchive import BidsArchive
+from rtCommon.bidsRun import BidsRun
 
 # path for default configuration toml file
 defaultConfig = os.path.join(currPath, 'conf', 'openNeuroClient.toml')
@@ -41,8 +41,11 @@ def doRuns(cfg, bidsInterface, subjInterface, webInterface):
     print(f'BIDS Archive will be written to {bidsArchivePath}')
     newArchive = BidsArchive(bidsArchivePath)
     newRun = BidsRun(**entities)
+    extraKwargs = {}
+    if bidsInterface.isRunningRemote():
+        extraKwargs = {"rpc_timeout": 60}
     # Initialize the bids stream
-    streamId = bidsInterface.initOpenNeuroStream(cfg.dsAccessionNumber, **entities)
+    streamId = bidsInterface.initOpenNeuroStream(cfg.dsAccessionNumber, **entities, **extraKwargs)
     numVols = bidsInterface.getNumVolumes(streamId)
     for idx in range(numVols):
         bidsIncremental = bidsInterface.getIncremental(streamId, idx)
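For reference, the two websocket timeout methods described in the doc change above, together with the rpyc timeout, look roughly like this in an experiment script — a minimal sketch following sample.py and the openNeuroClient change (the "BigFile" filename is the doc's own placeholder):

```python
from rtCommon.clientInterface import ClientInterface

# rpyc timeout: set when the ClientInterface is created (default 60 seconds)
clientInterface = ClientInterface(rpyc_timeout=60)
dataInterface = clientInterface.dataInterface

# Only adjust websocket RPC timeouts if the interface actually runs remotely
if dataInterface.isRunningRemote():
    # Method 1: raise the websocket timeout for all calls (default 5 seconds)
    dataInterface.setRPCTimeout(10)
    # Method 2: raise the timeout for one specific call
    data = dataInterface.getFile("BigFile", rpc_timeout=60)
```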
diff --git a/rtCommon/bidsInterface.py b/rtCommon/bidsInterface.py
index a81741bd..b3ba4fb1 100644
--- a/rtCommon/bidsInterface.py
+++ b/rtCommon/bidsInterface.py
@@ -10,17 +10,14 @@
 one instance of dataInterface, as part of the projectServer with dataRemote=False.
 """
 import os
-import glob
 import time
-import tempfile
-import nibabel as nib
 from rtCommon.remoteable import RemoteableExtensible
 from rtCommon.bidsArchive import BidsArchive
-from rtCommon.bidsRun import BidsRun
 from rtCommon.bidsIncremental import BidsIncremental
 from rtCommon.bidsCommon import getDicomMetadata
 from rtCommon.imageHandling import convertDicomImgToNifti
 from rtCommon.dataInterface import DataInterface
+from rtCommon.openNeuro import OpenNeuroCache
 from rtCommon.errors import RequestError, MissingMetadataError
@@ -58,6 +55,7 @@ def __init__(self, dataRemote=False, allowedDirs=[], scannerClockSkew=0):
         # Store the allowed directories to be used by the DicomToBidsStream class
         self.allowedDirs = allowedDirs
         self.scannerClockSkew = scannerClockSkew
+        self.openNeuroCache = OpenNeuroCache(cachePath="/tmp/openneuro")
 
     def initDicomBidsStream(self, dicomDir, dicomFilePattern, dicomMinSize, **entities) -> int:
@@ -109,10 +107,13 @@ def initOpenNeuroStream(self, dsAccessionNumber, **entities) -> int:
         Returns:
             streamId: An identifier used when calling stream functions, such as getIncremental()
         """
+        if 'subject' not in entities or 'run' not in entities:
+            raise RequestError("initOpenNeuroStream: Must specify subject and run number")
+        archivePath = self.openNeuroCache.downloadData(dsAccessionNumber, **entities)
         # TODO - allow multiple simultaneous streams to be instantiated
         streamId = 1
-        openNeuroStream = OpenNeuroStream(dsAccessionNumber, **entities)
-        self.streamMap[streamId] = openNeuroStream
+        bidsStream = BidsStream(archivePath, **entities)
+        self.streamMap[streamId] = bidsStream
         return streamId
 
     def getIncremental(self, streamId, volIdx=-1) -> BidsIncremental:
@@ -141,6 +142,9 @@ def getNumVolumes(self, streamId) -> int:
         stream = self.streamMap[streamId]
         return stream.getNumVolumes()
 
+    def closeStream(self, streamId):
+        # remove the stream from the map
+        self.streamMap.pop(streamId, None)
 
     def getClockSkew(self, callerClockTime: float, roundTripTime: float) -> float:
         """
@@ -299,55 +303,3 @@ def getIncremental(self, volIdx=-1) -> BidsIncremental:
             return incremental
         else:
             return None
-
-
-class OpenNeuroStream(BidsStream):
-    """
-    A BidsStream from an OpenNeuro dataset. The OpenNeuro dataset will be automatically
-    downloaded, as needed, on the computer where this stream is intialized.
-    """
-    def __init__(self, dsAccessionNumber, **entities):
-        """
-        Args:
-            dsAccessionNumber: The OpenNeruo specific accession number for the dataset
-                to stream.
-            entities: BIDS entities (subject, session, task, run, suffix, datatype) that
-                define the particular subject/run of the data to stream
-        """
-        subject = entities.get('subject')
-        run = entities.get('run')
-        if subject is None or run is None:
-            raise RequestError("OpenNeuroStream: Must specify subject and run number")
-        # TODO - Use OpenNeuroService when it is available, to download
-        # and access the dataset and get dataset entities
-        # OpenNeuroService to provide path to dataset
-        datasetPath = tmpDownloadOpenNeuro(dsAccessionNumber, subject, run)
-        super().__init__(datasetPath, **entities)
-
-
-def tmpDownloadOpenNeuro(dsAccessNumber, subject, run) -> str:
-    """
-    Temporary function used until we integrate in the OpenNeuro service. Downloads
-    a portion of an OpenNeuro dataset corresponding to the subject/run.
-    Args:
-        dsAccessionNumber: The OpenNeruo specific accession number for the dataset
-            to stream.
-        subject: the specific subject name within the OpenNeuro dataset to download
-        run: the specific run within the subject's data to download.
-    Returns:
-        Absolute path to where the dataset has been downloaded.
-
-    """
-    tmpDir = tempfile.gettempdir()
-    print(f'OpenNeuro Data cached to {tmpDir}')
-    datasetDir = os.path.join(tmpDir, dsAccessNumber)
-    # check if already downloaded
-    includePattern = f'sub-{subject}/func/*run-{run:02d}*'
-    files = glob.glob(os.path.join(datasetDir, includePattern))
-    if len(files) == 0:
-        os.makedirs(datasetDir, exist_ok = True)
-        awsCmd = f'aws s3 sync --no-sign-request s3://openneuro.org/{dsAccessNumber} ' \
-                 f'{datasetDir} --exclude "*/*" --include "{includePattern}"'
-        print(f'run {awsCmd}')
-        os.system(awsCmd)
-    return datasetDir
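With these bidsInterface changes, an OpenNeuro stream follows the same open/read/close lifecycle as the other stream types. A minimal sketch, using the dataset and entities from the tests below (the processing step is a placeholder):

```python
# Open a stream on a cached OpenNeuro dataset (subject and run are required)
entities = {'subject': 'xp201', 'run': 1, 'suffix': 'bold'}
streamId = bidsInterface.initOpenNeuroStream('ds002338', **entities)

numVols = bidsInterface.getNumVolumes(streamId)
for idx in range(numVols):
    bidsIncremental = bidsInterface.getIncremental(streamId, idx)
    # ... process the BidsIncremental ...

# the new closeStream() drops the stream from the stream map
bidsInterface.closeStream(streamId)
```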
diff --git a/rtCommon/dataInterface.py b/rtCommon/dataInterface.py
index 454bc45a..e2a66a0b 100644
--- a/rtCommon/dataInterface.py
+++ b/rtCommon/dataInterface.py
@@ -120,7 +120,7 @@ def initScannerStream(self, imgDir: str, filePattern: str,
     def getImageData(self, streamId: int, imageIndex: int=None, timeout: int=5) -> pydicom.dataset.FileDataset:
         """
-        Get data from a stream initialized with initScannerStream or initOpenNeuroStream
+        Get data from a stream initialized with initScannerStream
 
         Args:
             streamId: Id of a previously opened stream.
diff --git a/rtCommon/openNeuro.py b/rtCommon/openNeuro.py
new file mode 100644
index 00000000..cce97256
--- /dev/null
+++ b/rtCommon/openNeuro.py
@@ -0,0 +1,169 @@
+"""
+An interface to access OpenNeuro data and metadata. It can download
+and cache OpenNeuro data for playback.
+"""
+import os
+import json
+import boto3
+from botocore.config import Config
+from botocore import UNSIGNED
+import rtCommon.utils as utils
+
+
+class OpenNeuroCache():
+    def __init__(self, cachePath="/tmp/openneuro/"):
+        self.cachePath = cachePath
+        self.datasetList = None
+        self.s3Client = None
+        os.makedirs(cachePath, exist_ok=True)
+
+    def getCachePath(self):
+        return self.cachePath
+
+    def getS3Client(self):
+        """Returns an S3 client, reusing the same client instance across
+        calls rather than creating a new one each time. Not currently
+        thread safe.
+        """
+        if self.s3Client is None:
+            self.s3Client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
+        return self.s3Client
+
+    def getDatasetList(self, refresh=False):
+        """
+        Returns a list of all datasets available in OpenNeuro S3 storage.
+        See https://openneuro.org/public/datasets for dataset info.
+        Alternate method to access from a command line call:
+            aws s3 --no-sign-request ls s3://openneuro.org/
+        """
+        if self.datasetList is None or len(self.datasetList) == 0 or refresh is True:
+            s3Client = self.getS3Client()
+            all_datasets = s3Client.list_objects(Bucket='openneuro.org', Delimiter="/")
+            self.datasetList = []
+            for dataset in all_datasets.get('CommonPrefixes'):
+                dsetName = dataset.get('Prefix')
+                # strip trailing slash characters
+                dsetName = dsetName.rstrip('/\\')
+                self.datasetList.append(dsetName)
+        return self.datasetList
+
+    def isValidAccessionNumber(self, dsAccessionNum):
+        if dsAccessionNum not in self.getDatasetList():
+            print(f"{dsAccessionNum} not in the OpenNeuro S3 datasets.")
+            return False
+        return True
+
+    def getSubjectList(self, dsAccessionNum):
+        """
+        Returns a list of all the subjects in a dataset
+        Args:
+            dsAccessionNum - accession number of dataset to lookup
+        Returns:
+            list of subjects in that dataset
+        """
+        if not self.isValidAccessionNumber(dsAccessionNum):
+            return None
+        s3Client = self.getS3Client()
+        prefix = dsAccessionNum + '/sub-'
+        dsSubjDirs = s3Client.list_objects(Bucket='openneuro.org', Delimiter="/", Prefix=prefix)
+        subjects = []
+        for info in dsSubjDirs.get('CommonPrefixes'):
+            subj = info.get('Prefix')
+            if subj is not None:
+                subj = subj.split('sub-')[1]
+                subj = subj.rstrip('/\\')
+                subjects.append(subj)
+        return subjects
+
+    def getDescription(self, dsAccessionNum):
+        """
+        Returns the dataset description file as a python dictionary
+        """
+        if not self.isValidAccessionNumber(dsAccessionNum):
+            return None
+        dsDir = self.downloadData(dsAccessionNum, downloadWholeDataset=False)
+        filePath = os.path.join(dsDir, 'dataset_description.json')
+        descDict = None
+        try:
+            with open(filePath, 'r') as fp:
+                descDict = json.load(fp)
+        except Exception as err:
+            print(f"Failed to load dataset_description.json: {err}")
+        return descDict
+
+    def getReadme(self, dsAccessionNum):
+        """
+        Return the contents of the dataset README file.
+        Downloads toplevel dataset files if needed.
+        """
+        if not self.isValidAccessionNumber(dsAccessionNum):
+            return None
+        dsDir = self.downloadData(dsAccessionNum, downloadWholeDataset=False)
+        filePath = os.path.join(dsDir, 'README')
+        readme = None
+        try:
+            readme = utils.readFile(filePath)
+        except Exception as err:
+            print(f"Failed to load README: {err}")
+        return readme
+
+    def getArchivePath(self, dsAccessionNum):
+        """Returns the directory path to the cached dataset files"""
+        archivePath = os.path.join(self.cachePath, dsAccessionNum)
+        return archivePath
+
+    def downloadData(self, dsAccessionNum, downloadWholeDataset=False, **entities):
+        """
+        This command will sync the specified portion of the dataset to the cache directory.
+        Note: if only the accessionNum is supplied then it will just sync the top-level files.
+        Sync doesn't re-download files that are already present in the directory.
+        Consider using --delete, which removes local cache files no longer on the remote.
+        Args:
+            dsAccessionNum: accession number of the dataset to download data for.
+            downloadWholeDataset: boolean, if true all files in the dataset
+                will be downloaded.
+            entities: BIDS entities (subject, session, task, run, suffix) that
+                define the particular subject/run of the data to download.
+        Returns:
+            Path to the directory containing the downloaded dataset data.
+        """
+        if not self.isValidAccessionNumber(dsAccessionNum):
+            return False
+
+        includePattern = ''
+        if 'subject' in entities:
+            subject = entities['subject']
+            if type(subject) is int:
+                subject = f'{subject:02d}'
+            includePattern += f'sub-{subject}/'
+        if 'session' in entities:
+            session = entities['session']
+            if includePattern == '':
+                includePattern = '*'
+            if type(session) is int:
+                session = f'{session:02d}'
+            includePattern += f'ses-{session}/'
+        if 'task' in entities:
+            task = entities['task']
+            includePattern += f'*task-{task}'
+        if 'run' in entities:
+            run = entities['run']
+            if type(run) is int:
+                run = f'{run:02d}'
+            includePattern += f'*run-{run}'
+        if 'suffix' in entities:
+            suffix = entities['suffix']
+            includePattern += f'*{suffix}'
+        if includePattern != '' or downloadWholeDataset is True:
+            includePattern += '*'
+
+        datasetDir = os.path.join(self.cachePath, dsAccessionNum)
+        awsCmd = f'aws s3 sync --no-sign-request s3://openneuro.org/{dsAccessionNum} ' \
+                 f'{datasetDir} --exclude "*/*" --include "{includePattern}"'
+        print(f'run {awsCmd}')
+        os.system(awsCmd)
+        return datasetDir
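A short sketch of the OpenNeuroCache workflow end to end, based on the calls exercised in tests/test_openNeuro.py below (dataset ds002338 and subject xp201 come from those tests):

```python
from rtCommon.openNeuro import OpenNeuroCache

openNeuroCache = OpenNeuroCache(cachePath="/tmp/openneuro")
datasets = openNeuroCache.getDatasetList()        # accession numbers of all S3-hosted datasets
subjects = openNeuroCache.getSubjectList('ds002338')
desc = openNeuroCache.getDescription('ds002338')  # dataset_description.json as a dict
readme = openNeuroCache.getReadme('ds002338')
# Sync just the sub-xp201 run-1 bold files into the cache and get the archive path
datasetDir = openNeuroCache.downloadData('ds002338', subject='xp201', run=1, suffix='bold')
```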
diff --git a/rtCommon/openNeuroService.py b/rtCommon/openNeuroService.py
index a728aaad..5f22cc3d 100644
--- a/rtCommon/openNeuroService.py
+++ b/rtCommon/openNeuroService.py
@@ -1,5 +1,5 @@
 """
-A command-line service to be run where the where OpenNeuro data is downloaded and cached.
+A command-line service to be run where the OpenNeuro data is downloaded and cached.
 This service instantiates a BidsInterface object for serving the data back to the client
 running in the cloud. It connects to the remote projectServer.
 Once a connection is established it waits for requests and invokes the BidsInterface
diff --git a/tests/test_bidsInterface.py b/tests/test_bidsInterface.py
index 8a9c8600..22fbde11 100644
--- a/tests/test_bidsInterface.py
+++ b/tests/test_bidsInterface.py
@@ -2,13 +2,13 @@
 import time
 import math
 import pytest
-from numpy.core.numeric import isclose
 from rtCommon.bidsArchive import BidsArchive
 from rtCommon.bidsIncremental import BidsIncremental
 from rtCommon.imageHandling import convertDicomImgToNifti, readDicomFromFile
 from rtCommon.clientInterface import ClientInterface
-from rtCommon.bidsInterface import BidsInterface, tmpDownloadOpenNeuro
+from rtCommon.bidsInterface import BidsInterface
 from rtCommon.bidsCommon import getDicomMetadata
+from rtCommon.openNeuro import OpenNeuroCache
 import rtCommon.utils as utils
 from tests.backgroundTestServers import BackgroundTestServers
 from tests.common import rtCloudPath, tmpDir
@@ -122,11 +122,18 @@ def dicomStreamTest(bidsInterface):
 
 def openNeuroStreamTest(bidsInterface):
     dsAccessionNumber = 'ds002338'
     dsSubject = 'xp201'
-    datasetDir = tmpDownloadOpenNeuro(dsAccessionNumber, dsSubject, 1)
     localEntities = {'subject': dsSubject, 'run': 1, 'suffix': 'bold', 'datatype': 'func'}
-    remoteEntities = {'subject': dsSubject, 'run': 1}
+    remoteEntities = {'subject': dsSubject, 'run': 1, 'suffix': 'bold'}
+    extraKwargs = {}
+    if bidsInterface.isRunningRemote():
+        # Set longer timeout for potentially downloading data
+        extraKwargs = {"rpc_timeout": 60}
+    streamId = bidsInterface.initOpenNeuroStream(dsAccessionNumber, **remoteEntities,
+                                                 **extraKwargs)
+    openNeuroCache = OpenNeuroCache()
+    datasetDir = openNeuroCache.downloadData(dsAccessionNumber, **localEntities)
     localBidsArchive = BidsArchive(datasetDir)
-    streamId = bidsInterface.initOpenNeuroStream(dsAccessionNumber, **remoteEntities)
+
     for idx in range(3):
         streamIncremental = bidsInterface.getIncremental(streamId)
         localIncremental = localBidsArchive._getIncremental(idx, **localEntities)
diff --git a/tests/test_dataInterface.py b/tests/test_dataInterface.py
index 3f08f46c..d42e56d0 100644
--- a/tests/test_dataInterface.py
+++ b/tests/test_dataInterface.py
@@ -64,7 +64,7 @@ def test_rpyclocalDataInterface(self, dicomTestFilename, bigTestFile):
                                allowedFileTypes=allowedFileTypes,
                                dataRemote=False,
                                subjectRemote=False)
-        clientInterface = ClientInterface()
+        clientInterface = ClientInterface(rpyc_timeout=70)
         dataInterface = clientInterface.dataInterface
         assert clientInterface.isDataRemote() == False
         assert dataInterface.isRemote == False
@@ -80,7 +80,7 @@ def test_remoteDataInterface(self, dicomTestFilename, bigTestFile):
                                allowedFileTypes=allowedFileTypes,
                                dataRemote=True,
                                subjectRemote=False)
-        clientInterface = ClientInterface()
+        clientInterface = ClientInterface(rpyc_timeout=70)
         dataInterface = clientInterface.dataInterface
         assert clientInterface.isDataRemote() == True
         assert dataInterface.isRemote == True
diff --git a/tests/test_fileWatcher.py b/tests/test_fileWatcher.py
index 9ed02363..b12edc3c 100644
--- a/tests/test_fileWatcher.py
+++ b/tests/test_fileWatcher.py
@@ -48,6 +48,7 @@ def startCopyThread(numFiles=10):
 
 def test_waitForFile():
+    """Test to make sure file events are being triggered"""
     global exitThread, watchTmpPath, rndTimeouts
 
     clearWatchDir()
@@ -60,12 +61,16 @@ def test_waitForFile():
         # call waitForFile
         for i in range(10):
             dicomName = f'001_000013_00000{i}.dcm'
-            tout = rndTimeouts[i] + 0.1
-            result = watcher.waitForFile(dicomName, timeout=tout, timeCheckIncrement=0.5)
-            print(f'Got {result}')
+            tout = rndTimeouts[i] + 0.2
+            result = watcher.waitForFile(dicomName, timeout=tout, timeCheckIncrement=2.2)
+            print(f'Got {result}, timeout {tout}, loopCount {watcher.waitLoopCount}')
             # assert filename match
             assert result == os.path.join(watchTmpPath, dicomName)
             # assert file found by event trigger
+            # If this assert fails, it could be because two events are triggered
+            # per file on Mac/Windows: a creation event and a modified event. This
+            # generally causes the wait loop to run twice, and on the second pass
+            # the file may be found to already exist (without an event).
             assert watcher.foundWithFileEvent == True or watcher.waitLoopCount == 0
     finally:
         exitThread = True
@@ -99,7 +104,7 @@ def test_waitForFile_noEvents():
 
 def test_waitForFile_wrongDir():
-    # Init a file watcher on the wrong directory, 
+    # Init a file watcher on the wrong directory,
     # the filewatch should time out and fail
     global exitThread, watchTmpPath, rndTimeouts
diff --git a/tests/test_openNeuro.py b/tests/test_openNeuro.py
new file mode 100644
index 00000000..a1845d43
--- /dev/null
+++ b/tests/test_openNeuro.py
@@ -0,0 +1,87 @@
+import os
+import pytest
+import shutil
+import tempfile
+import subprocess
+from rtCommon.openNeuro import OpenNeuroCache
+
+
+def test_openNeuroCache():
+    tmpDir = os.path.join(tempfile.gettempdir(), 'openNeuro')
+    shutil.rmtree(tmpDir, ignore_errors=True)
+    openNeuroCache = OpenNeuroCache(tmpDir)
+
+    datasets = openNeuroCache.getDatasetList()
+    assert len(datasets) > 625
+
+    subjects = openNeuroCache.getSubjectList('ds002338')
+    expectedList = \
+        ['xp201', 'xp202', 'xp203', 'xp204', 'xp205', 'xp206',
+         'xp207', 'xp210', 'xp211', 'xp213', 'xp216', 'xp217',
+         'xp218', 'xp219', 'xp220', 'xp221', 'xp222']
+    assert subjects == expectedList
+
+    desc = openNeuroCache.getDescription('ds002338')
+    assert type(desc) is dict and len(desc.get('Name')) > 0
+
+    readme = openNeuroCache.getReadme('ds002338')
+    assert readme is not None and len(readme) > 0
+
+
+def test_openNeuroDownloads():
+    tmpDir = os.path.join(tempfile.gettempdir(), 'openNeuro')
+    print()
+    print(f"## USE {tmpDir}")
+    openNeuroCache = OpenNeuroCache(tmpDir)
+
+    accessionNumber = 'ds003194'
+    dsPath = os.path.join(tmpDir, accessionNumber)
+    # remove all files in this path
+    shutil.rmtree(dsPath, ignore_errors=True)
+
+    openNeuroCache.downloadData(accessionNumber)
+    assert fileCount(dsPath) == 6
+
+    subj = 'NEPO01'
+    openNeuroCache.downloadData(accessionNumber, subject=subj)
+    assert fileCount(os.path.join(dsPath, 'sub-' + subj)) == 10
+
+    subj = 'NEPO03'
+    openNeuroCache.downloadData(accessionNumber, subject=subj, task='task6mnepo')
+    assert fileCount(os.path.join(dsPath, 'sub-' + subj)) == 4
+
+    subj = 'NEPO04'
+    openNeuroCache.downloadData(accessionNumber, subject=subj, session='nepo6m')
+    assert fileCount(os.path.join(dsPath, 'sub-' + subj)) == 5
+
+    subj = 'NEPO06'
+    openNeuroCache.downloadData(accessionNumber, subject=subj, session='nepo6m', task='task6mnepo')
+    assert fileCount(os.path.join(dsPath, 'sub-' + subj)) == 4
+
+    # # This is a larger download, commenting out for now
+    # accessionNumber = 'ds001345'
+    # dsPath = os.path.join(tmpDir, accessionNumber)
+    # # remove all files in this path
+    # shutil.rmtree(dsPath, ignore_errors=True)
+    # openNeuroCache.downloadData(accessionNumber, subject='01', run=4)
+    # assert fileCount(os.path.join(dsPath, 'sub-' + '01')) == 2
+    # # This is a larger download, commenting out for now
+    # accessionNumber = 'ds000234'
+    # dsPath = os.path.join(tmpDir, accessionNumber)
+    # # remove all files in this path
+    # shutil.rmtree(dsPath, ignore_errors=True)
+    # subj = '03'
+    # openNeuroCache.downloadData(accessionNumber, subject=subj, task='motorphotic')
+    # assert fileCount(os.path.join(dsPath, 'sub-' + subj)) == 2
+
+
+def fileCount(path):
+    if not os.path.exists(path):
+        return 0
+    cmd = f'find {path} -type f | wc -l'
+    # The pipe to wc requires a shell, so run the full command with shell=True
+    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
+    count = result.stdout.decode('utf-8').strip()
+    return int(count)
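Since the find | wc pipeline above needs shell=True, a pure-Python count would avoid the shell dependency. A sketch only, not part of this patch; fileCountPortable is a hypothetical name:

```python
import os

def fileCountPortable(path):
    # Count regular files under path without spawning a shell (hypothetical helper)
    if not os.path.exists(path):
        return 0
    return sum(len(files) for _, _, files in os.walk(path))
```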