From 41a083beb178d8bad58417ab9ab3573bb38efa93 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Mon, 17 Jul 2023 09:50:48 -0400 Subject: [PATCH] px-push for swiftless CUBE --- pypx/pfstorage.py | 281 +++++++++++++++++++++++++++++++++++++++++++++- pypx/push.py | 20 ++-- 2 files changed, 292 insertions(+), 9 deletions(-) diff --git a/pypx/pfstorage.py b/pypx/pfstorage.py index c5ad6f7..980d009 100755 --- a/pypx/pfstorage.py +++ b/pypx/pfstorage.py @@ -58,6 +58,10 @@ def __init__(self, arg, *args, **kwargs): """ proto = 'https' if arg['str_swiftPort'] == '443' else 'http' + if arg['str_swiftBaseLocation']: + cont_name = arg['str_swiftBaseLocation'] + else: + cont_name = "users" self.state_create( { "swift": { @@ -65,7 +69,7 @@ def __init__(self, arg, *args, **kwargs): (proto, arg['str_swiftIP'], arg['str_swiftPort']), "username": arg['str_swiftLogin'], "key": "testing", - "container_name": "users", + "container_name": cont_name, "auto_create_container": True, "file_storage": "swift.storage.SwiftStorage" } @@ -783,4 +787,279 @@ def run(self, opt={}) -> dict: self.log(str_msg, comms = 'error') self.log(json.dumps(d_actionResult, indent = 4), comms = 'tx') + return d_actionResult + + +class fileStorage(PfStorage): + def __init__(self, arg, *args, **kwargs): + """ + Core initialization and logic in the base class + """ + # Check if an upstream object exists, and if so + # merge those args with the current namespace: + if 'upstream' in arg.keys(): + d_argCopy = arg.copy() + # "merge" these 'arg's with upstream. + arg.update(arg['reportData']['args']) + # Since this might overwrite some args specific to this + # app, we update again to the copy. + arg.update(d_argCopy) + + PfStorage.__init__(self, arg, *args, **kwargs) + + def objPull(self, *args, **kwargs): + pass + + def objPut_process(self, *args, **kwargs) -> dict: + """ + Process the 'objPut' directive. + + DICOM handling + -------------- + + A special behaviour is available for DICOM files, triggered by passing + a kwarg of 'DICOMsubstr = '. In this case, DICOM files (as identi- + fied by containing the substring pattern within the filename) will be + read for tag information used to generate the fully qualified storage + path. + + This fully qualified storage path will be substituted into the + 'toLocation = ' by replacing the special tag + '%pack' in the . + + NOTE: + * Typically a list of files all constitute the same DICOM SERIES + and as such, only one of file in the list needs to be processed for + packing tags. + * If the 'do' 'objPut' directive contains a true value for the field + 'packEachDICOM', then each DICOM will be explicitly examined and + packed individually. + + """ + + def toLocation_updateWithDICOMtags(str_DICOMfilename) -> dict: + """ + Read the str_DICOMfilename, determine the pack path, + and update the 'toLocation' if necessary. + + Return the original and modified 'toLocation' and status flag. + """ + b_pack = False + d_DICOMread = self.packer.DICOMfile_read(file=str_DICOMfilename) + d_path = self.packer.packPath_resolve(d_DICOMread) + self.obj[str_DICOMfilename] = d_DICOMread + str_origTo = d_args['toLocation'] + if '%pack' in d_args['toLocation']: + b_pack = True + d_args['toLocation'] = \ + d_args['toLocation'].replace('%pack', d_path['packDir']) + return { + 'pack': b_pack, + 'originalLocation': str_origTo, + 'path': d_path, + 'toLocation': d_args['toLocation'] + } + + def files_putSingly() -> dict: + """ + Handle a single file put, return and update d_ret + """ + nonlocal d_ret + nonlocal b_singleShot + d_pack: dict = {} + b_singleShot = True + d_ret = { + 'status': False, + 'localFileList': [], + 'objectFileList': [] + } + self.obj = {} + # pudb.set_trace() + for f in d_fileList['l_fileFS']: + d_pack = toLocation_updateWithDICOMtags(f) + d_args['file'] = f + d_args['remoteFile'] = d_pack['path']['imageFile'] + d_put = self.objPut(**d_args) + d_ret[f] = d_put + d_args['toLocation'] = d_pack['originalLocation'] + d_ret['status'] = d_put['status'] + if d_ret['status']: + d_ret['localFileList'].append(d_put['localFileList'][0]) + d_ret['objectFileList'].append(d_put['objectFileList'][0]) + else: + break + return d_ret + + d_ret: dict = { + 'status': False, + 'msg': "No 'arg' JSON directive found in request" + } + + d_msg: dict = {} + d_args: dict = {} + str_localPath: str = "" + str_DICOMsubstr: str = "" + b_singleShot: bool = False + + for k, v in kwargs.items(): + if k == 'request': d_msg = v + # pudb.set_trace() + if 'args' in d_msg: + d_args = d_msg['args'] + if 'localpath' in d_args: + str_localPath = d_args['localpath'] + if 'DICOMsubstr' in d_args: + d_fileList = self.filesFind( + root=str_localPath, + fileSubStr=d_args['DICOMsubstr'] + ) + if 'packEachDICOM' in d_args: + if d_args['packEachDICOM']: files_putSingly() + if len(d_fileList['l_fileFS']) and not b_singleShot: + toLocation_updateWithDICOMtags(d_fileList['l_fileFS'][0]) + else: + d_fileList = self.filesFind( + root=str_localPath + ) + if d_fileList['status'] and not b_singleShot: + d_args['fileList'] = d_fileList['l_fileFS'] + d_ret = self.objPut(**d_args) + elif not d_fileList['status']: + d_ret['msg'] = 'No valid file list generated' + return d_ret + + def objPut(self, *args, **kwargs) -> dict: + """ + Put an object (or list of objects) into swift storage. + + This method also "maps" tree locations in the local storage + to new locations in the object storage. For example, assume + a list of local locations starting with: + + /home/user/project/data/ ... + + and we want to pack everything in the 'data' dir to + object storage, at location '/storage'. In this case, the + pattern of kwargs specifying this would be: + + fileList = ['/home/user/project/data/file1', + '/home/user/project/data/dir1/file_d1', + '/home/user/project/data/dir2/file_d2'], + toLocation = '/storage', + mapLocationOver = '/home/user/project/data' + + will replace, for each file in , the with + , resulting in a new list + + '/storage/file1', + '/storage/dir1/file_d1', + '/storage/dir2/file_d2' + + """ + b_status: bool = True + l_localfile: list = [] # Name on the local file system + l_remotefileName: list = [] # A replacement for the remote filename + l_objectfile: list = [] # Name in the object storage + str_swiftLocation: str = '' + str_mapLocationOver: str = '' + str_localfilename: str = '' + str_storagefilename: str = '' + str_swiftLocation: str = "" + str_remoteFile: str = "" + dst: str = "" + d_ret: dict = { + 'status': b_status, + 'localFileList': [], + 'objectFileList': [], + 'localpath': '' + } + + for k, v in kwargs.items(): + if k == 'file': l_localfile.append(v) + if k == 'remoteFile': l_remotefileName.append(v) + if k == 'remoteFileList': l_remotefileName = v + if k == 'fileList': l_localfile = v + if k == 'toLocation': str_swiftLocation = v + if k == 'mapLocationOver': str_mapLocationOver = v + + if len(str_mapLocationOver): + # replace the local file path with object store path + l_objectfile = [w.replace(str_mapLocationOver, str_swiftLocation) \ + for w in l_localfile] + else: + # Prepend the swiftlocation to each element in the localfile list: + l_objectfile = [str_swiftLocation + '{0}'.format(i) for i in l_localfile] + + # Check and possibly change the actual file *names* to put into swift storage + # (the default is to use the same name as the local file -- however in the + # case of DICOM files, the actual final file name might also change) + if len(l_remotefileName): + l_objectfile = [l.replace(os.path.basename(l), f) for l, f in + zip(l_objectfile, l_remotefileName)] + + d_ret['localpath'] = os.path.dirname(l_localfile[0]) + d_conn = self.state('/swift/container_name') + if d_conn: + for str_localfilename, str_storagefilename in zip(l_localfile, l_objectfile): + try: + d_ret['status'] = True and d_ret['status'] + dst = Path(d_conn)/str_storagefilename + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copyfile(str_localfilename, str(dst)) + except Exception as e: + d_ret['error'] = '%s' % e + d_ret['status'] = False + d_ret['localFileList'].append(str_localfilename) + d_ret['objectFileList'].append(str(dst)) + return d_ret + + def connect(self, *args, **kwargs): + pass + + def ls(self, *args, **kwargs): + pass + + def ls_process(self, *args, **kwargs): + pass + + def objExists(self, *args, **kwargs): + pass + + def run(self, opt={}) -> dict: + """ + Perform the storage operation + """ + d_actionResult : dict = { + 'status' : False, + 'msg' : '' + } + try: + # First see if the "do" directive is a CLI + # flag captured in the self.arg structure + d_do : dict = json.loads(self.arg['do']) + except: + # Else, assume that the d_do is the passed opt + d_do = opt + if 'action' in d_do: + self.log("verb: %s detected." % d_do['action'], + comms = 'status') + str_method = '%s_process' % d_do['action'] + self.log("method to call: %s(request = d_msg) " % str_method, + comms = 'status') + try: + # pudb.set_trace() + method = getattr(self, str_method) + d_actionResult = method(request = d_do) + except Exception as ex: + str_msg = "Class '{}' does not implement method '{}':{}".format( + self.__class__.__name__, + str_method, + str(ex)) + d_actionResult = { + 'status': False, + 'msg': str_msg + } + self.log(str_msg, comms = 'error') + self.log(json.dumps(d_actionResult, indent = 4), comms = 'tx') + return d_actionResult \ No newline at end of file diff --git a/pypx/push.py b/pypx/push.py index 42fcae9..7a882e5 100644 --- a/pypx/push.py +++ b/pypx/push.py @@ -11,7 +11,7 @@ import sys from datetime import date, datetime -from .pfstorage import swiftStorage +from .pfstorage import swiftStorage, fileStorage # PYPX modules import pypx.smdb @@ -246,10 +246,13 @@ def serviceKey_process(self) -> dict: if len(self.arg['swift']): d_swiftInfo = self.smdb.service_keyAccess('swift') if d_swiftInfo['status']: - self.arg['str_swiftIP'] = d_swiftInfo['swift'][self.arg['swift']]['ip'] - self.arg['str_swiftPort'] = d_swiftInfo['swift'][self.arg['swift']]['port'] - self.arg['str_swiftLogin'] = d_swiftInfo['swift'][self.arg['swift']]['login'] - + storageType = d_swiftInfo['swift'][self.arg['swift']]['storagetype'] + if storageType == "swift": + self.arg['str_swiftIP'] = d_swiftInfo['swift'][self.arg['swift']]['ip'] + self.arg['str_swiftPort'] = d_swiftInfo['swift'][self.arg['swift']]['port'] + self.arg['str_swiftLogin'] = d_swiftInfo['swift'][self.arg['swift']]['login'] + elif storageType == "fs": + self.arg['str_swiftBaseLocation'] = d_swiftInfo['swift'][self.arg['swift']]['storepath'] return d_swiftInfo def __init__(self, arg): @@ -329,8 +332,10 @@ def path_pushToSwift(self): 'mapLocationOver' : self.arg['str_xcrdir'] } } - - store = swiftStorage(self.arg) + if self.arg['str_swiftBaseLocation']: + store = fileStorage(self.arg) + else: + store = swiftStorage(self.arg) d_storeDo = store.run(d_do) # Record in the smdb an entry for each series @@ -348,7 +353,6 @@ def path_pushToSwift(self): return d_storeDo def run(self, opt={}) -> dict: - d_push : dict = {} if self.pushToSwift_true():