Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage errors for HTTP downloads #124

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 69 additions & 51 deletions geospaas_processing/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class DownloadError(Exception):
"""Download failed"""


class RetriableDownloadError(Exception):
    """Raised when a download fails for a transient reason (connection
    error, timeout, interrupted transfer), so retrying the same URL
    later may succeed.
    """


class ObsoleteURLError(DownloadError):
    """Raised when the URL does not point to a downloadable dataset
    anymore, so retrying it is pointless.
    """

Expand Down Expand Up @@ -346,6 +350,8 @@ def get_file_name(cls, url, connection, **kwargs):
and url_file_name.endswith('.nc')):
return url_file_name

LOGGER.error("Could not find file name from HTTP response for %s: %s, %s, %s",
url, connection.status_code, connection.reason, connection.headers)
return ''

@classmethod
Expand All @@ -365,6 +371,8 @@ def connect(cls, url, auth=(None, None), **kwargs):
raise DownloadError(
f"Could not download from '{url}'; response: {details}"
) from error
except (requests.ConnectionError, requests.Timeout) as error:
raise RetriableDownloadError(f"Failed to connect to {url}") from error
except requests.RequestException as error:
raise DownloadError(
f"Could not download from '{url}'"
Expand Down Expand Up @@ -399,14 +407,13 @@ def download_file(cls, file, url, connection):
`connection` argument
"""
chunk = None
for chunk in connection.iter_content(chunk_size=cls.CHUNK_SIZE):
file.write(chunk)
else:
# This executes after the loop and raises an error if the
# response is unexpectedly empty like it sometimes happens
# with scihub
if chunk is None:
raise DownloadError(f"Getting an empty file from '{url}'")
try:
for chunk in connection.iter_content(chunk_size=cls.CHUNK_SIZE):
file.write(chunk)
except requests.exceptions.ChunkedEncodingError as error:
raise RetriableDownloadError(f"Download from {url} was interrupted") from error
if chunk is None:
raise DownloadError(f"Getting an empty file from '{url}'")


class FTPDownloader(Downloader):
Expand Down Expand Up @@ -605,6 +612,51 @@ def already_downloaded(cls, dataset_directory):
return True
return False

def _download_from_uri(self, dataset_uri, directory):
    """Download the file(s) from `dataset_uri` to `directory`.

    Returns a ``(file_name, download_error)`` tuple:
      - on success, ``file_name`` is the downloaded file's name and
        ``download_error`` is None;
      - on a ``DownloadError``, ``file_name`` is None and
        ``download_error`` holds the caught exception, so the caller
        can record it and try the dataset's other URLs.

    Raises:
      - ``TooManyDownloadsError`` when the provider's parallel download
        limit is already reached;
      - ``KeyError`` when no downloader is registered for the URI's
        service;
      - ``DownloadError`` when the downloaded file cannot be written
        to disk.
    """
    # Get the extra settings for the provider; the prefix is the
    # "<scheme>://<netloc>" part of the URI
    dataset_uri_prefix = "://".join(requests.utils.urlparse(dataset_uri.uri)[0:2])
    # Find provider settings
    extra_settings = self.get_provider_settings(dataset_uri_prefix)
    if extra_settings:
        LOGGER.debug("Loaded extra settings for provider %s: %s",
                     dataset_uri_prefix, extra_settings)
    # Launch download if the maximum number of parallel downloads has not been reached
    with DownloadLock(dataset_uri_prefix,
                      extra_settings.get('max_parallel_downloads'),
                      utils.REDIS_HOST, utils.REDIS_PORT) as acquired:
        if not acquired:
            # the lock was not acquired: the provider's download slots
            # are all busy, so let the caller retry later
            raise TooManyDownloadsError(
                f"Too many downloads in progress for {dataset_uri_prefix}")
        # Try to find a downloader
        try:
            downloader = self.DOWNLOADERS[dataset_uri.service]
        except KeyError:
            LOGGER.error("No downloader found for %s service",
                         dataset_uri.service, exc_info=True)
            raise

        LOGGER.debug("Attempting to download from '%s'", dataset_uri.uri)
        file_name = None
        download_error = None
        try:
            file_name = downloader.check_and_download_url(
                url=dataset_uri.uri, download_dir=directory,
                **extra_settings)
        except DownloadError as error:
            LOGGER.warning(
                ("Failed to download dataset %s from %s. "
                 "Another URL will be tried if possible"),
                dataset_uri.dataset.pk, dataset_uri.uri, exc_info=True)
            # keep the error so the caller can report it if every URL fails
            download_error = error
            # remove any partially downloaded data
            shutil.rmtree(directory, ignore_errors=True)
        except (FileNotFoundError, IsADirectoryError) as error:
            # local filesystem problem: clean up, then fail hard
            shutil.rmtree(directory, ignore_errors=True)
            raise DownloadError(
                f"Could not write the downloaded file to {error.filename}") from error

        return file_name, download_error

def download_dataset(self, dataset, download_directory):
"""
Attempt to download a dataset by trying its URIs one by one. For each `DatasetURI`, it
Expand All @@ -625,49 +677,15 @@ def download_dataset(self, dataset, download_directory):
else:
os.makedirs(full_dataset_directory, exist_ok=True)
for dataset_uri in dataset.dataseturi_set.all():
# Get the extra settings for the provider
dataset_uri_prefix = "://".join(requests.utils.urlparse(dataset_uri.uri)[0:2])
# Find provider settings
extra_settings = self.get_provider_settings(dataset_uri_prefix)
if extra_settings:
LOGGER.debug("Loaded extra settings for provider %s: %s",
dataset_uri_prefix, extra_settings)
# Launch download if the maximum number of parallel downloads has not been reached
with DownloadLock(dataset_uri_prefix,
extra_settings.get('max_parallel_downloads'),
utils.REDIS_HOST, utils.REDIS_PORT) as acquired:
if not acquired:
raise TooManyDownloadsError(
f"Too many downloads in progress for {dataset_uri_prefix}")
# Try to find a downloader
try:
downloader = self.DOWNLOADERS[dataset_uri.service]
except KeyError:
LOGGER.error("No downloader found for %s service",
dataset_uri.service, exc_info=True)
raise

LOGGER.debug("Attempting to download from '%s'", dataset_uri.uri)
try:
file_name = downloader.check_and_download_url(
url=dataset_uri.uri, download_dir=full_dataset_directory,
**extra_settings)
except DownloadError as error:
LOGGER.warning(
("Failed to download dataset %s from %s. "
"Another URL will be tried if possible"),
dataset.pk, dataset_uri.uri, exc_info=True)
errors.append(error)
shutil.rmtree(full_dataset_directory, ignore_errors=True)
except (FileNotFoundError, IsADirectoryError) as error:
shutil.rmtree(full_dataset_directory, ignore_errors=True)
raise DownloadError(
f"Could not write the downloaded file to {error.filename}") from error
else:
dataset_path = os.path.join(dataset_directory, file_name)
LOGGER.info("Successfully downloaded dataset %d to %s",
dataset.pk, dataset_path)
break
file_name, download_error = self._download_from_uri(dataset_uri,
full_dataset_directory)
if file_name:
dataset_path = os.path.join(dataset_directory, file_name)
LOGGER.info("Successfully downloaded dataset %d to %s",
dataset_uri.dataset.pk, dataset_path)
break
if download_error:
errors.append(download_error)

if file_name:
if self.save_path:
Expand Down
4 changes: 2 additions & 2 deletions geospaas_processing/tasks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
FaultTolerantTask,
WORKING_DIRECTORY,
PROVIDER_SETTINGS_PATH)
from ..downloaders import DownloadManager, TooManyDownloadsError
from ..downloaders import DownloadManager, RetriableDownloadError, TooManyDownloadsError

from . import app, DATASET_LOCK_PREFIX

Expand All @@ -38,7 +38,7 @@ def download(self, args):
except IndexError:
logger.error("Nothing was downloaded for dataset %s", dataset_id, exc_info=True)
raise
except TooManyDownloadsError:
except (TooManyDownloadsError, RetriableDownloadError):
# Stop retrying after 24 hours
self.retry((args,), countdown=90, max_retries=960)
except OSError as error:
Expand Down
31 changes: 25 additions & 6 deletions tests/test_downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ def test_build_oauth2_authentication_unknown_placement(self):
'username', 'password', 'token_url', 'client_id',
token_placement='foo', token_parameter_name='token')


def test_get_oauth2_auth_no_totp(self):
"""Test getting an OAuth2 authentication from get_auth()"""
mock_auth = mock.Mock()
Expand Down Expand Up @@ -446,7 +445,8 @@ def test_get_file_name_no_header(self):
"""
response = requests.Response()
response.status_code = 200
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')
with self.assertLogs(level=logging.ERROR):
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')

def test_get_file_name_no_filename_in_header(self):
"""`get_file_name()` must return an empty string if the
Expand All @@ -455,7 +455,8 @@ def test_get_file_name_no_filename_in_header(self):
response = requests.Response()
response.status_code = 202
response.headers['Content-Disposition'] = ''
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')
with self.assertLogs(level=logging.ERROR):
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')

def test_get_file_name_multiple_possibilities(self):
"""An error must be raised if several file names are found in the header"""
Expand Down Expand Up @@ -497,12 +498,22 @@ def test_connect_error_code(self):
downloaders.HTTPDownloader.connect('url')
self.assertIsInstance(error.exception.__cause__, requests.HTTPError)

def test_connect_request_exception(self):
"""An exception must be raised if an error prevents the
connection from happening

def test_connect_connection_exception(self):
    """connect() must raise a RetriableDownloadError when the
    connection itself cannot be established
    """
    patched_request = mock.patch('geospaas_processing.utils.http_request',
                                 side_effect=requests.ConnectionError)
    with patched_request, self.assertRaises(downloaders.RetriableDownloadError):
        downloaders.HTTPDownloader.connect('url')

def test_connect_request_exception(self):
    """connect() must raise a DownloadError when a non-retriable
    request error prevents the connection from happening
    """
    patched_request = mock.patch('geospaas_processing.utils.http_request',
                                 side_effect=requests.TooManyRedirects)
    with patched_request, self.assertRaises(downloaders.DownloadError):
        downloaders.HTTPDownloader.connect('url')

Expand Down Expand Up @@ -554,6 +565,14 @@ def test_download_empty_file(self):
with self.assertRaises(downloaders.DownloadError):
downloaders.HTTPDownloader.download_file(mock.Mock(), 'url', response)

def test_download_interrupted_connection(self):
    """download_file() must raise a RetriableDownloadError when the
    transfer is interrupted mid-download
    """
    mock_response = mock.Mock(
        **{'iter_content.side_effect': requests.exceptions.ChunkedEncodingError})
    with self.assertRaises(downloaders.RetriableDownloadError):
        downloaders.HTTPDownloader.download_file(mock.Mock(), 'url', mock_response)


class URLOAuth2TestCase(unittest.TestCase):
"""Tests for the URLOAuth2 class"""
Expand Down
Loading