Commit dcd8aba

Updated pr.

lu-ohai committed Oct 13, 2023
1 parent 95ac6ae commit dcd8aba
Showing 2 changed files with 50 additions and 121 deletions.
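
In short: predict() no longer runs the client-side _validate_bandwidth check (payload-size cap plus request-rate bookkeeping). Instead, the request is wrapped in a try/except that catches the TooManyRequests (429) oci.exceptions.ServiceError raised by the OCI backend when the load balancer bandwidth is exceeded, logs a warning with the bandwidth-estimation formula, and re-raises. The related constants and unit test are removed.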
138 changes: 50 additions & 88 deletions ads/model/deployment/model_deployment.py
@@ -8,7 +8,6 @@
 import collections
 import copy
 import datetime
-import sys
 import oci
 import warnings
 import time
@@ -72,9 +71,6 @@
 MODEL_DEPLOYMENT_INSTANCE_COUNT = 1
 MODEL_DEPLOYMENT_BANDWIDTH_MBPS = 10
 
-TIME_FRAME = 60
-MAXIMUM_PAYLOAD_SIZE = 10 * 1024 * 1024  # bytes
-
 MODEL_DEPLOYMENT_RUNTIMES = {
     ModelDeploymentRuntimeType.CONDA: ModelDeploymentCondaRuntime,
     ModelDeploymentRuntimeType.CONTAINER: ModelDeploymentContainerRuntime,
@@ -253,10 +249,6 @@ class ModelDeployment(Builder):
         CONST_TIME_CREATED: "time_created",
     }
 
-    count_start_time = 0
-    request_counter = 0
-    estimate_request_per_second = 100
-
     initialize_spec_attributes = [
         "display_name",
         "description",
@@ -915,51 +907,60 @@ def predict(
             raise AttributeError(
                 "`data` and `json_input` are both provided. You can only use one of them."
             )
 
-        if auto_serialize_data:
-            data = data or json_input
-            serialized_data = serializer.serialize(data=data)
-            self._validate_bandwidth(serialized_data)
-            return send_request(
-                data=serialized_data,
-                endpoint=endpoint,
-                is_json_payload=_is_json_serializable(serialized_data),
-                header=header,
-            )
-
-        if json_input is not None:
-            if not _is_json_serializable(json_input):
-                raise ValueError(
-                    "`json_input` must be json serializable. "
-                    "Set `auto_serialize_data` to True, or serialize the provided input data first,"
-                    "or using `data` to pass binary data."
-                )
-            utils.get_logger().warning(
-                "The `json_input` argument of `predict()` will be deprecated soon. "
-                "Please use `data` argument. "
-            )
-            data = json_input
-
-        is_json_payload = _is_json_serializable(data)
-        if not isinstance(data, bytes) and not is_json_payload:
-            raise TypeError(
-                "`data` is not bytes or json serializable. Set `auto_serialize_data` to `True` to serialize the input data."
-            )
-        if model_name and model_version:
-            header["model-name"] = model_name
-            header["model-version"] = model_version
-        elif bool(model_version) ^ bool(model_name):
-            raise ValueError(
-                "`model_name` and `model_version` have to be provided together."
-            )
-        self._validate_bandwidth(data)
-        prediction = send_request(
-            data=data,
-            endpoint=endpoint,
-            is_json_payload=is_json_payload,
-            header=header,
-        )
-        return prediction
+        try:
+            if auto_serialize_data:
+                data = data or json_input
+                serialized_data = serializer.serialize(data=data)
+                return send_request(
+                    data=serialized_data,
+                    endpoint=endpoint,
+                    is_json_payload=_is_json_serializable(serialized_data),
+                    header=header,
+                )
+
+            if json_input is not None:
+                if not _is_json_serializable(json_input):
+                    raise ValueError(
+                        "`json_input` must be json serializable. "
+                        "Set `auto_serialize_data` to True, serialize the provided input data first, "
+                        "or use `data` to pass binary data."
+                    )
+                utils.get_logger().warning(
+                    "The `json_input` argument of `predict()` will be deprecated soon. "
+                    "Please use the `data` argument."
+                )
+                data = json_input
+
+            is_json_payload = _is_json_serializable(data)
+            if not isinstance(data, bytes) and not is_json_payload:
+                raise TypeError(
+                    "`data` is not bytes or json serializable. Set `auto_serialize_data` to `True` to serialize the input data."
+                )
+            if model_name and model_version:
+                header["model-name"] = model_name
+                header["model-version"] = model_version
+            elif bool(model_version) ^ bool(model_name):
+                raise ValueError(
+                    "`model_name` and `model_version` have to be provided together."
+                )
+            prediction = send_request(
+                data=data,
+                endpoint=endpoint,
+                is_json_payload=is_json_payload,
+                header=header,
+            )
+            return prediction
+        except oci.exceptions.ServiceError as ex:
+            # When bandwidth exceeds the allocated value, the OCI backend raises a TooManyRequests (429) error.
+            if ex.status == 429:
+                bandwidth_mbps = self.infrastructure.bandwidth_mbps or MODEL_DEPLOYMENT_BANDWIDTH_MBPS
+                utils.get_logger().warning(
+                    f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps. "
+                    "To estimate the actual bandwidth, use the formula: (payload size in KB) * (estimated requests per second) * 8 / 1024. "
+                    "To resolve the issue, try sizing down the payload, slowing down the request rate, or increasing the allocated bandwidth."
+                )
+            raise
 
     def activate(
         self,
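
With this change, a bandwidth overrun surfaces to callers as an OCI ServiceError rather than a client-side ValueError. A minimal sketch of how calling code might handle it, assuming a hypothetical deployment OCID placeholder:

    import oci
    from ads.model.deployment import ModelDeployment

    # Hypothetical OCID placeholder; substitute a real model deployment OCID.
    deployment = ModelDeployment.from_id(
        "ocid1.datasciencemodeldeployment.oc1..<unique_id>"
    )

    try:
        prediction = deployment.predict(data={"input": [1, 2, 3]})
    except oci.exceptions.ServiceError as ex:
        if ex.status != 429:
            raise
        # Load balancer bandwidth exceeded: size down the payload, slow the
        # request rate, or redeploy with a larger bandwidth_mbps.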
@@ -1800,45 +1801,6 @@ def _extract_spec_kwargs(self, **kwargs) -> Dict:
             if attribute in kwargs:
                 spec_kwargs[attribute] = kwargs[attribute]
         return spec_kwargs
-
-    def _validate_bandwidth(self, data: Any):
-        """Validates payload size and load balancer bandwidth.
-        Parameters
-        ----------
-        data: Any
-            Data or JSON payload for the prediction.
-        """
-        payload_size = sys.getsizeof(data)
-        if payload_size > MAXIMUM_PAYLOAD_SIZE:
-            raise ValueError(
-                f"Payload size exceeds the maximum allowed {MAXIMUM_PAYLOAD_SIZE} bytes. Size down the payload."
-            )
-
-        time_now = int(time.time())
-        if self.count_start_time == 0:
-            self.count_start_time = time_now
-        if time_now - self.count_start_time < TIME_FRAME:
-            self.request_counter += 1
-        else:
-            self.estimate_request_per_second = (int)(self.request_counter / TIME_FRAME)
-            self.request_counter = 0
-            self.count_start_time = 0
-
-        if not self.infrastructure or not self.runtime:
-            raise ValueError("Missing parameter infrastructure or runtime. Try reruning it after parameters are fully configured.")
-
-        # load balancer bandwidth is only needed for HTTPS mode.
-        if self.runtime.deployment_mode == ModelDeploymentMode.HTTPS:
-            bandwidth_mbps = self.infrastructure.bandwidth_mbps or MODEL_DEPLOYMENT_BANDWIDTH_MBPS
-            # formula: (payload size in KB) * (estimated requests per second) * 8 / 1024
-            # 20% extra for estimation errors and sporadic peak traffic
-            payload_size_in_kb = payload_size / 1024
-            if (payload_size_in_kb * self.estimate_request_per_second * 8 * 1.2) / 1024 > bandwidth_mbps:
-                raise ValueError(
-                    f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps."
-                    "Try sizing down the payload, slowing down the request rate or increasing bandwidth."
-                )
-
 
     def build(self) -> "ModelDeployment":
         """Load default values from the environment for the job infrastructure."""
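
For reference, the formula used by the removed check and echoed in the new warning is: estimated Mbps = (payload size in KB) * (requests per second) * 8 / 1024, with the removed code padding the result by 20% for estimation error and peak traffic. A quick arithmetic sketch with assumed example numbers:

    # Assumed example numbers (not from the commit): a 100 KB payload at
    # 12 requests per second.
    payload_size_in_kb = 100
    requests_per_second = 12

    estimated_mbps = payload_size_in_kb * requests_per_second * 8 / 1024
    print(f"{estimated_mbps:.2f} Mbps")        # 9.38 Mbps
    print(f"{estimated_mbps * 1.2:.2f} Mbps")  # 11.25 Mbps with the 20% buffer

Without the buffer this stays under the 10 Mbps default (MODEL_DEPLOYMENT_BANDWIDTH_MBPS); with it, the removed check would have rejected the request.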
@@ -6,12 +6,10 @@
 
 import copy
 from datetime import datetime
-import time
 import oci
 import pytest
 import unittest
 import pandas
-import sys
 from unittest.mock import MagicMock, patch
 from ads.common.oci_datascience import OCIDataScienceMixin
 from ads.common.oci_logging import ConsolidatedLog, OCILog
@@ -21,7 +19,6 @@
 from ads.model.datascience_model import DataScienceModel
 
 from ads.model.deployment.model_deployment import (
-    MAXIMUM_PAYLOAD_SIZE,
     ModelDeployment,
     ModelDeploymentLogType,
     ModelDeploymentFailedError,
@@ -1483,33 +1480,3 @@ def test_model_deployment_with_large_size_artifact(
         )
         mock_create_model_deployment.assert_called_with(create_model_deployment_details)
         mock_sync.assert_called()
-
-    @patch.object(sys, "getsizeof")
-    def test_validate_bandwidth(self, mock_get_size_of):
-        model_deployment = self.initialize_model_deployment()
-
-        mock_get_size_of.return_value = 11 * 1024 * 1024
-        with pytest.raises(
-            ValueError,
-            match=f"Payload size exceeds the maximum allowed {MAXIMUM_PAYLOAD_SIZE} bytes. Size down the payload."
-        ):
-            model_deployment._validate_bandwidth("test")
-        mock_get_size_of.assert_called()
-
-        mock_get_size_of.return_value = 9 * 1024 * 1024
-        with pytest.raises(
-            ValueError,
-            match=f"Load balancer bandwidth exceeds the allocated {model_deployment.infrastructure.bandwidth_mbps} Mbps."
-            "Try sizing down the payload, slowing down the request rate or increasing bandwidth."
-        ):
-            model_deployment._validate_bandwidth("test")
-        mock_get_size_of.assert_called()
-
-        mock_get_size_of.return_value = 5
-        model_deployment._validate_bandwidth("test")
-        mock_get_size_of.assert_called()
-
-        model_deployment.count_start_time = (int)(time.time()) - 700
-        model_deployment._validate_bandwidth("test")
-        mock_get_size_of.assert_called()
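
The removed test has no direct replacement in this diff. A hedged sketch of how the new 429 path in predict() might be exercised instead, assuming the initialize_model_deployment() helper used elsewhere in this test class and that send_request is patchable in the model_deployment module namespace:

    @patch("ads.model.deployment.model_deployment.send_request")
    def test_predict_raises_on_429(self, mock_send_request):
        model_deployment = self.initialize_model_deployment()
        mock_send_request.side_effect = oci.exceptions.ServiceError(
            status=429, code="TooManyRequests", headers={}, message="bandwidth exceeded"
        )
        with pytest.raises(oci.exceptions.ServiceError):
            model_deployment.predict(data=b"test", auto_serialize_data=False)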