Skip to content

Commit

Permalink
Download pre-generated offset file from data corpus, if present.
Browse files Browse the repository at this point in the history
Signed-off-by: Govind Kamat <[email protected]>
  • Loading branch information
gkamat committed May 26, 2024
1 parent 00c3b48 commit 8aac123
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
12 changes: 10 additions & 2 deletions osbenchmark/workload/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ def download(self, base_url, source_url, target_path, size_in_bytes):
self.logger.info("Downloading data from [%s] to [%s].", data_url, target_path)

# we want to have a bit more accurate download progress as these files are typically very large
progress = net.Progress("[INFO] Downloading workload data", accuracy=1)
progress = net.Progress("[INFO] Downloading workload data: " + os.path.basename(target_path),
accuracy=1)
net.download(data_url, target_path, size_in_bytes, progress_indicator=progress)
progress.finish()
self.logger.info("Downloaded data from [%s] to [%s].", data_url, target_path)
Expand All @@ -504,7 +505,7 @@ def download(self, base_url, source_url, target_path, size_in_bytes):
msg += f" (HTTP status: {e.code}, reason: {e.reason})"
else:
msg += f" (HTTP status: {e.code})"
raise exceptions.DataError(msg) from e
raise exceptions.DataError(msg, e) from None
except urllib.error.URLError as e:
raise exceptions.DataError(f"Could not download [{data_url}] to [{target_path}].") from e

Expand All @@ -523,6 +524,7 @@ def __init__(self, workload_name, downloader, decompressor):
self.workload_name = workload_name
self.downloader = downloader
self.decompressor = decompressor
self.logger = logging.getLogger(__name__)

def is_locally_available(self, file_name):
return os.path.isfile(file_name)
Expand Down Expand Up @@ -586,6 +588,12 @@ def prepare_document_set(self, document_set, data_root):
else:
raise
if document_set.support_file_offset_table:
if not document_set.source_url:
try:
self.downloader.download(document_set.base_url, None, doc_path + '.offset', None)
except exceptions.DataError as e:
if type(e.cause) == urllib.error.HTTPError and (e.cause.code == 403 or e.cause.code == 404):
self.logger.info("Pre-generated offset file not found, will generate from corpus data")
self.create_file_offset_table(doc_path, document_set.number_of_lines)

def prepare_bundled_document_set(self, document_set, data_root):
Expand Down
29 changes: 17 additions & 12 deletions tests/workload/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,12 @@ def test_download_document_archive_if_no_file_available(self, is_file, get_size,
# after download uncompressed file still does not exist (in main loop)
# after download compressed file exists (in main loop)
# after decompression, uncompressed file exists
is_file.side_effect = [False, False, True, False, True, True, True]
is_file.side_effect = [False, False, True, False, True, True, True, True]
# compressed file size is 200 after download
# compressed file size is 200 after download (in main loop)
# uncompressed file size is 2000 after decompression
# uncompressed file size is 2000 after decompression (in main loop)
get_size.side_effect = [200, 200, 2000, 2000]
get_size.side_effect = [200, 200, 2000, 2000, None]

prepare_file_offset_table.return_value = 5

Expand All @@ -285,8 +285,11 @@ def test_download_document_archive_if_no_file_available(self, is_file, get_size,

ensure_dir.assert_called_with("/tmp")
decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp")
download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2",
"/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY)
calls = [ mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2",
"/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY),
mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.offset",
"/tmp/docs.json.offset", None, progress_indicator=mock.ANY) ]
download.assert_has_calls(calls)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
Expand All @@ -303,12 +306,12 @@ def test_download_document_archive_with_source_url_compressed(self, is_file, get
# after download uncompressed file still does not exist (in main loop)
# after download compressed file exists (in main loop)
# after decompression, uncompressed file exists
is_file.side_effect = [False, False, True, False, True, True, True]
is_file.side_effect = [False, False, True, False, True, True, True, True]
# compressed file size is 200 after download
# compressed file size is 200 after download (in main loop)
# uncompressed file size is 2000 after decompression
# uncompressed file size is 2000 after decompression (in main loop)
get_size.side_effect = [200, 200, 2000, 2000]
get_size.side_effect = [200, 200, 2000, 2000, None]

prepare_file_offset_table.return_value = 5

Expand Down Expand Up @@ -381,7 +384,7 @@ def test_download_document_with_trailing_baseurl_slash(self, is_file, get_size,
# uncompressed file does not exist
# after download uncompressed file exists
# after download uncompressed file exists (main loop)
is_file.side_effect = [False, True, True]
is_file.side_effect = [False, True, True, True]
# uncompressed file size is 2000
get_size.return_value = 2000
scheme = random.choice(["http", "https", "s3", "gs"])
Expand All @@ -403,8 +406,9 @@ def test_download_document_with_trailing_baseurl_slash(self, is_file, get_size,
data_root="/tmp")

ensure_dir.assert_called_with("/tmp")
download.assert_called_with(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json",
"/tmp/docs.json", 2000, progress_indicator=mock.ANY)
calls = [ mock.call(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY),
mock.call(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json.offset", "/tmp/docs.json.offset", None, progress_indicator=mock.ANY) ]
download.assert_has_calls(calls)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
Expand All @@ -416,7 +420,7 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en
# uncompressed file does not exist
# after download uncompressed file exists
# after download uncompressed file exists (main loop)
is_file.side_effect = [False, True, True]
is_file.side_effect = [False, True, True, True]
# uncompressed file size is 2000
get_size.return_value = 2000

Expand All @@ -437,8 +441,9 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en
data_root="/tmp")

ensure_dir.assert_called_with("/tmp")
download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs.json",
"/tmp/docs.json", 2000, progress_indicator=mock.ANY)
calls = [ mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY),
mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.offset", "/tmp/docs.json.offset", None, progress_indicator=mock.ANY) ]
download.assert_has_calls(calls)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")

@mock.patch("osbenchmark.utils.net.download")
Expand Down

0 comments on commit 8aac123

Please sign in to comment.