diff --git a/it/__init__.py b/it/__init__.py index 28005c2ea..feeda06db 100644 --- a/it/__init__.py +++ b/it/__init__.py @@ -29,6 +29,7 @@ import random import socket import time +import datetime import pytest @@ -87,7 +88,7 @@ def osbenchmark(cfg, command_line): These commands may have different CLI options than test_execution. """ cmd = osbenchmark_command_line_for(cfg, command_line) - print("\nInvoking OSB:", cmd) + print(f'\n{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")} Invoking OSB: {cmd}') err, retcode = process.run_subprocess_with_stderr(cmd) if retcode != 0: print(err) diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index 73d2c284e..ce75d1236 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -464,7 +464,7 @@ def __init__(self, offline, test_mode): self.test_mode = test_mode self.logger = logging.getLogger(__name__) - def download(self, base_url, target_path, size_in_bytes): + def download(self, base_url, source_url, target_path, size_in_bytes): file_name = os.path.basename(target_path) if not base_url: @@ -472,12 +472,15 @@ def download(self, base_url, target_path, size_in_bytes): if self.offline: raise exceptions.SystemSetupError(f"Cannot find [{target_path}]. Please disable offline mode and retry.") - if base_url.endswith("/"): - separator = "" + if source_url: + data_url = source_url else: - separator = "/" - # join manually as `urllib.parse.urljoin` does not work with S3 or GS URL schemes. - data_url = f"{base_url}{separator}{file_name}" + if base_url.endswith("/"): + separator = "" + else: + separator = "/" + # join manually as `urllib.parse.urljoin` does not work with S3 or GS URL schemes. 
+ data_url = f"{base_url}{separator}{file_name}" try: io.ensure_dir(os.path.dirname(target_path)) if size_in_bytes: @@ -573,7 +576,7 @@ def prepare_document_set(self, document_set, data_root): raise exceptions.BenchmarkAssertionError(f"Workload {self.workload_name} specifies documents but no corpus") try: - self.downloader.download(document_set.base_url, target_path, expected_size) + self.downloader.download(document_set.base_url, document_set.source_url, target_path, expected_size) except exceptions.DataError as e: if e.message == "Cannot download data because no base URL is provided." and \ self.is_locally_available(target_path): @@ -1489,6 +1492,7 @@ def _create_corpora(self, corpora_specs, indices, data_streams): source_format = self._r(doc_spec, "source-format", mandatory=False, default_value=default_source_format) if source_format in workload.Documents.SUPPORTED_SOURCE_FORMAT: + source_url = self._r(doc_spec, "source-url", mandatory=False) docs = self._r(doc_spec, "source-file") if io.is_archive(docs): document_archive = docs @@ -1541,6 +1545,7 @@ def _create_corpora(self, corpora_specs, indices, data_streams): document_file=document_file, document_archive=document_archive, base_url=base_url, + source_url=source_url, includes_action_and_meta_data=includes_action_and_meta_data, number_of_documents=num_docs, compressed_size_in_bytes=compressed_bytes, diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 0d57da3aa..a047cc667 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -190,7 +190,7 @@ class Documents: SOURCE_FORMAT_BIG_ANN = "big-ann" SUPPORTED_SOURCE_FORMAT = [SOURCE_FORMAT_BULK, SOURCE_FORMAT_HDF5, SOURCE_FORMAT_BIG_ANN] - def __init__(self, source_format, document_file=None, document_archive=None, base_url=None, + def __init__(self, source_format, document_file=None, document_archive=None, base_url=None, source_url=None, includes_action_and_meta_data=False, number_of_documents=0, 
compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_index=None, target_data_stream=None, target_type=None, meta_data=None): @@ -201,7 +201,8 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas just need a mapping but no documents) :param document_archive: The file name of the compressed benchmark document name on the remote server. Optional (e.g. for percolation we just need a mapping but no documents) - :param base_url: The URL from which to load data if they are not available locally. Optional. + :param base_url: The URL from which to load data if they are not available locally. Excludes the file or object name. Optional. + :param source_url: The full URL to the file or object from which to load data if not available locally. Optional. :param includes_action_and_meta_data: True, if the source file already includes the action and meta-data line. False, if it only contains documents. :param number_of_documents: The number of documents @@ -224,6 +225,7 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas self.document_file = document_file self.document_archive = document_archive self.base_url = base_url + self.source_url = source_url self.includes_action_and_meta_data = includes_action_and_meta_data self._number_of_documents = number_of_documents self._compressed_size_in_bytes = compressed_size_in_bytes @@ -295,18 +297,18 @@ def __repr__(self): def __hash__(self): return hash(self.source_format) ^ hash(self.document_file) ^ hash(self.document_archive) ^ hash(self.base_url) ^ \ - hash(self.includes_action_and_meta_data) ^ hash(self.number_of_documents) ^ hash(self.compressed_size_in_bytes) ^ \ - hash(self.uncompressed_size_in_bytes) ^ hash(self.target_index) ^ hash(self.target_data_stream) ^ hash(self.target_type) ^ \ - hash(frozenset(self.meta_data.items())) + hash(self.source_url) ^ hash(self.includes_action_and_meta_data) ^ hash(self.number_of_documents) ^ \ + 
hash(self.compressed_size_in_bytes) ^ hash(self.uncompressed_size_in_bytes) ^ hash(self.target_index) ^ \ + hash(self.target_data_stream) ^ hash(self.target_type) ^ hash(frozenset(self.meta_data.items())) def __eq__(self, othr): return (isinstance(othr, type(self)) and - (self.source_format, self.document_file, self.document_archive, self.base_url, self.includes_action_and_meta_data, - self.number_of_documents, self.compressed_size_in_bytes, self.uncompressed_size_in_bytes, - self.target_type, self.target_data_stream, self.target_type, self.meta_data) == - (othr.source_format, othr.document_file, othr.document_archive, othr.base_url, othr.includes_action_and_meta_data, - othr.number_of_documents, othr.compressed_size_in_bytes, othr.uncompressed_size_in_bytes, - othr.target_type, othr.target_data_stream, othr.target_type, othr.meta_data)) + (self.source_format, self.document_file, self.document_archive, self.base_url, self.source_url, + self.includes_action_and_meta_data, self.number_of_documents, self.compressed_size_in_bytes, + self.uncompressed_size_in_bytes, self.target_type, self.target_data_stream, self.target_type, self.meta_data) == + (othr.source_format, othr.document_file, othr.document_archive, othr.base_url, othr.source_url, + othr.includes_action_and_meta_data, othr.number_of_documents, othr.compressed_size_in_bytes, + othr.uncompressed_size_in_bytes, othr.target_type, othr.target_data_stream, othr.target_type, othr.meta_data)) class DocumentCorpus: diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index 6696814bd..fcf40d5f1 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -235,7 +235,7 @@ def test_raise_error_if_compressed_does_not_contain_expected_document_file(self, with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.elasticsearch.org/corpora/unit-test", + 
base_url="http://benchmarks.opensearch.org/corpora/unit-test", document_file="docs.json", document_archive="docs.json.bz2", number_of_documents=5, @@ -275,7 +275,7 @@ def test_download_document_archive_if_no_file_available(self, is_file, get_size, decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.elasticsearch.org/corpora/unit-test", + base_url="http://benchmarks.opensearch.org/corpora/unit-test", document_file="docs.json", document_archive="docs.json.bz2", number_of_documents=5, @@ -285,10 +285,91 @@ def test_download_document_archive_if_no_file_available(self, is_file, get_size, ensure_dir.assert_called_with("/tmp") decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp") - download.assert_called_with("http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json.bz2", + download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2", "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY) prepare_file_offset_table.assert_called_with("/tmp/docs.json") + @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") + @mock.patch("osbenchmark.utils.io.decompress") + @mock.patch("osbenchmark.utils.net.download") + @mock.patch("osbenchmark.utils.io.ensure_dir") + @mock.patch("os.path.getsize") + @mock.patch("os.path.isfile") + def test_download_document_archive_with_source_url_compressed(self, is_file, get_size, ensure_dir, download, decompress, + prepare_file_offset_table): + # uncompressed file does not exist + # compressed file does not exist + # after download compressed file exists + # after download uncompressed file still does not exist (in main loop) + # after download compressed file exists (in main loop) + # after decompression, uncompressed file exists + is_file.side_effect = [False, False, True, False, True, True, True] + # compressed file size is 200 after download + # compressed file size is 200 
after download (in main loop) + # uncompressed file size is 2000 after decompression + # uncompressed file size is 2000 after decompression (in main loop) + get_size.side_effect = [200, 200, 2000, 2000] + + prepare_file_offset_table.return_value = 5 + + p = loader.DocumentSetPreparator(workload_name="unit-test", + downloader=loader.Downloader(offline=False, test_mode=False), + decompressor=loader.Decompressor()) + + p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, + base_url="http://benchmarks.opensearch.org/corpora", + source_url="http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2", + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), + data_root="/tmp") + + ensure_dir.assert_called_with("/tmp") + decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp") + download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2", + "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY) + prepare_file_offset_table.assert_called_with("/tmp/docs.json") + + @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") + @mock.patch("osbenchmark.utils.io.decompress") + @mock.patch("osbenchmark.utils.net.download") + @mock.patch("osbenchmark.utils.io.ensure_dir") + @mock.patch("os.path.getsize") + @mock.patch("os.path.isfile") + def test_download_document_with_source_url_uncompressed(self, is_file, get_size, ensure_dir, download, decompress, + prepare_file_offset_table): + # uncompressed file does not exist + # after download uncompressed file exists + # after download uncompressed file exists (main loop) + is_file.side_effect = [False, True, True] + # uncompressed file size is 2000 + get_size.return_value = 2000 + scheme = random.choice(["http", "https", "s3", "gs"]) + + prepare_file_offset_table.return_value = 5 + + p = 
loader.DocumentSetPreparator(workload_name="unit-test", + downloader=loader.Downloader(offline=False, test_mode=False), + decompressor=loader.Decompressor()) + + p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, + source_url=f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", + base_url=f"{scheme}://benchmarks.opensearch.org/corpora/", + document_file="docs.json", + # --> We don't provide a document archive here <-- + document_archive=None, + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), + data_root="/tmp") + + ensure_dir.assert_called_with("/tmp") + download.assert_called_with(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", + "/tmp/docs.json", 2000, progress_indicator=mock.ANY) + prepare_file_offset_table.assert_called_with("/tmp/docs.json") + @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") @mock.patch("osbenchmark.utils.io.decompress") @mock.patch("osbenchmark.utils.net.download") @@ -312,7 +393,7 @@ def test_download_document_with_trailing_baseurl_slash(self, is_file, get_size, decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url=f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/", + base_url=f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/", document_file="docs.json", # --> We don't provide a document archive here <-- document_archive=None, @@ -322,7 +403,7 @@ def test_download_document_with_trailing_baseurl_slash(self, is_file, get_size, data_root="/tmp") ensure_dir.assert_called_with("/tmp") - download.assert_called_with(f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", + download.assert_called_with(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY) 
prepare_file_offset_table.assert_called_with("/tmp/docs.json") @@ -346,7 +427,7 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.elasticsearch.org/corpora/unit-test", + base_url="http://benchmarks.opensearch.org/corpora/unit-test", document_file="docs.json", # --> We don't provide a document archive here <-- document_archive=None, @@ -356,7 +437,7 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en data_root="/tmp") ensure_dir.assert_called_with("/tmp") - download.assert_called_with("http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", + download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY) prepare_file_offset_table.assert_called_with("/tmp/docs.json") @@ -373,7 +454,7 @@ def test_raise_download_error_if_offline(self, is_file, ensure_dir, download): with self.assertRaises(exceptions.SystemSetupError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.elasticsearch.org/corpora/unit-test", + base_url="http://benchmarks.opensearch.org/corpora/unit-test", document_file="docs.json", number_of_documents=5, uncompressed_size_in_bytes=2000), @@ -443,7 +524,7 @@ def test_raise_download_error_no_test_mode_file(self, is_file, ensure_dir, downl # uncompressed file does not exist is_file.return_value = False - download.side_effect = urllib.error.HTTPError("http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/unit-test/docs-1k.json", + download.side_effect = urllib.error.HTTPError("http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/unit-test/docs-1k.json", 404, "", None, None) p = 
loader.DocumentSetPreparator(workload_name="unit-test", @@ -452,7 +533,7 @@ def test_raise_download_error_no_test_mode_file(self, is_file, ensure_dir, downl with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.elasticsearch.org/corpora/unit-test", + base_url="http://benchmarks.opensearch.org/corpora/unit-test", document_file="docs-1k.json", number_of_documents=5, uncompressed_size_in_bytes=None), @@ -462,7 +543,7 @@ def test_raise_download_error_no_test_mode_file(self, is_file, ensure_dir, downl "test mode and retry.", ctx.exception.args[0]) ensure_dir.assert_called_with("/tmp") - download.assert_called_with("http://benchmarks.elasticsearch.org/corpora/unit-test/docs-1k.json", + download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs-1k.json", "/tmp/docs-1k.json", None, progress_indicator=mock.ANY) @mock.patch("osbenchmark.utils.net.download") @@ -472,7 +553,7 @@ def test_raise_download_error_on_connection_problems(self, is_file, ensure_dir, # uncompressed file does not exist is_file.return_value = False - download.side_effect = urllib.error.HTTPError("http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", + download.side_effect = urllib.error.HTTPError("http://benchmarks.opensearch.org/corpora/unit-test/docs.json", 500, "Internal Server Error", None, None) p = loader.DocumentSetPreparator(workload_name="unit-test", @@ -481,17 +562,17 @@ def test_raise_download_error_on_connection_problems(self, is_file, ensure_dir, with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.elasticsearch.org/corpora/unit-test", + base_url="http://benchmarks.opensearch.org/corpora/unit-test", document_file="docs.json", number_of_documents=5, uncompressed_size_in_bytes=2000), 
data_root="/tmp") - self.assertEqual("Could not download [http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json] " + self.assertEqual("Could not download [http://benchmarks.opensearch.org/corpora/unit-test/docs.json] " "to [/tmp/docs.json] (HTTP status: 500, reason: Internal Server Error)", ctx.exception.args[0]) ensure_dir.assert_called_with("/tmp") - download.assert_called_with("http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", + download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY) @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") @@ -765,7 +846,7 @@ def test_entrypoint_of_replace_includes(self, patched_read_glob, patched_dirname { "version": 2, "description": "unittest workload", - "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "data-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames", "indices": [ { "name": "geonames", @@ -775,7 +856,7 @@ def test_entrypoint_of_replace_includes(self, patched_read_glob, patched_dirname "corpora": [ { "name": "geonames", - "base-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "base-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames", "documents": [ { "source-file": "documents-2.json.bz2", @@ -809,7 +890,7 @@ def dummy_read_glob(c): { "version": 2, "description": "unittest workload", - "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "data-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames", "indices": [ { "name": "geonames", @@ -819,7 +900,7 @@ def dummy_read_glob(c): "corpora": [ { "name": "geonames", - "base-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "base-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames", "documents": [ { "source-file": "documents-2.json.bz2",