Archive_get now fetches from tape!
nmassey001 committed Nov 13, 2024
1 parent 13df0d0 commit c981aa3
Showing 3 changed files with 93 additions and 26 deletions.
33 changes: 17 additions & 16 deletions nlds_processors/archiver/archive_get.py
@@ -211,27 +211,13 @@ def prepare(
path_details.failure_reason = e.message
self.failedlist.append(path_details)

# now have a list of tarfiles we need to prepare in the prepare_dict keys
try:
prepare_id = streamer.prepare_request(prepare_dict.keys())
except S3StreamError as e:
# fail all in the prepare dict if the prepare_id failed
for tarfile, item in prepare_dict.items():
aggregate_filelist = [
PathDetails.from_dict(ag) for ag in item[MSG.FILELIST]
]
for path_details in aggregate_filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)

if len(self.completelist) > 0:
self.log(
"Archive prepare not required, passing lists back to archive_get for "
"transfer.",
RK.LOG_INFO,
)
# remap the retrieval dictionary for the complete (don't need staging)
# tarfiles
# remap the retrieval dictionary for the tarfiles that don't need staging
# need to copy the JSON as it will also be used below in preparelist
body_json_complete = copy(body_json)
body_json_complete[MSG.DATA][MSG.RETRIEVAL_DICT] = complete_dict
@@ -243,6 +229,20 @@ def prepare(
)

if len(self.preparelist) > 0:
# In this codepath we have a list of tarfiles we need to prepare in the
# prepare_dict keys
try:
agg_prepare_list = list(prepare_dict.keys())
prepare_id = streamer.prepare_request(agg_prepare_list)
except S3StreamError as e:
# fail all in the prepare dict if the prepare_id failed
for tarfile, item in prepare_dict.items():
aggregate_filelist = [
PathDetails.from_dict(ag) for ag in item[MSG.FILELIST]
]
for path_details in aggregate_filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
self.log(
"Archive prepare required, passing lists back to archive_get for "
"checking prepare is complete.",
@@ -316,7 +316,8 @@ def prepare_check(
else:
retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT]
prepare_id = body_json[MSG.DATA][MSG.PREPARE_ID]
tarfile_list = retrieval_dict.keys()
# need to convert the retrieval_dict keys to a list of tarfiles
tarfile_list = list(retrieval_dict.keys())
try:
complete = streamer.prepare_complete(prepare_id, tarfile_list)
except S3StreamError as e:
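The conversion at both call sites in this file is the same fix: the dict keys view is materialised into a real list before it is handed on. A minimal sketch of the distinction, assuming a made-up retrieval dict (the real keys are tarfile paths, as in the comments above):

# Illustration only: dict.keys() returns a view object, not a list; the
# prepare call further downstream works on a plain list of path strings
# (see s3_to_tarfile_tape.py below), so the view is converted explicitly.
retrieval_dict = {"/archive/holding_1/abc123.tar": {"filelist": []}}  # hypothetical key

keys_view = retrieval_dict.keys()           # dict_keys view, not a list
tarfile_list = list(retrieval_dict.keys())  # what the updated code passes on

assert not isinstance(keys_view, list)
assert isinstance(tarfile_list, list)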
6 changes: 5 additions & 1 deletion nlds_processors/archiver/s3_to_tarfile_disk.py
@@ -165,7 +165,11 @@ def get(
self.log(msg, RK.LOG_ERROR)
raise S3StreamError(msg)
except S3StreamError as e:
msg = f"Exception occurred during read of tarfile {self.tarfile_diskpath}."
msg = (
f"Exception occurred during read of tarfile {self.tarfile_diskpath}. "
f"Original Exception: {e}"
)

self.log(msg, RK.LOG_ERROR)
raise S3StreamError(msg)

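The disk-streamer change is purely diagnostic: the original S3StreamError text is folded into the message that is re-raised. A short, self-contained sketch of that wrap-and-re-raise pattern, with a stand-in exception class and made-up paths:

# Stand-in for the real S3StreamError, just to show the pattern.
class S3StreamError(Exception):
    def __init__(self, message):
        super().__init__(message)
        self.message = message

def stream_tarfile(tarfile_diskpath):
    raise S3StreamError(f"Checksum mismatch in {tarfile_diskpath}")  # simulated failure

try:
    stream_tarfile("/archive/holding_1/abc123.tar")
except S3StreamError as e:
    msg = (
        f"Exception occurred during read of tarfile /archive/holding_1/abc123.tar. "
        f"Original Exception: {e}"
    )
    raise S3StreamError(msg)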
80 changes: 71 additions & 9 deletions nlds_processors/archiver/s3_to_tarfile_tape.py
@@ -13,7 +13,13 @@
import json

from XRootD import client as XRDClient
from XRootD.client.flags import StatInfoFlags, MkDirFlags, OpenFlags, QueryCode
from XRootD.client.flags import (
StatInfoFlags,
MkDirFlags,
OpenFlags,
QueryCode,
PrepareFlags,
)

from nlds.details import PathDetails
from nlds_processors.archiver.s3_to_tarfile_stream import (
@@ -23,7 +29,6 @@
from nlds_processors.archiver.adler32file import Adler32XRDFile
import nlds.rabbit.routing_keys as RK


class S3ToTarfileTape(S3ToTarfileStream):
"""Class to stream files from / to an S3 resource (AWS, minio, DataCore Swarm etc.)
to a tarfile that resides on tape.
@@ -148,7 +153,7 @@ def put(
# add the location to the completelist
for f in completelist:
f.set_tape(
server="",
server=self.tape_server_url,
tapepath=self.holding_tapepath,
tarfile=f"{self.filelist_hash}.tar",
)
@@ -161,7 +166,41 @@ def get(
filelist: List[PathDetails],
chunk_size: int,
) -> tuple[List[PathDetails], List[PathDetails], str, int]:
raise NotImplementedError
"""Stream from a tarfile on tape to Object Store"""
if self.filelist != []:
raise ValueError(f"self.filelist is not Empty: {self.filelist[0]}")
self.filelist = filelist
self.holding_prefix = holding_prefix
try:
tarfile_tapepath = (
f"root://{self.tape_server_url}/{tarfile}"
)
# open the tar file to read from
with XRDClient.File() as file:
# open on the XRD system
status, _ = file.open(tarfile_tapepath, OpenFlags.READ)
if not status.ok:
raise S3StreamError(
f"Could not open tarfile on XRootD: {tarfile_tapepath}. "
f"Reason: {status}"
)
file_object = Adler32XRDFile(file, debug_fl=True)
completelist, failedlist = self._stream_to_s3object(
file_object, self.filelist, chunk_size
)
except FileNotFoundError:
msg = f"Couldn't open tarfile ({tarfile})."
self.log(msg, RK.LOG_ERROR)
raise S3StreamError(msg)
except S3StreamError as e:
msg = (
f"Exception occurred during read of tarfile {tarfile}. "
f"Original Exception: {e}"
)
self.log(msg, RK.LOG_ERROR)
raise S3StreamError(msg)

return completelist, failedlist

def prepare_required(self, tarfile: str) -> bool:
"""Query the storage system as to whether a file needs to be prepared (staged)."""
@@ -174,26 +213,49 @@ def prepare_required(self, tarfile: str) -> bool:
f"Reason: {status.message}"
)
# check whether file is OFFLINE in response StatInfoFlags
return (response.flags & StatInfoFlags.OFFLINE)
prepare = bool(response.flags & StatInfoFlags.OFFLINE)
return prepare

def prepare_request(self, tarfilelist: List[str]) -> str:
"""Request the storage system for a file to be prepared (staged)."""
raise NotImplementedError
# tarfilelist is a list of strings, which is fine for XRootD >= 5.6,
# but for versions < 5.5.5 the list of tar names needs to be encoded as bytes
# from the utf-8 strings, e.g. tar_list = [i.encode("utf_8") for i in tar_list]
# shouldn't be necessary for us, though!
if len(tarfilelist) == 0:
# trap this as it causes a seg-fault if it is passed to XRD.prepare
raise S3StreamError("tarfilelist is empty in prepare_request")
status, response = self.tape_client.prepare(
tarfilelist, PrepareFlags.STAGE
)
if not status.ok:
raise S3StreamError(
f"Could not prepare tarfile list: {tarfilelist}. "
f"Reason: {status.method}"
)
else:
prepare_id = response.decode()[:-1]
return prepare_id

def prepare_complete(self, prepare_id: str, tarfilelist: List[str]) -> bool:
"""Query the storage system whether the prepare (staging) for a file has been
completed."""
# requires query_args to be built with a newline separator
query_args = "\n".join([prepare_id, *tarfilelist])
status, response = self.tape_client.query(QueryCode.PREPARE, query_args)
if status.status != 0:
if not status.ok:
raise S3StreamError(
f"Could not check status of prepare request " f"{prepare_id}. "
f"Could not check status of prepare request {prepare_id}. "
f"Reason: {status.message}"
)
# get the response and convert to a dictionary
jr = json.loads(response.decode())["responses"]
print(jr)
# loop over all responses, if all 'online' flags are set then the prepare is
# complete
prepare_complete = True
for r in jr:
prepare_complete &= r['online']
return prepare_complete

"""Note that there are a number of different methods below to get the tapepaths"""

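Taken together, the three new tape methods wrap a standard XRootD staging round-trip: stat the file and test the OFFLINE flag, issue a STAGE prepare request, then poll the PREPARE query until every entry reports 'online'. A hedged, standalone sketch of that round-trip against a hypothetical endpoint (the real class does this through self.tape_client and adds the NLDS error handling shown above):

import json

from XRootD import client as XRDClient
from XRootD.client.flags import PrepareFlags, QueryCode, StatInfoFlags

# Hypothetical tape endpoint and tarfile path, for illustration only.
tape_client = XRDClient.FileSystem("root://xrootd.example.org")
tarfiles = ["/archive/holding_1/abc123.tar"]

# 1. prepare_required: stat the file and check the OFFLINE bit.
status, response = tape_client.stat(tarfiles[0])
needs_staging = bool(response.flags & StatInfoFlags.OFFLINE)

if needs_staging:
    # 2. prepare_request: ask the tape system to stage the tarfiles; the
    #    response carries a request id with a trailing newline to strip.
    status, response = tape_client.prepare(tarfiles, PrepareFlags.STAGE)
    prepare_id = response.decode()[:-1]

    # 3. prepare_complete: query with "<prepare_id>\n<tarfile>..." and check
    #    the 'online' flag in every entry of the JSON response.
    query_args = "\n".join([prepare_id, *tarfiles])
    status, response = tape_client.query(QueryCode.PREPARE, query_args)
    responses = json.loads(response.decode())["responses"]
    complete = all(r["online"] for r in responses)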
