Skip to content

Commit

Permalink
px-push for swiftless CUBE
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandip117 committed Jul 17, 2023
1 parent d479159 commit 41a083b
Show file tree
Hide file tree
Showing 2 changed files with 292 additions and 9 deletions.
281 changes: 280 additions & 1 deletion pypx/pfstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,18 @@ def __init__(self, arg, *args, **kwargs):
"""

proto = 'https' if arg['str_swiftPort'] == '443' else 'http'
if arg['str_swiftBaseLocation']:
cont_name = arg['str_swiftBaseLocation']
else:
cont_name = "users"
self.state_create(
{
"swift": {
"auth_url": "%s://%s:%s/auth/v1.0" % \
(proto, arg['str_swiftIP'], arg['str_swiftPort']),
"username": arg['str_swiftLogin'],
"key": "testing",
"container_name": "users",
"container_name": cont_name,
"auto_create_container": True,
"file_storage": "swift.storage.SwiftStorage"
}
Expand Down Expand Up @@ -783,4 +787,279 @@ def run(self, opt={}) -> dict:
self.log(str_msg, comms = 'error')
self.log(json.dumps(d_actionResult, indent = 4), comms = 'tx')

return d_actionResult


class fileStorage(PfStorage):
def __init__(self, arg, *args, **kwargs):
"""
Core initialization and logic in the base class
"""
# Check if an upstream object exists, and if so
# merge those args with the current namespace:
if 'upstream' in arg.keys():
d_argCopy = arg.copy()
# "merge" these 'arg's with upstream.
arg.update(arg['reportData']['args'])
# Since this might overwrite some args specific to this
# app, we update again to the copy.
arg.update(d_argCopy)

PfStorage.__init__(self, arg, *args, **kwargs)

def objPull(self, *args, **kwargs):
pass

def objPut_process(self, *args, **kwargs) -> dict:
"""
Process the 'objPut' directive.
DICOM handling
--------------
A special behaviour is available for DICOM files, triggered by passing
a kwarg of 'DICOMsubstr = <X>'. In this case, DICOM files (as identi-
fied by containing the substring pattern within the filename) will be
read for tag information used to generate the fully qualified storage
path.
This fully qualified storage path will be substituted into the
'toLocation = <someswiftpath>' by replacing the special tag
'%pack' in the <someswiftpath>.
NOTE:
* Typically a list of files all constitute the same DICOM SERIES
and as such, only one of file in the list needs to be processed for
packing tags.
* If the 'do' 'objPut' directive contains a true value for the field
'packEachDICOM', then each DICOM will be explicitly examined and
packed individually.
"""

def toLocation_updateWithDICOMtags(str_DICOMfilename) -> dict:
"""
Read the str_DICOMfilename, determine the pack path,
and update the 'toLocation' if necessary.
Return the original and modified 'toLocation' and status flag.
"""
b_pack = False
d_DICOMread = self.packer.DICOMfile_read(file=str_DICOMfilename)
d_path = self.packer.packPath_resolve(d_DICOMread)
self.obj[str_DICOMfilename] = d_DICOMread
str_origTo = d_args['toLocation']
if '%pack' in d_args['toLocation']:
b_pack = True
d_args['toLocation'] = \
d_args['toLocation'].replace('%pack', d_path['packDir'])
return {
'pack': b_pack,
'originalLocation': str_origTo,
'path': d_path,
'toLocation': d_args['toLocation']
}

def files_putSingly() -> dict:
"""
Handle a single file put, return and update d_ret
"""
nonlocal d_ret
nonlocal b_singleShot
d_pack: dict = {}
b_singleShot = True
d_ret = {
'status': False,
'localFileList': [],
'objectFileList': []
}
self.obj = {}
# pudb.set_trace()
for f in d_fileList['l_fileFS']:
d_pack = toLocation_updateWithDICOMtags(f)
d_args['file'] = f
d_args['remoteFile'] = d_pack['path']['imageFile']
d_put = self.objPut(**d_args)
d_ret[f] = d_put
d_args['toLocation'] = d_pack['originalLocation']
d_ret['status'] = d_put['status']
if d_ret['status']:
d_ret['localFileList'].append(d_put['localFileList'][0])
d_ret['objectFileList'].append(d_put['objectFileList'][0])
else:
break
return d_ret

d_ret: dict = {
'status': False,
'msg': "No 'arg' JSON directive found in request"
}

d_msg: dict = {}
d_args: dict = {}
str_localPath: str = ""
str_DICOMsubstr: str = ""
b_singleShot: bool = False

for k, v in kwargs.items():
if k == 'request': d_msg = v
# pudb.set_trace()
if 'args' in d_msg:
d_args = d_msg['args']
if 'localpath' in d_args:
str_localPath = d_args['localpath']
if 'DICOMsubstr' in d_args:
d_fileList = self.filesFind(
root=str_localPath,
fileSubStr=d_args['DICOMsubstr']
)
if 'packEachDICOM' in d_args:
if d_args['packEachDICOM']: files_putSingly()
if len(d_fileList['l_fileFS']) and not b_singleShot:
toLocation_updateWithDICOMtags(d_fileList['l_fileFS'][0])
else:
d_fileList = self.filesFind(
root=str_localPath
)
if d_fileList['status'] and not b_singleShot:
d_args['fileList'] = d_fileList['l_fileFS']
d_ret = self.objPut(**d_args)
elif not d_fileList['status']:
d_ret['msg'] = 'No valid file list generated'
return d_ret

def objPut(self, *args, **kwargs) -> dict:
"""
Put an object (or list of objects) into swift storage.
This method also "maps" tree locations in the local storage
to new locations in the object storage. For example, assume
a list of local locations starting with:
/home/user/project/data/ ...
and we want to pack everything in the 'data' dir to
object storage, at location '/storage'. In this case, the
pattern of kwargs specifying this would be:
fileList = ['/home/user/project/data/file1',
'/home/user/project/data/dir1/file_d1',
'/home/user/project/data/dir2/file_d2'],
toLocation = '/storage',
mapLocationOver = '/home/user/project/data'
will replace, for each file in <fileList>, the <mapLocationOver> with
<inLocation>, resulting in a new list
'/storage/file1',
'/storage/dir1/file_d1',
'/storage/dir2/file_d2'
"""
b_status: bool = True
l_localfile: list = [] # Name on the local file system
l_remotefileName: list = [] # A replacement for the remote filename
l_objectfile: list = [] # Name in the object storage
str_swiftLocation: str = ''
str_mapLocationOver: str = ''
str_localfilename: str = ''
str_storagefilename: str = ''
str_swiftLocation: str = ""
str_remoteFile: str = ""
dst: str = ""
d_ret: dict = {
'status': b_status,
'localFileList': [],
'objectFileList': [],
'localpath': ''
}

for k, v in kwargs.items():
if k == 'file': l_localfile.append(v)
if k == 'remoteFile': l_remotefileName.append(v)
if k == 'remoteFileList': l_remotefileName = v
if k == 'fileList': l_localfile = v
if k == 'toLocation': str_swiftLocation = v
if k == 'mapLocationOver': str_mapLocationOver = v

if len(str_mapLocationOver):
# replace the local file path with object store path
l_objectfile = [w.replace(str_mapLocationOver, str_swiftLocation) \
for w in l_localfile]
else:
# Prepend the swiftlocation to each element in the localfile list:
l_objectfile = [str_swiftLocation + '{0}'.format(i) for i in l_localfile]

# Check and possibly change the actual file *names* to put into swift storage
# (the default is to use the same name as the local file -- however in the
# case of DICOM files, the actual final file name might also change)
if len(l_remotefileName):
l_objectfile = [l.replace(os.path.basename(l), f) for l, f in
zip(l_objectfile, l_remotefileName)]

d_ret['localpath'] = os.path.dirname(l_localfile[0])
d_conn = self.state('/swift/container_name')
if d_conn:
for str_localfilename, str_storagefilename in zip(l_localfile, l_objectfile):
try:
d_ret['status'] = True and d_ret['status']
dst = Path(d_conn)/str_storagefilename
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(str_localfilename, str(dst))
except Exception as e:
d_ret['error'] = '%s' % e
d_ret['status'] = False
d_ret['localFileList'].append(str_localfilename)
d_ret['objectFileList'].append(str(dst))
return d_ret

def connect(self, *args, **kwargs):
pass

def ls(self, *args, **kwargs):
pass

def ls_process(self, *args, **kwargs):
pass

def objExists(self, *args, **kwargs):
pass

def run(self, opt={}) -> dict:
"""
Perform the storage operation
"""
d_actionResult : dict = {
'status' : False,
'msg' : ''
}
try:
# First see if the "do" directive is a CLI
# flag captured in the self.arg structure
d_do : dict = json.loads(self.arg['do'])
except:
# Else, assume that the d_do is the passed opt
d_do = opt
if 'action' in d_do:
self.log("verb: %s detected." % d_do['action'],
comms = 'status')
str_method = '%s_process' % d_do['action']
self.log("method to call: %s(request = d_msg) " % str_method,
comms = 'status')
try:
# pudb.set_trace()
method = getattr(self, str_method)
d_actionResult = method(request = d_do)
except Exception as ex:
str_msg = "Class '{}' does not implement method '{}':{}".format(
self.__class__.__name__,
str_method,
str(ex))
d_actionResult = {
'status': False,
'msg': str_msg
}
self.log(str_msg, comms = 'error')
self.log(json.dumps(d_actionResult, indent = 4), comms = 'tx')

return d_actionResult
20 changes: 12 additions & 8 deletions pypx/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import sys
from datetime import date, datetime

from .pfstorage import swiftStorage
from .pfstorage import swiftStorage, fileStorage

# PYPX modules
import pypx.smdb
Expand Down Expand Up @@ -246,10 +246,13 @@ def serviceKey_process(self) -> dict:
if len(self.arg['swift']):
d_swiftInfo = self.smdb.service_keyAccess('swift')
if d_swiftInfo['status']:
self.arg['str_swiftIP'] = d_swiftInfo['swift'][self.arg['swift']]['ip']
self.arg['str_swiftPort'] = d_swiftInfo['swift'][self.arg['swift']]['port']
self.arg['str_swiftLogin'] = d_swiftInfo['swift'][self.arg['swift']]['login']

storageType = d_swiftInfo['swift'][self.arg['swift']]['storagetype']
if storageType == "swift":
self.arg['str_swiftIP'] = d_swiftInfo['swift'][self.arg['swift']]['ip']
self.arg['str_swiftPort'] = d_swiftInfo['swift'][self.arg['swift']]['port']
self.arg['str_swiftLogin'] = d_swiftInfo['swift'][self.arg['swift']]['login']
elif storageType == "fs":
self.arg['str_swiftBaseLocation'] = d_swiftInfo['swift'][self.arg['swift']]['storepath']
return d_swiftInfo

def __init__(self, arg):
Expand Down Expand Up @@ -329,8 +332,10 @@ def path_pushToSwift(self):
'mapLocationOver' : self.arg['str_xcrdir']
}
}

store = swiftStorage(self.arg)
if self.arg['str_swiftBaseLocation']:
store = fileStorage(self.arg)
else:
store = swiftStorage(self.arg)
d_storeDo = store.run(d_do)

# Record in the smdb an entry for each series
Expand All @@ -348,7 +353,6 @@ def path_pushToSwift(self):
return d_storeDo

def run(self, opt={}) -> dict:

d_push : dict = {}

if self.pushToSwift_true():
Expand Down

0 comments on commit 41a083b

Please sign in to comment.