diff --git a/README.md b/README.md
index 1f501bb..ba7c1fd 100644
--- a/README.md
+++ b/README.md
@@ -10,9 +10,13 @@ The Swiss Army Llama is designed to facilitate and optimize the process of worki
 Some additional useful endpoints are provided, such as computing semantic similarity between submitted text strings. The service leverages a high-performance Rust-based library, `fast_vector_similarity`, to offer a range of similarity measures including `spearman_rho`, `kendall_tau`, `approximate_distance_correlation`, `jensen_shannon_similarity`, and [`hoeffding_d`](https://blogs.sas.com/content/iml/2021/05/03/examples-hoeffding-d.html). Additionally, semantic search across all your cached embeddings is supported using FAISS vector searching. You can either use the built-in cosine similarity from FAISS, or supplement this with a second pass that computes the more sophisticated similarity measures for the most relevant subset of the stored vectors found using cosine similarity (see the advanced semantic search endpoint for this functionality).
-As mentioned above, you can now submit not only plaintext and fully digital PDFs but also MS Word documents, images, and other file types supported by the textract library. The library can automatically apply OCR using Tesseract for scanned text. The returned embeddings for each sentence in a document can be organized in various formats like records, table, etc., using the Pandas to_json() function. The results can be returned either as a ZIP file containing a JSON file or as a direct JSON response. You can now also submit audio files in MP3 or WAV formats. The library uses OpenAI's Whisper model, as optimized by the Faster Whisper Python library, to transcribe the audio into text. Optionally, this transcript can be treated like any other document, with each sentence's embeddings computed and stored. The results are returned as a URL to a downloadable ZIP file containing a JSON with the embedding vector data.
+Also, we now support multiple embedding pooling methods for combining token-level embedding vectors into a single fixed-length embedding vector for any length of input text, including the following:
+ - `means`: Element-wise average of the token embeddings.
+ - `means_mins_maxes`: Concatenation of element-wise mean, min, and max of the token embeddings.
+ - `means_mins_maxes_stds_kurtoses`: Concatenation of element-wise mean, min, max, standard deviation, and kurtosis of the token embeddings.
+ - `svd`: Concatenation of the first two singular vectors obtained from the Singular Value Decomposition (SVD) of the token embeddings matrix.
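+
+As a rough illustrative sketch (not the service's exact implementation), these pooling methods can be expressed with NumPy/SciPy as follows; reading `svd` as the first two right singular vectors is an assumption based on the description above:
+
+```python
+import numpy as np
+from scipy.stats import kurtosis
+
+def pool_token_embeddings(token_matrix: np.ndarray, method: str = "means") -> np.ndarray:
+    # token_matrix has shape (num_tokens, embedding_dim); every method returns a
+    # fixed-length vector regardless of num_tokens, which is what makes pooled
+    # embeddings directly comparable across strings of different lengths.
+    if method == "means":
+        return token_matrix.mean(axis=0)
+    if method == "means_mins_maxes":
+        return np.concatenate([token_matrix.mean(axis=0), token_matrix.min(axis=0), token_matrix.max(axis=0)])
+    if method == "means_mins_maxes_stds_kurtoses":
+        return np.concatenate([token_matrix.mean(axis=0), token_matrix.min(axis=0), token_matrix.max(axis=0),
+                               token_matrix.std(axis=0), kurtosis(token_matrix, axis=0)])
+    if method == "svd":
+        # Assumed reading: concatenate the first two right singular vectors (each of
+        # length embedding_dim), yielding a 2 * embedding_dim vector.
+        _, _, vt = np.linalg.svd(token_matrix, full_matrices=False)
+        return np.concatenate([vt[0], vt[1]])
+    raise ValueError(f"Unknown embedding pooling method: {method}")
+```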
-In addition to fixed-sized embedding vectors, we also expose functionality that allows you to get back token-level embeddings, where each token in the input stream is embedded with its context in the string as a full sized vector, thus producing a matrix that has a number of rows equal to the number of tokens in the input string. This includes far more nuanced information about the contents of the string at the expense of much greater compute and storage requirements. The other drawback is that, instead of having the same sized output for every string, regardless of length (which makes it very easy to compare unequal length strings using cosine similarity and other measures), the token-level embedding matrix obviously differs in dimensions for two different strings if the strings have different numbers of tokens. To deal with this, we introduce combined feature vectors, which compute the column-wise mean, min, max, and std. deviation of the token-level emeddding matrix, and concatenate these together in to a single huge matrix; this allows you to compare strings of different lengths while still capturing more nuance. The combined results, including the embedding matrix and associated combined feature vector, can similarly be returned as either a zip file or direct JSON response.
+As mentioned above, you can now submit not only plaintext and fully digital PDFs but also MS Word documents, images, and other file types supported by the textract library. The library can automatically apply OCR using Tesseract for scanned text. The returned embeddings for each sentence in a document can be organized in various formats like records, table, etc., using the Pandas to_json() function. The results can be returned either as a ZIP file containing a JSON file or as a direct JSON response. You can now also submit audio files in MP3 or WAV formats. The library uses OpenAI's Whisper model, as optimized by the Faster Whisper Python library, to transcribe the audio into text. Optionally, this transcript can be treated like any other document, with each sentence's embeddings computed and stored. The results are returned as a URL to a downloadable ZIP file containing a JSON with the embedding vector data.
 
 Finally, we add a new endpoint for generating multiple text completions for a given input prompt, with the ability to specify a grammar file that will enforce a particular form of response, such as JSON. There is also a useful new utility feature: a real-time application log viewer that can be accessed via a web browser, which allows for syntax highlighting and offers options for downloading the logs or copying them to the clipboard. This allows a user to watch the logs without having direct SSH access to the server.
@@ -63,24 +67,23 @@ Watch the automated setup process in action [here](https://asciinema.org/a/6
 ## Features
 
-1. **Text Embedding Computation**: Utilizes pre-trained LLama2 and other LLMs via llama_cpp to generate embeddings for any provided text, including token-level embeddings that capture more nuanced information about the content.
-2. **Embedding Caching**: Efficiently stores and retrieves computed embeddings in SQLite, minimizing redundant computations. It supports caching both fixed-sized embedding vectors and token-level embeddings.
+1. **Text Embedding Computation**: Utilizes pre-trained Llama3 and other LLMs via llama_cpp to generate embeddings for any provided text.
+2. **Embedding Caching**: Efficiently stores and retrieves computed embeddings in SQLite, minimizing redundant computations.
 3. **Advanced Similarity Measurements and Retrieval**: Utilizes the author's own `fast_vector_similarity` library written in Rust to offer highly optimized advanced similarity measures such as `spearman_rho`, `kendall_tau`, `approximate_distance_correlation`, `jensen_shannon_similarity`, and `hoeffding_d`. Semantic search across cached embeddings is also supported using FAISS vector searching.
 4. **Two-Step Advanced Semantic Search**: The API first leverages FAISS and cosine similarity for rapid filtering, and then applies additional similarity measures like `spearman_rho`, `kendall_tau`, `approximate_distance_correlation`, `jensen_shannon_similarity`, and `hoeffding_d` for a more nuanced comparison (see the sketch just after this list).
 5. 
**File Processing for Documents**: The library now accepts a broader range of file types including plaintext, PDFs, MS Word documents, and images. It can also handle OCR automatically. Returned embeddings for each sentence are organized in various formats like records, table, etc., using Pandas to_json() function. 6. **Advanced Text Preprocessing**: The library now employs a more advanced sentence splitter to segment text into meaningful sentences. It handles cases where periods are used in abbreviations, domain names, or numbers and also ensures complete sentences even when quotes are used. It also takes care of pagination issues commonly found in scanned documents, such as awkward newlines and hyphenated line breaks. 7. **Audio Transcription and Embedding**: Upload an audio file in MP3 or WAV format. The library uses OpenAI's Whisper model for transcription. Optionally, sentence embeddings can be computed for the transcript. -8. **Token-Level Embeddings and Combined Feature Vectors**: Provides token-level embeddings to capture the context of each token in the input string. Introduces combined feature vectors by computing the column-wise mean, min, max, and std. deviation of the token-level embedding matrix, allowing comparison of unequal length strings. -9. **RAM Disk Usage**: Optionally uses RAM Disk to store models for faster access and execution. Automatically handles the creation and management of RAM Disks. -10. **Robust Exception Handling**: Features comprehensive exception management to ensure system resilience. -11. **Interactive API Documentation**: Integrates with Swagger UI for an interactive and user-friendly experience, accommodating large result sets without crashing. -12. **Scalability and Concurrency**: Built on the FastAPI framework, handles concurrent requests and supports parallel inference with configurable concurrency levels. -13. **Flexible Configurations**: Offers configurable settings through environment variables and input parameters, including response formats like JSON or ZIP files. -14. **Comprehensive Logging**: Captures essential information with detailed logs, without overwhelming storage or readability. -15. **Support for Multiple Models and Measures**: Accommodates multiple embedding models and similarity measures, allowing flexibility and customization based on user needs. -16. **Ability to Generate Multiple Completions using Specified Grammar**: Get back structured LLM completions for a specified input prompt. -17. **Real-Time Log File Viewer in Browser**: Lets anyone with access to the API server conveniently watch the application logs to gain insight into the execution of their requests. -18. **Uses Redis for Request Locking**: Uses Redis to allow for multiple Uvicorn workers to run in parallel without conflicting with each other. +8. **RAM Disk Usage**: Optionally uses RAM Disk to store models for faster access and execution. Automatically handles the creation and management of RAM Disks. +9. **Robust Exception Handling**: Features comprehensive exception management to ensure system resilience. +10. **Interactive API Documentation**: Integrates with Swagger UI for an interactive and user-friendly experience, accommodating large result sets without crashing. +11. **Scalability and Concurrency**: Built on the FastAPI framework, handles concurrent requests and supports parallel inference with configurable concurrency levels. +12. 
**Flexible Configurations**: Offers configurable settings through environment variables and input parameters, including response formats like JSON or ZIP files.
+13. **Comprehensive Logging**: Captures essential information with detailed logs, without overwhelming storage or readability.
+14. **Support for Multiple Models and Measures**: Accommodates multiple embedding models and similarity measures, allowing flexibility and customization based on user needs.
+15. **Ability to Generate Multiple Completions using Specified Grammar**: Get back structured LLM completions for a specified input prompt.
+16. **Real-Time Log File Viewer in Browser**: Lets anyone with access to the API server conveniently watch the application logs to gain insight into the execution of their requests.
+17. **Uses Redis for Request Locking**: Uses Redis to allow for multiple Uvicorn workers to run in parallel without conflicting with each other.
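+
+As a rough illustration of the two-step advanced semantic search described in item 4 above (a sketch only: the service itself uses the `fast_vector_similarity` library for the second pass, for which `scipy.stats.spearmanr` stands in here):
+
+```python
+import numpy as np
+import faiss
+from scipy.stats import spearmanr  # stand-in for fast_vector_similarity's measures
+
+def two_step_search(query_vector, stored_vectors, stored_texts, similarity_filter_percentage=0.98, top_n=5):
+    # Step 1: fast cosine-similarity filtering with FAISS (inner product on normalized vectors).
+    vectors = np.array(stored_vectors, dtype='float32')
+    faiss.normalize_L2(vectors)
+    index = faiss.IndexFlatIP(vectors.shape[1])
+    index.add(vectors)
+    query = np.array([query_vector], dtype='float32')
+    faiss.normalize_L2(query)
+    num_to_keep = max(1, int(len(stored_texts) * (1.0 - similarity_filter_percentage)))
+    _, row_indices = index.search(query, num_to_keep)
+    # Step 2: re-rank the surviving candidates with a more nuanced similarity measure.
+    reranked = sorted(
+        ((stored_texts[i], spearmanr(query_vector, stored_vectors[i]).correlation) for i in row_indices[0] if i != -1),
+        key=lambda pair: pair[1], reverse=True)
+    return reranked[:top_n]
+```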
 
 ## Demo Screen Recording in Action
 [Here](https://asciinema.org/a/39dZ8vv9nkcNygasUl35wnBPq) is the live console output while I interact with it from the Swagger page to make requests.
@@ -123,11 +126,13 @@ python-multipart
 pytz
 redis
 ruff
+scikit-learn
 scipy
 sqlalchemy
 textract-py3
 uvicorn
 uvloop
+zstandard
 ```
 
 ## Running the Application
@@ -205,7 +210,6 @@ The following endpoints are available:
 - **GET `/show_logs/`**: Shows logs for the last 5 minutes by default. Can also provide a parameter like this: `/show_logs/{minutes}` to get the last N minutes of log data.
 - **POST `/add_new_model/`**: Add New Model by URL. Submit a new model URL for download and use. The model must be in `.gguf` format and larger than 100 MB to ensure it's a valid model file (you can directly paste in the Huggingface URL)
 - **POST `/get_embedding_vector_for_string/`**: Retrieve Embedding Vector for a Given Text String. Retrieves the embedding vector for a given input text string using the specified model.
-- **POST `/get_token_level_embeddings_matrix_and_combined_feature_vector_for_string/`**: Retrieve Token-Level Embeddings and Combined Feature Vector for a Given Input String. Retrieve the token-level embeddings and combined feature vector for a given input text using the specified model.
 - **POST `/compute_similarity_between_strings/`**: Compute Similarity Between Two Strings. Leverages the `fast_vector_similarity` library to compute the similarity between two given input strings using specified model embeddings and a selected similarity measure.
 - **POST `/search_stored_embeddings_with_query_string_for_semantic_similarity/`**: Get Most Similar Strings from Stored Embeddings in Database. Find the most similar strings in the database to the given input "query" text.
 - **POST `/advanced_search_stored_embeddings_with_query_string_for_semantic_similarity/`**: Perform a two-step advanced semantic search. First uses FAISS and cosine similarity to narrow down the most similar strings, then applies additional similarity measures for refined comparison.
@@ -264,7 +268,7 @@ The application uses a SQLite database via SQLAlchemy ORM. Here are the data mod
 - `file_hash`: Hash of the file
 - `llm_model_name`: Model used to compute the embedding
 - `file_data`: Binary data of the original file
-- `document_embedding_results_json`: The computed embedding results in JSON format
+- `document_embedding_results_json_compressed_binary`: The computed embedding results in JSON format compressed with Zstandard compression
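+
+As a rough sketch (an illustration, not the service's exact code) of what this column holds, the embedding results round-trip through Zstandard like so:
+
+```python
+import json
+import zstandard as zstd
+
+document_embedding_results = {"sentence": "An example sentence.", "embedding": [0.12, -0.34]}
+json_bytes = json.dumps(document_embedding_results).encode("utf-8")
+compressed_binary = zstd.ZstdCompressor(level=15).compress(json_bytes)  # value stored in the column
+restored = json.loads(zstd.ZstdDecompressor().decompress(compressed_binary))
+assert restored == document_embedding_results
+```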
 ### Document Table
@@ -272,30 +276,6 @@ The application uses a SQLite database via SQLAlchemy ORM. Here are the data mod
 - `llm_model_name`: Model name associated with the document
 - `document_hash`: Computed Hash of the document
 
-### TokenLevelEmbedding Table
-
-- `id`: Primary Key
-- `word`: Word for which the embedding was computed
-- `word_hash`: Hash of the token, computed using SHA3-256
-- `llm_model_name`: Model used to compute the embedding
-- `token_level_embedding_json`: The computed token-level embedding in JSON format
-
-### TokenLevelEmbeddingBundle Table
-
-- `id`: Primary Key
-- `input_text`: Input text associated with the token-level embeddings
-- `input_text_hash`: Hash of the input text
-- `llm_model_name`: Model used to compute the embeddings
-- `token_level_embeddings_bundle_json`: JSON containing the token-level embeddings
-
-### TokenLevelEmbeddingBundleCombinedFeatureVector Table
-
-- `id`: Primary Key
-- `token_level_embedding_bundle_id`: Foreign Key referencing the TokenLevelEmbeddingBundle table
-- `llm_model_name`: Model name associated with the combined feature vector
-- `combined_feature_vector_json`: JSON containing the combined feature vector
-- `combined_feature_vector_hash`: Hash of the combined feature vector
-
 ### AudioTranscript Table
 
 - `audio_file_hash`: Primary Key
@@ -315,18 +295,10 @@ The application uses a SQLite database via SQLAlchemy ORM. Here are the data mod
 - `DocumentEmbedding` has a Foreign Key `document_hash` that references `Document`'s `document_hash`.
 - This establishes a one-to-many relationship between `Document` and `DocumentEmbedding`.
 
-3. **TokenLevelEmbedding - TokenLevelEmbeddingBundle**:
-   - `TokenLevelEmbedding` has a Foreign Key `token_level_embedding_bundle_id` that references `TokenLevelEmbeddingBundle`'s `id`.
-   - This is a one-to-many relationship, meaning multiple token-level embeddings can belong to a single token-level embedding bundle.
-
-4. **TokenLevelEmbeddingBundle - TokenLevelEmbeddingBundleCombinedFeatureVector**:
-   - `TokenLevelEmbeddingBundle` has a one-to-one relationship with `TokenLevelEmbeddingBundleCombinedFeatureVector` via `token_level_embedding_bundle_id`.
-   - This means each token-level embedding bundle can have exactly one combined feature vector.
-
-5. **AudioTranscript**:
+3. **AudioTranscript**:
    - This table doesn't seem to have a direct relationship with other tables based on the given code.
 
-6. **Request/Response Models**:
+4. **Request/Response Models**:
    - These are not directly related to the database tables but are used for handling API requests and responses.
 
@@ -368,7 +340,7 @@ This section highlights the major performance enhancements integrated into the p
 ---
 
-### Dockerized Llama2 Embeddings API Service App
+### Dockerized Version
 
 A bash script is included in this repo, `setup_dockerized_app_on_fresh_machine.sh`, that will automatically do everything for you, including installing docker with apt install.
@@ -459,7 +431,6 @@ During startup, the application performs the following tasks:
 - Each downloaded model is loaded into memory. If any model file is not found, an error log is recorded.
 6. **Build FAISS Indexes**:
    - The application creates FAISS indexes for efficient similarity search using the embeddings from the database.
-   - Separate FAISS indexes are built for token-level embeddings.
    - Associated texts are stored by model name for further use.
 
 Note:
@@ -582,38 +553,7 @@ curl -X 'POST' \
   -F 'llm_model_name=custom-llm-model'
 ```
 
-### 7. `/get_token_level_embeddings_matrix_and_combined_feature_vector_for_string/` (POST)
-
-#### Purpose
-Retrieve the token-level embeddings and combined feature vector for a given input text using the specified model.
-
-#### Parameters
-- `text`: The input text for which the embeddings are to be retrieved.
-- `model_name`: The model used to calculate the embeddings (optional).
-- `db_writer`: Database writer instance for managing write operations (internal use).
-- `req`: HTTP request object (optional).
-- `token`: Security token (optional).
-- `client_ip`: Client IP address (optional).
-- `json_format`: Format for JSON response of token-level embeddings (optional).
-- `send_back_json_or_zip_file`: Whether to return a JSON response or a ZIP file containing the JSON file (optional, defaults to `zip`).
-
-#### Response
-The response will be a JSON object containing complete transcription details, computational times, and an optional URL for downloading a ZIP file containing the document embeddings.
-
-#### Example Response
-```json
-{
-  "transcript": "This is the transcribed text...",
-  "time_taken_for_transcription_in_seconds": 12.345,
-  "time_taken_for_embedding_computation_in_seconds": 3.456,
-  "embedding_download_url": "http://localhost:8000/download/your_embedding.zip",
-  "llm_model_name": "custom-llm-model"
-}
-```
-
-### 8. `/get_text_completions_from_input_prompt/` (POST)
+### 7. `/get_text_completions_from_input_prompt/` (POST)
 
 #### Purpose
 Generate text completions for a given input prompt using the specified model.
@@ -645,7 +585,7 @@ The JSON object should have the following keys:
 }
 ```
 
-### 9. `/get_list_of_available_model_names/` (GET)
+### 8. `/get_list_of_available_model_names/` (GET)
 
 #### Purpose
 Retrieve the list of available model names for generating embeddings.
@@ -653,7 +593,7 @@ Retrieve the list of available model names for generating embeddings.
 #### Parameters
 - `token`: Security token (optional).
 
-### 10. `/get_all_stored_strings/` (GET)
+### 9. `/get_all_stored_strings/` (GET)
 
 #### Purpose
 Retrieve a list of all stored strings from the database for which embeddings have been computed.
@@ -661,7 +601,7 @@ Retrieve a list of all stored strings from the database for which embeddings hav
 #### Parameters
 - `token`: Security token (optional).
 
-### 11. `/get_all_stored_documents/` (GET)
+### 10. `/get_all_stored_documents/` (GET)
 
 #### Purpose
 Retrieve a list of all stored documents from the database for which embeddings have been computed.
@@ -669,7 +609,7 @@ Retrieve a list of all stored documents from the database for which embeddings h
 #### Parameters
 - `token`: Security token (optional).
 
-### 12. `/clear_ramdisk/` (POST)
+### 11. `/clear_ramdisk/` (POST)
 
 #### Purpose
 Clear the RAM Disk to free up memory.
@@ -678,7 +618,7 @@ Clear the RAM Disk to free up memory.
 #### Parameters
 - `token`: Security token (optional).
 
-### 13. `/download/{file_name}` (GET)
+### 12. `/download/{file_name}` (GET)
 
 #### Purpose
 Download a ZIP file containing document embeddings that were generated through the `/compute_transcript_with_whisper_from_audio/` endpoint. The URL for this download will be supplied in the JSON response of the audio file transcription endpoint.
@@ -686,7 +626,7 @@ Download a ZIP file containing document embeddings that were generated through t
 #### Parameters
 - `file_name`: The name of the ZIP file that you want to download.
 
-### 14. `/add_new_model/` (POST)
+### 13. `/add_new_model/` (POST)
 
 #### Purpose
Submit a new model URL for download and use. 
The model must be in `.gguf` format and larger than 100 MB to ensure it's a valid model file. diff --git a/database_functions.py b/database_functions.py index a03f686..d937c54 100644 --- a/database_functions.py +++ b/database_functions.py @@ -1,18 +1,17 @@ -from embeddings_data_models import Base, TextEmbedding, DocumentEmbedding, DocumentTokenLevelEmbedding, Document, TokenLevelEmbedding, TokenLevelEmbeddingBundle, TokenLevelEmbeddingBundleCombinedFeatureVector, AudioTranscript +from embeddings_data_models import Base, TextEmbedding, DocumentEmbedding, Document, AudioTranscript from logger_config import setup_logger import traceback import asyncio import random -from sqlalchemy import select +from sqlalchemy import select, update, UniqueConstraint, exists from sqlalchemy import text as sql_text from sqlalchemy.exc import SQLAlchemyError, OperationalError, IntegrityError from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from decouple import config -from datetime import datetime, timedelta +from datetime import datetime logger = setup_logger() - db_writer = None DATABASE_URL = "sqlite+aiosqlite:///swiss_army_llama.sqlite" MAX_RETRIES = config("MAX_RETRIES", default=3, cast=int) @@ -21,45 +20,45 @@ JITTER_FACTOR = config("JITTER_FACTOR", default=0.1, cast=float) engine = create_async_engine(DATABASE_URL, echo=False, connect_args={"check_same_thread": False}) -AsyncSessionLocal = sessionmaker( - bind=engine, - class_=AsyncSession, - expire_on_commit=False, - autoflush=False -) +AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False, autoflush=False) + +async def consolidate_wal_data(): + consolidate_command = "PRAGMA wal_checkpoint(FULL);" + try: + async with engine.begin() as conn: + result = await conn.execute(sql_text(consolidate_command)) + result_fetch = result.fetchone() + return result_fetch + except Exception as e: + logger.error(f"Error during WAL consolidation: {e}") + return None + class DatabaseWriter: def __init__(self, queue): self.queue = queue - self.processing_hashes = set() # Set to store the hashes if everything that is currently being processed in the queue (to avoid duplicates of the same task being added to the queue) + self.processing_hashes = set() def _get_hash_from_operation(self, operation): - attr_name = { - TextEmbedding: 'text_hash', - DocumentEmbedding: 'file_hash', - DocumentTokenLevelEmbedding: 'file_hash', - Document: 'document_hash', - TokenLevelEmbedding: 'word_hash', - TokenLevelEmbeddingBundle: 'input_text_hash', - TokenLevelEmbeddingBundleCombinedFeatureVector: 'combined_feature_vector_hash', - AudioTranscript: 'audio_file_hash' - }.get(type(operation)) - hash_value = getattr(operation, attr_name, None) - llm_model_name = getattr(operation, 'llm_model_name', None) - return f"{hash_value}_{llm_model_name}" if hash_value and llm_model_name else None + if isinstance(operation, TextEmbedding): + return f"{operation.text_hash}_{operation.llm_model_name}" + elif isinstance(operation, DocumentEmbedding): + return f"{operation.file_hash}_{operation.llm_model_name}_{operation.corpus_identifier_string}" + elif isinstance(operation, Document): + return operation.document_hash + elif isinstance(operation, AudioTranscript): + return operation.audio_file_hash + return None async def initialize_processing_hashes(self, chunk_size=1000): start_time = datetime.utcnow() async with AsyncSessionLocal() as session: queries = [ - (select(TextEmbedding.text_hash, 
TextEmbedding.llm_model_name), True), - (select(DocumentEmbedding.file_hash, DocumentEmbedding.llm_model_name), True), - (select(Document.document_hash, Document.llm_model_name), True), - (select(TokenLevelEmbedding.word_hash, TokenLevelEmbedding.llm_model_name), True), - (select(TokenLevelEmbeddingBundle.input_text_hash, TokenLevelEmbeddingBundle.llm_model_name), True), - (select(TokenLevelEmbeddingBundleCombinedFeatureVector.combined_feature_vector_hash, TokenLevelEmbeddingBundleCombinedFeatureVector.llm_model_name), True), - (select(AudioTranscript.audio_file_hash), False) + (select(TextEmbedding.text_hash, TextEmbedding.llm_model_name), TextEmbedding), + (select(DocumentEmbedding.file_hash, DocumentEmbedding.llm_model_name, DocumentEmbedding.corpus_identifier_string), DocumentEmbedding), + (select(Document.document_hash), Document), + (select(AudioTranscript.audio_file_hash), AudioTranscript) ] - for query, has_llm in queries: + for query, model_class in queries: offset = 0 while True: result = await session.execute(query.limit(chunk_size).offset(offset)) @@ -67,9 +66,13 @@ async def initialize_processing_hashes(self, chunk_size=1000): if not rows: break for row in rows: - if has_llm: + if model_class in [TextEmbedding]: hash_with_model = f"{row[0]}_{row[1]}" - else: + elif model_class in [DocumentEmbedding]: + hash_with_model = f"{row[0]}_{row[1]}_{row[2]}" + elif model_class == Document: + hash_with_model = row[0] + elif model_class == AudioTranscript: hash_with_model = row[0] self.processing_hashes.add(hash_with_model) offset += chunk_size @@ -78,35 +81,43 @@ async def initialize_processing_hashes(self, chunk_size=1000): if len(self.processing_hashes) > 0: logger.info(f"Finished initializing set of input hash/llm_model_name combinations that are either currently being processed or have already been processed. 
Set size: {len(self.processing_hashes)}; Took {total_time} seconds, for an average of {total_time / len(self.processing_hashes)} seconds per hash.") - async def _handle_integrity_error(self, e, write_operation, session): - unique_constraint_msg = { - TextEmbedding: "token_embeddings.text_hash, token_embeddings.llm_model_name", - DocumentEmbedding: "document_embeddings.file_hash, document_embeddings.llm_model_name", - DocumentTokenLevelEmbedding: "document_token_level_embeddings.file_hash, document_token_level_embeddings.llm_model_name", - Document: "documents.document_hash, documents.llm_model_name", - TokenLevelEmbedding: "token_level_embeddings.word_hash, token_level_embeddings.llm_model_name", - TokenLevelEmbeddingBundle: "token_level_embedding_bundles.input_text_hash, token_level_embedding_bundles.llm_model_name", - AudioTranscript: "audio_transcripts.audio_file_hash" - }.get(type(write_operation)) - if unique_constraint_msg and unique_constraint_msg in str(e): - logger.warning(f"Embedding already exists in the database for given input and llm_model_name: {e}") - await session.rollback() - else: - raise - + async def _record_exists(self, session, operation): + model_class = type(operation) + if model_class == TextEmbedding: + return await session.execute(select(exists().where(TextEmbedding.text_hash == operation.text_hash).where(TextEmbedding.llm_model_name == operation.llm_model_name))) + elif model_class == DocumentEmbedding: + return await session.execute(select(exists().where(DocumentEmbedding.file_hash == operation.file_hash).where(DocumentEmbedding.llm_model_name == operation.llm_model_name).where(DocumentEmbedding.corpus_identifier_string == operation.corpus_identifier_string))) + elif model_class == Document: + return await session.execute(select(exists().where(Document.document_hash == operation.document_hash))) + elif model_class == AudioTranscript: + return await session.execute(select(exists().where(AudioTranscript.audio_file_hash == operation.audio_file_hash))) + return None + async def dedicated_db_writer(self): while True: write_operations_batch = await self.queue.get() async with AsyncSessionLocal() as session: + filtered_operations = [] try: - for write_operation in write_operations_batch: - session.add(write_operation) - await session.flush() # Flush to get the IDs - await session.commit() - for write_operation in write_operations_batch: - hash_to_remove = self._get_hash_from_operation(write_operation) - if hash_to_remove is not None and hash_to_remove in self.processing_hashes: - self.processing_hashes.remove(hash_to_remove) + if write_operations_batch: + for write_operation in write_operations_batch: + existing_record = await self._record_exists(session, write_operation) + if not existing_record.scalar(): + filtered_operations.append(write_operation) + hash_value = self._get_hash_from_operation(write_operation) + if hash_value: + self.processing_hashes.add(hash_value) + else: + await self._update_existing_record(session, write_operation) + if filtered_operations: + await consolidate_wal_data() # Consolidate WAL before performing writes + session.add_all(filtered_operations) + await session.flush() # Flush to get the IDs + await session.commit() + for write_operation in filtered_operations: + hash_to_remove = self._get_hash_from_operation(write_operation) + if hash_to_remove is not None and hash_to_remove in self.processing_hashes: + self.processing_hashes.remove(hash_to_remove) except IntegrityError as e: await self._handle_integrity_error(e, write_operation, 
session) except SQLAlchemyError as e: @@ -117,12 +128,43 @@ async def dedicated_db_writer(self): logger.error(f"Unexpected error: {e}\n{tb}") await session.rollback() self.queue.task_done() - + + async def _update_existing_record(self, session, operation): + model_class = type(operation) + primary_keys = [key.name for key in model_class.__table__.primary_key] + unique_constraints = [c for c in model_class.__table__.constraints if isinstance(c, UniqueConstraint)] + conditions = [] + for constraint in unique_constraints: + if set(constraint.columns.keys()).issubset(set(operation.__dict__.keys())): + for col in constraint.columns.keys(): + conditions.append(getattr(model_class, col) == getattr(operation, col)) + break + if not conditions: + for pk in primary_keys: + conditions.append(getattr(model_class, pk) == getattr(operation, pk)) + values = {col: getattr(operation, col) for col in operation.__dict__.keys() if col in model_class.__table__.columns.keys()} + stmt = update(model_class).where(*conditions).values(**values) + await session.execute(stmt) + await session.commit() + + async def _handle_integrity_error(self, e, write_operation, session): + unique_constraint_msg = { + TextEmbedding: "embeddings.text_hash, embeddings.llm_model_name", + DocumentEmbedding: "document_embeddings.file_hash, document_embeddings.llm_model_name", + Document: "documents.document_hash, documents.llm_model_name", + AudioTranscript: "audio_transcripts.audio_file_hash" + }.get(type(write_operation)) + if unique_constraint_msg and unique_constraint_msg in str(e): + logger.warning(f"Embedding already exists in the database for given input and llm_model_name: {e}") + await self._update_existing_record(session, write_operation) + else: + raise + async def enqueue_write(self, write_operations): - write_operations = [op for op in write_operations if self._get_hash_from_operation(op) not in self.processing_hashes] # Filter out write operations for hashes that are already being processed - if not write_operations: # If there are no write operations left after filtering, return early + write_operations = [op for op in write_operations if self._get_hash_from_operation(op) not in self.processing_hashes] + if not write_operations: return - for op in write_operations: # Add the hashes of the write operations to the set + for op in write_operations: hash_value = self._get_hash_from_operation(op) if hash_value: self.processing_hashes.add(hash_value) @@ -146,13 +188,21 @@ async def execute_with_retry(func, *args, **kwargs): async def initialize_db(use_verbose = 0): logger.info("Initializing database, creating tables, and setting SQLite PRAGMAs...") - list_of_sqlite_pragma_strings = ["PRAGMA journal_mode=WAL;", "PRAGMA synchronous = NORMAL;", "PRAGMA cache_size = -1048576;", "PRAGMA busy_timeout = 2000;", "PRAGMA wal_autocheckpoint = 100;"] - list_of_sqlite_pragma_justification_strings = ["Set SQLite to use Write-Ahead Logging (WAL) mode (from default DELETE mode) so that reads and writes can occur simultaneously", - "Set synchronous mode to NORMAL (from FULL) so that writes are not blocked by reads", - "Set cache size to 1GB (from default 2MB) so that more data can be cached in memory and not read from disk; to make this 256MB, set it to -262144 instead", - "Increase the busy timeout to 2 seconds so that the database waits", - "Set the WAL autocheckpoint to 100 (from default 1000) so that the WAL file is checkpointed more frequently"] - assert(len(list_of_sqlite_pragma_strings) == 
len(list_of_sqlite_pragma_justification_strings)) + list_of_sqlite_pragma_strings = [ + "PRAGMA journal_mode=WAL;", + "PRAGMA synchronous = NORMAL;", + "PRAGMA cache_size = -1048576;", + "PRAGMA busy_timeout = 2000;", + "PRAGMA wal_autocheckpoint = 100;" + ] + list_of_sqlite_pragma_justification_strings = [ + "Set SQLite to use Write-Ahead Logging (WAL) mode (from default DELETE mode) so that reads and writes can occur simultaneously", + "Set synchronous mode to NORMAL (from FULL) so that writes are not blocked by reads", + "Set cache size to 1GB (from default 2MB) so that more data can be cached in memory and not read from disk; to make this 256MB, set it to -262144 instead", + "Increase the busy timeout to 2 seconds so that the database waits", + "Set the WAL autocheckpoint to 100 (from default 1000) so that the WAL file is checkpointed more frequently" + ] + assert len(list_of_sqlite_pragma_strings) == len(list_of_sqlite_pragma_justification_strings) async with engine.begin() as conn: for pragma_string in list_of_sqlite_pragma_strings: await conn.execute(sql_text(pragma_string)) @@ -160,7 +210,7 @@ async def initialize_db(use_verbose = 0): logger.info(f"Executed SQLite PRAGMA: {pragma_string}") logger.info(f"Justification: {list_of_sqlite_pragma_justification_strings[list_of_sqlite_pragma_strings.index(pragma_string)]}") try: - await conn.run_sync(Base.metadata.create_all) # Create tables if they don't exist + await conn.run_sync(Base.metadata.create_all) # Create tables if they don't exist except Exception as e: # noqa: F841 pass logger.info("Database initialization completed.") @@ -168,15 +218,15 @@ async def initialize_db(use_verbose = 0): def get_db_writer() -> DatabaseWriter: return db_writer # Return the existing DatabaseWriter instance -def delete_expired_rows(session_factory): - async def async_delete_expired_rows(): - async with session_factory() as session: - expiration_time = datetime.utcnow() - timedelta(hours=48) - expired_rows = await session.execute( - select(TokenLevelEmbeddingBundle).where(TokenLevelEmbeddingBundle.created_at < expiration_time) - ) - expired_rows = expired_rows.scalars().all() - for row in expired_rows: - await session.delete(row) - await session.commit() - return async_delete_expired_rows +# def delete_expired_rows(session_factory): +# async def async_delete_expired_rows(): +# async with session_factory() as session: +# expiration_time = datetime.utcnow() - timedelta(hours=48) +# expired_rows = await session.execute( +# select(TokenLevelEmbeddingBundle).where(TokenLevelEmbeddingBundle.created_at < expiration_time) +# ) +# expired_rows = expired_rows.scalars().all() +# for row in expired_rows: +# await session.delete(row) +# await session.commit() +# return async_delete_expired_rows \ No newline at end of file diff --git a/embeddings_data_models.py b/embeddings_data_models.py index 9986a4c..fef5a75 100644 --- a/embeddings_data_models.py +++ b/embeddings_data_models.py @@ -1,13 +1,12 @@ from sqlalchemy import Column, String, Float, DateTime, Integer, UniqueConstraint, ForeignKey, LargeBinary from sqlalchemy.dialects.sqlite import JSON -from sqlalchemy.orm import declarative_base, relationship, validates +from sqlalchemy.orm import declarative_base, relationship +from sqlalchemy.ext.declarative import declared_attr from hashlib import sha3_256 -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, field_validator from typing import List, Optional, Union, Dict -from typing_extensions import Annotated from decouple import 
config from sqlalchemy import event -from sqlalchemy.ext.hybrid import hybrid_property from datetime import datetime Base = declarative_base() @@ -16,11 +15,21 @@ DEFAULT_NUMBER_OF_COMPLETIONS_TO_GENERATE = config("DEFAULT_NUMBER_OF_COMPLETIONS_TO_GENERATE", default=4, cast=int) DEFAULT_COMPLETION_TEMPERATURE = config("DEFAULT_COMPLETION_TEMPERATURE", default=0.7, cast=float) -class TextEmbedding(Base): +class SerializerMixin: + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + + def as_dict(self): + return {c.key: getattr(self, c.key) for c in self.__table__.columns} + +class TextEmbedding(Base, SerializerMixin): __tablename__ = "embeddings" id = Column(Integer, primary_key=True, index=True) text = Column(String, index=True) text_hash = Column(String, index=True) + embedding_pooling_method = Column(String, index=True) + embedding_hash = Column(String, index=True) llm_model_name = Column(String, index=True) corpus_identifier_string = Column(String, index=True) embedding_json = Column(String) @@ -29,36 +38,8 @@ class TextEmbedding(Base): response_time = Column(DateTime) total_time = Column(Float) document_file_hash = Column(String, ForeignKey('document_embeddings.file_hash')) - corpus_identifier_string = Column(String, index=True) document = relationship("DocumentEmbedding", back_populates="embeddings", foreign_keys=[document_file_hash, corpus_identifier_string]) __table_args__ = (UniqueConstraint('text_hash', 'llm_model_name', name='_text_hash_model_uc'),) - @validates('text') - def update_text_hash(self, key, text): - self.text_hash = sha3_256(text.encode('utf-8')).hexdigest() - return text - - -class TokenLevelEmbedding(Base): - __tablename__ = "token_level_embeddings" - id = Column(Integer, primary_key=True, index=True) - word = Column(String, index=True) - word_hash = Column(String, index=True) - llm_model_name = Column(String, index=True) - token_level_embedding_json = Column(String) - ip_address = Column(String) - request_time = Column(DateTime) - response_time = Column(DateTime) - total_time = Column(Float) - document_file_hash = Column(String, ForeignKey('document_token_level_embeddings.file_hash')) - corpus_identifier_string = Column(String, index=True) - document = relationship("DocumentTokenLevelEmbedding", back_populates="token_level_embeddings", foreign_keys=[document_file_hash, corpus_identifier_string]) - token_level_embedding_bundle_id = Column(Integer, ForeignKey('token_level_embedding_bundles.id')) - token_level_embedding_bundle = relationship("TokenLevelEmbeddingBundle", back_populates="token_level_embeddings") - __table_args__ = (UniqueConstraint('word_hash', 'llm_model_name', name='_word_hash_model_uc'),) - @validates('word') - def update_word_hash(self, key, word): - self.word_hash = sha3_256(word.encode('utf-8')).hexdigest() - return word class DocumentEmbedding(Base): __tablename__ = "document_embeddings" @@ -67,38 +48,19 @@ class DocumentEmbedding(Base): filename = Column(String) mimetype = Column(String) file_hash = Column(String, index=True) + embedding_pooling_method = Column(String, index=True) llm_model_name = Column(String, index=True) corpus_identifier_string = Column(String, index=True) file_data = Column(LargeBinary) # To store the original file sentences = Column(String) - document_embedding_results_json = Column(JSON) # To store the embedding results JSON + document_embedding_results_json_compressed_binary = Column(LargeBinary) # To store the embedding results JSON ip_address = Column(String) request_time = Column(DateTime) 
response_time = Column(DateTime) total_time = Column(Float) embeddings = relationship("TextEmbedding", back_populates="document", foreign_keys=[TextEmbedding.document_file_hash]) __table_args__ = (UniqueConstraint('file_hash', 'llm_model_name', 'corpus_identifier_string', name='_file_hash_model_corpus_uc'),) - document = relationship("Document", back_populates="document_embeddings", foreign_keys=[document_hash, corpus_identifier_string]) - -class DocumentTokenLevelEmbedding(Base): - __tablename__ = "document_token_level_embeddings" - id = Column(Integer, primary_key=True, index=True) - document_hash = Column(String, ForeignKey('documents.document_hash')) - filename = Column(String) - mimetype = Column(String) - file_hash = Column(String, index=True) - llm_model_name = Column(String, index=True) - corpus_identifier_string = Column(String, index=True) - file_data = Column(LargeBinary) # To store the original file - sentences = Column(String) - document_embedding_results_json = Column(JSON) # To store the embedding results JSON - ip_address = Column(String) - request_time = Column(DateTime) - response_time = Column(DateTime) - total_time = Column(Float) - token_level_embeddings = relationship("TokenLevelEmbedding", back_populates="document", foreign_keys=[TokenLevelEmbedding.document_file_hash]) - __table_args__ = (UniqueConstraint('file_hash', 'llm_model_name', 'corpus_identifier_string', name='_file_hash_model_corpus_uc'),) - document = relationship("Document", back_populates="document_token_level_embeddings", foreign_keys=[document_hash, corpus_identifier_string]) + document = relationship("Document", back_populates="document_embeddings", foreign_keys=[document_hash]) class Document(Base): __tablename__ = "documents" @@ -107,8 +69,6 @@ class Document(Base): corpus_identifier_string = Column(String, index=True) document_hash = Column(String, index=True) document_embeddings = relationship("DocumentEmbedding", back_populates="document", foreign_keys=[DocumentEmbedding.document_hash]) - document_token_level_embeddings = relationship("DocumentTokenLevelEmbedding", back_populates="document", foreign_keys=[DocumentTokenLevelEmbedding.document_hash]) - corpus_identifier_string = Column(String, index=True) def update_hash(self): # Concatenate specific attributes from the document_embeddings relationship hash_data = "".join([emb.filename + emb.mimetype for emb in self.document_embeddings]) self.document_hash = sha3_256(hash_data.encode('utf-8')).hexdigest() @@ -118,62 +78,20 @@ def update_document_hash_on_append(target, value, initiator): @event.listens_for(Document.document_embeddings, 'remove') def update_document_hash_on_remove(target, value, initiator): target.update_hash() -@event.listens_for(Document.document_token_level_embeddings, 'append') -def update_document_token_level_hash_on_append(target, value, initiator): - target.update_hash() -@event.listens_for(Document.document_token_level_embeddings, 'remove') -def update_document_token_level_hash_hash_on_remove(target, value, initiator): - target.update_hash() - -class TokenLevelEmbeddingBundle(Base): - __tablename__ = "token_level_embedding_bundles" - id = Column(Integer, primary_key=True, index=True) - input_text = Column(String, index=True) - input_text_hash = Column(String, index=True) # Hash of the input text - llm_model_name = Column(String, index=True) - corpus_identifier_string = Column(String, index=True) - token_level_embeddings_bundle_json = Column(JSON) # JSON containing the token-level embeddings - ip_address = Column(String) - 
request_time = Column(DateTime) - response_time = Column(DateTime) - total_time = Column(Float) - token_level_embeddings = relationship("TokenLevelEmbedding", back_populates="token_level_embedding_bundle") - combined_feature_vector = relationship("TokenLevelEmbeddingBundleCombinedFeatureVector", uselist=False, back_populates="token_level_embedding_bundle") - __table_args__ = (UniqueConstraint('input_text_hash', 'llm_model_name', name='_input_text_hash_model_uc'),) - @validates('input_text') - def update_input_text_hash(self, key, input_text): - self.input_text_hash = sha3_256(input_text.encode('utf-8')).hexdigest() - return input_text - -class TokenLevelEmbeddingBundleCombinedFeatureVector(Base): - __tablename__ = "token_level_embedding_bundle_combined_feature_vectors" - id = Column(Integer, primary_key=True, index=True) - token_level_embedding_bundle_id = Column(Integer, ForeignKey('token_level_embedding_bundles.id')) - llm_model_name = Column(String, index=True) - corpus_identifier_string = Column(String, index=True) - combined_feature_vector_json = Column(String) # Store as JSON string - combined_feature_vector_hash = Column(String, index=True) # Hash of the combined feature vector - token_level_embedding_bundle = relationship("TokenLevelEmbeddingBundle", back_populates="combined_feature_vector") - __table_args__ = (UniqueConstraint('combined_feature_vector_hash', 'llm_model_name', name='_combined_feature_vector_hash_model_uc'),) - @hybrid_property - def input_text(self): - return self.token_level_embedding_bundle.input_text - @validates('combined_feature_vector_json') - def update_text_hash(self, key, combined_feature_vector_json): - self.combined_feature_vector_hash = sha3_256(combined_feature_vector_json.encode('utf-8')).hexdigest() - return combined_feature_vector_json # Request/Response models start here: class EmbeddingRequest(BaseModel): text: str llm_model_name: Optional[str] = DEFAULT_MODEL_NAME + embedding_pooling_method: str = "means" corpus_identifier_string: Optional[str] = "" class SimilarityRequest(BaseModel): text1: str text2: str llm_model_name: Optional[str] = DEFAULT_MODEL_NAME + embedding_pooling_method: str = "means" similarity_measure: Optional[str] = "all" @field_validator('similarity_measure') def validate_similarity_measure(cls, value): @@ -186,17 +104,19 @@ class SemanticSearchRequest(BaseModel): query_text: str number_of_most_similar_strings_to_return: int = 10 llm_model_name: str = DEFAULT_MODEL_NAME + embedding_pooling_method: str = "means" corpus_identifier_string: str = "" - use_token_level_embeddings: Annotated[int, Field(ge=0, le=1)] = 0 class SemanticSearchResponse(BaseModel): query_text: str corpus_identifier_string: str + embedding_pooling_method: str = "means" results: List[dict] # List of similar strings and their similarity scores using cosine similarity with Faiss (in descending order) class AdvancedSemanticSearchRequest(BaseModel): query_text: str llm_model_name: str = DEFAULT_MODEL_NAME + embedding_pooling_method: str = "means" corpus_identifier_string: str similarity_filter_percentage: float = 0.98 number_of_most_similar_strings_to_return: Optional[int] = None @@ -204,15 +124,30 @@ class AdvancedSemanticSearchRequest(BaseModel): class AdvancedSemanticSearchResponse(BaseModel): query_text: str corpus_identifier_string: str + embedding_pooling_method: str = "means" results: List[Dict[str, Union[str, float, Dict[str, float]]]] class EmbeddingResponse(BaseModel): + id: int + text: str + text_hash: str + embedding_pooling_method: str + 
embedding_hash: str + llm_model_name: str + corpus_identifier_string: str + embedding_json: str + ip_address: Optional[str] + request_time: datetime + response_time: datetime + total_time: float + document_file_hash: Optional[str] embedding: List[float] class SimilarityResponse(BaseModel): text1: str text2: str similarity_measure: str + embedding_pooling_method: str = "means" similarity_score: Union[float, Dict[str, float]] # Now can be either a float or a dictionary embedding1: List[float] embedding2: List[float] diff --git a/environment.yml b/environment.yml index 1b88ed3..ffbc17d 100644 --- a/environment.yml +++ b/environment.yml @@ -29,7 +29,9 @@ dependencies: - redis - ruff - scipy + - scikit-learn - sqlalchemy - textract-py3 - uvicorn - uvloop + - zstandard \ No newline at end of file diff --git a/misc_utility_functions.py b/misc_utility_functions.py index 8353fb3..6908811 100644 --- a/misc_utility_functions.py +++ b/misc_utility_functions.py @@ -1,5 +1,5 @@ from logger_config import setup_logger -from embeddings_data_models import TextEmbedding, TokenLevelEmbeddingBundle, TokenLevelEmbeddingBundleCombinedFeatureVector +from embeddings_data_models import TextEmbedding import socket import os import re @@ -127,53 +127,32 @@ def configure_redis_in_background(): threading.Thread(target=configure_redis_optimally).start() async def build_faiss_indexes(force_rebuild=False): - global faiss_indexes, token_faiss_indexes, associated_texts_by_model + global faiss_indexes, associated_texts_by_model_and_pooling_method if os.environ.get("FAISS_SETUP_DONE") == "1" and not force_rebuild: - return faiss_indexes, token_faiss_indexes, associated_texts_by_model + return faiss_indexes, associated_texts_by_model_and_pooling_method faiss_indexes = {} - token_faiss_indexes = {} # Separate FAISS indexes for token-level embeddings - associated_texts_by_model = defaultdict(list) # Create a dictionary to store associated texts by model name - associated_token_level_embeddings_by_model = defaultdict(list) # Create a dictionary to store associated token-level embeddings by model name + associated_texts_by_model_and_pooling_method = defaultdict(lambda: defaultdict(list)) # Create a nested dictionary to store associated texts by model name and pooling method async with AsyncSessionLocal() as session: - result = await session.execute(select(TextEmbedding.llm_model_name, TextEmbedding.text, TextEmbedding.embedding_json)) - token_result = await session.execute( - select( - TokenLevelEmbeddingBundleCombinedFeatureVector.llm_model_name, - TokenLevelEmbeddingBundleCombinedFeatureVector.combined_feature_vector_json, - TokenLevelEmbeddingBundleCombinedFeatureVector.token_level_embedding_bundle, - ).join(TokenLevelEmbeddingBundle) - ) - embeddings_by_model = defaultdict(list) - token_embeddings_by_model = defaultdict(list) - for row in result.fetchall(): # Process regular embeddings + result = await session.execute(select(TextEmbedding.llm_model_name, TextEmbedding.text, TextEmbedding.embedding_json, TextEmbedding.embedding_pooling_method)) + embeddings_by_model_and_pooling = defaultdict(lambda: defaultdict(list)) + for row in result.fetchall(): # Process regular embeddings llm_model_name = row[0] - associated_texts_by_model[llm_model_name].append(row[1]) # Store the associated text by model name - embeddings_by_model[llm_model_name].append((row[1], json.loads(row[2]))) - for row in token_result.fetchall(): # Process token-level embeddings - llm_model_name = row[0] - 
associated_token_level_embeddings_by_model[llm_model_name].append(row[1]) # Store the associated token-level embeddings by model name
-            token_embeddings_by_model[llm_model_name].append(json.loads(row[2]))
-        for llm_model_name, embeddings in embeddings_by_model.items():
-            logger.info(f"Building Faiss index over embeddings for model {llm_model_name}...")
-            embeddings_array = np.array([e[1] for e in embeddings]).astype('float32')
-            if embeddings_array.size == 0:
-                logger.error(f"No embeddings were loaded from the database for model {llm_model_name}, so nothing to build the Faiss index with!")
-                continue
-            faiss.normalize_L2(embeddings_array) # Normalize the vectors for cosine similarity
-            faiss_index = faiss.IndexFlatIP(embeddings_array.shape[1]) # Use IndexFlatIP for cosine similarity
-            faiss_index.add(embeddings_array)
-            faiss_indexes[llm_model_name] = faiss_index # Store the index by model name
-        for llm_model_name, token_embeddings in token_embeddings_by_model.items():
-            token_embeddings_combined_feature_vector = np.array([e[1] for e in token_embeddings]).astype('float32')
-            if token_embeddings_combined_feature_vector.size == 0:
-                logger.error(f"No token-level embeddings were loaded from the database for model {llm_model_name}, so nothing to build the Faiss index with!")
-                continue
-            faiss.normalize_L2(token_embeddings_combined_feature_vector) # Normalize the vectors for cosine similarity
-            token_faiss_index = faiss.IndexFlatIP(token_embeddings_combined_feature_vector.shape[1]) # Use IndexFlatIP for cosine similarity
-            token_faiss_index.add(token_embeddings_combined_feature_vector)
-            token_faiss_indexes[llm_model_name] = token_faiss_index # Store the token-level index by model name
+            embedding_pooling_method = row[3]
+            associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method].append(row[1]) # Store the associated text by model name and pooling method
+            embeddings_by_model_and_pooling[llm_model_name][embedding_pooling_method].append((row[1], json.loads(row[2])))
+        for llm_model_name, embeddings_by_pooling in embeddings_by_model_and_pooling.items():
+            for embedding_pooling_method, embeddings in embeddings_by_pooling.items():
+                logger.info(f"Building Faiss index over embeddings for model {llm_model_name} with pooling method {embedding_pooling_method}...")
+                embeddings_array = np.array([e[1] for e in embeddings]).astype('float32')
+                if embeddings_array.size == 0:
+                    logger.error(f"No embeddings were loaded from the database for model {llm_model_name} with pooling method {embedding_pooling_method}, so nothing to build the Faiss index with!")
+                    continue
+                faiss.normalize_L2(embeddings_array) # Normalize the vectors for cosine similarity
+                faiss_index = faiss.IndexFlatIP(embeddings_array.shape[1]) # Use IndexFlatIP for cosine similarity
+                faiss_index.add(embeddings_array)
+                faiss_indexes[(llm_model_name, embedding_pooling_method)] = faiss_index # Store the index by model name and pooling method
     os.environ["FAISS_SETUP_DONE"] = "1"
-    return faiss_indexes, token_faiss_indexes, associated_texts_by_model, associated_token_level_embeddings_by_model
+    return faiss_indexes, associated_texts_by_model_and_pooling_method
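+
+# A minimal usage sketch (an illustrative addition, not part of the original module): querying
+# one of the (llm_model_name, embedding_pooling_method) indexes built above for the top-k most
+# similar stored texts. Assumes build_faiss_indexes() has already populated the globals.
+def example_query_faiss_index(query_embedding, llm_model_name, embedding_pooling_method, k=5):
+    query_array = np.array([query_embedding]).astype('float32')
+    faiss.normalize_L2(query_array)  # normalize so inner product equals cosine similarity
+    faiss_index = faiss_indexes[(llm_model_name, embedding_pooling_method)]
+    similarity_scores, row_indices = faiss_index.search(query_array, k)
+    associated_texts = associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]
+    return [(associated_texts[i], float(score)) for i, score in zip(row_indices[0], similarity_scores[0]) if i != -1]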
 
 def normalize_logprobs(avg_logprob, min_logprob, max_logprob):
     range_logprob = max_logprob - min_logprob
@@ -182,22 +161,6 @@ def truncate_string(s: str, max_length: int = 100) -> str:
     return s[:max_length]
-def analyze_token_embeddings(token_embeddings):
-    lengths = [len(lst) for lst in token_embeddings]
-    max_length = max(lengths)
-    min_length = min(lengths)
-    return lengths, max_length, min_length
-
-def filter_shortest_lists(token_embeddings):
-    lengths, max_length, min_length = analyze_token_embeddings(token_embeddings)
-    shortest_lists = [lst for lst in token_embeddings if len(lst) == min_length]
-    return shortest_lists
-
-def filter_longest_lists(token_embeddings):
-    lengths, max_length, min_length = analyze_token_embeddings(token_embeddings)
-    longest_lists = [lst for lst in token_embeddings if len(lst) == max_length]
-    return longest_lists
-
 def remove_pagination_breaks(text: str) -> str:
     text = re.sub(r'-(\n)(?=[a-z])', '', text) # Remove hyphens at the end of lines when the word continues on the next line
     text = re.sub(r'(?<=\w)(?<![.?!])\n(?=\w)', ' ', text) # Join lines that were broken mid-sentence without ending punctuation
     text = ' '.join(line for line in text.splitlines() if line.strip() != '')
     return text
+def compress_data(input_data):
+    if isinstance(input_data, str):
+        input_data = input_data.encode('utf-8')
+    zstd_compression_level = 15 # 22 is the highest compression level; 15 is a good balance between compression and speed
+    zstandard_compressor = zstd.ZstdCompressor(level=zstd_compression_level, write_content_size=True, write_checksum=True)
+    zstd_compressed_data = zstandard_compressor.compress(input_data)
+    return zstd_compressed_data
+
+def decompress_data(compressed_data):
+    return zstd.decompress(compressed_data)
+
 def extract_embeddings(input_data):
     embeddings = []
     for item in input_data['data']:
@@ -129,95 +141,48 @@ def add_model_url(new_url: str) -> str:
     logger.info("Model URL already exists.")
     return corrected_url
 
-async def get_embedding_from_db(text: str, llm_model_name: str):
-    text_hash = sha3_256(text.encode('utf-8')).hexdigest() # Compute the hash
-    return await execute_with_retry(_get_embedding_from_db, text_hash, llm_model_name)
+async def get_embedding_from_db(text: str, llm_model_name: str, embedding_pooling_method: str):
+    text_hash = sha3_256(text.encode('utf-8')).hexdigest()
+    return await execute_with_retry(_get_embedding_from_db, text_hash, llm_model_name, embedding_pooling_method)
 
-async def _get_embedding_from_db(text_hash: str, llm_model_name: str) -> Optional[dict]:
+async def _get_embedding_from_db(text_hash: str, llm_model_name: str, embedding_pooling_method: str) -> Optional[TextEmbedding]:
     async with AsyncSessionLocal() as session:
         result = await session.execute(
-            sql_text("SELECT embedding_json FROM embeddings WHERE text_hash=:text_hash AND llm_model_name=:llm_model_name"),
-            {"text_hash": text_hash, "llm_model_name": llm_model_name},
+            select(TextEmbedding)
+            .filter(TextEmbedding.text_hash == text_hash,
+                    TextEmbedding.llm_model_name == llm_model_name,
+                    TextEmbedding.embedding_pooling_method == embedding_pooling_method)
         )
-        row = result.fetchone()
-        if row:
-            embedding_json = row[0]
-            return json.loads(embedding_json)
-        return None
+        return result.scalars().first()
 
-async def get_token_level_embedding_bundle_from_db(text: str, llm_model_name: str):
-    input_text_hash = sha3_256(text.encode('utf-8')).hexdigest() # Compute the hash
-    return await execute_with_retry(_get_token_level_embedding_bundle_from_db, input_text_hash, llm_model_name)
-
-async def _get_token_level_embedding_bundle_from_db(input_text_hash: str, llm_model_name: str) -> Optional[TokenLevelEmbeddingBundle]:
-    async with AsyncSessionLocal() as session:
-        result = await session.execute(
-            select(TokenLevelEmbeddingBundle)
-            .options(joinedload(TokenLevelEmbeddingBundle.token_level_embeddings))
-            
.options(joinedload(TokenLevelEmbeddingBundle.combined_feature_vector)) - .filter(TokenLevelEmbeddingBundle.input_text_hash == input_text_hash, TokenLevelEmbeddingBundle.llm_model_name == llm_model_name) - ) - token_level_embedding_bundle = result.unique().scalar_one_or_none() - if token_level_embedding_bundle: - await session.expunge(token_level_embedding_bundle) # Detach the object from the session - return token_level_embedding_bundle - return None - -async def get_corpus_identifier_from_embedding_text(text: str, llm_model_name: str): - text_hash = sha3_256(text.encode('utf-8')).hexdigest() # Compute the hash - return await execute_with_retry(_get_corpus_identifier_from_embedding_text, text_hash, llm_model_name) +async def get_corpus_identifier_from_embedding_text(text: str, llm_model_name: str, embedding_pooling_method: str): + text_hash = sha3_256(text.encode('utf-8')).hexdigest() + return await execute_with_retry(_get_corpus_identifier_from_embedding_text, text_hash, llm_model_name, embedding_pooling_method) -async def _get_corpus_identifier_from_embedding_text(text_hash: str, llm_model_name: str) -> Optional[str]: +async def _get_corpus_identifier_from_embedding_text(text_hash: str, llm_model_name: str, embedding_pooling_method: str) -> Optional[str]: async with AsyncSessionLocal() as session: result = await session.execute( - sql_text("SELECT corpus_identifier_string FROM embeddings WHERE text_hash=:text_hash AND llm_model_name=:llm_model_name"), - {"text_hash": text_hash, "llm_model_name": llm_model_name}, + select(TextEmbedding.corpus_identifier_string) + .filter(TextEmbedding.text_hash == text_hash, + TextEmbedding.llm_model_name == llm_model_name, + TextEmbedding.embedding_pooling_method == embedding_pooling_method) ) - row = result.fetchone() - if row: - corpus_identifier_string = row[0] - return corpus_identifier_string - return None - -async def get_list_of_corpus_identifiers_from_list_of_embedding_texts(list_of_texts: List[str], llm_model_name: str): - list_of_text_hashes = [sha3_256(text.encode('utf-8')).hexdigest() for text in list_of_texts] - return await execute_with_retry(_get_list_of_corpus_identifiers_from_list_of_embedding_texts, list_of_text_hashes, llm_model_name) + return result.scalar() -async def _get_list_of_corpus_identifiers_from_list_of_embedding_texts(list_of_text_hashes: List[str], llm_model_name: str) -> List[str]: - async with AsyncSessionLocal() as session: - placeholders = ', '.join(f':hash{i}' for i in range(len(list_of_text_hashes))) - query = sql_text(f""" - SELECT corpus_identifier_string - FROM embeddings - WHERE text_hash IN ({placeholders}) - AND llm_model_name = :llm_model_name - """) - params = {f'hash{i}': hash for i, hash in enumerate(list_of_text_hashes)} - params['llm_model_name'] = llm_model_name - result = await session.execute(query, params) - rows = result.fetchall() - corpus_identifiers = [row[0] for row in rows] - return corpus_identifiers - -async def get_list_of_corpus_identifiers_from_list_of_token_level_embedding_texts(list_of_texts: List[str], llm_model_name: str): +async def get_list_of_corpus_identifiers_from_list_of_embedding_texts(list_of_texts: List[str], llm_model_name: str, embedding_pooling_method: str): list_of_text_hashes = [sha3_256(text.encode('utf-8')).hexdigest() for text in list_of_texts] - return await execute_with_retry(_get_list_of_corpus_identifiers_from_list_of_token_level_embedding_texts, list_of_text_hashes, llm_model_name) + return await 
execute_with_retry(_get_list_of_corpus_identifiers_from_list_of_embedding_texts, list_of_text_hashes, llm_model_name, embedding_pooling_method) -async def _get_list_of_corpus_identifiers_from_list_of_token_level_embedding_texts(list_of_text_hashes: List[str], llm_model_name: str) -> List[str]: +async def _get_list_of_corpus_identifiers_from_list_of_embedding_texts(list_of_text_hashes: List[str], llm_model_name: str, embedding_pooling_method: str) -> List[str]: async with AsyncSessionLocal() as session: - placeholders = ', '.join(f':hash{i}' for i in range(len(list_of_text_hashes))) - query = sql_text(f""" - SELECT corpus_identifier_string - FROM token_level_embeddings - WHERE input_text_hash IN ({placeholders}) - AND llm_model_name = :llm_model_name - """) - params = {f'hash{i}': hash for i, hash in enumerate(list_of_text_hashes)} - params['llm_model_name'] = llm_model_name - result = await session.execute(query, params) - rows = result.fetchall() - corpus_identifiers = [row[0] for row in rows] - return corpus_identifiers + result = await session.execute( + select(TextEmbedding.corpus_identifier_string) + .filter(TextEmbedding.text_hash.in_(list_of_text_hashes), + TextEmbedding.llm_model_name == llm_model_name, + TextEmbedding.embedding_pooling_method == embedding_pooling_method) + ) + rows = result.scalars().all() + return rows async def get_texts_for_corpus_identifier(corpus_identifier_string: str) -> Dict[str, List[str]]: async with AsyncSessionLocal() as session: @@ -227,27 +192,27 @@ async def get_texts_for_corpus_identifier(corpus_identifier_string: str) -> Dict .filter(DocumentEmbedding.corpus_identifier_string == corpus_identifier_string) ) document_embeddings = result.unique().scalars().all() - texts_by_model = {doc.llm_model_name: [] for doc in document_embeddings} + texts_by_model_and_embedding_pooling_method = {(doc.llm_model_name, doc.embedding_pooling_method): [] for doc in document_embeddings} for document_embedding in document_embeddings: - texts_by_model[document_embedding.llm_model_name].extend( + texts_by_model_and_embedding_pooling_method[(document_embedding.llm_model_name, document_embedding.embedding_pooling_method)].extend( [embedding.text for embedding in document_embedding.embeddings] ) - return texts_by_model + return texts_by_model_and_embedding_pooling_method -async def get_texts_for_model(llm_model_name: str) -> Dict[str, List[str]]: +async def get_texts_for_model_and_embedding_pooling_method(llm_model_name: str, embedding_pooling_method: str) -> Dict[str, List[str]]: async with AsyncSessionLocal() as session: result = await session.execute( select(DocumentEmbedding) .options(joinedload(DocumentEmbedding.embeddings)) - .filter(DocumentEmbedding.llm_model_name == llm_model_name) + .filter(DocumentEmbedding.llm_model_name == llm_model_name, DocumentEmbedding.embedding_pooling_method == embedding_pooling_method) ) document_embeddings = result.unique().scalars().all() - texts_by_model = {llm_model_name: []} + texts_by_model_and_embedding_pooling_method = {(doc.llm_model_name, doc.embedding_pooling_method): [] for doc in document_embeddings} for document_embedding in document_embeddings: - texts_by_model[llm_model_name].extend( + texts_by_model_and_embedding_pooling_method[(document_embedding.llm_model_name, document_embedding.embedding_pooling_method)].extend( [embedding.text for embedding in document_embedding.embeddings] ) - return texts_by_model + return texts_by_model_and_embedding_pooling_method async def get_or_compute_embedding(request: EmbeddingRequest, 
req: Request = None, client_ip: str = None, document_file_hash: str = None, use_verbose: bool = True) -> dict:
    request_time = datetime.utcnow() # Capture request time as datetime object
@@ -255,512 +220,157 @@ async def get_or_compute_embedding(request: EmbeddingRequest, req: Request = Non
         client_ip or (req.client.host if req else "localhost")
     ) # If client_ip is provided, use it; otherwise, try to get from req; if not available, default to "localhost"
     if use_verbose:
-        logger.info(f"Received request for embedding for '{request.text}' using model '{request.llm_model_name}' from IP address '{ip_address}'")
-    embedding_list = await get_embedding_from_db(
-        request.text, request.llm_model_name
-    ) # Check if embedding exists in the database
-    if embedding_list is not None:
+        logger.info(f"Received request for embedding for '{request.text}' using model '{request.llm_model_name}' and embedding pooling method '{request.embedding_pooling_method}' from IP address '{ip_address}'")
+    text_embedding_instance = await get_embedding_from_db(
+        request.text, request.llm_model_name, request.embedding_pooling_method
+    )
+    # Check if embedding exists in the database
+    if text_embedding_instance is not None:
         response_time = datetime.utcnow() # Capture response time as datetime object
         total_time = (
             response_time - request_time
         ).total_seconds() # Calculate time taken in seconds
         if use_verbose:
-            logger.info(f"Embedding found in database for '{request.text}' using model '{request.llm_model_name}'; returning in {total_time:.4f} seconds")
-        return {"embedding": embedding_list}
+            logger.info(f"Embedding found in database for '{request.text}' using model '{request.llm_model_name}' and embedding pooling method '{request.embedding_pooling_method}'; returning in {total_time:.4f} seconds")
+        return {"text_embedding_dict": text_embedding_instance.as_dict()}
     model = load_model(request.llm_model_name)
-    embedding_list = await calculate_sentence_embedding(
-        model, request.text
-    ) # Compute the embedding if not in the database
-    if embedding_list is None:
+    # Compute the embedding if not in the database
+    list_of_embedding_entry_dicts = await calculate_sentence_embeddings_list(model, [request.text], request.embedding_pooling_method)
+    embedding_entry_dict = list_of_embedding_entry_dicts[0]
+    if embedding_entry_dict is None:
         logger.error(
-            f"Could not calculate the embedding for the given text: '{request.text}' using model '{request.llm_model_name}!'"
+            f"Could not calculate the embedding for the given text: '{request.text}' using model '{request.llm_model_name}' and embedding pooling method '{request.embedding_pooling_method}'!"
         )
         raise HTTPException(
             status_code=400,
             detail="Could not calculate the embedding for the given text",
         )
-    embedding_json = json.dumps(
-        embedding_list
-    ) # Serialize the list to JSON and save to the database
-    response_time = datetime.utcnow() # Capture response time as datetime object
-    total_time = (
-        response_time - request_time
-    ).total_seconds() # Calculate total time using datetime objects
+    else:
+        embedding = embedding_entry_dict['embedding']
+        embedding_hash = embedding_entry_dict['embedding_hash']
+        text = request.text
+        text_hash = sha3_256(text.encode('utf-8')).hexdigest()
+        embedding_json = json.dumps(embedding)
+        response_time = datetime.utcnow() # Measured against the request_time captured at the top of the function, so total_time reflects the actual computation
+        total_time = (response_time - request_time).total_seconds()
+        embedding_instance = TextEmbedding(
+            text=text,
+            text_hash=text_hash,
+            embedding_hash=embedding_hash,
+
llm_model_name=request.llm_model_name, + embedding_pooling_method=request.embedding_pooling_method, + corpus_identifier_string=request.corpus_identifier_string, + embedding_json=embedding_json, + ip_address=client_ip, + request_time=request_time, + response_time=response_time, + total_time=total_time, + document_file_hash=document_file_hash, + ) word_length_of_input_text = len(request.text.split()) if word_length_of_input_text > 0: if use_verbose: - logger.info(f"Embedding calculated for '{request.text}' using model '{request.llm_model_name}' in {total_time:,.2f} seconds, or an average of {total_time/word_length_of_input_text :.2f} seconds per word. Now saving to database...") - await save_embedding_to_db( - text=request.text, - llm_model_name=request.llm_model_name, - corpus_identifier_string=request.corpus_identifier_string, - embedding_json=embedding_json, - ip_address=ip_address, - request_time=request_time, - response_time=response_time, - total_time=total_time, - document_file_hash=document_file_hash - ) - return {"embedding": embedding_list} - -async def save_embedding_to_db(text: str, llm_model_name: str, corpus_identifier_string: str, embedding_json: str, ip_address: str, request_time: datetime, response_time: datetime, total_time: float, document_file_hash: str = None): - existing_embedding = await get_embedding_from_db(text, llm_model_name) # Check if the embedding already exists - if existing_embedding is not None: - return existing_embedding - return await execute_with_retry(_save_embedding_to_db, text, llm_model_name, corpus_identifier_string, embedding_json, ip_address, request_time, response_time, total_time, document_file_hash) - -async def _save_embedding_to_db(text: str, llm_model_name: str, corpus_identifier_string: str, embedding_json: str, ip_address: str, request_time: datetime, response_time: datetime, total_time: float, document_file_hash: str = None): - existing_embedding = await get_embedding_from_db(text, llm_model_name) - if existing_embedding: - return existing_embedding - embedding = TextEmbedding( - text=text, - llm_model_name=llm_model_name, - corpus_identifier_string=corpus_identifier_string, - embedding_json=embedding_json, - ip_address=ip_address, - request_time=request_time, - response_time=response_time, - total_time=total_time, - document_file_hash=document_file_hash, - ) - await shared_resources.db_writer.enqueue_write([embedding]) # Enqueue the write operation using the db_writer instance - -def load_token_level_embedding_model(llm_model_name: str, raise_http_exception: bool = True): - global USE_VERBOSE + logger.info(f"Embedding calculated for '{request.text}' using model '{request.llm_model_name}' and embedding pooling method '{request.embedding_pooling_method}' in {total_time:,.2f} seconds, or an average of {total_time/word_length_of_input_text :.2f} seconds per word. 
Now saving to database...") + await shared_resources.db_writer.enqueue_write([embedding_instance]) # Enqueue the write operation using the db_writer instance + return {"text_embedding_dict": embedding_instance.as_dict()} + +async def calculate_sentence_embeddings_list(llama, texts: list, embedding_pooling_method: str) -> list: + sentence_embeddings_object = llama.create_embedding(texts) + sentence_embeddings_list = sentence_embeddings_object['data'] + if len(sentence_embeddings_list) != len(texts): + raise ValueError("Inconsistent number of embeddings found.") + list_of_embedding_entry_dicts = [] + cnt = 0 + for i, current_text in enumerate(texts): + current_set_of_embeddings = sentence_embeddings_list[i]['embedding'] + # Check if `current_set_of_embeddings` is a list of lists or just a list; if it's just a list, then number_of_embeddings will be 1 and we need to convert it to a list of lists + if isinstance(current_set_of_embeddings[0], list): + number_of_embeddings = len(current_set_of_embeddings) + else: + number_of_embeddings = 1 + current_set_of_embeddings = [current_set_of_embeddings] + logger.info(f"Sentence {i + 1} of {len(texts):,} has {number_of_embeddings:,} embeddings for text '{current_text[:50]}...'") + embeddings = np.array(current_set_of_embeddings) + if embedding_pooling_method == "means": + means = np.mean(embeddings, axis=0) + flattened_vector = means.flatten() + elif embedding_pooling_method == "means_mins_maxes": + means = np.mean(embeddings, axis=0) + mins = np.min(embeddings, axis=0) + maxes = np.max(embeddings, axis=0) + combined_feature_vector = np.concatenate([means, mins, maxes]).flatten() + flattened_vector = combined_feature_vector.flatten() + elif embedding_pooling_method == "means_mins_maxes_stds_kurtoses": + means = np.mean(embeddings, axis=0) + mins = np.min(embeddings, axis=0) + maxes = np.max(embeddings, axis=0) + stds = np.std(embeddings, axis=0) + kurtoses = scipy.stats.kurtosis(embeddings, axis=0) + combined_feature_vector = np.concatenate([means, mins, maxes, stds, kurtoses]) + flattened_vector = combined_feature_vector.flatten() + elif embedding_pooling_method == "svd": + svd = TruncatedSVD(n_components=2) # Set n_components to 2 + svd_embeddings = svd.fit_transform(embeddings.T) + flattened_vector = svd_embeddings.flatten() + else: + raise ValueError(f"Unknown embedding_pooling_method: {embedding_pooling_method}") + combined_embedding = flattened_vector.tolist() + embedding_length = len(combined_embedding) + cnt += 1 + embedding_json = json.dumps(combined_embedding) + embedding_hash = sha3_256(embedding_json.encode('utf-8')).hexdigest() + embedding_entry_dict = {'text_index': i, 'text': current_text, 'embedding_pooling_method': embedding_pooling_method,'number_of_token_embeddings_used': number_of_embeddings, 'embedding_length': embedding_length, 'embedding_hash': embedding_hash,'embedding': combined_embedding} + list_of_embedding_entry_dicts.append(embedding_entry_dict) + return list_of_embedding_entry_dicts + +async def batch_save_embeddings_to_db(embeddings: List[TextEmbedding]): + await shared_resources.db_writer.enqueue_write(embeddings) # Perform a single batch insertion + +async def compute_embeddings_for_document(strings: list, llm_model_name: str, embedding_pooling_method: str, corpus_identifier_string: str, client_ip: str, document_file_hash: str, file, original_file_content: bytes, json_format: str = 'records') -> list: + strings = [prepare_string_for_embedding(text) for text in strings] + model = load_model(llm_model_name) try: - if 
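
A point worth making explicit about the pooling dispatch in `calculate_sentence_embeddings_list` above: for a model with embedding dimension `n`, the four methods produce fixed-length vectors of length `n`, `3n`, `5n`, and `2n` respectively, and the `svd` branch implicitly requires at least three token embeddings, since TruncatedSVD needs `n_components=2` to be strictly smaller than the number of columns of `embeddings.T`. A toy sketch with made-up dimensions:

```python
import numpy as np
import scipy.stats
from sklearn.decomposition import TruncatedSVD

tokens, n = 7, 1024                      # toy values: 7 token embeddings of dimension n
embeddings = np.random.rand(tokens, n)

means = np.mean(embeddings, axis=0)                                            # 'means' -> n
mmm = np.concatenate([means, embeddings.min(axis=0), embeddings.max(axis=0)])  # 'means_mins_maxes' -> 3n
mmmsk = np.concatenate([mmm, embeddings.std(axis=0),
                        scipy.stats.kurtosis(embeddings, axis=0)])             # '..._stds_kurtoses' -> 5n
svd_vec = TruncatedSVD(n_components=2).fit_transform(embeddings.T).flatten()   # 'svd' -> 2n

assert (len(means), len(mmm), len(mmmsk), len(svd_vec)) == (n, 3 * n, 5 * n, 2 * n)
```
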
llm_model_name in token_level_embedding_model_cache: # Check if the model is already loaded in the cache - return token_level_embedding_model_cache[llm_model_name] - models_dir = os.path.join(RAMDISK_PATH, 'models') if USE_RAMDISK else os.path.join(BASE_DIRECTORY, 'models') # Determine the model directory path - matching_files = glob.glob(os.path.join(models_dir, f"{llm_model_name}*")) # Search for matching model files - if not matching_files: - logger.error(f"No model file found matching: {llm_model_name}") - raise FileNotFoundError - matching_files.sort(key=os.path.getmtime, reverse=True) # Sort the files based on modification time (recently modified files first) - model_file_path = matching_files[0] - with suppress_stdout_stderr(): - gpu_info = is_gpu_available() - if gpu_info['gpu_found']: - model_instance = Llama(model_path=model_file_path, embedding=True, n_ctx=LLM_CONTEXT_SIZE_IN_TOKENS, verbose=USE_VERBOSE, n_gpu_layers=-1) # Load the model with GPU acceleration - else: - model_instance = Llama(model_path=model_file_path, embedding=True, n_ctx=LLM_CONTEXT_SIZE_IN_TOKENS, verbose=USE_VERBOSE) # Load the model without GPU acceleration - token_level_embedding_model_cache[llm_model_name] = model_instance # Cache the loaded model - return model_instance - except TypeError as e: - logger.error(f"TypeError occurred while loading the model: {e}") - logger.error(traceback.format_exc()) - raise + list_of_embedding_entry_dicts = await calculate_sentence_embeddings_list(model, strings, embedding_pooling_method) except Exception as e: - logger.error(f"Exception occurred while loading the model: {e}") + logger.error(f"Error computing embeddings for batch: {e}") logger.error(traceback.format_exc()) - if raise_http_exception: - raise HTTPException(status_code=404, detail="Model file not found") - else: - raise FileNotFoundError(f"No model file found matching: {llm_model_name}") - -def flatten_token_embeddings(token_embeddings): - flattened_embeddings = [] - for embeddings_list in token_embeddings: - flattened_list = [embedding.item() for embedding in embeddings_list] - flattened_embeddings.append(flattened_list) - return flattened_embeddings - -async def calculate_token_level_embeddings(text: str, llm_model_name: str, corpus_identifier_string: str, client_ip: str, use_verbose = 0) -> List[List[np.array]]: - text = prepare_string_for_embedding(text) - request_time = datetime.utcnow() - if use_verbose: - logger.info(f"Starting token-level embedding calculation for text: '{text}' using model: '{llm_model_name}'") - logger.info(f"Loading model: '{llm_model_name}'") - llm = load_token_level_embedding_model(llm_model_name) - word_list = text.split() - if use_verbose: - logger.info(f"Transformed input text into {len(word_list):,} words/expressions") - async def fetch_existing_embeddings(word: str) -> Optional[List[np.array]]: - existing_embeddings = await get_token_level_embeddings_from_db(word, llm_model_name) - if existing_embeddings is not None: - if use_verbose: - logger.info(f"Embedding retrieved from database for word '{word}'") - return [np.array(embedding) for embedding in existing_embeddings] - return None - existing_embeddings = await asyncio.gather(*[fetch_existing_embeddings(word) for word in word_list]) - missing_words = [word for word, embedding in zip(word_list, existing_embeddings) if embedding is None] - if missing_words: - if use_verbose: - logger.info(f"Computing embeddings for {len(missing_words):,} missing words/expressions in batch") - try: - token_embeddings_object = 
llm.embed(missing_words) - if isinstance(token_embeddings_object[0][0], list): - complex_embedding_structure = 1 - else: - complex_embedding_structure = 0 - token_embedding_map = {word: embeddings for word, embeddings in zip(missing_words, token_embeddings_object)} - for word, embedding_list in token_embedding_map.items(): - response_time = datetime.utcnow() - token_level_embedding_json = json.dumps(embedding_list) - await store_token_level_embeddings_in_db( - word=word, - llm_model_name=llm_model_name, - corpus_identifier_string=corpus_identifier_string, - token_level_embedding_json=token_level_embedding_json, - ip_address=client_ip, - request_time=request_time, - response_time=response_time, - ) - if complex_embedding_structure: - for i, embedding in enumerate(existing_embeddings): - if embedding is None: - existing_embeddings[i] = [np.array(embedding) for embedding in token_embedding_map[word_list[i]]] - else: - for i, embedding in enumerate(existing_embeddings): - if embedding is None: - existing_embeddings[i] = token_embedding_map[word_list[i]] - except Exception as e: - logger.error(f"Exception occurred while computing embeddings: {e}") - logger.error(traceback.format_exc()) - raise - if use_verbose: - logger.info(f"Completed token embedding calculation for all words in text: '{text}'") - return word_list, existing_embeddings, complex_embedding_structure - -def ensure_list_of_floats(input_list): - if all(isinstance(i, np.ndarray) and i.size == 1 for i in input_list): - return [i.item() for i in input_list] - return input_list - -def convert_raw_token_embeddings_to_json(token_embeddings, word_list, complex_embedding_structure, json_format="records", use_verbose=False): - if complex_embedding_structure: - lengths, max_length, min_length = analyze_token_embeddings(token_embeddings) - if use_verbose: - logger.info(f"Split input text into {len(token_embeddings):,} words/expressions. 
Organizing results.") - logger.info(f"LLM tokens per word of input text: {str([x for x in zip(word_list,lengths)])}") - embedding_lengths = [len(embedding) for embeddings in token_embeddings for embedding in embeddings] - if len(set(embedding_lengths)) != 1: - raise ValueError("Inconsistent embedding vector lengths found.") - df_data = [] # Create a DataFrame with 'word' column and separate columns for each embedding vector - for word_index, (word, embeddings) in enumerate(zip(word_list, token_embeddings)): - for token_index, embedding in enumerate(embeddings): - row = { - 'word_tokennum': f"{word}_{token_index}", - 'embedding': embedding - } - df_data.append(row) - else: - df_data = [] # Create a DataFrame with 'word' column and separate columns for each embedding vector - for word_index, (word, embedding) in enumerate(zip(word_list, token_embeddings)): - row = { - 'word': word, - 'embedding': ensure_list_of_floats(embedding) - } - df_data.append(row) - token_level_embeddings_df = pd.DataFrame(df_data) - json_content = token_level_embeddings_df.to_json(orient=json_format) - return json_content, token_level_embeddings_df - -async def get_token_level_embeddings_from_db(word: str, llm_model_name: str, use_verbose=0) -> Optional[List[List[float]]]: - word_hash = sha3_256(word.encode('utf-8')).hexdigest() # Compute the hash - async with AsyncSessionLocal() as session: - result = await session.execute( - sql_text("SELECT token_level_embedding_json FROM token_level_embeddings WHERE word_hash=:word_hash AND llm_model_name=:llm_model_name"), - {"word_hash": word_hash, "llm_model_name": llm_model_name}, - ) - row = result.fetchone() - if row: - embedding_json = row[0] - if use_verbose: - logger.info(f"Embedding found in database for word hash '{word_hash}' using model '{llm_model_name}'") - return json.loads(embedding_json) - return None - -async def store_token_level_embeddings_in_db(word: str, llm_model_name: str, corpus_identifier_string: str, token_level_embedding_json: str, ip_address: str, request_time: datetime, response_time: datetime): - stored_embedding = await get_token_level_embeddings_from_db(word, llm_model_name, use_verbose=0) # Check if the embedding already exists in the database - if stored_embedding is None: + raise + embeddings_to_save = [] + results = [] + for embedding_entry_dict in list_of_embedding_entry_dicts: + embedding = embedding_entry_dict['embedding'] + embedding_hash = embedding_entry_dict['embedding_hash'] + text_index = embedding_entry_dict['text_index'] + text = strings[text_index] + text_hash = sha3_256(text.encode('utf-8')).hexdigest() + embedding_json = json.dumps(embedding) + request_time = datetime.utcnow() + response_time = datetime.utcnow() total_time = (response_time - request_time).total_seconds() - embedding = TokenLevelEmbedding( - word=word, + embedding_instance = TextEmbedding( + text=text, + text_hash=text_hash, + embedding_hash=embedding_hash, llm_model_name=llm_model_name, + embedding_pooling_method=embedding_pooling_method, corpus_identifier_string=corpus_identifier_string, - token_level_embedding_json=token_level_embedding_json, - ip_address=ip_address, + embedding_json=embedding_json, + ip_address=client_ip, request_time=request_time, response_time=response_time, - total_time=total_time - ) - await shared_resources.db_writer.enqueue_write([embedding]) - -async def compute_token_level_embedding_bundle_combined_feature_vector(token_level_embeddings_df, use_verbose=False) -> List[float]: - start_time = datetime.utcnow() - 
token_level_embeddings_list = token_level_embeddings_df['embedding'].tolist() - # Check if all embedding vectors have the same length - embedding_lengths = [len(embedding) for embedding in token_level_embeddings_list] - if len(set(embedding_lengths)) != 1: - raise ValueError("Inconsistent embedding vector lengths found.") - embeddings = np.array(token_level_embeddings_list) - if use_verbose: - logger.info(f"Embeddings shape: {embeddings.shape}") - logger.info(f"Computing column-wise means/mins/maxes/std_devs/kurtosis of the embeddings... (shape: {embeddings.shape})") - assert len(embeddings) > 0 - # Compute summary statistics across the embeddings - means = np.mean(embeddings, axis=0) - mins = np.min(embeddings, axis=0) - maxes = np.max(embeddings, axis=0) - stds = np.std(embeddings, axis=0) - kurtoses = scipy.stats.kurtosis(embeddings, axis=0) - if use_verbose: - logger.info("Concatenating the computed statistics to form the combined feature vector") - combined_feature_vector = np.concatenate([means, mins, maxes, stds, kurtoses]) - flattened_vector = combined_feature_vector.flatten() # Ensure the vector is 1D - end_time = datetime.utcnow() - total_time = (end_time - start_time).total_seconds() - if use_verbose: - logger.info(f"Computed the token-level embedding bundle's combined feature vector in {total_time:.2f} seconds.") - return flattened_vector.tolist() - -async def get_or_compute_token_level_embedding_bundle_combined_feature_vector(token_level_embedding_bundle_id, token_level_embeddings_df, use_verbose=0) -> List[float]: - request_time = datetime.utcnow() - if use_verbose: - logger.info(f"Checking for existing combined feature vector for token-level embedding bundle ID: {token_level_embedding_bundle_id}") - if token_level_embedding_bundle_id is not None: - async with AsyncSessionLocal() as session: - result = await session.execute( - select(TokenLevelEmbeddingBundleCombinedFeatureVector) - .filter(TokenLevelEmbeddingBundleCombinedFeatureVector.token_level_embedding_bundle_id == token_level_embedding_bundle_id) - ) - existing_combined_feature_vector = result.scalar_one_or_none() - if existing_combined_feature_vector: - response_time = datetime.utcnow() - total_time = (response_time - request_time).total_seconds() - if use_verbose: - logger.info(f"Found existing combined feature vector for token-level embedding bundle ID: {token_level_embedding_bundle_id}. Returning cached result in {total_time:.2f} seconds.") - return json.loads(existing_combined_feature_vector.combined_feature_vector_json) - if use_verbose: - logger.info(f"No cached combined feature_vector found for token-level embedding bundle ID: {token_level_embedding_bundle_id}. 
Computing now...") - combined_feature_vector = await compute_token_level_embedding_bundle_combined_feature_vector(token_level_embeddings_df) - combined_feature_vector_db_object = TokenLevelEmbeddingBundleCombinedFeatureVector( - token_level_embedding_bundle_id=token_level_embedding_bundle_id, - combined_feature_vector_json=json.dumps(combined_feature_vector) - ) - if use_verbose: - logger.info(f"Writing combined feature vector for database write for token-level embedding bundle ID: {token_level_embedding_bundle_id} to the database...") - await shared_resources.db_writer.enqueue_write([combined_feature_vector_db_object]) - return combined_feature_vector - -async def get_existing_combined_feature_vector(text: str, llm_model_name: str) -> Optional[List[float]]: - input_text_hash = sha3_256(text.encode('utf-8')).hexdigest() - async with AsyncSessionLocal() as session: - result = await session.execute( - select(TokenLevelEmbeddingBundle) - .options(joinedload(TokenLevelEmbeddingBundle.token_level_embeddings)) - .filter(TokenLevelEmbeddingBundle.input_text_hash == input_text_hash, TokenLevelEmbeddingBundle.llm_model_name == llm_model_name) + total_time=total_time, + document_file_hash=document_file_hash, ) - existing_embedding_bundle = result.unique().scalar() - if existing_embedding_bundle: - combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(existing_embedding_bundle.id, existing_embedding_bundle.token_level_embeddings) - return combined_feature_vector - return None - -async def get_or_compute_token_level_embedding_combined_feature_vector_from_text(text: str, llm_model_name: str, corpus_identifier_string: str, client_ip: str, use_verbose=0) -> List[float]: - request_time = datetime.utcnow() - combined_feature_vector = await get_existing_combined_feature_vector(text, llm_model_name) - if combined_feature_vector: - return combined_feature_vector - lock_id = f"get_token_level_embeddings_{text}_{llm_model_name}" - lock = await shared_resources.lock_manager.lock(lock_id) - if lock.valid: - try: - text = prepare_string_for_embedding(text) - word_list, token_embeddings, complex_embedding_structure = await calculate_token_level_embeddings(text, llm_model_name, corpus_identifier_string, client_ip, use_verbose) - json_content, token_level_embeddings_df = convert_raw_token_embeddings_to_json(token_embeddings=token_embeddings, word_list=word_list, complex_embedding_structure=complex_embedding_structure, json_format="records", use_verbose=False) - response_time = datetime.utcnow() - total_time = (response_time - request_time).total_seconds() - embedding_bundle = TokenLevelEmbeddingBundle( - input_text=text, - input_text_hash=sha3_256(text.encode('utf-8')).hexdigest(), - llm_model_name=llm_model_name, - corpus_identifier_string=corpus_identifier_string, - ip_address=client_ip, - request_time=request_time - ) - embedding_bundle.token_level_embeddings_bundle_json = json_content - embedding_bundle.response_time = response_time - embedding_bundle.total_time = total_time - combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(embedding_bundle.id, token_level_embeddings_df) - response_time = datetime.utcnow() - total_time = (response_time - request_time).total_seconds() - if use_verbose: - logger.info(f"Computed and stored combined feature vector for input text '{text}' using model '{llm_model_name}' in {total_time:.2f} seconds.") - return combined_feature_vector - except Exception as e: - logger.error(f"An error 
occurred while processing the request: {e}") - logger.error(traceback.format_exc()) - raise HTTPException(status_code=500, detail="Internal Server Error") - finally: - await shared_resources.lock_manager.unlock(lock) - else: - if use_verbose: - logger.info(f"Request for input text '{text}' using model '{llm_model_name}' is already being processed.") - return {"status": "already processing"} - -async def calculate_sentence_embedding(llama, text: str) -> np.array: - sentence_embedding_vector = None - retry_count = 0 - while sentence_embedding_vector is None and retry_count < 3: - try: - if retry_count > 0: - logger.info(f"Attempting again to calculate sentence embedding. Attempt number {retry_count + 1}") - prepared_text = prepare_string_for_embedding(text) - sentence_embedding_object = llama.create_embedding(prepared_text) - sentence_embedding_vector = extract_embeddings(sentence_embedding_object) - except TypeError as e: - logger.error(f"TypeError in calculate_sentence_embedding: {e}") - logger.error(traceback.format_exc()) - raise - except Exception as e: - logger.error(f"Exception in calculate_sentence_embedding: {e}") - logger.error(traceback.format_exc()) - text = text[:-int(len(text) * 0.1)] - retry_count += 1 - logger.info(f"Trimming sentence due to too many tokens. New length: {len(text):,}") - if sentence_embedding_vector is None: - logger.error("Failed to calculate sentence embedding after multiple attempts") - return sentence_embedding_vector - -async def calculate_sentence_embeddings_list(llama, texts: list) -> list: - retry_count = 0 - sentence_embeddings_vectors = None - while sentence_embeddings_vectors is None and retry_count < 3: - try: - if retry_count > 0: - logger.info(f"Attempting again to calculate sentence embeddings. Attempt number {retry_count + 1}") - prepared_texts = [prepare_string_for_embedding(text) for text in texts] - sentence_embeddings_object = llama.create_embedding(prepared_texts) - sentence_embeddings_vectors = extract_embeddings_list(sentence_embeddings_object) - except TypeError as e: - logger.error(f"TypeError in calculate_sentence_embeddings_list: {e}") - logger.error(traceback.format_exc()) - raise - except Exception as e: - logger.error(f"Exception in calculate_sentence_embeddings_list: {e}") - logger.error(traceback.format_exc()) - texts = [text[:-int(len(text) * 0.1)] for text in texts] - retry_count += 1 - logger.info(f"Trimming sentences due to too many tokens. 
New lengths: {[len(text) for text in texts]:,}") - if sentence_embeddings_vectors is None: - logger.error("Failed to calculate sentence embeddings after multiple attempts") - return sentence_embeddings_vectors - -async def compute_embeddings_for_document(strings: list, llm_model_name: str, corpus_identifier_string: str, client_ip: str, document_file_hash: str) -> List[Tuple[str, np.array]]: - results = [] - strings = [prepare_string_for_embedding(text) for text in strings] - if USE_PARALLEL_INFERENCE_QUEUE: - logger.info(f"Using parallel inference queue to compute embeddings for {len(strings):,} strings") - start_time = time.perf_counter() - semaphore = asyncio.Semaphore(MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS) - model = load_model(llm_model_name) - async def compute_embedding_batch(batch): - try: - async with semaphore: - embeddings = await calculate_sentence_embeddings_list(model, batch) - batch_results = [] - for text, embedding in zip(batch, embeddings): - embedding_json = json.dumps(embedding) - request_time = datetime.utcnow() - response_time = datetime.utcnow() - total_time = (response_time - request_time).total_seconds() - await save_embedding_to_db( - text=text, - llm_model_name=llm_model_name, - corpus_identifier_string=corpus_identifier_string, - embedding_json=embedding_json, - ip_address=client_ip, - request_time=request_time, - response_time=response_time, - total_time=total_time, - document_file_hash=document_file_hash, - ) - batch_results.append((text, embedding)) - return batch_results - except Exception as e: - logger.error(f"Error computing embeddings for batch: {e}") - logger.error(traceback.format_exc()) - return [(text, None) for text in batch] - batch_size = MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS - batches = [strings[i:i + batch_size] for i in range(0, len(strings), batch_size)] - batch_results = await asyncio.gather(*[compute_embedding_batch(batch) for batch in batches]) - for batch_result in batch_results: - results.extend(batch_result) - end_time = time.perf_counter() - duration = end_time - start_time - if len(strings) > 0: - logger.info(f"Parallel inference task for {len(strings):,} strings completed in {duration:.2f} seconds; {duration / len(strings):.2f} seconds per string") - else: - logger.info(f"Using sequential inference to compute embeddings for {len(strings)} strings") - start_time = time.perf_counter() - model = load_model(llm_model_name) - embeddings = await calculate_sentence_embeddings_list(model, strings) - for text, embedding in zip(strings, embeddings): - prepared_text = prepare_string_for_embedding(text) - embedding_json = json.dumps(embedding) - request_time = datetime.utcnow() - response_time = datetime.utcnow() - total_time = (response_time - request_time).total_seconds() - await save_embedding_to_db( - text=prepared_text, - llm_model_name=llm_model_name, - corpus_identifier_string=corpus_identifier_string, - embedding_json=embedding_json, - ip_address=client_ip, - request_time=request_time, - response_time=response_time, - total_time=total_time, - document_file_hash=document_file_hash - ) - results.append((text, embedding)) - end_time = time.perf_counter() - duration = end_time - start_time - if len(strings) > 0: - logger.info(f"Sequential inference task for {len(strings):,} strings completed in {duration:.2f} seconds; {duration / len(strings):.2f} seconds per string") - filtered_results = [(text, embedding) for text, embedding in results if embedding is not None] - return filtered_results - -async def 
compute_token_level_embeddings_for_document(strings: list, llm_model_name: str, corpus_identifier_string: str, client_ip: str, document_file_hash: str) -> List[Tuple[str, List[float]]]: - results = [] - strings = [prepare_string_for_embedding(text) for text in strings] - if USE_PARALLEL_INFERENCE_QUEUE: - start_time = time.perf_counter() - semaphore = asyncio.Semaphore(MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS) - async def compute_embedding_batch(batch): - try: - async with semaphore: - batch_results = [] - for text in batch: - combined_feature_vector = await get_or_compute_token_level_embedding_combined_feature_vector_from_text(text, llm_model_name, corpus_identifier_string, client_ip) - batch_results.append((text, combined_feature_vector)) - return batch_results - except Exception as e: - logger.error(f"Error computing token-level embeddings for batch: {e}") - logger.error(traceback.format_exc()) - return [(text, None) for text in batch] - batch_size = MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS - batches = [strings[i:i + batch_size] for i in range(0, len(strings), batch_size)] - batch_results = await asyncio.gather(*[compute_embedding_batch(batch) for batch in batches]) - for batch_result in batch_results: - results.extend(batch_result) - end_time = time.perf_counter() - duration = end_time - start_time - if len(strings) > 0: - logger.info(f"Parallel inference task for {len(strings):,} strings completed in {duration:.2f} seconds; {duration / len(strings):.2f} seconds per string") - else: - logger.info(f"Using sequential inference to compute token-level embeddings for {len(strings)} strings") - start_time = time.perf_counter() - for text in strings: - combined_feature_vector = await get_or_compute_token_level_embedding_combined_feature_vector_from_text(text, llm_model_name, corpus_identifier_string, client_ip) - results.append((text, combined_feature_vector)) - end_time = time.perf_counter() - duration = end_time - start_time - if len(strings) > 0: - logger.info(f"Sequential inference task for {len(strings):,} strings completed in {duration:.2f} seconds; {duration / len(strings):.2f} seconds per string") - filtered_results = [(text, combined_feature_vector) for text, combined_feature_vector in results if combined_feature_vector is not None] - return filtered_results + embeddings_to_save.append(embedding_instance) + results.append((text, embedding)) + logger.info(f"Storing {len(embeddings_to_save):,} text embeddings in database...") + await batch_save_embeddings_to_db(embeddings_to_save) + logger.info(f"Done storing {len(embeddings_to_save):,} text embeddings in database.") + document_embedding_results_df = pd.DataFrame(list_of_embedding_entry_dicts) + json_content = document_embedding_results_df.to_json(orient=json_format or 'records').encode() + await store_document_embeddings_in_db(file, document_file_hash, original_file_content, strings, json_content, llm_model_name, embedding_pooling_method, corpus_identifier_string, client_ip, request_time) + return json_content async def parse_submitted_document_file_into_sentence_strings_func(temp_file_path: str, mime_type: str): content = "" @@ -786,112 +396,39 @@ async def parse_submitted_document_file_into_sentence_strings_func(temp_file_pat logger.info("No sentences found in the document") raise HTTPException(status_code=400, detail="No sentences found in the document") strings = [s.strip() for s in sentences if len(s.strip()) > MINIMUM_STRING_LENGTH_FOR_DOCUMENT_EMBEDDING] - return strings + thousands_of_input_words = sum(len(s.split()) for s in 
strings) / 1000 # Express the count in thousands of words, as the variable name implies
+    return strings, thousands_of_input_words
 
 async def _get_document_from_db(file_hash: str):
     async with AsyncSessionLocal() as session:
         result = await session.execute(select(Document).filter(Document.document_hash == file_hash))
         return result.scalar_one_or_none()
 
-async def store_document_embeddings_in_db(file: File,
-                                          file_hash: str,
-                                          original_file_content: bytes,
-                                          sentences: List[str],
-                                          json_content: bytes,
-                                          results: List[Tuple[str, np.array]],
-                                          llm_model_name: str,
-                                          corpus_identifier_string: str,
-                                          client_ip: str,
-                                          request_time: datetime
-                                          ):
+async def store_document_embeddings_in_db(file, file_hash: str, original_file_content: bytes, sentences: List[str], json_content: bytes, llm_model_name: str, embedding_pooling_method: str, corpus_identifier_string: str, client_ip: str, request_time: datetime):
     sentences = json.dumps(sentences)
-    document = await _get_document_from_db(file_hash) # First, check if a Document with the same hash already exists
-    if not document: # If not, create a new Document object
+    document = await _get_document_from_db(file_hash)
+    if not document:
         document = Document(document_hash=file_hash, llm_model_name=llm_model_name, corpus_identifier_string=corpus_identifier_string)
-        await shared_resources.db_writer.enqueue_write([document])
+        await shared_resources.db_writer.enqueue_write([document])
+    document_embedding_results_json_compressed_binary = compress_data(json_content)
     document_embedding = DocumentEmbedding(
         filename=file.filename,
         mimetype=file.content_type,
         file_hash=file_hash,
         llm_model_name=llm_model_name,
+        embedding_pooling_method=embedding_pooling_method,
         corpus_identifier_string=corpus_identifier_string,
         file_data=original_file_content,
         sentences=sentences,
-        document_embedding_results_json=json.loads(json_content.decode()),
+        document_embedding_results_json_compressed_binary=document_embedding_results_json_compressed_binary,
         ip_address=client_ip,
         request_time=request_time,
         response_time=datetime.utcnow(),
         total_time=(datetime.utcnow() - request_time).total_seconds()
     )
-    document.document_embeddings.append(document_embedding) # Associate it with the Document
-    document.update_hash() # This will trigger the SQLAlchemy event to update the document_hash
-    await shared_resources.db_writer.enqueue_write([document, document_embedding]) # Enqueue the write operation for the document embedding
-    write_operations = [] # Collect text embeddings to write
-    logger.info(f"Storing {len(results):,} text embeddings in database")
-    for text, embedding in results:
-        embedding_entry = await _get_embedding_from_db(text, llm_model_name)
-        if not embedding_entry:
-            embedding_entry = TextEmbedding(
-                text=text,
-                llm_model_name=llm_model_name,
-                corpus_identifier_string=corpus_identifier_string,
-                embedding_json=json.dumps(embedding),
-                ip_address=client_ip,
-                request_time=request_time,
-                response_time=datetime.utcnow(),
-                total_time=(datetime.utcnow() - request_time).total_seconds(),
-                document_file_hash=file_hash # Link it to the DocumentEmbedding via file_hash
-            )
-        else:
-            write_operations.append(embedding_entry)
-    await shared_resources.db_writer.enqueue_write(write_operations) # Enqueue the write operation for text embeddings
-
-async def _get_document_token_level_embedding_from_db(file_hash: str, llm_model_name: str, corpus_identifier_string: str) -> Optional[DocumentTokenLevelEmbedding]:
-    async with AsyncSessionLocal() as session:
-        result = await session.execute(
-            select(DocumentTokenLevelEmbedding)
-            .filter(
-                DocumentTokenLevelEmbedding.file_hash
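
Since `DocumentEmbedding` rows now carry `document_embedding_results_json_compressed_binary` rather than raw JSON, anything that reads that column back has to invert the compression applied in `store_document_embeddings_in_db`. A minimal sketch; the helper name is hypothetical:

```python
import json

def load_document_embedding_results(document_embedding):
    # Inverse of store_document_embeddings_in_db: decompress, then parse the Pandas-generated JSON.
    raw_json_bytes = decompress_data(document_embedding.document_embedding_results_json_compressed_binary)
    return json.loads(raw_json_bytes)
```
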
== file_hash, - DocumentTokenLevelEmbedding.llm_model_name == llm_model_name, - DocumentTokenLevelEmbedding.corpus_identifier_string == corpus_identifier_string - ) - ) - return result.scalar_one_or_none() - -async def store_document_token_level_embeddings_in_db(file: File, - file_hash: str, - original_file_content: bytes, - sentences: List[str], - json_content: bytes, - results: List[Tuple[str, List[float]]], - llm_model_name: str, - corpus_identifier_string: str, - client_ip: str, - request_time: datetime): - sentences = json.dumps(sentences) - document = await _get_document_from_db(file_hash) # First, check if a Document with the same hash already exists - if not document: # If not, create a new Document object - document = Document(document_hash=file_hash, llm_model_name=llm_model_name, corpus_identifier_string=corpus_identifier_string) - await shared_resources.db_writer.enqueue_write([document]) - existing_document_token_level_embedding = await _get_document_token_level_embedding_from_db(file_hash, llm_model_name, corpus_identifier_string) - if not existing_document_token_level_embedding: - document_token_level_embedding = DocumentTokenLevelEmbedding( - filename=file.filename, - mimetype=file.content_type, - file_hash=file_hash, - llm_model_name=llm_model_name, - corpus_identifier_string=corpus_identifier_string, - file_data=original_file_content, - sentences=sentences, - document_embedding_results_json=json.loads(json_content.decode()), - ip_address=client_ip, - request_time=request_time, - response_time=datetime.utcnow(), - total_time=(datetime.utcnow() - request_time).total_seconds() - ) - document.document_token_level_embeddings.append(document_token_level_embedding) # Associate it with the Document - document.update_hash() # This will trigger the SQLAlchemy event to update the document_hash - await shared_resources.db_writer.enqueue_write([document, document_token_level_embedding]) # Enqueue the write operation for the document token-level embedding + document.document_embeddings.append(document_embedding) + document.update_hash() + await shared_resources.db_writer.enqueue_write([document, document_embedding]) def load_text_completion_model(llm_model_name: str, raise_http_exception: bool = True): global USE_VERBOSE @@ -1002,7 +539,7 @@ def validate_bnf_grammar_func(grammar): return True, "Valid BNF Grammar" async def convert_document_to_sentences_func(file_path: str, mime_type: str) -> Dict[str, Any]: - sentences = await parse_submitted_document_file_into_sentence_strings_func(file_path, mime_type) + sentences, thousands_of_input_words = await parse_submitted_document_file_into_sentence_strings_func(file_path, mime_type) total_number_of_sentences = len(sentences) total_input_file_size_in_bytes = os.path.getsize(file_path) total_text_size_in_characters = sum(len(sentence) for sentence in sentences) @@ -1013,7 +550,8 @@ async def convert_document_to_sentences_func(file_path: str, mime_type: str) -> "total_number_of_sentences": total_number_of_sentences, "average_words_per_sentence": average_words_per_sentence, "total_input_file_size_in_bytes": total_input_file_size_in_bytes, - "total_text_size_in_characters": total_text_size_in_characters + "total_text_size_in_characters": total_text_size_in_characters, + "thousands_of_input_words": thousands_of_input_words } return result @@ -1086,9 +624,6 @@ async def _get_transcript_from_db(audio_file_hash: str) -> Optional[dict]: return None async def save_transcript_to_db(audio_file_hash, audio_file_name, audio_file_size_mb, 
transcript_segments, info, ip_address, request_time, response_time, total_time, combined_transcript_text, combined_transcript_text_list_of_metadata_dicts, corpus_identifier_string): - existing_transcript = await get_transcript_from_db(audio_file_hash) - if existing_transcript: - return existing_transcript audio_transcript = AudioTranscript( audio_file_hash=audio_file_hash, audio_file_name=audio_file_name, diff --git a/shared_resources.py b/shared_resources.py index dc2324a..bbb93e3 100644 --- a/shared_resources.py +++ b/shared_resources.py @@ -1,5 +1,5 @@ from misc_utility_functions import is_redis_running, start_redis_server, build_faiss_indexes -from database_functions import DatabaseWriter, initialize_db, delete_expired_rows, AsyncSessionLocal +from database_functions import DatabaseWriter, initialize_db #, AsyncSessionLocal, delete_expired_rows from ramdisk_functions import setup_ramdisk, copy_models_to_ramdisk, check_that_user_has_required_permissions_to_manage_ramdisks from logger_config import setup_logger from aioredlock import Aioredlock @@ -15,16 +15,15 @@ from typing import List, Tuple, Dict from decouple import config from fastapi import HTTPException -from apscheduler.schedulers.asyncio import AsyncIOScheduler +# from apscheduler.schedulers.asyncio import AsyncIOScheduler logger = setup_logger() embedding_model_cache = {} # Model cache to store loaded models -token_level_embedding_model_cache = {} # Model cache to store loaded token-level embedding models text_completion_model_cache = {} # Model cache to store loaded text completion models -scheduler = AsyncIOScheduler() -scheduler.add_job(delete_expired_rows(AsyncSessionLocal), 'interval', hours=1) -scheduler.start() +# scheduler = AsyncIOScheduler() +# scheduler.add_job(delete_expired_rows(AsyncSessionLocal), 'interval', hours=1) +# scheduler.start() SWISS_ARMY_LLAMA_SERVER_LISTEN_PORT = config("SWISS_ARMY_LLAMA_SERVER_LISTEN_PORT", default=8089, cast=int) DEFAULT_MODEL_NAME = config("DEFAULT_MODEL_NAME", default="openchat_v3.2_super", cast=str) @@ -71,7 +70,7 @@ def is_gpu_available(): } async def initialize_globals(): - global db_writer, faiss_indexes, token_faiss_indexes, associated_texts_by_model, redis, lock_manager + global db_writer, faiss_indexes, associated_texts_by_model_and_pooling_method, redis, lock_manager if not is_redis_running(): logger.info("Starting Redis server...") start_redis_server() @@ -98,14 +97,13 @@ async def initialize_globals(): load_model(llm_model_name, raise_http_exception=False) except FileNotFoundError as e: logger.error(e) - faiss_indexes, token_faiss_indexes, associated_texts_by_model, associated_token_level_embeddings_by_model = await build_faiss_indexes() + faiss_indexes, associated_texts_by_model_and_pooling_method = await build_faiss_indexes() # other shared variables and methods db_writer = None faiss_indexes = None -token_faiss_indexes = None -associated_texts_by_model = None +associated_texts_by_model_and_pooling_method = None redis = None lock_manager = None diff --git a/swiss_army_llama.py b/swiss_army_llama.py index 42a2ffd..c319e7c 100644 --- a/swiss_army_llama.py +++ b/swiss_army_llama.py @@ -1,16 +1,16 @@ import shared_resources from shared_resources import initialize_globals, download_models, is_gpu_available from logger_config import setup_logger -from database_functions import AsyncSessionLocal, DatabaseWriter, get_db_writer +from database_functions import AsyncSessionLocal from ramdisk_functions import clear_ramdisk -from misc_utility_functions import build_faiss_indexes, 
safe_path, configure_redis_optimally -from embeddings_data_models import DocumentEmbedding, DocumentTokenLevelEmbedding, TokenLevelEmbeddingBundle +from misc_utility_functions import build_faiss_indexes, configure_redis_optimally +from embeddings_data_models import DocumentEmbedding from embeddings_data_models import EmbeddingRequest, SemanticSearchRequest, AdvancedSemanticSearchRequest, SimilarityRequest, TextCompletionRequest, AddGrammarRequest from embeddings_data_models import EmbeddingResponse, SemanticSearchResponse, AdvancedSemanticSearchResponse, SimilarityResponse, AllStringsResponse, AllDocumentsResponse, TextCompletionResponse, AddGrammarResponse from embeddings_data_models import ShowLogsIncrementalModel -from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, get_or_compute_token_level_embedding_bundle_combined_feature_vector, calculate_token_level_embeddings, download_file, start_resource_monitoring, end_resource_monitoring, convert_raw_token_embeddings_to_json -from service_functions import get_list_of_corpus_identifiers_from_list_of_embedding_texts, get_list_of_corpus_identifiers_from_list_of_token_level_embedding_texts, compute_embeddings_for_document, compute_token_level_embeddings_for_document, parse_submitted_document_file_into_sentence_strings_func -from service_functions import store_document_embeddings_in_db, store_document_token_level_embeddings_in_db, generate_completion_from_llm, validate_bnf_grammar_func, convert_document_to_sentences_func, get_audio_duration_seconds, prepare_string_for_embedding, get_or_compute_token_level_embedding_combined_feature_vector_from_text +from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, download_file, start_resource_monitoring, end_resource_monitoring, decompress_data +from service_functions import get_list_of_corpus_identifiers_from_list_of_embedding_texts, compute_embeddings_for_document, parse_submitted_document_file_into_sentence_strings_func +from service_functions import generate_completion_from_llm, validate_bnf_grammar_func, convert_document_to_sentences_func, get_audio_duration_seconds, prepare_string_for_embedding from grammar_builder import GrammarBuilder from log_viewer_functions import show_logs_incremental_func, show_logs_func from uvicorn_config import option @@ -20,7 +20,6 @@ import os import sys import random -import re import tempfile import traceback import zipfile @@ -33,15 +32,13 @@ from decouple import config import uvicorn import fastapi -from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Depends, Form -from fastapi.responses import JSONResponse, FileResponse, HTMLResponse, Response +from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Form +from fastapi.responses import JSONResponse, FileResponse, HTMLResponse from contextlib import asynccontextmanager from sqlalchemy import select from sqlalchemy import text as sql_text from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.orm import joinedload import faiss -import pandas as pd import fast_vector_similarity as fvs import uvloop from magika import Magika @@ -293,12 +290,14 @@ async def add_new_model(model_url: str, token: str = None) -> Dict[str, Any]: The request must contain the following attributes: - `text`: The input text for which the embedding vector is to be retrieved. - `llm_model_name`: The model used to calculate the embedding (optional, will use the default model if not provided). 
+- `embedding_pooling_method`: The method used to pool the embeddings (Choices: 'means', 'means_mins_maxes', 'means_mins_maxes_stds_kurtoses', 'svd'; default is 'means'; see the example client call below).
 
 ### Example (note that `llm_model_name` is optional):
 ```json
 {
     "text": "This is a sample text.",
     "llm_model_name": "bge-m3-q8_0",
+    "embedding_pooling_method": "means",
     "corpus_identifier_string": "pastel_related_documentation_corpus"
 }
 ```
@@ -318,7 +317,7 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques
         raise HTTPException(status_code=403, detail="Unauthorized")
     try:
         request.text = prepare_string_for_embedding(request.text)
-        unique_id = f"get_embedding_{request.text}_{request.llm_model_name}"
+        unique_id = f"get_embedding_{request.text}_{request.llm_model_name}_{request.embedding_pooling_method}"
         lock = await shared_resources.lock_manager.lock(unique_id)
         if lock.valid:
             try:
@@ -337,157 +336,6 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques
 
-@app.post("/get_token_level_embeddings_matrix_and_combined_feature_vector_for_string/",
-          summary="Retrieve Token-Level Embeddings and Combined Feature Vector for a Given Input String",
-          description="""Retrieve the token-level embeddings and combined feature vector for a given input text using the specified model.
-
-### Parameters:
-- `request`: A JSON object containing the text and the model name.
-- `db_writer`: Database writer instance for managing write operations.
-- `req`: HTTP request object (optional).
-- `token`: Security token (optional).
-- `client_ip`: Client IP address (optional).
-- `json_format`: Format for JSON response of token-level embeddings (optional).
-- `send_back_json_or_zip_file`: Whether to return a JSON response or a ZIP file containing the JSON file (optional, defaults to `zip`).
-
-### Request JSON Format:
-The request must contain the following attributes:
-- `text`: The input text for which the embeddings are to be retrieved.
-- `llm_model_name`: The model used to calculate the embeddings (optional).
-
-### Example Request:
-```json
-{
-    "text": "This is a sample text.",
-    "llm_model_name": "Meta-Llama-3-8B-Instruct.Q3_K_S",
-    "corpus_identifier_string": "pastel_related_documentation_corpus"
-}
-```
-
-### Response:
-
-The response will include the input text for reference, and token-level embeddings matrix for the input text. The response is organized as a JSON array of objects, each containing a token and its corresponding embedding vector.
-Token level embeddings represent a text by breaking it down into individual tokens (words) and associating an embedding vector with each token. These embeddings capture the semantic and
-syntactic meaning of each token within the context of the text. Token level embeddings result in a matrix (number of tokens by embedding size), whereas a single embedding vector results
-in a one-dimensional vector of fixed size.
-
-The response will also include a combined feature vector derived from the token-level embeddings matrix; this combined feature vector has the great benefit that it is always the same length
-for all input texts, regardless of length (whereas the token-level embeddings matrix will have a different number of rows for each input text, depending on the number of tokens in the text).
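
Tying the request schema above together, here is a hypothetical client call against the updated `/get_embedding_vector_for_string/` endpoint. The port (8089) is the service default from `shared_resources.py`; the response key follows the `{"text_embedding_dict": ...}` shape returned by `get_or_compute_embedding`, and the assertion that `as_dict()` exposes an `embedding_json` column is an assumption based on the fields shown in the diff:

```python
import json
import requests  # illustrative; any HTTP client works

payload = {
    "text": "This is a sample text.",
    "llm_model_name": "bge-m3-q8_0",
    "embedding_pooling_method": "means",  # one of: means, means_mins_maxes, means_mins_maxes_stds_kurtoses, svd
    "corpus_identifier_string": "pastel_related_documentation_corpus",
}
response = requests.post("http://localhost:8089/get_embedding_vector_for_string/", json=payload)
text_embedding_dict = response.json()["text_embedding_dict"]
print(len(json.loads(text_embedding_dict["embedding_json"])))  # length of the pooled vector
```
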
-The combined feature vector is obtained by calculating the column-wise means, mins, maxes, and standard deviations of the token-level embeddings matrix; thus if the token-level embedding vectors -are of length `n`, the combined feature vector will be of length `4n`. - -- `input_text`: The original input text. -- `token_level_embedding_bundle`: Either a ZIP file containing the JSON file, or a direct JSON array containing the token-level embeddings and combined feature vector for the input text, depending on the value of `send_back_json_or_zip_file`. -- `combined_feature_vector`: A list containing the combined feature vector, obtained by calculating the column-wise means, mins, maxes, and standard deviations of the token-level embeddings. This vector is always of length `4n`, where `n` is the length of the token-level embedding vectors. - -### Example Response: -```json -{ - "input_text": "This is a sample text.", - "token_level_embedding_bundle": [ - {"token": "This", "embedding": [0.1234, 0.5678, ...]}, - {"token": "is", "embedding": [...]}, - ... - ], - "combined_feature_vector": [0.5678, 0.1234, ...] -} -``` -""", - response_description="A JSON object containing the input text, token embeddings, and combined feature vector for the input text.") -async def get_token_level_embeddings_matrix_and_combined_feature_vector_for_string( - request: EmbeddingRequest, - db_writer: DatabaseWriter = Depends(get_db_writer), - req: Request = None, - token: str = None, - client_ip: str = None, - json_format: str = 'records', - send_back_json_or_zip_file: str = 'zip' -) -> Response: - request.text = prepare_string_for_embedding(request.text) - logger.info(f"Received request for token embeddings with text length {len(request.text):,} and model: '{request.llm_model_name}' from client IP: {client_ip}; input text: {request.text}") - request_time = datetime.utcnow() - if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN): - logger.warning(f"Unauthorized request from client IP {client_ip}") - raise HTTPException(status_code=403, detail="Unauthorized") - input_text_hash = sha3_256(request.text.encode('utf-8')).hexdigest() - logger.info(f"Computed input text hash: {input_text_hash}") - async with AsyncSessionLocal() as session: - logger.info(f"Querying database for existing token-level embedding bundle for input text string {request.text} and model {request.llm_model_name}") - result = await session.execute( - select(TokenLevelEmbeddingBundle) - .options(joinedload(TokenLevelEmbeddingBundle.token_level_embeddings)) # Eagerly load the relationship - .filter(TokenLevelEmbeddingBundle.input_text_hash == input_text_hash, TokenLevelEmbeddingBundle.llm_model_name == request.llm_model_name) - ) - existing_embedding_bundle = result.unique().scalar() - if existing_embedding_bundle: - logger.info("Found existing token-level embedding bundle in the database.") - combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(existing_embedding_bundle.id, existing_embedding_bundle.token_level_embeddings) - response_content = { - 'input_text': request.text, - 'token_level_embedding_bundle': json.loads(existing_embedding_bundle.token_level_embeddings_bundle_json), - 'combined_feature_vector': combined_feature_vector - } - return JSONResponse(content=response_content) - unique_id = f"get_token_level_embeddings_{request.text}_{request.llm_model_name}" - lock = await shared_resources.lock_manager.lock(unique_id) - if lock.valid: - try: - 
logger.info("No cached result found. Calculating token-level embeddings now...") - try: - embedding_bundle = TokenLevelEmbeddingBundle( - input_text=request.text, - llm_model_name=request.llm_model_name, - corpus_identifier_string=request.corpus_identifier_string, - ip_address=client_ip, - request_time=request_time - ) - word_list, token_embeddings, complex_embedding_structure = await calculate_token_level_embeddings(request.text, request.llm_model_name, request.corpus_identifier_string, client_ip) - json_content, token_level_embeddings_df = convert_raw_token_embeddings_to_json(token_embeddings=token_embeddings, word_list=word_list, complex_embedding_structure=complex_embedding_structure, json_format=json_format, use_verbose=True) - response_time = datetime.utcnow() - total_time = (response_time - request_time).total_seconds() - embedding_bundle.token_level_embeddings_bundle_json = json_content - embedding_bundle.response_time = response_time - embedding_bundle.total_time = total_time - combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(embedding_bundle.id, token_level_embeddings_df) - response_content = { - 'input_text': request.text, - 'token_level_embedding_bundle': json.loads(embedding_bundle.token_level_embeddings_bundle_json), - 'combined_feature_vector': combined_feature_vector - } - logger.info(f"Done getting token-level embedding matrix and combined feature vector for input text string {request.text} and model {request.llm_model_name}") - json_content = embedding_bundle.token_level_embeddings_bundle_json - json_content_length = len(json.dumps(response_content)) - overall_total_time = (datetime.utcnow() - request_time).total_seconds() - if len(embedding_bundle.token_level_embeddings_bundle_json) > 0: - tokens = re.findall(r'\b\w+\b', request.text) - logger.info(f"The response took {overall_total_time:,.2f} seconds to generate, or {overall_total_time / (float(len(tokens))/1000.0):,.2f} seconds per thousand input tokens and {overall_total_time / (float(json_content_length)/1000000.0):,.2f} seconds per million output characters.") - if send_back_json_or_zip_file == 'json': # Assume 'json' response should be sent back - logger.info(f"Now sending back JSON response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of JSON response out of {len(json_content)} total characters: {json_content[:100]}") - return JSONResponse(content=response_content) - else: # Assume 'zip' file should be sent back - output_file_name_without_extension = f"token_level_embeddings_and_combined_feature_vector_for_input_hash_{input_text_hash}_and_model_name__{request.llm_model_name}" - is_safe_json, safe_json_file_path = safe_path('/tmp', f"{output_file_name_without_extension}.json") - is_safe_zip, safe_zip_file_path = safe_path('/tmp', f"{output_file_name_without_extension}.zip") - if is_safe_json and is_safe_zip: - with open(safe_json_file_path, 'w') as json_file: - json.dump(response_content, json_file) - with zipfile.ZipFile(safe_zip_file_path, 'w') as zipf: - zipf.write(safe_json_file_path, os.path.basename(safe_json_file_path)) - logger.info(f"Now sending back ZIP file response for input text string '{request.text}' and model {request.llm_model_name}; First 100 characters of zipped JSON file out of {len(json_content)} total characters: '{json_content[:100]}'") - return FileResponse(safe_zip_file_path, headers={"Content-Disposition": f"attachment; filename={output_file_name_without_extension}.zip"}) - else: - 
logger.error("Potential path injection attack detected.") - raise HTTPException(status_code=500, detail="Internal Server Error") - except Exception as e: - logger.error(f"An error occurred while processing the request: {e}") - logger.error(traceback.format_exc()) - raise HTTPException(status_code=500, detail="Internal Server Error") - finally: - await shared_resources.lock_manager.unlock(lock) - else: - return {"status": "already processing"} - - @app.post("/compute_similarity_between_strings/", response_model=SimilarityResponse, summary="Compute Similarity Between Two Strings", @@ -521,13 +369,13 @@ async def compute_similarity_between_strings(request: SimilarityRequest, req: Re similarity_measure = request.similarity_measure.lower() if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN): raise HTTPException(status_code=403, detail="Unauthorized") - unique_id = f"compute_similarity_{request.text1}_{request.text2}_{request.llm_model_name}_{similarity_measure}" + unique_id = f"compute_similarity_{request.text1}_{request.text2}_{request.llm_model_name}_{request.embedding_pooling_method}_{similarity_measure}" lock = await shared_resources.lock_manager.lock(unique_id) if lock.valid: try: client_ip = req.client.host if req else "localhost" - embedding_request1 = EmbeddingRequest(text=request.text1, llm_model_name=request.llm_model_name) - embedding_request2 = EmbeddingRequest(text=request.text2, llm_model_name=request.llm_model_name) + embedding_request1 = EmbeddingRequest(text=request.text1, llm_model_name=request.llm_model_name, embedding_pooling_method=request.embedding_pooling_method) + embedding_request2 = EmbeddingRequest(text=request.text2, llm_model_name=request.llm_model_name, embedding_pooling_method=request.embedding_pooling_method) embedding1_response = await get_or_compute_embedding(request=embedding_request1, req=req, client_ip=client_ip, use_verbose=False) embedding2_response = await get_or_compute_embedding(request=embedding_request2, req=req, client_ip=client_ip, use_verbose=False) embedding1 = np.array(embedding1_response["embedding"]) @@ -582,18 +430,18 @@ async def compute_similarity_between_strings(request: SimilarityRequest, req: Re The request must contain the following attributes: - `query_text`: The input text for which to find the most similar string. - `llm_model_name`: The model used to calculate embeddings. +- `embedding_pooling_method`: The method used to pool the embeddings (Choices: 'mean', 'means_mins_maxes', 'means_mins_maxes_stds_kurtoses', 'svd'; default is 'mean'). - `corpus_identifier_string`: An optional string identifier to restrict the search to a specific corpus. - `number_of_most_similar_strings_to_return`: (Optional) The number of most similar strings to return, defaults to 10. -- `use_token_level_embeddings`: (Optional) Whether to use token-level embeddings for the search, defaults to 0. 
### Example: ```json { "query_text": "Find me the most similar string!", "llm_model_name": "bge-m3-q8_0", - "corpus_identifier_string": "pastel_related_documentation_corpus" - "number_of_most_similar_strings_to_return": 5, - "use_token_level_embeddings": 1 + "corpus_identifier_string": "pastel_related_documentation_corpus", + "embedding_pooling_method": "means", + "number_of_most_similar_strings_to_return": 5 } ``` @@ -613,65 +461,54 @@ async def compute_similarity_between_strings(request: SimilarityRequest, req: Re ```""", response_description="A JSON object containing the query text along with the most similar strings and similarity scores.") async def search_stored_embeddings_with_query_string_for_semantic_similarity(request: SemanticSearchRequest, req: Request, token: str = None) -> SemanticSearchResponse: - global faiss_indexes, token_faiss_indexes, associated_texts_by_model + global faiss_indexes, associated_texts_by_model_and_pooling_method request.query_text = prepare_string_for_embedding(request.query_text) - unique_id = f"semantic_search_{request.query_text}_{request.llm_model_name}_{request.use_token_level_embeddings}_{request.corpus_identifier_string}_{request.number_of_most_similar_strings_to_return}" # Unique ID for this operation + unique_id = f"semantic_search_{request.query_text}_{request.llm_model_name}_{request.embedding_pooling_method}_{request.corpus_identifier_string}_{request.number_of_most_similar_strings_to_return}" # Unique ID for this operation lock = await shared_resources.lock_manager.lock(unique_id) - client_ip = req.client.host if req else "localhost" if lock.valid: try: if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN): raise HTTPException(status_code=403, detail="Unauthorized") - faiss_indexes, token_faiss_indexes, associated_texts_by_model, associated_token_level_embeddings_by_model = await build_faiss_indexes(force_rebuild=True) + faiss_indexes, associated_texts_by_model_and_pooling_method = await build_faiss_indexes(force_rebuild=True) + try: + faiss_index = faiss_indexes[(request.llm_model_name, request.embedding_pooling_method)] + except KeyError: + raise HTTPException(status_code=400, detail=f"No FAISS index found for model: {request.llm_model_name} and pooling method: {request.embedding_pooling_method}") request_time = datetime.utcnow() llm_model_name = request.llm_model_name + embedding_pooling_method = request.embedding_pooling_method num_results = request.number_of_most_similar_strings_to_return num_results_before_corpus_filter = num_results*100 - if request.use_token_level_embeddings: - total_entries = len(associated_token_level_embeddings_by_model[llm_model_name]) # Get the total number of entries for the model - else: - total_entries = len(associated_texts_by_model[llm_model_name]) # Get the total number of entries for the model + total_entries = len(associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]) # Get the total number of entries for the model and pooling method num_results = min(num_results, total_entries) # Ensure num_results doesn't exceed the total number of entries num_results_before_corpus_filter = min(num_results_before_corpus_filter, total_entries) # Ensure num_results_before_corpus_filter doesn't exceed the total number of entries - logger.info(f"Received request to find {num_results:,} most similar strings for query text: `{request.query_text}` using model: {llm_model_name} and corpus: {request.corpus_identifier_string}") + 
logger.info(f"Received request to find {num_results:,} most similar strings for query text: `{request.query_text}` using model: {llm_model_name}, pooling method: {embedding_pooling_method}, and corpus: {request.corpus_identifier_string}") try: - if request.use_token_level_embeddings: - logger.info(f"Computing token-level combined embedding for input text: {request.query_text}") - token_level_embeddings = associated_token_level_embeddings_by_model[llm_model_name] - logger.info(f"Found {len(token_level_embeddings)} token-level embeddings for model {llm_model_name}.") - combined_feature_vector = await get_or_compute_token_level_embedding_combined_feature_vector_from_text(text=request.query_text, llm_model_name=llm_model_name, corpus_identifier_string=request.corpus_identifier_string, client_ip=client_ip, use_verbose=1) - input_embedding = np.array(combined_feature_vector).astype('float32').reshape(1, -1) - else: - logger.info(f"Computing embedding for input text: {request.query_text}") - embedding_request = EmbeddingRequest(text=request.query_text, llm_model_name=request.llm_model_name, corpus_identifier_string=request.corpus_identifier_string) - embedding_response = await get_or_compute_embedding(embedding_request, req) - input_embedding = np.array(embedding_response["embedding"]).astype('float32').reshape(1, -1) + logger.info(f"Computing embedding for input text: {request.query_text}") + embedding_request = EmbeddingRequest(text=request.query_text, llm_model_name=request.llm_model_name, embedding_pooling_method=request.embedding_pooling_method, corpus_identifier_string=request.corpus_identifier_string) + embedding_response = await get_or_compute_embedding(embedding_request, req) + embedding_json = embedding_response["text_embedding_dict"]["embedding_json"] + embedding_vector = json.loads(embedding_json) + input_embedding = np.array(embedding_vector).astype('float32').reshape(1, -1) faiss.normalize_L2(input_embedding) # Normalize the input vector for cosine similarity - logger.info(f"Computed embedding for input text: {request.query_text}") - if request.use_token_level_embeddings: - faiss_index = token_faiss_indexes.get(llm_model_name) - associated_texts_by_model_for_llm = associated_token_level_embeddings_by_model[llm_model_name] - list_of_corpus_identifier_strings = await get_list_of_corpus_identifiers_from_list_of_token_level_embedding_texts(associated_texts_by_model_for_llm, llm_model_name) - logger.info("Searching for the most similar string in the FAISS index using token-level combined feature embeddings") - else: - faiss_index = faiss_indexes.get(llm_model_name) - associated_texts_by_model_for_llm = associated_texts_by_model[llm_model_name] - list_of_corpus_identifier_strings = await get_list_of_corpus_identifiers_from_list_of_embedding_texts(associated_texts_by_model_for_llm, llm_model_name) - logger.info("Searching for the most similar string in the FAISS index using regular embeddings") + results = [] # Create an empty list to store the results + faiss_index = faiss_indexes[(llm_model_name, embedding_pooling_method)] + associated_texts = associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method] + list_of_corpus_identifier_strings = await get_list_of_corpus_identifiers_from_list_of_embedding_texts(associated_texts, llm_model_name, embedding_pooling_method) + logger.info(f"Searching for the most similar string in the FAISS index using {embedding_pooling_method} embeddings") if faiss_index is None: - raise HTTPException(status_code=400, detail=f"No FAISS 
index found for model: {llm_model_name}") + raise HTTPException(status_code=400, detail=f"No FAISS index found for model: {llm_model_name} and pooling method: {embedding_pooling_method}") similarities, indices = faiss_index.search(input_embedding.reshape(1, -1), num_results_before_corpus_filter) # Search for num_results similar strings - results = [] # Create an empty list to store the results for ii in range(num_results_before_corpus_filter): index = indices[0][ii] - if index < len(associated_texts_by_model_for_llm): + if index < len(associated_texts): similarity = float(similarities[0][ii]) # Convert numpy.float32 to native float - most_similar_text = associated_texts_by_model_for_llm[index] + most_similar_text = associated_texts[index] corpus_identifier_string = list_of_corpus_identifier_strings[index] if (corpus_identifier_string == request.corpus_identifier_string) and (most_similar_text != request.query_text) and (len(results) <= num_results): results.append({"search_result_text": most_similar_text, "similarity_to_query_text": similarity}) else: - logger.warning(f"Index {index} out of range for model {llm_model_name}") + logger.warning(f"Index {index} out of range for model {llm_model_name} and pooling method {embedding_pooling_method}") response_time = datetime.utcnow() total_time = (response_time - request_time).total_seconds() logger.info(f"Finished searching for the most similar string in the FAISS index in {total_time:,.2f} seconds. Found {len(results):,} results, returning the top {num_results:,}.") @@ -685,7 +522,7 @@ async def search_stored_embeddings_with_query_string_for_semantic_similarity(req await shared_resources.lock_manager.unlock(lock) else: return {"status": "already processing"} - + @app.post("/advanced_search_stored_embeddings_with_query_string_for_semantic_similarity/", @@ -702,6 +539,7 @@ async def search_stored_embeddings_with_query_string_for_semantic_similarity(req The request must contain the following attributes: - `query_text`: The input text for which to find the most similar string. - `llm_model_name`: The model used to calculate embeddings. +- `embedding_pooling_method`: The method used to pool the embeddings (Choices: 'mean', 'means_mins_maxes', 'means_mins_maxes_stds_kurtoses', 'svd'; default is 'mean'). - `corpus_identifier_string`: An optional string identifier to restrict the search to a specific corpus. - `similarity_filter_percentage`: (Optional) The percentage of embeddings to filter based on cosine similarity, defaults to 0.02 (i.e., top 2%). - `number_of_most_similar_strings_to_return`: (Optional) The number of most similar strings to return after applying the second similarity measure, defaults to 10. 
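To see the whole two-step flow in one place, here is a self-contained toy sketch: FAISS does the cheap cosine prefilter over the full corpus, and only the surviving candidates get the slower second-pass measure. SciPy's `spearmanr` stands in for the `fast_vector_similarity` measures the service actually uses, and the corpus, dimensions, and cutoff are illustrative only:

```python
import numpy as np
import faiss
from scipy.stats import spearmanr

rng = np.random.default_rng(0)
corpus = rng.normal(size=(1000, 64)).astype("float32")  # stand-in for stored embeddings
texts = [f"string_{i}" for i in range(len(corpus))]

faiss.normalize_L2(corpus)                  # normalized vectors => inner product == cosine
index = faiss.IndexFlatIP(corpus.shape[1])
index.add(corpus)

query = rng.normal(size=(1, 64)).astype("float32")
faiss.normalize_L2(query)

# Step 1: fast FAISS prefilter down to the top 2% by cosine similarity.
k = max(1, int(0.02 * index.ntotal))
_, idxs = index.search(query, k)

# Step 2: re-rank only the survivors with a slower, rank-based measure.
reranked = sorted(
    ((spearmanr(query[0], corpus[i]).correlation, texts[i]) for i in idxs[0]),
    reverse=True,
)
print(reranked[:10])  # ten best matches after the second pass
```

The point of the split is cost: the expensive measures run on a few dozen candidates instead of the whole corpus, while the final ranking still benefits from their extra nuance.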
@@ -711,6 +549,7 @@ async def search_stored_embeddings_with_query_string_for_semantic_similarity(req
{
    "query_text": "Find me the most similar string!",
    "llm_model_name": "bge-m3-q8_0",
+   "embedding_pooling_method": "means",
-   "corpus_identifier_string": "specific_corpus"
+   "corpus_identifier_string": "specific_corpus",
    "similarity_filter_percentage": 0.02,
    "number_of_most_similar_strings_to_return": 5,
@@ -733,44 +572,45 @@ async def search_stored_embeddings_with_query_string_for_semantic_similarity(req
```""",
          response_description="A JSON object containing the query text and the most similar strings, along with their similarity scores for multiple measures.")
async def advanced_search_stored_embeddings_with_query_string_for_semantic_similarity(request: AdvancedSemanticSearchRequest, req: Request, token: str = None) -> AdvancedSemanticSearchResponse:
-   global faiss_indexes, token_faiss_indexes, associated_texts_by_model
+   global faiss_indexes, associated_texts_by_model_and_pooling_method
    request.query_text = prepare_string_for_embedding(request.query_text)
-   unique_id = f"advanced_semantic_search_{request.query_text}_{request.llm_model_name}_{request.similarity_filter_percentage}_{request.number_of_most_similar_strings_to_return}"
+   unique_id = f"advanced_semantic_search_{request.query_text}_{request.llm_model_name}_{request.embedding_pooling_method}_{request.similarity_filter_percentage}_{request.number_of_most_similar_strings_to_return}"
    lock = await shared_resources.lock_manager.lock(unique_id)
    if lock.valid:
        try:
            if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN):
                raise HTTPException(status_code=403, detail="Unauthorized")
-           faiss_indexes, token_faiss_indexes, associated_texts_by_model, associated_token_level_embeddings_by_model = await build_faiss_indexes(force_rebuild=True)
+           faiss_indexes, associated_texts_by_model_and_pooling_method = await build_faiss_indexes(force_rebuild=True)
            request_time = datetime.utcnow()
            llm_model_name = request.llm_model_name
-           num_results = max([1, int((1 - request.similarity_filter_percentage) * len(associated_texts_by_model[llm_model_name]))])
+           embedding_pooling_method = request.embedding_pooling_method
            num_results_before_corpus_filter = request.number_of_most_similar_strings_to_return * 100
-           num_results_before_corpus_filter = min(num_results_before_corpus_filter, len(associated_texts_by_model[llm_model_name]))
-           logger.info(f"Received request to find {num_results} most similar strings for query text: `{request.query_text}` using model: {llm_model_name}")
+           logger.info(f"Received request to find most similar strings for query text: `{request.query_text}` using model: {llm_model_name}")
            try:
                logger.info(f"Computing embedding for input text: {request.query_text}")
-               embedding_request = EmbeddingRequest(text=request.query_text, llm_model_name=llm_model_name)
+               embedding_request = EmbeddingRequest(text=request.query_text, llm_model_name=llm_model_name, embedding_pooling_method=embedding_pooling_method)
                embedding_response = await get_or_compute_embedding(embedding_request, req)
                input_embedding = np.array(embedding_response["embedding"]).astype('float32').reshape(1, -1)
                faiss.normalize_L2(input_embedding)
                logger.info(f"Computed embedding for input text: {request.query_text}")
-               faiss_index = faiss_indexes.get(llm_model_name)
+               final_results = []
+               faiss_index = faiss_indexes.get((llm_model_name, embedding_pooling_method))
                if faiss_index is None:
-                   raise HTTPException(status_code=400, detail=f"No FAISS index found for model: {llm_model_name}")
+                   raise HTTPException(status_code=400, detail=f"No FAISS index found for model: {llm_model_name} and pooling method: {embedding_pooling_method}")
+               num_results = max([1, int(request.similarity_filter_percentage * len(associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]))])
+               num_results_before_corpus_filter = min(num_results_before_corpus_filter, len(associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]))
                similarities, indices = faiss_index.search(input_embedding, num_results_before_corpus_filter)
                filtered_indices = indices[0]
                similarity_results = []
-               associated_texts_by_model_for_llm = associated_texts_by_model[llm_model_name]
-               list_of_corpus_identifier_strings = await get_list_of_corpus_identifiers_from_list_of_embedding_texts(associated_texts_by_model_for_llm, llm_model_name)
+               associated_texts = associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]
+               list_of_corpus_identifier_strings = await get_list_of_corpus_identifiers_from_list_of_embedding_texts(associated_texts, llm_model_name, embedding_pooling_method)
                for idx in filtered_indices:
                    if list_of_corpus_identifier_strings[idx] == request.corpus_identifier_string:
-                       associated_text = associated_texts_by_model_for_llm[idx]
+                       associated_text = associated_texts[idx]
                        similarity_results.append((similarities[0][idx], associated_text))
                similarity_results = sorted(similarity_results, key=lambda x: x[0], reverse=True)[:num_results]
-               final_results = []
                for _, associated_text in similarity_results:
-                   embedding_request = EmbeddingRequest(text=associated_text, llm_model_name=llm_model_name)
+                   embedding_request = EmbeddingRequest(text=associated_text, llm_model_name=llm_model_name, embedding_pooling_method=embedding_pooling_method)
                    embedding_response = await get_or_compute_embedding(request=embedding_request, req=req, use_verbose=False)
                    filtered_embedding = np.array(embedding_response["embedding"])
                    params = {
@@ -795,6 +635,8 @@ async def advanced_search_stored_embeddings_with_query_string_for_semantic_simil
                raise HTTPException(status_code=500, detail="Internal Server Error")
            finally:
                await shared_resources.lock_manager.unlock(lock)
+   else:
+       return {"status": "already processing"}


@app.post("/get_all_embedding_vectors_for_document/",
@@ -807,8 +649,8 @@ async def advanced_search_stored_embeddings_with_query_string_for_semantic_simil
- `hash`: SHA3-256 hash of the document file to verify integrity.
- `size`: Size of the document file in bytes to verify completeness.
- `llm_model_name`: The model used to calculate embeddings (optional).
+- `embedding_pooling_method`: The method used to pool the embeddings (Choices: 'means', 'means_mins_maxes', 'means_mins_maxes_stds_kurtoses', 'svd'; default is 'means').
- `corpus_identifier_string`: An optional string identifier for grouping documents into a specific corpus.
-- `use_token_level_embeddings`: Whether to use token-level embeddings (optional, defaults to 0).
- `json_format`: The format of the JSON response (optional, see details below).
- `send_back_json_or_zip_file`: Whether to return a JSON file or a ZIP file containing the embeddings file (optional, defaults to `zip`).
- `token`: Security token (optional).
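For reference, a hypothetical client call to this endpoint might look like the following; the host, port, and file name are assumptions, while the endpoint path and form fields come from the parameter list above. The `hash` and `size` fields are computed client-side so the server can verify the upload arrived intact:

```python
import hashlib
import requests

file_path = "my_document.pdf"  # hypothetical input file
with open(file_path, "rb") as f:
    file_bytes = f.read()

response = requests.post(
    "http://localhost:8089/get_all_embedding_vectors_for_document/",  # assumed host/port
    files={"file": (file_path, file_bytes)},
    data={
        "hash": hashlib.sha3_256(file_bytes).hexdigest(),  # lets the server verify integrity
        "size": str(len(file_bytes)),                      # lets the server verify completeness
        "llm_model_name": "bge-m3-q8_0",
        "embedding_pooling_method": "means",
        "corpus_identifier_string": "my_corpus",
        "json_format": "records",
        "send_back_json_or_zip_file": "json",
    },
)
response.raise_for_status()
embeddings = response.json()  # one embedding per sentence, in the requested json_format
```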
@@ -834,8 +676,8 @@ async def get_all_embedding_vectors_for_document( hash: str = Form(None), size: int = Form(None), llm_model_name: str = "bge-m3-q8_0", + embedding_pooling_method: str = "means", corpus_identifier_string: str = "", - use_token_level_embeddings: int = 0, json_format: str = 'records', token: str = None, send_back_json_or_zip_file: str = 'zip', @@ -874,7 +716,7 @@ async def get_all_embedding_vectors_for_document( logger.info(f"SHA3-256 hash of submitted file: {file_hash}") if corpus_identifier_string == "": corpus_identifier_string = file_hash - unique_id = f"document_embedding_{file_hash}_{llm_model_name}" + unique_id = f"document_embedding_{file_hash}_{llm_model_name}_{embedding_pooling_method}" # Exponential backoff with jitter for acquiring lock max_retries = 5 for attempt in range(max_retries): @@ -890,27 +732,26 @@ async def get_all_embedding_vectors_for_document( raise HTTPException(status_code=503, detail="Service temporarily unavailable. Please try again later.") try: async with AsyncSessionLocal() as session: - if use_token_level_embeddings: - result = await session.execute(select(DocumentTokenLevelEmbedding).filter(DocumentTokenLevelEmbedding.file_hash == file_hash, DocumentTokenLevelEmbedding.llm_model_name == llm_model_name)) - else: - result = await session.execute(select(DocumentEmbedding).filter(DocumentEmbedding.file_hash == file_hash, DocumentEmbedding.llm_model_name == llm_model_name)) + result = await session.execute(select(DocumentEmbedding).filter(DocumentEmbedding.file_hash == file_hash, DocumentEmbedding.llm_model_name == llm_model_name, DocumentEmbedding.embedding_pooling_method == embedding_pooling_method)) existing_document_embedding = result.scalar_one_or_none() if existing_document_embedding: logger.info("Document has been processed before, returning existing result") sentences = existing_document_embedding.sentences - json_content = json.dumps(existing_document_embedding.document_embedding_results_json).encode() + document_embedding_results_json_compressed_binary = existing_document_embedding.document_embedding_results_json_compressed_binary + document_embedding_results_json = decompress_data(document_embedding_results_json_compressed_binary) + json_content = json.dumps(document_embedding_results_json).encode() if len(json_content) == 0: raise HTTPException(status_code=400, detail="Could not retrieve document embedding results.") + existing_document = 1 else: + existing_document = 0 with open(temp_file_path, 'rb') as f: input_data_binary = f.read() result = magika.identify_bytes(input_data_binary) mime_type = result.output.mime_type - if use_token_level_embeddings: - logger.info(f"Received request to extract token-level embeddings for document with MIME type: {mime_type} and size: {os.path.getsize(temp_file_path):,} bytes from IP address: {client_ip}") - else: - logger.info(f"Received request to extract regular embeddings for document with MIME type: {mime_type} and size: {os.path.getsize(temp_file_path):,} bytes from IP address: {client_ip}") - sentences = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type) + sentences, thousands_of_input_words = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type) + first_10_words_of_input_text = ' '.join(' '.join(sentences).split()[:10]) + logger.info(f"Received request to extract embeddings for document with MIME type: {mime_type} and size: {os.path.getsize(temp_file_path):,} bytes from IP address: {client_ip}; First 10 words of the 
document: '{first_10_words_of_input_text}...'")
                input_data = {
                    "sentences": sentences,
                    "file_size_mb": os.path.getsize(temp_file_path) / (1024 * 1024),
@@ -918,32 +759,19 @@ async def get_all_embedding_vectors_for_document(
                }
                context = start_resource_monitoring("get_all_embedding_vectors_for_document", input_data, client_ip)
                try:
-                   if use_token_level_embeddings:
-                       results = await compute_token_level_embeddings_for_document(sentences, llm_model_name, corpus_identifier_string, client_ip, file_hash)
-                       logger.info(f"Done getting all token-level embeddings for document containing {len(sentences):,} sentences with model {llm_model_name} and corpus {corpus_identifier_string}")
-                   else:
-                       results = await compute_embeddings_for_document(sentences, llm_model_name, corpus_identifier_string, client_ip, file_hash)
-                       logger.info(f"Done getting all regular embeddings for document containing {len(sentences):,} sentences with model {llm_model_name} and corpus {corpus_identifier_string}")
+                   json_content = await compute_embeddings_for_document(strings=sentences, llm_model_name=llm_model_name, embedding_pooling_method=embedding_pooling_method, corpus_identifier_string=corpus_identifier_string, client_ip=client_ip, document_file_hash=file_hash, file=file, original_file_content=input_data_binary, json_format=json_format)
+                   logger.info(f"Done getting all regular embeddings for document containing {len(sentences):,} sentences with model {llm_model_name} and embedding pooling method {embedding_pooling_method} and corpus {corpus_identifier_string}")
                except Exception as e:
                    logger.error(f"Error while computing embeddings for document: {e}")
                    traceback.print_exc()
                    raise HTTPException(status_code=400, detail="Error while computing embeddings for document")
                finally:
                    end_resource_monitoring(context)
-               df = pd.DataFrame(results, columns=['text', 'embedding'])
-               json_content = df.to_json(orient=json_format or 'records').encode()
-               with open(temp_file_path, 'rb') as file_buffer:
-                   original_file_content = file_buffer.read()
-               json_content_length = len(json_content)
-               if json_content_length > 0:
-                   if use_token_level_embeddings:
-                       await store_document_token_level_embeddings_in_db(file, file_hash, original_file_content, sentences, json_content, results, llm_model_name, corpus_identifier_string, client_ip, request_time)
-                   else:
-                       await store_document_embeddings_in_db(file, file_hash, original_file_content, sentences, json_content, results, llm_model_name, corpus_identifier_string, client_ip, request_time)
            overall_total_time = (datetime.utcnow() - request_time).total_seconds()
            json_content_length = len(json_content)
            if json_content_length > 0:
-               logger.info(f"The response took {overall_total_time:,.2f} seconds to generate, or {overall_total_time / (len(sentences) / 1000.0):,.2f} seconds per thousand input tokens and {overall_total_time / (float(json_content_length) / 1000000.0):,.2f} seconds per million output characters.")
+               if not existing_document:
+                   logger.info(f"The response took {overall_total_time:,.2f} seconds to generate, or {float(overall_total_time / thousands_of_input_words):,.2f} seconds per thousand input words and {overall_total_time / (float(json_content_length) / 1000000.0):,.2f} seconds per million output characters.")
            if send_back_json_or_zip_file == 'json':
                logger.info(f"Returning JSON response for document containing {len(sentences):,} sentences with model {llm_model_name}; first 100 characters out of {json_content_length:,} total of JSON response: {json_content[:100]}" if 'sentences' in locals() else
f"Returning JSON response; first 100 characters out of {json_content_length:,} total of JSON response: {json_content[:100]}") return JSONResponse(content=json.loads(json_content.decode())) diff --git a/tests/swiss_army_llama/test_build_faiss_indexes.py b/tests/swiss_army_llama/test_build_faiss_indexes.py index 253de45..bf7556b 100644 --- a/tests/swiss_army_llama/test_build_faiss_indexes.py +++ b/tests/swiss_army_llama/test_build_faiss_indexes.py @@ -7,32 +7,27 @@ @pytest.mark.asyncio async def test_build_faiss_indexes(monkeypatch): - # Mocking data returned from the database for regular and token-level embeddings + # Mocking data returned from the database for embeddings mock_embedding_data = [("model1", "text1", json.dumps([1.0, 1.0])), ("model1", "text2", json.dumps([1.0, 1.0]))] - mock_token_embedding_data = [("model1", "token1", json.dumps([1.0, 1.0])), ("model1", "token2", json.dumps([1.0, 1.0]))] # Mocking SQLAlchemy execute method to return our mock data async def mock_execute(*args, **kwargs): if "SELECT llm_model_name, text, embedding_json FROM embeddings" in args[0]: return AsyncMock(fetchall=AsyncMock(return_value=mock_embedding_data))() - if "SELECT llm_model_name, token, token_level_embedding_json FROM token_level_embeddings" in args[0]: - return AsyncMock(fetchall=AsyncMock(return_value=mock_token_embedding_data))() - + # Mocking the database session monkeypatch.setattr(AsyncSessionLocal, "execute", mock_execute) # Run the function to test - faiss_indexes, token_faiss_indexes, associated_texts_by_model = await build_faiss_indexes() + faiss_indexes, associated_texts_by_model_and_pooling_method = await build_faiss_indexes() # Verify that FAISS indexes have been built for the mock data assert "model1" in faiss_indexes - assert "model1" in token_faiss_indexes # Verify that associated texts have been correctly identified - assert associated_texts_by_model["model1"] == ["text1", "text2"] + assert associated_texts_by_model_and_pooling_method["model1"] == ["text1", "text2"] # Verify that the FAISS index is valid embedding_array = np.array([[1.0, 1.0], [1.0, 1.0]]).astype('float32') faiss.normalize_L2(embedding_array) assert faiss_indexes["model1"].ntotal == len(embedding_array) - assert token_faiss_indexes["model1"].ntotal == len(embedding_array) diff --git a/tests/swiss_army_llama/test_database_operations.py b/tests/swiss_army_llama/test_database_operations.py index 78e4d9c..b2c5d06 100644 --- a/tests/swiss_army_llama/test_database_operations.py +++ b/tests/swiss_army_llama/test_database_operations.py @@ -3,7 +3,7 @@ from datetime import datetime from sqlalchemy import select from swiss_army_llama import DatabaseWriter, execute_with_retry, engine, AsyncSessionLocal -from embeddings_data_models import Base, TextEmbedding, DocumentEmbedding, Document, TokenLevelEmbedding, TokenLevelEmbeddingBundle, TokenLevelEmbeddingBundleCombinedFeatureVector, AudioTranscript +from embeddings_data_models import Base, TextEmbedding, DocumentEmbedding, Document, AudioTranscript from sqlalchemy.exc import OperationalError @pytest.fixture(scope='module') @@ -71,58 +71,6 @@ async def test_enqueue_document_write(db_writer): result = await execute_with_retry(session, select(Document).where(Document.document_hash == "doc_hash"), OperationalError) assert result.scalar_one().document_hash == "doc_hash" -@pytest.mark.asyncio -@pytest.mark.usefixtures("setup_db") -async def test_enqueue_token_level_embedding_write(db_writer): - async with AsyncSessionLocal() as session: - token_embedding = TokenLevelEmbedding( 
- token="token", - llm_model_name="model", - token_level_embedding_json="{}", - ip_address="127.0.0.1", - request_time=datetime.now(), - response_time=datetime.now(), - total_time=1.0, - token_level_embedding_bundle_id=1 - ) - await db_writer.enqueue_write([token_embedding]) - await db_writer.dedicated_db_writer() - result = await execute_with_retry(session, select(TokenLevelEmbedding).where(TokenLevelEmbedding.token == "token"), OperationalError) - assert result.scalar_one().token == "token" - -@pytest.mark.asyncio -@pytest.mark.usefixtures("setup_db") -async def test_enqueue_token_level_embedding_bundle_write(db_writer): - async with AsyncSessionLocal() as session: - token_bundle = TokenLevelEmbeddingBundle( - input_text="input", - llm_model_name="model", - token_level_embeddings_bundle_json="{}", - ip_address="127.0.0.1", - request_time=datetime.now(), - response_time=datetime.now(), - total_time=1.0 - ) - await db_writer.enqueue_write([token_bundle]) - await db_writer.dedicated_db_writer() - result = await execute_with_retry(session, select(TokenLevelEmbeddingBundle).where(TokenLevelEmbeddingBundle.input_text == "input"), OperationalError) - assert result.scalar_one().input_text == "input" - -@pytest.mark.asyncio -@pytest.mark.usefixtures("setup_db") -async def test_enqueue_token_level_embedding_bundle_combined_feature_vector_write(db_writer): - async with AsyncSessionLocal() as session: - feature_vector = TokenLevelEmbeddingBundleCombinedFeatureVector( - token_level_embedding_bundle_id=1, - llm_model_name="model", - combined_feature_vector_json="{}", - combined_feature_vector_hash="hash" - ) - await db_writer.enqueue_write([feature_vector]) - await db_writer.dedicated_db_writer() - result = await execute_with_retry(session, select(TokenLevelEmbeddingBundleCombinedFeatureVector).where(TokenLevelEmbeddingBundleCombinedFeatureVector.combined_feature_vector_hash == "hash"), OperationalError) - assert result.scalar_one().combined_feature_vector_hash == "hash" - @pytest.mark.asyncio @pytest.mark.usefixtures("setup_db") async def test_enqueue_audio_transcript_write(db_writer):