diff --git a/osbenchmark/workload_generator/extractors.py b/osbenchmark/workload_generator/extractors.py index 2253b7f3f..ef435f995 100644 --- a/osbenchmark/workload_generator/extractors.py +++ b/osbenchmark/workload_generator/extractors.py @@ -129,7 +129,7 @@ def is_valid(self, index_name): class CorpusExtractor(ABC): @abstractmethod - def extract_documents(self): + def extract_documents(self, index, documents_limit=None): pass @@ -158,7 +158,8 @@ def extract_documents(self, index, documents_limit=None): Scroll an index with a match-all query, dumping document source to ``outdir/documents.json``. :param index: Name of index to dump - :param documents_limit: The number of documents to extract. Must be equal to or less than the total number of documents that exists in the index + :param documents_limit: The number of documents to extract. Must be equal + to or less than the total number of documents that exists in the index :return: dict of properties describing the corpus for templates """ @@ -172,7 +173,12 @@ def extract_documents(self, index, documents_limit=None): logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", total_documents, index, documents_to_extract) docs_path = self._get_doc_outpath(self.custom_workload.workload_path, index) # Create test mode corpora - self.dump_documents(self.client, index, self._get_doc_outpath(self.custom_workload.workload_path, index, "-1k"), min(documents_to_extract, 1000), " for test mode") + self.dump_documents( + self.client, + index, + self._get_doc_outpath(self.custom_workload.workload_path, index, "-1k"), + min(documents_to_extract, 1000), + " for test mode") # Create full corpora self.dump_documents(self.client, index, docs_path, documents_to_extract) @@ -215,4 +221,3 @@ def render_progress(self, progress, progress_message_suffix, index, cur, total, msg = f"Extracting documents for index [{index}]{progress_message_suffix}..." percent = (cur * 100) / total progress.print(msg, f"{cur}/{total} docs [{percent:.1f}% done]") - diff --git a/osbenchmark/workload_generator/helpers.py b/osbenchmark/workload_generator/helpers.py index 59e1ab4fe..4660749c2 100644 --- a/osbenchmark/workload_generator/helpers.py +++ b/osbenchmark/workload_generator/helpers.py @@ -11,11 +11,10 @@ import logging import shutil -from opensearchpy import OpenSearchException from jinja2 import Environment, FileSystemLoader, select_autoescape -from osbenchmark import PROGRAM_NAME, exceptions -from osbenchmark.utils import io, opts, console +from osbenchmark import exceptions +from osbenchmark.utils import io from osbenchmark.workload_generator.config import CustomWorkload, Index BASE_WORKLOAD = "base-workload" @@ -31,7 +30,9 @@ def __init__(self, custom_workload: CustomWorkload, templates_path: str): self.custom_workload = custom_workload self.templates_path = templates_path - self.custom_workload.workload_path = os.path.abspath(os.path.join(io.normalize_path(self.custom_workload.root_path), self.custom_workload.workload_name)) + self.custom_workload.workload_path = os.path.abspath( + os.path.join(io.normalize_path(self.custom_workload.root_path), + self.custom_workload.workload_name)) self.custom_workload.operations_path = os.path.join(self.custom_workload.workload_path, "operations") self.custom_workload.test_procedures_path = os.path.join(self.custom_workload.workload_path, "test_procedures") self.logger = logging.getLogger(__name__) @@ -39,10 +40,12 @@ def __init__(self, custom_workload: CustomWorkload, templates_path: str): def make_workload_directory(self): if os.path.exists(self.custom_workload.workload_path): try: - self.logger.info("Workload already exists. Removing existing workload [%s] in path [%s]", self.custom_workload.workload_name, self.custom_workload.workload_path) + self.logger.info("Workload already exists. Removing existing workload [%s] in path [%s]", + self.custom_workload.workload_name, self.custom_workload.workload_path) shutil.rmtree(self.custom_workload.workload_path) except OSError: - self.logger.error("Had issues removing existing workload [%s] in path [%s]", self.custom_workload.workload_name, self.custom_workload.workload_path) + self.logger.error("Had issues removing existing workload [%s] in path [%s]", + self.custom_workload.workload_name, self.custom_workload.workload_path) io.ensure_dir(self.custom_workload.workload_path) io.ensure_dir(self.custom_workload.operations_path) @@ -124,4 +127,3 @@ def validate_index_documents_map(indices, indices_docs_map): "Index from : pair was not found in --indices. " + "Ensure that indices from all : pairs exist in --indices." ) - diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index b402eb1bd..ed204b8fc 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -11,7 +11,7 @@ from osbenchmark import PROGRAM_NAME, exceptions from osbenchmark.client import OsClientFactory -from osbenchmark.workload_generator.config import CustomWorkload, Index +from osbenchmark.workload_generator.config import CustomWorkload from osbenchmark.workload_generator.helpers import QueryProcessor, CustomWorkloadWriter, process_indices, validate_index_documents_map from osbenchmark.workload_generator.extractors import IndexExtractor, SequentialCorpusExtractor from osbenchmark.utils import io, opts, console diff --git a/tests/workload_generator/corpus_test.py b/tests/workload_generator/corpus_test.py index a3840f769..d2a28bbf5 100644 --- a/tests/workload_generator/corpus_test.py +++ b/tests/workload_generator/corpus_test.py @@ -23,20 +23,23 @@ # under the License. import json from unittest import mock, TestCase -from unittest.mock import call, Mock, patch +from unittest.mock import call, Mock from osbenchmark.workload_generator.config import CustomWorkload from osbenchmark.workload_generator.extractors import SequentialCorpusExtractor class TestSequentialCorpusExtractor(TestCase): - @mock.patch("opensearchpy.OpenSearch") - def setUp(self, client): + def setUp(self): self.mock_custom_workload = Mock(spec=CustomWorkload) self.mock_custom_workload.workload_path = "/abs/outpath/to/workloads/" - self.mock_client = client + self.mock_client = self.create_mock_client() self.corpus_extractor = SequentialCorpusExtractor(self.mock_custom_workload, self.mock_client) + @mock.patch("opensearchpy.OpenSearch") + def create_mock_client(self, client): + return client + def serialize_doc(self, doc): return (json.dumps(doc, separators=(",", ":")) + "\n").encode("utf-8") @@ -81,7 +84,6 @@ def set_corp_size(*args, **kwargs): self.mock_client.scroll.return_value = {} index = "test" - outdir = "/abs/outpath/to/workloads/" with mock.patch("os.stat") as osstat: osstat.side_effect = set_corp_size diff --git a/tests/workload_generator/index_test.py b/tests/workload_generator/index_test.py index 05ac09c67..7c6f10b07 100644 --- a/tests/workload_generator/index_test.py +++ b/tests/workload_generator/index_test.py @@ -30,12 +30,15 @@ class TestIndexExtractor(TestCase): - @patch("opensearchpy.OpenSearch") - def setUp(self, client): + def setUp(self): self.mock_custom_workload = Mock(spec=CustomWorkload()) - self.mock_client = client + self.mock_client = self.create_mock_client() self.index_extractor = IndexExtractor(self.mock_custom_workload, self.mock_client) + @patch("opensearchpy.OpenSearch") + def create_mock_client(self, client): + return client + def test_index_setting_filter(self): unfiltered_index_settings = { "number_of_shards": "5",