diff --git a/llm-complete-guide/.env.local b/llm-complete-guide/.env.local new file mode 100644 index 00000000..e7fbd395 --- /dev/null +++ b/llm-complete-guide/.env.local @@ -0,0 +1,2 @@ +MODELS=[{"name":"llm-complete-rag-webui","parameters":{"temperature":0.5,"max_new_tokens":1024},"endpoints":[{"type":"openai","baseURL":"http://localhost:3000/generate"}]}] + diff --git a/llm-complete-guide/gh_action_rag.py b/llm-complete-guide/gh_action_rag.py index ee8ac86d..e21e9980 100644 --- a/llm-complete-guide/gh_action_rag.py +++ b/llm-complete-guide/gh_action_rag.py @@ -21,12 +21,10 @@ import click import yaml -from zenml.enums import PluginSubType - from pipelines.llm_index_and_evaluate import llm_index_and_evaluate -from zenml.client import Client from zenml import Model -from zenml.exceptions import ZenKeyError +from zenml.client import Client +from zenml.enums import PluginSubType @click.command( @@ -89,7 +87,7 @@ def main( zenml_model_name: Optional[str] = "zenml-docs-qa-rag", zenml_model_version: Optional[str] = None, ): - """ + """ Executes the pipeline to train a basic RAG model. Args: @@ -108,14 +106,14 @@ def main( config = yaml.safe_load(file) # Read the model version from a file in the root of the repo - # called "ZENML_VERSION.txt". + # called "ZENML_VERSION.txt". if zenml_model_version == "staging": postfix = "-rc0" elif zenml_model_version == "production": postfix = "" else: postfix = "-dev" - + if Path("ZENML_VERSION.txt").exists(): with open("ZENML_VERSION.txt", "r") as file: zenml_model_version = file.read().strip() @@ -177,7 +175,7 @@ def main( service_account_id=service_account_id, auth_window=0, flavor="builtin", - action_type=PluginSubType.PIPELINE_RUN + action_type=PluginSubType.PIPELINE_RUN, ).id client.create_trigger( name="Production Trigger LLM-Complete", diff --git a/llm-complete-guide/pipelines/__init__.py b/llm-complete-guide/pipelines/__init__.py index ae127fa3..ad60e74f 100644 --- a/llm-complete-guide/pipelines/__init__.py +++ b/llm-complete-guide/pipelines/__init__.py @@ -19,5 +19,7 @@ from pipelines.generate_chunk_questions import generate_chunk_questions from pipelines.llm_basic_rag import llm_basic_rag from pipelines.llm_eval import llm_eval +from pipelines.llm_index_and_evaluate import llm_index_and_evaluate +from pipelines.local_deployment import local_deployment +from pipelines.prod_deployment import production_deployment from pipelines.rag_deployment import rag_deployment -from pipelines.llm_index_and_evaluate import llm_index_and_evaluate \ No newline at end of file diff --git a/llm-complete-guide/pipelines/finetune_embeddings.py b/llm-complete-guide/pipelines/finetune_embeddings.py index e53ae3f1..19b8b08c 100644 --- a/llm-complete-guide/pipelines/finetune_embeddings.py +++ b/llm-complete-guide/pipelines/finetune_embeddings.py @@ -12,7 +12,6 @@ # or implied. See the License for the specific language governing # permissions and limitations under the License. -from constants import EMBEDDINGS_MODEL_NAME_ZENML from steps.finetune_embeddings import ( evaluate_base_model, evaluate_finetuned_model, diff --git a/llm-complete-guide/pipelines/llm_basic_rag.py b/llm-complete-guide/pipelines/llm_basic_rag.py index 82a97b21..895c4df3 100644 --- a/llm-complete-guide/pipelines/llm_basic_rag.py +++ b/llm-complete-guide/pipelines/llm_basic_rag.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -from litellm import config_path from steps.populate_index import ( generate_embeddings,
diff --git a/llm-complete-guide/pipelines/llm_index_and_evaluate.py b/llm-complete-guide/pipelines/llm_index_and_evaluate.py index 16423867..b82c84a3 100644 --- a/llm-complete-guide/pipelines/llm_index_and_evaluate.py +++ b/llm-complete-guide/pipelines/llm_index_and_evaluate.py @@ -15,9 +15,10 @@ # limitations under the License. # -from pipelines import llm_basic_rag, llm_eval from zenml import pipeline +from pipelines import llm_basic_rag, llm_eval + @pipeline def llm_index_and_evaluate() -> None:
diff --git a/llm-complete-guide/pipelines/local_deployment.py b/llm-complete-guide/pipelines/local_deployment.py new file mode 100644 index 00000000..b68e72e5 --- /dev/null +++ b/llm-complete-guide/pipelines/local_deployment.py @@ -0,0 +1,9 @@ +from steps.bento_builder import bento_builder +from steps.bento_deployment import bento_deployment +from zenml import pipeline + + +@pipeline(enable_cache=False) +def local_deployment(): + bento = bento_builder() + bento_deployment(bento)
diff --git a/llm-complete-guide/pipelines/prod_deployment.py b/llm-complete-guide/pipelines/prod_deployment.py new file mode 100644 index 00000000..3abee7a2 --- /dev/null +++ b/llm-complete-guide/pipelines/prod_deployment.py @@ -0,0 +1,32 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2024. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from steps.bento_dockerizer import bento_dockerizer +from steps.k8s_deployment import k8s_deployment +from steps.visualize_chat import create_chat_interface +from zenml import pipeline + + +@pipeline(enable_cache=False) +def production_deployment(): + """Model deployment pipeline. + + This pipeline deploys the trained model for future inference.
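+
+    Step order, as wired below: bento_dockerizer containerizes the built
+    Bento and pushes the image, k8s_deployment rolls that image out to the
+    cluster, and create_chat_interface publishes an HTML chat client for
+    the deployed service.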
+ """ + bento_model_image = bento_dockerizer() + deployment_info = k8s_deployment(bento_model_image) + create_chat_interface(deployment_info) diff --git a/llm-complete-guide/run.py b/llm-complete-guide/run.py index a2ba1f94..c6368a65 100644 --- a/llm-complete-guide/run.py +++ b/llm-complete-guide/run.py @@ -47,12 +47,14 @@ generate_synthetic_data, llm_basic_rag, llm_eval, - rag_deployment, llm_index_and_evaluate, + local_deployment, + production_deployment, + rag_deployment, ) from structures import Document -from zenml.materializers.materializer_registry import materializer_registry from zenml import Model +from zenml.materializers.materializer_registry import materializer_registry logger = get_logger(__name__) @@ -95,6 +97,13 @@ default="gpt4", help="The model to use for the completion.", ) +@click.option( + "--query-text", + "query_text", + required=False, + default=None, + help="The query text to use for the completion.", +) @click.option( "--zenml-model-name", "zenml_model_name", @@ -136,6 +145,12 @@ default=None, help="Path to config", ) +@click.option( + "--env", + "env", + default="local", + help="The environment to use for the completion.", +) def main( pipeline: str, query_text: Optional[str] = None, @@ -146,6 +161,7 @@ def main( use_argilla: bool = False, use_reranker: bool = False, config: Optional[str] = None, + env: str = "local", ): """Main entry point for the pipeline execution. @@ -159,6 +175,7 @@ def main( use_argilla (bool): If True, Argilla an notations will be used use_reranker (bool): If True, rerankers will be used config (Optional[str]): Path to config file + env (str): The environment to use for the deployment (local, huggingface space, k8s etc.) """ pipeline_args = {"enable_cache": not no_cache} embeddings_finetune_args = { @@ -169,9 +186,9 @@ def main( } }, } - + # Read the model version from a file in the root of the repo - # called "ZENML_VERSION.txt". + # called "ZENML_VERSION.txt". if zenml_model_version == "staging": postfix = "-rc0" elif zenml_model_version == "production": @@ -181,8 +198,10 @@ def main( if Path("ZENML_VERSION.txt").exists(): with open("ZENML_VERSION.txt", "r") as file: - zenml_model_version = file.read().strip() - zenml_model_version += postfix + zenml_version = file.read().strip() + zenml_version += postfix + # zenml_model_version = file.read().strip() + # zenml_model_version += postfix else: raise RuntimeError( "No model version file found. Please create a file called ZENML_VERSION.txt in the root of the repo with the model version." 
@@ -191,7 +210,7 @@ def main( # Create ZenML model zenml_model = Model( name=zenml_model_name, - version=zenml_model_version, + version=zenml_version, license="Apache 2.0", description="RAG application for ZenML docs", tags=["rag", "finetuned", "chatbot"], @@ -251,8 +270,19 @@ def main( )() elif pipeline == "deploy": - rag_deployment.with_options(model=zenml_model, **pipeline_args)() - + zenml_model.version = zenml_model_version + if env == "local": + local_deployment.with_options( + model=zenml_model, config_path=config_path, **pipeline_args + )() + elif env == "huggingface": + rag_deployment.with_options( + model=zenml_model, config_path=config_path, **pipeline_args + )() + elif env == "k8s": + production_deployment.with_options( + model=zenml_model, config_path=config_path, **pipeline_args + )() elif pipeline == "evaluation": pipeline_args["enable_cache"] = False llm_eval.with_options(model=zenml_model, config_path=config_path)() @@ -264,7 +294,9 @@ def main( elif pipeline == "embeddings": finetune_embeddings.with_options( - model=zenml_model, config_path=config_path, **embeddings_finetune_args + model=zenml_model, + config_path=config_path, + **embeddings_finetune_args, )() elif pipeline == "chunks": diff --git a/llm-complete-guide/service.py b/llm-complete-guide/service.py new file mode 100644 index 00000000..73a77683 --- /dev/null +++ b/llm-complete-guide/service.py @@ -0,0 +1,187 @@ +from typing import AsyncGenerator + +import bentoml +import litellm +import numpy as np +from constants import ( + MODEL_NAME_MAP, + OPENAI_MODEL, + SECRET_NAME_ELASTICSEARCH, +) +from elasticsearch import Elasticsearch +from rerankers import Reranker +from sentence_transformers import SentenceTransformer +from utils.openai_utils import get_openai_api_key +from zenml.client import Client + +EMBEDDINGS_MODEL = "sentence-transformers/all-MiniLM-L6-v2" # 384 dimensions + + +@bentoml.service( + name="rag-service", + traffic={ + "timeout": 300, + "concurrency": 256, + }, + http={ + "cors": { + "enabled": True, + "access_control_allow_origins": [ + "https://cloud.zenml.io" + ], # Add your allowed origins + "access_control_allow_methods": [ + "GET", + "OPTIONS", + "POST", + "HEAD", + "PUT", + ], + "access_control_allow_credentials": True, + "access_control_allow_headers": ["*"], + # "access_control_allow_origin_regex": "https://.*\.my_org\.com", # Optional regex + "access_control_max_age": 1200, + "access_control_expose_headers": ["Content-Length"], + } + }, +) +class RAGService: + """RAG service for generating responses using LLM and RAG.""" + + def __init__(self): + """Initialize the RAG service.""" + # Initialize embeddings model + self.embeddings_model = SentenceTransformer(EMBEDDINGS_MODEL) + + # Initialize reranker + self.reranker = Reranker("flashrank") + + # Initialize Elasticsearch client + client = Client() + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] + es_api_key = client.get_secret( + SECRET_NAME_ELASTICSEARCH + ).secret_values["elasticsearch_api_key"] + self.es_client = Elasticsearch(es_host, api_key=es_api_key) + + def get_embeddings(self, text: str) -> np.ndarray: + """Get embeddings for the given text.""" + embeddings = self.embeddings_model.encode(text) + if embeddings.ndim == 2: + embeddings = embeddings[0] + return embeddings + + def get_similar_docs( + self, query_embedding: np.ndarray, n: int = 20 + ) -> list: + """Get similar documents for the given query embedding.""" + if query_embedding.ndim == 2: + query_embedding = 
query_embedding[0] + + response = self.es_client.search( + index="zenml_docs", + knn={ + "field": "embedding", + "query_vector": query_embedding.tolist(), + "num_candidates": 50, + "k": n, + }, + ) + + docs = [] + for hit in response["hits"]["hits"]: + docs.append( + { + "content": hit["_source"]["content"], + "url": hit["_source"]["url"], + "parent_section": hit["_source"]["parent_section"], + } + ) + return docs + + def rerank_documents(self, query: str, documents: list) -> list: + """Rerank documents using the reranker.""" + docs_texts = [ + f"{doc['content']} PARENT SECTION: {doc['parent_section']}" + for doc in documents + ] + results = self.reranker.rank(query=query, docs=docs_texts) + + reranked_docs = [] + for result in results.results: + index_val = result.doc_id + doc = documents[index_val] + reranked_docs.append((result.text, doc["url"])) + return reranked_docs[:5] + + async def get_completion( + self, messages: list, model: str, temperature: float, max_tokens: int + ) -> AsyncGenerator[str, None]: + """Handle the completion request and streaming response.""" + try: + response = await litellm.acompletion( + model=model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + api_key=get_openai_api_key(), + stream=True, + ) + + async for chunk in response: + if chunk.choices and chunk.choices[0].delta.content: + yield chunk.choices[0].delta.content + except Exception as e: + yield f"Error in completion: {str(e)}" + + @bentoml.api + async def generate( + self, + query: str = "Explain ZenML features", + temperature: float = 0.4, + max_tokens: int = 1000, + ) -> AsyncGenerator[str, None]: + """Generate responses for the given query.""" + try: + # Get embeddings for query + query_embedding = self.get_embeddings(query) + + # Retrieve similar documents + similar_docs = self.get_similar_docs(query_embedding, n=20) + + # Rerank documents + reranked_docs = self.rerank_documents(query, similar_docs) + + # Prepare context from reranked documents + context = "\n\n".join([doc[0] for doc in reranked_docs]) + + # Prepare system message + system_message = """ + You are a friendly chatbot. \ + You can answer questions about ZenML, its features and its use cases. \ + You respond in a concise, technically credible tone. \ + You ONLY use the context from the ZenML documentation to provide relevant answers. \ + You do not make up answers or provide opinions that you don't have information to support. \ + If you are unsure or don't know, just say so. \ + """ + + # Prepare messages for LLM + messages = [ + {"role": "system", "content": system_message}, + {"role": "user", "content": query}, + { + "role": "assistant", + "content": f"Please use the following relevant ZenML documentation to answer the query: \n{context}", + }, + ] + + # Get completion from LLM using the new async method + model = MODEL_NAME_MAP.get(OPENAI_MODEL, OPENAI_MODEL) + async for chunk in self.get_completion( + messages, model, temperature, max_tokens + ): + yield chunk + + except Exception as e: + yield f"Error occurred: {str(e)}" diff --git a/llm-complete-guide/steps/bento_builder.py b/llm-complete-guide/steps/bento_builder.py new file mode 100644 index 00000000..c94c4b33 --- /dev/null +++ b/llm-complete-guide/steps/bento_builder.py @@ -0,0 +1,93 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +import os +from typing import Optional + +from bentoml import bentos +from bentoml._internal.bento import bento +from constants import ( + EMBEDDINGS_MODEL_ID_FINE_TUNED, +) +from typing_extensions import Annotated +from zenml import ArtifactConfig, Model, get_step_context, step +from zenml import __version__ as zenml_version +from zenml.client import Client +from zenml.enums import ArtifactType +from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME +from zenml.integrations.bentoml.materializers.bentoml_bento_materializer import ( + BentoMaterializer, +) +from zenml.logger import get_logger +from zenml.utils import source_utils + +logger = get_logger(__name__) + + +@step(output_materializers=BentoMaterializer, enable_cache=False) +def bento_builder() -> ( + Annotated[ + Optional[bento.Bento], + ArtifactConfig( + name="bentoml_rag_deployment", artifact_type=ArtifactType.MODEL + ), + ] +): + """Bento builder step. + + Packages the RAG service defined in service.py:RAGService into a BentoML + bundle so it can be containerized and deployed. The bundle is only built + when the active orchestrator is local; on remote orchestrators the step + is skipped. + + Returns: + The built Bento, or None if the orchestrator is not local. + """ + if Client().active_stack.orchestrator.flavor == "local": + model = get_step_context().model + version_to_deploy = Model(name=model.name, version="production") + logger.info( + f"Building BentoML bundle for model: {version_to_deploy.name}" + ) + # Build the BentoML bundle + bento = bentos.build( + service="service.py:RAGService", + labels={ + "zenml_version": zenml_version, + "model_name": version_to_deploy.name, + "model_version": version_to_deploy.version, + "model_uri": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", + "bento_uri": os.path.join( + get_step_context().get_output_artifact_uri(), + DEFAULT_BENTO_FILENAME, + ), + }, + build_ctx=source_utils.get_source_root(), + python={ + "requirements_txt": "requirements.txt", + }, + ) + else: + logger.warning("Skipping deployment as the orchestrator is not local.") + bento = None + return bento
diff --git a/llm-complete-guide/steps/bento_deployment.py b/llm-complete-guide/steps/bento_deployment.py new file mode 100644 index 00000000..7e14dac6 --- /dev/null +++ b/llm-complete-guide/steps/bento_deployment.py @@ -0,0 +1,59 @@ +# Copyright (c) ZenML GmbH 2022. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. + +from typing import Optional + +from bentoml._internal.bento import bento +from zenml import get_step_context, step +from zenml.client import Client +from zenml.integrations.bentoml.services.bentoml_local_deployment import ( + BentoMLLocalDeploymentConfig, + BentoMLLocalDeploymentService, +) +from zenml.logger import get_logger +from zenml.utils import source_utils + +logger = get_logger(__name__) + + +@step(enable_cache=False) +def bento_deployment( + bento: bento.Bento, +) -> Optional[BentoMLLocalDeploymentService]: + """Deploys the given Bento with the BentoML model deployer in the active stack.""" + zenml_client = Client() + step_context = get_step_context() + pipeline_name = step_context.pipeline.name + step_name = step_context.step_run.name + model_deployer = zenml_client.active_stack.model_deployer + bentoml_deployment_config = BentoMLLocalDeploymentConfig( + model_name=step_context.model.name, + model_version=step_context.model.stage, + description="Deploying RAG model", + pipeline_name=pipeline_name, + pipeline_step_name=step_name, + model_uri=bento.info.labels.get("model_uri"), + bento_tag=str(bento.tag), + bento_uri=bento.info.labels.get("bento_uri"), + working_dir=source_utils.get_source_root(), + timeout=1500, + ) + service = model_deployer.deploy_model( + config=bentoml_deployment_config, + service_type=BentoMLLocalDeploymentService.SERVICE_TYPE, + ) + logger.info( + f"The deployed service info: {model_deployer.get_model_server_info(service)}" + ) + return service
diff --git a/llm-complete-guide/steps/bento_dockerizer.py b/llm-complete-guide/steps/bento_dockerizer.py new file mode 100644 index 00000000..81009ce8 --- /dev/null +++ b/llm-complete-guide/steps/bento_dockerizer.py @@ -0,0 +1,60 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. + +import bentoml +from typing_extensions import Annotated +from zenml import ArtifactConfig, step +from zenml.client import Client +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step(enable_cache=False) +def bento_dockerizer() -> ( + Annotated[ + str, + ArtifactConfig(name="bentoml_model_image"), + ] +): + """Bento dockerizer step. + + This step containerizes the built Bento and pushes the resulting image to the container registry of the active stack.
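+
+    The underlying containerization call is essentially (registry URI and
+    tag values are illustrative):
+
+        bentoml.container.build(
+            bento_tag="rag-service:latest",
+            backend="docker",
+            image_tag=("<registry-uri>/rag-service:latest",),
+        )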
+ """ + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + zenml_client = Client() + model = get_step_context().model + version_to_deploy = Model(name=model.name) + bentoml_deployment = zenml_client.get_artifact_version( + name_id_or_prefix="bentoml_rag_deployment" + ) + bento_tag = f'{bentoml_deployment.run_metadata["bento_tag_name"]}:{bentoml_deployment.run_metadata["bento_info_version"]}' + container_registry = zenml_client.active_stack.container_registry + assert container_registry, "Container registry is not configured." + image_name = f"{container_registry.config.uri}/{bento_tag}" + image_tag = (image_name,) + try: + bentoml.container.build( + bento_tag=bento_tag, + backend="docker", # hardcoding docker since container service only supports docker + image_tag=image_tag, + ) + + except Exception as e: + logger.error(f"Error containerizing the bento: {e}") + raise e + + container_registry.push_image(image_name) + ### YOUR CODE ENDS HERE ### + return image_name diff --git a/llm-complete-guide/steps/chunk_documents.py b/llm-complete-guide/steps/chunk_documents.py index 43983604..8a48c26b 100644 --- a/llm-complete-guide/steps/chunk_documents.py +++ b/llm-complete-guide/steps/chunk_documents.py @@ -21,7 +21,7 @@ ) from structures import Document from utils.llm_utils import split_documents -from zenml import log_artifact_metadata, step +from zenml import log_metadata, step from zenml.logger import get_logger logger = get_logger(__name__) @@ -137,8 +137,9 @@ def chunk_documents( logger.info( f"Number of documents after chunking: {num_docs_after_chunking}" ) - log_artifact_metadata( + log_metadata( artifact_name="chunked_documents", + infer_artifact=True, metadata={ "before_chunking_count": num_docs_before_chunking, "after_chunking_count": num_docs_after_chunking, diff --git a/llm-complete-guide/steps/eval_pii.py b/llm-complete-guide/steps/eval_pii.py index b81237f3..5148ff31 100644 --- a/llm-complete-guide/steps/eval_pii.py +++ b/llm-complete-guide/steps/eval_pii.py @@ -6,7 +6,7 @@ import matplotlib.pyplot as plt from datasets import Dataset from PIL import Image -from zenml import log_artifact_metadata, step +from zenml import log_metadata, step class PIIDetector: @@ -305,8 +305,10 @@ def eval_pii( "dates_found": train_results["statistics"]["total_findings"]["dates"], "ips_found": train_results["statistics"]["total_findings"]["ips"], } - log_artifact_metadata( - metadata=train_metadata, artifact_name="train_pii_results" + log_metadata( + metadata=train_metadata, + artifact_name="train_pii_results", + infer_artifact=True, ) test_metadata = { @@ -320,8 +322,10 @@ def eval_pii( "dates_found": test_results["statistics"]["total_findings"]["dates"], "ips_found": test_results["statistics"]["total_findings"]["ips"], } - log_artifact_metadata( - metadata=test_metadata, artifact_name="test_pii_results" + log_metadata( + metadata=test_metadata, + artifact_name="test_pii_results", + infer_artifact=True, ) pii_chart = plot_pii_results(train_results, test_results) diff --git a/llm-complete-guide/steps/eval_retrieval.py b/llm-complete-guide/steps/eval_retrieval.py index 2b555b85..0261bef2 100644 --- a/llm-complete-guide/steps/eval_retrieval.py +++ b/llm-complete-guide/steps/eval_retrieval.py @@ -90,11 +90,11 @@ def query_similar_docs( num_docs = 20 if use_reranking else returned_sample_size # get (content, url) tuples for the top n similar documents top_similar_docs = get_topn_similar_docs( - embedded_question, - conn=conn, - es_client=es_client, - n=num_docs, - include_metadata=True + 
embedded_question, + conn=conn, + es_client=es_client, + n=num_docs, + include_metadata=True, ) if use_reranking: diff --git a/llm-complete-guide/steps/finetune_embeddings.py b/llm-complete-guide/steps/finetune_embeddings.py index 3117c473..def28080 100644 --- a/llm-complete-guide/steps/finetune_embeddings.py +++ b/llm-complete-guide/steps/finetune_embeddings.py @@ -47,8 +47,9 @@ ) from sentence_transformers.training_args import BatchSamplers from sentence_transformers.util import cos_sim -from zenml import ArtifactConfig, log_model_metadata, step +from zenml import ArtifactConfig, log_metadata, step from zenml.client import Client +from zenml.enums import ArtifactType from zenml.utils.cuda_utils import cleanup_gpu_memory @@ -168,8 +169,8 @@ def evaluate_base_model( for dim in EMBEDDINGS_MODEL_MATRYOSHKA_DIMS } - log_model_metadata( - metadata={"base_model_eval": base_model_eval}, + log_metadata( + metadata={"base_model_eval": base_model_eval}, infer_model=True ) return results @@ -201,8 +202,9 @@ def evaluate_finetuned_model( for dim in EMBEDDINGS_MODEL_MATRYOSHKA_DIMS } - log_model_metadata( + log_metadata( metadata={"finetuned_model_eval": finetuned_model_eval}, + infer_model=True, ) return results @@ -218,7 +220,7 @@ def finetune( ) -> Annotated[ SentenceTransformer, ArtifactConfig( - is_model_artifact=True, + artifact_type=ArtifactType.MODEL, name="finetuned-model", ), ]: @@ -298,7 +300,8 @@ def finetune( token=zenml_client.get_secret(SECRET_NAME).secret_values["hf_token"], ) - log_model_metadata( + log_metadata( + infer_model=True, metadata={ "training_params": { "num_train_epochs": epochs, @@ -322,7 +325,8 @@ def finetune( if torch.cuda.is_available() else "N/A", }, - } + "huggingface_model_id": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", + }, ) # handle materialization error with this workaround: diff --git a/llm-complete-guide/steps/finetune_embeddings_legacy.py b/llm-complete-guide/steps/finetune_embeddings_legacy.py index 7136e784..abda6c24 100644 --- a/llm-complete-guide/steps/finetune_embeddings_legacy.py +++ b/llm-complete-guide/steps/finetune_embeddings_legacy.py @@ -25,7 +25,7 @@ from torch.nn import CosineSimilarity from torch.utils.data import DataLoader from utils.visualization_utils import create_comparison_chart -from zenml import log_artifact_metadata, step +from zenml import log_metadata, step from zenml.logger import get_logger logger = get_logger(__name__) @@ -79,12 +79,14 @@ def load_datasets( print("train_dataset_length_raw", len(train_dataset)) print("test_dataset_length_raw", len(test_dataset)) - log_artifact_metadata( + log_metadata( artifact_name="train_dataset", + infer_artifact=True, metadata={"row_count": len(train_dataset)}, ) - log_artifact_metadata( + log_metadata( artifact_name="test_dataset", + infer_artifact=True, metadata={"row_count": len(test_dataset)}, ) @@ -187,8 +189,9 @@ def train_model( warmup_steps=warmup_steps, ) - log_artifact_metadata( + log_metadata( artifact_name="trained_model", + infer_artifact=True, metadata={ "model_path": model_path, "num_epochs": num_epochs, @@ -280,8 +283,9 @@ def evaluate_model( finetuned_similarity=finetuned_avg_sim, ) - log_artifact_metadata( + log_metadata( artifact_name="evaluation_results", + infer_artifact=True, metadata={ "pretrained_average_similarity": { "value": pretrained_avg_sim, diff --git a/llm-complete-guide/steps/generate_questions.py b/llm-complete-guide/steps/generate_questions.py index f6acdb0a..63df84ff 100644 --- a/llm-complete-guide/steps/generate_questions.py +++ 
b/llm-complete-guide/steps/generate_questions.py @@ -21,7 +21,7 @@ from rich import print from structures import Document from utils.openai_utils import get_openai_api_key -from zenml import log_artifact_metadata, step +from zenml import log_metadata, step from zenml.logger import get_logger logger = get_logger(__name__) @@ -160,8 +160,9 @@ def generate_questions( f"Generated {len(final_df)} questions for {len(documents)} documents." ) - log_artifact_metadata( + log_metadata( artifact_name="generated_questions", + infer_artifact=True, metadata={ "num_documents": len(documents), "num_questions_generated": len(final_df), diff --git a/llm-complete-guide/steps/k8s_deployment.py b/llm-complete-guide/steps/k8s_deployment.py new file mode 100644 index 00000000..e638e89f --- /dev/null +++ b/llm-complete-guide/steps/k8s_deployment.py @@ -0,0 +1,212 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +import re +from pathlib import Path +from typing import Dict, cast + +import yaml +from kubernetes import client, config +from kubernetes.client.rest import ApiException +from zenml import get_step_context, step +from zenml.client import Client +from zenml.integrations.bentoml.services.bentoml_local_deployment import ( + BentoMLLocalDeploymentService, +) +from zenml.logger import get_logger +from zenml.orchestrators.utils import get_config_environment_vars + +logger = get_logger(__name__) + + +def apply_kubernetes_configuration(k8s_configs: list) -> None: + """Apply Kubernetes configurations using the K8s Python client. 
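+
+    Each configuration is a parsed manifest dict; only the Deployment and
+    Service kinds are handled, e.g. (values illustrative):
+
+        {"kind": "Service",
+         "metadata": {"name": "rag-service", "namespace": "default"},
+         "spec": {...}}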
+ + Args: + k8s_configs: List of Kubernetes configuration dictionaries + """ + # Load Kubernetes configuration + try: + config.load_kube_config() + except config.ConfigException: + config.load_incluster_config() # For in-cluster deployment + + # Initialize API clients + k8s_apps_v1 = client.AppsV1Api() + k8s_core_v1 = client.CoreV1Api() + + for k8s_config in k8s_configs: + kind = k8s_config["kind"] + name = k8s_config["metadata"]["name"] + namespace = k8s_config["metadata"].get("namespace", "default") + + try: + if kind == "Deployment": + # Check if deployment exists + try: + k8s_apps_v1.read_namespaced_deployment(name, namespace) + # Update existing deployment + k8s_apps_v1.patch_namespaced_deployment( + name=name, namespace=namespace, body=k8s_config + ) + logger.info(f"Updated existing deployment: {name}") + except ApiException as e: + if e.status == 404: + # Create new deployment + k8s_apps_v1.create_namespaced_deployment( + namespace=namespace, body=k8s_config + ) + logger.info(f"Created new deployment: {name}") + else: + raise e + + elif kind == "Service": + # Check if service exists + try: + k8s_core_v1.read_namespaced_service(name, namespace) + # Update existing service + k8s_core_v1.patch_namespaced_service( + name=name, namespace=namespace, body=k8s_config + ) + logger.info(f"Updated existing service: {name}") + except ApiException as e: + if e.status == 404: + # Create new service + k8s_core_v1.create_namespaced_service( + namespace=namespace, body=k8s_config + ) + logger.info(f"Created new service: {name}") + else: + raise e + + except ApiException as e: + logger.error(f"Error applying {kind} {name}: {e}") + raise e + + +@step(enable_cache=False) +def k8s_deployment(docker_image_tag: str, namespace: str = "default") -> Dict: + step_context = get_step_context() + + # Get the raw model name + raw_model_name = get_step_context().model.name + # Sanitize the model name + model_name = sanitize_name(raw_model_name) + + # Get environment variables + environment_vars = get_config_environment_vars() + + # Get current deployment + zenml_client = Client() + model_deployer = zenml_client.active_stack.model_deployer + services = model_deployer.find_model_server( + model_name=model_name, + model_version=step_context.model.stage, + ) + + # Read the K8s template + template_path = Path(__file__).parent / "k8s_template.yaml" + with open(template_path, "r") as f: + k8s_configs = list(yaml.safe_load_all(f)) + + # Update configurations with sanitized names + for config in k8s_configs: + # Add namespace + config["metadata"]["namespace"] = namespace + + # Update metadata labels and name + config["metadata"]["labels"]["app"] = model_name + config["metadata"]["name"] = model_name + + if config["kind"] == "Service": + # Update service selector + config["spec"]["selector"]["app"] = model_name + + # Update metadata annotations with SSL certificate ARN + config["metadata"]["annotations"] = { + "service.beta.kubernetes.io/aws-load-balancer-ssl-cert": "arn:aws:acm:eu-central-1:339712793861:certificate/0426ace8-5fa3-40dd-bd81-b0fb1064bd85", + "service.beta.kubernetes.io/aws-load-balancer-backend-protocol": "http", + "service.beta.kubernetes.io/aws-load-balancer-ssl-ports": "443", + "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", + } + + # Update ports + config["spec"]["ports"] = [ + {"name": "https", "port": 443, "targetPort": 3000} + ] + + elif config["kind"] == "Deployment": + # Update deployment selector and template + config["spec"]["selector"]["matchLabels"]["app"] = model_name +
config["spec"]["template"]["metadata"]["labels"]["app"] = ( + model_name + ) + + # Update the container image and name + containers = config["spec"]["template"]["spec"]["containers"] + for container in containers: + container["name"] = model_name + container["image"] = docker_image_tag + + # Add environment variables to the container + env_vars = [] + for key, value in environment_vars.items(): + env_vars.append({"name": key, "value": value}) + container["env"] = env_vars + + # Apply the configurations + try: + apply_kubernetes_configuration(k8s_configs) + deployment_status = "success" + logger.info( + f"Successfully deployed model {model_name} with image: {docker_image_tag}" + ) + except Exception as e: + deployment_status = "failed" + logger.error(f"Failed to deploy model {model_name}: {str(e)}") + raise e + + # Return deployment information + deployment_info = { + "model_name": model_name, + "docker_image": docker_image_tag, + "namespace": namespace, + "status": deployment_status, + "service_port": 3000, + "configurations": k8s_configs, + "url": "chat-rag.staging.cloudinfra.zenml.io", + } + + if services: + bentoml_deployment = cast(BentoMLLocalDeploymentService, services[0]) + zenml_client.update_service( + id=bentoml_deployment.uuid, + prediction_url="https://chat-rag.staging.cloudinfra.zenml.io", + health_check_url="https://chat-rag.staging.cloudinfra.zenml.io/healthz", + labels={ + "docker_image": docker_image_tag, + "namespace": namespace, + }, + ) + + return deployment_info + + +def sanitize_name(name: str) -> str: + # Convert to lowercase and replace invalid characters with '-' + sanitized = re.sub(r"[^a-z0-9-]", "-", name.lower()) + # Trim to a maximum length of 63 characters and strip leading/trailing '-' + sanitized = sanitized[:63].strip("-") + # Ensure the name doesn't start or end with '-' + sanitized = sanitized.strip("-") + return sanitized diff --git a/llm-complete-guide/steps/k8s_template.yaml b/llm-complete-guide/steps/k8s_template.yaml new file mode 100644 index 00000000..2ad971b2 --- /dev/null +++ b/llm-complete-guide/steps/k8s_template.yaml @@ -0,0 +1,40 @@ +apiVersion: v1 +kind: Service +metadata: + name: placeholder + labels: + app: placeholder + annotations: + service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:region:account-id:certificate/certificate-id + service.beta.kubernetes.io/aws-load-balancer-backend-protocol: http + service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443" + service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout: "3600" +spec: + selector: + app: placeholder + type: LoadBalancer + ports: + - name: https + port: 443 # External port exposed by LoadBalancer (HTTPS) + targetPort: 3000 # Internal container port +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: placeholder + name: placeholder +spec: + selector: + matchLabels: + app: placeholder + template: + metadata: + labels: + app: placeholder + spec: + containers: + - image: placeholder + name: placeholder + ports: + - containerPort: 3000 \ No newline at end of file diff --git a/llm-complete-guide/steps/markdown_loader.py b/llm-complete-guide/steps/markdown_loader.py index 6b2208e0..1838344e 100644 --- a/llm-complete-guide/steps/markdown_loader.py +++ b/llm-complete-guide/steps/markdown_loader.py @@ -18,7 +18,7 @@ import polars as pl from constants import FILES_TO_IGNORE -from zenml import log_artifact_metadata, step +from zenml import log_metadata, step from zenml.logger import get_logger logger = get_logger(__name__) @@ -61,8 +61,9 
@@ def load_markdown_files( f"Subfolder '{subfolder}' not found in the cloned repository." ) - log_artifact_metadata( + log_metadata( artifact_name="markdown_files", + infer_artifact=True, metadata={ "num_markdown_files": len(markdown_files), "columns": "filename, page_content", diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index d9a9bd95..556784e3 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -23,26 +23,25 @@ import json import logging import math -from typing import Annotated, Any, Dict, List, Tuple from enum import Enum +from typing import Annotated, Any, Dict, List, Tuple from constants import ( CHUNK_OVERLAP, CHUNK_SIZE, EMBEDDING_DIMENSIONALITY, EMBEDDINGS_MODEL, + SECRET_NAME, SECRET_NAME_ELASTICSEARCH, - ZENML_CHATBOT_MODEL, ) from pgvector.psycopg2 import register_vector from PIL import Image, ImageDraw, ImageFont from sentence_transformers import SentenceTransformer from structures import Document from utils.llm_utils import get_db_conn, get_es_client, split_documents -from zenml import ArtifactConfig, log_artifact_metadata, step, log_model_metadata -from zenml.metadata.metadata_types import Uri +from zenml import ArtifactConfig, log_metadata, step from zenml.client import Client -from constants import SECRET_NAME +from zenml.metadata.metadata_types import Uri logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -453,11 +452,11 @@ def draw_bar_chart( """Draws a bar chart on the given image.""" # Ensure labels is a list, even if empty labels = labels or [] - + # Skip drawing if no data if not data: return - + max_value = max(data) bar_width = width // len(data) bar_spacing = 10 @@ -487,10 +486,21 @@ def draw_bar_chart( for i, label in enumerate(labels): if label is not None: # Add null check for individual labels font = ImageFont.load_default(size=10) - bbox = draw.textbbox((0, 0), str(label), font=font) # Convert to string + bbox = draw.textbbox( + (0, 0), str(label), font=font + ) # Convert to string label_width = bbox[2] - bbox[0] - label_x = x + i * (bar_width + bar_spacing) + (bar_width - label_width) // 2 - draw.text((label_x, y + height - 15), str(label), font=font, fill="black") + label_x = ( + x + + i * (bar_width + bar_spacing) + + (bar_width - label_width) // 2 + ) + draw.text( + (label_x, y + height - 15), + str(label), + font=font, + fill="black", + ) @step @@ -515,8 +525,9 @@ def preprocess_documents( Exception: If an error occurs during preprocessing. 
""" try: - log_artifact_metadata( + log_metadata( artifact_name="split_chunks", + infer_artifact=True, metadata={ "chunk_size": CHUNK_SIZE, "chunk_overlap": CHUNK_OVERLAP, @@ -536,8 +547,9 @@ def preprocess_documents( histogram_chart: Image.Image = create_histogram(stats) bar_chart: Image.Image = create_bar_chart(stats) - log_artifact_metadata( + log_metadata( artifact_name="split_chunks", + infer_artifact=True, metadata=stats, ) @@ -568,8 +580,9 @@ def generate_embeddings( try: model = SentenceTransformer(EMBEDDINGS_MODEL) - log_artifact_metadata( + log_metadata( artifact_name="documents_with_embeddings", + infer_artifact=True, metadata={ "embedding_type": EMBEDDINGS_MODEL, "embedding_dimensionality": EMBEDDING_DIMENSIONALITY, @@ -600,6 +613,7 @@ class IndexType(Enum): ELASTICSEARCH = "elasticsearch" POSTGRES = "postgres" + @step(enable_cache=False) def index_generator( documents: str, @@ -624,11 +638,12 @@ def index_generator( _index_generator_elastic(documents) else: _index_generator_postgres(documents) - + except Exception as e: logger.error(f"Error in index_generator: {e}") raise + def _index_generator_elastic(documents: str) -> None: """Generates an Elasticsearch index for the given documents.""" try: @@ -647,11 +662,11 @@ def _index_generator_elastic(documents: str) -> None: "type": "dense_vector", "dims": EMBEDDING_DIMENSIONALITY, "index": True, - "similarity": "cosine" + "similarity": "cosine", }, "filename": {"type": "text"}, "parent_section": {"type": "text"}, - "url": {"type": "text"} + "url": {"type": "text"}, } } } @@ -661,50 +676,49 @@ def _index_generator_elastic(documents: str) -> None: # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] operations = [] - + for doc in document_list: content_hash = hashlib.md5( f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() ).hexdigest() - - exists_query = { - "query": { - "term": { - "doc_id": content_hash - } - } - } - + + exists_query = {"query": {"term": {"doc_id": content_hash}}} + if not es.count(index=index_name, body=exists_query)["count"]: - operations.append({ - "index": { - "_index": index_name, - "_id": content_hash + operations.append( + {"index": {"_index": index_name, "_id": content_hash}} + ) + + operations.append( + { + "doc_id": content_hash, + "content": doc.page_content, + "token_count": doc.token_count, + "embedding": doc.embedding, + "filename": doc.filename, + "parent_section": doc.parent_section, + "url": doc.url, } - }) - - operations.append({ - "doc_id": content_hash, - "content": doc.page_content, - "token_count": doc.token_count, - "embedding": doc.embedding, - "filename": doc.filename, - "parent_section": doc.parent_section, - "url": doc.url - }) - + ) + if operations: response = es.bulk(operations=operations, timeout="10m") - - success_count = sum(1 for item in response['items'] if 'index' in item and item['index']['status'] == 201) - failed_count = len(response['items']) - success_count - + + success_count = sum( + 1 + for item in response["items"] + if "index" in item and item["index"]["status"] == 201 + ) + failed_count = len(response["items"]) - success_count + logger.info(f"Successfully indexed {success_count} documents") if failed_count > 0: logger.warning(f"Failed to index {failed_count} documents") - for item in response['items']: - if 'index' in item and item['index']['status'] != 201: - logger.warning(f"Failed to index document: {item['index']['error']}") + for item in response["items"]: + if "index" in 
item and item["index"]["status"] != 201: + logger.warning( + f"Failed to index document: {item['index']['error']}" + ) else: logger.info("No new documents to index") @@ -714,11 +728,12 @@ def _index_generator_elastic(documents: str) -> None: logger.error(f"Error in Elasticsearch indexing: {e}") raise + def _index_generator_postgres(documents: str) -> None: """Generates a PostgreSQL index for the given documents.""" try: conn = get_db_conn() - + with conn.cursor() as cur: # Install pgvector if not already installed cur.execute("CREATE EXTENSION IF NOT EXISTS vector") @@ -740,7 +755,7 @@ def _index_generator_postgres(documents: str) -> None: conn.commit() register_vector(conn) - + # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] @@ -772,7 +787,6 @@ def _index_generator_postgres(documents: str) -> None: ) conn.commit() - cur.execute("SELECT COUNT(*) as cnt FROM embeddings;") num_records = cur.fetchone()[0] logger.info(f"Number of vector records in table: {num_records}") @@ -797,6 +811,7 @@ def _index_generator_postgres(documents: str) -> None: if conn: conn.close() + def _log_metadata(index_type: IndexType) -> None: """Log metadata about the indexing process.""" prompt = """ @@ -809,9 +824,11 @@ def _log_metadata(index_type: IndexType) -> None: """ client = Client() - + if index_type == IndexType.ELASTICSEARCH: - es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] connection_details = { "host": es_host, "api_key": "*********", @@ -821,14 +838,21 @@ def _log_metadata(index_type: IndexType) -> None: store_name = "pgvector" connection_details = { - "user": client.get_secret(SECRET_NAME).secret_values["supabase_user"], + "user": client.get_secret(SECRET_NAME).secret_values[ + "supabase_user" + ], "password": "**********", - "host": client.get_secret(SECRET_NAME).secret_values["supabase_host"], - "port": client.get_secret(SECRET_NAME).secret_values["supabase_port"], + "host": client.get_secret(SECRET_NAME).secret_values[ + "supabase_host" + ], + "port": client.get_secret(SECRET_NAME).secret_values[ + "supabase_port" + ], "dbname": "postgres", } - log_model_metadata( + log_metadata( + infer_model=True, metadata={ "embeddings": { "model": EMBEDDINGS_MODEL, diff --git a/llm-complete-guide/steps/rag_deployment.py b/llm-complete-guide/steps/rag_deployment.py index 99a8c911..38c03097 100644 --- a/llm-complete-guide/steps/rag_deployment.py +++ b/llm-complete-guide/steps/rag_deployment.py @@ -2,7 +2,6 @@ import webbrowser from huggingface_hub import HfApi - from utils.hf_utils import get_hf_token from utils.llm_utils import process_input_with_retrieval from zenml import step @@ -50,9 +49,7 @@ def predict(message, history): ) -def upload_files_to_repo( - api, repo_id: str, files_mapping: dict, token: str -): +def upload_files_to_repo(api, repo_id: str, files_mapping: dict, token: str): """Upload multiple files to a Hugging Face repository Args: @@ -92,11 +89,13 @@ def gradio_rag_deployment() -> None: exist_ok=True, token=get_hf_token(), ) + api.add_space_secret( repo_id=hf_repo_id, key="ZENML_STORE_API_KEY", value=ZENML_API_TOKEN, ) + api.add_space_secret( repo_id=hf_repo_id, key="ZENML_STORE_URL", diff --git a/llm-complete-guide/steps/url_scraper.py b/llm-complete-guide/steps/url_scraper.py index 9c54563b..58fc425e 100644 --- a/llm-complete-guide/steps/url_scraper.py +++ 
b/llm-complete-guide/steps/url_scraper.py @@ -16,7 +16,7 @@ import json from typing_extensions import Annotated -from zenml import ArtifactConfig, log_artifact_metadata, step +from zenml import ArtifactConfig, log_metadata, step from steps.url_scraping_utils import get_all_pages @@ -26,7 +26,7 @@ def url_scraper( docs_url: str = "https://docs.zenml.io", repo_url: str = "https://github.com/zenml-io/zenml", website_url: str = "https://zenml.io", - use_dev_set: bool = False + use_dev_set: bool = False, ) -> Annotated[str, ArtifactConfig(name="urls")]: """Generates a list of relevant URLs to scrape. @@ -42,7 +42,6 @@ def url_scraper( # examples_readme_urls = get_nested_readme_urls(repo_url) use_dev_set = False if use_dev_set: - docs_urls = [ "https://docs.zenml.io/getting-started/system-architectures", "https://docs.zenml.io/getting-started/core-concepts", @@ -58,8 +57,9 @@ def url_scraper( # website_urls = get_all_pages(website_url) # all_urls = docs_urls + website_urls + examples_readme_urls all_urls = docs_urls - log_artifact_metadata( + log_metadata( artifact_name="urls", + infer_artifact=True, metadata={ "count": len(all_urls), }, diff --git a/llm-complete-guide/steps/url_scraping_utils.py b/llm-complete-guide/steps/url_scraping_utils.py index d6367cbf..ec97ac94 100644 --- a/llm-complete-guide/steps/url_scraping_utils.py +++ b/llm-complete-guide/steps/url_scraping_utils.py @@ -13,14 +13,15 @@ # permissions and limitations under the License. import re -import requests -from bs4 import BeautifulSoup -from typing import List from logging import getLogger +from typing import List +import requests +from bs4 import BeautifulSoup logger = getLogger(__name__) + def get_all_pages(base_url: str = "https://docs.zenml.io") -> List[str]: """ Retrieve all pages from the ZenML documentation sitemap. @@ -32,18 +33,19 @@ def get_all_pages(base_url: str = "https://docs.zenml.io") -> List[str]: List[str]: A list of all documentation page URLs. """ logger.info("Fetching sitemap from docs.zenml.io...") - + # Fetch the sitemap sitemap_url = f"{base_url}/sitemap.xml" response = requests.get(sitemap_url) soup = BeautifulSoup(response.text, "xml") - + # Extract all URLs from the sitemap urls = [loc.text for loc in soup.find_all("loc")] - + logger.info(f"Found {len(urls)} pages in the sitemap.") return urls + def extract_parent_section(url: str) -> str: """ Extracts the parent section from a URL. diff --git a/llm-complete-guide/steps/visualize_chat.py b/llm-complete-guide/steps/visualize_chat.py new file mode 100644 index 00000000..480516a3 --- /dev/null +++ b/llm-complete-guide/steps/visualize_chat.py @@ -0,0 +1,310 @@ +from typing import Any, Dict + +from typing_extensions import Annotated +from zenml import get_step_context, log_metadata, step +from zenml.metadata.metadata_types import Uri +from zenml.types import HTMLString +from zenml.utils.dashboard_utils import get_model_version_url + + +@step(enable_cache=False) +def create_chat_interface( + deployment_info: Dict[str, Any], +) -> Annotated[HTMLString, "chat_bot"]: + step_context = get_step_context() + html = """ +
+    <!-- Chat UI: page styles, a header titled "ZenML Assistant", a scrollable
+         message area, an "Assistant is typing..." indicator, a text input with
+         a send button, and JavaScript that submits queries to the deployed RAG
+         service and streams the response into the chat window. -->
+ """ + model_version_url = get_model_version_url(step_context.model.id) + log_metadata( + infer_artifact=True, + metadata={ + "deployment_info": deployment_info, + "deployment_url": Uri(f"{model_version_url}/?tab=deployments"), + }, + ) + return HTMLString(html) diff --git a/llm-complete-guide/steps/vllm_deployment.py b/llm-complete-guide/steps/vllm_deployment.py new file mode 100644 index 00000000..3ef60cab --- /dev/null +++ b/llm-complete-guide/steps/vllm_deployment.py @@ -0,0 +1,94 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Implementation of the vllm model deployer pipeline step.""" + +from typing import Optional, cast + +from constants import ( + EMBEDDINGS_MODEL_ID_FINE_TUNED, +) +from zenml import get_step_context, step +from zenml.integrations.vllm.model_deployers.vllm_model_deployer import ( + VLLMModelDeployer, +) +from zenml.integrations.vllm.services.vllm_deployment import ( + VLLMDeploymentService, + VLLMServiceConfig, +) +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step(enable_cache=False) +def vllm_model_deployer_step( + port: int = 8000, + tokenizer: Optional[str] = None, + timeout: int = 1200, + deploy_decision: bool = True, +) -> VLLMDeploymentService: + """Model deployer pipeline step for vLLM. + + This step deploys a given Bento to a local vLLM http prediction server. + + Args: + model: Name or path to huggingface model + port: Port used by vllm server + tokenizer: Name or path of the huggingface tokenizer to use. + If unspecified, model name or path will be used. + timeout: the number of seconds to wait for the service to start/stop. 
+ deploy_decision: whether to deploy the model or not + + Returns: + vLLM deployment service + """ + # get the current active model deployer + model_deployer = cast( + VLLMModelDeployer, VLLMModelDeployer.get_active_model_deployer() + ) + + # get pipeline name, step name and run id + step_context = get_step_context() + pipeline_name = step_context.pipeline.name + step_name = step_context.step_run.name + + # create a config for the new model service + predictor_cfg = VLLMServiceConfig( + pipeline_name=pipeline_name, + step_name=step_name, + model_name=step_context.model.name, + model_version=step_context.model.version, + model=f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", + served_model_name=step_context.model.name, + port=port, + tokenizer=tokenizer, + ) + + # create a new model deployment and replace an old one if it exists + svc = ( + model_deployer.deploy_model( + replace=True, + config=predictor_cfg, + timeout=timeout, + service_type=VLLMDeploymentService.SERVICE_TYPE, + ), + ) + new_service = cast(VLLMDeploymentService, svc) + + logger.info( + f"VLLM deployment service started and reachable at:\n" + f" {new_service.prediction_url}\n" + ) + + return new_service diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py index 07516100..34f99a51 100644 --- a/llm-complete-guide/utils/llm_utils.py +++ b/llm-complete-guide/utils/llm_utils.py @@ -230,8 +230,12 @@ def get_es_client() -> Elasticsearch: Elasticsearch: An Elasticsearch client. """ client = Client() - es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] - es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_api_key"] + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] + es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_api_key" + ] es = Elasticsearch( es_host, @@ -265,12 +269,12 @@ def get_db_conn() -> connection: def get_topn_similar_docs_pgvector( - query_embedding: List[float], - conn: psycopg2.extensions.connection, - n: int = 5, - include_metadata: bool = False, - only_urls: bool = False - ) -> List[Tuple]: + query_embedding: List[float], + conn: psycopg2.extensions.connection, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False, +) -> List[Tuple]: """Fetches the top n most similar documents to the given query embedding from the PostgreSQL database. Args: @@ -302,13 +306,14 @@ def get_topn_similar_docs_pgvector( return cur.fetchall() + def get_topn_similar_docs_elasticsearch( - query_embedding: List[float], - es_client: Elasticsearch, - n: int = 5, - include_metadata: bool = False, - only_urls: bool = False - ) -> List[Tuple]: + query_embedding: List[float], + es_client: Elasticsearch, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False, +) -> List[Tuple]: """Fetches the top n most similar documents to the given query embedding from the Elasticsearch index. Args: @@ -319,7 +324,7 @@ def get_topn_similar_docs_elasticsearch( only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False. 
""" index_name = "zenml_docs" - + if only_urls: source = ["url"] elif include_metadata: @@ -334,36 +339,42 @@ def get_topn_similar_docs_elasticsearch( "query": {"match_all": {}}, "script": { "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0", - "params": {"query_vector": query_embedding} - } + "params": {"query_vector": query_embedding}, + }, } }, - "size": n + "size": n, } # response = es_client.search(index=index_name, body=query) - response = es_client.search(index=index_name, knn={ - "field": "embedding", - "query_vector": query_embedding, - "num_candidates": 50, - "k": n - }) + response = es_client.search( + index=index_name, + knn={ + "field": "embedding", + "query_vector": query_embedding, + "num_candidates": 50, + "k": n, + }, + ) results = [] - for hit in response['hits']['hits']: + for hit in response["hits"]["hits"]: if only_urls: - results.append((hit['_source']['url'],)) + results.append((hit["_source"]["url"],)) elif include_metadata: - results.append(( - hit['_source']['content'], - hit['_source']['url'], - hit['_source']['parent_section'] - )) + results.append( + ( + hit["_source"]["content"], + hit["_source"]["url"], + hit["_source"]["parent_section"], + ) + ) else: - results.append((hit['_source']['content'],)) + results.append((hit["_source"]["content"],)) return results + def get_topn_similar_docs( query_embedding: List[float], conn: psycopg2.extensions.connection = None, @@ -387,12 +398,17 @@ def get_topn_similar_docs( """ if conn is None and es_client is None: raise ValueError("Either conn or es_client must be provided") - + if conn is not None: - return get_topn_similar_docs_pgvector(query_embedding, conn, n, include_metadata, only_urls) - + return get_topn_similar_docs_pgvector( + query_embedding, conn, n, include_metadata, only_urls + ) + if es_client is not None: - return get_topn_similar_docs_elasticsearch(query_embedding, es_client, n, include_metadata, only_urls) + return get_topn_similar_docs_elasticsearch( + query_embedding, es_client, n, include_metadata, only_urls + ) + def get_completion_from_messages( messages, model=OPENAI_MODEL, temperature=0.4, max_tokens=1000 @@ -431,6 +447,7 @@ def get_embeddings(text): model = SentenceTransformer(EMBEDDINGS_MODEL) return model.encode(text) + def find_vectorstore_name() -> str: """Finds the name of the vector store used for the given embeddings model. @@ -438,10 +455,13 @@ def find_vectorstore_name() -> str: str: The name of the vector store. """ from zenml.client import Client + client = Client() - model = client.get_model_version(ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev") + model = client.get_model_version( + ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev" + ) - return model.run_metadata["vector_store"].value["name"] + return model.run_metadata["vector_store"]["name"] def rerank_documents(