
Commit

Merge pull request #22 from FNNDSC/px-push-rewrite
Px push rewrite
Sandip117 authored Jul 18, 2023
2 parents d479159 + acb8105 commit 33d0e97
Showing 4 changed files with 330 additions and 37 deletions.
283 changes: 281 additions & 2 deletions pypx/pfstorage.py
@@ -58,14 +58,18 @@ def __init__(self, arg, *args, **kwargs):
        """

        proto = 'https' if arg['str_swiftPort'] == '443' else 'http'
        if arg['str_storeBaseLocation']:
            cont_name = arg['str_storeBaseLocation']
        else:
            cont_name = "users"
        self.state_create(
            {
                "swift": {
                    "auth_url": "%s://%s:%s/auth/v1.0" % \
                                (proto, arg['str_swiftIP'], arg['str_swiftPort']),
                    "username": arg['str_swiftLogin'],
                    "key": "testing",
                    "container_name": "users",
                    "container_name": cont_name,
                    "auto_create_container": True,
                    "file_storage": "swift.storage.SwiftStorage"
                }
@@ -783,4 +787,279 @@ def run(self, opt={}) -> dict:
            self.log(str_msg, comms = 'error')
        self.log(json.dumps(d_actionResult, indent = 4), comms = 'tx')

        return d_actionResult


class fileStorage(PfStorage):
    def __init__(self, arg, *args, **kwargs):
        """
        Core initialization and logic in the base class
        """
        # Check if an upstream object exists, and if so
        # merge those args with the current namespace:
        if 'upstream' in arg.keys():
            d_argCopy = arg.copy()
            # "merge" these 'arg's with upstream.
            arg.update(arg['reportData']['args'])
            # Since this might overwrite some args specific to this
            # app, we update again to the copy.
            arg.update(d_argCopy)

        PfStorage.__init__(self, arg, *args, **kwargs)

    def objPull(self, *args, **kwargs):
        pass

    def objPut_process(self, *args, **kwargs) -> dict:
        """
        Process the 'objPut' directive.
        DICOM handling
        --------------
        A special behaviour is available for DICOM files, triggered by passing
        a kwarg of 'DICOMsubstr = <X>'. In this case, DICOM files (as identi-
        fied by containing the substring pattern within the filename) will be
        read for tag information used to generate the fully qualified storage
        path.
        This fully qualified storage path will be substituted into the
        'toLocation = <someswiftpath>' by replacing the special tag
        '%pack' in the <someswiftpath>.
        NOTE:
        * Typically a list of files all constitute the same DICOM SERIES
          and as such, only one file in the list needs to be processed for
          packing tags.
        * If the 'do' 'objPut' directive contains a true value for the field
          'packEachDICOM', then each DICOM will be explicitly examined and
          packed individually.
        """

        def toLocation_updateWithDICOMtags(str_DICOMfilename) -> dict:
            """
            Read the str_DICOMfilename, determine the pack path,
            and update the 'toLocation' if necessary.
            Return the original and modified 'toLocation' and status flag.
            """
            b_pack = False
            d_DICOMread = self.packer.DICOMfile_read(file=str_DICOMfilename)
            d_path = self.packer.packPath_resolve(d_DICOMread)
            self.obj[str_DICOMfilename] = d_DICOMread
            str_origTo = d_args['toLocation']
            if '%pack' in d_args['toLocation']:
                b_pack = True
                d_args['toLocation'] = \
                    d_args['toLocation'].replace('%pack', d_path['packDir'])
            return {
                'pack': b_pack,
                'originalLocation': str_origTo,
                'path': d_path,
                'toLocation': d_args['toLocation']
            }

        def files_putSingly() -> dict:
            """
            Handle a single file put, return and update d_ret
            """
            nonlocal d_ret
            nonlocal b_singleShot
            d_pack: dict = {}
            b_singleShot = True
            d_ret = {
                'status': False,
                'localFileList': [],
                'objectFileList': []
            }
            self.obj = {}
            # pudb.set_trace()
            for f in d_fileList['l_fileFS']:
                d_pack = toLocation_updateWithDICOMtags(f)
                d_args['file'] = f
                d_args['remoteFile'] = d_pack['path']['imageFile']
                d_put = self.objPut(**d_args)
                d_ret[f] = d_put
                d_args['toLocation'] = d_pack['originalLocation']
                d_ret['status'] = d_put['status']
                if d_ret['status']:
                    d_ret['localFileList'].append(d_put['localFileList'][0])
                    d_ret['objectFileList'].append(d_put['objectFileList'][0])
                else:
                    break
            return d_ret

        d_ret: dict = {
            'status': False,
            'msg': "No 'arg' JSON directive found in request"
        }

        d_msg: dict = {}
        d_args: dict = {}
        str_localPath: str = ""
        str_DICOMsubstr: str = ""
        b_singleShot: bool = False

        for k, v in kwargs.items():
            if k == 'request': d_msg = v
        # pudb.set_trace()
        if 'args' in d_msg:
            d_args = d_msg['args']
            if 'localpath' in d_args:
                str_localPath = d_args['localpath']
                if 'DICOMsubstr' in d_args:
                    d_fileList = self.filesFind(
                        root=str_localPath,
                        fileSubStr=d_args['DICOMsubstr']
                    )
                    if 'packEachDICOM' in d_args:
                        if d_args['packEachDICOM']: files_putSingly()
                    if len(d_fileList['l_fileFS']) and not b_singleShot:
                        toLocation_updateWithDICOMtags(d_fileList['l_fileFS'][0])
                else:
                    d_fileList = self.filesFind(
                        root=str_localPath
                    )
        if d_fileList['status'] and not b_singleShot:
            d_args['fileList'] = d_fileList['l_fileFS']
            d_ret = self.objPut(**d_args)
        elif not d_fileList['status']:
            d_ret['msg'] = 'No valid file list generated'
        return d_ret

    def objPut(self, *args, **kwargs) -> dict:
        """
        Put an object (or list of objects) into swift storage.
        This method also "maps" tree locations in the local storage
        to new locations in the object storage. For example, assume
        a list of local locations starting with:
            /home/user/project/data/ ...
        and we want to pack everything in the 'data' dir to
        object storage, at location '/storage'. In this case, the
        pattern of kwargs specifying this would be:
            fileList = ['/home/user/project/data/file1',
                        '/home/user/project/data/dir1/file_d1',
                        '/home/user/project/data/dir2/file_d2'],
            toLocation      = '/storage',
            mapLocationOver = '/home/user/project/data'
        will replace, for each file in <fileList>, the <mapLocationOver> with
        <toLocation>, resulting in a new list
            '/storage/file1',
            '/storage/dir1/file_d1',
            '/storage/dir2/file_d2'
        """
        b_status: bool = True
        l_localfile: list = []          # Name on the local file system
        l_remotefileName: list = []     # A replacement for the remote filename
        l_objectfile: list = []         # Name in the object storage
        str_swiftLocation: str = ''
        str_mapLocationOver: str = ''
        str_localfilename: str = ''
        str_storagefilename: str = ''
        str_swiftLocation: str = ""
        str_remoteFile: str = ""
        dst: str = ""
        d_ret: dict = {
            'status': b_status,
            'localFileList': [],
            'objectFileList': [],
            'localpath': ''
        }

        for k, v in kwargs.items():
            if k == 'file': l_localfile.append(v)
            if k == 'remoteFile': l_remotefileName.append(v)
            if k == 'remoteFileList': l_remotefileName = v
            if k == 'fileList': l_localfile = v
            if k == 'toLocation': str_swiftLocation = v
            if k == 'mapLocationOver': str_mapLocationOver = v

        if len(str_mapLocationOver):
            # replace the local file path with object store path
            l_objectfile = [w.replace(str_mapLocationOver, str_swiftLocation) \
                                for w in l_localfile]
        else:
            # Prepend the swiftlocation to each element in the localfile list:
            l_objectfile = [str_swiftLocation + '{0}'.format(i) for i in l_localfile]

        # Check and possibly change the actual file *names* to put into swift storage
        # (the default is to use the same name as the local file -- however in the
        # case of DICOM files, the actual final file name might also change)
        if len(l_remotefileName):
            l_objectfile = [l.replace(os.path.basename(l), f) for l, f in
                                zip(l_objectfile, l_remotefileName)]

        d_ret['localpath'] = os.path.dirname(l_localfile[0])
        d_conn = self.state('/swift/container_name')
        if d_conn:
            for str_localfilename, str_storagefilename in zip(l_localfile, l_objectfile):
                try:
                    d_ret['status'] = True and d_ret['status']
                    dst = Path(d_conn)/str_storagefilename
                    dst.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copyfile(str_localfilename, str(dst))
                except Exception as e:
                    d_ret['error'] = '%s' % e
                    d_ret['status'] = False
                d_ret['localFileList'].append(str_localfilename)
                d_ret['objectFileList'].append(str(dst))
        return d_ret

    def connect(self, *args, **kwargs):
        pass

    def ls(self, *args, **kwargs):
        pass

    def ls_process(self, *args, **kwargs):
        pass

    def objExists(self, *args, **kwargs):
        pass

    def run(self, opt={}) -> dict:
        """
        Perform the storage operation
        """
        d_actionResult : dict = {
            'status' : False,
            'msg' : ''
        }
        try:
            # First see if the "do" directive is a CLI
            # flag captured in the self.arg structure
            d_do : dict = json.loads(self.arg['do'])
        except:
            # Else, assume that the d_do is the passed opt
            d_do = opt
        if 'action' in d_do:
            self.log("verb: %s detected." % d_do['action'],
                     comms = 'status')
            str_method = '%s_process' % d_do['action']
            self.log("method to call: %s(request = d_msg) " % str_method,
                     comms = 'status')
            try:
                # pudb.set_trace()
                method = getattr(self, str_method)
                d_actionResult = method(request = d_do)
            except Exception as ex:
                str_msg = "Class '{}' does not implement method '{}':{}".format(
                    self.__class__.__name__,
                    str_method,
                    str(ex))
                d_actionResult = {
                    'status': False,
                    'msg': str_msg
                }
                self.log(str_msg, comms = 'error')
        self.log(json.dumps(d_actionResult, indent = 4), comms = 'tx')

        return d_actionResult
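
For orientation (not part of the commit): a minimal standalone sketch of the path mapping that fileStorage.objPut and toLocation_updateWithDICOMtags perform above. The helper name map_to_storage and the example pack directory are illustrative only; in pypx the real pack path comes from self.packer.packPath_resolve().

def map_to_storage(l_localfile, toLocation, mapLocationOver="", packDir=""):
    """Swap the local tree prefix for the storage prefix, after substituting
    the special '%pack' tag in the target location (as objPut_process does)."""
    if '%pack' in toLocation and packDir:
        # '%pack' is replaced by the DICOM-derived pack directory.
        toLocation = toLocation.replace('%pack', packDir)
    if mapLocationOver:
        # Replace the <mapLocationOver> prefix with <toLocation>.
        return [f.replace(mapLocationOver, toLocation) for f in l_localfile]
    # Otherwise simply prepend the storage location to each local path.
    return [toLocation + f for f in l_localfile]

files = ['/home/user/project/data/file1',
         '/home/user/project/data/dir1/file_d1']
print(map_to_storage(files,
                     toLocation      = '/storage/%pack',
                     mapLocationOver = '/home/user/project/data',
                     packDir         = 'SERVICES/PACS/orthanc/Subj-001'))
# ['/storage/SERVICES/PACS/orthanc/Subj-001/file1',
#  '/storage/SERVICES/PACS/orthanc/Subj-001/dir1/file_d1']
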
33 changes: 19 additions & 14 deletions pypx/push.py
@@ -11,7 +11,7 @@
import sys
from datetime import date, datetime

from .pfstorage import swiftStorage
from .pfstorage import swiftStorage, fileStorage

# PYPX modules
import pypx.smdb
@@ -120,9 +120,9 @@ def parser_setup(str_desc):
        action = 'store_true',
        default = False)
    parser.add_argument(
        '--swiftBaseLocation',
        '--storeBaseLocation',
        action = 'store',
        dest = 'str_swiftBaseLocation',
        dest = 'str_storeBaseLocation',
        type = str,
        default = '',
        help = 'swift base location to push files')
@@ -228,8 +228,9 @@ class Push(Base):
    """
    ``px-push`` is the primary vehicle for transmitting a DICOM file
    to a remote location. The remote location can be either another
    PACS node (in which case the PACS related args are used), or
    swift storage (in which the swift related args are used). In the
    PACS node (in which case the PACS related args are used), a
    swift storage (in which the swift related args are used), or a
    file system (in which store base related args are used). In the
    case of swift storage, and if CUBE related args are used, then
    this module will also register the files that have been pushed
    to the CUBE instance.
@@ -244,12 +245,15 @@ def serviceKey_process(self) -> dict:
        d_swiftInfo : dict = {}
        d_swiftInfo['status'] = False
        if len(self.arg['swift']):
            d_swiftInfo = self.smdb.service_keyAccess('swift')
            d_swiftInfo = self.smdb.service_keyAccess('storage')
            if d_swiftInfo['status']:
                self.arg['str_swiftIP'] = d_swiftInfo['swift'][self.arg['swift']]['ip']
                self.arg['str_swiftPort'] = d_swiftInfo['swift'][self.arg['swift']]['port']
                self.arg['str_swiftLogin'] = d_swiftInfo['swift'][self.arg['swift']]['login']

                storageType = d_swiftInfo['storage'][self.arg['swift']]['storagetype']
                if storageType == "swift":
                    self.arg['str_swiftIP'] = d_swiftInfo['storage'][self.arg['swift']]['ip']
                    self.arg['str_swiftPort'] = d_swiftInfo['storage'][self.arg['swift']]['port']
                    self.arg['str_swiftLogin'] = d_swiftInfo['storage'][self.arg['swift']]['login']
                elif storageType == "fs":
                    self.arg['str_storeBaseLocation'] = d_swiftInfo['storage'][self.arg['swift']]['storepath']
        return d_swiftInfo

    def __init__(self, arg):
Expand Down Expand Up @@ -329,8 +333,10 @@ def path_pushToSwift(self):
'mapLocationOver' : self.arg['str_xcrdir']
}
}

store = swiftStorage(self.arg)
if self.arg['str_storeBaseLocation']:
store = fileStorage(self.arg)
else:
store = swiftStorage(self.arg)
d_storeDo = store.run(d_do)

# Record in the smdb an entry for each series
@@ -344,11 +350,10 @@ def path_pushToSwift(self):
        self.smdb.seriesData('push', 'timestamp', now.strftime("%Y-%m-%d, %H:%M:%S"))
        if len(self.arg['swift']):
            self.smdb.seriesData('push', 'swift',
                self.smdb.service_keyAccess('swift')['swift'][self.arg['swift']])
                self.smdb.service_keyAccess('storage')['storage'][self.arg['swift']])
        return d_storeDo

    def run(self, opt={}) -> dict:

        d_push : dict = {}

        if self.pushToSwift_true():
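
For orientation (not part of the commit): a sketch of the smdb 'storage' service key that the reworked serviceKey_process() expects. The nested layout and the 'storagetype' routing are read directly from the lookups in the hunk above; the key names ('local', 'swiftprod') and the concrete values are illustrative placeholders.

d_storage = {
    'status': True,
    'storage': {
        'local': {                          # value passed on the CLI via --swift
            'storagetype': 'fs',            # routes px-push to fileStorage
            'storepath':   '/home/dicom/storeBase'
        },
        'swiftprod': {
            'storagetype': 'swift',         # routes px-push to swiftStorage
            'ip':          '192.168.1.216',
            'port':        '8080',
            'login':       'chris:chris1234'
        }
    }
}

def route(d_info, key):
    """Mirror the storagetype branch in serviceKey_process()."""
    entry = d_info['storage'][key]
    if entry['storagetype'] == 'swift':
        return {'str_swiftIP':    entry['ip'],
                'str_swiftPort':  entry['port'],
                'str_swiftLogin': entry['login']}
    elif entry['storagetype'] == 'fs':
        return {'str_storeBaseLocation': entry['storepath']}

print(route(d_storage, 'local'))    # {'str_storeBaseLocation': '/home/dicom/storeBase'}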