Add OpenNeuro.py and OpenNeuroCache class (#65)
* Add OpenNeuroCache class
* Allows downloading and caching OpenNeuro data.
* Download is specified by a set of entities such as dataset accession number, subject, session, run, task, etc.
* Data can then be replayed through a BidsStream using the BidsInterface to test an analysis pipeline.
gdoubleyew authored Oct 13, 2021
1 parent 5578219 commit c2cfce6
Showing 11 changed files with 298 additions and 74 deletions.
2 changes: 1 addition & 1 deletion docs/how-to-wrap-your-project.md
@@ -113,7 +113,7 @@ RT-Cloud uses RPC (Remote Procedure Calls) to send command requests from the res

The rpyc timeout can be set when the ClientInterface is created in the experiment script, such as in the sample.py project. Simply include the rpyc_timeout= parameter (e.g. ClientInterface(rpyc_timeout=60)); the default is 60 seconds. Rpyc also has a timed() function which can be used to adjust the timeout of individual rpc calls.

The websocket timeout can be set using the setRPCTimeout() of remoteable objects. For example to increase the timeout of the dataInterface in the experiment script, call dataInterface.setRPCTimeout(5). The default websocket timeout is 5 seconds.
The websocket timeout can be set in one of two ways. Method 1 is to set a larger timeout for all calls using the setRPCTimeout() method of remoteable objects. For example, to increase the timeout of the dataInterface in the experiment script, call dataInterface.setRPCTimeout(5); the default websocket timeout is 5 seconds. Method 2 is to set a larger timeout for one specific call by including a "rpc_timeout" kwarg in that call, for example dataInterface.getFile("BigFile", rpc_timeout=60). Note that before setting an RPC timeout you should check that the interface you are using is actually running over RPC, because sometimes interfaces run locally. To check, use the isRunningRemote() command, such as dataInterface.isRunningRemote(); see the openNeuroClient project for an example of this usage.
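
For concreteness, here is a minimal sketch combining the three options above (the interface and call names are the ones used in this section; the timeout values are illustrative):

```python
from rtCommon.clientInterface import ClientInterface

# Raise the rpyc timeout for all RPC calls made through this client (default 60s)
clientInterface = ClientInterface(rpyc_timeout=120)
dataInterface = clientInterface.dataInterface

# Only adjust websocket RPC timeouts if the interface actually runs remotely
if dataInterface.isRunningRemote():
    # Method 1: raise the timeout for every call on this interface (default 5s)
    dataInterface.setRPCTimeout(30)
    # Method 2: raise the timeout for one long-running call only
    data = dataInterface.getFile("BigFile", rpc_timeout=60)
```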

## **Some Alternate Configurations For Your Experiment**
### **Running everything on the same computer**
1 change: 1 addition & 0 deletions environment.yml
@@ -7,6 +7,7 @@ dependencies:
- awscli
- bcrypt
- brainiak
- boto3
- dcm2niix
- flake8
- indexed_gzip # for efficient random access of gzipped files with Nibabel
7 changes: 5 additions & 2 deletions projects/openNeuroClient/openNeuroClient.py
@@ -1,6 +1,5 @@
# import important modules
import os
from rtCommon.bidsRun import BidsRun
import sys
import numpy
import uuid
@@ -15,6 +14,7 @@
from rtCommon.utils import loadConfigFile, stringPartialFormat
from rtCommon.clientInterface import ClientInterface
from rtCommon.bidsArchive import BidsArchive
from rtCommon.bidsRun import BidsRun

# path for default configuration toml file
defaultConfig = os.path.join(currPath, 'conf', 'openNeuroClient.toml')
@@ -41,8 +41,11 @@ def doRuns(cfg, bidsInterface, subjInterface, webInterface):
print(f'BIDS Archive will be written to {bidsArchivePath}')
newArchive = BidsArchive(bidsArchivePath)
newRun = BidsRun(**entities)
extraKwargs = {}
if bidsInterface.isRunningRemote():
extraKwargs = {"rpc_timeout": 60}
# Initialize the bids stream
streamId = bidsInterface.initOpenNeuroStream(cfg.dsAccessionNumber, **entities)
streamId = bidsInterface.initOpenNeuroStream(cfg.dsAccessionNumber, **entities, **extraKwargs)
numVols = bidsInterface.getNumVolumes(streamId)
for idx in range(numVols):
bidsIncremental = bidsInterface.getIncremental(streamId, idx)
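
The doRuns code elided above appends each replayed incremental to a BidsRun and writes it into the new BidsArchive; a hedged sketch of that pattern (appendIncremental and appendBidsRun follow from the BidsRun/BidsArchive imports above, but treat the exact method names as assumptions):

```python
# Replay the stream and record it into a new archive
newRun = BidsRun(**entities)
for idx in range(numVols):
    bidsIncremental = bidsInterface.getIncremental(streamId, idx)
    newRun.appendIncremental(bidsIncremental)
newArchive.appendBidsRun(newRun)
```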
68 changes: 10 additions & 58 deletions rtCommon/bidsInterface.py
@@ -10,17 +10,14 @@
one instance of dataInterface, as part of the projectServer with dataRemote=False.
"""
import os
import glob
import time
import tempfile
import nibabel as nib
from rtCommon.remoteable import RemoteableExtensible
from rtCommon.bidsArchive import BidsArchive
from rtCommon.bidsRun import BidsRun
from rtCommon.bidsIncremental import BidsIncremental
from rtCommon.bidsCommon import getDicomMetadata
from rtCommon.imageHandling import convertDicomImgToNifti
from rtCommon.dataInterface import DataInterface
from rtCommon.openNeuro import OpenNeuroCache
from rtCommon.errors import RequestError, MissingMetadataError


@@ -58,6 +55,7 @@ def __init__(self, dataRemote=False, allowedDirs=[], scannerClockSkew=0):
# Store the allowed directories to be used by the DicomToBidsStream class
self.allowedDirs = allowedDirs
self.scannerClockSkew = scannerClockSkew
self.openNeuroCache = OpenNeuroCache(cachePath="/tmp/openneuro")


def initDicomBidsStream(self, dicomDir, dicomFilePattern, dicomMinSize, **entities) -> int:
@@ -109,10 +107,13 @@ def initOpenNeuroStream(self, dsAccessionNumber, **entities) -> int:
Returns:
streamId: An identifier used when calling stream functions, such as getIncremental()
"""
if 'subject' not in entities or 'run' not in entities:
raise RequestError("initOpenNeuroStream: Must specify subject and run number")
archivePath = self.openNeuroCache.downloadData(dsAccessionNumber, **entities)
# TODO - allow multiple simultaneous streams to be instantiated
streamId = 1
openNeuroStream = OpenNeuroStream(dsAccessionNumber, **entities)
self.streamMap[streamId] = openNeuroStream
bidsStream = BidsStream(archivePath, **entities)
self.streamMap[streamId] = bidsStream
return streamId

def getIncremental(self, streamId, volIdx=-1) -> BidsIncremental:
Expand Down Expand Up @@ -141,6 +142,9 @@ def getNumVolumes(self, streamId) -> int:
stream = self.streamMap[streamId]
return stream.getNumVolumes()

def closeStream(self, streamId):
# remove the stream from the map
self.streamMap.pop(streamId, None)
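
Together with initOpenNeuroStream above, the new closeStream completes a simple open/replay/close lifecycle. A sketch of local (non-remote) usage, borrowing the dataset and entities from the tests below:

```python
from rtCommon.bidsInterface import BidsInterface

bidsInterface = BidsInterface(dataRemote=False)
# Opening the stream downloads the subject/run into the OpenNeuroCache on first use
streamId = bidsInterface.initOpenNeuroStream('ds002338', subject='xp201', run=1)
# Replay every volume as a BidsIncremental, e.g. into an analysis pipeline
for idx in range(bidsInterface.getNumVolumes(streamId)):
    incremental = bidsInterface.getIncremental(streamId, idx)
# Release the stream when the replay is finished
bidsInterface.closeStream(streamId)
```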

def getClockSkew(self, callerClockTime: float, roundTripTime: float) -> float:
"""
Expand Down Expand Up @@ -299,55 +303,3 @@ def getIncremental(self, volIdx=-1) -> BidsIncremental:
return incremental
else:
return None


class OpenNeuroStream(BidsStream):
"""
A BidsStream from an OpenNeuro dataset. The OpenNeuro dataset will be automatically
downloaded, as needed, on the computer where this stream is initialized.
"""
def __init__(self, dsAccessionNumber, **entities):
"""
Args:
dsAccessionNumber: The OpenNeuro-specific accession number for the dataset
to stream.
entities: BIDS entities (subject, session, task, run, suffix, datatype) that
define the particular subject/run of the data to stream
"""
subject = entities.get('subject')
run = entities.get('run')
if subject is None or run is None:
raise RequestError("OpenNeuroStream: Must specify subject and run number")
# TODO - Use OpenNeuroService when it is available, to download
# and access the dataset and get dataset entities
# OpenNeuroService to provide path to dataset
datasetPath = tmpDownloadOpenNeuro(dsAccessionNumber, subject, run)
super().__init__(datasetPath, **entities)


def tmpDownloadOpenNeuro(dsAccessNumber, subject, run) -> str:
"""
Temporary function used until we integrate in the OpenNeuro service. Downloads
a portion of an OpenNeuro dataset corresponding to the subject/run.
Args:
dsAccessionNumber: The OpenNeuro-specific accession number for the dataset
to stream.
subject: the specific subject name within the OpenNeuro dataset to download
run: the specific run within the subject's data to download.
Returns:
Absolute path to where the dataset has been downloaded.
"""
tmpDir = tempfile.gettempdir()
print(f'OpenNeuro Data cached to {tmpDir}')
datasetDir = os.path.join(tmpDir, dsAccessNumber)
# check if already downloaded
includePattern = f'sub-{subject}/func/*run-{run:02d}*'
files = glob.glob(os.path.join(datasetDir, includePattern))
if len(files) == 0:
os.makedirs(datasetDir, exist_ok = True)
awsCmd = f'aws s3 sync --no-sign-request s3://openneuro.org/{dsAccessNumber} ' \
f'{datasetDir} --exclude "*/*" --include "{includePattern}"'
print(f'run {awsCmd}')
os.system(awsCmd)
return datasetDir
2 changes: 1 addition & 1 deletion rtCommon/dataInterface.py
@@ -120,7 +120,7 @@ def initScannerStream(self, imgDir: str, filePattern: str,

def getImageData(self, streamId: int, imageIndex: int=None, timeout: int=5) -> pydicom.dataset.FileDataset:
"""
Get data from a stream initialized with initScannerStream or initOpenNeuroStream
Get data from a stream initialized with initScannerStream
Args:
streamId: Id of a previously opened stream.
169 changes: 169 additions & 0 deletions rtCommon/openNeuro.py
@@ -0,0 +1,169 @@
"""
An interface to access OpenNeuro data and metadata. It can download
and cache OpenNeuro data for playback.
"""
import os
import json
import boto3
from botocore.config import Config
from botocore import UNSIGNED
import rtCommon.utils as utils


class OpenNeuroCache():
def __init__(self, cachePath="/tmp/openneuro/"):
self.cachePath = cachePath
self.datasetList = None
self.s3Client = None
os.makedirs(cachePath, exist_ok = True)

def getCachePath(self):
return self.cachePath

def getS3Client(self):
"""Returns an s3 client in order to reuse the same s3 client without
always creating a new one. Not thread safe currently.
"""
if self.s3Client is None:
self.s3Client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
return self.s3Client

def getDatasetList(self, refresh=False):
"""
Returns a list of all datasets available in OpenNeuro S3 storage.
See https://openneuro.org/public/datasets for dataset info.
An alternate way to list them from the command line:
aws s3 --no-sign-request ls s3://openneuro.org/
"""
if self.datasetList is None or len(self.datasetList)==0 or refresh is True:
s3Client = self.getS3Client()
all_datasets = s3Client.list_objects(Bucket='openneuro.org', Delimiter="/")
self.datasetList = []
for dataset in all_datasets.get('CommonPrefixes'):
dsetName = dataset.get('Prefix')
# strip trailing slash characters
dsetName = dsetName.rstrip('/\\')
self.datasetList.append(dsetName)
return self.datasetList

def isValidAccessionNumber(self, dsAccessionNum):
if dsAccessionNum not in self.getDatasetList():
print(f"{dsAccessionNum} not in the OpenNeuro S3 datasets.")
return False
return True
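
As a usage sketch, listing and validating datasets through the cache might look like this (the commented values are assumptions, not verified output):

```python
from rtCommon.openNeuro import OpenNeuroCache

cache = OpenNeuroCache(cachePath="/tmp/openneuro")
datasets = cache.getDatasetList()   # e.g. ['ds000001', 'ds000002', ...]
print(f'{len(datasets)} OpenNeuro datasets on S3')
assert cache.isValidAccessionNumber('ds002338')   # dataset used in the tests below
```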

def getSubjectList(self, dsAccessionNum):
"""
Returns a list of all the subjects in a dataset
Args:
dsAccessionNum - accession number of dataset to lookup
Returns:
list of subjects in that dataset
"""
if not self.isValidAccessionNumber(dsAccessionNum):
return None
s3 = self.getS3Client()
prefix = dsAccessionNum + '/sub-'
dsSubjDirs = s3.list_objects(Bucket='openneuro.org', Delimiter="/", Prefix=prefix)
subjects = []
for info in dsSubjDirs.get('CommonPrefixes'):
subj = info.get('Prefix')
if subj is not None:
subj = subj.split('sub-')[1]
if subj is not None:
subj = subj.rstrip('/\\')
subjects.append(subj)
return subjects

def getDescription(self, dsAccessionNum):
"""
Returns the dataset description file as a python dictionary
"""
if not self.isValidAccessionNumber(dsAccessionNum):
return None
dsDir = self.downloadData(dsAccessionNum, downloadWholeDataset=False)
filePath = os.path.join(dsDir, 'dataset_description.json')
descDict = None
try:
with open(filePath, 'r') as fp:
descDict = json.load(fp)
except Exception as err:
print(f"Failed to load dataset_description.json: {err}")
return descDict

def getReadme(self, dsAccessionNum):
"""
Return the contents of the dataset README file.
Downloads toplevel dataset files if needed.
"""
if not self.isValidAccessionNumber(dsAccessionNum):
return None
dsDir = self.downloadData(dsAccessionNum, downloadWholeDataset=False)
filePath = os.path.join(dsDir, 'README')
readme = None
try:
readme = utils.readFile(filePath)
except Exception as err:
print(f"Failed to load README: {err}")
return readme
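
A short sketch of the metadata helpers above, again using the test dataset; the subject names and description fields shown in comments are assumptions based on BIDS conventions:

```python
from rtCommon.openNeuro import OpenNeuroCache

cache = OpenNeuroCache()
subjects = cache.getSubjectList('ds002338')   # e.g. ['xp101', ..., 'xp201']
desc = cache.getDescription('ds002338')       # dict from dataset_description.json
if desc is not None:
    print(desc.get('Name'), desc.get('BIDSVersion'))
readme = cache.getReadme('ds002338')          # README contents as a string
```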


def getArchivePath(self, dsAccessionNum):
"""Returns the directory path to the cached dataset files"""
archivePath = os.path.join(self.cachePath, dsAccessionNum)
return archivePath


def downloadData(self, dsAccessionNum, downloadWholeDataset=False, **entities):
"""
This command will sync the specified portion of the dataset to the cache directory.
Note: if only the dsAccessionNum is supplied then it will just sync the top-level files.
Sync doesn't re-download files that are already present in the directory.
Consider using --delete which removes local cache files no longer on the remote.
Args:
dsAccessionNum: accession number of the dataset to download data for.
downloadWholeDataset: boolean, if true all files in the dataset
will be downloaded.
entities: BIDS entities (subject, session, task, run, suffix) that
define the particular subject/run of the data to download.
Returns:
Path to the directory containing the downloaded dataset data.
"""
if not self.isValidAccessionNumber(dsAccessionNum):
print(f"{dsAccessionNum} not in the OpenNeuro S3 datasets.")
return False

includePattern = ''
if 'subject' in entities:
subject = entities['subject']
if type(subject) is int:
subject = f'{subject:02d}'
includePattern += f'sub-{subject}/'
if 'session' in entities:
session = entities['session']
if includePattern == '':
includePattern = '*'
if type(session) is int:
session = f'{session:02d}'
includePattern += f'ses-{session}/'
if 'task' in entities:
task = entities['task']
includePattern += f'*task-{task}'
if 'run' in entities:
run = entities['run']
if type(run) is int:
run = f'{run:02d}'
includePattern += f'*run-{run}'
if 'suffix' in entities:
suffix = entities['suffix']
includePattern += f'*{suffix}'
if includePattern != '' or downloadWholeDataset is True:
includePattern += '*'

datasetDir = os.path.join(self.cachePath, dsAccessionNum)
awsCmd = f'aws s3 sync --no-sign-request s3://openneuro.org/{dsAccessionNum} ' \
f'{datasetDir} --exclude "*/*" --include "{includePattern}"'
print(f'run {awsCmd}')
os.system(awsCmd)
return datasetDir
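
To make the pattern construction concrete, here is a sketch of the include patterns downloadData builds for a few entity combinations (traced from the code above; the task name is hypothetical):

```python
cache = OpenNeuroCache()

# subject + run -> includePattern 'sub-01/*run-01*' (an int run is zero-padded)
cache.downloadData('ds002338', subject='01', run=1)

# subject + task + suffix -> 'sub-xp201/*task-rest*bold*'
cache.downloadData('ds002338', subject='xp201', task='rest', suffix='bold')

# No entities and downloadWholeDataset=False leaves the pattern empty, so only
# top-level files (README, dataset_description.json, ...) are synced
cache.downloadData('ds002338')
```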

2 changes: 1 addition & 1 deletion rtCommon/openNeuroService.py
@@ -1,5 +1,5 @@
"""
A command-line service to be run where the where OpenNeuro data is downloaded and cached.
A command-line service to be run where the OpenNeuro data is downloaded and cached.
This service instantiates a BidsInterface object for serving the data back to the client
running in the cloud. It connects to the remote projectServer.
Once a connection is established, it waits for requests and invokes the BidsInterface
17 changes: 12 additions & 5 deletions tests/test_bidsInterface.py
@@ -2,13 +2,13 @@
import time
import math
import pytest
from numpy.core.numeric import isclose
from rtCommon.bidsArchive import BidsArchive
from rtCommon.bidsIncremental import BidsIncremental
from rtCommon.imageHandling import convertDicomImgToNifti, readDicomFromFile
from rtCommon.clientInterface import ClientInterface
from rtCommon.bidsInterface import BidsInterface, tmpDownloadOpenNeuro
from rtCommon.bidsInterface import BidsInterface
from rtCommon.bidsCommon import getDicomMetadata
from rtCommon.openNeuro import OpenNeuroCache
import rtCommon.utils as utils
from tests.backgroundTestServers import BackgroundTestServers
from tests.common import rtCloudPath, tmpDir
@@ -122,11 +122,18 @@ def dicomStreamTest(bidsInterface):
def openNeuroStreamTest(bidsInterface):
dsAccessionNumber = 'ds002338'
dsSubject = 'xp201'
datasetDir = tmpDownloadOpenNeuro(dsAccessionNumber, dsSubject, 1)
localEntities = {'subject': dsSubject, 'run': 1, 'suffix': 'bold', 'datatype': 'func'}
remoteEntities = {'subject': dsSubject, 'run': 1}
remoteEntities = {'subject': dsSubject, 'run': 1, 'suffix': 'bold'}
extraKwargs = {}
if bidsInterface.isRunningRemote():
# Set longer timeout for potentially downloading data
extraKwargs = {"rpc_timeout": 60}
streamId = bidsInterface.initOpenNeuroStream(dsAccessionNumber, **remoteEntities,
**extraKwargs)
openNeuroCache = OpenNeuroCache()
datasetDir = openNeuroCache.downloadData(dsAccessionNumber, **localEntities)
localBidsArchive = BidsArchive(datasetDir)
streamId = bidsInterface.initOpenNeuroStream(dsAccessionNumber, **remoteEntities)

for idx in range(3):
streamIncremental = bidsInterface.getIncremental(streamId)
localIncremental = localBidsArchive._getIncremental(idx, **localEntities)
4 changes: 2 additions & 2 deletions tests/test_dataInterface.py
@@ -64,7 +64,7 @@ def test_rpyclocalDataInterface(self, dicomTestFilename, bigTestFile):
allowedFileTypes=allowedFileTypes,
dataRemote=False,
subjectRemote=False)
clientInterface = ClientInterface()
clientInterface = ClientInterface(rpyc_timeout=70)
dataInterface = clientInterface.dataInterface
assert clientInterface.isDataRemote() == False
assert dataInterface.isRemote == False
@@ -80,7 +80,7 @@ def test_remoteDataInterface(self, dicomTestFilename, bigTestFile):
allowedFileTypes=allowedFileTypes,
dataRemote=True,
subjectRemote=False)
clientInterface = ClientInterface()
clientInterface = ClientInterface(rpyc_timeout=70)
dataInterface = clientInterface.dataInterface
assert clientInterface.isDataRemote() == True
assert dataInterface.isRemote == True