Skip to content

Commit

Permalink
Add functions to get clockSkew and calculate time until next TR (#64)
Browse files Browse the repository at this point in the history
* Add functions to get clockSkew and calculate time until next TR using image header acquisitionTime and repetitionTime fields
  • Loading branch information
gdoubleyew authored Sep 20, 2021
1 parent 22ef95e commit 5578219
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 16 deletions.
31 changes: 24 additions & 7 deletions projects/sample/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@
# project modules from rt-cloud
sys.path.append(rootPath)
# import project modules from rt-cloud
from rtCommon.utils import loadConfigFile, stringPartialFormat
from rtCommon.utils import loadConfigFile, stringPartialFormat, calcAvgRoundTripTime
from rtCommon.clientInterface import ClientInterface
from rtCommon.imageHandling import readRetryDicomFromDataInterface, convertDicomImgToNifti
from rtCommon.imageHandling import dicomTimeToNextTr

# obtain the full path for the configuration toml file
defaultConfig = os.path.join(currPath, 'conf/sample.toml')
Expand All @@ -93,7 +94,14 @@ def doRuns(cfg, dataInterface, subjInterface, webInterface):
None.
"""
subjInterface.setMessage("Preparing Run ...")
time.sleep(1)

# Time delay to add between retrieving pre-collected dicoms (for re-runs)
demoTimeDelay = 1

# get round trip time to dataInterface computer
rttSec = calcAvgRoundTripTime(dataInterface.ping)
# get clockSkew between this computer and the dataInterface computer
clockSkew = dataInterface.getClockSkew(time.time(), rttSec)

# variables we'll use throughout
scanNum = cfg.scanNum[0]
Expand Down Expand Up @@ -157,7 +165,8 @@ def doRuns(cfg, dataInterface, subjInterface, webInterface):
"""
if verbose:
print("• initalize a watch for the dicoms using 'initWatch'")
dataInterface.initWatch(cfg.dicomDir, dicomScanNamePattern, cfg.minExpectedDicomSize)
dataInterface.initWatch(cfg.dicomDir, dicomScanNamePattern,
cfg.minExpectedDicomSize, demoStep=demoTimeDelay)

else: # use Stream functions
"""
Expand All @@ -172,7 +181,8 @@ def doRuns(cfg, dataInterface, subjInterface, webInterface):
"""
streamId = dataInterface.initScannerStream(cfg.dicomDir,
dicomScanNamePattern,
cfg.minExpectedDicomSize)
cfg.minExpectedDicomSize,
demoStep=demoTimeDelay)


"""
Expand Down Expand Up @@ -303,7 +313,16 @@ def doRuns(cfg, dataInterface, subjInterface, webInterface):
minAvg = 305
maxAvg = 315
feedback = (avg_niftiData - minAvg) / (maxAvg - minAvg)
subjInterface.setResult(runNum, int(this_TR), float(feedback), 1000)
# Get the seconds remaining before next TR starts, this can be passed to
# the setResult function to delay stimulus until that time
try:
secUntilNextTr = dicomTimeToNextTr(dicomData, clockSkew)
print(f"## Secs to next TR {secUntilNextTr}")
except Exception as err:
print(f'dicomTimeToNextTr error: {err}')

setFeedbackDelay = 500 # milliseconds
subjInterface.setResult(runNum, int(this_TR), float(feedback), setFeedbackDelay)

# Finally we will use use webInterface.plotDataPoint() to send the result
# to the web browser to be plotted in the --Data Plots-- tab.
Expand All @@ -317,14 +336,12 @@ def doRuns(cfg, dataInterface, subjInterface, webInterface):

# save the activations value info into a vector that can be saved later
all_avg_activations[this_TR] = avg_niftiData
time.sleep(1)

# create the full path filename of where we want to save the activation values vector.
# we're going to save things as .txt and .mat files
output_textFilename = '/tmp/cloud_directory/tmp/avg_activations.txt'
output_matFilename = os.path.join('/tmp/cloud_directory/tmp/avg_activations.mat')

time.sleep(1)
subjInterface.setMessage("End Run")
responses = subjInterface.getAllResponses()
keypresses = [response.get('key_pressed') for response in responses]
Expand Down
23 changes: 23 additions & 0 deletions rtCommon/bidsIncremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
-----------------------------------------------------------------------------"""
from copy import deepcopy
from datetime import datetime
from operator import eq as opeq
from typing import Any, Callable
import json
Expand Down Expand Up @@ -38,6 +39,7 @@
symmetricDictDifference,
writeDataFrameToEvents,
)
from rtCommon.utils import getTimeToNextTR
from rtCommon.errors import MissingMetadataError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -562,6 +564,27 @@ def getDataDirPath(self) -> str:
"""
return bids_build_path(self._imgMetadata, BIDS_DIR_PATH_PATTERN)

def getAcquisitionTime(self) -> datetime.time:
"""Returns the acquisition time as a datetime.time """
acqTm = self.getMetadataField('AcquisitionTime')
dtm = datetime.strptime(acqTm, '%H%M%S.%f')
return dtm.time()

def getRepetitionTime(self) -> float:
"""Returns the TR repetition time in seconds"""
repTm = self.getMetadataField('RepetitionTime')
tr_ms = float(repTm)
return tr_ms

def timeToNextTr(self, clockSkew, now=None) -> float:
"""Based on acquisition time returns seconds to next TR start"""
acquisitionTime = self.getAcquisitionTime()
repetitionTime = self.getRepetitionTime()
if now is None: # now variable can be passed in for testing
now = datetime.now().time()
secToNextTr = getTimeToNextTR(acquisitionTime, repetitionTime, now, clockSkew)
return secToNextTr

def writeToDisk(self, datasetRoot: str, onlyData=False) -> None:
"""
Writes the incremental's data to a directory on disk. NOTE: The
Expand Down
34 changes: 33 additions & 1 deletion rtCommon/bidsInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
import os
import glob
import time
import tempfile
import nibabel as nib
from rtCommon.remoteable import RemoteableExtensible
Expand All @@ -35,13 +36,15 @@ class BidsInterface(RemoteableExtensible):
If dataRemote=False, then the methods below will be invoked locally and the RemoteExtensible
parent class is inoperable (i.e. does nothing).
"""
def __init__(self, dataRemote=False, allowedDirs=[]):
def __init__(self, dataRemote=False, allowedDirs=[], scannerClockSkew=0):
"""
Args:
dataRemote (bool): Set to true for a passthrough instance that will forward requests.
Set to false for the actual instance running remotely
allowedDirs (list): Only applicable for DicomToBidsStreams. Indicates the
directories that Dicom files are allowed to be read from.
scannerClockSkew (float): number of seconds the scanner's clock is ahead of the
data server clock
"""
super().__init__(isRemote=dataRemote)
if dataRemote is True:
Expand All @@ -54,6 +57,7 @@ def __init__(self, dataRemote=False, allowedDirs=[]):
self.streamMap = {}
# Store the allowed directories to be used by the DicomToBidsStream class
self.allowedDirs = allowedDirs
self.scannerClockSkew = scannerClockSkew


def initDicomBidsStream(self, dicomDir, dicomFilePattern, dicomMinSize, **entities) -> int:
Expand Down Expand Up @@ -138,6 +142,34 @@ def getNumVolumes(self, streamId) -> int:
return stream.getNumVolumes()


def getClockSkew(self, callerClockTime: float, roundTripTime: float) -> float:
"""
Returns the clock skew between the caller's computer and the scanner clock.
This function is assumed to be running in the scanner room and have adjustments
to translate this server's clock to the scanner clock.
Value returned is in seconds. A positive number means the scanner clock
is ahead of the caller's clock. The caller should add the skew to their
localtime to get the time in the scanner's clock.
Args:
callerClockTime - current time (secs since epoch) of caller's clock
roundTripTime - measured RTT in seconds to remote caller
Returns:
Clockskew - seconds the scanner's clock is ahead of the caller's clock
"""
# Adjust the caller's clock forward by 1/2 round trip time
callerClockAdjToNow = callerClockTime + roundTripTime / 2.0
now = time.time()
# calcluate the time this server's clock is ahead of the caller's clock
skew = now - callerClockAdjToNow
# add the time skew from this server to the scanner clock
totalSkew = skew + self.scannerClockSkew
return totalSkew

def ping(self) -> float:
"""Returns seconds since the epoch"""
return time.time()


class DicomToBidsStream():
"""
A class that watches for DICOM file creation in a specified directory and with
Expand Down
32 changes: 31 additions & 1 deletion rtCommon/dataInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class DataInterface(RemoteableExtensible):
If dataRemote=False, then the methods below will be invoked locally and the RemoteExtensible
parent class is inoperable (i.e. does nothing).
"""
def __init__(self, dataRemote :bool=False, allowedDirs :List[str]=None, allowedFileTypes :List[str]=None):
def __init__(self, dataRemote :bool=False, allowedDirs :List[str]=None,
allowedFileTypes :List[str]=None, scannerClockSkew :float=0):
"""
Args:
dataRemote (bool): whether data will be served from the local instance or requests forwarded
Expand All @@ -48,6 +49,8 @@ def __init__(self, dataRemote :bool=False, allowedDirs :List[str]=None, allowedF
allowedFileTypes (list): list of file extensions, such as '.dcm', '.txt', for which file
operations are permitted. No file operations will be done unless the file extension matches
one on the list.
scannerClockSkew (float): number of seconds the scanner's clock is ahead of the
data server clock
"""
super().__init__(isRemote=dataRemote)
if dataRemote is True:
Expand All @@ -57,6 +60,7 @@ def __init__(self, dataRemote :bool=False, allowedDirs :List[str]=None, allowedF
self.currentStreamId = 0
self.streamInfo = None
self.allowedDirs = allowedDirs
self.scannerClockSkew = scannerClockSkew
# Remove trailing slash from dir names
if allowedDirs is not None:
self.allowedDirs = [dir.rstrip('/') for dir in allowedDirs]
Expand Down Expand Up @@ -290,6 +294,32 @@ def getAllowedFileTypes(self) -> List[str]:
"""Returns the list of file extensions which are allowed for read and write"""
return self.allowedFileTypes

def getClockSkew(self, callerClockTime: float, roundTripTime: float) -> float:
"""
Returns the clock skew between the caller's computer and the scanner clock.
This function is assumed to be running in the scanner room and have adjustments
to translate this server's clock to the scanner clock.
Value returned is in seconds. A positive number means the scanner clock
is ahead of the caller's clock. The caller should add the skew to their
localtime to get the time in the scanner's clock.
Args:
callerClockTime - current time (secs since epoch) of caller's clock
roundTripTime - measured RTT in seconds to remote caller
Returns:
Clockskew - seconds the scanner's clock is ahead of the caller's clock
"""
# Adjust the caller's clock forward by 1/2 round trip time
callerClockAdjToNow = callerClockTime + roundTripTime / 2.0
now = time.time()
# calcluate the time this server's clock is ahead of the caller's clock
skew = now - callerClockAdjToNow
# add the time skew from this server to the scanner clock
return skew + self.scannerClockSkew

def ping(self) -> float:
"""Returns seconds since the epoch"""
return time.time()

def _checkAllowedDirs(self, dir: str) -> bool:
if self.allowedDirs is None or len(self.allowedDirs) == 0:
raise ValidationError('DataInterface: no allowed directories are set')
Expand Down
35 changes: 33 additions & 2 deletions rtCommon/imageHandling.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import numpy as np # type: ignore
import nibabel as nib
import pydicom
from rtCommon.errors import StateError, ValidationError, InvocationError, RequestError
from datetime import datetime
from rtCommon.utils import getTimeToNextTR
from rtCommon.errors import StateError, ValidationError, InvocationError
from nilearn.image import new_img_like
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning)
Expand Down Expand Up @@ -63,11 +65,12 @@ def getDicomFileName(cfg, scanNum, fileNum):

return fullFileName

# Note: don't anonymize AcquisitionTime, needed to sync with TR
attributesToAnonymize = [
'PatientID', 'PatientAge', 'PatientBirthDate', 'PatientName',
'PatientSex', 'PatientSize', 'PatientWeight', 'PatientPosition',
'StudyDate', 'StudyTime', 'SeriesDate', 'SeriesTime',
'AcquisitionDate', 'AcquisitionTime', 'ContentDate', 'ContentTime',
'AcquisitionDate', 'ContentDate', 'ContentTime',
'InstanceCreationDate', 'InstanceCreationTime',
'PerformedProcedureStepStartDate', 'PerformedProcedureStepStartTime'
]
Expand Down Expand Up @@ -197,6 +200,34 @@ def parseDicomVolume(dicomImg, sliceDim):
return volume


def getDicomAcquisitionTime(dicomImg) -> datetime.time:
"""
Returns the acquisition time as a datetime.time
Note: day, month and year are not specified
"""
acqTm = dicomImg.get('AcquisitionTime', None)
if acqTm is None:
return None
dtm = datetime.strptime(acqTm, '%H%M%S.%f')
return dtm.time()

def getDicomRepetitionTime(dicomImg) -> float:
"""Returns the TR repetition time in seconds"""
repTm = dicomImg.get('RepetitionTime', None)
if repTm is None:
return None
tr_sec = float(repTm) / 1000
return tr_sec

def dicomTimeToNextTr(dicomImg, clockSkew, now=None):
"""Based on Dicom header returns seconds to next TR start"""
acquisitionTime = getDicomAcquisitionTime(dicomImg)
repetitionTime = getDicomRepetitionTime(dicomImg)
if now is None: # now variable may be passed in for testing purposes
now = datetime.now().time()
secToNextTr = getTimeToNextTR(acquisitionTime, repetitionTime, now, clockSkew)
return secToNextTr

"""-----------------------------------------------------------------------------
The following functions are used to convert dicom files into nifti files, which
Expand Down
12 changes: 10 additions & 2 deletions rtCommon/scannerDataService.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ def __init__(self, args, webSocketChannelName='wsData'):
webSocketChannelName: The websocket url extension used to connecy and communicate
to the remote projectServer, e.g. 'wsData' would connect to 'ws://server:port/wsData'
"""
if args.scannerClockSkew is None:
args.scannerClockSkew = 0

self.dataInterface = DataInterface(dataRemote=False,
allowedDirs=args.allowedDirs,
allowedFileTypes=args.allowedFileTypes)
self.bidsInterface = BidsInterface(dataRemote=False, allowedDirs=args.allowedDirs)
allowedFileTypes=args.allowedFileTypes,
scannerClockSkew=args.scannerClockSkew)
self.bidsInterface = BidsInterface(dataRemote=False,
allowedDirs=args.allowedDirs,
scannerClockSkew=args.scannerClockSkew)

self.wsRemoteService = WsRemoteService(args, webSocketChannelName)
self.wsRemoteService.addHandlerClass(DataInterface, self.dataInterface)
Expand All @@ -57,6 +63,8 @@ def __init__(self, args, webSocketChannelName='wsData'):
help="Allowed directories to server files from - comma separated list")
parser.add_argument('-f', action="store", dest="allowedFileTypes", default=defaultAllowedTypes,
help="Allowed file types - comma separated list")
parser.add_argument('--scannerClockSkew', default=0.0, type=float,
help="Seconds (float) that the scanner clock is ahead of the data server clock")
args, _ = parser.parse_known_args(namespace=connectionArgs)

if type(args.allowedDirs) is str:
Expand Down
Loading

0 comments on commit 5578219

Please sign in to comment.