From 64858017d4c825a532e92d4e2f57ba55ec70ab0a Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 13 Nov 2024 16:28:33 +0530 Subject: [PATCH 01/10] test elasticsearch --- llm-complete-guide/constants.py | 1 + llm-complete-guide/requirements.txt | 2 +- llm-complete-guide/steps/eval_retrieval.py | 5 +- llm-complete-guide/steps/populate_index.py | 165 ++++++++++----------- llm-complete-guide/utils/llm_utils.py | 79 +++++++--- 5 files changed, 142 insertions(+), 110 deletions(-) diff --git a/llm-complete-guide/constants.py b/llm-complete-guide/constants.py index 21b6b462..c537c9d8 100644 --- a/llm-complete-guide/constants.py +++ b/llm-complete-guide/constants.py @@ -78,3 +78,4 @@ USE_ARGILLA_ANNOTATIONS = False SECRET_NAME = os.getenv("ZENML_PROJECT_SECRET_NAME", "llm-complete") +SECRET_NAME_ELASTICSEARCH = "elasticsearch-zenml" diff --git a/llm-complete-guide/requirements.txt b/llm-complete-guide/requirements.txt index 2c107e4b..e8a8e9d9 100644 --- a/llm-complete-guide/requirements.txt +++ b/llm-complete-guide/requirements.txt @@ -1,4 +1,4 @@ -zenml[server]>=0.68.1 +zenml[server]==0.68.1 ratelimit pgvector psycopg2-binary diff --git a/llm-complete-guide/steps/eval_retrieval.py b/llm-complete-guide/steps/eval_retrieval.py index d9100e25..c51bf902 100644 --- a/llm-complete-guide/steps/eval_retrieval.py +++ b/llm-complete-guide/steps/eval_retrieval.py @@ -21,6 +21,7 @@ from utils.llm_utils import ( get_db_conn, get_embeddings, + get_es_client, get_topn_similar_docs, rerank_documents, ) @@ -76,11 +77,11 @@ def query_similar_docs( Tuple containing the question, URL ending, and retrieved URLs. """ embedded_question = get_embeddings(question) - db_conn = get_db_conn() + es_client = get_es_client() num_docs = 20 if use_reranking else returned_sample_size # get (content, url) tuples for the top n similar documents top_similar_docs = get_topn_similar_docs( - embedded_question, db_conn, n=num_docs, include_metadata=True + embedded_question, es_client, n=num_docs, include_metadata=True ) if use_reranking: diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index bb17dc94..3f1d186f 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -29,12 +29,13 @@ CHUNK_SIZE, EMBEDDING_DIMENSIONALITY, EMBEDDINGS_MODEL, + SECRET_NAME_ELASTICSEARCH, ) from pgvector.psycopg2 import register_vector from PIL import Image, ImageDraw, ImageFont from sentence_transformers import SentenceTransformer from structures import Document -from utils.llm_utils import get_db_conn, split_documents +from utils.llm_utils import get_db_conn, get_es_client, split_documents from zenml import ArtifactConfig, log_artifact_metadata, step, log_model_metadata from zenml.metadata.metadata_types import Uri from zenml.client import Client @@ -609,83 +610,76 @@ def index_generator( Raises: Exception: If an error occurs during the index generation. """ - conn = None + from elasticsearch import Elasticsearch + from elasticsearch.helpers import bulk + import hashlib + try: - conn = get_db_conn() - with conn.cursor() as cur: - # Install pgvector if not already installed - cur.execute("CREATE EXTENSION IF NOT EXISTS vector") - conn.commit() - - # Create the embeddings table if it doesn't exist - table_create_command = f""" - CREATE TABLE IF NOT EXISTS embeddings ( - id SERIAL PRIMARY KEY, - content TEXT, - token_count INTEGER, - embedding VECTOR({EMBEDDING_DIMENSIONALITY}), - filename TEXT, - parent_section TEXT, - url TEXT - ); - """ - cur.execute(table_create_command) - conn.commit() - - register_vector(conn) - - # Parse the JSON string into a list of Document objects - document_list = [Document(**doc) for doc in json.loads(documents)] - - # Insert data only if it doesn't already exist - for doc in document_list: - content = doc.page_content - token_count = doc.token_count - embedding = doc.embedding - filename = doc.filename - parent_section = doc.parent_section - url = doc.url - - cur.execute( - "SELECT COUNT(*) FROM embeddings WHERE content = %s", - (content,), - ) - count = cur.fetchone()[0] - if count == 0: - cur.execute( - "INSERT INTO embeddings (content, token_count, embedding, filename, parent_section, url) VALUES (%s, %s, %s, %s, %s, %s)", - ( - content, - token_count, - embedding, - filename, - parent_section, - url, - ), - ) - conn.commit() - - cur.execute("SELECT COUNT(*) as cnt FROM embeddings;") - num_records = cur.fetchone()[0] - logger.info(f"Number of vector records in table: {num_records}") - - # calculate the index parameters according to best practices - num_lists = max(num_records / 1000, 10) - if num_records > 1000000: - num_lists = math.sqrt(num_records) - - # use the cosine distance measure, which is what we'll later use for querying - cur.execute( - f"CREATE INDEX IF NOT EXISTS embeddings_idx ON embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = {num_lists});" - ) - conn.commit() + es = get_es_client() + index_name = "zenml_docs" + + # Create index with mappings if it doesn't exist + if not es.indices.exists(index=index_name): + mappings = { + "mappings": { + "properties": { + "doc_id": {"type": "keyword"}, + "content": {"type": "text"}, + "token_count": {"type": "integer"}, + "embedding": { + "type": "dense_vector", + "dims": EMBEDDING_DIMENSIONALITY, + "index": True, + "similarity": "cosine" + }, + "filename": {"type": "text"}, + "parent_section": {"type": "text"}, + "url": {"type": "text"} + } + } + } + # TODO move to using mappings param directly + es.indices.create(index=index_name, body=mappings) - except Exception as e: - logger.error(f"Error in index_generator: {e}") - raise - finally: - if conn: - conn.close() + # Parse the JSON string into a list of Document objects + document_list = [Document(**doc) for doc in json.loads(documents)] + + def generate_actions(): + for doc in document_list: + # Create a unique identifier based on content and metadata + content_hash = hashlib.md5( + f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() + ).hexdigest() + + # Check if document already exists + exists_query = { + "query": { + "term": { + "doc_id": content_hash + } + } + } + + # TODO same as above, use the query param directly + if not es.count(index=index_name, body=exists_query)["count"]: + yield { + "_index": index_name, + "_id": content_hash, # Use hash as document ID + "_source": { + "doc_id": content_hash, + "content": doc.page_content, + "token_count": doc.token_count, + "embedding": doc.embedding, + "filename": doc.filename, + "parent_section": doc.parent_section, + "url": doc.url + } + } + + success, failed = bulk(es, generate_actions()) + logger.info(f"Successfully indexed {success} documents") + if failed: + logger.warning(f"Failed to index {len(failed)} documents") # Log the model metadata prompt = """ @@ -700,12 +694,10 @@ def index_generator( """ client = Client() + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] CONNECTION_DETAILS = { - "user": client.get_secret(SECRET_NAME).secret_values["supabase_user"], - "password": "**********", - "host": client.get_secret(SECRET_NAME).secret_values["supabase_host"], - "port": client.get_secret(SECRET_NAME).secret_values["supabase_port"], - "dbname": "postgres", + "host": es_host, + "api_key": "*********", } log_model_metadata( @@ -721,12 +713,13 @@ def index_generator( "content": prompt, }, "vector_store": { - "name": "pgvector", + "name": "elasticsearch", "connection_details": CONNECTION_DETAILS, - # TODO: Hard-coded for now - "database_url": Uri( - "https://supabase.com/dashboard/project/rkoiacgkeiwpwceahtlp/editor/29505?schema=public" - ), + "index_name": index_name }, }, ) + + except Exception as e: + logger.error(f"Error in index_generator: {e}") + raise diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py index 6cebde8c..60c4ab4d 100644 --- a/llm-complete-guide/utils/llm_utils.py +++ b/llm-complete-guide/utils/llm_utils.py @@ -21,6 +21,7 @@ import logging +from elasticsearch import Elasticsearch from zenml.client import Client from utils.openai_utils import get_openai_api_key @@ -46,6 +47,7 @@ MODEL_NAME_MAP, OPENAI_MODEL, SECRET_NAME, + SECRET_NAME_ELASTICSEARCH, ) from pgvector.psycopg2 import register_vector from psycopg2.extensions import connection @@ -220,6 +222,23 @@ def split_documents( return chunked_documents +def get_es_client() -> Elasticsearch: + """Get an Elasticsearch client. + + Returns: + Elasticsearch: An Elasticsearch client. + """ + client = Client() + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] + es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_api_key"] + + es = Elasticsearch( + es_host, + api_key=es_api_key, + ) + return es + + def get_db_conn() -> connection: """Establishes and returns a connection to the PostgreSQL database. @@ -246,7 +265,7 @@ def get_db_conn() -> connection: def get_topn_similar_docs( query_embedding: List[float], - conn: psycopg2.extensions.connection, + es_client: Elasticsearch, n: int = 5, include_metadata: bool = False, only_urls: bool = False, @@ -264,27 +283,45 @@ def get_topn_similar_docs( Returns: list: A list of tuples containing the content and metadata (if include_metadata is True) of the top n most similar documents. """ - embedding_array = np.array(query_embedding) - register_vector(conn) - cur = conn.cursor() - - if include_metadata: - cur.execute( - f"SELECT content, url, parent_section FROM embeddings ORDER BY embedding <=> %s LIMIT {n}", - (embedding_array,), - ) - elif only_urls: - cur.execute( - f"SELECT url FROM embeddings ORDER BY embedding <=> %s LIMIT {n}", - (embedding_array,), - ) + index_name = "zenml_docs" + + if only_urls: + source = ["url"] + elif include_metadata: + source = ["content", "url", "parent_section"] else: - cur.execute( - f"SELECT content FROM embeddings ORDER BY embedding <=> %s LIMIT {n}", - (embedding_array,), - ) + source = ["content"] + + query = { + "_source": source, + "query": { + "script_score": { + "query": {"match_all": {}}, + "script": { + "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0", + "params": {"query_vector": query_embedding} + } + } + }, + "size": n + } + + response = es_client.search(index=index_name, body=query) + + results = [] + for hit in response['hits']['hits']: + if only_urls: + results.append((hit['_source']['url'],)) + elif include_metadata: + results.append(( + hit['_source']['content'], + hit['_source']['url'], + hit['_source']['parent_section'] + )) + else: + results.append((hit['_source']['content'],)) - return cur.fetchall() + return results def get_completion_from_messages( @@ -381,7 +418,7 @@ def process_input_with_retrieval( # Step 1: Get documents related to the user input from database related_docs = get_topn_similar_docs( get_embeddings(input), - get_db_conn(), + get_es_client(), n=n_items_retrieved, include_metadata=use_reranking, ) From 60aac8c3a674fdd4cc16ad780f169fac08c00b3c Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 13 Nov 2024 16:32:19 +0530 Subject: [PATCH 02/10] update req --- llm-complete-guide/configs/dev/rag.yaml | 1 + llm-complete-guide/requirements.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/llm-complete-guide/configs/dev/rag.yaml b/llm-complete-guide/configs/dev/rag.yaml index 9ab97ea2..9b37781b 100644 --- a/llm-complete-guide/configs/dev/rag.yaml +++ b/llm-complete-guide/configs/dev/rag.yaml @@ -17,6 +17,7 @@ settings: - pygithub - rerankers[flashrank] - matplotlib + - elasticsearch environment: ZENML_PROJECT_SECRET_NAME: llm_complete diff --git a/llm-complete-guide/requirements.txt b/llm-complete-guide/requirements.txt index e8a8e9d9..4a761fbd 100644 --- a/llm-complete-guide/requirements.txt +++ b/llm-complete-guide/requirements.txt @@ -20,6 +20,7 @@ datasets torch gradio huggingface-hub +elasticsearch # optional requirements for S3 artifact store # s3fs>2022.3.0 From bafeb1396d6660eff235a47da264920e7525faf3 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 13 Nov 2024 17:42:18 +0530 Subject: [PATCH 03/10] a combination of lesser docs and higher timeouts --- llm-complete-guide/steps/populate_index.py | 81 +++++++++++++--------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index 3f1d186f..314dc42a 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -593,7 +593,7 @@ def generate_embeddings( raise -@step +@step(enable_cache=False) def index_generator( documents: str, ) -> None: @@ -643,43 +643,56 @@ def index_generator( # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] - - def generate_actions(): - for doc in document_list: - # Create a unique identifier based on content and metadata - content_hash = hashlib.md5( - f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() - ).hexdigest() - - # Check if document already exists - exists_query = { - "query": { - "term": { - "doc_id": content_hash - } + + # Prepare bulk operations + operations = [] + for doc in document_list[:500]: + # Create a unique identifier based on content and metadata + content_hash = hashlib.md5( + f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() + ).hexdigest() + + # Check if document exists + exists_query = { + "query": { + "term": { + "doc_id": content_hash } } - - # TODO same as above, use the query param directly - if not es.count(index=index_name, body=exists_query)["count"]: - yield { + } + + if not es.count(index=index_name, body=exists_query)["count"]: + operations.append({ + "index": { "_index": index_name, - "_id": content_hash, # Use hash as document ID - "_source": { - "doc_id": content_hash, - "content": doc.page_content, - "token_count": doc.token_count, - "embedding": doc.embedding, - "filename": doc.filename, - "parent_section": doc.parent_section, - "url": doc.url - } + "_id": content_hash } - - success, failed = bulk(es, generate_actions()) - logger.info(f"Successfully indexed {success} documents") - if failed: - logger.warning(f"Failed to index {len(failed)} documents") + }) + + operations.append({ + "doc_id": content_hash, + "content": doc.page_content, + "token_count": doc.token_count, + "embedding": doc.embedding, + "filename": doc.filename, + "parent_section": doc.parent_section, + "url": doc.url + }) + + if operations: + response = es.bulk(operations=operations, timeout="10m") + + success_count = sum(1 for item in response['items'] if 'index' in item and item['index']['status'] == 201) + failed_count = len(response['items']) - success_count + + logger.info(f"Successfully indexed {success_count} documents") + if failed_count > 0: + logger.warning(f"Failed to index {failed_count} documents") + for item in response['items']: + if 'index' in item and item['index']['status'] != 201: + logger.warning(f"Failed to index document: {item['index']['error']}") + else: + logger.info("No new documents to index") # Log the model metadata prompt = """ From 2761f2f1e11d1c9ad32c0b003584e782f2d5773b Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 13 Nov 2024 17:50:24 +0530 Subject: [PATCH 04/10] a faster search --- llm-complete-guide/utils/llm_utils.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py index 60c4ab4d..f72e4c1d 100644 --- a/llm-complete-guide/utils/llm_utils.py +++ b/llm-complete-guide/utils/llm_utils.py @@ -306,7 +306,13 @@ def get_topn_similar_docs( "size": n } - response = es_client.search(index=index_name, body=query) + # response = es_client.search(index=index_name, body=query) + response = es_client.search(index=index_name, knn={ + "field": "embedding", + "query_vector": query_embedding, + "num_candidates": 50, + "k": n + }) results = [] for hit in response['hits']['hits']: From b49d425ee590f745ba3cb4c6ea005214967ab2e6 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 13 Nov 2024 18:41:16 +0530 Subject: [PATCH 05/10] no need to limit docs with new approach --- llm-complete-guide/steps/populate_index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index 314dc42a..baa12fc2 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -646,7 +646,7 @@ def index_generator( # Prepare bulk operations operations = [] - for doc in document_list[:500]: + for doc in document_list: # Create a unique identifier based on content and metadata content_hash = hashlib.md5( f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() From 62d3e0162af770a324ee6e7ed0af860e1b862b34 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Wed, 13 Nov 2024 17:17:26 +0100 Subject: [PATCH 06/10] Add Elasticsearch dependency to production and staging configs --- llm-complete-guide/configs/production/rag.yaml | 2 ++ llm-complete-guide/configs/staging/rag.yaml | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/llm-complete-guide/configs/production/rag.yaml b/llm-complete-guide/configs/production/rag.yaml index 4cce6240..a63bdab2 100644 --- a/llm-complete-guide/configs/production/rag.yaml +++ b/llm-complete-guide/configs/production/rag.yaml @@ -17,6 +17,8 @@ settings: - pygithub - rerankers[flashrank] - matplotlib + - elasticsearch + environment: ZENML_PROJECT_SECRET_NAME: llm_complete ZENML_ENABLE_RICH_TRACEBACK: FALSE diff --git a/llm-complete-guide/configs/staging/rag.yaml b/llm-complete-guide/configs/staging/rag.yaml index 8fac1baa..88492d75 100644 --- a/llm-complete-guide/configs/staging/rag.yaml +++ b/llm-complete-guide/configs/staging/rag.yaml @@ -17,7 +17,8 @@ settings: - pygithub - rerankers[flashrank] - matplotlib - + - elasticsearch + environment: ZENML_PROJECT_SECRET_NAME: llm_complete ZENML_ENABLE_RICH_TRACEBACK: FALSE From 83158e8b0ac89788561ea52fd3056873a81f4f85 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 14 Nov 2024 11:19:54 +0530 Subject: [PATCH 07/10] check model metadata to figure out what vectorstore to use --- llm-complete-guide/steps/eval_retrieval.py | 17 ++- llm-complete-guide/utils/llm_utils.py | 115 ++++++++++++++++++--- 2 files changed, 115 insertions(+), 17 deletions(-) diff --git a/llm-complete-guide/steps/eval_retrieval.py b/llm-complete-guide/steps/eval_retrieval.py index c51bf902..2b555b85 100644 --- a/llm-complete-guide/steps/eval_retrieval.py +++ b/llm-complete-guide/steps/eval_retrieval.py @@ -19,6 +19,7 @@ from datasets import load_dataset from utils.llm_utils import ( + find_vectorstore_name, get_db_conn, get_embeddings, get_es_client, @@ -77,11 +78,23 @@ def query_similar_docs( Tuple containing the question, URL ending, and retrieved URLs. """ embedded_question = get_embeddings(question) - es_client = get_es_client() + conn = None + es_client = None + + vector_store_name = find_vectorstore_name() + if vector_store_name == "pgvector": + conn = get_db_conn() + else: + es_client = get_es_client() + num_docs = 20 if use_reranking else returned_sample_size # get (content, url) tuples for the top n similar documents top_similar_docs = get_topn_similar_docs( - embedded_question, es_client, n=num_docs, include_metadata=True + embedded_question, + conn=conn, + es_client=es_client, + n=num_docs, + include_metadata=True ) if use_reranking: diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py index f72e4c1d..07516100 100644 --- a/llm-complete-guide/utils/llm_utils.py +++ b/llm-complete-guide/utils/llm_utils.py @@ -48,6 +48,7 @@ OPENAI_MODEL, SECRET_NAME, SECRET_NAME_ELASTICSEARCH, + ZENML_CHATBOT_MODEL, ) from pgvector.psycopg2 import register_vector from psycopg2.extensions import connection @@ -263,25 +264,59 @@ def get_db_conn() -> connection: return psycopg2.connect(**CONNECTION_DETAILS) -def get_topn_similar_docs( - query_embedding: List[float], - es_client: Elasticsearch, - n: int = 5, - include_metadata: bool = False, - only_urls: bool = False, -) -> List[Tuple]: - """Fetches the top n most similar documents to the given query embedding from the database. +def get_topn_similar_docs_pgvector( + query_embedding: List[float], + conn: psycopg2.extensions.connection, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False + ) -> List[Tuple]: + """Fetches the top n most similar documents to the given query embedding from the PostgreSQL database. Args: query_embedding (list): The query embedding to compare against. conn (psycopg2.extensions.connection): The database connection object. - n (int, optional): The number of similar documents to fetch. Defaults to - 5. - include_metadata (bool, optional): Whether to include metadata in the - results. Defaults to False. + n (int, optional): The number of similar documents to fetch. Defaults to 5. + include_metadata (bool, optional): Whether to include metadata in the results. Defaults to False. + only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False. + """ + embedding_array = np.array(query_embedding) + register_vector(conn) + cur = conn.cursor() + + if include_metadata: + cur.execute( + f"SELECT content, url, parent_section FROM embeddings ORDER BY embedding <=> %s LIMIT {n}", + (embedding_array,), + ) + elif only_urls: + cur.execute( + f"SELECT url FROM embeddings ORDER BY embedding <=> %s LIMIT {n}", + (embedding_array,), + ) + else: + cur.execute( + f"SELECT content FROM embeddings ORDER BY embedding <=> %s LIMIT {n}", + (embedding_array,), + ) - Returns: - list: A list of tuples containing the content and metadata (if include_metadata is True) of the top n most similar documents. + return cur.fetchall() + +def get_topn_similar_docs_elasticsearch( + query_embedding: List[float], + es_client: Elasticsearch, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False + ) -> List[Tuple]: + """Fetches the top n most similar documents to the given query embedding from the Elasticsearch index. + + Args: + query_embedding (list): The query embedding to compare against. + es_client (Elasticsearch): The Elasticsearch client. + n (int, optional): The number of similar documents to fetch. Defaults to 5. + include_metadata (bool, optional): Whether to include metadata in the results. Defaults to False. + only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False. """ index_name = "zenml_docs" @@ -329,6 +364,35 @@ def get_topn_similar_docs( return results +def get_topn_similar_docs( + query_embedding: List[float], + conn: psycopg2.extensions.connection = None, + es_client: Elasticsearch = None, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False, +) -> List[Tuple]: + """Fetches the top n most similar documents to the given query embedding from the database. + + Args: + query_embedding (list): The query embedding to compare against. + conn (psycopg2.extensions.connection): The database connection object. + n (int, optional): The number of similar documents to fetch. Defaults to + 5. + include_metadata (bool, optional): Whether to include metadata in the + results. Defaults to False. + + Returns: + list: A list of tuples containing the content and metadata (if include_metadata is True) of the top n most similar documents. + """ + if conn is None and es_client is None: + raise ValueError("Either conn or es_client must be provided") + + if conn is not None: + return get_topn_similar_docs_pgvector(query_embedding, conn, n, include_metadata, only_urls) + + if es_client is not None: + return get_topn_similar_docs_elasticsearch(query_embedding, es_client, n, include_metadata, only_urls) def get_completion_from_messages( messages, model=OPENAI_MODEL, temperature=0.4, max_tokens=1000 @@ -367,6 +431,18 @@ def get_embeddings(text): model = SentenceTransformer(EMBEDDINGS_MODEL) return model.encode(text) +def find_vectorstore_name() -> str: + """Finds the name of the vector store used for the given embeddings model. + + Returns: + str: The name of the vector store. + """ + from zenml.client import Client + client = Client() + model = client.get_model_version(ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev") + + return model.run_metadata["vector_store"].value["name"] + def rerank_documents( query: str, documents: List[Tuple], reranker_model: str = "flashrank" @@ -420,11 +496,20 @@ def process_input_with_retrieval( str: The processed output. """ delimiter = "```" + es_client = None + conn = None + + vector_store_name = find_vectorstore_name() + if vector_store_name == "pgvector": + conn = get_db_conn() + else: + es_client = get_es_client() # Step 1: Get documents related to the user input from database related_docs = get_topn_similar_docs( get_embeddings(input), - get_es_client(), + conn=conn, + es_client=es_client, n=n_items_retrieved, include_metadata=use_reranking, ) From 02c158ae28fb4a614d11f64e899f4ac87aa18609 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 14 Nov 2024 11:21:26 +0530 Subject: [PATCH 08/10] add ability to switch bw elastic and pgvector --- llm-complete-guide/constants.py | 3 + llm-complete-guide/steps/populate_index.py | 196 ++++++++++++++++----- 2 files changed, 155 insertions(+), 44 deletions(-) diff --git a/llm-complete-guide/constants.py b/llm-complete-guide/constants.py index c537c9d8..7b2767c5 100644 --- a/llm-complete-guide/constants.py +++ b/llm-complete-guide/constants.py @@ -23,6 +23,9 @@ 384 # Update this to match the dimensionality of the new model ) +# ZenML constants +ZENML_CHATBOT_MODEL = "zenml-docs-qa-chatbot" + # Scraping constants RATE_LIMIT = 5 # Maximum number of requests per second diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index baa12fc2..d9a9bd95 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -19,10 +19,12 @@ # https://www.timescale.com/blog/postgresql-as-a-vector-database-create-store-and-query-openai-embeddings-with-pgvector/ # for providing the base implementation for this indexing functionality +import hashlib import json import logging import math from typing import Annotated, Any, Dict, List, Tuple +from enum import Enum from constants import ( CHUNK_OVERLAP, @@ -30,6 +32,7 @@ EMBEDDING_DIMENSIONALITY, EMBEDDINGS_MODEL, SECRET_NAME_ELASTICSEARCH, + ZENML_CHATBOT_MODEL, ) from pgvector.psycopg2 import register_vector from PIL import Image, ImageDraw, ImageFont @@ -593,9 +596,14 @@ def generate_embeddings( raise +class IndexType(Enum): + ELASTICSEARCH = "elasticsearch" + POSTGRES = "postgres" + @step(enable_cache=False) def index_generator( documents: str, + index_type: IndexType = IndexType.ELASTICSEARCH, ) -> None: """Generates an index for the given documents. @@ -606,14 +614,23 @@ def index_generator( Args: documents (str): A JSON string containing the Document objects with generated embeddings. + index_type (IndexType): The type of index to use. Defaults to Elasticsearch. Raises: Exception: If an error occurs during the index generation. """ - from elasticsearch import Elasticsearch - from elasticsearch.helpers import bulk - import hashlib - + try: + if index_type == IndexType.ELASTICSEARCH: + _index_generator_elastic(documents) + else: + _index_generator_postgres(documents) + + except Exception as e: + logger.error(f"Error in index_generator: {e}") + raise + +def _index_generator_elastic(documents: str) -> None: + """Generates an Elasticsearch index for the given documents.""" try: es = get_es_client() index_name = "zenml_docs" @@ -643,16 +660,13 @@ def index_generator( # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] - - # Prepare bulk operations operations = [] + for doc in document_list: - # Create a unique identifier based on content and metadata content_hash = hashlib.md5( f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() ).hexdigest() - # Check if document exists exists_query = { "query": { "term": { @@ -694,45 +708,139 @@ def index_generator( else: logger.info("No new documents to index") - # Log the model metadata - prompt = """ - You are a friendly chatbot. \ - You can answer questions about ZenML, its features and its use cases. \ - You respond in a concise, technically credible tone. \ - You ONLY use the context from the ZenML documentation to provide relevant - answers. \ - You do not make up answers or provide opinions that you don't have - information to support. \ - If you are unsure or don't know, just say so. \ - """ - - client = Client() + _log_metadata(index_type=IndexType.ELASTICSEARCH) + + except Exception as e: + logger.error(f"Error in Elasticsearch indexing: {e}") + raise + +def _index_generator_postgres(documents: str) -> None: + """Generates a PostgreSQL index for the given documents.""" + try: + conn = get_db_conn() + + with conn.cursor() as cur: + # Install pgvector if not already installed + cur.execute("CREATE EXTENSION IF NOT EXISTS vector") + conn.commit() + + # Create the embeddings table if it doesn't exist + table_create_command = f""" + CREATE TABLE IF NOT EXISTS embeddings ( + id SERIAL PRIMARY KEY, + content TEXT, + token_count INTEGER, + embedding VECTOR({EMBEDDING_DIMENSIONALITY}), + filename TEXT, + parent_section TEXT, + url TEXT + ); + """ + cur.execute(table_create_command) + conn.commit() + + register_vector(conn) + + # Parse the JSON string into a list of Document objects + document_list = [Document(**doc) for doc in json.loads(documents)] + + # Insert data only if it doesn't already exist + for doc in document_list: + content = doc.page_content + token_count = doc.token_count + embedding = doc.embedding + filename = doc.filename + parent_section = doc.parent_section + url = doc.url + + cur.execute( + "SELECT COUNT(*) FROM embeddings WHERE content = %s", + (content,), + ) + count = cur.fetchone()[0] + if count == 0: + cur.execute( + "INSERT INTO embeddings (content, token_count, embedding, filename, parent_section, url) VALUES (%s, %s, %s, %s, %s, %s)", + ( + content, + token_count, + embedding, + filename, + parent_section, + url, + ), + ) + conn.commit() + + + cur.execute("SELECT COUNT(*) as cnt FROM embeddings;") + num_records = cur.fetchone()[0] + logger.info(f"Number of vector records in table: {num_records}") + + # calculate the index parameters according to best practices + num_lists = max(num_records / 1000, 10) + if num_records > 1000000: + num_lists = math.sqrt(num_records) + + # use the cosine distance measure, which is what we'll later use for querying + cur.execute( + f"CREATE INDEX IF NOT EXISTS embeddings_idx ON embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = {num_lists});" + ) + conn.commit() + + _log_metadata(index_type=IndexType.POSTGRES) + + except Exception as e: + logger.error(f"Error in PostgreSQL indexing: {e}") + raise + finally: + if conn: + conn.close() + +def _log_metadata(index_type: IndexType) -> None: + """Log metadata about the indexing process.""" + prompt = """ + You are a friendly chatbot. \ + You can answer questions about ZenML, its features and its use cases. \ + You respond in a concise, technically credible tone. \ + You ONLY use the context from the ZenML documentation to provide relevant answers. \ + You do not make up answers or provide opinions that you don't have information to support. \ + If you are unsure or don't know, just say so. \ + """ + + client = Client() + + if index_type == IndexType.ELASTICSEARCH: es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] - CONNECTION_DETAILS = { + connection_details = { "host": es_host, "api_key": "*********", } + store_name = "elasticsearch" + else: + store_name = "pgvector" + + connection_details = { + "user": client.get_secret(SECRET_NAME).secret_values["supabase_user"], + "password": "**********", + "host": client.get_secret(SECRET_NAME).secret_values["supabase_host"], + "port": client.get_secret(SECRET_NAME).secret_values["supabase_port"], + "dbname": "postgres", + } - log_model_metadata( - metadata={ - "embeddings": { - "model": EMBEDDINGS_MODEL, - "dimensionality": EMBEDDING_DIMENSIONALITY, - "model_url": Uri( - f"https://huggingface.co/{EMBEDDINGS_MODEL}" - ), - }, - "prompt": { - "content": prompt, - }, - "vector_store": { - "name": "elasticsearch", - "connection_details": CONNECTION_DETAILS, - "index_name": index_name - }, + log_model_metadata( + metadata={ + "embeddings": { + "model": EMBEDDINGS_MODEL, + "dimensionality": EMBEDDING_DIMENSIONALITY, + "model_url": Uri(f"https://huggingface.co/{EMBEDDINGS_MODEL}"), }, - ) - - except Exception as e: - logger.error(f"Error in index_generator: {e}") - raise + "prompt": { + "content": prompt, + }, + "vector_store": { + "name": store_name, + "connection_details": connection_details, + }, + }, + ) From 84413bdb5e72a246378bb079b4ac96efbd12522b Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 14 Nov 2024 11:22:16 +0530 Subject: [PATCH 09/10] update req --- llm-complete-guide/configs/dev/rag_eval.yaml | 1 + llm-complete-guide/configs/production/eval.yaml | 1 + llm-complete-guide/configs/production/rag.yaml | 1 + llm-complete-guide/configs/staging/eval.yaml | 1 + llm-complete-guide/configs/staging/rag.yaml | 1 + 5 files changed, 5 insertions(+) diff --git a/llm-complete-guide/configs/dev/rag_eval.yaml b/llm-complete-guide/configs/dev/rag_eval.yaml index 4ac3c94b..2581daa2 100644 --- a/llm-complete-guide/configs/dev/rag_eval.yaml +++ b/llm-complete-guide/configs/dev/rag_eval.yaml @@ -13,4 +13,5 @@ settings: - psycopg2-binary - tiktoken - pygithub + - elasticsearch python_package_installer: "uv" diff --git a/llm-complete-guide/configs/production/eval.yaml b/llm-complete-guide/configs/production/eval.yaml index 1786b3b8..f0265dda 100644 --- a/llm-complete-guide/configs/production/eval.yaml +++ b/llm-complete-guide/configs/production/eval.yaml @@ -17,6 +17,7 @@ settings: - matplotlib - pillow - pygithub + - elasticsearch environment: ZENML_PROJECT_SECRET_NAME: llm_complete ZENML_ENABLE_RICH_TRACEBACK: FALSE diff --git a/llm-complete-guide/configs/production/rag.yaml b/llm-complete-guide/configs/production/rag.yaml index 4cce6240..87c9927c 100644 --- a/llm-complete-guide/configs/production/rag.yaml +++ b/llm-complete-guide/configs/production/rag.yaml @@ -17,6 +17,7 @@ settings: - pygithub - rerankers[flashrank] - matplotlib + - elasticsearch environment: ZENML_PROJECT_SECRET_NAME: llm_complete ZENML_ENABLE_RICH_TRACEBACK: FALSE diff --git a/llm-complete-guide/configs/staging/eval.yaml b/llm-complete-guide/configs/staging/eval.yaml index 1786b3b8..f0265dda 100644 --- a/llm-complete-guide/configs/staging/eval.yaml +++ b/llm-complete-guide/configs/staging/eval.yaml @@ -17,6 +17,7 @@ settings: - matplotlib - pillow - pygithub + - elasticsearch environment: ZENML_PROJECT_SECRET_NAME: llm_complete ZENML_ENABLE_RICH_TRACEBACK: FALSE diff --git a/llm-complete-guide/configs/staging/rag.yaml b/llm-complete-guide/configs/staging/rag.yaml index 8fac1baa..ebd66c21 100644 --- a/llm-complete-guide/configs/staging/rag.yaml +++ b/llm-complete-guide/configs/staging/rag.yaml @@ -17,6 +17,7 @@ settings: - pygithub - rerankers[flashrank] - matplotlib + - elasticsearch environment: ZENML_PROJECT_SECRET_NAME: llm_complete From 2f591999400095b44dee8a9c99850a5da8451a72 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Thu, 14 Nov 2024 09:38:31 +0100 Subject: [PATCH 10/10] Update ZENML_VERSION.txt to v0.70.0 --- llm-complete-guide/ZENML_VERSION.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llm-complete-guide/ZENML_VERSION.txt b/llm-complete-guide/ZENML_VERSION.txt index af7abcff..8fc8841a 100644 --- a/llm-complete-guide/ZENML_VERSION.txt +++ b/llm-complete-guide/ZENML_VERSION.txt @@ -1 +1 @@ -v0.68.1 \ No newline at end of file +v0.70.0 \ No newline at end of file