
Commit

Optimisations to remove RETRIEVAL_DICT as it was making the messages too large
nmassey001 committed Nov 14, 2024
1 parent 1a3c516 commit d11ce65
Showing 10 changed files with 251 additions and 390 deletions.
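The recurring change across these files is that the retrieval dictionary is no longer carried in the message body under ['data']['retrieval_dict']; each consumer now rebuilds it locally from its filelist via build_retrieval_dict, imported from nlds_processors.catalog.catalog_worker. The sketch below shows roughly the behaviour the call sites in this diff imply; it is not the actual implementation, and the tape_name attribute used for grouping is a placeholder.

    from collections import defaultdict
    from typing import Dict, List

    from nlds.details import PathDetails
    import nlds.rabbit.message_keys as MSG

    def build_retrieval_dict(filelist: List[PathDetails]) -> Dict[str, dict]:
        # group the PathDetails objects by the tarfile (aggregate) that holds them
        # on tape, so callers can loop over retrieval_dict.items() and read
        # item[MSG.FILELIST] directly as a list of PathDetails
        retrieval_dict: Dict[str, dict] = defaultdict(lambda: {MSG.FILELIST: []})
        for path_details in filelist:
            # path_details.tape_name is a placeholder for however the aggregate
            # tarfile is recorded on each PathDetails object
            retrieval_dict[path_details.tape_name][MSG.FILELIST].append(path_details)
        return dict(retrieval_dict)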
11 changes: 6 additions & 5 deletions nlds/rabbit/state.py
@@ -56,11 +56,12 @@ def has_name(cls, name):
@classmethod
def get_final_states(cls):
final_states = (
cls.TRANSFER_GETTING,
cls.CATALOG_UPDATING,
cls.CATALOG_ARCHIVE_UPDATING,
cls.CATALOG_DELETING,
cls.CATALOG_REMOVING,
# Make final states explicit
# cls.TRANSFER_GETTING,
# cls.CATALOG_UPDATING,
# cls.CATALOG_ARCHIVE_UPDATING,
# cls.CATALOG_DELETING,
# cls.CATALOG_REMOVING,
cls.FAILED,
cls.COMPLETE,
)
107 changes: 49 additions & 58 deletions nlds_processors/archiver/archive_get.py
@@ -14,14 +14,14 @@
from copy import copy
from minio.error import S3Error
from retry import retry
from enum import Enum

from nlds_processors.archiver.archive_base import BaseArchiveConsumer

from nlds_processors.archiver.s3_to_tarfile_stream import S3StreamError

from nlds.rabbit.consumer import State
from nlds.details import PathDetails
from nlds_processors.catalog.catalog_worker import build_retrieval_dict
import nlds.rabbit.routing_keys as RK
import nlds.rabbit.message_keys as MSG

@@ -60,16 +60,15 @@ def transfer(
tape_url=tape_url,
)
except S3StreamError as e:
self.log(f"Could not create streamer. Reason: {e}.", RK.LOG_ERROR)
# if a S3StreamError occurs then all files have failed
for path_details in filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
else:
# For archive_get, the message is structured as a dictionary stored in
# ['data']['retrieval_dict'] and a filelist stored in ['data']['filelist']
# there is also a filelist in ['data']['retrieval_dict']['filelist'] which
# contains the files to be retrieved each individual tarfile / aggregate:
retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT]
# For archive_get, we build a retrieval dictionary from the filelist;
# it maps each tarfile (aggregate) to the filelist of files to retrieve from it
retrieval_dict = build_retrieval_dict(filelist)
# looping over the aggregates
for tarfile, item in retrieval_dict.items():
# get the holding id and build the holding_prefix
@@ -79,10 +78,6 @@ def transfer(
)
# get the list of files to retrieve from the tarfile / aggregate
aggregate_filelist = item[MSG.FILELIST]
# convert to PathDetails object
aggregate_filelist = [
PathDetails.from_dict(ag) for ag in aggregate_filelist
]
# empty streamer.filelist for new aggregate
streamer.filelist.clear()
try:
@@ -108,6 +103,10 @@ def transfer(
except S3StreamError as e:
# if a S3StreamError occurs then all files in the aggregate have
# failed
self.log(
f"Error when streaming file {tarfile}. Reason: {e.message}",
RK.LOG_ERROR
)
for path_details in aggregate_filelist:
path_details.failure_reason = e.message
self.append_and_send(
@@ -140,6 +139,7 @@ def transfer(
state=State.FAILED,
)

@retry(S3Error, tries=5, delay=1, logger=None)
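# (assumed behaviour of the @retry decorator from the `retry` package: re-run the
# decorated method up to 5 times, waiting 1 second between attempts, whenever an
# S3Error propagates out of it)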
def prepare(
self,
transaction_id: str,
@@ -179,34 +179,28 @@ def prepare(
)
except S3StreamError as e:
# if a S3StreamError occurs then all files have failed
self.log(f"Could not create streamer. Reason: {e}.", RK.LOG_ERROR)
for path_details in filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
else:
# For archive_prepare, the message is structured as a dictionary stored in
# ['data']['retrieval_dict'] and a filelist stored in ['data']['filelist']
retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT]
complete_dict = {}
prepare_dict = {}
retrieval_dict = build_retrieval_dict(filelist)
for tarfile, item in retrieval_dict.items():
# get the list of files to retrieve from the tarfile / aggregate
# this will be used for the completelist, the prepare_check list
# or the failedlist. Convert to PathDetails object
aggregate_filelist = [
PathDetails.from_dict(ag) for ag in item[MSG.FILELIST]
]
aggregate_filelist = item[MSG.FILELIST]
try:
# check for prepare on this tarfile
if streamer.prepare_required(tarfile):
self.preparelist.extend(aggregate_filelist)
prepare_dict[tarfile] = retrieval_dict[tarfile]
else:
# no prepare needed, so add all the aggregate files to the
# complete list, and the tarfile part of the dictionary to the
# complete dictionary
self.completelist.extend(aggregate_filelist)
complete_dict[tarfile] = retrieval_dict[tarfile]
except S3StreamError as e:
self.log(
f"Error preparing file {tarfile}. Reason: {e}.", RK.LOG_ERROR
)
for path_details in aggregate_filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
@@ -217,48 +211,43 @@ def prepare(
"transfer.",
RK.LOG_INFO,
)
# remap the retrieval dictionary for the tarfiles that don't need staging
# need to copy the JSON as it will also be used below in preparelist
body_json_complete = copy(body_json)
body_json_complete[MSG.DATA][MSG.RETRIEVAL_DICT] = complete_dict
self.send_pathlist(
self.completelist,
rk_complete,
body_json_complete,
body_json,
state=State.ARCHIVE_PREPARING,
)

if len(self.preparelist) > 0:
# In this codepath we have a list of tarfiles we need to prepare in the
# prepare_dict keys
try:
prepare_dict = build_retrieval_dict(self.preparelist)
agg_prepare_list = list(prepare_dict.keys())
prepare_id = streamer.prepare_request(agg_prepare_list)
except S3StreamError as e:
# fail all in the prepare dict if the prepare_id failed
self.log(f"Error preparing request. Reason: {e}.", RK.LOG_ERROR)
for tarfile, item in prepare_dict.items():
aggregate_filelist = [
PathDetails.from_dict(ag) for ag in item[MSG.FILELIST]
]
aggregate_filelist = item[MSG.FILELIST]
for path_details in aggregate_filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
self.log(
"Archive prepare required, passing lists back to archive_get for "
"checking prepare is complete.",
RK.LOG_INFO,
)
# remap the retrieval dictionary for the tarfiles that need staging
body_json_check = copy(body_json)
body_json_check[MSG.DATA][MSG.RETRIEVAL_DICT] = prepare_dict
# put the prepare_id in the dictionary
body_json_check[MSG.DATA][MSG.PREPARE_ID] = str(prepare_id)
self.send_pathlist(
self.preparelist,
routing_key=rk_check,
body_json=body_json_check,
state=State.ARCHIVE_PREPARING,
)
else:
self.log(
"Archive prepare required, passing lists back to archive_get for "
"checking prepare is complete.",
RK.LOG_INFO,
)
# put the prepare_id in the dictionary
body_json_check = copy(body_json)
body_json_check[MSG.DATA][MSG.PREPARE_ID] = str(prepare_id)
self.send_pathlist(
self.preparelist,
routing_key=rk_check,
body_json=body_json_check,
state=State.ARCHIVE_PREPARING,
)

if len(self.failedlist) > 0:
self.send_pathlist(
@@ -268,7 +257,7 @@ def prepare(
state=State.FAILED,
)


@retry(S3Error, tries=5, delay=1, logger=None)
def prepare_check(
self,
transaction_id: str,
@@ -282,12 +271,12 @@ def prepare_check(
) -> None:
"""Use the streamer object to check whether the prepared files have completed
staging.
1. Those that have not completed staging will be passed back to the message
queue with ARCHIVE_GET.PREPARE_CHECK as the routing key.
They will be checked again for completed staging when this message is
processed subsequently in this function.
2. Those that have completed will be passed to the message queue with
ARCHIVE_GET.START and will be subsequently processed by the `transfer`
function above.
"""
# Make the routing keys
@@ -310,28 +299,31 @@
)
except S3StreamError as e:
# if a S3StreamError occurs then all files have failed
self.log(f"Could not create streamer. Reason: {e}.", RK.LOG_ERROR)
for path_details in filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
else:
retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT]
retrieval_dict = build_retrieval_dict(filelist)
prepare_id = body_json[MSG.DATA][MSG.PREPARE_ID]
# need to convert the retrieval_dict keys to a list of tarfiles
tarfile_list = list(retrieval_dict.keys())
try:
complete = streamer.prepare_complete(prepare_id, tarfile_list)
except S3StreamError as e:
self.log(
f"Could not check prepare id: {prepare_id}. Reason: {e}.",
RK.LOG_ERROR
)
# fail all in the prepare dict if the prepare_id failed
for tarfile, item in retrieval_dict.items():
aggregate_filelist = [
PathDetails.from_dict(ag) for ag in item[MSG.FILELIST]
]
aggregate_filelist = item[MSG.FILELIST]
for path_details in aggregate_filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)

# only three outcomes here - either all the tarfiles (and, by extension, all
# files) are complete, are not complete, or everything failed
# only three outcomes here - 1. either all the tarfiles (and, by extension, all
# files) are complete, 2. are not complete, or 3. everything failed
if len(self.failedlist) > 0:
self.send_pathlist(
self.failedlist,
@@ -365,7 +357,6 @@ def prepare_check(
state=State.ARCHIVE_PREPARING,
)


# def transfer_old(
# self,
# transaction_id: str,
1 change: 0 additions & 1 deletion nlds_processors/archiver/archive_put.py
@@ -68,7 +68,6 @@ def transfer(
# NOTE: For the purposes of tape reading and writing, the holding prefix
# has 'nlds.' prepended
holding_prefix = self.get_holding_prefix(body_json)

try:
self.completelist, self.failedlist, tarfile, checksum = streamer.put(
holding_prefix, filelist, self.chunk_size
33 changes: 21 additions & 12 deletions nlds_processors/archiver/s3_to_tarfile_tape.py
@@ -169,19 +169,17 @@ def get(
"""Stream from a tarfile on tape to Object Store"""
if self.filelist != []:
raise ValueError(f"self.filelist is not Empty: {self.filelist[0]}")

self.filelist = filelist
self.holding_prefix = holding_prefix
try:
tarfile_tapepath = (
f"root://{self.tape_server_url}/{tarfile}"
)
# open the tar file to read from
with XRDClient.File() as file:
# open on the XRD system
status, _ = file.open(tarfile_tapepath, OpenFlags.READ)
status, _ = file.open(tarfile, OpenFlags.READ)
if not status.ok:
raise S3StreamError(
f"Could not open tarfile on XRootD: {tarfile_tapepath}. "
f"Could not open tarfile on XRootD: {tarfile}. "
f"Reason: {status}"
)
file_object = Adler32XRDFile(file, debug_fl=True)
@@ -202,10 +200,20 @@ def get(

return completelist, failedlist

@staticmethod
def __relative_tarfile(tarfile):
# tarfile has the full name here, including the root://server/ part of it;
# we only need the path on the FileSystem
return "/" + tarfile.split("//")[2]

@staticmethod
def __relative_tarfile_list(tarfile_list):
# clean up the tarfile list by removing the root://server/ part of each file name
return [S3ToTarfileTape.__relative_tarfile(t) for t in tarfile_list]
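# Illustration (not part of the diff): the split above assumes tarfile names of the
# form root://<server>//<path>; with a hypothetical name
#   "root://xrootd.example.org//archive/nlds.holding_1/agg_0001.tar"
# split("//") gives ['root:', 'xrootd.example.org', 'archive/nlds.holding_1/agg_0001.tar']
# and "/" + parts[2] yields "/archive/nlds.holding_1/agg_0001.tar".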

def prepare_required(self, tarfile: str) -> bool:
"""Query the storage system as to whether a file needs to be prepared (staged)."""
tarfile_path = S3ToTarfileTape.__relative_tarfile(tarfile)
# XRootD uses the .stat method on the FileSystem client
status, response = self.tape_client.stat(tarfile)
status, response = self.tape_client.stat(tarfile_path)
# check status for success
if not status.ok:
raise S3StreamError(
@@ -225,13 +233,13 @@ def prepare_request(self, tarfilelist: List[str]) -> str:
if len(tarfilelist) == 0:
# trap this as it causes a seg-fault if it is passed to XRD.prepare
raise S3StreamError("tarfilelist is empty in prepare_request")
status, response = self.tape_client.prepare(
tarfilelist, PrepareFlags.STAGE
)

clean_tarlist = S3ToTarfileTape.__relative_tarfile_list(tarfilelist)
status, response = self.tape_client.prepare(clean_tarlist, PrepareFlags.STAGE)
if not status.ok:
raise S3StreamError(
f"Could not prepare tarfile list: {tarfilelist}. "
f"Reason: {status.method}"
f"Could not prepare tarfile list: {clean_tarlist}. "
f"Reason: {status.message}"
)
else:
prepare_id = response.decode()[:-1]
@@ -241,7 +249,8 @@ def prepare_complete(self, prepare_id: str, tarfilelist: List[str]) -> bool:
"""Query the storage system whether the prepare (staging) for a file has been
completed."""
# requires query_args to be built with new line separator
query_args = "\n".join([prepare_id, *tarfilelist])
clean_tarlist = S3ToTarfileTape.__relative_tarfile_list(tarfilelist)
query_args = "\n".join([prepare_id, *clean_tarlist])
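# Illustration (not part of the diff): with a hypothetical prepare_id "req-123" and
# two staged tarfiles, query_args becomes
# "req-123\n/archive/nlds.holding_1/agg_0001.tar\n/archive/nlds.holding_1/agg_0002.tar"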
status, response = self.tape_client.query(QueryCode.PREPARE, query_args)
if not status.ok:
raise S3StreamError(
12 changes: 8 additions & 4 deletions nlds_processors/catalog/catalog.py
@@ -381,13 +381,17 @@ def get_files(

except (IntegrityError, KeyError, OperationalError):
if holding_label:
err_msg = f"File with holding_label:{holding_label} not found "
err_msg = (
f"File not found in holding with holding_label:{holding_label}"
)
elif holding_id:
err_msg = f"File with holding_id:{holding_id} not found "
err_msg = f"File not found in holding with holding_id:{holding_id}"
elif transaction_id:
err_msg = f"File with transaction_id:{transaction_id} not found "
err_msg = (
f"File not found in holding with transaction_id:{transaction_id}"
)
elif tag:
err_msg = f"File with tag:{tag} not found"
err_msg = f"File not found in holding with tag:{tag}"
else:
err_msg = f"File with original_path:{original_path} not found "
raise CatalogError(err_msg)
