diff --git a/.env b/.env index 5dd401a..0b0576f 100644 --- a/.env +++ b/.env @@ -16,5 +16,6 @@ RETRY_DELAY_BASE_SECONDS=1 JITTER_FACTOR=0.1 USE_RAMDISK=0 USE_VERBOSE=0 +USE_RESOURCE_MONITORING=1 RAMDISK_PATH = "/mnt/ramdisk" RAMDISK_SIZE_IN_GB=50 diff --git a/requirements.txt b/requirements.txt index d1ff900..4e0816e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ httpx langchain langchain-community llama-cpp-python +mutagen nvgpu pandas psutil diff --git a/service_functions.py b/service_functions.py index cb10888..10f9754 100644 --- a/service_functions.py +++ b/service_functions.py @@ -31,6 +31,8 @@ from decouple import config from faster_whisper import WhisperModel from llama_cpp import Llama, LlamaGrammar +from mutagen import File as MutagenFile + logger = setup_logger() SWISS_ARMY_LLAMA_SERVER_LISTEN_PORT = config("SWISS_ARMY_LLAMA_SERVER_LISTEN_PORT", default=8089, cast=int) @@ -45,6 +47,7 @@ MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS = config("MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS", default=10, cast=int) USE_RAMDISK = config("USE_RAMDISK", default=False, cast=bool) USE_VERBOSE = config("USE_VERBOSE", default=False, cast=bool) +USE_RESOURCE_MONITORING = config("USE_RESOURCE_MONITORING", default=1, cast=bool) RAMDISK_PATH = config("RAMDISK_PATH", default="/mnt/ramdisk", cast=str) BASE_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) @@ -733,4 +736,113 @@ async def download_file(url: str, expected_size: int, expected_hash: str) -> str if hash_obj.hexdigest() != expected_hash: os.remove(temp_file_path) raise HTTPException(status_code=400, detail="File hash mismatch") - return temp_file_path \ No newline at end of file + return temp_file_path + +def get_audio_duration_seconds(file_path: str) -> float: + audio = MutagenFile(file_path) + if audio is None or not hasattr(audio.info, 'length'): + raise ValueError("Could not determine the length of the audio file.") + return audio.info.length + +def start_resource_monitoring(endpoint_name: str, input_data: Dict[str, Any], client_ip: str) -> Dict[str, Any]: + if not USE_RESOURCE_MONITORING: + return {} + # Capture initial system resource usage + initial_memory = psutil.virtual_memory().used + initial_cpu_times = psutil.cpu_times_percent(interval=None) + start_time = time.time() + # Placeholder for input-specific details + request_details = {} + # Extract endpoint-specific input details + if endpoint_name == "get_embedding_vector_for_string": + text = input_data.get("text", "") + request_details = { + "num_words": len(text.split()), + "num_characters": len(text) + } + elif endpoint_name == "get_all_embedding_vectors_for_document": + sentences = input_data.get("sentences", []) + file_size_mb = input_data.get("file_size_mb", 0) + mime_type = input_data.get("mime_type", "") + request_details = { + "num_sentences": len(sentences), + "total_words": sum(len(sentence.split()) for sentence in sentences), + "total_characters": sum(len(sentence) for sentence in sentences), + "file_size_mb": file_size_mb, + "mime_type": mime_type + } + elif endpoint_name == "compute_transcript_with_whisper_from_audio": + transcript_details = input_data.get("transcript_details", {}) + file_size_mb = input_data.get("file_size_mb", 0) + audio_duration_seconds = input_data.get("audio_duration_seconds", 0) + request_details = { + "file_size_mb": file_size_mb, + "audio_duration_seconds": audio_duration_seconds, + "num_sentences": len(transcript_details.get("sentences", [])), + "total_words": sum(len(sentence.split()) for sentence in transcript_details.get("sentences", [])), + "total_characters": sum(len(sentence) for sentence in transcript_details.get("sentences", [])) + } + elif endpoint_name == "get_text_completions_from_input_prompt": + input_prompt = input_data.get("input_prompt", "") + request_details = { + "num_words": len(input_prompt.split()), + "num_characters": len(input_prompt), + "llm_model_name": input_data.get("llm_model_name", ""), + "temperature": input_data.get("temperature", 0.7), + "grammar_file_string": input_data.get("grammar_file_string", ""), + "number_of_completions_to_generate": input_data.get("number_of_completions_to_generate", 1), + "number_of_tokens_to_generate": input_data.get("number_of_tokens_to_generate", 1000) + } + # Store initial state and request details in the context + context = { + "endpoint_name": endpoint_name, + "start_time": start_time, + "initial_memory": initial_memory, + "initial_cpu_times": initial_cpu_times, + "request_details": request_details, + "client_ip": client_ip, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(start_time)) + } + return context + +def end_resource_monitoring(context: Dict[str, Any]): + if not USE_RESOURCE_MONITORING or not context: + return + # Retrieve initial state from context + endpoint_name = context["endpoint_name"] + start_time = context["start_time"] + initial_memory = context["initial_memory"] + initial_cpu_times = context["initial_cpu_times"] + request_details = context["request_details"] + client_ip = context["client_ip"] + timestamp = context["timestamp"] + # Capture final system resource usage + end_time = time.time() + final_memory = psutil.virtual_memory().used + final_cpu_times = psutil.cpu_times_percent(interval=None) + # Calculate the metrics + memory_used = final_memory - initial_memory + cpu_used = { + "user": final_cpu_times.user - initial_cpu_times.user, + "system": final_cpu_times.system - initial_cpu_times.system, + "idle": final_cpu_times.idle - initial_cpu_times.idle + } + time_taken = end_time - start_time + # Combine all metrics into a result dictionary + result = { + "timestamp": timestamp, + "client_ip": client_ip, + "endpoint_name": endpoint_name, + "request_details": request_details, + "memory_used": memory_used, + "cpu_used": cpu_used, + "time_taken": time_taken + } + # Append the result to the log file + log_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource_monitoring_logs.json") + try: + with open(log_file_path, "a") as log_file: + log_file.write(json.dumps(result) + "\n") + except Exception as e: + logger.error(f"Failed to write resource monitoring log: {e}") + logger.info(f"Request data and system resources used: {result}") \ No newline at end of file diff --git a/swiss_army_llama.py b/swiss_army_llama.py index 7279750..2d8d388 100644 --- a/swiss_army_llama.py +++ b/swiss_army_llama.py @@ -8,8 +8,8 @@ from embeddings_data_models import EmbeddingRequest, SemanticSearchRequest, AdvancedSemanticSearchRequest, SimilarityRequest, TextCompletionRequest, AddGrammarRequest from embeddings_data_models import EmbeddingResponse, SemanticSearchResponse, AdvancedSemanticSearchResponse, SimilarityResponse, AllStringsResponse, AllDocumentsResponse, TextCompletionResponse, AddGrammarResponse from embeddings_data_models import ShowLogsIncrementalModel -from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, get_or_compute_token_level_embedding_bundle_combined_feature_vector, calculate_token_level_embeddings, download_file -from service_functions import parse_submitted_document_file_into_sentence_strings_func, compute_embeddings_for_document, store_document_embeddings_in_db, generate_completion_from_llm, validate_bnf_grammar_func, convert_document_to_sentences_func +from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, get_or_compute_token_level_embedding_bundle_combined_feature_vector, calculate_token_level_embeddings, download_file, start_resource_monitoring, end_resource_monitoring +from service_functions import parse_submitted_document_file_into_sentence_strings_func, compute_embeddings_for_document, store_document_embeddings_in_db, generate_completion_from_llm, validate_bnf_grammar_func, convert_document_to_sentences_func, get_audio_duration_seconds from grammar_builder import GrammarBuilder from log_viewer_functions import show_logs_incremental_func, show_logs_func from uvicorn_config import option @@ -80,6 +80,7 @@ async def lifespan(app: FastAPI): USE_SECURITY_TOKEN = False DEFAULT_MODEL_NAME = config("DEFAULT_MODEL_NAME", default="Meta-Llama-3-8B-Instruct.Q3_K_S", cast=str) USE_RAMDISK = config("USE_RAMDISK", default=False, cast=bool) +USE_RESOURCE_MONITORING = config("USE_RESOURCE_MONITORING", default=1, cast=bool) RAMDISK_PATH = config("RAMDISK_PATH", default="/mnt/ramdisk", cast=str) BASE_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) @@ -318,8 +319,12 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques lock = await shared_resources.lock_manager.lock(unique_id) if lock.valid: try: + input_data = {"text": request.text} + context = start_resource_monitoring("get_embedding_vector_for_string", input_data, req.client.host if req else "localhost") + return await get_or_compute_embedding(request, req, client_ip, document_file_hash) finally: + end_resource_monitoring(context) await shared_resources.lock_manager.unlock(lock) else: return {"status": "already processing"} @@ -329,6 +334,7 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques raise HTTPException(status_code=500, detail="Internal Server Error") + @app.post("/get_token_level_embeddings_matrix_and_combined_feature_vector_for_string/", summary="Retrieve Token-Level Embeddings and Combined Feature Vector for a Given Input String", description="""Retrieve the token-level embeddings and combined feature vector for a given input text using the specified model. @@ -803,10 +809,8 @@ async def get_all_embedding_vectors_for_document( ): client_ip = req.client.host if req else "localhost" request_time = datetime.utcnow() - if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN): raise HTTPException(status_code=403, detail="Unauthorized") - if file: _, extension = os.path.splitext(file.filename) temp_file_path = tempfile.NamedTemporaryFile(suffix=extension, delete=False).name @@ -820,20 +824,16 @@ async def get_all_embedding_vectors_for_document( temp_file_path = await download_file(url, size, hash) else: raise HTTPException(status_code=400, detail="Invalid input. Provide either a file or URL with hash and size.") - hash_obj = hashlib.sha3_256() with open(temp_file_path, 'rb') as buffer: for chunk in iter(lambda: buffer.read(chunk_size), b''): hash_obj.update(chunk) file_hash = hash_obj.hexdigest() logger.info(f"SHA3-256 hash of submitted file: {file_hash}") - if corpus_identifier_string is None: corpus_identifier_string = file_hash - unique_id = f"document_embedding_{file_hash}_{llm_model_name}" lock = await shared_resources.lock_manager.lock(unique_id) - if lock.valid: try: async with AsyncSession() as session: @@ -846,8 +846,17 @@ async def get_all_embedding_vectors_for_document( mime = Magic(mime=True) mime_type = mime.from_file(temp_file_path) logger.info(f"Received request to extract embeddings for document {file.filename if file else url} with MIME type: {mime_type} and size: {os.path.getsize(temp_file_path)} bytes from IP address: {client_ip}") - strings = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type) - results = await compute_embeddings_for_document(strings, llm_model_name, client_ip, file_hash) + sentences = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type) + input_data = { + "sentences": sentences, + "file_size_mb": os.path.getsize(temp_file_path) / (1024 * 1024), + "mime_type": mime_type + } + context = start_resource_monitoring("get_all_embedding_vectors_for_document", input_data, client_ip) + try: + results = await compute_embeddings_for_document(sentences, llm_model_name, client_ip, file_hash) + finally: + end_resource_monitoring(context) df = pd.DataFrame(results, columns=['text', 'embedding']) json_content = df.to_json(orient=json_format or 'records').encode() with open(temp_file_path, 'rb') as file_buffer: @@ -949,20 +958,24 @@ async def get_text_completions_from_input_prompt(request: TextCompletionRequest, if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN): logger.warning(f"Unauthorized request from client IP {client_ip}") raise HTTPException(status_code=403, detail="Unauthorized") + context = start_resource_monitoring("get_text_completions_from_input_prompt", request.dict(), client_ip) try: unique_id = f"text_completion_{hash(request.input_prompt)}_{request.llm_model_name}" - lock = await shared_resources.lock_manager.lock(unique_id) + lock = await shared_resources.lock_manager.lock(unique_id) if lock.valid: - try: - return await generate_completion_from_llm(request, req, client_ip) + try: + response = await generate_completion_from_llm(request, req, client_ip) + return response finally: await shared_resources.lock_manager.unlock(lock) else: return {"status": "already processing"} except Exception as e: logger.error(f"An error occurred while processing the request: {e}") - logger.error(traceback.format_exc()) # Print the traceback + logger.error(traceback.format_exc()) # Print the traceback raise HTTPException(status_code=500, detail="Internal Server Error") + finally: + end_resource_monitoring(context) @@ -1124,16 +1137,23 @@ async def compute_transcript_with_whisper_from_audio( temp_file_path = await download_file(url, size, hash) else: raise HTTPException(status_code=400, detail="Invalid input. Provide either a file or URL with hash and size.") + audio_file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024) + input_data = { + "file_size_mb": audio_file_size_mb, + "audio_duration_seconds": get_audio_duration_seconds(temp_file_path) + } + context = start_resource_monitoring("compute_transcript_with_whisper_from_audio", input_data, req.client.host if req else "localhost") try: audio_transcript = await get_or_compute_transcript(temp_file_path, compute_embeddings_for_resulting_transcript_document, llm_model_name, req, corpus_identifier_string) - os.remove(temp_file_path) return JSONResponse(content=audio_transcript) except Exception as e: os.remove(temp_file_path) logger.error(f"An error occurred while processing the request: {e}") logger.error(traceback.format_exc()) # Print the traceback raise HTTPException(status_code=500, detail="Internal Server Error") - + finally: + os.remove(temp_file_path) + end_resource_monitoring(context) @app.post("/add_new_grammar_definition_file/",