From b55857b8d3efad6dc6ab0dbb41b74ce3b95f3c59 Mon Sep 17 00:00:00 2001 From: Vesa Pehkonen Date: Fri, 5 Apr 2024 09:46:38 -0700 Subject: [PATCH] Addressed the following issues: - added expection handling and logging for DeletePipeline runner - removed empty expection from DeleteMlModel and RegisterMlModel runners - added the "number-of-hits-to-return" variable Signed-off-by: Vesa Pehkonen --- osbenchmark/worker_coordinator/runner.py | 46 +++++++++++------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 60e7f6e76..050155c66 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -41,6 +41,7 @@ import ijson from opensearchpy import ConnectionTimeout +from opensearchpy import NotFoundError from osbenchmark import exceptions, workload from osbenchmark.utils import convert @@ -1291,8 +1292,8 @@ async def __call__(self, opensearch, params): master_timeout=params.get("master-timeout"), timeout=params.get("timeout"), ) - except: - pass # no current pipeline + except NotFoundError as e: + self.logger.info("No current pipeline [%s] to delete.", params.get("id")) def __repr__(self, *args, **kwargs): return "delete-pipeline" @@ -2395,20 +2396,18 @@ async def __call__(self, opensearch, params): } } }, - "size": 1000 + "size": params.get('number-of-hits-to-return', 1000) } model_ids = set() - try: - resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=body) - for item in resp['hits']['hits']: - doc = item.get('_source') - if doc: - id = doc.get('model_id') - if id: - model_ids.add(id) - except: - pass # no current model + + resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=body) + for item in resp['hits']['hits']: + doc = item.get('_source') + if doc: + id = doc.get('model_id') + if id: + model_ids.add(id) for model_id in model_ids: resp=await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/' + model_id + '/_undeploy') @@ -2435,20 +2434,17 @@ async def __call__(self, opensearch, params): "match": { "name": body['name'] } - }, - "size": 1000 + } } model_id = None - try: - resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=search_body) - for item in resp['hits']['hits']: - doc = item.get('_source') - if doc: - model_id = doc.get('model_id') - if model_id: - break - except: - pass + + resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=search_body) + for item in resp['hits']['hits']: + doc = item.get('_source') + if doc: + model_id = doc.get('model_id') + if model_id: + break if not model_id: resp = await opensearch.transport.perform_request('POST', '_plugins/_ml/models/_register', body=body)