From d11ce65221a1c53f623b28321bfe64742b795a63 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Thu, 14 Nov 2024 16:54:00 +0000 Subject: [PATCH] Optimisations to removed RETRIEVAL_DICT as it was making the messages too large --- nlds/rabbit/state.py | 11 +- nlds_processors/archiver/archive_get.py | 107 ++++--- nlds_processors/archiver/archive_put.py | 1 - .../archiver/s3_to_tarfile_tape.py | 33 ++- nlds_processors/catalog/catalog.py | 12 +- nlds_processors/catalog/catalog_worker.py | 266 ++++++------------ nlds_processors/transferers/get_transfer.py | 25 +- ...ject_status.py => reset_storage_status.py} | 92 ++++-- nlds_utils/reset_tape_status.py | 92 ------ nlds_utils/view_holding_full.py | 2 +- 10 files changed, 251 insertions(+), 390 deletions(-) rename nlds_utils/{reset_object_status.py => reset_storage_status.py} (54%) delete mode 100755 nlds_utils/reset_tape_status.py diff --git a/nlds/rabbit/state.py b/nlds/rabbit/state.py index f9be432..2982294 100644 --- a/nlds/rabbit/state.py +++ b/nlds/rabbit/state.py @@ -56,11 +56,12 @@ def has_name(cls, name): @classmethod def get_final_states(cls): final_states = ( - cls.TRANSFER_GETTING, - cls.CATALOG_UPDATING, - cls.CATALOG_ARCHIVE_UPDATING, - cls.CATALOG_DELETING, - cls.CATALOG_REMOVING, + # Make final states explicit + # cls.TRANSFER_GETTING, + # cls.CATALOG_UPDATING, + # cls.CATALOG_ARCHIVE_UPDATING, + # cls.CATALOG_DELETING, + # cls.CATALOG_REMOVING, cls.FAILED, cls.COMPLETE, ) diff --git a/nlds_processors/archiver/archive_get.py b/nlds_processors/archiver/archive_get.py index ba20ea0..5fe3354 100644 --- a/nlds_processors/archiver/archive_get.py +++ b/nlds_processors/archiver/archive_get.py @@ -14,7 +14,6 @@ from copy import copy from minio.error import S3Error from retry import retry -from enum import Enum from nlds_processors.archiver.archive_base import BaseArchiveConsumer @@ -22,6 +21,7 @@ from nlds.rabbit.consumer import State from nlds.details import PathDetails +from nlds_processors.catalog.catalog_worker import build_retrieval_dict import nlds.rabbit.routing_keys as RK import nlds.rabbit.message_keys as MSG @@ -60,16 +60,15 @@ def transfer( tape_url=tape_url, ) except S3StreamError as e: + self.log(f"Could not create streamer. 
Reason: {e}.", RK.LOG_ERROR) # if a S3StreamError occurs then all files have failed for path_details in filelist: path_details.failure_reason = e.message self.failedlist.append(path_details) else: - # For archive_get, the message is structured as a dictionary stored in - # ['data']['retrieval_dict'] and a filelist stored in ['data']['filelist'] - # there is also a filelist in ['data']['retrieval_dict']['filelist'] which - # contains the files to be retrieved each individual tarfile / aggregate: - retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT] + # For archive_get, we build a retrieval dictionary from the filelist, + # which contains a tarfile as a key, then a filelist as items per key + retrieval_dict = build_retrieval_dict(filelist) # looping over the aggregates for tarfile, item in retrieval_dict.items(): # get the holding id and build the holding_prefix @@ -79,10 +78,6 @@ def transfer( ) # get the list of files to retrieve from the tarfile / aggregate aggregate_filelist = item[MSG.FILELIST] - # convert to PathDetails object - aggregate_filelist = [ - PathDetails.from_dict(ag) for ag in aggregate_filelist - ] # empty streamer.filelist for new aggregate streamer.filelist.clear() try: @@ -108,6 +103,10 @@ def transfer( except S3StreamError as e: # if a S3StreamError occurs then all files in the aggregate have # failed + self.log( + f"Error when streaming file {tarfile}. Reason: {e.message}", + RK.LOG_ERROR + ) for path_details in aggregate_filelist: path_details.failure_reason = e.message self.append_and_send( @@ -140,6 +139,7 @@ def transfer( state=State.FAILED, ) + @retry(S3Error, tries=5, delay=1, logger=None) def prepare( self, transaction_id: str, @@ -179,34 +179,28 @@ def prepare( ) except S3StreamError as e: # if a S3StreamError occurs then all files have failed + self.log(f"Could not create streamer. Reason: {e}.", RK.LOG_ERROR) for path_details in filelist: path_details.failure_reason = e.message self.failedlist.append(path_details) else: # For archive_prepare, the message is structured as a dictionary stored in - # ['data']['retrieval_dict'] and a filelist stored in ['data']['filelist'] - retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT] - complete_dict = {} - prepare_dict = {} + retrieval_dict = build_retrieval_dict(filelist) for tarfile, item in retrieval_dict.items(): # get the list of files to retrieve from the tarfile / aggregate # this will be used for the completelist, the prepare_check list # or the failedlist. Convert to PathDetails object - aggregate_filelist = [ - PathDetails.from_dict(ag) for ag in item[MSG.FILELIST] - ] + aggregate_filelist = item[MSG.FILELIST] try: # check for prepare on this tarfile if streamer.prepare_required(tarfile): self.preparelist.extend(aggregate_filelist) - prepare_dict[tarfile] = retrieval_dict[tarfile] else: - # no prepare needed, so add all the aggregate files to the - # complete list, and the tarfile part of the dictionary to the - # complete dictionary self.completelist.extend(aggregate_filelist) - complete_dict[tarfile] = retrieval_dict[tarfile] except S3StreamError as e: + self.log( + f"Error preparing file {tarfile}. 
Reason: {e}.", RK.LOG_ERROR + ) for path_details in aggregate_filelist: path_details.failure_reason = e.message self.failedlist.append(path_details) @@ -217,48 +211,43 @@ def prepare( "transfer.", RK.LOG_INFO, ) - # remap the retrieval dictionary for the tarfiles that don't need staging - # need to copy the JSON as it will also be used below in preparelist - body_json_complete = copy(body_json) - body_json_complete[MSG.DATA][MSG.RETRIEVAL_DICT] = complete_dict self.send_pathlist( self.completelist, rk_complete, - body_json_complete, + body_json, state=State.ARCHIVE_PREPARING, ) if len(self.preparelist) > 0: - # In this codepath we have a list of tarfiles we need to prepare in the + # In this codepath we have a list of tarfiles we need to prepare in the # prepare_dict keys try: + prepare_dict = build_retrieval_dict(self.preparelist) agg_prepare_list = list(prepare_dict.keys()) prepare_id = streamer.prepare_request(agg_prepare_list) except S3StreamError as e: # fail all in the prepare dict if the prepare_id failed + self.log(f"Error preparing request. Reason: {e}.", RK.LOG_ERROR) for tarfile, item in prepare_dict.items(): - aggregate_filelist = [ - PathDetails.from_dict(ag) for ag in item[MSG.FILELIST] - ] + aggregate_filelist = item[MSG.FILELIST] for path_details in aggregate_filelist: path_details.failure_reason = e.message self.failedlist.append(path_details) - self.log( - "Archive prepare required, passing lists back to archive_get for " - "checking prepare is complete.", - RK.LOG_INFO, - ) - # remap the retrieval dictionary for the tarfiles that need staging - body_json_check = copy(body_json) - body_json_check[MSG.DATA][MSG.RETRIEVAL_DICT] = prepare_dict - # put the prepare_id in the dictionary - body_json_check[MSG.DATA][MSG.PREPARE_ID] = str(prepare_id) - self.send_pathlist( - self.preparelist, - routing_key=rk_check, - body_json=body_json_check, - state=State.ARCHIVE_PREPARING, - ) + else: + self.log( + "Archive prepare required, passing lists back to archive_get for " + "checking prepare is complete.", + RK.LOG_INFO, + ) + # put the prepare_id in the dictionary + body_json_check = copy(body_json) + body_json_check[MSG.DATA][MSG.PREPARE_ID] = str(prepare_id) + self.send_pathlist( + self.preparelist, + routing_key=rk_check, + body_json=body_json_check, + state=State.ARCHIVE_PREPARING, + ) if len(self.failedlist) > 0: self.send_pathlist( @@ -268,7 +257,7 @@ def prepare( state=State.FAILED, ) - + @retry(S3Error, tries=5, delay=1, logger=None) def prepare_check( self, transaction_id: str, @@ -282,12 +271,12 @@ def prepare_check( ) -> None: """Use the streamer object to check whether the prepared files have completed staging. - 1. Those that have not completed staging will be passed back to the message + 1. Those that have not completed staging will be passed back to the message queue with ARCHIVE_GET.PREPARE_CHECK as the routing key. - They will be checked again for completed staging when this message is + They will be checked again for completed staging when this message is processed subsequently in this function. 2. Those that have completed will be passed to the message queue with - ARCHIVE_GET.START and will be subsequently processed by the `transfer` + ARCHIVE_GET.START and will be subsequently processed by the `transfer` function above. """ # Make the routing keys @@ -310,28 +299,31 @@ def prepare_check( ) except S3StreamError as e: # if a S3StreamError occurs then all files have failed + self.log(f"Could not create streamer. 
Reason: {e}.", RK.LOG_ERROR) for path_details in filelist: path_details.failure_reason = e.message self.failedlist.append(path_details) else: - retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT] + retrieval_dict = build_retrieval_dict(filelist) prepare_id = body_json[MSG.DATA][MSG.PREPARE_ID] # need to convert the retrieval_dict keys to a list of tarfiles tarfile_list = list(retrieval_dict.keys()) try: complete = streamer.prepare_complete(prepare_id, tarfile_list) except S3StreamError as e: + self.log( + f"Could not check prepare id: {prepare_id}. Reason: {e}.", + RK.LOG_ERROR + ) # fail all in the prepare dict if the prepare_id failed for tarfile, item in retrieval_dict: - aggregate_filelist = [ - PathDetails.from_dict(ag) for ag in item[MSG.FILELIST] - ] + aggregate_filelist = item[MSG.FILELIST] for path_details in aggregate_filelist: path_details.failure_reason = e.message self.failedlist.append(path_details) - # only three outcomes here - either all the tarfiles (and, by extension, all - # files) are complete, are not complete, or everything failed + # only three outcomes here - 1. either all the tarfiles (and, by extension, all + # files) are complete, 2. are not complete, or 3. everything failed if len(self.failedlist) > 0: self.send_pathlist( self.failedlist, @@ -365,7 +357,6 @@ def prepare_check( state=State.ARCHIVE_PREPARING, ) - # def transfer_old( # self, # transaction_id: str, diff --git a/nlds_processors/archiver/archive_put.py b/nlds_processors/archiver/archive_put.py index 5774dcc..fead079 100644 --- a/nlds_processors/archiver/archive_put.py +++ b/nlds_processors/archiver/archive_put.py @@ -68,7 +68,6 @@ def transfer( # NOTE: For the purposes of tape reading and writing, the holding prefix # has 'nlds.' prepended holding_prefix = self.get_holding_prefix(body_json) - try: self.completelist, self.failedlist, tarfile, checksum = streamer.put( holding_prefix, filelist, self.chunk_size diff --git a/nlds_processors/archiver/s3_to_tarfile_tape.py b/nlds_processors/archiver/s3_to_tarfile_tape.py index 03ae673..8a955dd 100644 --- a/nlds_processors/archiver/s3_to_tarfile_tape.py +++ b/nlds_processors/archiver/s3_to_tarfile_tape.py @@ -169,19 +169,17 @@ def get( """Stream from a tarfile on tape to Object Store""" if self.filelist != []: raise ValueError(f"self.filelist is not Empty: {self.filelist[0]}") + self.filelist = filelist self.holding_prefix = holding_prefix try: - tarfile_tapepath = ( - f"root://{self.tape_server_url}/{tarfile}" - ) # open the tar file to read from with XRDClient.File() as file: # open on the XRD system - status, _ = file.open(tarfile_tapepath, OpenFlags.READ) + status, _ = file.open(tarfile, OpenFlags.READ) if not status.ok: raise S3StreamError( - f"Could not open tarfile on XRootD: {tarfile_tapepath}. " + f"Could not open tarfile on XRootD: {tarfile}. 
" f"Reason: {status}" ) file_object = Adler32XRDFile(file, debug_fl=True) @@ -202,10 +200,20 @@ def get( return completelist, failedlist + def __relative_tarfile(tarfile): + # tarfile has the full name here, including the root://server/ part of it + # we only need the path on the FileSystem + return("/"+tarfile.split("//")[2]) + + def __relative_tarfile_list(tarfile_list): + # clean up the tarfilelist by removing the root://server/ part of each file name + return [S3ToTarfileTape.__relative_tarfile(t) for t in tarfile_list] + def prepare_required(self, tarfile: str) -> bool: """Query the storage system as to whether a file needs to be prepared (staged).""" + tarfile_path = S3ToTarfileTape.__relative_tarfile(tarfile) # XrootD use the .stat method on the FileSystem client - status, response = self.tape_client.stat(tarfile) + status, response = self.tape_client.stat(tarfile_path) # check status for success if not status.ok: raise S3StreamError( @@ -225,13 +233,13 @@ def prepare_request(self, tarfilelist: List[str]) -> str: if len(tarfilelist) == 0: # trap this as it causes a seg-fault if it is passed to XRD.prepare raise S3StreamError("tarfilelist is empty in prepare_request") - status, response = self.tape_client.prepare( - tarfilelist, PrepareFlags.STAGE - ) + + clean_tarlist = S3ToTarfileTape.__relative_tarfile_list(tarfilelist) + status, response = self.tape_client.prepare(clean_tarlist, PrepareFlags.STAGE) if not status.ok: raise S3StreamError( - f"Could not prepare tarfile list: {tarfilelist}. " - f"Reason: {status.method}" + f"Could not prepare tarfile list: {clean_tarlist}. " + f"Reason: {status.message}" ) else: prepare_id = response.decode()[:-1] @@ -241,7 +249,8 @@ def prepare_complete(self, prepare_id: str, tarfilelist: List[str]) -> bool: """Query the storage system whether the prepare (staging) for a file has been completed.""" # requires query_args to be built with new line separator - query_args = "\n".join([prepare_id, *tarfilelist]) + clean_tarlist = S3ToTarfileTape.__relative_tarfile_list(tarfilelist) + query_args = "\n".join([prepare_id, *clean_tarlist]) status, response = self.tape_client.query(QueryCode.PREPARE, query_args) if not status.ok: raise S3StreamError( diff --git a/nlds_processors/catalog/catalog.py b/nlds_processors/catalog/catalog.py index ea03cde..12db603 100644 --- a/nlds_processors/catalog/catalog.py +++ b/nlds_processors/catalog/catalog.py @@ -381,13 +381,17 @@ def get_files( except (IntegrityError, KeyError, OperationalError): if holding_label: - err_msg = f"File with holding_label:{holding_label} not found " + err_msg = ( + f"File not found in holding with holding_label:{holding_label}" + ) elif holding_id: - err_msg = f"File with holding_id:{holding_id} not found " + err_msg = f"File not found in holding with holding_id:{holding_id}" elif transaction_id: - err_msg = f"File with transaction_id:{transaction_id} not found " + err_msg = ( + f"File not found in holding with transaction_id:{transaction_id}" + ) elif tag: - err_msg = f"File with tag:{tag} not found" + err_msg = f"File not found in holding with tag:{tag}" else: err_msg = f"File with original_path:{original_path} not found " raise CatalogError(err_msg) diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py index 018105c..29cd90c 100644 --- a/nlds_processors/catalog/catalog_worker.py +++ b/nlds_processors/catalog/catalog_worker.py @@ -41,7 +41,7 @@ from nlds_processors.catalog.catalog import Catalog from nlds_processors.catalog.catalog_error import 
CatalogError -from nlds_processors.catalog.catalog_models import Storage +from nlds_processors.catalog.catalog_models import Storage, File from nlds.details import PathDetails from nlds_processors.db_mixin import DBError @@ -96,6 +96,26 @@ def format_datetime(date: datetime): return datetime_str +def build_retrieval_dict(filelist: List[PathDetails], fullpath: bool = False): + """Build a retrieval dict from the filelist. The retrieval dict contains a + tarfile name, a holding id, and the list of files to retrieve from the tarfile. + """ + retrieval_dict = {} + for file in filelist: + # get the tape location for File or FileSystem + tarfile = file.tape_name + # if it has not been added before then create a new record + if not tarfile in retrieval_dict: + retrieval_dict[tarfile] = { + "holding_id": file.holding_id, + "filelist": [file], + } + else: + # if it has been added before then append to the filelist + retrieval_dict[tarfile]["filelist"].append(file) + return retrieval_dict + + class CatalogConsumer(RMQC): DEFAULT_QUEUE_NAME = "catalog_q" DEFAULT_ROUTING_KEY = f"{RK.ROOT}.{RK.CATALOG}.{RK.WILD}" @@ -137,7 +157,6 @@ def __init__(self, queue=DEFAULT_QUEUE_NAME): self.catalog = None self.tapelist = [] - self.tapedict = {} @property def database(self): @@ -150,7 +169,6 @@ def reset(self): self.failedlist.clear() # New list for rerouting to tape archive if not found on object store self.tapelist.clear() - self.tapedict.clear() def _parse_filelist(self, body: Dict) -> list[str]: # get the filelist from the data section of the message @@ -174,18 +192,6 @@ def _parse_filelist(self, body: Dict) -> list[str]: return filelist - def _parse_aggregation_dict(self, body: Dict) -> list[str]: - try: - aggregations = body[MSG.DATA][MSG.RETRIEVAL_DICT] - except KeyError as e: - self.log( - f"Invalid message contents, filelist should be in the data " - f"section of the message body.", - RK.LOG_ERROR, - ) - return - return aggregations - def _parse_user_vars(self, body: Dict) -> Tuple: # get the user id from the details section of the message try: @@ -437,6 +443,8 @@ def _catalog_put(self, body: Dict, rk_origin: str) -> None: for f in filelist: # convert to PathDetails class pd = PathDetails.from_dict(f) + # add the holding id to the PathDetails + pd.holding_id = holding.id try: # Search first for file existence within holding, fail if present try: @@ -506,18 +514,21 @@ def _catalog_put(self, body: Dict, rk_origin: str) -> None: warning=tag_warnings, ) - def _catalog_update(self, body: Dict, rk_origin: str) -> None: + def _catalog_update(self, body: Dict, rk_origin: str, create: bool) -> None: """Upon completion of a TRANSFER_PUT, the list of completed files is returned back to the NLDS worker, but with location on Object Storage of the files appended to each PathDetails JSON object. The NLDS worker then passes this message to the Catalog, and this function processes the PathDetails and updates each file's record in the database to - contain the Object Storage location.""" + contain the Object Storage location. + + Upon completion of an ARCHIVE_GET, the list of completed files is returned + back to the NLDS worker. The information for each completed file is enough to amend the OBJECT STORAGE location in the catalog database. 
+ """ # Parse the message body for required variables try: filelist = self._parse_filelist(body) user, group = self._parse_user_vars(body) - transaction_id = self._parse_transaction_id(body, mandatory=True) except CatalogError as e: # functions above handled message logging, here we just return raise e @@ -531,7 +542,7 @@ def _catalog_update(self, body: Dict, rk_origin: str) -> None: pd = PathDetails.from_dict(f) # need to # 1. find the file, - # 2. create the object storage location, + # 2. find or create the object storage location (if create==True), # 3. add the details to the object storage location from the PathDetails # get the file try: @@ -544,32 +555,53 @@ def _catalog_update(self, body: Dict, rk_origin: str) -> None: file = self.catalog.get_files( user, group, - transaction_id=transaction_id, + holding_id=pd.holding_id, original_path=pd.original_path, )[0] - # access time is now, if None + # access time is now if None if pl.access_time is None: access_time = datetime.now() else: - access_time = pl.access_time + access_time = datetime.fromtimestamp(pl.access_time) st = Storage.from_str(pl.storage_type) - # create location - location = self.catalog.create_location( - file, - storage_type=st, - url_scheme=pl.url_scheme, - url_netloc=pl.url_netloc, - root=pl.root, - path=pl.path, - access_time=access_time, - ) - self.completelist.append(pd) + # get the location + location = self.catalog.get_location(file, st) + if location: + # check empty + if location.url_scheme != "" or location.url_netloc != "": + raise CatalogError( + f"{pl.storage_type} for file {pd.original_path} will be" + f" overwritten, the Storage Location should be empty." + ) + # otherwise update if exists and not empty + location.url_scheme = pl.url_scheme + location.url_netloc = pl.url_netloc + location.root = pl.root + location.path = pl.path + location.access_time = access_time + self.completelist.append(pd) + elif create: + # create location + location = self.catalog.create_location( + file, + storage_type=st, + url_scheme=pl.url_scheme, + url_netloc=pl.url_netloc, + root=pl.root, + path=pl.path, + access_time=access_time, + ) + self.completelist.append(pd) + else: + raise CatalogError( + f"{pl.storage_type} for file {pd.original_path} can not be " + "found." + ) except CatalogError as e: # the file wasn't found or the location couldn't be created pd.failure_reason = e.message self.failedlist.append(pd) self.log(e.message, RK.LOG_ERROR) - continue # stop db transitions and commit self.catalog.save() @@ -603,132 +635,6 @@ def _catalog_update(self, body: Dict, rk_origin: str) -> None: state=State.FAILED, ) - def _catalog_update_from_archive_get(self, body: Dict, rk_origin: str) -> None: - """ - Upon completion of an ARCHIVE_GET, the list of completed files is returned - back to the NLDS worker. The completed files are arranged in a dictionary - indexed by the aggregation name. Inside each aggregation is the holding that - the files in that aggregation belong to. This information is used to amend - the OBJECT STORAGE location in the catalog database. - - This process is sufficiently different to _catalog_update to warrant its own - function. 
- """ - # Parse the message body for required variables - try: - user, group = self._parse_user_vars(body) - aggregation_dict = self._parse_aggregation_dict(body) - except CatalogError as e: - # functions above handled message logging, here we just return - raise e - - self.catalog.start_session() - self.reset() - - # loop over the aggregation dict - for key, agg in aggregation_dict.items(): - # get the holding id - try: - holding_id = agg[MSG.HOLDING_ID] - except KeyError: - raise CatalogError(f"{MSG.HOLDING_ID} not in aggregation dictionary") - # get the filelist - try: - filelist = agg[MSG.FILELIST] - except KeyError: - raise CatalogError(f"{MSG.FILELIST} not in aggregation dictionary") - # get the holding - try: - holding = self.catalog.get_holding(user, group, holding_id=holding_id) - except CatalogError as e: - for f in filelist: - f.failure_reason = e.message - self.failedlist.append(f) - continue # next aggregation - - # loop over the files - for f in filelist: - # convert to PathDetails class - pd = PathDetails.from_dict(f) - # need to - # 1. find the file, - # 2. get the object storage location, - # 3. update the details to the object storage location from the - # PathDetails - pl = pd.get_object_store() # this returns a PathLocation object - # get the file - try: - file = self.catalog.get_files( - user, - group, - holding_id=holding_id, - original_path=pd.original_path, - )[0] - # access time is now - access_time = datetime.now() - - st = Storage.from_str(pl.storage_type) - # get the location - location = self.catalog.get_location(file, st) - if location: - # check empty - if location.url_scheme != "" or location.url_netloc != "": - raise CatalogError( - f"{pl.storage_type} for file {pd.original_path} will be" - f" overwritten, the Storage Location should be empty." - ) - # otherwise update if exists and not empty - location.url_scheme = pl.url_scheme - location.url_netloc = pl.url_netloc - location.root = pl.root - location.path = pl.path - location.access_time = access_time - else: - raise CatalogError( - f"{pl.storage_type} for file {pd.original_path} can not be " - "found." - ) - # add to complete list - self.completelist.append(pd) - except CatalogError as e: - # the file wasn't found or the location couldn't be created - pd.failure_reason = e.message - self.failedlist.append(pd) - self.log(e.message, RK.LOG_ERROR) - continue - - # stop db transitions and commit - self.catalog.save() - self.catalog.end_session() - - # log the successful and non-successful catalog updates - # SUCCESS - if len(self.completelist) > 0: - rk_complete = ".".join([rk_origin, RK.CATALOG_UPDATE, RK.COMPLETE]) - self.log( - f"Sending completed PathList from CATALOG_UPDATE {self.completelist}", - RK.LOG_DEBUG, - ) - self.send_pathlist( - self.completelist, - routing_key=rk_complete, - body_json=body, - state=State.TRANSFER_INIT, - ) - # FAILED - if len(self.failedlist) > 0: - rk_failed = ".".join([rk_origin, RK.CATALOG_UPDATE, RK.FAILED]) - self.log( - f"Sending failed PathList from CATALOG_UPDATE {self.failedlist}", - RK.LOG_DEBUG, - ) - self.send_pathlist( - self.failedlist, - routing_key=rk_failed, - body_json=body, - state=State.FAILED, - ) - def _catalog_get(self, body: Dict, rk_origin: str) -> None: """Get the details for each file in a filelist and send it to the exchange to be processed by the transfer processor. 
If any file is only @@ -776,9 +682,8 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None: ) # There should only be one file as we set one=True in get_files above file = files[0] - # determine the storage location - None, OBJECT_STORAGE and/or TAPE - pd = PathDetails.from_filemodel(file) + pd = self._filemodel_to_path_details(file) if pd.locations.count == 0: # empty storage location denotes that it is still in its initial # transfer to OBJECT STORAGE @@ -805,11 +710,13 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None: pl = pd.get_tape() agg = self.catalog.get_aggregation(pl.aggregation_id) tr = self.catalog.get_transaction(file.transaction_id) - # create a mostly empty OBJECT STORAGE location in the database if pl.access_time is None: access_time = datetime.now() else: access_time = datetime.fromtimestamp(pl.access_time) + + # create a mostly empty OBJECT STORAGE location in the database as + # a marker that the file is currently transferring self.catalog.create_location( file_=file, storage_type=Storage.OBJECT_STORAGE, @@ -825,17 +732,6 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None: # the database) pd.set_object_store(tenancy=tenancy, bucket=tr.transaction_id) self.tapelist.append(pd) - - # add the file to a dictionary indexed by tarname - if agg.tarname in self.tapedict: - self.tapedict[agg.tarname][MSG.FILELIST].append(pd) - else: - self.tapedict[agg.tarname] = { - # Holding id required for updating holding on return - MSG.HOLDING_ID: tr.holding_id, - MSG.CHECKSUM: agg.checksum, - MSG.FILELIST: [pd], - } else: # this shouldn't occur but we'll trap the error anyway reason = ( @@ -867,12 +763,8 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None: ) # NEED RETRIEVING FROM TAPE - if len(self.tapedict) > 0 and len(self.tapelist) > 0: + if len(self.tapelist) > 0: rk_restore = ".".join([rk_origin, RK.ROUTE, RK.ARCHIVE_RESTORE]) - # Include the original files requested in the message body (tapelist) - # so they can be moved to disk after retrieval, as well as the aggregate - # dictionary (tapedict) - body[MSG.DATA][MSG.RETRIEVAL_DICT] = self.tapedict self.log( f"Rerouting PathList from CATALOG_GET to ARCHIVE_GET for archive " f"retrieval ({self.tapelist})", @@ -899,6 +791,12 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None: state=State.FAILED, ) + def _filemodel_to_path_details(self, file: File): + pd = PathDetails.from_filemodel(file) + t = self.catalog.get_transaction(id=file.transaction_id) + pd.holding_id = t.holding_id + return pd + def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None: """Get the next holding for archiving, create a new location for it and pass it for aggregating to the Archive Put process.""" @@ -920,7 +818,7 @@ def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None: # loop over the files and modify the database to have a TAPE storage location self.reset() for f in filelist: - pd = PathDetails.from_filemodel(f) + pd = self._filemodel_to_path_details(f) pl = pd.get_object_store() # this returns a PathLocation object # get the access time of the object store to mirror to tape, or set to now # if no access_time present @@ -942,7 +840,7 @@ def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None: aggregation=None, ) # update the pd now with new location - pd = PathDetails.from_filemodel(f) + pd = self._filemodel_to_path_details(f) # add to the completelist ready for sending self.completelist.append(pd) except CatalogError as e: @@ -1434,7 +1332,6 @@ def _catalog_find(self, body: 
Dict, properties: Header) -> None: ret_dict[h.label][MSG.TRANSACTIONS][t.transaction_id] = t_rec # get the locations locations = [] - pd = PathDetails.from_filemodel(f) for l in f.locations: l_rec = { "storage_type": l.storage_type, @@ -1453,6 +1350,7 @@ def _catalog_find(self, body: Dict, properties: Header) -> None: MSG.USER: f.user, MSG.GROUP: f.group, "permissions": f.file_permissions, + "holding_id": h.id, "locations": locations, } t_rec[MSG.FILELIST].append(f_rec) @@ -1632,7 +1530,7 @@ def callback( elif rk_parts[1] == RK.CATALOG_REMOVE: self._catalog_remove(body, rk_parts[0], Storage.OBJECT_STORAGE) elif rk_parts[1] == RK.CATALOG_UPDATE: - self._catalog_update_from_archive_get(body, rk_parts[0]) + self._catalog_update(body, rk_parts[0], create=False) elif api_method in (RK.PUTLIST, RK.PUT): # split the routing key @@ -1652,7 +1550,7 @@ def callback( elif rk_parts[1] == RK.CATALOG_DEL: self._catalog_del(body, rk_parts[0]) elif rk_parts[1] == RK.CATALOG_UPDATE: - self._catalog_update(body, rk_parts[0]) + self._catalog_update(body, rk_parts[0], create=True) # Archive put requires getting from the catalog elif api_method == RK.ARCHIVE_PUT: diff --git a/nlds_processors/transferers/get_transfer.py b/nlds_processors/transferers/get_transfer.py index ca4deb4..ce54d63 100644 --- a/nlds_processors/transferers/get_transfer.py +++ b/nlds_processors/transferers/get_transfer.py @@ -201,20 +201,19 @@ def _transfer_files( body_json=body_json, state=State.FAILED, ) - continue - - # change ownership and permissions - self._change_permissions(download_path, path_details) + else: + # change ownership and permissions + self._change_permissions(download_path, path_details) - # all finished successfully! - self.log(f"Successfully got {path_details.original_path}", RK.LOG_DEBUG) - self.append_and_send( - self.completelist, - path_details, - routing_key=rk_complete, - body_json=body_json, - state=State.TRANSFER_GETTING, - ) + # all finished successfully! 
+ self.log(f"Successfully got {path_details.original_path}", RK.LOG_DEBUG) + self.append_and_send( + self.completelist, + path_details, + routing_key=rk_complete, + body_json=body_json, + state=State.TRANSFER_GETTING, + ) @retry(S3Error, tries=5, delay=1, logger=None) def transfer( diff --git a/nlds_utils/reset_object_status.py b/nlds_utils/reset_storage_status.py similarity index 54% rename from nlds_utils/reset_object_status.py rename to nlds_utils/reset_storage_status.py index 4d1baf1..ef67cfa 100755 --- a/nlds_utils/reset_object_status.py +++ b/nlds_utils/reset_storage_status.py @@ -18,7 +18,7 @@ import minio from nlds_processors.catalog.catalog import Catalog -from nlds_processors.catalog.catalog_models import Storage +from nlds_processors.catalog.catalog_models import Storage, File, Location from nlds.details import PathDetails import nlds.server_config as CFG @@ -47,6 +47,39 @@ def _connect_to_s3(access_key: str, secret_key: str): return client +def _remove_location_from_file( + file: File, + location: Location, + loc_type: Storage, + force: bool, + delete: bool, + s3_client, + nlds_cat: Catalog, +): + delloc = ( + location.url_scheme == "" and location.url_netloc == "" and location.root == "" + ) or force + if location.storage_type == loc_type: + if delloc: + if delete and s3_client is not None: + pd = PathDetails.from_filemodel(file) + # delete from object storage - + s3_client.remove_object(pd.bucket_name, pd.object_name) + click.echo(f"Deleted object: {pd.get_object_store().url}") + # delete from catalog + nlds_cat.delete_location(file, loc_type) + click.echo(f"Removed {loc_type} location for {file.original_path}") + if loc_type == Storage.TAPE and location.aggregation_id is not None: + agg = nlds_cat.get_aggregation(location.aggregation_id) + nlds_cat.delete_aggregation(agg) + click.echo(f"Removed TAPE aggregation for {file.original_path}") + else: + click.echo( + f"URL details not empty for the file {file.original_path} and force not" + f" set in command line options. Skipping." + ) + + @click.command() @click.option( "-u", "--user", default=None, type=str, help="The username to reset holdings for." @@ -59,7 +92,7 @@ def _connect_to_s3(access_key: str, secret_key: str): "--holding_id", default=None, type=int, - help="The numeric id of the holding to reset tape archive entries for.", + help="The numeric id of the holding to reset Storage Location entries for.", ) @click.option( "-a", @@ -80,16 +113,23 @@ def _connect_to_s3(access_key: str, secret_key: str): "--force", default=False, is_flag=True, - help="Force the deletion of the OBJECT_STORAGE record", + help="Force the deletion of the Storage Location record", ) @click.option( "-d", "--delete", default=False, is_flag=True, - help="Delete the associated object(s) from the object storage", + help="Delete the associated object(s) from the object storage only", +) +@click.option( + "-l", + "--location", + default=None, + type=str, + help="Storage Location type to delete records for. OBJECT_STORAGE|TAPE", ) -def reset_object_status( +def reset_storage_status( user: str, group: str, holding_id: int, @@ -97,6 +137,7 @@ def reset_object_status( secret_key: str, force: bool, delete: bool, + location: str, ) -> None: """Reset the tape status of a file by deleting a STORAGE LOCATION associated with a file, if the details in the STORAGE LOCATION are empty. 
@@ -107,14 +148,28 @@ def reset_object_status( raise click.UsageError("Error - group not specified") if holding_id is None: raise click.UsageError("Error - holding id not specified") + if location is None: + raise click.UsageError("Error - location not specified") + else: + if location == "OBJECT_STORAGE": + loc_type = Storage.OBJECT_STORAGE + elif location == "TAPE": + loc_type = Storage.TAPE + else: + raise click.UsageError( + f"Error - unknown location type {location}. Choices are OBJECT_STORAGE" + " or TAPE" + ) # only need to contact S3 if deleting from object storage - if delete: + if delete and loc_type == Storage.OBJECT_STORAGE: s3_client = _connect_to_s3(access_key, secret_key) if access_key is None: raise click.UsageError("Error - access key not specified") if secret_key is None: raise click.UsageError("Error - secret key not specified") + else: + s3_client = None nlds_cat = _connect_to_catalog() nlds_cat.start_session() @@ -124,23 +179,20 @@ def reset_object_status( for t in holding.transactions: for f in t.files: for l in f.locations: - delloc = l.storage_type == Storage.OBJECT_STORAGE - delloc &= ( - l.url_scheme == "" and l.url_netloc == "" and l.root == "" - ) or force - if delloc: - if delete: - pd = PathDetails.from_filemodel(f) - # delete from object storage - - s3_client.remove_object(pd.bucket_name, pd.object_name) - click.echo(f"Deleted object: {pd.get_object_store().url}") - # delete from catalog - nlds_cat.delete_location(f, Storage.OBJECT_STORAGE) - click.echo(f"Removed OBJECT STORAGE location for {f.original_path}") + # first check whether a deletion will leave no locations left + if len(f.locations) == 1 and loc_type == l.storage_type: + click.echo( + f"Deleting this location would leave no storage locations for " + f"the file {f.original_path}. Skipping." + ) + else: + _remove_location_from_file( + f, l, loc_type, force, delete, s3_client, nlds_cat + ) nlds_cat.save() nlds_cat.end_session() if __name__ == "__main__": - reset_object_status() + reset_storage_status() diff --git a/nlds_utils/reset_tape_status.py b/nlds_utils/reset_tape_status.py deleted file mode 100755 index 7005d8f..0000000 --- a/nlds_utils/reset_tape_status.py +++ /dev/null @@ -1,92 +0,0 @@ -#! /usr/bin/env python -# encoding: utf-8 -""" -reset_tape_status.py - -This should be used if a file is marked as TAPE but the copy to tape did not succeed. -This results in url_scheme, url_netloc and root being null strings (""). -These are checked before the TAPE location is removed, unless --force option is -supplied. -""" -__author__ = "Neil Massey" -__date__ = "18 Sep 2024" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" -__license__ = "BSD - see LICENSE file in top-level package directory" -__contact__ = "neil.massey@stfc.ac.uk" - -import click - -from nlds_processors.catalog.catalog import Catalog -from nlds_processors.catalog.catalog_models import Storage -import nlds.server_config as CFG - -def _connect_to_catalog(): - config = CFG.load_config() - - db_engine = config["catalog_q"]["db_engine"] - db_options = config["catalog_q"]["db_options"] - db_options['echo'] = False - nlds_cat = Catalog(db_engine=db_engine, db_options=db_options) - db_connect = nlds_cat.connect(create_db_fl=False) - return nlds_cat - -@click.command() -@click.option( - "-u", "--user", default=None, type=str, help="The username to reset holdings for." -) -@click.option( - "-g", "--group", default=None, type=str, help="The group to reset holdings for." 
-) -@click.option( - "-i", - "--holding_id", - default=None, - type=int, - help="The numeric id of the holding to reset tape archive entries for.", -) -@click.option( - "-f", - "--force", - default=False, - is_flag=True, - help="Force the deletion of the TAPE record", -) -def reset_tape_status(user: str, group: str, holding_id: int, force: bool) -> None: - """Reset the tape status of a file by deleting a STORAGE LOCATION associated - with a file, if the details in the STORAGE LOCATION are empty. - """ - if user is None: - raise click.UsageError("Error - user not specified") - if group is None: - raise click.UsageError("Error - group not specified") - if holding_id is None: - raise click.UsageError("Error - holding id not specified") - - nlds_cat = _connect_to_catalog() - nlds_cat.start_session() - holding = nlds_cat.get_holding(user=user, group=group, holding_id=holding_id)[0] - - # get the locations - for t in holding.transactions: - for f in t.files: - for l in f.locations: - if l.storage_type == Storage.TAPE: - if ( - l.url_scheme == "" and l.url_netloc == "" and l.root == "" - ) or force: - nlds_cat.delete_location(f, Storage.TAPE) - click.echo( - f"Removed TAPE location for {f.original_path}" - ) - if l.aggregation_id is not None: - agg = nlds_cat.get_aggregation(l.aggregation_id) - nlds_cat.delete_aggregation(agg) - click.echo( - f"Removed TAPE aggregation for {f.original_path}" - ) - nlds_cat.save() - nlds_cat.end_session() - - -if __name__ == "__main__": - reset_tape_status() diff --git a/nlds_utils/view_holding_full.py b/nlds_utils/view_holding_full.py index 483ea67..a19eb7a 100755 --- a/nlds_utils/view_holding_full.py +++ b/nlds_utils/view_holding_full.py @@ -23,7 +23,7 @@ Location, Aggregation, ) -from reset_tape_status import _connect_to_catalog +from reset_storage_status import _connect_to_catalog def integer_permissions_to_string(intperm):