Skip to content

Commit

Permalink
Addressed the following issues:
Browse files Browse the repository at this point in the history
- added expection handling and logging for DeletePipeline runner
- removed empty expection from DeleteMlModel and RegisterMlModel runners
- added the "number-of-hits-to-return" variable

Signed-off-by: Vesa Pehkonen <[email protected]>
  • Loading branch information
vpehkone committed Apr 5, 2024
1 parent 1930e18 commit b55857b
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import ijson
from opensearchpy import ConnectionTimeout
from opensearchpy import NotFoundError

from osbenchmark import exceptions, workload
from osbenchmark.utils import convert
Expand Down Expand Up @@ -1291,8 +1292,8 @@ async def __call__(self, opensearch, params):
master_timeout=params.get("master-timeout"),
timeout=params.get("timeout"),
)
except:
pass # no current pipeline
except NotFoundError as e:
self.logger.info("No current pipeline [%s] to delete.", params.get("id"))

def __repr__(self, *args, **kwargs):
return "delete-pipeline"
Expand Down Expand Up @@ -2395,20 +2396,18 @@ async def __call__(self, opensearch, params):
}
}
},
"size": 1000
"size": params.get('number-of-hits-to-return', 1000)
}

model_ids = set()
try:
resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=body)
for item in resp['hits']['hits']:
doc = item.get('_source')
if doc:
id = doc.get('model_id')
if id:
model_ids.add(id)
except:
pass # no current model

resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=body)
for item in resp['hits']['hits']:
doc = item.get('_source')
if doc:
id = doc.get('model_id')
if id:
model_ids.add(id)

for model_id in model_ids:
resp=await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/' + model_id + '/_undeploy')
Expand All @@ -2435,20 +2434,17 @@ async def __call__(self, opensearch, params):
"match": {
"name": body['name']
}
},
"size": 1000
}
}
model_id = None
try:
resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=search_body)
for item in resp['hits']['hits']:
doc = item.get('_source')
if doc:
model_id = doc.get('model_id')
if model_id:
break
except:
pass

resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=search_body)
for item in resp['hits']['hits']:
doc = item.get('_source')
if doc:
model_id = doc.get('model_id')
if model_id:
break

if not model_id:
resp = await opensearch.transport.perform_request('POST', '_plugins/_ml/models/_register', body=body)
Expand Down

0 comments on commit b55857b

Please sign in to comment.