From adc9da148fd7253ec61a5727cf7ea8aaa38a2781 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Wed, 19 Jun 2024 15:02:43 -0700 Subject: [PATCH 01/10] added TrainKNN runner + tests Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 69 ++++- osbenchmark/workload/workload.py | 9 +- tests/worker_coordinator/runner_test.py | 353 ++++++++++------------- 3 files changed, 218 insertions(+), 213 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index d280ddffd..495c95108 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -105,8 +105,7 @@ def register_default_runners(): register_runner(workload.OperationType.DeleteMlModel, Retry(DeleteMlModel()), async_runner=True) register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True) register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) - register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True) - register_runner(workload.OperationType.DeleteKnnModel, Retry(DeleteKnnModel()), async_runner=True) + register_runner(workload.OperationType.TrainKNNModel, Retry(TrainKNNModel()), async_runner=True) def runner_for(operation_type): try: @@ -652,6 +651,72 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" +class TrainKNNModel(Runner): + """ + Trains model named model_id until training is complete or timeout is exceeded. + """ + + NAME = "train-knn-model" + + async def __call__(self, opensearch, params): + """ + Create and train one model named model_id. + + :param opensearch: The OpenSearch client. + :param params: A hash with all parameters. See below for details. + :return: A hash with meta data for this bulk operation. See below for details. + :raises: Exception if training fails, times out, or a different error occurs. + It expects a parameter dict with the following mandatory keys: + + * ``body``: containing parameters to pass on to the train engine. + See https://opensearch.org/docs/latest/search-plugins/knn/api/#train-a-model for information. + * ``retries``: Maximum number of retries allowed for the training to complete (seconds). + * ``polling-interval``: Polling interval to see if the model has been trained yet (seconds). + * ``model_id``: ID of the model to train. + """ + body = params["body"] + model_id = parse_string_parameter("model_id", params) + max_retries = parse_int_parameter("retries", params) + poll_period = parse_float_parameter("poll_period", params) + + method = "POST" + model_uri = "/_plugins/_knn/models/{}".format(model_id) + + await opensearch.transport.perform_request(method, "{}/_train".format(model_uri), body=body) + + current_number_retries = 0 + while True: + model_response = await opensearch.transport.perform_request("GET", model_uri) + + if 'state' not in model_response.keys() or current_number_retries > max_retries: + request_context_holder.on_client_request_end() + self.logger.error(f"Failed to create model {model_id} within {max_retries} retries.") + raise Exception('Failed to create model: {} within {} retries' + .format(model_id, max_retries)) + + if model_response['state'] == 'training': + current_number_retries += 1 + await asyncio.sleep(poll_period) + continue + + request_context_holder.on_client_request_end() # at this point, training either failed or finished. + if model_response['state'] == 'created': + self.logger.info(f"Training model {model_id} was completed successfully.") + break + else: + # training failed. + self.logger.error(f"Training for model {model_id} failed. Response: {model_response}") + raise Exception("Failed to create model: {}".format(model_response)) + + def inspect_model_response(model_response): + if model_response['state'] == 'created': + return 1, "models_trained" + + if model_response['state'] == 'failed': + raise Exception("Failed to create model: {}".format(model_response)) + + def __repr__(self, *args, **kwargs): + return self.NAME class DeleteKnnModel(Runner): """ diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 95545fb59..9aaedcc36 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -601,8 +601,7 @@ class OperationType(Enum): ListAllPointInTime = 16 VectorSearch = 17 BulkVectorDataSet = 18 - TrainKnnModel = 19 - DeleteKnnModel = 20 + TrainKNNModel = 19 # administrative actions ForceMerge = 1001 @@ -748,10 +747,8 @@ def from_hyphenated_string(cls, v): return OperationType.RegisterMlModel elif v == "deploy-ml-model": return OperationType.DeployMlModel - elif v == "train-knn-model": - return OperationType.TrainKnnModel - elif v == "delete-knn-model": - return OperationType.DeleteKnnModel + elif v == "train-k-n-n-model": + return OperationType.TrainKNNModel else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 4f4b303b2..5647809cf 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2258,154 +2258,41 @@ async def test_search_pipeline_using_request_params(self, opensearch, on_client_ opensearch.clear_scroll.assert_not_called() -class DeleteKnnModelRunnerTests(TestCase): +class TrainKNNModelRunnerTests(TestCase): model_id = "test-model-id" - - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": model_id - } - - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_delete_knn_success(self, opensearch, on_client_request_start, on_client_request_end): - response = { - "model_id": "test-model", - "result": "deleted" - } - opensearch.transport.perform_request.return_value = as_future(response) - runner_under_test = runner.DeleteKnnModel() - - async with runner_under_test: - result = await runner_under_test(opensearch, self.request) - - self.assertEqual(True, result["success"]) + retries = 120 + poll_period = 0.5 # seconds @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_knn_404_success_when_ignore_if_model_DNE(self, opensearch, on_client_request_start, on_client_request_end): + async def test_train_success(self, opensearch, sleep, on_client_request_start, on_client_request_end): request = { "index": "unittest", "operation-type": "train-knn-model", "model_id": self.model_id, - "ignore-if-model-does-not-exist": True - } - response = { - "error": { - "root_cause": [ - { - "type": "resource_not_found_exception", - "reason": "Unable to delete model [test-model]. Model does not exist" - } - ], - "type": "resource_not_found_exception", - "reason": "Unable to delete model [test-model]. Model does not exist" - }, - "status": 404 - } - opensearch.transport.perform_request.return_value = as_future(response) - runner_under_test = runner.DeleteKnnModel() - async with runner_under_test: - result = await runner_under_test(opensearch, request) - - self.assertEqual(True, result["success"]) - - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_delete_knn_404_fails_if_model_DNE(self, opensearch, on_client_request_start, on_client_request_end): - response = { - "error": { - "root_cause": [ - { - "type": "resource_not_found_exception", - "reason": "Unable to delete model [test-model]. Model does not exist" - } - ], - "type": "resource_not_found_exception", - "reason": "Unable to delete model [test-model]. Model does not exist" - }, - "status": 404 - } - opensearch.transport.perform_request.return_value = as_future(response) - runner_under_test = runner.DeleteKnnModel() - async with runner_under_test: - result = await runner_under_test(opensearch, self.request) - - self.assertEqual(False, result["success"]) - - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_delete_knn_400(self, opensearch, on_client_request_start, on_client_request_end): - - response = { - "error": { - "root_cause": [ - { - "type": "resource_not_found_exception", - "reason": "Unable to delete model [test-model]. Model does not exist" - } - ], - "type": "resource_not_found_exception", - "reason": "Unable to delete model [test-model]. Model does not exist" - }, - "status": 400 - } - opensearch.transport.perform_request.return_value = as_future(response) - runner_under_test = runner.DeleteKnnModel() - async with runner_under_test: - result = await runner_under_test(opensearch, self.request) - - self.assertEqual(False, result["success"]) - -class TrainKnnModelRunnerTests(TestCase): - model_id = "test-model-id" - retries = 120 - poll_period = 0.5 # seconds - - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": model_id, - "poll_period": poll_period, - "retries": retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name": "ivf", - "engine": "faiss", - "space_type": "l2", - "parameters": { + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { "nlist": 10, - "nprobes": 5 + "nprobes": 5 + } } } } - } - train_status_check_response = { - - 'weight': 1, 'unit': 'ops', 'success': True - } - - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("asyncio.sleep", return_value=as_future()) - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_train_success(self, opensearch, sleep, on_client_request_start, on_client_request_end): train_api_status_response = { "model_id": "1", @@ -2426,62 +2313,55 @@ async def test_train_success(self, opensearch, sleep, on_client_request_start, o } } - train_api_first_mock = as_future(self.train_status_check_response) - train_api_status_mock = as_future(train_api_status_response) + train_status_check_response = { + + 'weight': 1, 'unit': 'ops', 'success': True + } - opensearch.transport.perform_request.side_effect = [ - train_api_first_mock, train_api_status_mock] + train_api_first_mock = as_future(train_status_check_response) + train_api_status_mock =as_future(train_api_status_response) + + opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] - runner_under_test = runner.TrainKnnModel() + runner_under_test = runner.TrainKNNModel() async with runner_under_test: - await runner_under_test(opensearch, self.request) - + result = await runner_under_test(opensearch, request) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async async def test_train_failure(self, opensearch, sleep, on_client_request_start, on_client_request_end): - train_api_status_response = { - "model_id": self.model_id, - "model_blob": "", - "state": "failed", - "timestamp": "2024-06-17T23:03:02.475277Z", - "description": "My model description", - "space_type": "l2", - "dimension": 10, - "engine": "faiss", - "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w", - "model_definition": { - "name": "ivf", - "parameters": { - "nprobes": 5, - "nlist": 10 + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id":self.model_id, + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } } } } - train_api_first_mock = as_future(self.train_status_check_response) - train_api_status_mock = as_future(train_api_status_response) - - opensearch.transport.perform_request.side_effect = [ - train_api_first_mock, train_api_status_mock] - runner_under_test = runner.TrainKnnModel() - - with self.assertRaisesRegex(Exception, f"Failed to create model {self.model_id}: {train_api_status_response}"): - await runner_under_test(opensearch, self.request) - - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("asyncio.sleep", return_value=as_future()) - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_train_illegal_model_state(self, opensearch, sleep, on_client_request_start, on_client_request_end): - illegal_state = "dummy state that is not supported" train_api_status_response = { "model_id": self.model_id, "model_blob": "", - "state": "dummy state that is not supported", + "state": "failed", "timestamp": "2024-06-17T23:03:02.475277Z", "description": "My model description", "space_type": "l2", @@ -2497,58 +2377,95 @@ async def test_train_illegal_model_state(self, opensearch, sleep, on_client_requ } } - train_api_first_mock = as_future(self.train_status_check_response) - train_api_status_mock = as_future(train_api_status_response) + train_status_check_response = { + + 'weight': 1, 'unit': 'ops', 'success': True + } - opensearch.transport.perform_request.side_effect = [ - train_api_first_mock, train_api_status_mock] - runner_under_test = runner.TrainKnnModel() + train_api_first_mock = as_future(train_status_check_response) + train_api_status_mock =as_future(train_api_status_response) + + opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] - with self.assertRaisesRegex(Exception, - f"Model {self.model_id} in unknown state {illegal_state}, response: {train_api_status_response}"): - await runner_under_test(opensearch, self.request) + + runner_under_test = runner.TrainKNNModel() + + with self.assertRaisesRegex(Exception, f"Failed to create model: {train_api_status_response}"): + await runner_under_test(opensearch, request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_train_error_response(self, opensearch, sleep, on_client_request_start, on_client_request_end): + async def test_train_error_response(self,opensearch, sleep, on_client_request_start, on_client_request_end): error_response = { "error": { - "root_cause": - { - "type": "index_not_found_exception", + "root_cause": + { + "type": "index_not_found_exception", "reason": "no such index [.opensearch-knn-models]", "index": ".opensearch-knn-models", "resource.id": ".opensearch-knn-models", "resource.type": "index_expression", "index_uuid": "_na_" - }, + } + , "type": "index_not_found_exception", "reason": "no such index [.opensearch-knn-models]", "index": ".opensearch-knn-models", "resource.id": ".opensearch-knn-models", "resource.type": "index_expression", "index_uuid": "_na_" - }, - "status": 404 + }, + "status": 404 } - side_effect_list = [ - as_future(self.train_status_check_response), as_future(error_response)] - opensearch.transport.perform_request.side_effect = side_effect_list - runner_under_test = runner.TrainKnnModel() + + train_status_check_response = ({ + + 'weight': 1, 'unit': 'ops', 'success': True + }) + side_effect_list = [as_future(train_status_check_response), as_future(error_response)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKNNModel() + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": self.model_id, + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + with self.assertRaises(Exception): - await runner_under_test(opensearch, self.request) + await runner_under_test(opensearch, request) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_train_timeout(self, opensearch, sleep, on_client_request_start, on_client_request_end): + async def test_train_timeout(self,opensearch, sleep, on_client_request_start, on_client_request_end): + still_training_response = ({ "model_id": self.model_id, @@ -2569,14 +2486,40 @@ async def test_train_timeout(self, opensearch, sleep, on_client_request_start, o } }) - side_effect_list = [as_future(self.train_status_check_response)] + [ - as_future(still_training_response) for _ in range(self.retries + 2)] - opensearch.transport.perform_request.side_effect = side_effect_list - runner_under_test = runner.TrainKnnModel() - - # Set model state = Training. - with self.assertRaisesRegex(TimeoutError, f'Failed to create model: {self.model_id} within {self.retries} retries'): - await runner_under_test(opensearch, self.request) + train_status_check_response = ({ + + 'weight': 1, 'unit': 'ops', 'success': True + }) + side_effect_list = [as_future(train_status_check_response)] + [as_future(still_training_response) for _ in range(self.retries + 2)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKNNModel() + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": self.model_id, + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + # Set model state = Training. + with self.assertRaisesRegex(Exception, f'Failed to create model: {self.model_id} within {self.retries} retries'): + await runner_under_test(opensearch, request) class VectorSearchQueryRunnerTests(TestCase): From 7291b5323df80290ec823f906be4134e937f322a Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Wed, 19 Jun 2024 17:28:12 -0700 Subject: [PATCH 02/10] Addressed Vijay's feedback, cleaned up tests, added TimeoutError Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 55 +++--- osbenchmark/workload/workload.py | 6 +- tests/worker_coordinator/runner_test.py | 222 +++++++---------------- 3 files changed, 100 insertions(+), 183 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 495c95108..2e4cd3169 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -105,7 +105,7 @@ def register_default_runners(): register_runner(workload.OperationType.DeleteMlModel, Retry(DeleteMlModel()), async_runner=True) register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True) register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) - register_runner(workload.OperationType.TrainKNNModel, Retry(TrainKNNModel()), async_runner=True) + register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True) def runner_for(operation_type): try: @@ -651,13 +651,14 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" -class TrainKNNModel(Runner): + +class TrainKnnModel(Runner): """ - Trains model named model_id until training is complete or timeout is exceeded. + Trains model named model_id until training is complete or retries are exhausted. """ NAME = "train-knn-model" - + async def __call__(self, opensearch, params): """ Create and train one model named model_id. @@ -680,41 +681,45 @@ async def __call__(self, opensearch, params): poll_period = parse_float_parameter("poll_period", params) method = "POST" - model_uri = "/_plugins/_knn/models/{}".format(model_id) + model_uri = f"/_plugins/_knn/models/{model_id}" + request_context_holder.on_client_request_start() + await opensearch.transport.perform_request(method, f"{model_uri}/_train", body=body) - await opensearch.transport.perform_request(method, "{}/_train".format(model_uri), body=body) - current_number_retries = 0 - while True: + while True: model_response = await opensearch.transport.perform_request("GET", model_uri) - if 'state' not in model_response.keys() or current_number_retries > max_retries: + if 'state' not in model_response.keys(): + request_context_holder.on_client_request_end() + self.logger.error( + "Failed to create model [%s] with error response: [%s]", model_id, model_response) + raise Exception( + f"Failed to create model {model_id} with error response: {model_response}") + + if current_number_retries > max_retries: request_context_holder.on_client_request_end() - self.logger.error(f"Failed to create model {model_id} within {max_retries} retries.") - raise Exception('Failed to create model: {} within {} retries' - .format(model_id, max_retries)) - + self.logger.error( + "Failed to create model [%s] within [%i] retries.", model_id, max_retries) + raise TimeoutError( + f'Failed to create model: {model_id} within {max_retries} retries') + if model_response['state'] == 'training': current_number_retries += 1 await asyncio.sleep(poll_period) continue - request_context_holder.on_client_request_end() # at this point, training either failed or finished. + # at this point, training either failed or finished. + request_context_holder.on_client_request_end() if model_response['state'] == 'created': - self.logger.info(f"Training model {model_id} was completed successfully.") + self.logger.info( + "Training model [%s] was completed successfully.", model_id) break - else: - # training failed. - self.logger.error(f"Training for model {model_id} failed. Response: {model_response}") - raise Exception("Failed to create model: {}".format(model_response)) - def inspect_model_response(model_response): - if model_response['state'] == 'created': - return 1, "models_trained" + # training failed. + self.logger.error( + "Training for model [%s] failed. Response: [%s]", model_id, model_response) + raise Exception(f"Failed to create model: {model_response}") - if model_response['state'] == 'failed': - raise Exception("Failed to create model: {}".format(model_response)) - def __repr__(self, *args, **kwargs): return self.NAME diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 9aaedcc36..5fcee1067 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -601,7 +601,7 @@ class OperationType(Enum): ListAllPointInTime = 16 VectorSearch = 17 BulkVectorDataSet = 18 - TrainKNNModel = 19 + TrainKnnModel = 19 # administrative actions ForceMerge = 1001 @@ -747,8 +747,8 @@ def from_hyphenated_string(cls, v): return OperationType.RegisterMlModel elif v == "deploy-ml-model": return OperationType.DeployMlModel - elif v == "train-k-n-n-model": - return OperationType.TrainKNNModel + elif v == "train-knn-model": + return OperationType.TrainKnnModel else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 5647809cf..3ffe19f1d 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2258,10 +2258,39 @@ async def test_search_pipeline_using_request_params(self, opensearch, on_client_ opensearch.clear_scroll.assert_not_called() -class TrainKNNModelRunnerTests(TestCase): +class TrainKnnModelRunnerTests(TestCase): model_id = "test-model-id" retries = 120 - poll_period = 0.5 # seconds + poll_period = 0.5 # seconds + + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": model_id, + "poll_period": poll_period, + "retries": retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name": "ivf", + "engine": "faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + train_status_check_response = { + + 'weight': 1, 'unit': 'ops', 'success': True + } @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @@ -2269,30 +2298,6 @@ class TrainKNNModelRunnerTests(TestCase): @mock.patch("opensearchpy.OpenSearch") @run_async async def test_train_success(self, opensearch, sleep, on_client_request_start, on_client_request_end): - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } train_api_status_response = { "model_id": "1", @@ -2313,51 +2318,22 @@ async def test_train_success(self, opensearch, sleep, on_client_request_start, o } } - train_status_check_response = { - - 'weight': 1, 'unit': 'ops', 'success': True - } + train_api_first_mock = as_future(self.train_status_check_response) + train_api_status_mock = as_future(train_api_status_response) - train_api_first_mock = as_future(train_status_check_response) - train_api_status_mock =as_future(train_api_status_response) - - opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] + opensearch.transport.perform_request.side_effect = [ + train_api_first_mock, train_api_status_mock] - runner_under_test = runner.TrainKNNModel() + runner_under_test = runner.TrainKnnModel() async with runner_under_test: - result = await runner_under_test(opensearch, request) - + await runner_under_test(opensearch, self.request) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async async def test_train_failure(self, opensearch, sleep, on_client_request_start, on_client_request_end): - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id":self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } - train_api_status_response = { "model_id": self.model_id, "model_blob": "", @@ -2377,95 +2353,57 @@ async def test_train_failure(self, opensearch, sleep, on_client_request_start, o } } - train_status_check_response = { - - 'weight': 1, 'unit': 'ops', 'success': True - } + train_api_first_mock = as_future(self.train_status_check_response) + train_api_status_mock = as_future(train_api_status_response) - train_api_first_mock = as_future(train_status_check_response) - train_api_status_mock =as_future(train_api_status_response) - - opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] + opensearch.transport.perform_request.side_effect = [ + train_api_first_mock, train_api_status_mock] + runner_under_test = runner.TrainKnnModel() - - runner_under_test = runner.TrainKNNModel() - with self.assertRaisesRegex(Exception, f"Failed to create model: {train_api_status_response}"): - await runner_under_test(opensearch, request) + await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_train_error_response(self,opensearch, sleep, on_client_request_start, on_client_request_end): + async def test_train_error_response(self, opensearch, sleep, on_client_request_start, on_client_request_end): error_response = { "error": { - "root_cause": - { - "type": "index_not_found_exception", + "root_cause": + { + "type": "index_not_found_exception", "reason": "no such index [.opensearch-knn-models]", "index": ".opensearch-knn-models", "resource.id": ".opensearch-knn-models", "resource.type": "index_expression", "index_uuid": "_na_" - } - , + }, "type": "index_not_found_exception", "reason": "no such index [.opensearch-knn-models]", "index": ".opensearch-knn-models", "resource.id": ".opensearch-knn-models", "resource.type": "index_expression", "index_uuid": "_na_" - }, - "status": 404 + }, + "status": 404 } + side_effect_list = [ + as_future(self.train_status_check_response), as_future(error_response)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKnnModel() - - train_status_check_response = ({ - - 'weight': 1, 'unit': 'ops', 'success': True - }) - side_effect_list = [as_future(train_status_check_response), as_future(error_response)] - opensearch.transport.perform_request.side_effect = side_effect_list - runner_under_test = runner.TrainKNNModel() - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } - with self.assertRaises(Exception): - await runner_under_test(opensearch, request) - + await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_train_timeout(self,opensearch, sleep, on_client_request_start, on_client_request_end): - + async def test_train_timeout(self, opensearch, sleep, on_client_request_start, on_client_request_end): still_training_response = ({ "model_id": self.model_id, @@ -2486,40 +2424,14 @@ async def test_train_timeout(self,opensearch, sleep, on_client_request_start, on } }) - train_status_check_response = ({ - - 'weight': 1, 'unit': 'ops', 'success': True - }) - side_effect_list = [as_future(train_status_check_response)] + [as_future(still_training_response) for _ in range(self.retries + 2)] - opensearch.transport.perform_request.side_effect = side_effect_list - runner_under_test = runner.TrainKNNModel() - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } - # Set model state = Training. - with self.assertRaisesRegex(Exception, f'Failed to create model: {self.model_id} within {self.retries} retries'): - await runner_under_test(opensearch, request) + side_effect_list = [as_future(self.train_status_check_response)] + [ + as_future(still_training_response) for _ in range(self.retries + 2)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKnnModel() + + # Set model state = Training. + with self.assertRaisesRegex(TimeoutError, f'Failed to create model: {self.model_id} within {self.retries} retries'): + await runner_under_test(opensearch, self.request) class VectorSearchQueryRunnerTests(TestCase): From cc31a6ca840fb42f276eb0ab68a30a6703721cc2 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Fri, 21 Jun 2024 14:55:21 -0700 Subject: [PATCH 03/10] Added delete-knn-model runner Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 31 ++++++++++ osbenchmark/worker_coordinator/test.py | 42 ++++++++++++++ osbenchmark/workload/workload.py | 3 + tests/worker_coordinator/runner_test.py | 72 ++++++++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 osbenchmark/worker_coordinator/test.py diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 2e4cd3169..7ab89e551 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -106,6 +106,7 @@ def register_default_runners(): register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True) register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True) + register_runner(workload.OperationType.DeleteKnnModel, Retry(DeleteKnnModel()), async_runner=True) def runner_for(operation_type): try: @@ -651,7 +652,37 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" +class DeleteKnnModel(Runner): + """ + Deletes the K-NN model named model_id. + """ + NAME = "delete-knn-model" + + async def __call__(self, opensearch, params): + model_id = parse_string_parameter("model_id", params) + + method = "DELETE" + model_uri = f"/_plugins/_knn/models/{model_id}" + request_context_holder.on_client_request_start() + + response = await opensearch.transport.perform_request(method, model_uri) + + request_context_holder.on_client_request_end() + + if "error" in response.keys() and response["status"] == 404: + self.logger.debug("Model [%s] does not already exist, skipping delete.", model_id) + return + + if "error" in response.keys(): + self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) + raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") + + self.logger.debug("Model [%s] deleted successfully.", model_id) + + def __repr__(self, *args, **kwargs): + return self.NAME + class TrainKnnModel(Runner): """ Trains model named model_id until training is complete or retries are exhausted. diff --git a/osbenchmark/worker_coordinator/test.py b/osbenchmark/worker_coordinator/test.py new file mode 100644 index 000000000..215287699 --- /dev/null +++ b/osbenchmark/worker_coordinator/test.py @@ -0,0 +1,42 @@ +import requests +import json + +# Define the endpoint and payload for the first POST request +url_1 = "http://localhost:9200/_plugins/_knn/models/3/_train" +payload_1 = { + "training_index": "train-index", + "training_field": "train_field_name", + "dimension": 10, + "description": "My model description", + "method": { + "name": "ivf", + "engine": "faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } +} + +headers_1 = { + "Content-Type": "application/json" +} + +# Send the first POST request +url_2 = "http://localhost:9200/_plugins/_knn/models/3" + +response_1 = requests.post(url_1, headers=headers_1, data=json.dumps(payload_1)) +# print("First response status:", response_1.status_code) +# print(response_1.text) + +# # Define the endpoint for the second GET request + +# # Send the second GET request +response_2 = requests.get(url_2) +print("Second response status:", response_2.status_code) + +# Print the response from the second request +print("Second response content:") +print(response_2.text) +print(response_2) \ No newline at end of file diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 5fcee1067..95545fb59 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -602,6 +602,7 @@ class OperationType(Enum): VectorSearch = 17 BulkVectorDataSet = 18 TrainKnnModel = 19 + DeleteKnnModel = 20 # administrative actions ForceMerge = 1001 @@ -749,6 +750,8 @@ def from_hyphenated_string(cls, v): return OperationType.DeployMlModel elif v == "train-knn-model": return OperationType.TrainKnnModel + elif v == "delete-knn-model": + return OperationType.DeleteKnnModel else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 3ffe19f1d..82bda9d26 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2258,6 +2258,78 @@ async def test_search_pipeline_using_request_params(self, opensearch, on_client_ opensearch.clear_scroll.assert_not_called() +class DeleteKnnModelRunnerTests(TestCase): + model_id = "test-model-id" + + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": model_id + } + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_success(self, opensearch, on_client_request_start, on_client_request_end): + response = { + "model_id": "test-model", + "result": "deleted" + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: + await runner_under_test(opensearch, self.request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_404(self, opensearch, on_client_request_start, on_client_request_end): + + response = { + "error": { + "root_cause": [ + { + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + } + ], + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + }, + "status": 404 + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: + await runner_under_test(opensearch, self.request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_400(self, opensearch, on_client_request_start, on_client_request_end): + + response = { + "error": { + "root_cause": [ + { + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + } + ], + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + }, + "status": 400 + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + with self.assertRaises(Exception): + await runner_under_test(opensearch, self.request) + + class TrainKnnModelRunnerTests(TestCase): model_id = "test-model-id" retries = 120 From 2dfea044e2915e03038f5302bcf378276d7276f1 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Fri, 21 Jun 2024 16:51:55 -0700 Subject: [PATCH 04/10] Addressed Vijay feedback #2 Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 17 ++++++---- osbenchmark/worker_coordinator/test.py | 42 ------------------------ tests/worker_coordinator/runner_test.py | 39 +++++++++++++++++++++- 3 files changed, 48 insertions(+), 50 deletions(-) delete mode 100644 osbenchmark/worker_coordinator/test.py diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 7ab89e551..737b37265 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -673,11 +673,11 @@ async def __call__(self, opensearch, params): if "error" in response.keys() and response["status"] == 404: self.logger.debug("Model [%s] does not already exist, skipping delete.", model_id) return - + if "error" in response.keys(): self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") - + self.logger.debug("Model [%s] deleted successfully.", model_id) def __repr__(self, *args, **kwargs): @@ -744,12 +744,15 @@ async def __call__(self, opensearch, params): if model_response['state'] == 'created': self.logger.info( "Training model [%s] was completed successfully.", model_id) - break + return - # training failed. - self.logger.error( - "Training for model [%s] failed. Response: [%s]", model_id, model_response) - raise Exception(f"Failed to create model: {model_response}") + if model_response['state'] == 'failed': + self.logger.error( + "Training for model [%s] failed. Response: [%s]", model_id, model_response) + raise Exception(f"Failed to create model {model_id}: {model_response}") + + self.logger.error("Model [%s] in unknown state [%s], response: [%s]", model_id, model_response["state"], model_response) + raise Exception(f"Model {model_id} in unknown state {model_response['state']}, response: {model_response}") def __repr__(self, *args, **kwargs): return self.NAME diff --git a/osbenchmark/worker_coordinator/test.py b/osbenchmark/worker_coordinator/test.py deleted file mode 100644 index 215287699..000000000 --- a/osbenchmark/worker_coordinator/test.py +++ /dev/null @@ -1,42 +0,0 @@ -import requests -import json - -# Define the endpoint and payload for the first POST request -url_1 = "http://localhost:9200/_plugins/_knn/models/3/_train" -payload_1 = { - "training_index": "train-index", - "training_field": "train_field_name", - "dimension": 10, - "description": "My model description", - "method": { - "name": "ivf", - "engine": "faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } -} - -headers_1 = { - "Content-Type": "application/json" -} - -# Send the first POST request -url_2 = "http://localhost:9200/_plugins/_knn/models/3" - -response_1 = requests.post(url_1, headers=headers_1, data=json.dumps(payload_1)) -# print("First response status:", response_1.status_code) -# print(response_1.text) - -# # Define the endpoint for the second GET request - -# # Send the second GET request -response_2 = requests.get(url_2) -print("Second response status:", response_2.status_code) - -# Print the response from the second request -print("Second response content:") -print(response_2.text) -print(response_2) \ No newline at end of file diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 82bda9d26..6cbaacabe 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2432,7 +2432,44 @@ async def test_train_failure(self, opensearch, sleep, on_client_request_start, o train_api_first_mock, train_api_status_mock] runner_under_test = runner.TrainKnnModel() - with self.assertRaisesRegex(Exception, f"Failed to create model: {train_api_status_response}"): + with self.assertRaisesRegex(Exception, f"Failed to create model {self.model_id}: {train_api_status_response}"): + await runner_under_test(opensearch, self.request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("asyncio.sleep", return_value=as_future()) + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_train_illegal_model_state(self, opensearch, sleep, on_client_request_start, on_client_request_end): + illegal_state = "dummy state that is not supported" + train_api_status_response = { + "model_id": self.model_id, + "model_blob": "", + "state": "dummy state that is not supported", + "timestamp": "2024-06-17T23:03:02.475277Z", + "description": "My model description", + "space_type": "l2", + "dimension": 10, + "engine": "faiss", + "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w", + "model_definition": { + "name": "ivf", + "parameters": { + "nprobes": 5, + "nlist": 10 + } + } + } + + train_api_first_mock = as_future(self.train_status_check_response) + train_api_status_mock = as_future(train_api_status_response) + + opensearch.transport.perform_request.side_effect = [ + train_api_first_mock, train_api_status_mock] + runner_under_test = runner.TrainKnnModel() + + with self.assertRaisesRegex(Exception, + f"Model {self.model_id} in unknown state {illegal_state}, response: {train_api_status_response}"): await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') From a56dee74f7d6d07ef28bb83f4ed24d360eee91e5 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Mon, 24 Jun 2024 16:57:25 -0700 Subject: [PATCH 05/10] Added request ignore on 404; removed local testing file Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 737b37265..148ede34e 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -666,15 +666,12 @@ async def __call__(self, opensearch, params): request_context_holder.on_client_request_start() - response = await opensearch.transport.perform_request(method, model_uri) + # 404 indicates the model has not been created. + response = await opensearch.transport.perform_request(method, model_uri, params={"ignore": [404]}) request_context_holder.on_client_request_end() - - if "error" in response.keys() and response["status"] == 404: - self.logger.debug("Model [%s] does not already exist, skipping delete.", model_id) - return - - if "error" in response.keys(): + + if "error" in response.keys() and response["status"] != 404: self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") @@ -682,7 +679,7 @@ async def __call__(self, opensearch, params): def __repr__(self, *args, **kwargs): return self.NAME - + class TrainKnnModel(Runner): """ Trains model named model_id until training is complete or retries are exhausted. From 2c99fb056cf83cc03d5ba186cc9a4a0da31aa47c Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Tue, 25 Jun 2024 15:37:59 -0700 Subject: [PATCH 06/10] Fix linting errors Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 165 ++--------------------- tests/worker_coordinator/runner_test.py | 4 +- 2 files changed, 13 insertions(+), 156 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 148ede34e..65045d392 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -652,107 +652,6 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" -class DeleteKnnModel(Runner): - """ - Deletes the K-NN model named model_id. - """ - NAME = "delete-knn-model" - - async def __call__(self, opensearch, params): - model_id = parse_string_parameter("model_id", params) - - method = "DELETE" - model_uri = f"/_plugins/_knn/models/{model_id}" - - request_context_holder.on_client_request_start() - - # 404 indicates the model has not been created. - response = await opensearch.transport.perform_request(method, model_uri, params={"ignore": [404]}) - - request_context_holder.on_client_request_end() - - if "error" in response.keys() and response["status"] != 404: - self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) - raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") - - self.logger.debug("Model [%s] deleted successfully.", model_id) - - def __repr__(self, *args, **kwargs): - return self.NAME - -class TrainKnnModel(Runner): - """ - Trains model named model_id until training is complete or retries are exhausted. - """ - - NAME = "train-knn-model" - - async def __call__(self, opensearch, params): - """ - Create and train one model named model_id. - - :param opensearch: The OpenSearch client. - :param params: A hash with all parameters. See below for details. - :return: A hash with meta data for this bulk operation. See below for details. - :raises: Exception if training fails, times out, or a different error occurs. - It expects a parameter dict with the following mandatory keys: - - * ``body``: containing parameters to pass on to the train engine. - See https://opensearch.org/docs/latest/search-plugins/knn/api/#train-a-model for information. - * ``retries``: Maximum number of retries allowed for the training to complete (seconds). - * ``polling-interval``: Polling interval to see if the model has been trained yet (seconds). - * ``model_id``: ID of the model to train. - """ - body = params["body"] - model_id = parse_string_parameter("model_id", params) - max_retries = parse_int_parameter("retries", params) - poll_period = parse_float_parameter("poll_period", params) - - method = "POST" - model_uri = f"/_plugins/_knn/models/{model_id}" - request_context_holder.on_client_request_start() - await opensearch.transport.perform_request(method, f"{model_uri}/_train", body=body) - - current_number_retries = 0 - while True: - model_response = await opensearch.transport.perform_request("GET", model_uri) - - if 'state' not in model_response.keys(): - request_context_holder.on_client_request_end() - self.logger.error( - "Failed to create model [%s] with error response: [%s]", model_id, model_response) - raise Exception( - f"Failed to create model {model_id} with error response: {model_response}") - - if current_number_retries > max_retries: - request_context_holder.on_client_request_end() - self.logger.error( - "Failed to create model [%s] within [%i] retries.", model_id, max_retries) - raise TimeoutError( - f'Failed to create model: {model_id} within {max_retries} retries') - - if model_response['state'] == 'training': - current_number_retries += 1 - await asyncio.sleep(poll_period) - continue - - # at this point, training either failed or finished. - request_context_holder.on_client_request_end() - if model_response['state'] == 'created': - self.logger.info( - "Training model [%s] was completed successfully.", model_id) - return - - if model_response['state'] == 'failed': - self.logger.error( - "Training for model [%s] failed. Response: [%s]", model_id, model_response) - raise Exception(f"Failed to create model {model_id}: {model_response}") - - self.logger.error("Model [%s] in unknown state [%s], response: [%s]", model_id, model_response["state"], model_response) - raise Exception(f"Model {model_id} in unknown state {model_response['state']}, response: {model_response}") - - def __repr__(self, *args, **kwargs): - return self.NAME class DeleteKnnModel(Runner): """ @@ -760,71 +659,33 @@ class DeleteKnnModel(Runner): """ NAME = "delete-knn-model" - MODEL_DOES_NOT_EXIST_STATUS_CODE = 404 async def __call__(self, opensearch, params): model_id = parse_string_parameter("model_id", params) - ignore_if_model_does_not_exist = params.get( - "ignore-if-model-does-not-exist", False - ) method = "DELETE" model_uri = f"/_plugins/_knn/models/{model_id}" request_context_holder.on_client_request_start() - # 404 indicates the model has not been created. In that case, the runner's response depends on ignore_if_model_does_not_exist. + # 404 indicates the model has not been created. response = await opensearch.transport.perform_request( - method, - model_uri, - params={"ignore": [self.MODEL_DOES_NOT_EXIST_STATUS_CODE]}, + method, model_uri, params={"ignore": [404]} ) request_context_holder.on_client_request_end() - # success condition. - if "result" in response.keys() and response["result"] == "deleted": - self.logger.debug("Model [%s] deleted successfully.", model_id) - return {"weight": 1, "unit": "ops", "success": True} - - if "error" not in response.keys(): - self.logger.warning( - "Request to delete model [%s] failed but no error, response: [%s]", - model_id, - response, - ) - return {"weight": 1, "unit": "ops", "success": False} - - if response["status"] != self.MODEL_DOES_NOT_EXIST_STATUS_CODE: - self.logger.warning( - "Request to delete model [%s] failed with status [%s] and response: [%s]", + if "error" in response.keys() and response["status"] != 404: + self.logger.error( + "Request to delete model [%s] failed with error: with error response: [%s]", model_id, - response["status"], response, ) - return {"weight": 1, "unit": "ops", "success": False} - - if ignore_if_model_does_not_exist: - self.logger.debug( - ( - "Model [%s] does not exist so it could not be deleted, " - "however ignore-if-model-does-not-exist is True so the " - "DeleteKnnModel operation succeeded." - ), - model_id, + raise Exception( + f"Request to delete model {model_id} failed with error: with error response: {response}" ) - return {"weight": 1, "unit": "ops", "success": True} - - self.logger.warning( - ( - "Request to delete model [%s] failed because the model does not exist " - "and ignore-if-model-does-not-exist was set to False. Response: [%s]" - ), - model_id, - response, - ) - return {"weight": 1, "unit": "ops", "success": False} + self.logger.debug("Model [%s] deleted successfully.", model_id) def __repr__(self, *args, **kwargs): return self.NAME @@ -836,8 +697,6 @@ class TrainKnnModel(Runner): """ NAME = "train-knn-model" - DEFAULT_RETRIES = 1000 - DEFAULT_POLL_PERIOD = 0.5 async def __call__(self, opensearch, params): """ @@ -857,10 +716,8 @@ async def __call__(self, opensearch, params): """ body = params["body"] model_id = parse_string_parameter("model_id", params) - max_retries = parse_int_parameter("retries", params, self.DEFAULT_RETRIES) - poll_period = parse_float_parameter( - "poll_period", params, self.DEFAULT_POLL_PERIOD - ) + max_retries = parse_int_parameter("retries", params) + poll_period = parse_float_parameter("poll_period", params) method = "POST" model_uri = f"/_plugins/_knn/models/{model_id}" diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 6cbaacabe..ad67a344f 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2505,7 +2505,7 @@ async def test_train_error_response(self, opensearch, sleep, on_client_request_s runner_under_test = runner.TrainKnnModel() with self.assertRaises(Exception): - await runner_under_test(opensearch, self.request) + await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @@ -2540,7 +2540,7 @@ async def test_train_timeout(self, opensearch, sleep, on_client_request_start, o # Set model state = Training. with self.assertRaisesRegex(TimeoutError, f'Failed to create model: {self.model_id} within {self.retries} retries'): - await runner_under_test(opensearch, self.request) + await runner_under_test(opensearch, self.request) class VectorSearchQueryRunnerTests(TestCase): From 1bb98ee39750c75d98ae09e177715f45b02ece18 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Wed, 26 Jun 2024 14:27:43 -0700 Subject: [PATCH 07/10] Change DeleteKnnRunner behavior to return success status instead of exception Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 29 +++++++++++---- tests/worker_coordinator/runner_test.py | 46 +++++++++++++++++++++--- 2 files changed, 64 insertions(+), 11 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 65045d392..ab3b02a2b 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -662,6 +662,7 @@ class DeleteKnnModel(Runner): async def __call__(self, opensearch, params): model_id = parse_string_parameter("model_id", params) + should_ignore_if_model_DNE = params.get("ignore-if-model-does-not-exist", False) method = "DELETE" model_uri = f"/_plugins/_knn/models/{model_id}" @@ -675,17 +676,29 @@ async def __call__(self, opensearch, params): request_context_holder.on_client_request_end() + if ( + "error" in response.keys() + and response["status"] == 404 + and not should_ignore_if_model_DNE + ): + self.logger.error( + "Request to delete model [%s] failed because the model does not exist "\ + "and ignore-if-model-does-not-exist was set to True. Response: [%s]", + model_id, + response, + ) + return {"success": False} + if "error" in response.keys() and response["status"] != 404: self.logger.error( "Request to delete model [%s] failed with error: with error response: [%s]", model_id, response, ) - raise Exception( - f"Request to delete model {model_id} failed with error: with error response: {response}" - ) + return {"success": False} self.logger.debug("Model [%s] deleted successfully.", model_id) + return {"success": True} def __repr__(self, *args, **kwargs): return self.NAME @@ -697,6 +710,8 @@ class TrainKnnModel(Runner): """ NAME = "train-knn-model" + DEFAULT_RETRIES = 1000 + DEFAULT_POLL_PERIOD = 0.5 async def __call__(self, opensearch, params): """ @@ -716,8 +731,10 @@ async def __call__(self, opensearch, params): """ body = params["body"] model_id = parse_string_parameter("model_id", params) - max_retries = parse_int_parameter("retries", params) - poll_period = parse_float_parameter("poll_period", params) + max_retries = parse_int_parameter("retries", params, self.DEFAULT_RETRIES) + poll_period = parse_float_parameter( + "poll_period", params, self.DEFAULT_POLL_PERIOD + ) method = "POST" model_uri = f"/_plugins/_knn/models/{model_id}" diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index ad67a344f..4f4b303b2 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2278,15 +2278,48 @@ async def test_delete_knn_success(self, opensearch, on_client_request_start, on_ } opensearch.transport.perform_request.return_value = as_future(response) runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: - await runner_under_test(opensearch, self.request) + result = await runner_under_test(opensearch, self.request) + + self.assertEqual(True, result["success"]) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_knn_404(self, opensearch, on_client_request_start, on_client_request_end): + async def test_delete_knn_404_success_when_ignore_if_model_DNE(self, opensearch, on_client_request_start, on_client_request_end): + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": self.model_id, + "ignore-if-model-does-not-exist": True + } + response = { + "error": { + "root_cause": [ + { + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + } + ], + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + }, + "status": 404 + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: + result = await runner_under_test(opensearch, request) + self.assertEqual(True, result["success"]) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_404_fails_if_model_DNE(self, opensearch, on_client_request_start, on_client_request_end): response = { "error": { "root_cause": [ @@ -2303,7 +2336,9 @@ async def test_delete_knn_404(self, opensearch, on_client_request_start, on_clie opensearch.transport.perform_request.return_value = as_future(response) runner_under_test = runner.DeleteKnnModel() async with runner_under_test: - await runner_under_test(opensearch, self.request) + result = await runner_under_test(opensearch, self.request) + + self.assertEqual(False, result["success"]) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @@ -2326,9 +2361,10 @@ async def test_delete_knn_400(self, opensearch, on_client_request_start, on_clie } opensearch.transport.perform_request.return_value = as_future(response) runner_under_test = runner.DeleteKnnModel() - with self.assertRaises(Exception): - await runner_under_test(opensearch, self.request) + async with runner_under_test: + result = await runner_under_test(opensearch, self.request) + self.assertEqual(False, result["success"]) class TrainKnnModelRunnerTests(TestCase): model_id = "test-model-id" From 8cb6db4ce44e2ecc636219ac67fcbe76be3c59de Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Thu, 27 Jun 2024 16:18:26 -0700 Subject: [PATCH 08/10] Vijay feedback & reordered conditionals Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 63 +++++++++++++++++------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index ab3b02a2b..9e63da438 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -659,46 +659,71 @@ class DeleteKnnModel(Runner): """ NAME = "delete-knn-model" + MODEL_DOES_NOT_EXIST_STATUS_CODE = 404 async def __call__(self, opensearch, params): model_id = parse_string_parameter("model_id", params) - should_ignore_if_model_DNE = params.get("ignore-if-model-does-not-exist", False) + ignore_if_model_does_not_exist = params.get( + "ignore-if-model-does-not-exist", False + ) method = "DELETE" model_uri = f"/_plugins/_knn/models/{model_id}" request_context_holder.on_client_request_start() - # 404 indicates the model has not been created. + # 404 indicates the model has not been created. The runner's response depends on ignore_if_model_does_not_exist. response = await opensearch.transport.perform_request( - method, model_uri, params={"ignore": [404]} + method, + model_uri, + params={"ignore": [self.MODEL_DOES_NOT_EXIST_STATUS_CODE]}, ) request_context_holder.on_client_request_end() - if ( - "error" in response.keys() - and response["status"] == 404 - and not should_ignore_if_model_DNE - ): - self.logger.error( - "Request to delete model [%s] failed because the model does not exist "\ - "and ignore-if-model-does-not-exist was set to True. Response: [%s]", + # success condition. + if "result" in response.keys() and response["result"] == "deleted": + self.logger.debug("Model [%s] deleted successfully.", model_id) + return {"weight": 1, "unit": "ops", "success": True} + + if "error" not in response.keys(): + self.logger.warning( + "Request to delete model [%s] failed but no error, response: [%s]", model_id, response, ) - return {"success": False} + return {"weight": 1, "unit": "ops", "success": False} - if "error" in response.keys() and response["status"] != 404: - self.logger.error( - "Request to delete model [%s] failed with error: with error response: [%s]", + if response["status"] != self.MODEL_DOES_NOT_EXIST_STATUS_CODE: + self.logger.warning( + "Request to delete model [%s] failed with status [%s] and response: [%s]", model_id, + response["status"], response, ) - return {"success": False} + return {"weight": 1, "unit": "ops", "success": False} + + if ignore_if_model_does_not_exist: + self.logger.debug( + ( + "Model [%s] does not exist so it could not be deleted, " + "however ignore-if-model-does-not-exist is True so the " + "DeleteKnnModel operation succeeded." + ), + model_id, + ) - self.logger.debug("Model [%s] deleted successfully.", model_id) - return {"success": True} + return {"weight": 1, "unit": "ops", "success": True} + + self.logger.warning( + ( + "Request to delete model [%s] failed because the model does not exist " + "and ignore-if-model-does-not-exist was set to False. Response: [%s]" + ), + model_id, + response, + ) + return {"weight": 1, "unit": "ops", "success": False} def __repr__(self, *args, **kwargs): return self.NAME From e84e4efb39b5dffb4b7b7f6c3ff4835355c2a43b Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Thu, 27 Jun 2024 16:21:58 -0700 Subject: [PATCH 09/10] Revert autoformatter changes to comment at top of runner.py Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 9e63da438..d280ddffd 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -672,7 +672,7 @@ async def __call__(self, opensearch, params): request_context_holder.on_client_request_start() - # 404 indicates the model has not been created. The runner's response depends on ignore_if_model_does_not_exist. + # 404 indicates the model has not been created. In that case, the runner's response depends on ignore_if_model_does_not_exist. response = await opensearch.transport.perform_request( method, model_uri, From ccb63652b94bab444a2fce14a4920e9a7677e975 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Fri, 19 Jul 2024 09:51:56 -0700 Subject: [PATCH 10/10] GH Actions rerun Signed-off-by: Finn Roblin --- it/resources/benchmark-in-memory-it.ini | 2 +- it/resources/benchmark-os-it.ini | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/it/resources/benchmark-in-memory-it.ini b/it/resources/benchmark-in-memory-it.ini index 25ee108e0..8c8b297f2 100644 --- a/it/resources/benchmark-in-memory-it.ini +++ b/it/resources/benchmark-in-memory-it.ini @@ -20,7 +20,7 @@ datastore.type = in-memory [workloads] -default.url = https://github.com/opensearch-project/opensearch-benchmark-workloads +default.url = https://github.com/finnroblin/opensearch-benchmark-workloads [provision_configs] default.dir = default-provision-config diff --git a/it/resources/benchmark-os-it.ini b/it/resources/benchmark-os-it.ini index 5c3b70122..44b4d206d 100644 --- a/it/resources/benchmark-os-it.ini +++ b/it/resources/benchmark-os-it.ini @@ -25,7 +25,7 @@ datastore.password = [workloads] -default.url = https://github.com/opensearch-project/opensearch-benchmark-workloads +default.url = https://github.com/finnroblin/opensearch-benchmark-workloads [provision_configs] default.dir = default-provision-config