Skip to content

Commit

Permalink
Added runners to register and deploy ml-model (#497)
Browse files Browse the repository at this point in the history
Signed-off-by: Vesa Pehkonen <[email protected]>
  • Loading branch information
vpehkone authored Apr 19, 2024
1 parent f0e522d commit 794e11e
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 10 deletions.
136 changes: 132 additions & 4 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import ijson
from opensearchpy import ConnectionTimeout
from opensearchpy import NotFoundError

from osbenchmark import exceptions, workload
from osbenchmark.utils import convert
Expand Down Expand Up @@ -78,6 +79,7 @@ def register_default_runners():
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
register_runner(workload.OperationType.DeletePipeline, Retry(DeletePipeline()), async_runner=True)
register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True)
register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True)
Expand All @@ -100,6 +102,9 @@ def register_default_runners():
register_runner(workload.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True)
register_runner(workload.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True)
register_runner(workload.OperationType.CreateSearchPipeline, Retry(CreateSearchPipeline()), async_runner=True)
register_runner(workload.OperationType.DeleteMlModel, Retry(DeleteMlModel()), async_runner=True)
register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True)
register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True)


def runner_for(operation_type):
Expand Down Expand Up @@ -153,7 +158,6 @@ def register_runner(operation_type, runner, **kwargs):

__RUNNERS[operation_type] = _with_completion(_with_assertions(cluster_aware_runner))


# Only intended for unit-testing!
def remove_runner(operation_type):
del __RUNNERS[operation_type]
Expand Down Expand Up @@ -216,9 +220,11 @@ def _transport_request_params(self, params):
def time_func(func):
    """
    Decorator that brackets an async call with client request timing notifications.

    ``request_context_holder.on_client_request_start()`` is called before ``func`` is awaited and
    ``request_context_holder.on_client_request_end()`` is called from a ``finally`` clause afterwards, so the
    end notification is issued even when ``func`` raises.

    :param func: The coroutine function to wrap.
    :return: A coroutine function that forwards all arguments to ``func`` and returns its result.
    """
    async def advised(*args, **kwargs):
        request_context_holder.on_client_request_start()
        try:
            return await func(*args, **kwargs)
        finally:
            # Must run on the error path as well, otherwise a started request is never marked as ended.
            request_context_holder.on_client_request_end()
    return advised


Expand Down Expand Up @@ -1278,6 +1284,20 @@ async def __call__(self, opensearch, params):
def __repr__(self, *args, **kwargs):
return "put-pipeline"

class DeletePipeline(Runner):
    """
    Deletes the ingest pipeline given by the mandatory ``id`` parameter.

    The optional ``master-timeout`` and ``timeout`` parameters are passed through to the client call.
    A ``NotFoundError`` raised by the client is logged at info level and otherwise ignored.
    """

    @time_func
    async def __call__(self, opensearch, params):
        request_args = {
            "id": mandatory(params, "id", self),
            "master_timeout": params.get("master-timeout"),
            "timeout": params.get("timeout"),
        }
        try:
            await opensearch.ingest.delete_pipeline(**request_args)
        except NotFoundError:
            self.logger.info("No current pipeline [%s] to delete.", params.get("id"))

    def __repr__(self, *args, **kwargs):
        return "delete-pipeline"

# TODO: refactor this once the Python client supports search pipelines: https://github.com/opensearch-project/opensearch-py/issues/474
class CreateSearchPipeline(Runner):
@time_func
Expand Down Expand Up @@ -2367,3 +2387,111 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):

def __repr__(self, *args, **kwargs):
return "retryable %s" % repr(self.delegate)

class DeleteMlModel(Runner):
    """
    Undeploys and then deletes every ML model found by a name search.

    Parameters read from ``params``:

    * ``model-name`` (mandatory): used in a ``match_phrase`` query on the ``name`` field.
    * ``number-of-hits-to-return`` (optional, defaults to 1000): ``size`` of the search request.

    The ids to delete are taken from the ``model_id`` field of each hit's ``_source``; hits without that field
    are skipped.
    """

    @time_func
    async def __call__(self, opensearch, params):
        # Fail fast with a clear error instead of sending a search with a null query to the cluster.
        model_name = mandatory(params, "model-name", self)
        body = {
            "query": {
                "match_phrase": {
                    "name": {
                        "query": model_name
                    }
                }
            },
            "size": params.get('number-of-hits-to-return', 1000)
        }

        resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=body)

        # A set, because several hits may carry the same model id.
        model_ids = set()
        for hit in resp['hits']['hits']:
            model_id = (hit.get('_source') or {}).get('model_id')
            if model_id:
                model_ids.add(model_id)

        for model_id in model_ids:
            # Undeploy first, then delete.
            await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/' + model_id + '/_undeploy')
            await opensearch.transport.perform_request('DELETE', '/_plugins/_ml/models/' + model_id)

    def __repr__(self, *args, **kwargs):
        return "delete-ml-model"

class RegisterMlModel(Runner):
    """
    Registers an ML model unless a model with the same name is already found, and stores the model id.

    Parameters read from ``params``:

    * ``model-config-file`` (optional): path to a JSON file that is used as the register request body.
    * ``model-name``, ``model-version``, ``model-format``: used to build the request body when no config file
      is given.
    * ``timeout`` (optional, defaults to 120): seconds to wait for the registration task to finish.

    The resulting model id is written to ``model_id.json`` in the current working directory.

    :raises exceptions.BenchmarkError: if the registration task fails or no task id / model id is returned.
    :raises TimeoutError: if the registration task is still pending when the timeout expires.
    """

    # Task states that mean "not finished yet"; anything else ends the polling loop.
    _PENDING_STATES = ('CREATED', 'RUNNING')

    @time_func
    async def __call__(self, opensearch, params):
        config_file = params.get('model-config-file')
        if config_file:
            with open(config_file, 'r', encoding='utf-8') as f:
                body = json.load(f)
        else:
            body = {
                "name": params.get('model-name'),
                "version": params.get('model-version'),
                "model_format": params.get('model-format')
            }

        # Phrase match (as in delete-ml-model) so that a model which merely shares a token with the
        # requested name is not mistaken for an already registered one.
        search_body = {
            "query": {
                "match_phrase": {
                    "name": {
                        "query": body['name']
                    }
                }
            }
        }

        model_id = None
        resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_search', body=search_body)
        for hit in resp['hits']['hits']:
            model_id = (hit.get('_source') or {}).get('model_id')
            if model_id:
                break

        if not model_id:
            model_id = await self._register(opensearch, body, params.get('timeout', 120))

        with open('model_id.json', 'w', encoding='utf-8') as f:
            json.dump({'model_id': model_id}, f)

    async def _register(self, opensearch, body, timeout):
        # Starts the registration task and polls it every five seconds until it leaves the pending states.
        resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/_register', body=body)
        task_id = resp.get('task_id')
        if not task_id:
            raise exceptions.BenchmarkError("Failed to register ml-model. Error: no task id in response {}".format(resp))

        end = time.time() + timeout
        state = 'CREATED'
        while state in self._PENDING_STATES and time.time() < end:
            await asyncio.sleep(5)
            resp = await opensearch.transport.perform_request('GET', '/_plugins/_ml/tasks/' + task_id)
            state = resp.get('state')

        if state == 'FAILED':
            # .get() so that a failure without an 'error' field is not masked by a KeyError.
            raise exceptions.BenchmarkError("Failed to register ml-model. Error: {}".format(resp.get('error')))
        if state in self._PENDING_STATES:
            raise TimeoutError("Timeout when registering ml-model.")

        model_id = resp.get('model_id')
        if not model_id:
            raise exceptions.BenchmarkError(
                "Failed to register ml-model. Error: task [{}] ended in state [{}] without a model id".format(task_id, state))
        return model_id

    def __repr__(self, *args, **kwargs):
        return "register-ml-model"

class DeployMlModel(Runner):
    """
    Deploys the ML model whose id is stored in ``model_id.json`` and waits for the deploy task to finish.

    The file ``model_id.json`` is read from the current working directory and must contain a JSON object with
    a ``model_id`` key. The optional ``timeout`` parameter (defaults to 120) is the number of seconds to wait
    for the deploy task.

    :raises exceptions.BenchmarkError: if the deploy task fails or the model id / task id is missing.
    :raises TimeoutError: if the deploy task is still pending when the timeout expires.
    """

    # Task states that mean "not finished yet"; anything else ends the polling loop.
    _PENDING_STATES = ('CREATED', 'RUNNING')

    @time_func
    async def __call__(self, opensearch, params):
        with open('model_id.json', 'r', encoding='utf-8') as f:
            model_id = json.load(f)['model_id']
        if not model_id:
            raise exceptions.BenchmarkError("Failed to deploy ml-model. Error: no model id found in model_id.json")

        resp = await opensearch.transport.perform_request('POST', '/_plugins/_ml/models/' + model_id + '/_deploy')
        task_id = resp.get('task_id')
        if not task_id:
            raise exceptions.BenchmarkError("Failed to deploy ml-model. Error: no task id in response {}".format(resp))

        timeout = params.get('timeout', 120)
        end = time.time() + timeout
        state = 'RUNNING'
        # Keep polling while the task is CREATED as well as RUNNING, so that a task which has not started
        # yet is not reported as deployed.
        while state in self._PENDING_STATES and time.time() < end:
            await asyncio.sleep(5)
            resp = await opensearch.transport.perform_request('GET', '/_plugins/_ml/tasks/' + task_id)
            state = resp.get('state')

        if state == 'FAILED':
            # .get() so that a failure without an 'error' field is not masked by a KeyError.
            raise exceptions.BenchmarkError("Failed to deploy ml-model. Error: {}".format(resp.get('error')))
        if state in self._PENDING_STATES:
            raise TimeoutError("Timeout when deploying ml-model.")

    def __repr__(self, *args, **kwargs):
        return "deploy-ml-model"
24 changes: 18 additions & 6 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,13 @@ class OperationType(Enum):
ForceMerge = 1001
ClusterHealth = 1002
PutPipeline = 1003
Refresh = 1004
CreateIndex = 1005
DeleteIndex = 1006
CreateIndexTemplate = 1007
DeleteIndexTemplate = 1008
ShrinkIndex = 1009
DeletePipeline = 1004
Refresh = 1005
CreateIndex = 1006
DeleteIndex = 1007
CreateIndexTemplate = 1008
DeleteIndexTemplate = 1009
ShrinkIndex = 1010
Sleep = 1018
DeleteSnapshotRepository = 1019
CreateSnapshotRepository = 1020
Expand All @@ -629,6 +630,9 @@ class OperationType(Enum):
CreateComponentTemplate = 1032
DeleteComponentTemplate = 1033
CreateSearchPipeline = 1040
DeleteMlModel = 1041
RegisterMlModel = 1042
DeployMlModel = 1043

@property
def admin_op(self):
Expand Down Expand Up @@ -670,6 +674,8 @@ def from_hyphenated_string(cls, v):
return OperationType.RawRequest
elif v == "put-pipeline":
return OperationType.PutPipeline
elif v == "delete-pipeline":
return OperationType.DeletePipeline
elif v == "refresh":
return OperationType.Refresh
elif v == "create-index":
Expand Down Expand Up @@ -734,6 +740,12 @@ def from_hyphenated_string(cls, v):
return OperationType.ListAllPointInTime
elif v == "create-search-pipeline":
return OperationType.CreateSearchPipeline
elif v == "delete-ml-model":
return OperationType.DeleteMlModel
elif v == "register-ml-model":
return OperationType.RegisterMlModel
elif v == "deploy-ml-model":
return OperationType.DeployMlModel
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down

0 comments on commit 794e11e

Please sign in to comment.