From 794e11ed62f90dbc7eed36429d64335f83092554 Mon Sep 17 00:00:00 2001 From: vpehkone <101240162+vpehkone@users.noreply.github.com> Date: Fri, 19 Apr 2024 16:03:19 -0700 Subject: [PATCH] Added runners to register and deploy ml-model (#497) Signed-off-by: Vesa Pehkonen --- osbenchmark/worker_coordinator/runner.py | 136 ++++++++++++++++++++++- osbenchmark/workload/workload.py | 24 +++- 2 files changed, 150 insertions(+), 10 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index f629d7f6f..56b5c2667 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -41,6 +41,7 @@ import ijson from opensearchpy import ConnectionTimeout +from opensearchpy import NotFoundError from osbenchmark import exceptions, workload from osbenchmark.utils import convert @@ -78,6 +79,7 @@ def register_default_runners(): # We treat the following as administrative commands and thus already start to wrap them in a retry. register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True) register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True) + register_runner(workload.OperationType.DeletePipeline, Retry(DeletePipeline()), async_runner=True) register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True) register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True) register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True) @@ -100,6 +102,9 @@ def register_default_runners(): register_runner(workload.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True) register_runner(workload.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True) register_runner(workload.OperationType.CreateSearchPipeline, Retry(CreateSearchPipeline()), async_runner=True) + register_runner(workload.OperationType.DeleteMlModel, Retry(DeleteMlModel()), async_runner=True) + register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True) + register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) def runner_for(operation_type): @@ -153,7 +158,6 @@ def register_runner(operation_type, runner, **kwargs): __RUNNERS[operation_type] = _with_completion(_with_assertions(cluster_aware_runner)) - # Only intended for unit-testing! def remove_runner(operation_type): del __RUNNERS[operation_type] @@ -216,9 +220,11 @@ def _transport_request_params(self, params): def time_func(func): async def advised(*args, **kwargs): request_context_holder.on_client_request_start() - response = await func(*args, **kwargs) - request_context_holder.on_client_request_end() - return response + try: + response = await func(*args, **kwargs) + return response + finally: + request_context_holder.on_client_request_end() return advised @@ -1278,6 +1284,20 @@ async def __call__(self, opensearch, params): def __repr__(self, *args, **kwargs): return "put-pipeline" +class DeletePipeline(Runner): + @time_func + async def __call__(self, opensearch, params): + try: + await opensearch.ingest.delete_pipeline(id=mandatory(params, "id", self), + master_timeout=params.get("master-timeout"), + timeout=params.get("timeout"), + ) + except NotFoundError: + self.logger.info("No current pipeline [%s] to delete.", params.get("id")) + + def __repr__(self, *args, **kwargs): + return "delete-pipeline" + # TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474 class CreateSearchPipeline(Runner): @time_func @@ -2367,3 +2387,111 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __repr__(self, *args, **kwargs): return "retryable %s" % repr(self.delegate) + +class DeleteMlModel(Runner): + @time_func + async def __call__(self, opensearch, params): + body= { + "query": { + "match_phrase": { + "name": { + "query": params.get('model-name') + } + } + }, + "size": params.get('number-of-hits-to-return', 1000) + } + + model_ids = set() + + resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=body) + for item in resp['hits']['hits']: + doc = item.get('_source') + if doc: + id = doc.get('model_id') + if id: + model_ids.add(id) + + for model_id in model_ids: + resp=await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/' + model_id + '/_undeploy') + resp=await opensearch.transport.perform_request('DELETE', '/_plugins/_ml/models/' + model_id) + + def __repr__(self, *args, **kwargs): + return "delete-ml-model" + +class RegisterMlModel(Runner): + @time_func + async def __call__(self, opensearch, params): + config_file = params.get('model-config-file') + if config_file: + with open(config_file, 'r') as f: + body = json.loads(f.read()) + else: + body = { + "name": params.get('model-name'), + "version": params.get('model-version'), + "model_format": params.get('model-format') + } + search_body = { + "query": { + "match": { + "name": body['name'] + } + } + } + model_id = None + + resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=search_body) + for item in resp['hits']['hits']: + doc = item.get('_source') + if doc: + model_id = doc.get('model_id') + if model_id: + break + + if not model_id: + resp = await opensearch.transport.perform_request('POST', '_plugins/_ml/models/_register', body=body) + task_id = resp.get('task_id') + timeout = params.get('timeout', 120) + end = time.time() + timeout + state = 'CREATED' + while state == 'CREATED' and time.time() < end: + await asyncio.sleep(5) + resp = await opensearch.transport.perform_request('GET', '_plugins/_ml/tasks/' + task_id) + state = resp.get('state') + if state == 'FAILED': + raise exceptions.BenchmarkError("Failed to register ml-model. Error: {}".format(resp['error'])) + if state == 'CREATED': + raise TimeoutError("Timeout when registering ml-model.") + model_id = resp.get('model_id') + + with open('model_id.json', 'w') as f: + d = { 'model_id': model_id } + f.write(json.dumps(d)) + + def __repr__(self, *args, **kwargs): + return "register-ml-model" + +class DeployMlModel(Runner): + @time_func + async def __call__(self, opensearch, params): + with open('model_id.json', 'r') as f: + d = json.loads(f.read()) + model_id = d['model_id'] + + resp = await opensearch.transport.perform_request('POST', '_plugins/_ml/models/' + model_id + '/_deploy') + task_id = resp.get('task_id') + timeout = params.get('timeout', 120) + end = time.time() + timeout + state = 'RUNNING' + while state == 'RUNNING' and time.time() < end: + await asyncio.sleep(5) + resp = await opensearch.transport.perform_request('GET', '_plugins/_ml/tasks/' + task_id) + state = resp.get('state') + if state == 'FAILED': + raise exceptions.BenchmarkError("Failed to deploy ml-model. Error: {}".format(resp['error'])) + if state == 'RUNNING': + raise TimeoutError("Timeout when deploying ml-model.") + + def __repr__(self, *args, **kwargs): + return "deploy-ml-model" diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index a047cc667..3e401d1e6 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -606,12 +606,13 @@ class OperationType(Enum): ForceMerge = 1001 ClusterHealth = 1002 PutPipeline = 1003 - Refresh = 1004 - CreateIndex = 1005 - DeleteIndex = 1006 - CreateIndexTemplate = 1007 - DeleteIndexTemplate = 1008 - ShrinkIndex = 1009 + DeletePipeline = 1004 + Refresh = 1005 + CreateIndex = 1006 + DeleteIndex = 1007 + CreateIndexTemplate = 1008 + DeleteIndexTemplate = 1009 + ShrinkIndex = 1010 Sleep = 1018 DeleteSnapshotRepository = 1019 CreateSnapshotRepository = 1020 @@ -629,6 +630,9 @@ class OperationType(Enum): CreateComponentTemplate = 1032 DeleteComponentTemplate = 1033 CreateSearchPipeline = 1040 + DeleteMlModel = 1041 + RegisterMlModel = 1042 + DeployMlModel = 1043 @property def admin_op(self): @@ -670,6 +674,8 @@ def from_hyphenated_string(cls, v): return OperationType.RawRequest elif v == "put-pipeline": return OperationType.PutPipeline + elif v == "delete-pipeline": + return OperationType.DeletePipeline elif v == "refresh": return OperationType.Refresh elif v == "create-index": @@ -734,6 +740,12 @@ def from_hyphenated_string(cls, v): return OperationType.ListAllPointInTime elif v == "create-search-pipeline": return OperationType.CreateSearchPipeline + elif v == "delete-ml-model": + return OperationType.DeleteMlModel + elif v == "register-ml-model": + return OperationType.RegisterMlModel + elif v == "deploy-ml-model": + return OperationType.DeployMlModel else: raise KeyError(f"No enum value for [{v}]")