diff --git a/assets/common/components/batch_deploy_model/asset.yaml b/assets/common/components/batch_deploy_model/asset.yaml
new file mode 100644
index 0000000000..5d0befc5f8
--- /dev/null
+++ b/assets/common/components/batch_deploy_model/asset.yaml
@@ -0,0 +1,3 @@
+type: component
+spec: spec.yaml
+categories: ["Models"]
\ No newline at end of file
diff --git a/assets/common/components/batch_deploy_model/spec.yaml b/assets/common/components/batch_deploy_model/spec.yaml
new file mode 100644
index 0000000000..d0c8ce0768
--- /dev/null
+++ b/assets/common/components/batch_deploy_model/spec.yaml
@@ -0,0 +1,197 @@
+$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
+name: batch_deploy_model
+version: 0.0.1
+type: command
+
+is_deterministic: True
+
+display_name: Batch deploy model
+description: Batch deploy a model to a workspace. The component works on compute with [MSI](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-create-manage-compute-instance?tabs=python) attached.
+
+environment: azureml://registries/azureml/environments/python-sdk-v2/versions/6
+
+code: ../../src
+command: >-
+  python batch_deploy.py
+  $[[--registration_details_folder ${{inputs.registration_details_folder}}]]
+  $[[--model_id ${{inputs.model_id}}]]
+  $[[--inference_payload_file ${{inputs.inference_payload_file}}]]
+  $[[--inference_payload_folder ${{inputs.inference_payload_folder}}]]
+  $[[--endpoint_name ${{inputs.endpoint_name}}]]
+  $[[--deployment_name ${{inputs.deployment_name}}]]
+  $[[--compute_name ${{inputs.compute_name}}]]
+  $[[--size ${{inputs.size}}]]
+  $[[--min_instances ${{inputs.min_instances}}]]
+  $[[--max_instances ${{inputs.max_instances}}]]
+  $[[--idle_time_before_scale_down ${{inputs.idle_time_before_scale_down}}]]
+  $[[--output_file_name ${{inputs.output_file_name}}]]
+  $[[--max_concurrency_per_instance ${{inputs.max_concurrency_per_instance}}]]
+  $[[--error_threshold ${{inputs.error_threshold}}]]
+  $[[--max_retries ${{inputs.max_retries}}]]
+  $[[--timeout ${{inputs.timeout}}]]
+  $[[--logging_level ${{inputs.logging_level}}]]
+  $[[--mini_batch_size ${{inputs.mini_batch_size}}]]
+  $[[--instance_count ${{inputs.instance_count}}]]
+  --batch_job_output_folder ${{outputs.batch_job_output_folder}}
+
+inputs:
+  # Output of registering component
+  registration_details_folder:
+    type: uri_folder
+    optional: true
+    description: Folder containing model registration details in a JSON file named model_registration_details.json
+
+  model_id:
+    type: string
+    optional: true
+    description: |
+      Asset ID of the model registered in workspace/registry.
+      Registry - azureml://registries/<registry-name>/models/<model-name>/versions/<version>
+      Workspace - azureml:<model-name>:<version>
+
+  inference_payload_file:
+    type: uri_file
+    optional: true
+    description: File containing data used to validate deployment
+
+  inference_payload_folder:
+    type: uri_folder
+    optional: true
+    description: Folder containing files used to validate deployment
+
+  endpoint_name:
+    type: string
+    optional: true
+    description: Name of the endpoint
+
+  deployment_name:
+    type: string
+    optional: true
+    default: default
+    description: Name of the deployment
+
+  compute_name:
+    type: string
+    optional: true
+    default: cpu-cluster
+    description: Name of the compute cluster on which to execute the batch scoring jobs. A new compute cluster is created if one with this name does not exist.
+
+  size:
+    type: string
+    optional: true
+    enum:
+      - Standard_DS1_v2
+      - Standard_DS2_v2
+      - Standard_DS3_v2
+      - Standard_DS4_v2
+      - Standard_DS5_v2
+      - Standard_F2s_v2
+      - Standard_F4s_v2
+      - Standard_F8s_v2
+      - Standard_F16s_v2
+      - Standard_F32s_v2
+      - Standard_F48s_v2
+      - Standard_F64s_v2
+      - Standard_F72s_v2
+      - Standard_FX24mds
+      - Standard_FX36mds
+      - Standard_FX48mds
+      - Standard_E2s_v3
+      - Standard_E4s_v3
+      - Standard_E8s_v3
+      - Standard_E16s_v3
+      - Standard_E32s_v3
+      - Standard_E48s_v3
+      - Standard_E64s_v3
+      - Standard_NC4as_T4_v3
+      - Standard_NC6s_v2
+      - Standard_NC6s_v3
+      - Standard_NC8as_T4_v3
+      - Standard_NC12s_v2
+      - Standard_NC12s_v3
+      - Standard_NC16as_T4_v3
+      - Standard_NC24s_v2
+      - Standard_NC24s_v3
+      - Standard_NC24rs_v3
+      - Standard_NC64as_T4_v3
+      - Standard_ND40rs_v2
+      - Standard_ND96asr_v4
+      - Standard_ND96amsr_A100_v4
+    default: Standard_NC24s_v3
+    description: Compute instance size for the deployment. Make sure the instance type is available and has sufficient quota.
+
+  min_instances:
+    type: integer
+    optional: true
+    default: 0
+    description: Minimum number of instances of the compute cluster to be created.
+
+  max_instances:
+    type: integer
+    optional: true
+    default: 1
+    description: Maximum number of instances of the compute cluster to be created.
+
+  idle_time_before_scale_down:
+    type: integer
+    optional: true
+    default: 120
+    description: Idle time in seconds before the compute cluster scales down.
+
+  output_file_name:
+    type: string
+    optional: true
+    default: predictions.csv
+    description: Name of the batch scoring output file.
+
+  max_concurrency_per_instance:
+    type: integer
+    optional: true
+    default: 1
+    description: The maximum number of parallel scoring_script runs per instance.
+
+  error_threshold:
+    type: integer
+    optional: true
+    default: -1
+    description: The number of file failures that should be ignored. A value of -1 ignores all failures.
+
+  max_retries:
+    type: integer
+    optional: true
+    default: 3
+    description: The maximum number of retries for a failed or timed-out mini batch.
+
+  timeout:
+    type: integer
+    optional: true
+    default: 500
+    description: The timeout in seconds for scoring a single mini batch.
+
+  logging_level:
+    type: string
+    optional: true
+    default: info
+    description: The log verbosity level.
+
+  mini_batch_size:
+    type: integer
+    optional: true
+    default: 10
+    description: The number of files the code_configuration.scoring_script can process in one run() call.
+
+  instance_count:
+    type: integer
+    optional: true
+    default: 1
+    description: The number of nodes to use for each batch scoring job.
+
+outputs:
+  batch_job_output_folder:
+    type: uri_folder
+    description: Folder to which batch job outputs will be saved.
+
+tags:
+  Preview: ""
+  Internal: ""
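Reviewer note: a quick way to smoke-test the spec above is to load it with the v2 SDK and wire it into a throwaway pipeline. This is a minimal sketch, not part of this patch; the model ID, payload path, cluster name, and experiment name are all hypothetical.

# Review-time sketch (hypothetical, not part of this PR).
from azure.ai.ml import Input, MLClient, load_component
from azure.ai.ml.dsl import pipeline
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
batch_deploy = load_component(source="assets/common/components/batch_deploy_model/spec.yaml")

@pipeline()
def smoke_test():
    # Only a handful of the optional inputs are passed; the rest fall back
    # to the defaults declared in spec.yaml.
    step = batch_deploy(
        model_id="azureml://registries/azureml/models/some-model/versions/1",  # hypothetical
        inference_payload_folder=Input(
            type="uri_folder",
            path="azureml://datastores/workspaceblobstore/paths/payload/",  # hypothetical
        ),
        compute_name="cpu-cluster",
    )
    return {"batch_job_output_folder": step.outputs.batch_job_output_folder}

pipeline_job = smoke_test()
pipeline_job.settings.default_compute = "cpu-cluster"  # hypothetical cluster with MSI attached
ml_client.jobs.create_or_update(pipeline_job, experiment_name="batch-deploy-smoke-test")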
diff --git a/assets/common/src/batch_deploy.py b/assets/common/src/batch_deploy.py
new file mode 100644
index 0000000000..81457e45f8
--- /dev/null
+++ b/assets/common/src/batch_deploy.py
@@ -0,0 +1,409 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Run Model deployment module."""
+import argparse
+import json
+import re
+import time
+import shutil
+import os
+
+from azure.ai.ml import Input
+from azure.ai.ml.entities import (
+    AmlCompute,
+    BatchEndpoint,
+    BatchDeployment,
+    BatchRetrySettings,
+)
+from azureml._common._error_definition import AzureMLError
+from azureml._common.exceptions import AzureMLException
+from pathlib import Path
+
+from utils.config import AppName, ComponentVariables
+from utils.common_utils import get_mlclient, get_model_name
+from utils.logging_utils import custom_dimensions, get_logger
+from utils.exceptions import (
+    swallow_all_exceptions,
+    BatchEndpointInvocationError,
+    EndpointCreationError,
+    DeploymentCreationError,
+    ComputeCreationError,
+)
+
+
+MAX_INSTANCE_COUNT = 20
+DEFAULT_COMPUTE_SIZE = "Standard_NC24s_v3"
+DEFAULT_MIN_INSTANCES = 0
+DEFAULT_MAX_INSTANCES = 1
+DEFAULT_IDLE_TIME_BEFORE_SCALE_DOWN = 120  # 2 min
+DEFAULT_OUTPUT_FILE_NAME = "predictions.csv"
+DEFAULT_MAX_CONCURRENCY_PER_INSTANCE = 1
+DEFAULT_ERROR_THRESHOLD = -1
+DEFAULT_MAX_RETRIES = 3
+DEFAULT_TIMEOUT = 500  # 500 sec
+DEFAULT_LOGGING_LEVEL = "info"
+DEFAULT_MINI_BATCH_SIZE = 10
+DEFAULT_INSTANCE_COUNT = 1
+DEPLOYMENT_DETAILS_JSON_FILE_NAME = "model_deployment_details.json"
+
+
+logger = get_logger(__name__)
+custom_dimensions.app_name = AppName.BATCH_DEPLOY_MODEL
+
+
+def parse_args():
+    """Return arguments."""
+    parser = argparse.ArgumentParser()
+
+    # Defaults for batch endpoints have been picked mostly from:
+    # https://learn.microsoft.com/en-us/azure/machine-learning/reference-yaml-deployment-batch
+    # Some of the defaults have been tweaked to cater to large models.
+
+    # add arguments
+    parser.add_argument(
+        "--registration_details_folder",
+        type=Path,
+        help="Folder containing model registration details in a JSON file named model_registration_details.json",
+    )
+    parser.add_argument(
+        "--model_id",
+        type=str,
+        help="Asset ID of the model registered in workspace/registry.",
+    )
+    parser.add_argument(
+        "--inference_payload_file",
+        type=Path,
+        help="File containing data used to validate deployment",
+    )
+    parser.add_argument(
+        "--inference_payload_folder",
+        type=Path,
+        help="Folder containing files used to validate deployment",
+    )
+    parser.add_argument(
+        "--endpoint_name",
+        type=str,
+        help="Name of the endpoint",
+    )
+    parser.add_argument("--deployment_name", type=str, help="Name of the deployment")
+    parser.add_argument(
+        "--compute_name",
+        type=str,
+        help="Name of the compute target to execute the batch scoring jobs on",
+    )
+    parser.add_argument(
+        "--size",
+        type=str,
+        help="Compute instance size to deploy model",
+        default=DEFAULT_COMPUTE_SIZE,
+    )
+    parser.add_argument(
+        "--min_instances",
+        type=int,
+        default=DEFAULT_MIN_INSTANCES,
+        help="Minimum number of instances of the compute cluster",
+    )
+    parser.add_argument(
+        "--max_instances",
+        type=int,
+        default=DEFAULT_MAX_INSTANCES,
+        help="Maximum number of instances of the compute cluster",
+    )
+    parser.add_argument(
+        "--idle_time_before_scale_down",
+        type=int,
+        default=DEFAULT_IDLE_TIME_BEFORE_SCALE_DOWN,
+        help="Node idle time in seconds before the compute cluster scales down",
+    )
+    parser.add_argument(
+        "--output_file_name",
+        type=str,
+        default=DEFAULT_OUTPUT_FILE_NAME,
+        help="Name of the batch scoring output file",
+    )
+    parser.add_argument(
+        "--max_concurrency_per_instance",
+        type=int,
+        default=DEFAULT_MAX_CONCURRENCY_PER_INSTANCE,
+        help="The maximum number of parallel scoring_script runs per instance",
+    )
+    parser.add_argument(
+        "--error_threshold",
+        type=int,
+        default=DEFAULT_ERROR_THRESHOLD,
+        help="The number of file failures that should be ignored",
+    )
+    parser.add_argument(
+        "--max_retries",
+        type=int,
+        default=DEFAULT_MAX_RETRIES,
+        help="The maximum number of retries for a failed or timed-out mini batch",
+    )
+    parser.add_argument(
+        "--timeout",
+        type=int,
+        default=DEFAULT_TIMEOUT,
+        help="The timeout in seconds for scoring a single mini batch.",
+    )
+    parser.add_argument(
+        "--logging_level",
+        type=str,
+        default=DEFAULT_LOGGING_LEVEL,
+        help="The log verbosity level",
+    )
+    parser.add_argument(
+        "--mini_batch_size",
+        type=int,
+        default=DEFAULT_MINI_BATCH_SIZE,
+        help="The number of files the code_configuration.scoring_script can process in one run() call",
+    )
+    parser.add_argument(
+        "--instance_count",
+        type=int,
+        help="The number of nodes to use for each batch scoring job",
+        default=DEFAULT_INSTANCE_COUNT,
+        choices=range(1, MAX_INSTANCE_COUNT + 1),  # allow 1..MAX_INSTANCE_COUNT inclusive
+    )
+    parser.add_argument(
+        "--batch_job_output_folder",
+        type=Path,
+        help="Folder to which batch job outputs will be saved",
+    )
+    # parse args
+    args = parser.parse_args()
+    logger.info(f"Args received {args}")
+    print("args received ", args)
+
+    return args
+
+
+def download_batch_output(ml_client, job, args):
+    """Download the output file."""
+    scoring_job = list(ml_client.jobs.list(parent_job_name=job.name))[0]
+
+    logger.info(f"Downloading the {args.output_file_name} file.")
+    ml_client.jobs.download(
+        name=scoring_job.name, download_path=args.batch_job_output_folder, output_name="score"
+    )
+
+    named_outputs_folder = "named-outputs"
+    score_folder = "score"
+    source_folder = args.batch_job_output_folder / named_outputs_folder / score_folder
+    destination_folder = args.batch_job_output_folder / score_folder
+
+    if not os.path.exists(destination_folder):
+        os.makedirs(destination_folder)
+
+    # Flatten the downloaded named-outputs/score tree into score/.
+    for root, _, files in os.walk(source_folder):
+        for file in files:
+            source_path = os.path.join(root, file)
+            destination_path = os.path.join(destination_folder, file)
+            shutil.move(source_path, destination_path)
+
+    shutil.rmtree(args.batch_job_output_folder / named_outputs_folder)
+    logger.info(f"Successfully downloaded the {args.output_file_name} file.")
+
+
+def invoke_endpoint_job(ml_client, endpoint, payload_type, args):
+    """Invoke a job using the endpoint."""
+    print(f"Invoking inference with {payload_type} test payload ...")
+    try:
+        if payload_type == "uri_folder":
+            path = args.inference_payload_folder
+        else:
+            path = args.inference_payload_file
+        job_input = Input(type=payload_type, path=str(path))
+
+        job = ml_client.batch_endpoints.invoke(
+            endpoint_name=endpoint.name, input=job_input
+        )
+
+        ml_client.jobs.stream(job.name)
+        logger.info(f"Endpoint invoked successfully with {payload_type} test payload.")
+        download_batch_output(ml_client, job, args)
+
+    except Exception as e:
+        raise AzureMLException._with_error(
+            AzureMLError.create(BatchEndpointInvocationError, exception=e)
+        )
+
+
+def get_or_create_compute(ml_client, compute_name, args):
+    """Get or create the compute cluster and return details."""
+    try:
+        compute_cluster = ml_client.compute.get(compute_name)
+        logger.info(f"Using compute cluster {compute_name}.")
+    except Exception:
+        compute_cluster = AmlCompute(
+            name=compute_name,
+            size=args.size,
+            min_instances=args.min_instances,
+            max_instances=args.max_instances,
+            idle_time_before_scale_down=args.idle_time_before_scale_down,
+        )
+        try:
+            logger.info(f"Creating compute cluster {compute_name}")
+            ml_client.begin_create_or_update(compute_cluster).wait()
+            logger.info("Compute cluster created successfully.")
+        except Exception as e:
+            raise AzureMLException._with_error(
+                AzureMLError.create(ComputeCreationError, exception=e)
+            )
+    return compute_cluster
+
+
+def create_endpoint_and_deployment(ml_client, compute_name, model_id, endpoint_name, deployment_name, args):
+    """Create endpoint and deployment and return details."""
+    endpoint = BatchEndpoint(name=endpoint_name)
+
+    # deployment
+    deployment = BatchDeployment(
+        name=deployment_name,
+        endpoint_name=endpoint_name,
+        model=model_id,
+        compute=compute_name,
+        output_file_name=args.output_file_name,
+        max_concurrency_per_instance=args.max_concurrency_per_instance,
+        error_threshold=args.error_threshold,
+        retry_settings=BatchRetrySettings(
+            max_retries=args.max_retries,
+            timeout=args.timeout,
+        ),
+        logging_level=args.logging_level,
+        mini_batch_size=args.mini_batch_size,
+        instance_count=args.instance_count,
+    )
+
+    try:
+        logger.info(f"Creating endpoint {endpoint_name}")
+        ml_client.begin_create_or_update(endpoint).wait()
+        logger.info("Endpoint created successfully.")
+    except Exception as e:
+        raise AzureMLException._with_error(
+            AzureMLError.create(EndpointCreationError, exception=e)
+        )
+
+    try:
+        logger.info(f"Creating deployment {deployment_name}")
+        ml_client.batch_deployments.begin_create_or_update(deployment).wait()
+    except Exception as e:
+        raise AzureMLException._with_error(
+            AzureMLError.create(DeploymentCreationError, exception=e)
+        )
+
+    logger.info("Deployment successful.")
+
+    # set the deployment as default
+    try:
+        logger.info(f"Updating endpoint to make {deployment_name} the default deployment")
+        endpoint = ml_client.batch_endpoints.get(endpoint_name)
+        endpoint.defaults.deployment_name = deployment_name
+        ml_client.begin_create_or_update(endpoint).wait()
+
+        endpoint = ml_client.batch_endpoints.get(endpoint_name)
+    except Exception as e:
+        error_msg = f"Error occurred while updating deployment - {e}"
+        raise Exception(error_msg)
+
+    logger.info(f"The default deployment is {endpoint.defaults.deployment_name}")
+    return endpoint, deployment
+
+
+@swallow_all_exceptions(logger)
+def main():
+    """Run main function."""
+    args = parse_args()
+    ml_client = get_mlclient()
+
+    # get registered model id
+    if args.model_id:
+        model_id = str(args.model_id)
+    elif args.registration_details_folder:
+        registration_details_file = args.registration_details_folder / ComponentVariables.REGISTRATION_DETAILS_JSON_FILE
+        if registration_details_file.exists():
+            try:
+                with open(registration_details_file) as f:
+                    model_info = json.load(f)
+                model_id = model_info["id"]
+            except Exception as e:
+                raise Exception(f"model_registration_details JSON file is missing model information: {e}.")
+        else:
+            raise Exception(f"{ComponentVariables.REGISTRATION_DETAILS_JSON_FILE} is missing inside folder.")
+    else:
+        raise Exception("Neither the model_id nor the registration_details_folder argument was provided.")
+
+    # An endpoint name must:
+    # 1. Begin with a lowercase letter,
+    # 2. followed by lowercase letters, hyphens or numbers,
+    # 3. and end with a lowercase letter or number.
+
+    # 1. Replace every non-alphanumeric character with a hyphen and convert to lower case.
+    # 2. Take the first 21 chars of the model name and append '-' plus a 10-char timestamp.
+    model_name = get_model_name(model_id)
+
+    endpoint_name = re.sub("[^A-Za-z0-9]", "-", model_name).lower()[:21]
+    endpoint_name = f"{endpoint_name}-{int(time.time())}"
+
+    endpoint_name = args.endpoint_name if args.endpoint_name else endpoint_name
+    deployment_name = args.deployment_name if args.deployment_name else "default"
+    compute_name = args.compute_name if args.compute_name else "cpu-cluster"
+
+    compute_cluster = get_or_create_compute(
+        ml_client=ml_client,
+        compute_name=compute_name,
+        args=args
+    )
+
+    endpoint, deployment = create_endpoint_and_deployment(
+        ml_client=ml_client,
+        compute_name=compute_name,
+        endpoint_name=endpoint_name,
+        deployment_name=deployment_name,
+        model_id=model_id,
+        args=args
+    )
+
+    if args.inference_payload_file and args.inference_payload_folder:
+        logger.warning(
+            "Both inference_payload_file and inference_payload_folder were provided; "
+            "only the folder will be used. Put all CSV files under one uri_folder "
+            "instead of providing multiple inputs."
+        )
+
+    if args.inference_payload_folder:
+        invoke_endpoint_job(
+            ml_client=ml_client,
+            endpoint=endpoint,
+            payload_type="uri_folder",
+            args=args,
+        )
+    elif args.inference_payload_file:
+        invoke_endpoint_job(
+            ml_client=ml_client,
+            endpoint=endpoint,
+            payload_type="uri_file",
+            args=args,
+        )
+
+    print("Saving deployment details ...")
+
+    # write deployment details to file
+    endpoint_type = "aml_batch_inference"
+    deployment_details = {
+        "endpoint_name": endpoint.name,
+        "deployment_name": deployment.name,
+        "endpoint_uri": endpoint.scoring_uri,
+        "endpoint_type": endpoint_type,
+        "compute_size": compute_cluster.size,
+        "instance_count": deployment.instance_count,
+        "max_concurrency_per_instance": deployment.max_concurrency_per_instance,
+    }
+    json_object = json.dumps(deployment_details, indent=4)
+    file_path = args.batch_job_output_folder / DEPLOYMENT_DETAILS_JSON_FILE_NAME
+    with open(file_path, "w") as outfile:
+        outfile.write(json_object)
+    logger.info("Saved deployment details in output json file.")
+
+
+# run script
+if __name__ == "__main__":
+    # run main function
+    main()
diff --git a/assets/common/src/utils/config.py b/assets/common/src/utils/config.py
index 694d8274b3..7133b8af75 100644
--- a/assets/common/src/utils/config.py
+++ b/assets/common/src/utils/config.py
@@ -10,6 +10,7 @@ class AppName:
     IMPORT_MODEL = "import_model"
     REGISTER_MODEL = "register_model"
     DEPLOY_MODEL = "deploy_model"
+    BATCH_DEPLOY_MODEL = "batch_deploy_model"
     MLFLOW_MODEL_LOCAL_VALIDATION = "mlflow_model_local_validation"
diff --git a/assets/common/src/utils/exceptions.py b/assets/common/src/utils/exceptions.py
index 0804f6107e..2cb7b93ad2 100644
--- a/assets/common/src/utils/exceptions.py
+++ b/assets/common/src/utils/exceptions.py
@@ -26,9 +26,11 @@ class ModelImportErrorStrings:
     )
     UNSUPPORTED_MODEL_TYPE_ERROR = "Unsupported model type : {model_type}"
     MISSING_MODEL_NAME_ERROR = "Missing Model Name. Provide model_name as input or in the model_download_metadata JSON"
+    COMPUTE_CREATION_ERROR = "Error occurred while creating compute cluster - {exception}"
     ENDPOINT_CREATION_ERROR = "Error occured while creating endpoint - {exception}"
     DEPLOYMENT_CREATION_ERROR = "Error occured while creating deployment - {exception}"
     ONLINE_ENDPOINT_INVOCATION_ERROR = "Invocation failed with error: {exception}"
+    BATCH_ENDPOINT_INVOCATION_ERROR = "Batch endpoint invocation failed with error: {exception}"
     USER_IDENTITY_MISSING_ERROR = (
         "Failed to get AzureMLOnBehalfOfCredential."
         " Kindly set UserIdentity as identity type if submitting job using sdk or cli."
@@ -112,6 +114,15 @@ def message_format(self) -> str:
         return ModelImportErrorStrings.MISSING_MODEL_NAME_ERROR
 
 
+class ComputeCreationError(ClientError):
+    """Error raised when compute cluster creation fails."""
+
+    @property
+    def message_format(self) -> str:
+        """Message format."""
+        return ModelImportErrorStrings.COMPUTE_CREATION_ERROR
+
+
 class EndpointCreationError(ClientError):
     """Internal Import Model Generic Error."""
 
@@ -139,6 +150,15 @@ def message_format(self) -> str:
         return ModelImportErrorStrings.ONLINE_ENDPOINT_INVOCATION_ERROR
 
 
+class BatchEndpointInvocationError(ClientError):
+    """Error raised when batch endpoint invocation fails."""
+
+    @property
+    def message_format(self) -> str:
+        """Message format."""
+        return ModelImportErrorStrings.BATCH_ENDPOINT_INVOCATION_ERROR
+
+
 class UserIdentityMissingError(ClientError):
     """Internal Import Model Generic Error."""
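Reviewer note: for context, this is the raise pattern batch_deploy.py uses with the new error definitions above. The helper below is a sketch that mirrors get_or_create_compute(); it adds no new functionality, and AzureMLError / AzureMLException come from the same azureml internals the PR already imports.

# Review-time sketch (hypothetical, not part of this PR): how the new
# ComputeCreationError definition surfaces at a call site.
from azureml._common._error_definition import AzureMLError
from azureml._common.exceptions import AzureMLException

from utils.exceptions import ComputeCreationError


def create_compute_or_raise(ml_client, compute_cluster):
    """Wrap compute creation so failures surface as ComputeCreationError."""
    try:
        ml_client.begin_create_or_update(compute_cluster).wait()
    except Exception as e:
        # AzureMLError.create fills the {exception} placeholder declared in
        # ModelImportErrorStrings.COMPUTE_CREATION_ERROR via message_format.
        raise AzureMLException._with_error(
            AzureMLError.create(ComputeCreationError, exception=e)
        )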