From 4fce90c297e0b3914f7d838da905508353934a58 Mon Sep 17 00:00:00 2001
From: Jayesh Sharma
Date: Thu, 14 Nov 2024 13:37:14 +0530
Subject: [PATCH] ability to switch between Elasticsearch and pgvector vector
 stores

---
 llm-complete-guide/configs/dev/rag.yaml       |   4 +-
 llm-complete-guide/configs/dev/rag_eval.yaml  |   1 +
 .../configs/production/eval.yaml              |   1 +
 llm-complete-guide/configs/staging/eval.yaml  |   1 +
 llm-complete-guide/configs/staging/rag.yaml   |   2 +-
 llm-complete-guide/constants.py               |   3 +
 llm-complete-guide/pipelines/llm_basic_rag.py |   2 +-
 llm-complete-guide/steps/eval_retrieval.py    |  17 +-
 llm-complete-guide/steps/populate_index.py    | 196 ++++++++++++++----
 llm-complete-guide/utils/llm_utils.py         | 115 ++++++++--
 10 files changed, 278 insertions(+), 64 deletions(-)

diff --git a/llm-complete-guide/configs/dev/rag.yaml b/llm-complete-guide/configs/dev/rag.yaml
index 9b37781b..efed7f93 100644
--- a/llm-complete-guide/configs/dev/rag.yaml
+++ b/llm-complete-guide/configs/dev/rag.yaml
@@ -1,4 +1,4 @@
-enable_cache: False
+enable_cache: True
 
 # environment configuration
 settings:
@@ -29,3 +29,5 @@ steps:
     parameters:
       docs_url: https://docs.zenml.io/
       use_dev_set: true
+  index_generator:
+    enable_cache: False
diff --git a/llm-complete-guide/configs/dev/rag_eval.yaml b/llm-complete-guide/configs/dev/rag_eval.yaml
index 4ac3c94b..2581daa2 100644
--- a/llm-complete-guide/configs/dev/rag_eval.yaml
+++ b/llm-complete-guide/configs/dev/rag_eval.yaml
@@ -13,4 +13,5 @@ settings:
       - psycopg2-binary
       - tiktoken
       - pygithub
+      - elasticsearch
   python_package_installer: "uv"
diff --git a/llm-complete-guide/configs/production/eval.yaml b/llm-complete-guide/configs/production/eval.yaml
index 1786b3b8..f0265dda 100644
--- a/llm-complete-guide/configs/production/eval.yaml
+++ b/llm-complete-guide/configs/production/eval.yaml
@@ -17,6 +17,7 @@ settings:
       - matplotlib
       - pillow
       - pygithub
+      - elasticsearch
   environment:
     ZENML_PROJECT_SECRET_NAME: llm_complete
     ZENML_ENABLE_RICH_TRACEBACK: FALSE
diff --git a/llm-complete-guide/configs/staging/eval.yaml b/llm-complete-guide/configs/staging/eval.yaml
index 1786b3b8..f0265dda 100644
--- a/llm-complete-guide/configs/staging/eval.yaml
+++ b/llm-complete-guide/configs/staging/eval.yaml
@@ -17,6 +17,7 @@ settings:
       - matplotlib
       - pillow
       - pygithub
+      - elasticsearch
   environment:
     ZENML_PROJECT_SECRET_NAME: llm_complete
     ZENML_ENABLE_RICH_TRACEBACK: FALSE
diff --git a/llm-complete-guide/configs/staging/rag.yaml b/llm-complete-guide/configs/staging/rag.yaml
index 88492d75..ebd66c21 100644
--- a/llm-complete-guide/configs/staging/rag.yaml
+++ b/llm-complete-guide/configs/staging/rag.yaml
@@ -18,7 +18,7 @@ settings:
       - rerankers[flashrank]
       - matplotlib
       - elasticsearch
-  
+
   environment:
     ZENML_PROJECT_SECRET_NAME: llm_complete
     ZENML_ENABLE_RICH_TRACEBACK: FALSE
diff --git a/llm-complete-guide/constants.py b/llm-complete-guide/constants.py
index c537c9d8..7b2767c5 100644
--- a/llm-complete-guide/constants.py
+++ b/llm-complete-guide/constants.py
@@ -23,6 +23,9 @@
     384  # Update this to match the dimensionality of the new model
 )
 
+# ZenML constants
+ZENML_CHATBOT_MODEL = "zenml-docs-qa-chatbot"
+
 # Scraping constants
 RATE_LIMIT = 5  # Maximum number of requests per second
 
diff --git a/llm-complete-guide/pipelines/llm_basic_rag.py b/llm-complete-guide/pipelines/llm_basic_rag.py
index 82a97b21..0bf0b381 100644
--- a/llm-complete-guide/pipelines/llm_basic_rag.py
+++ b/llm-complete-guide/pipelines/llm_basic_rag.py
@@ -26,7 +26,7 @@
 from zenml import pipeline
 
 
-@pipeline
+@pipeline(enable_cache=True)
 def llm_basic_rag() -> None:
     """Executes the pipeline to
train a basic RAG model.
diff --git a/llm-complete-guide/steps/eval_retrieval.py b/llm-complete-guide/steps/eval_retrieval.py
index c51bf902..2b555b85 100644
--- a/llm-complete-guide/steps/eval_retrieval.py
+++ b/llm-complete-guide/steps/eval_retrieval.py
@@ -19,6 +19,7 @@
 from datasets import load_dataset
 from utils.llm_utils import (
+    find_vectorstore_name,
     get_db_conn,
     get_embeddings,
     get_es_client,
@@ -77,11 +78,23 @@ def query_similar_docs(
         Tuple containing the question, URL ending, and retrieved URLs.
     """
     embedded_question = get_embeddings(question)
-    es_client = get_es_client()
+    conn = None
+    es_client = None
+
+    vector_store_name = find_vectorstore_name()
+    if vector_store_name == "pgvector":
+        conn = get_db_conn()
+    else:
+        es_client = get_es_client()
+
     num_docs = 20 if use_reranking else returned_sample_size
     # get (content, url) tuples for the top n similar documents
     top_similar_docs = get_topn_similar_docs(
-        embedded_question, es_client, n=num_docs, include_metadata=True
+        embedded_question,
+        conn=conn,
+        es_client=es_client,
+        n=num_docs,
+        include_metadata=True,
     )
 
     if use_reranking:
diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py
index baa12fc2..d9a9bd95 100644
--- a/llm-complete-guide/steps/populate_index.py
+++ b/llm-complete-guide/steps/populate_index.py
@@ -19,10 +19,12 @@
 # https://www.timescale.com/blog/postgresql-as-a-vector-database-create-store-and-query-openai-embeddings-with-pgvector/
 # for providing the base implementation for this indexing functionality
 
+import hashlib
 import json
 import logging
 import math
+from enum import Enum
 from typing import Annotated, Any, Dict, List, Tuple
 
 from constants import (
     CHUNK_OVERLAP,
@@ -30,6 +32,7 @@
     EMBEDDING_DIMENSIONALITY,
     EMBEDDINGS_MODEL,
+    SECRET_NAME,
     SECRET_NAME_ELASTICSEARCH,
+    ZENML_CHATBOT_MODEL,
 )
 from pgvector.psycopg2 import register_vector
 from PIL import Image, ImageDraw, ImageFont
@@ -593,9 +596,14 @@ def generate_embeddings(
         raise
 
 
+class IndexType(Enum):
+    ELASTICSEARCH = "elasticsearch"
+    POSTGRES = "postgres"
+
+
 @step(enable_cache=False)
 def index_generator(
     documents: str,
+    index_type: IndexType = IndexType.ELASTICSEARCH,
 ) -> None:
     """Generates an index for the given documents.
 
@@ -606,14 +614,23 @@
     Args:
         documents (str): A JSON string containing the Document objects with
             generated embeddings.
+        index_type (IndexType): The type of index to use. Defaults to
+            Elasticsearch.
 
     Raises:
         Exception: If an error occurs during the index generation.
""" - from elasticsearch import Elasticsearch - from elasticsearch.helpers import bulk - import hashlib - + try: + if index_type == IndexType.ELASTICSEARCH: + _index_generator_elastic(documents) + else: + _index_generator_postgres(documents) + + except Exception as e: + logger.error(f"Error in index_generator: {e}") + raise + +def _index_generator_elastic(documents: str) -> None: + """Generates an Elasticsearch index for the given documents.""" try: es = get_es_client() index_name = "zenml_docs" @@ -643,16 +660,13 @@ def index_generator( # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] - - # Prepare bulk operations operations = [] + for doc in document_list: - # Create a unique identifier based on content and metadata content_hash = hashlib.md5( f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() ).hexdigest() - # Check if document exists exists_query = { "query": { "term": { @@ -694,45 +708,139 @@ def index_generator( else: logger.info("No new documents to index") - # Log the model metadata - prompt = """ - You are a friendly chatbot. \ - You can answer questions about ZenML, its features and its use cases. \ - You respond in a concise, technically credible tone. \ - You ONLY use the context from the ZenML documentation to provide relevant - answers. \ - You do not make up answers or provide opinions that you don't have - information to support. \ - If you are unsure or don't know, just say so. \ - """ - - client = Client() + _log_metadata(index_type=IndexType.ELASTICSEARCH) + + except Exception as e: + logger.error(f"Error in Elasticsearch indexing: {e}") + raise + +def _index_generator_postgres(documents: str) -> None: + """Generates a PostgreSQL index for the given documents.""" + try: + conn = get_db_conn() + + with conn.cursor() as cur: + # Install pgvector if not already installed + cur.execute("CREATE EXTENSION IF NOT EXISTS vector") + conn.commit() + + # Create the embeddings table if it doesn't exist + table_create_command = f""" + CREATE TABLE IF NOT EXISTS embeddings ( + id SERIAL PRIMARY KEY, + content TEXT, + token_count INTEGER, + embedding VECTOR({EMBEDDING_DIMENSIONALITY}), + filename TEXT, + parent_section TEXT, + url TEXT + ); + """ + cur.execute(table_create_command) + conn.commit() + + register_vector(conn) + + # Parse the JSON string into a list of Document objects + document_list = [Document(**doc) for doc in json.loads(documents)] + + # Insert data only if it doesn't already exist + for doc in document_list: + content = doc.page_content + token_count = doc.token_count + embedding = doc.embedding + filename = doc.filename + parent_section = doc.parent_section + url = doc.url + + cur.execute( + "SELECT COUNT(*) FROM embeddings WHERE content = %s", + (content,), + ) + count = cur.fetchone()[0] + if count == 0: + cur.execute( + "INSERT INTO embeddings (content, token_count, embedding, filename, parent_section, url) VALUES (%s, %s, %s, %s, %s, %s)", + ( + content, + token_count, + embedding, + filename, + parent_section, + url, + ), + ) + conn.commit() + + + cur.execute("SELECT COUNT(*) as cnt FROM embeddings;") + num_records = cur.fetchone()[0] + logger.info(f"Number of vector records in table: {num_records}") + + # calculate the index parameters according to best practices + num_lists = max(num_records / 1000, 10) + if num_records > 1000000: + num_lists = math.sqrt(num_records) + + # use the cosine distance measure, which is what we'll later use for querying + cur.execute( + 
f"CREATE INDEX IF NOT EXISTS embeddings_idx ON embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = {num_lists});" + ) + conn.commit() + + _log_metadata(index_type=IndexType.POSTGRES) + + except Exception as e: + logger.error(f"Error in PostgreSQL indexing: {e}") + raise + finally: + if conn: + conn.close() + +def _log_metadata(index_type: IndexType) -> None: + """Log metadata about the indexing process.""" + prompt = """ + You are a friendly chatbot. \ + You can answer questions about ZenML, its features and its use cases. \ + You respond in a concise, technically credible tone. \ + You ONLY use the context from the ZenML documentation to provide relevant answers. \ + You do not make up answers or provide opinions that you don't have information to support. \ + If you are unsure or don't know, just say so. \ + """ + + client = Client() + + if index_type == IndexType.ELASTICSEARCH: es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] - CONNECTION_DETAILS = { + connection_details = { "host": es_host, "api_key": "*********", } + store_name = "elasticsearch" + else: + store_name = "pgvector" + + connection_details = { + "user": client.get_secret(SECRET_NAME).secret_values["supabase_user"], + "password": "**********", + "host": client.get_secret(SECRET_NAME).secret_values["supabase_host"], + "port": client.get_secret(SECRET_NAME).secret_values["supabase_port"], + "dbname": "postgres", + } - log_model_metadata( - metadata={ - "embeddings": { - "model": EMBEDDINGS_MODEL, - "dimensionality": EMBEDDING_DIMENSIONALITY, - "model_url": Uri( - f"https://huggingface.co/{EMBEDDINGS_MODEL}" - ), - }, - "prompt": { - "content": prompt, - }, - "vector_store": { - "name": "elasticsearch", - "connection_details": CONNECTION_DETAILS, - "index_name": index_name - }, + log_model_metadata( + metadata={ + "embeddings": { + "model": EMBEDDINGS_MODEL, + "dimensionality": EMBEDDING_DIMENSIONALITY, + "model_url": Uri(f"https://huggingface.co/{EMBEDDINGS_MODEL}"), }, - ) - - except Exception as e: - logger.error(f"Error in index_generator: {e}") - raise + "prompt": { + "content": prompt, + }, + "vector_store": { + "name": store_name, + "connection_details": connection_details, + }, + }, + ) diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py index f72e4c1d..07516100 100644 --- a/llm-complete-guide/utils/llm_utils.py +++ b/llm-complete-guide/utils/llm_utils.py @@ -48,6 +48,7 @@ OPENAI_MODEL, SECRET_NAME, SECRET_NAME_ELASTICSEARCH, + ZENML_CHATBOT_MODEL, ) from pgvector.psycopg2 import register_vector from psycopg2.extensions import connection @@ -263,25 +264,59 @@ def get_db_conn() -> connection: return psycopg2.connect(**CONNECTION_DETAILS) -def get_topn_similar_docs( - query_embedding: List[float], - es_client: Elasticsearch, - n: int = 5, - include_metadata: bool = False, - only_urls: bool = False, -) -> List[Tuple]: - """Fetches the top n most similar documents to the given query embedding from the database. +def get_topn_similar_docs_pgvector( + query_embedding: List[float], + conn: psycopg2.extensions.connection, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False + ) -> List[Tuple]: + """Fetches the top n most similar documents to the given query embedding from the PostgreSQL database. Args: query_embedding (list): The query embedding to compare against. conn (psycopg2.extensions.connection): The database connection object. - n (int, optional): The number of similar documents to fetch. 
-            5.
-        include_metadata (bool, optional): Whether to include metadata in the
-            results. Defaults to False.
+        n (int, optional): The number of similar documents to fetch. Defaults to 5.
+        include_metadata (bool, optional): Whether to include metadata in the results. Defaults to False.
+        only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False.
+
+    Returns:
+        list: A list of tuples with the content and, optionally, the metadata or URL of each document.
+    """
+    embedding_array = np.array(query_embedding)
+    register_vector(conn)
+    cur = conn.cursor()
+
+    if include_metadata:
+        cur.execute(
+            f"SELECT content, url, parent_section FROM embeddings ORDER BY embedding <=> %s LIMIT {n}",
+            (embedding_array,),
+        )
+    elif only_urls:
+        cur.execute(
+            f"SELECT url FROM embeddings ORDER BY embedding <=> %s LIMIT {n}",
+            (embedding_array,),
+        )
+    else:
+        cur.execute(
+            f"SELECT content FROM embeddings ORDER BY embedding <=> %s LIMIT {n}",
+            (embedding_array,),
+        )
 
-    Returns:
-        list: A list of tuples containing the content and metadata (if include_metadata is True) of the top n most similar documents.
+    return cur.fetchall()
+
+
+def get_topn_similar_docs_elasticsearch(
+    query_embedding: List[float],
+    es_client: Elasticsearch,
+    n: int = 5,
+    include_metadata: bool = False,
+    only_urls: bool = False,
+) -> List[Tuple]:
+    """Fetches the top n most similar documents to the given query embedding from the Elasticsearch index.
+
+    Args:
+        query_embedding (list): The query embedding to compare against.
+        es_client (Elasticsearch): The Elasticsearch client.
+        n (int, optional): The number of similar documents to fetch. Defaults to 5.
+        include_metadata (bool, optional): Whether to include metadata in the results. Defaults to False.
+        only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False.
+
+    Returns:
+        list: A list of tuples with the content and, optionally, the metadata or URL of each document.
+    """
     index_name = "zenml_docs"
@@ -329,6 +364,35 @@ def get_topn_similar_docs(
 
     return results
 
+
+def get_topn_similar_docs(
+    query_embedding: List[float],
+    conn: psycopg2.extensions.connection = None,
+    es_client: Elasticsearch = None,
+    n: int = 5,
+    include_metadata: bool = False,
+    only_urls: bool = False,
+) -> List[Tuple]:
+    """Fetches the top n most similar documents from the configured vector store.
+
+    Exactly one of `conn` and `es_client` should be provided; the query is
+    routed to pgvector or Elasticsearch accordingly.
+
+    Args:
+        query_embedding (list): The query embedding to compare against.
+        conn (psycopg2.extensions.connection, optional): The PostgreSQL connection object.
+        es_client (Elasticsearch, optional): The Elasticsearch client.
+        n (int, optional): The number of similar documents to fetch. Defaults to 5.
+        include_metadata (bool, optional): Whether to include metadata in the results. Defaults to False.
+        only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False.
+
+    Returns:
+        list: A list of tuples containing the content and metadata (if include_metadata is True) of the top n most similar documents.
+
+    Raises:
+        ValueError: If neither conn nor es_client is provided.
+    """
+    if conn is None and es_client is None:
+        raise ValueError("Either conn or es_client must be provided")
+
+    if conn is not None:
+        return get_topn_similar_docs_pgvector(query_embedding, conn, n, include_metadata, only_urls)
+
+    return get_topn_similar_docs_elasticsearch(query_embedding, es_client, n, include_metadata, only_urls)
+
 
 def get_completion_from_messages(
     messages, model=OPENAI_MODEL, temperature=0.4, max_tokens=1000
@@ -367,6 +431,18 @@ def get_embeddings(text):
     model = SentenceTransformer(EMBEDDINGS_MODEL)
     return model.encode(text)
 
+
+def find_vectorstore_name() -> str:
+    """Finds the name of the vector store recorded in the chatbot model's run metadata.
+
+    Returns:
+        str: The name of the vector store.
+ """ + from zenml.client import Client + client = Client() + model = client.get_model_version(ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev") + + return model.run_metadata["vector_store"].value["name"] + def rerank_documents( query: str, documents: List[Tuple], reranker_model: str = "flashrank" @@ -420,11 +496,20 @@ def process_input_with_retrieval( str: The processed output. """ delimiter = "```" + es_client = None + conn = None + + vector_store_name = find_vectorstore_name() + if vector_store_name == "pgvector": + conn = get_db_conn() + else: + es_client = get_es_client() # Step 1: Get documents related to the user input from database related_docs = get_topn_similar_docs( get_embeddings(input), - get_es_client(), + conn=conn, + es_client=es_client, n=n_items_retrieved, include_metadata=use_reranking, )