Skip to content

Commit

Permalink
Error when fetch_fcst doesn't find any data for a model
Browse files Browse the repository at this point in the history
Fixes #824
  • Loading branch information
jfrost-mo committed Sep 25, 2024
1 parent f18255b commit 6a3b37a
Showing 1 changed file with 36 additions and 5 deletions.
41 changes: 36 additions & 5 deletions src/CSET/_workflow_utils/fetch_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __exit__(self, exc_type, exc_value, traceback):
logging.debug("Tearing down FileRetriever.")

@abc.abstractmethod
def get_file(self, file_path: str, output_dir: str) -> None: # pragma: no cover
def get_file(self, file_path: str, output_dir: str) -> bool: # pragma: no cover
"""Save a file from the data source to the output directory.
Not all of the given paths will exist, so FileNotFoundErrors should be
Expand All @@ -60,14 +60,19 @@ def get_file(self, file_path: str, output_dir: str) -> None: # pragma: no cover
like globs, which will be expanded in a system specific manner.
output_dir: str
Path to filesystem directory into which the file should be copied.
Returns
-------
bool:
True if files were transferred, otherwise False.
"""
raise NotImplementedError


class FilesystemFileRetriever(FileRetrieverABC):
"""Retrieve files from the filesystem."""

def get_file(self, file_path: str, output_dir: str) -> None:
def get_file(self, file_path: str, output_dir: str) -> bool:
"""Save a file from the filesystem to the output directory.
Parameters
Expand All @@ -77,22 +82,30 @@ def get_file(self, file_path: str, output_dir: str) -> None:
like globs, which will be expanded in a system specific manner.
output_dir: str
Path to filesystem directory into which the file should be copied.
Returns
-------
bool:
True if files were transferred, otherwise False.
"""
file_paths = glob.glob(os.path.expanduser(file_path))
logging.debug("Copying files:\n%s", "\n".join(file_paths))
if not file_paths:
logging.warning("file_path does not match any files: %s", file_path)
any_files_copied = False
for file in file_paths:
try:
shutil.copy(file, output_dir)
any_files_copied = True
except OSError as err:
logging.warning("Failed to copy %s, error: %s", file, err)
return any_files_copied


class HTTPFileRetriever(FileRetrieverABC):
"""Retrieve files via HTTP."""

def get_file(self, file_path: str, output_dir: str) -> None:
def get_file(self, file_path: str, output_dir: str) -> bool:
"""Save a file from a HTTP address to the output directory.
Parameters
Expand All @@ -102,12 +115,18 @@ def get_file(self, file_path: str, output_dir: str) -> None:
globs, which will be expanded in a system specific manner.
output_dir: str
Path to filesystem directory into which the file should be copied.
Returns
-------
bool:
True if files were transferred, otherwise False.
"""
ctx = ssl.create_default_context()
save_path = (
f"{output_dir.removesuffix('/')}/"
+ urllib.parse.urlparse(file_path).path.split("/")[-1]
)
any_files_copied = False
try:
with urllib.request.urlopen(file_path, timeout=30, context=ctx) as response:
if response.status != 200:
Expand All @@ -116,8 +135,10 @@ def get_file(self, file_path: str, output_dir: str) -> None:
# Read in 1 MiB chunks so data needn't fit in memory.
while data := response.read(1024 * 1024):
fp.write(data)
any_files_copied = True
except OSError as err:
logging.warning("Failed to retrieve %s, error: %s", file_path, err)
return any_files_copied


def _get_needed_environment_variables() -> dict:
Expand Down Expand Up @@ -209,6 +230,11 @@ def fetch_data(file_retriever: FileRetrieverABC = FilesystemFileRetriever):
----------
file_retriever: FileRetriever
FileRetriever implementation to use. Defaults to FilesystemFileRetriever.
Raises
------
FileNotFound:
If no files are found for the model, across all tried paths.
"""
v = _get_needed_environment_variables()

Expand All @@ -230,5 +256,10 @@ def fetch_data(file_retriever: FileRetrieverABC = FilesystemFileRetriever):

# Use file retriever to transfer data with multiple threads.
with file_retriever() as retriever, ThreadPoolExecutor() as executor:
for path in paths:
executor.submit(retriever.get_file, path, cycle_data_dir)
files_found_iterator = executor.map(retriever.get_file, paths)
files_found = any(files_found_iterator)
# Exhause iterator to ensure all files have been fetched.
for _ in files_found_iterator:
pass
if not files_found:
raise FileNotFoundError("No files found for model!")

0 comments on commit 6a3b37a

Please sign in to comment.