Skip to content

Commit

Permalink
feat: executor to azure custom container (#6159)
Browse files Browse the repository at this point in the history
  • Loading branch information
zac-li authored Apr 2, 2024
1 parent a3fb8e1 commit b04f750
Show file tree
Hide file tree
Showing 32 changed files with 143 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_singleton.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/csp
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_singleton.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/csp
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
Expand Down
1 change: 1 addition & 0 deletions jina/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class ProviderType(BetterEnum):

NONE = 0 #: no provider
SAGEMAKER = 1 #: AWS SageMaker
AZURE = 2 #: AZURE


def replace_enum_to_str(obj):
Expand Down
10 changes: 5 additions & 5 deletions jina/orchestrate/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def __init__(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Expand Down Expand Up @@ -478,21 +478,21 @@ def __init__(
args = ArgNamespace.kwargs2namespace(kwargs, parser, True)
self.args = args
self._gateway_load_balancer = False
if self.args.provider == ProviderType.SAGEMAKER:
if self.args.provider in (ProviderType.SAGEMAKER, ProviderType.AZURE):
if self._gateway_kwargs.get('port', 0) == 8080:
raise ValueError(
'Port 8080 is reserved for Sagemaker deployment. '
'Port 8080 is reserved for CSP deployment. '
'Please use another port'
)
if self.args.port != [8080]:
warnings.warn(
'Port is changed to 8080 for Sagemaker deployment. '
'Port is changed to 8080 for CSP deployment. '
f'Port {self.args.port} is ignored'
)
self.args.port = [8080]
if self.args.protocol != [ProtocolType.HTTP]:
warnings.warn(
'Protocol is changed to HTTP for Sagemaker deployment. '
'Protocol is changed to HTTP for CSP deployment. '
f'Protocol {self.args.protocol} is ignored'
)
self.args.protocol = [ProtocolType.HTTP]
Expand Down
12 changes: 6 additions & 6 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def __init__(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down Expand Up @@ -466,7 +466,7 @@ def __init__(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down Expand Up @@ -975,7 +975,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Expand Down Expand Up @@ -1139,7 +1139,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Expand Down Expand Up @@ -1407,7 +1407,7 @@ def config_gateway(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down Expand Up @@ -1508,7 +1508,7 @@ def config_gateway(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down
2 changes: 1 addition & 1 deletion jina/serve/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ def serve(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'AZURE'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Expand Down
17 changes: 17 additions & 0 deletions jina/serve/runtimes/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,23 @@ def _get_server(self):
cors=getattr(self.args, 'cors', None),
is_cancel=self.is_cancel,
)
elif (
hasattr(self.args, 'provider')
and self.args.provider == ProviderType.AZURE
):
from jina.serve.runtimes.servers.http import AzureHTTPServer

return AzureHTTPServer(
name=self.args.name,
runtime_args=self.args,
req_handler_cls=self.req_handler_cls,
proxy=getattr(self.args, 'proxy', None),
uvicorn_kwargs=getattr(self.args, 'uvicorn_kwargs', None),
ssl_keyfile=getattr(self.args, 'ssl_keyfile', None),
ssl_certfile=getattr(self.args, 'ssl_certfile', None),
cors=getattr(self.args, 'cors', None),
is_cancel=self.is_cancel,
)
elif not hasattr(self.args, 'protocol') or (
len(self.args.protocol) == 1 and self.args.protocol[0] == ProtocolType.GRPC
):
Expand Down
39 changes: 38 additions & 1 deletion jina/serve/runtimes/servers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,44 @@ def app(self):
"""Get the sagemaker fastapi app
:return: Return a FastAPI app for the sagemaker container
"""
return self._request_handler._http_fastapi_sagemaker_app(
return self._request_handler._http_fastapi_csp_app(
title=self.title,
description=self.description,
no_crud_endpoints=self.no_crud_endpoints,
no_debug_endpoints=self.no_debug_endpoints,
expose_endpoints=self.expose_endpoints,
expose_graphql_endpoint=self.expose_graphql_endpoint,
tracing=self.tracing,
tracer_provider=self.tracer_provider,
cors=self.cors,
logger=self.logger,
)


class AzureHTTPServer(FastAPIBaseServer):
"""
:class:`AzureHTTPServer` is a FastAPIBaseServer that uses a custom FastAPI app for azure endpoints
"""

@property
def port(self):
"""Get the port for the azure server
:return: Return the port for the azure server, always 8080"""
return 8080

@property
def ports(self):
"""Get the port for the azure server
:return: Return the port for the azure server, always 8080"""
return [8080]

@property
def app(self):
"""Get the azure fastapi app
:return: Return a FastAPI app for the azure container
"""
return self._request_handler._http_fastapi_csp_app(
title=self.title,
description=self.description,
no_crud_endpoints=self.no_crud_endpoints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_fastapi_app(
from jina.serve.runtimes.gateway.models import _to_camel_case

if not docarray_v2:
logger.warning('Only docarray v2 is supported with Sagemaker. ')
logger.warning('Only docarray v2 is supported with CSP. ')
return

class Header(BaseModel):
Expand Down
4 changes: 2 additions & 2 deletions jina/serve/runtimes/worker/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ async def _shutdown():

return extend_rest_interface(app)

def _http_fastapi_sagemaker_app(self, **kwargs):
from jina.serve.runtimes.worker.http_sagemaker_app import get_fastapi_app
def _http_fastapi_csp_app(self, **kwargs):
from jina.serve.runtimes.worker.http_csp_app import get_fastapi_app

request_models_map = self._executor._get_endpoint_models_dict()

Expand Down
70 changes: 70 additions & 0 deletions tests/integration/docarray_v2/csp/test_azure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os

import requests
from jina.orchestrate.pods import Pod
from jina.parsers import set_pod_parser

azure_port = 8080


def test_provider_azure_embedding_inference():
args, _ = set_pod_parser().parse_known_args(
[
"--uses",
os.path.join(
os.path.dirname(__file__), "SampleColbertExecutor", "config.yml"
),
"--provider",
"azure",
"serve",
]
)
with Pod(args):
resp = requests.get(f"http://localhost:{azure_port}/ping")
assert resp.status_code == 200
assert resp.json() == {}

resp = requests.post(
f"http://localhost:{azure_port}/encode",
json={
"data": [
{"text": "hello world"},
]
},
)
assert resp.status_code == 200
resp_json = resp.json()
assert len(resp_json["data"]) == 1
assert len(resp_json["data"][0]["embeddings"][0]) == 64


def test_provider_azure_rerank_inference():
args, _ = set_pod_parser().parse_known_args(
[
"--uses",
os.path.join(
os.path.dirname(__file__), "SampleColbertExecutor", "config.yml"
),
"--provider",
"azure",
"serve",
]
)
with Pod(args):
resp = requests.post(
f"http://localhost:{azure_port}/rank",
json={
"data": {
"documents": [
{"text": "the dog is in the house"},
{"text": "hey Peter"},
],
"query": "where is the dog",
"top_n": 2,
}
},
)
assert resp.status_code == 200
resp_json = resp.json()
assert len(resp_json["data"]) == 1
assert resp_json["data"][0]["results"][0]["document"]["text"] == "first result"

0 comments on commit b04f750

Please sign in to comment.