diff --git a/.env b/.env index 943cf72..c53f9ae 100644 --- a/.env +++ b/.env @@ -1,8 +1,8 @@ USE_SECURITY_TOKEN=1 USE_PARALLEL_INFERENCE_QUEUE=1 -MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS=8 +MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS=10 DEFAULT_MODEL_NAME=Meta-Llama-3-8B-Instruct.Q3_K_S -LLM_CONTEXT_SIZE_IN_TOKENS=4096 +LLM_CONTEXT_SIZE_IN_TOKENS=2048 TEXT_COMPLETION_CONTEXT_SIZE_IN_TOKENS=32000 DEFAULT_MAX_COMPLETION_TOKENS=1000 DEFAULT_NUMBER_OF_COMPLETIONS_TO_GENERATE =1 diff --git a/README.md b/README.md index fafc2b8..8ddca6e 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ Watch the the automated setup process in action [here](https://asciinema.org/a/6 ## Features -1. **Text Embedding Computation**: Utilizes pre-trained LLama2 and other LLMs via llama_cpp and langchain to generate embeddings for any provided text, including token-level embeddings that capture more nuanced information about the content. +1. **Text Embedding Computation**: Utilizes pre-trained LLama2 and other LLMs via llama_cpp to generate embeddings for any provided text, including token-level embeddings that capture more nuanced information about the content. 2. **Embedding Caching**: Efficiently stores and retrieves computed embeddings in SQLite, minimizing redundant computations. It supports caching both fixed-sized embedding vectors and token-level embeddings. 3. **Advanced Similarity Measurements and Retrieval**: Utilizes the author's own `fast_vector_similarity` library written in Rust to offer highly optimized advanced similarity measures such as `spearman_rho`, `kendall_tau`, `approximate_distance_correlation`, `jensen_shannon_similarity`, and `hoeffding_d`. Semantic search across cached embeddings is also supported using FAISS vector searching. 4. **Two-Step Advanced Semantic Search**: The API first leverages FAISS and cosine similarity for rapid filtering, and then applies additional similarity measures like `spearman_rho`, `kendall_tau`, `approximate_distance_correlation`, `jensen_shannon_similarity`, and `hoeffding_d` for a more nuanced comparison. @@ -108,9 +108,9 @@ fastapi faster-whisper filelock httpx -langchain -langchain-community llama-cpp-python +magika +mutagen nvgpu pandas psutil @@ -120,6 +120,8 @@ pytest python-decouple python-multipart pytz +redis +ruff sqlalchemy textract-py3 uvicorn diff --git a/dump.rdb b/dump.rdb new file mode 100644 index 0000000..743484a Binary files /dev/null and b/dump.rdb differ diff --git a/embeddings_data_models.py b/embeddings_data_models.py index 8e01c88..90d1742 100644 --- a/embeddings_data_models.py +++ b/embeddings_data_models.py @@ -26,14 +26,14 @@ class TextEmbedding(Base): response_time = Column(DateTime) total_time = Column(Float) document_file_hash = Column(String, ForeignKey('document_embeddings.file_hash')) - document = relationship("DocumentEmbedding", back_populates="embeddings") + corpus_identifier_string = Column(String, index=True) + document = relationship("DocumentEmbedding", back_populates="embeddings", foreign_keys=[document_file_hash, corpus_identifier_string]) __table_args__ = (UniqueConstraint('text_hash', 'llm_model_name', name='_text_hash_model_uc'),) @validates('text') def update_text_hash(self, key, text): self.text_hash = sha3_256(text.encode('utf-8')).hexdigest() return text - - + class DocumentEmbedding(Base): __tablename__ = "document_embeddings" id = Column(Integer, primary_key=True, index=True) @@ -41,25 +41,26 @@ class DocumentEmbedding(Base): filename = Column(String) mimetype = Column(String) file_hash = Column(String, index=True) - llm_model_name = Column(String, index=True) - file_data = Column(LargeBinary) # To store the original file - document_embedding_results_json = Column(JSON) # To store the embedding results JSON + corpus_identifier_string = Column(String, index=True) + llm_model_name = Column(String, index=True) + file_data = Column(LargeBinary) # To store the original file + document_embedding_results_json = Column(JSON) # To store the embedding results JSON ip_address = Column(String) request_time = Column(DateTime) response_time = Column(DateTime) - total_time = Column(Float) - document = relationship("Document", back_populates="document_embeddings") - embeddings = relationship("TextEmbedding", back_populates="document") + total_time = Column(Float) + document = relationship("Document", back_populates="document_embeddings", foreign_keys=[document_hash]) + embeddings = relationship("TextEmbedding", back_populates="document", foreign_keys=[TextEmbedding.document_file_hash]) __table_args__ = (UniqueConstraint('file_hash', 'llm_model_name', name='_file_hash_model_uc'),) - class Document(Base): __tablename__ = "documents" id = Column(Integer, primary_key=True, index=True) llm_model_name = Column(String, index=True) document_hash = Column(String, index=True) - document_embeddings = relationship("DocumentEmbedding", back_populates="document") - def update_hash(self): # Concatenate specific attributes from the document_embeddings relationship + document_embeddings = relationship("DocumentEmbedding", back_populates="document", foreign_keys=[DocumentEmbedding.document_hash]) + corpus_identifier_string = Column(String, index=True) + def update_hash(self): # Concatenate specific attributes from the document_embeddings relationship hash_data = "".join([emb.filename + emb.mimetype for emb in self.document_embeddings]) self.document_hash = sha3_256(hash_data.encode('utf-8')).hexdigest() @@ -70,7 +71,6 @@ def update_document_hash_on_append(target, value, initiator): @event.listens_for(Document.document_embeddings, 'remove') def update_document_hash_on_remove(target, value, initiator): target.update_hash() - class TokenLevelEmbedding(Base): __tablename__ = "token_level_embeddings" id = Column(Integer, primary_key=True, index=True) @@ -148,6 +148,7 @@ class SemanticSearchRequest(BaseModel): query_text: str number_of_most_similar_strings_to_return: Optional[int] = 10 llm_model_name: Optional[str] = DEFAULT_MODEL_NAME + corpus_identifier_string: str class SemanticSearchResponse(BaseModel): query_text: str @@ -156,6 +157,7 @@ class SemanticSearchResponse(BaseModel): class AdvancedSemanticSearchRequest(BaseModel): query_text: str llm_model_name: str = DEFAULT_MODEL_NAME + corpus_identifier_string: str similarity_filter_percentage: float = 0.98 number_of_most_similar_strings_to_return: Optional[int] = None diff --git a/environment.yml b/environment.yml index 67b766d..079acc0 100644 --- a/environment.yml +++ b/environment.yml @@ -13,8 +13,6 @@ dependencies: - faster-whisper - filelock - httpx - - langchain - - langchain-community - llama-cpp-python - magika - mutagen @@ -27,6 +25,8 @@ dependencies: - python-decouple - python-multipart - pytz + - redis + - ruff - sqlalchemy - textract-py3 - uvicorn diff --git a/misc_utility_functions.py b/misc_utility_functions.py index 6086d5e..35bdc26 100644 --- a/misc_utility_functions.py +++ b/misc_utility_functions.py @@ -6,10 +6,12 @@ import json import io import redis -import subprocess import sys +import threading import numpy as np +import pandas as pd import faiss +from io import StringIO from typing import Any from collections import defaultdict from sqlalchemy import text as sql_text @@ -63,23 +65,68 @@ def is_redis_running(host='localhost', port=6379): def start_redis_server(): try: - # Attempt to start Redis server using the redis-server command - subprocess.run(["redis-server"], check=True) - print("Redis server started successfully.") - except subprocess.CalledProcessError as e: + result = os.system("sudo service redis-server start") + if result == 0: + print("Redis server started successfully.") + else: + logger.error(f"Failed to start Redis server, return code: {result}") + raise Exception("Failed to start Redis server.") + except Exception as e: logger.error(f"Failed to start Redis server: {e}") raise def restart_redis_server(): try: - # Attempt to restart Redis server using the redis-cli shutdown command - subprocess.run(["redis-cli", "shutdown"], check=True) - subprocess.run(["redis-server"], check=True) - print("Redis server restarted successfully.") - except subprocess.CalledProcessError as e: + result = os.system("sudo service redis-server stop") + if result != 0: + logger.warning(f"Failed to stop Redis server, it might not be running. Return code: {result}") + result = os.system("sudo service redis-server start") + if result == 0: + print("Redis server started successfully.") + else: + logger.error(f"Failed to start Redis server, return code: {result}") + raise Exception("Failed to start Redis server.") + except Exception as e: logger.error(f"Failed to restart Redis server: {e}") raise +def configure_redis_optimally(redis_host='localhost', redis_port=6379, maxmemory='1gb'): + configured_file = 'redis_configured.txt' + if os.path.exists(configured_file): + print("Redis has already been configured. Skipping configuration.") + return + if not is_redis_running(redis_host, redis_port): + start_redis_server() + r = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True) + output = [] + def set_config(key, value): + try: + response = r.config_set(key, value) + msg = f"Successfully set {key} to {value}" if response else f"Failed to set {key} to {value}" + output.append(msg) + print(msg) + except redis.exceptions.ConnectionError as e: + logger.error(f"Failed to set config {key}: {e}") + raise + set_config('maxmemory', maxmemory) + set_config('maxmemory-policy', 'allkeys-lru') + max_clients = os.cpu_count() * 1000 + set_config('maxclients', max_clients) + set_config('timeout', 300) + set_config('save', '900 1 300 10 60 10000') + set_config('appendonly', 'yes') + set_config('appendfsync', 'everysec') + set_config('stop-writes-on-bgsave-error', 'no') + output.append("Redis configuration optimized successfully.") + output.append("Restarting Redis server to apply changes...") + with open(configured_file, 'w') as f: + f.write("\n".join(output)) + print("\n".join(output)) + restart_redis_server() + +def configure_redis_in_background(): + threading.Thread(target=configure_redis_optimally).start() + async def build_faiss_indexes(): global faiss_indexes, token_faiss_indexes, associated_texts_by_model if os.environ.get("FAISS_SETUP_DONE") == "1": @@ -264,30 +311,3 @@ def seek(self, offset: int, whence: int = 0) -> int: return self.file.seek(offset, whence) def tell(self) -> int: return self.file.tell() - -def configure_redis_optimally(redis_host='localhost', redis_port=6379, maxmemory='1gb'): - if not is_redis_running(redis_host, redis_port): - start_redis_server() - r = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True) - def set_config(key, value): - response = r.config_set(key, value) - if response: - print(f"Successfully set {key} to {value}") - else: - print(f"Failed to set {key} to {value}") - set_config('maxmemory', maxmemory) - set_config('maxmemory-policy', 'allkeys-lru') - set_config('databases', 16) - max_clients = os.cpu_count() * 1000 - set_config('maxclients', max_clients) - set_config('timeout', 300) - set_config('save', '900 1 300 10 60 10000') - set_config('appendonly', 'yes') - set_config('appendfsync', 'everysec') - set_config('stop-writes-on-bgsave-error', 'no') - print("Redis configuration optimized successfully.") - print("Restarting Redis server to apply changes...") - restart_redis_server() - -def configure_redis_in_background(): - threading.Thread(target=configure_redis_optimally).start() \ No newline at end of file diff --git a/redis_configured.txt b/redis_configured.txt new file mode 100644 index 0000000..7ebe220 --- /dev/null +++ b/redis_configured.txt @@ -0,0 +1,10 @@ +Successfully set maxmemory to 1gb +Successfully set maxmemory-policy to allkeys-lru +Successfully set maxclients to 64000 +Successfully set timeout to 300 +Successfully set save to 900 1 300 10 60 10000 +Successfully set appendonly to yes +Successfully set appendfsync to everysec +Successfully set stop-writes-on-bgsave-error to no +Redis configuration optimized successfully. +Restarting Redis server to apply changes... \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 6694be0..89e4a00 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,8 +7,6 @@ fastapi faster-whisper filelock httpx -langchain -langchain-community llama-cpp-python magika mutagen @@ -22,6 +20,7 @@ python-decouple python-multipart pytz redis +ruff sqlalchemy textract-py3 uvicorn diff --git a/resource_monitoring_logs.json b/resource_monitoring_logs.json new file mode 100644 index 0000000..0c196e3 --- /dev/null +++ b/resource_monitoring_logs.json @@ -0,0 +1,78 @@ +{"timestamp": "2024-05-21 20:40:21", "client_ip": "127.0.0.1", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 4, "num_characters": 15}, "memory_used": 770904064, "cpu_used": {"user": 2.6999999999999997, "system": 0.3, "idle": -3.0999999999999943}, "time_taken": 72.86972856521606} +{"timestamp": "2024-05-21 20:45:00", "client_ip": "127.0.0.1", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 4, "num_characters": 15}, "memory_used": 1131089920, "cpu_used": {"user": -1.3, "system": 0.0, "idle": 1.4000000000000057}, "time_taken": 416.1386353969574} +{"timestamp": "2024-05-21 20:53:59", "client_ip": "127.0.0.1", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 4, "num_characters": 15}, "memory_used": 854781952, "cpu_used": {"user": 0.2999999999999998, "system": 0.10000000000000003, "idle": -0.20000000000000284}, "time_taken": 201.09225273132324} +{"timestamp": "2024-05-21 21:01:35", "client_ip": "127.0.0.1", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 4, "num_characters": 15}, "memory_used": 690286592, "cpu_used": {"user": 9.0, "system": 2.0, "idle": -10.899999999999991}, "time_taken": 16.313755750656128} +{"timestamp": "2024-05-21 21:03:11", "client_ip": "127.0.0.1", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 4, "num_characters": 15}, "memory_used": 1383215104, "cpu_used": {"user": 8.1, "system": 0.2999999999999998, "idle": -8.300000000000011}, "time_taken": 16.91592812538147} +{"timestamp": "2024-05-21 21:12:54", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": 0, "cpu_used": {"user": 0.7999999999999998, "system": -0.2, "idle": -59.400000000000006}, "time_taken": 0.0049707889556884766} +{"timestamp": "2024-05-21 21:16:16", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": 0, "cpu_used": {"user": 3.1, "system": -0.2, "idle": -62.900000000000006}, "time_taken": 0.004973411560058594} +{"timestamp": "2024-05-21 21:18:21", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": 252043264, "cpu_used": {"user": 0.6, "system": 0.1, "idle": -0.6000000000000085}, "time_taken": 149.48805594444275} +{"timestamp": "2024-05-21 21:25:24", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": 640163840, "cpu_used": {"user": -2.1, "system": -0.09999999999999998, "idle": 2.299999999999997}, "time_taken": 61.50355911254883} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 23, "num_characters": 149}, "memory_used": 324644864, "cpu_used": {"user": 0.0, "system": -0.6, "idle": 48.599999999999994}, "time_taken": 115.65987038612366} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 43, "num_characters": 234}, "memory_used": 185544704, "cpu_used": {"user": 3.4, "system": 0.4, "idle": 61.2}, "time_taken": 153.1902883052826} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 22, "num_characters": 147}, "memory_used": 176959488, "cpu_used": {"user": 4.1, "system": 0.4, "idle": 77.4}, "time_taken": 190.95956873893738} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 30, "num_characters": 203}, "memory_used": 297877504, "cpu_used": {"user": 0.3999999999999999, "system": -0.8, "idle": 55.3}, "time_taken": 450.28634786605835} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 79}, "memory_used": 299630592, "cpu_used": {"user": 90.4, "system": 5.8, "idle": -15.2}, "time_taken": 451.2564778327942} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 26, "num_characters": 130}, "memory_used": -795389952, "cpu_used": {"user": 87.7, "system": 8.3, "idle": -18.0}, "time_taken": 453.63061475753784} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 27, "num_characters": 156}, "memory_used": -798306304, "cpu_used": {"user": 90.6, "system": 8.0, "idle": -22.7}, "time_taken": 455.15462923049927} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 16, "num_characters": 96}, "memory_used": -800391168, "cpu_used": {"user": 85.2, "system": 8.9, "idle": -50.2}, "time_taken": 456.1361782550812} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 86}, "memory_used": -896925696, "cpu_used": {"user": 94.0, "system": 4.0, "idle": -21.1}, "time_taken": 457.29425382614136} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 15, "num_characters": 100}, "memory_used": -900067328, "cpu_used": {"user": 85.6, "system": 6.3, "idle": -22.9}, "time_taken": 458.89341711997986} +{"timestamp": "2024-05-21 21:45:17", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": -903421952, "cpu_used": {"user": -1.2000000000000002, "system": -0.2, "idle": -1.5}, "time_taken": 458.9596793651581} +{"timestamp": "2024-05-21 21:52:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 10, "num_characters": 76}, "memory_used": -104677376, "cpu_used": {"user": -2.0, "system": 0.0, "idle": -59.0}, "time_taken": 5.288030624389648} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 22, "num_characters": 147}, "memory_used": 129810432, "cpu_used": {"user": 25.8, "system": 2.6, "idle": 51.599999999999994}, "time_taken": 51.36289119720459} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 23, "num_characters": 149}, "memory_used": 129810432, "cpu_used": {"user": 1.0, "system": 0.0, "idle": 9.0}, "time_taken": 51.38051795959473} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 43, "num_characters": 234}, "memory_used": 129810432, "cpu_used": {"user": -2.0, "system": 0.0, "idle": 32.0}, "time_taken": 51.39635634422302} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 30, "num_characters": 203}, "memory_used": 129810432, "cpu_used": {"user": -2.0, "system": -1.0, "idle": -62.0}, "time_taken": 51.38391876220703} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 79}, "memory_used": 129810432, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -9.0}, "time_taken": 51.38269114494324} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 26, "num_characters": 130}, "memory_used": 129552384, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -14.0}, "time_taken": 51.38032341003418} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 16, "num_characters": 96}, "memory_used": 128786432, "cpu_used": {"user": -2.0, "system": 0.0, "idle": -18.0}, "time_taken": 51.37873291969299} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 86}, "memory_used": 128786432, "cpu_used": {"user": 2.0, "system": 0.0, "idle": 18.0}, "time_taken": 51.381343603134155} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 15, "num_characters": 100}, "memory_used": 128786432, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -9.0}, "time_taken": 51.37587761878967} +{"timestamp": "2024-05-21 21:53:32", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 27, "num_characters": 156}, "memory_used": 128786432, "cpu_used": {"user": -3.0, "system": 0.0, "idle": -19.0}, "time_taken": 51.38053917884827} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 23, "num_characters": 149}, "memory_used": 181100544, "cpu_used": {"user": 72.3, "system": 7.800000000000001, "idle": -30.2}, "time_taken": 3.150285482406616} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 22, "num_characters": 147}, "memory_used": 181100544, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -2.0}, "time_taken": 3.145923614501953} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 43, "num_characters": 234}, "memory_used": 187994112, "cpu_used": {"user": 88.1, "system": 9.4, "idle": -27.5}, "time_taken": 12.930879354476929} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 16, "num_characters": 96}, "memory_used": 188481536, "cpu_used": {"user": -1.0, "system": 0.0, "idle": 15.0}, "time_taken": 12.913820266723633} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 26, "num_characters": 130}, "memory_used": 187994112, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -8.0}, "time_taken": 12.922861576080322} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 30, "num_characters": 203}, "memory_used": 187736064, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -39.0}, "time_taken": 12.933888912200928} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 79}, "memory_used": 187736064, "cpu_used": {"user": -3.0, "system": 0.0, "idle": -7.0}, "time_taken": 12.932099103927612} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 86}, "memory_used": 188219392, "cpu_used": {"user": 0.0, "system": 0.0, "idle": 20.0}, "time_taken": 12.941673994064331} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 15, "num_characters": 100}, "memory_used": 187703296, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -11.0}, "time_taken": 12.934585332870483} +{"timestamp": "2024-05-21 21:55:44", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 27, "num_characters": 156}, "memory_used": 187703296, "cpu_used": {"user": -1.0, "system": -1.0, "idle": -24.0}, "time_taken": 12.939662218093872} +{"timestamp": "2024-05-21 21:55:47", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 10, "num_characters": 76}, "memory_used": 6627328, "cpu_used": {"user": 0.0, "system": 0.0, "idle": 18.0}, "time_taken": 9.857243061065674} +{"timestamp": "2024-05-21 21:55:47", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 49, "num_characters": 315}, "memory_used": -4202496, "cpu_used": {"user": 92.1, "system": 6.0, "idle": -20.2}, "time_taken": 12.734312534332275} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 43, "num_characters": 234}, "memory_used": 206323712, "cpu_used": {"user": 86.2, "system": 7.2, "idle": -27.5}, "time_taken": 12.834195613861084} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 22, "num_characters": 147}, "memory_used": 206065664, "cpu_used": {"user": -3.0, "system": 0.0, "idle": 15.0}, "time_taken": 12.836692094802856} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 30, "num_characters": 203}, "memory_used": 206065664, "cpu_used": {"user": -1.0, "system": 0.0, "idle": -59.0}, "time_taken": 12.82752275466919} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 23, "num_characters": 149}, "memory_used": 206065664, "cpu_used": {"user": 2.0, "system": 1.0, "idle": -35.0}, "time_taken": 12.85067892074585} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 16, "num_characters": 96}, "memory_used": 205807616, "cpu_used": {"user": 1.0, "system": 1.0, "idle": 23.0}, "time_taken": 12.833847761154175} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 15, "num_characters": 100}, "memory_used": 205807616, "cpu_used": {"user": -1.0, "system": -1.0, "idle": -2.0}, "time_taken": 12.82727336883545} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 26, "num_characters": 130}, "memory_used": 205807616, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -21.0}, "time_taken": 12.848714590072632} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 27, "num_characters": 156}, "memory_used": 205807616, "cpu_used": {"user": 2.0, "system": 0.0, "idle": 7.0}, "time_taken": 12.843171119689941} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 86}, "memory_used": 205807616, "cpu_used": {"user": -2.0, "system": -1.0, "idle": -16.0}, "time_taken": 12.850148439407349} +{"timestamp": "2024-05-21 22:04:26", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 79}, "memory_used": 205807616, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -23.0}, "time_taken": 12.867834329605103} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 24, "num_characters": 128}, "memory_used": -48504832, "cpu_used": {"user": 91.1, "system": 6.2, "idle": -22.4}, "time_taken": 12.285202503204346} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 49, "num_characters": 315}, "memory_used": -48504832, "cpu_used": {"user": 0.0, "system": 0.0, "idle": 18.0}, "time_taken": 12.293155670166016} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 8, "num_characters": 35}, "memory_used": -48504832, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -1.0}, "time_taken": 12.301603555679321} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 10, "num_characters": 76}, "memory_used": -48504832, "cpu_used": {"user": -1.0, "system": -1.0, "idle": -61.0}, "time_taken": 12.316025018692017} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 530, "num_characters": 3238}, "memory_used": -48238592, "cpu_used": {"user": 1.0, "system": 1.0, "idle": 1.0}, "time_taken": 12.28593897819519} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 12, "num_characters": 73}, "memory_used": -48238592, "cpu_used": {"user": -2.0, "system": -0.7, "idle": -85.6}, "time_taken": 12.265953063964844} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 21, "num_characters": 142}, "memory_used": -48496640, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -22.0}, "time_taken": 12.298172950744629} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 26, "num_characters": 136}, "memory_used": -48238592, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -13.0}, "time_taken": 12.265851974487305} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 11, "num_characters": 82}, "memory_used": -48496640, "cpu_used": {"user": -1.0, "system": 0.0, "idle": -63.0}, "time_taken": 12.310485363006592} +{"timestamp": "2024-05-21 22:04:39", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 10, "num_characters": 51}, "memory_used": -48496640, "cpu_used": {"user": -1.0, "system": 0.0, "idle": -14.0}, "time_taken": 12.308598518371582} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 30, "num_characters": 174}, "memory_used": 290541568, "cpu_used": {"user": 89.3, "system": 4.8, "idle": -40.2}, "time_taken": 17.428781747817993} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 18, "num_characters": 123}, "memory_used": 290279424, "cpu_used": {"user": 1.0, "system": -1.0, "idle": -34.0}, "time_taken": 17.441152572631836} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 13, "num_characters": 74}, "memory_used": 290279424, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -8.0}, "time_taken": 17.4292573928833} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 20, "num_characters": 118}, "memory_used": 290533376, "cpu_used": {"user": -3.0, "system": 0.0, "idle": -13.0}, "time_taken": 17.4340341091156} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 24, "num_characters": 105}, "memory_used": 290484224, "cpu_used": {"user": 1.0, "system": 0.0, "idle": 21.0}, "time_taken": 17.406537771224976} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 30, "num_characters": 158}, "memory_used": 290484224, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -8.0}, "time_taken": 17.406208276748657} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 24, "num_characters": 140}, "memory_used": 290484224, "cpu_used": {"user": -3.0, "system": 0.0, "idle": -17.0}, "time_taken": 17.40397620201111} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 11, "num_characters": 66}, "memory_used": 290484224, "cpu_used": {"user": -2.0, "system": 0.0, "idle": 18.0}, "time_taken": 17.429460525512695} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 4, "num_characters": 31}, "memory_used": 290484224, "cpu_used": {"user": -4.0, "system": -1.0, "idle": -71.0}, "time_taken": 17.44107437133789} +{"timestamp": "2024-05-21 22:04:51", "client_ip": "localhost", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 51, "num_characters": 344}, "memory_used": 290484224, "cpu_used": {"user": 0.0, "system": 0.0, "idle": -23.0}, "time_taken": 17.437281370162964} +{"timestamp": "2024-05-21 22:15:07", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": 208039936, "cpu_used": {"user": 85.7, "system": 7.2, "idle": -92.8}, "time_taken": 30.396328449249268} +{"timestamp": "2024-05-21 22:24:32", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": 253370368, "cpu_used": {"user": 87.10000000000001, "system": 7.8999999999999995, "idle": -95.0}, "time_taken": 26.73915457725525} +{"timestamp": "2024-05-21 22:49:47", "client_ip": "127.0.0.1", "endpoint_name": "get_all_embedding_vectors_for_document", "request_details": {"num_sentences": 85, "total_words": 2440, "total_characters": 14710, "file_size_mb": 0.6132612228393555, "mime_type": "application/pdf"}, "memory_used": 221396992, "cpu_used": {"user": 76.2, "system": 6.0, "idle": -82.1}, "time_taken": 32.07573962211609} +{"timestamp": "2024-05-21 22:52:37", "client_ip": "127.0.0.1", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 3, "num_characters": 13}, "memory_used": 2994176, "cpu_used": {"user": 79.60000000000001, "system": 13.3, "idle": -93.0}, "time_taken": 1.0049951076507568} +{"timestamp": "2024-05-21 22:56:44", "client_ip": "127.0.0.1", "endpoint_name": "get_embedding_vector_for_string", "request_details": {"num_words": 3, "num_characters": 13}, "memory_used": -32768, "cpu_used": {"user": -1.9, "system": 0.7, "idle": 1.3999999999999915}, "time_taken": 0.015372991561889648} diff --git a/service_functions.py b/service_functions.py index 8fb7248..4d55376 100644 --- a/service_functions.py +++ b/service_functions.py @@ -2,12 +2,13 @@ import shared_resources from shared_resources import load_model, token_level_embedding_model_cache, text_completion_model_cache, is_gpu_available from database_functions import AsyncSessionLocal, DatabaseWriter, execute_with_retry -from misc_utility_functions import clean_filename_for_url_func, FakeUploadFile, sophisticated_sentence_splitter, merge_transcript_segments_into_combined_text, suppress_stdout_stderr +from misc_utility_functions import clean_filename_for_url_func, FakeUploadFile, sophisticated_sentence_splitter, merge_transcript_segments_into_combined_text, suppress_stdout_stderr, build_faiss_indexes from embeddings_data_models import TextEmbedding, DocumentEmbedding, Document, TokenLevelEmbedding, TokenLevelEmbeddingBundleCombinedFeatureVector, AudioTranscript from embeddings_data_models import EmbeddingRequest, TextCompletionRequest from embeddings_data_models import TextCompletionResponse, AudioTranscriptResponse import os import re +import unicodedata import shutil import psutil import glob @@ -17,6 +18,7 @@ import tempfile import traceback import time +from io import StringIO from datetime import datetime from hashlib import sha3_256 from urllib.parse import quote @@ -25,6 +27,7 @@ import textract from sqlalchemy import text as sql_text from sqlalchemy import select +from sqlalchemy.orm import joinedload from fastapi import HTTPException, Request, UploadFile, File from fastapi.concurrency import run_in_threadpool from typing import List, Optional, Tuple, Dict, Any @@ -193,10 +196,8 @@ async def get_or_compute_transcript(file: UploadFile, with tempfile.NamedTemporaryFile(delete=False) as tmp_file: shutil.copyfileobj(file.file, tmp_file) audio_file_name = tmp_file.name - if corpus_identifier_string is None: corpus_identifier_string = audio_file_hash - segment_details, info, combined_transcript_text, combined_transcript_text_list_of_metadata_dicts, request_time, response_time, total_time, download_url = await compute_transcript_with_whisper_from_audio_func(audio_file_hash, audio_file_name, file.filename, audio_file_size_mb, ip_address, req, corpus_identifier_string, compute_embeddings_for_resulting_transcript_document, llm_model_name) audio_transcript_response = { "audio_file_hash": audio_file_hash, @@ -258,37 +259,69 @@ async def _get_embedding_from_db(text_hash: str, llm_model_name: str) -> Optiona return json.loads(embedding_json) return None -async def get_or_compute_embedding(request: EmbeddingRequest, req: Request = None, client_ip: str = None, document_file_hash: str = None) -> dict: +async def get_texts_for_corpus_identifier(corpus_identifier: str) -> Dict[str, List[str]]: + async with AsyncSessionLocal() as session: + result = await session.execute( + select(DocumentEmbedding) + .options(joinedload(DocumentEmbedding.embeddings)) + .filter(DocumentEmbedding.corpus_identifier == corpus_identifier) + ) + document_embeddings = result.scalars().all() + texts_by_model = {} + for document_embedding in document_embeddings: + llm_model_name = document_embedding.llm_model_name + texts = [embedding.text for embedding in document_embedding.embeddings] + if llm_model_name in texts_by_model: + texts_by_model[llm_model_name].extend(texts) + else: + texts_by_model[llm_model_name] = texts + return texts_by_model + +async def get_texts_for_model(llm_model_name: str) -> List[str]: + async with AsyncSessionLocal() as session: + result = await session.execute( + select(DocumentEmbedding) + .options(joinedload(DocumentEmbedding.embeddings)) + .filter(DocumentEmbedding.llm_model_name == llm_model_name) + ) + document_embeddings = result.scalars().all() + texts = [] + for document_embedding in document_embeddings: + texts.extend([embedding.text for embedding in document_embedding.embeddings]) + return texts + +async def get_or_compute_embedding(request: EmbeddingRequest, req: Request = None, client_ip: str = None, document_file_hash: str = None, corpus_identifier_string: str = None) -> dict: request_time = datetime.utcnow() # Capture request time as datetime object + prepared_request_text = prepare_string_for_embedding(request.text) ip_address = client_ip or (req.client.host if req else "localhost") # If client_ip is provided, use it; otherwise, try to get from req; if not available, default to "localhost" - logger.info(f"Received request for embedding for '{request.text}' using model '{request.llm_model_name}' from IP address '{ip_address}'") - embedding_list = await get_embedding_from_db(request.text, request.llm_model_name) # Check if embedding exists in the database + logger.info(f"Received request for embedding for '{prepared_request_text}' using model '{request.llm_model_name}' from IP address '{ip_address}'") + embedding_list = await get_embedding_from_db(prepared_request_text, request.llm_model_name) # Check if embedding exists in the database if embedding_list is not None: response_time = datetime.utcnow() # Capture response time as datetime object total_time = (response_time - request_time).total_seconds() # Calculate time taken in seconds - logger.info(f"Embedding found in database for '{request.text}' using model '{request.llm_model_name}'; returning in {total_time:.4f} seconds") + logger.info(f"Embedding found in database for '{prepared_request_text}' using model '{request.llm_model_name}'; returning in {total_time:.4f} seconds") return {"embedding": embedding_list} model = load_model(request.llm_model_name) - embedding_list = calculate_sentence_embedding(model, request.text) # Compute the embedding if not in the database + embedding_list = await calculate_sentence_embedding(model, request.text) # Compute the embedding if not in the database if embedding_list is None: - logger.error(f"Could not calculate the embedding for the given text: '{request.text}' using model '{request.llm_model_name}!'") + logger.error(f"Could not calculate the embedding for the given text: '{prepared_request_text}' using model '{request.llm_model_name}!'") raise HTTPException(status_code=400, detail="Could not calculate the embedding for the given text") - embedding_json = json.dumps(embedding_list) # Serialize the numpy array to JSON and save to the database + embedding_json = json.dumps(embedding_list) # Serialize the list to JSON and save to the database response_time = datetime.utcnow() # Capture response time as datetime object total_time = (response_time - request_time).total_seconds() # Calculate total time using datetime objects word_length_of_input_text = len(request.text.split()) if word_length_of_input_text > 0: - logger.info(f"Embedding calculated for '{request.text}' using model '{request.llm_model_name}' in {total_time} seconds, or an average of {total_time/word_length_of_input_text :.2f} seconds per word. Now saving to database...") - await save_embedding_to_db(request.text, request.llm_model_name, embedding_json, ip_address, request_time, response_time, total_time, document_file_hash) + logger.info(f"Embedding calculated for '{prepared_request_text}' using model '{request.llm_model_name}' in {total_time} seconds, or an average of {total_time/word_length_of_input_text :.2f} seconds per word. Now saving to database...") + await save_embedding_to_db(prepared_request_text, request.llm_model_name, embedding_json, ip_address, request_time, response_time, total_time, document_file_hash) return {"embedding": embedding_list} -async def save_embedding_to_db(text: str, llm_model_name: str, embedding_json: str, ip_address: str, request_time: datetime, response_time: datetime, total_time: float, document_file_hash: str = None): +async def save_embedding_to_db(text: str, llm_model_name: str, embedding_json: str, ip_address: str, request_time: datetime, response_time: datetime, total_time: float, document_file_hash: str = None, corpus_identifier_string: str = None): existing_embedding = await get_embedding_from_db(text, llm_model_name) # Check if the embedding already exists if existing_embedding is not None: return existing_embedding - return await execute_with_retry(_save_embedding_to_db, text, llm_model_name, embedding_json, ip_address, request_time, response_time, total_time, document_file_hash) + return await execute_with_retry(_save_embedding_to_db, text, llm_model_name, embedding_json, ip_address, request_time, response_time, total_time, document_file_hash, corpus_identifier_string) -async def _save_embedding_to_db(text: str, llm_model_name: str, embedding_json: str, ip_address: str, request_time: datetime, response_time: datetime, total_time: float, document_file_hash: str = None): +async def _save_embedding_to_db(text: str, llm_model_name: str, embedding_json: str, ip_address: str, request_time: datetime, response_time: datetime, total_time: float, document_file_hash: str = None, corpus_identifier_string: str = None): existing_embedding = await get_embedding_from_db(text, llm_model_name) if existing_embedding: return existing_embedding @@ -300,7 +333,8 @@ async def _save_embedding_to_db(text: str, llm_model_name: str, embedding_json: request_time=request_time, response_time=response_time, total_time=total_time, - document_file_hash=document_file_hash + document_file_hash=document_file_hash, + corpus_identifier_string=corpus_identifier_string ) await shared_resources.db_writer.enqueue_write([embedding]) # Enqueue the write operation using the db_writer instance @@ -338,9 +372,13 @@ def load_token_level_embedding_model(llm_model_name: str, raise_http_exception: async def compute_token_level_embedding_bundle_combined_feature_vector(token_level_embeddings) -> List[float]: start_time = datetime.utcnow() logger.info("Extracting token-level embeddings from the bundle") - parsed_df = pd.read_json(token_level_embeddings) # Parse the json_content back to a DataFrame - token_level_embeddings = list(parsed_df['embedding']) - embeddings = np.array(token_level_embeddings) # Convert the list of embeddings to a NumPy array + # Ensure token_level_embeddings is a JSON string + if not isinstance(token_level_embeddings, str): + raise ValueError("token_level_embeddings must be a JSON string") + # Wrap the JSON string in StringIO to avoid the FutureWarning + parsed_df = pd.read_json(StringIO(token_level_embeddings)) + token_level_embeddings_list = list(parsed_df['embedding']) + embeddings = np.array(token_level_embeddings_list) # Convert the list of embeddings to a NumPy array logger.info(f"Computing column-wise means/mins/maxes/std_devs of the embeddings... (shape: {embeddings.shape})") assert(len(embeddings) > 0) means = np.mean(embeddings, axis=0) @@ -351,10 +389,9 @@ async def compute_token_level_embedding_bundle_combined_feature_vector(token_lev combined_feature_vector = np.concatenate([means, mins, maxes, stds]) end_time = datetime.utcnow() total_time = (end_time - start_time).total_seconds() - logger.info(f"Computed the token-level embedding bundle's combined feature vector computed in {total_time: .2f} seconds.") + logger.info(f"Computed the token-level embedding bundle's combined feature vector in {total_time:.2f} seconds.") return combined_feature_vector.tolist() - async def get_or_compute_token_level_embedding_bundle_combined_feature_vector(token_level_embedding_bundle_id, token_level_embeddings, db_writer: DatabaseWriter) -> List[float]: request_time = datetime.utcnow() request_time = datetime.utcnow() @@ -377,10 +414,9 @@ async def get_or_compute_token_level_embedding_bundle_combined_feature_vector(to combined_feature_vector_json=json.dumps(combined_feature_vector) # Convert the list to a JSON string ) logger.info(f"Writing combined feature vector for database write for token-level embedding bundle ID: {token_level_embedding_bundle_id} to the database...") - await db_writer.enqueue_write([combined_feature_vector_db_object]) + await shared_resources.db_writer.enqueue_write([combined_feature_vector_db_object]) return combined_feature_vector - async def calculate_token_level_embeddings(text: str, llm_model_name: str, client_ip: str, token_level_embedding_bundle_id: int) -> List[np.array]: request_time = datetime.utcnow() logger.info(f"Starting token-level embedding calculation for text: '{text}' using model: '{llm_model_name}'") @@ -436,14 +472,68 @@ async def store_token_level_embeddings_in_db(token: str, llm_model_name: str, to ) await shared_resources.db_writer.enqueue_write([embedding]) # Enqueue the write operation for the token-level embedding -def calculate_sentence_embedding(llama: Llama, text: str) -> np.array: - sentence_embedding = None +def extract_embeddings(input_data): + embeddings = [] + for item in input_data['data']: + if isinstance(item['embedding'][0], list): # Check if the first element is a list + for embedding_list in item['embedding']: + embeddings.extend(embedding_list) + else: # Single list of floats + embeddings.extend(item['embedding']) + return embeddings + +def extract_embeddings_list(input_data): + embeddings_list = [] + for item in input_data['data']: + if isinstance(item['embedding'][0], list): # Check if the first element is a list + embeddings = [] + for embedding_list in item['embedding']: + embeddings.extend(embedding_list) + embeddings_list.append(embeddings) + else: # Single list of floats + embeddings_list.append(item['embedding']) + return embeddings_list + +def prepare_string_for_embedding(text: str) -> str: + # Normalize Unicode characters to NFKC form + text = unicodedata.normalize('NFKC', text) + # Define all possible newline and carriage return characters + newline_chars = [ + '\r', '\n', '\r\n', '\u2028', '\u2029', '\v', '\f', + '\x85', '\u000A', '\u000B', '\u000C', '\u000D', '\u0085', + '\u000D\u000A' + ] + # Replace all newline characters with a space + for nl in newline_chars: + text = text.replace(nl, ' ') + # Replace any sequence of whitespace characters (including non-breaking spaces) with a single space + text = re.sub(r'\s+', ' ', text) + # Remove leading and trailing whitespace + text = text.strip() + # Remove leading comma followed by whitespace if present + if text.startswith(', '): + text = text[2:].strip() + # Remove all control characters and non-printable characters + text = ''.join(ch for ch in text if unicodedata.category(ch)[0] != 'C') + # Ensure text is ASCII-encoded to catch any remaining unusual characters + text = text.encode('ascii', 'ignore').decode('ascii') + # Truncate to a maximum length of 2000 characters + if len(text) > 2000: + text = text[:2000] + # Eliminate all blank lines + text = ' '.join(line for line in text.splitlines() if line.strip() != '') + return text + +async def calculate_sentence_embedding(llama, text: str) -> np.array: + sentence_embedding_vector = None retry_count = 0 - while sentence_embedding is None and retry_count < 3: + while sentence_embedding_vector is None and retry_count < 3: try: if retry_count > 0: - logger.info(f"Attempting again calculate sentence embedding. Attempt number {retry_count + 1}") - sentence_embedding = llama.embed_query(text) + logger.info(f"Attempting again to calculate sentence embedding. Attempt number {retry_count + 1}") + prepared_text = prepare_string_for_embedding(text) + sentence_embedding_object = llama.create_embedding(prepared_text) + sentence_embedding_vector = extract_embeddings(sentence_embedding_object) except TypeError as e: logger.error(f"TypeError in calculate_sentence_embedding: {e}") raise @@ -452,11 +542,87 @@ def calculate_sentence_embedding(llama: Llama, text: str) -> np.array: text = text[:-int(len(text) * 0.1)] retry_count += 1 logger.info(f"Trimming sentence due to too many tokens. New length: {len(text)}") - if sentence_embedding is None: + if sentence_embedding_vector is None: logger.error("Failed to calculate sentence embedding after multiple attempts") - return sentence_embedding + return sentence_embedding_vector -async def compute_embeddings_for_document(strings: list, llm_model_name: str, client_ip: str, document_file_hash: str) -> List[Tuple[str, np.array]]: +async def calculate_sentence_embeddings_list(llama, texts: list) -> list: + retry_count = 0 + sentence_embeddings_vectors = None + while sentence_embeddings_vectors is None and retry_count < 3: + try: + if retry_count > 0: + logger.info(f"Attempting again to calculate sentence embeddings. Attempt number {retry_count + 1}") + prepared_texts = [prepare_string_for_embedding(text) for text in texts] + sentence_embeddings_object = llama.create_embedding(prepared_texts) + sentence_embeddings_vectors = extract_embeddings_list(sentence_embeddings_object) + except TypeError as e: + logger.error(f"TypeError in calculate_sentence_embeddings_list: {e}") + raise + except Exception as e: + logger.error(f"Exception in calculate_sentence_embeddings_list: {e}") + texts = [text[:-int(len(text) * 0.1)] for text in texts] + retry_count += 1 + logger.info(f"Trimming sentences due to too many tokens. New lengths: {[len(text) for text in texts]}") + if sentence_embeddings_vectors is None: + logger.error("Failed to calculate sentence embeddings after multiple attempts") + return sentence_embeddings_vectors + +async def compute_embeddings_for_document(strings: list, llm_model_name: str, client_ip: str, document_file_hash: str, corpus_identifier_string: str) -> List[Tuple[str, np.array]]: + results = [] + strings = [prepare_string_for_embedding(text) for text in strings] + if USE_PARALLEL_INFERENCE_QUEUE: + logger.info(f"Using parallel inference queue to compute embeddings for {len(strings)} strings") + start_time = time.perf_counter() + semaphore = asyncio.Semaphore(MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS) + model = load_model(llm_model_name) + async def compute_embedding_batch(batch): + try: + async with semaphore: + embeddings = await calculate_sentence_embeddings_list(model, batch) + batch_results = [] + for text, embedding in zip(batch, embeddings): + embedding_json = json.dumps(embedding) + request_time = datetime.utcnow() + response_time = datetime.utcnow() + total_time = (response_time - request_time).total_seconds() + await save_embedding_to_db(text, llm_model_name, embedding_json, client_ip, request_time, response_time, total_time, document_file_hash, corpus_identifier_string) + batch_results.append((text, embedding)) + return batch_results + except Exception as e: + logger.error(f"Error computing embeddings for batch: {e}") + return [(text, None) for text in batch] + batch_size = MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS + batches = [strings[i:i + batch_size] for i in range(0, len(strings), batch_size)] + batch_results = await asyncio.gather(*[compute_embedding_batch(batch) for batch in batches]) + for batch_result in batch_results: + results.extend(batch_result) + end_time = time.perf_counter() + duration = end_time - start_time + if len(strings) > 0: + logger.info(f"Parallel inference task for {len(strings)} strings completed in {duration:.2f} seconds; {duration / len(strings):.2f} seconds per string") + else: + logger.info(f"Using sequential inference to compute embeddings for {len(strings)} strings") + start_time = time.perf_counter() + model = load_model(llm_model_name) + embeddings = await calculate_sentence_embeddings_list(model, strings) + for text, embedding in zip(strings, embeddings): + prepared_text = prepare_string_for_embedding(text) + embedding_json = json.dumps(embedding) + request_time = datetime.utcnow() + response_time = datetime.utcnow() + total_time = (response_time - request_time).total_seconds() + await save_embedding_to_db(prepared_text, llm_model_name, embedding_json, client_ip, request_time, response_time, total_time, document_file_hash) + results.append((text, embedding)) + end_time = time.perf_counter() + duration = end_time - start_time + if len(strings) > 0: + logger.info(f"Sequential inference task for {len(strings)} strings completed in {duration:.2f} seconds; {duration / len(strings):.2f} seconds per string") + filtered_results = [(text, embedding) for text, embedding in results if embedding is not None] + faiss_indexes, token_faiss_indexes, associated_texts_by_model = await build_faiss_indexes() + return filtered_results + +async def compute_embeddings_for_document_old(strings: list, llm_model_name: str, client_ip: str, document_file_hash: str) -> List[Tuple[str, np.array]]: from swiss_army_llama import get_embedding_vector_for_string results = [] if USE_PARALLEL_INFERENCE_QUEUE: @@ -540,6 +706,7 @@ async def store_document_embeddings_in_db(file: File, filename=file.filename, mimetype=file.content_type, file_hash=file_hash, + corpus_identifier_string=corpus_identifier_string, llm_model_name=llm_model_name, file_data=original_file_content, document_embedding_results_json=json.loads(json_content.decode()), @@ -564,13 +731,13 @@ async def store_document_embeddings_in_db(file: File, request_time=request_time, response_time=datetime.utcnow(), total_time=(datetime.utcnow() - request_time).total_seconds(), - document_file_hash=file_hash # Link it to the DocumentEmbedding via file_hash + document_file_hash=file_hash, # Link it to the DocumentEmbedding via file_hash + corpus_identifier_string=corpus_identifier_string ) else: write_operations.append(embedding_entry) await shared_resources.db_writer.enqueue_write(write_operations) # Enqueue the write operation for text embeddings - def load_text_completion_model(llm_model_name: str, raise_http_exception: bool = True): global USE_VERBOSE try: diff --git a/shared_resources.py b/shared_resources.py index 8ab7abc..612b66c 100644 --- a/shared_resources.py +++ b/shared_resources.py @@ -1,19 +1,18 @@ -from misc_utility_functions import is_redis_running, build_faiss_indexes, suppress_stdout_stderr +from misc_utility_functions import is_redis_running, start_redis_server, build_faiss_indexes, suppress_stdout_stderr from database_functions import DatabaseWriter, initialize_db from ramdisk_functions import setup_ramdisk, copy_models_to_ramdisk, check_that_user_has_required_permissions_to_manage_ramdisks from logger_config import setup_logger from aioredlock import Aioredlock import aioredis import asyncio -import subprocess import urllib.request import os import nvgpu import glob import json import traceback +import llama_cpp from typing import List, Tuple, Dict -from langchain_community.embeddings import LlamaCppEmbeddings from decouple import config from fastapi import HTTPException logger = setup_logger() @@ -70,7 +69,7 @@ async def initialize_globals(): global db_writer, faiss_indexes, token_faiss_indexes, associated_texts_by_model, redis, lock_manager if not is_redis_running(): logger.info("Starting Redis server...") - subprocess.Popen(['redis-server'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + start_redis_server() await asyncio.sleep(1) # Sleep for 1 second to give Redis time to start redis = await aioredis.create_redis_pool('redis://localhost') lock_manager = Aioredlock([redis]) @@ -173,8 +172,7 @@ def load_model(llm_model_name: str, raise_http_exception: bool = True): matching_files.sort(key=os.path.getmtime, reverse=True) model_file_path = matching_files[0] with suppress_stdout_stderr(): - model_instance = LlamaCppEmbeddings(model_path=model_file_path, use_mlock=True, n_ctx=LLM_CONTEXT_SIZE_IN_TOKENS) # Load the model without GPU acceleration - model_instance.client.verbose = USE_VERBOSE + model_instance = llama_cpp.Llama(model_path=model_file_path, use_mlock=True, n_ctx=LLM_CONTEXT_SIZE_IN_TOKENS, embedding=True, verbose=False) embedding_model_cache[llm_model_name] = model_instance return model_instance except TypeError as e: diff --git a/swiss_army_llama.py b/swiss_army_llama.py index 21907cc..c110fbe 100644 --- a/swiss_army_llama.py +++ b/swiss_army_llama.py @@ -3,12 +3,12 @@ from logger_config import setup_logger from database_functions import AsyncSessionLocal, DatabaseWriter, get_db_writer from ramdisk_functions import clear_ramdisk -from misc_utility_functions import build_faiss_indexes, safe_path, configure_redis_in_background +from misc_utility_functions import build_faiss_indexes, safe_path, configure_redis_optimally from embeddings_data_models import DocumentEmbedding, TokenLevelEmbeddingBundle from embeddings_data_models import EmbeddingRequest, SemanticSearchRequest, AdvancedSemanticSearchRequest, SimilarityRequest, TextCompletionRequest, AddGrammarRequest from embeddings_data_models import EmbeddingResponse, SemanticSearchResponse, AdvancedSemanticSearchResponse, SimilarityResponse, AllStringsResponse, AllDocumentsResponse, TextCompletionResponse, AddGrammarResponse from embeddings_data_models import ShowLogsIncrementalModel -from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, get_or_compute_token_level_embedding_bundle_combined_feature_vector, calculate_token_level_embeddings, download_file, start_resource_monitoring, end_resource_monitoring +from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, get_or_compute_token_level_embedding_bundle_combined_feature_vector, calculate_token_level_embeddings, download_file, start_resource_monitoring, end_resource_monitoring, get_texts_for_corpus_identifier, get_texts_for_model from service_functions import parse_submitted_document_file_into_sentence_strings_func, compute_embeddings_for_document, store_document_embeddings_in_db, generate_completion_from_llm, validate_bnf_grammar_func, convert_document_to_sentences_func, get_audio_duration_seconds from grammar_builder import GrammarBuilder from log_viewer_functions import show_logs_incremental_func, show_logs_func @@ -17,7 +17,7 @@ import glob import json import os -import signal +import sys import random import re import tempfile @@ -32,7 +32,6 @@ from decouple import config import uvicorn import fastapi -from fastapi.param_functions import Body from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Depends, Form from fastapi.responses import JSONResponse, FileResponse, HTMLResponse, Response from contextlib import asynccontextmanager @@ -50,6 +49,7 @@ logger = setup_logger() magika = Magika() gpu_check_results = is_gpu_available() +configure_redis_optimally() logger.info(f"\nGPU check results:\n {gpu_check_results}\n") class GracefulExit(BaseException): @@ -66,7 +66,6 @@ async def lifespan(app: FastAPI): # Shutdown code (if any) pass - # Note: the Ramdisk setup and teardown requires sudo; to enable password-less sudo, edit your sudoers file with `sudo visudo`. # Add the following lines, replacing username with your actual username # username ALL=(ALL) NOPASSWD: /bin/mount -t tmpfs -o size=*G tmpfs /mnt/ramdisk @@ -103,9 +102,6 @@ async def general_exception_handler(request: Request, exc: Exception) -> JSONRes logger.exception(exc) return JSONResponse(status_code=500, content={"message": "An unexpected error occurred"}) -@app.on_event("startup") -async def startup_event(): - configure_redis_in_background() @app.get("/", include_in_schema=False) async def custom_swagger_ui_html(): @@ -325,7 +321,6 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques try: input_data = {"text": request.text} context = start_resource_monitoring("get_embedding_vector_for_string", input_data, req.client.host if req else "localhost") - return await get_or_compute_embedding(request, req, client_ip, document_file_hash) finally: end_resource_monitoring(context) @@ -421,7 +416,7 @@ async def get_token_level_embeddings_matrix_and_combined_feature_vector_for_stri existing_embedding_bundle = result.unique().scalar() if existing_embedding_bundle: logger.info("Found existing token-level embedding bundle in the database.") - combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(existing_embedding_bundle.id, existing_embedding_bundle.token_level_embeddings, db_writer) + combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(existing_embedding_bundle.id, existing_embedding_bundle.token_level_embeddings) response_content = { 'input_text': request.text, 'token_level_embedding_bundle': json.loads(existing_embedding_bundle.token_level_embeddings_bundle_json), @@ -465,7 +460,7 @@ async def get_token_level_embeddings_matrix_and_combined_feature_vector_for_stri overall_total_time = (datetime.utcnow() - request_time).total_seconds() if len(embedding_bundle.token_level_embeddings_bundle_json) > 0: tokens = re.findall(r'\b\w+\b', request.text) - logger.info(f"The response took {overall_total_time} seconds to generate, or {overall_total_time / (float(len(tokens))/1000.0)} seconds per thousand input tokens and {overall_total_time / (float(json_content_length)/1000000.0)} seconds per million output characters.") + logger.info(f"The response took {overall_total_time:,.2f} seconds to generate, or {overall_total_time / (float(len(tokens))/1000.0):,.2f} seconds per thousand input tokens and {overall_total_time / (float(json_content_length)/1000000.0):,.2f} seconds per million output characters.") if send_back_json_or_zip_file == 'json': # Assume 'json' response should be sent back logger.info(f"Now sending back JSON response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of JSON response out of {len(json_content)} total characters: {json_content[:100]}") return JSONResponse(content=response_content) @@ -864,7 +859,7 @@ async def get_all_embedding_vectors_for_document( result = await session.execute(select(DocumentEmbedding).filter(DocumentEmbedding.file_hash == file_hash, DocumentEmbedding.llm_model_name == llm_model_name)) existing_document_embedding = result.scalar_one_or_none() if existing_document_embedding: - logger.info(f"Document has been processed before, returning existing result") + logger.info("Document has been processed before, returning existing result") json_content = json.dumps(existing_document_embedding.document_embedding_results_json).encode() else: with open(temp_file_path, 'rb') as f: @@ -880,7 +875,7 @@ async def get_all_embedding_vectors_for_document( } context = start_resource_monitoring("get_all_embedding_vectors_for_document", input_data, client_ip) try: - results = await compute_embeddings_for_document(sentences, llm_model_name, client_ip, file_hash) + results = await compute_embeddings_for_document(sentences, llm_model_name, client_ip, file_hash, corpus_identifier_string) except Exception as e: logger.error(f"Error while computing embeddings for document: {e}") traceback.print_exc() @@ -891,7 +886,7 @@ async def get_all_embedding_vectors_for_document( json_content = df.to_json(orient=json_format or 'records').encode() with open(temp_file_path, 'rb') as file_buffer: original_file_content = file_buffer.read() - await store_document_embeddings_in_db(file.filename if file else url, file_hash, original_file_content, json_content, results, llm_model_name, client_ip, request_time, corpus_identifier_string) + await store_document_embeddings_in_db(file, file_hash, original_file_content, json_content, results, llm_model_name, client_ip, request_time, corpus_identifier_string) overall_total_time = (datetime.utcnow() - request_time).total_seconds() logger.info(f"Done getting all embeddings for document containing {len(sentences)} sentences with model {llm_model_name}") json_content_length = len(json_content)