
Add deployment to llm-complete #160

Open
wants to merge 8 commits into base: main
2 changes: 2 additions & 0 deletions llm-complete-guide/.env.local
Collaborator: is this a file that should be committed?

@@ -0,0 +1,2 @@
MODELS=[{"name":"llm-complete-rag-webui","parameters":{"temperature":0.5,"max_new_tokens":1024},"endpoints":[{"type":"openai","baseURL":"http://localhost:3000/generate"}]}]
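The MODELS variable is a JSON list that points the chat UI at the BentoML endpoint defined below; a quick sanity check of the value (a sketch, assuming the UI parses the variable as plain JSON):

import json
import os

# Parse the MODELS env var and verify it targets the local generate endpoint.
models = json.loads(os.environ["MODELS"])
assert models[0]["endpoints"][0]["baseURL"] == "http://localhost:3000/generate"
print(models[0]["name"])  # llm-complete-rag-webui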

35 changes: 35 additions & 0 deletions llm-complete-guide/k8s_template.yaml
@@ -0,0 +1,35 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    app: placeholder
  name: placeholder
spec:
  ports:
    - name: http # Changed from 'predict' to 'http' for clarity
      port: 80 # External port exposed by the LoadBalancer
      targetPort: 3000 # Internal container port
  selector:
    app: placeholder
  type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: placeholder
  name: placeholder
spec:
  selector:
    matchLabels:
      app: placeholder
  template:
    metadata:
      labels:
        app: placeholder
    spec:
      containers:
        - image: placeholder
          name: placeholder
          ports:
            - containerPort: 3000
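The placeholder values above are presumably substituted at deploy time by the k8s_deployment step, which this diff does not show. A minimal rendering helper, assuming PyYAML and the field layout of this template (the function name and approach are illustrative, not the step's actual code):

import yaml  # requires PyYAML


def render_k8s_template(template_path: str, app_name: str, image: str) -> list:
    """Load k8s_template.yaml and substitute the placeholder values (sketch)."""
    with open(template_path) as f:
        manifests = list(yaml.safe_load_all(f))
    for manifest in manifests:
        manifest["metadata"]["name"] = app_name
        manifest["metadata"]["labels"]["app"] = app_name
        if manifest["kind"] == "Service":
            manifest["spec"]["selector"]["app"] = app_name
        elif manifest["kind"] == "Deployment":
            manifest["spec"]["selector"]["matchLabels"]["app"] = app_name
            pod = manifest["spec"]["template"]
            pod["metadata"]["labels"]["app"] = app_name
            container = pod["spec"]["containers"][0]
            container["name"] = app_name
            container["image"] = image
    return manifests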
4 changes: 3 additions & 1 deletion llm-complete-guide/pipelines/__init__.py
@@ -20,4 +20,6 @@
from pipelines.llm_basic_rag import llm_basic_rag
from pipelines.llm_eval import llm_eval
from pipelines.rag_deployment import rag_deployment
from pipelines.llm_index_and_evaluate import llm_index_and_evaluate
from pipelines.local_deployment import local_deployment
from pipelines.prod_deployment import production_deployment
11 changes: 11 additions & 0 deletions llm-complete-guide/pipelines/local_deployment.py
@@ -0,0 +1,11 @@
from steps.bento_builder import bento_builder
from steps.bento_deployment import bento_deployment
from zenml import pipeline


@pipeline(enable_cache=False)
def local_deployment():
    bento = bento_builder()
    bento_deployment(bento)

    # vllm_model_deployer_step()
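For a quick smoke test outside run.py, the pipeline can also be invoked directly (a sketch; assumes an active local stack with the required secrets configured):

from pipelines.local_deployment import local_deployment

if __name__ == "__main__":
    # Builds the Bento and deploys it locally in one shot.
    local_deployment()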
31 changes: 31 additions & 0 deletions llm-complete-guide/pipelines/prod_deployment.py
@@ -0,0 +1,31 @@
# Apache Software License 2.0
#
# Copyright (c) ZenML GmbH 2024. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from steps.bento_dockerizer import bento_dockerizer
from steps.k8s_deployment import k8s_deployment
from zenml import pipeline


@pipeline(enable_cache=False)
def production_deployment():
    """Model deployment pipeline.

    This pipeline deploys the trained model for future inference.
    """
    bento_model_image = bento_dockerizer()
    k8s_deployment(bento_model_image)
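The k8s_deployment step is likewise not part of this diff; a hedged sketch of how the rendered manifests could be applied with the official kubernetes client (it reuses the render_k8s_template sketch above, and the helper names are assumptions):

from kubernetes import client, config, utils


def apply_manifests(manifests: list) -> None:
    """Apply rendered k8s manifests to the current kube-context (sketch)."""
    config.load_kube_config()  # assumes a configured kubeconfig
    api_client = client.ApiClient()
    for manifest in manifests:
        utils.create_from_dict(api_client, manifest)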
13 changes: 11 additions & 2 deletions llm-complete-guide/run.py
@@ -49,6 +49,7 @@
    llm_eval,
    rag_deployment,
    llm_index_and_evaluate,
    local_deployment,
)
from structures import Document
from zenml.materializers.materializer_registry import materializer_registry
@@ -95,6 +96,13 @@
default="gpt4",
help="The model to use for the completion.",
)
@click.option(
"--query-text",
"query_text",
required=False,
default=None,
help="The query text to use for the completion.",
)
@click.option(
"--zenml-model-name",
"zenml_model_name",
@@ -251,7 +259,8 @@ def main(
        )()

    elif pipeline == "deploy":
        # rag_deployment.with_options(model=zenml_model, **pipeline_args)()
Collaborator: maybe make this configurable somehow through click args

        local_deployment.with_options(model=zenml_model, **pipeline_args)()

    elif pipeline == "evaluation":
        pipeline_args["enable_cache"] = False
@@ -279,4 +288,4 @@ def main(
    materializer_registry.register_materializer_type(
        Document, DocumentMaterializer
    )
    main()
151 changes: 151 additions & 0 deletions llm-complete-guide/service.py
@@ -0,0 +1,151 @@
from typing import AsyncGenerator

import bentoml
import litellm
import numpy as np
from constants import (
    MODEL_NAME_MAP,
    OPENAI_MODEL,
    SECRET_NAME_ELASTICSEARCH,
)
from elasticsearch import Elasticsearch
from rerankers import Reranker
from sentence_transformers import SentenceTransformer
from utils.openai_utils import get_openai_api_key
from zenml.client import Client

EMBEDDINGS_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # 384 dimensions


@bentoml.service(
    name="rag-service",
    traffic={
        "timeout": 300,
        "concurrency": 256,
    },
)
class RAGService:
    """RAG service for generating responses using LLM and RAG."""

    def __init__(self):
        """Initialize the RAG service."""
        # Initialize embeddings model
        self.embeddings_model = SentenceTransformer(EMBEDDINGS_MODEL)

        # Initialize reranker
        self.reranker = Reranker("flashrank")

        # Initialize Elasticsearch client (fetch the secret once)
        es_secret = Client().get_secret(SECRET_NAME_ELASTICSEARCH).secret_values
        self.es_client = Elasticsearch(
            es_secret["elasticsearch_host"],
            api_key=es_secret["elasticsearch_api_key"],
        )

    def get_embeddings(self, text: str) -> np.ndarray:
        """Get embeddings for the given text."""
        embeddings = self.embeddings_model.encode(text)
        if embeddings.ndim == 2:
            embeddings = embeddings[0]
        return embeddings

    def get_similar_docs(self, query_embedding: np.ndarray, n: int = 20) -> list:
        """Get similar documents for the given query embedding."""
        if query_embedding.ndim == 2:
            query_embedding = query_embedding[0]

        response = self.es_client.search(
            index="zenml_docs",
            knn={
                "field": "embedding",
                "query_vector": query_embedding.tolist(),
                "num_candidates": 50,
                "k": n,
            },
        )

        docs = []
        for hit in response["hits"]["hits"]:
            docs.append(
                {
                    "content": hit["_source"]["content"],
                    "url": hit["_source"]["url"],
                    "parent_section": hit["_source"]["parent_section"],
                }
            )
        return docs

    def rerank_documents(self, query: str, documents: list) -> list:
        """Rerank documents using the reranker."""
        docs_texts = [
            f"{doc['content']} PARENT SECTION: {doc['parent_section']}"
            for doc in documents
        ]
        results = self.reranker.rank(query=query, docs=docs_texts)

        reranked_docs = []
        for result in results.results:
            doc = documents[result.doc_id]
            reranked_docs.append((result.text, doc["url"]))
        return reranked_docs[:5]

    async def get_completion(
        self, messages: list, model: str, temperature: float, max_tokens: int
    ) -> AsyncGenerator[str, None]:
        """Handle the completion request and streaming response."""
        try:
            response = await litellm.acompletion(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                api_key=get_openai_api_key(),
                stream=True,
            )

            async for chunk in response:
                if chunk.choices and chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
        except Exception as e:
            yield f"Error in completion: {str(e)}"

    @bentoml.api
    async def generate(
        self,
        query: str = "Explain ZenML features",
        temperature: float = 0.4,
        max_tokens: int = 1000,
    ) -> AsyncGenerator[str, None]:
        """Generate responses for the given query."""
        try:
            # Get embeddings for the query
            query_embedding = self.get_embeddings(query)

            # Retrieve similar documents
            similar_docs = self.get_similar_docs(query_embedding, n=20)

            # Rerank documents
            reranked_docs = self.rerank_documents(query, similar_docs)

            # Prepare context from the reranked documents
            context = "\n\n".join([doc[0] for doc in reranked_docs])

            # Prepare system message
            system_message = """
You are a friendly chatbot. \
You can answer questions about ZenML, its features and its use cases. \
You respond in a concise, technically credible tone. \
You ONLY use the context from the ZenML documentation to provide relevant answers. \
You do not make up answers or provide opinions that you don't have information to support. \
If you are unsure or don't know, just say so. \
"""

            # Prepare messages for the LLM
            messages = [
                {"role": "system", "content": system_message},
                {"role": "user", "content": query},
                {
                    "role": "assistant",
                    "content": f"Please use the following relevant ZenML documentation to answer the query: \n{context}",
                },
            ]

            # Stream the completion from the LLM
            model = MODEL_NAME_MAP.get(OPENAI_MODEL, OPENAI_MODEL)
            async for chunk in self.get_completion(
                messages, model, temperature, max_tokens
            ):
                yield chunk

        except Exception as e:
            yield f"Error occurred: {str(e)}"
86 changes: 86 additions & 0 deletions llm-complete-guide/steps/bento_builder.py
@@ -0,0 +1,86 @@
# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
import os
from typing import Optional

from bentoml import bentos
from bentoml._internal.bento import bento
from constants import (
    EMBEDDINGS_MODEL_ID_FINE_TUNED,
)
from typing_extensions import Annotated
from zenml import ArtifactConfig, Model, get_step_context, step
from zenml import __version__ as zenml_version
from zenml.client import Client
from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME
from zenml.integrations.bentoml.materializers.bentoml_bento_materializer import (
    BentoMaterializer,
)
from zenml.logger import get_logger
from zenml.utils import source_utils

logger = get_logger(__name__)


@step(output_materializers=BentoMaterializer, enable_cache=False)
def bento_builder() -> (
    Annotated[
        Optional[bento.Bento],
        ArtifactConfig(name="bentoml_rag_deployment", is_model_artifact=True),
    ]
):
    """Builds a BentoML bundle for the RAG service.

    On a local orchestrator, this step packages `service.py:RAGService`
    together with the project requirements into a Bento and returns it as a
    ZenML model artifact; on other orchestrators the build is skipped.

    Returns:
        The built Bento, or None if the orchestrator is not local.
    """
    if Client().active_stack.orchestrator.flavor == "local":
        model = get_step_context().model
        version_to_deploy = Model(name=model.name, version="production")
        # Build the BentoML bundle
        bento = bentos.build(
            service="service.py:RAGService",
            labels={
                "zenml_version": zenml_version,
                "model_name": version_to_deploy.name,
                "model_version": version_to_deploy.version,
                "model_uri": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}",
                "bento_uri": os.path.join(
                    get_step_context().get_output_artifact_uri(),
                    DEFAULT_BENTO_FILENAME,
                ),
            },
            build_ctx=source_utils.get_source_root(),
            python={
                "requirements_txt": "requirements.txt",
            },
        )
    else:
        logger.warning("Skipping deployment as the orchestrator is not local.")
        bento = None
    return bento
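The bento_dockerizer step consumed by production_deployment is not shown in this diff either; one plausible implementation shells out to the BentoML CLI (a sketch under that assumption; the helper name and tags are illustrative):

import subprocess


def containerize(bento_tag: str, image_tag: str) -> str:
    """Build a Docker image from a built Bento via `bentoml containerize` (sketch)."""
    subprocess.run(
        ["bentoml", "containerize", bento_tag, "-t", image_tag],
        check=True,
    )
    return image_tag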