diff --git a/swiss_army_llama.py b/swiss_army_llama.py
index 740df5e..984e83f 100644
--- a/swiss_army_llama.py
+++ b/swiss_army_llama.py
@@ -273,7 +273,15 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques
         logger.warning(f"Unauthorized request from client IP {client_ip}")
         raise HTTPException(status_code=403, detail="Unauthorized")
     try:
-        return await get_or_compute_embedding(request, req, client_ip, document_file_hash)
+        unique_id = f"get_embedding_{request.text}_{request.llm_model_name}"
+        lock = await shared_resources.lock_manager.lock(unique_id)
+        if lock.valid:
+            try:
+                return await get_or_compute_embedding(request, req, client_ip, document_file_hash)
+            finally:
+                await shared_resources.lock_manager.unlock(lock)
+        else:
+            return {"status": "already processing"}
     except Exception as e:
         logger.error(f"An error occurred while processing the request: {e}")
         logger.error(traceback.format_exc()) # Print the traceback
@@ -369,57 +377,65 @@ async def get_token_level_embeddings_matrix_and_combined_feature_vector_for_stri
             'combined_feature_vector': combined_feature_vector
         }
         return JSONResponse(content=response_content)
-    logger.info("No cached result found. Calculating token-level embeddings now...")
-    try:
-        embedding_bundle = TokenLevelEmbeddingBundle(
-            input_text=request.text,
-            llm_model_name=request.llm_model_name,
-            ip_address=client_ip,
-            request_time=request_time
-        )
-        token_embeddings = await calculate_token_level_embeddings(request.text, request.llm_model_name, client_ip, embedding_bundle.id)
-        tokens = re.findall(r'\b\w+\b', request.text)
-        logger.info(f"Tokenized text into {len(tokens)} tokens. Organizing results.")
-        df = pd.DataFrame({
-            'token': tokens,
-            'embedding': [embedding.tolist() for embedding in token_embeddings]
-        })
-        json_content = df.to_json(orient=json_format or 'records')
-        response_time=datetime.utcnow()
-        total_time = (response_time - request_time).total_seconds()
-        embedding_bundle.token_level_embeddings_bundle_json = json_content
-        embedding_bundle.response_time = response_time
-        embedding_bundle.total_time = total_time
-        combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(embedding_bundle.id, json_content, db_writer)
-        response_content = {
-            'input_text': request.text,
-            'token_level_embedding_bundle': json.loads(embedding_bundle.token_level_embeddings_bundle_json),
-            'combined_feature_vector': combined_feature_vector
-        }
-        logger.info(f"Done getting token-level embedding matrix and combined feature vector for input text string {request.text} and model {request.llm_model_name}")
-        json_content = embedding_bundle.token_level_embeddings_bundle_json
-        json_content_length = len(json.dumps(response_content))
-        overall_total_time = (datetime.utcnow() - request_time).total_seconds()
-        if len(embedding_bundle.token_level_embeddings_bundle_json) > 0:
-            tokens = re.findall(r'\b\w+\b', request.text)
-            logger.info(f"The response took {overall_total_time} seconds to generate, or {overall_total_time / (float(len(tokens))/1000.0)} seconds per thousand input tokens and {overall_total_time / (float(json_content_length)/1000000.0)} seconds per million output characters.")
-        if send_back_json_or_zip_file == 'json': # Assume 'json' response should be sent back
-            logger.info(f"Now sending back JSON response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of JSON response out of {len(json_content)} total characters: {json_content[:100]}")
-            return JSONResponse(content=response_content)
-        else: # Assume 'zip' file should be sent back
-            output_file_name_without_extension = f"token_level_embeddings_and_combined_feature_vector_for_input_hash_{input_text_hash}_and_model_name__{request.llm_model_name}"
-            json_file_path = f"/tmp/{output_file_name_without_extension}.json"
-            with open(json_file_path, 'w') as json_file:
-                json.dump(response_content, json_file)
-            zip_file_path = f"/tmp/{output_file_name_without_extension}.zip"
-            with zipfile.ZipFile(zip_file_path, 'w') as zipf:
-                zipf.write(json_file_path, os.path.basename(json_file_path))
-            logger.info(f"Now sending back ZIP file response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of zipped JSON file out of {len(json_content)} total characters: {json_content[:100]}")
-            return FileResponse(zip_file_path, headers={"Content-Disposition": f"attachment; filename={output_file_name_without_extension}.zip"})
-    except Exception as e:
-        logger.error(f"An error occurred while processing the request: {e}")
-        logger.error(traceback.format_exc())
-        raise HTTPException(status_code=500, detail="Internal Server Error")
+    unique_id = f"get_token_level_embeddings_{request.text}_{request.llm_model_name}"
+    lock = await shared_resources.lock_manager.lock(unique_id)
+    if lock.valid:
+        try:
+            logger.info("No cached result found. Calculating token-level embeddings now...")
+            try:
+                embedding_bundle = TokenLevelEmbeddingBundle(
+                    input_text=request.text,
+                    llm_model_name=request.llm_model_name,
+                    ip_address=client_ip,
+                    request_time=request_time
+                )
+                token_embeddings = await calculate_token_level_embeddings(request.text, request.llm_model_name, client_ip, embedding_bundle.id)
+                tokens = re.findall(r'\b\w+\b', request.text)
+                logger.info(f"Tokenized text into {len(tokens)} tokens. Organizing results.")
+                df = pd.DataFrame({
+                    'token': tokens,
+                    'embedding': [embedding.tolist() for embedding in token_embeddings]
+                })
+                json_content = df.to_json(orient=json_format or 'records')
+                response_time=datetime.utcnow()
+                total_time = (response_time - request_time).total_seconds()
+                embedding_bundle.token_level_embeddings_bundle_json = json_content
+                embedding_bundle.response_time = response_time
+                embedding_bundle.total_time = total_time
+                combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(embedding_bundle.id, json_content, db_writer)
+                response_content = {
+                    'input_text': request.text,
+                    'token_level_embedding_bundle': json.loads(embedding_bundle.token_level_embeddings_bundle_json),
+                    'combined_feature_vector': combined_feature_vector
+                }
+                logger.info(f"Done getting token-level embedding matrix and combined feature vector for input text string {request.text} and model {request.llm_model_name}")
+                json_content = embedding_bundle.token_level_embeddings_bundle_json
+                json_content_length = len(json.dumps(response_content))
+                overall_total_time = (datetime.utcnow() - request_time).total_seconds()
+                if len(embedding_bundle.token_level_embeddings_bundle_json) > 0:
+                    tokens = re.findall(r'\b\w+\b', request.text)
+                    logger.info(f"The response took {overall_total_time} seconds to generate, or {overall_total_time / (float(len(tokens))/1000.0)} seconds per thousand input tokens and {overall_total_time / (float(json_content_length)/1000000.0)} seconds per million output characters.")
+                if send_back_json_or_zip_file == 'json': # Assume 'json' response should be sent back
+                    logger.info(f"Now sending back JSON response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of JSON response out of {len(json_content)} total characters: {json_content[:100]}")
+                    return JSONResponse(content=response_content)
+                else: # Assume 'zip' file should be sent back
+                    output_file_name_without_extension = f"token_level_embeddings_and_combined_feature_vector_for_input_hash_{input_text_hash}_and_model_name__{request.llm_model_name}"
+                    json_file_path = f"/tmp/{output_file_name_without_extension}.json"
+                    with open(json_file_path, 'w') as json_file:
+                        json.dump(response_content, json_file)
+                    zip_file_path = f"/tmp/{output_file_name_without_extension}.zip"
+                    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
+                        zipf.write(json_file_path, os.path.basename(json_file_path))
+                    logger.info(f"Now sending back ZIP file response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of zipped JSON file out of {len(json_content)} total characters: {json_content[:100]}")
+                    return FileResponse(zip_file_path, headers={"Content-Disposition": f"attachment; filename={output_file_name_without_extension}.zip"})
+            except Exception as e:
+                logger.error(f"An error occurred while processing the request: {e}")
+                logger.error(traceback.format_exc())
+                raise HTTPException(status_code=500, detail="Internal Server Error")
+        finally:
+            await shared_resources.lock_manager.unlock(lock)
+    else:
+        return {"status": "already processing"}
 
 
 @app.post("/compute_similarity_between_strings/",
@@ -453,43 +469,50 @@ async def compute_similarity_between_strings(request: SimilarityRequest, req: Re
     similarity_measure = request.similarity_measure.lower()
     if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN):
         raise HTTPException(status_code=403, detail="Unauthorized")
-    try:
-        client_ip = req.client.host if req else "localhost"
-        embedding_request1 = EmbeddingRequest(text=request.text1, llm_model_name=request.llm_model_name)
-        embedding_request2 = EmbeddingRequest(text=request.text2, llm_model_name=request.llm_model_name)
-        embedding1_response = await get_or_compute_embedding(embedding_request1, client_ip=client_ip)
-        embedding2_response = await get_or_compute_embedding(embedding_request2, client_ip=client_ip)
-        embedding1 = np.array(embedding1_response["embedding"])
-        embedding2 = np.array(embedding2_response["embedding"])
-        if embedding1.size == 0 or embedding2.size == 0:
-            raise HTTPException(status_code=400, detail="Could not calculate embeddings for the given texts")
-        params = {
-            "vector_1": embedding1.tolist(),
-            "vector_2": embedding2.tolist(),
-            "similarity_measure": similarity_measure
-        }
-        similarity_stats_str = fvs.py_compute_vector_similarity_stats(json.dumps(params))
-        similarity_stats_json = json.loads(similarity_stats_str)
-        if similarity_measure == 'all':
-            similarity_score = similarity_stats_json
-        else:
-            similarity_score = similarity_stats_json.get(similarity_measure, None)
-            if similarity_score is None:
-                raise HTTPException(status_code=400, detail="Invalid similarity measure specified")
-        response_time = datetime.utcnow()
-        total_time = (response_time - request_time).total_seconds()
-        logger.info(f"Computed similarity using {similarity_measure} in {total_time} seconds; similarity score: {similarity_score}")
-        return {
-            "text1": request.text1,
-            "text2": request.text2,
-            "similarity_measure": similarity_measure,
-            "similarity_score": similarity_score,
-            "embedding1": embedding1.tolist(),
-            "embedding2": embedding2.tolist()
-        }
-    except Exception as e:
-        logger.error(f"An error occurred while processing the request: {e}")
-        raise HTTPException(status_code=500, detail="Internal Server Error")
+    unique_id = f"compute_similarity_{request.text1}_{request.text2}_{request.llm_model_name}_{similarity_measure}"
+    lock = await shared_resources.lock_manager.lock(unique_id)
+    if lock.valid:
+        try:
+            client_ip = req.client.host if req else "localhost"
+            embedding_request1 = EmbeddingRequest(text=request.text1, llm_model_name=request.llm_model_name)
+            embedding_request2 = EmbeddingRequest(text=request.text2, llm_model_name=request.llm_model_name)
+            embedding1_response = await get_or_compute_embedding(embedding_request1, client_ip=client_ip)
+            embedding2_response = await get_or_compute_embedding(embedding_request2, client_ip=client_ip)
+            embedding1 = np.array(embedding1_response["embedding"])
+            embedding2 = np.array(embedding2_response["embedding"])
+            if embedding1.size == 0 or embedding2.size == 0:
+                raise HTTPException(status_code=400, detail="Could not calculate embeddings for the given texts")
+            params = {
+                "vector_1": embedding1.tolist(),
+                "vector_2": embedding2.tolist(),
+                "similarity_measure": similarity_measure
+            }
+            similarity_stats_str = fvs.py_compute_vector_similarity_stats(json.dumps(params))
+            similarity_stats_json = json.loads(similarity_stats_str)
+            if similarity_measure == 'all':
+                similarity_score = similarity_stats_json
+            else:
+                similarity_score = similarity_stats_json.get(similarity_measure, None)
+                if similarity_score is None:
+                    raise HTTPException(status_code=400, detail="Invalid similarity measure specified")
+            response_time = datetime.utcnow()
+            total_time = (response_time - request_time).total_seconds()
+            logger.info(f"Computed similarity using {similarity_measure} in {total_time} seconds; similarity score: {similarity_score}")
+            return {
+                "text1": request.text1,
+                "text2": request.text2,
+                "similarity_measure": similarity_measure,
+                "similarity_score": similarity_score,
+                "embedding1": embedding1.tolist(),
+                "embedding2": embedding2.tolist()
+            }
+        except Exception as e:
+            logger.error(f"An error occurred while processing the request: {e}")
+            raise HTTPException(status_code=500, detail="Internal Server Error")
+        finally:
+            await shared_resources.lock_manager.unlock(lock)
+    else:
+        return {"status": "already processing"}
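Note on the pattern above: all three hunks wrap their endpoint body in the same acquire / try / finally / unlock sequence around a distributed lock keyed on the request text and model name. As a minimal sketch (not part of this PR), assuming the aioredlock-style interface that `shared_resources.lock_manager` appears to expose above (a `lock()` coroutine returning an object with a `.valid` flag, released via `unlock()`), the boilerplate could be factored into one reusable async context manager; the helper name `exclusive_computation` is hypothetical:

    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def exclusive_computation(lock_manager, unique_id: str):
        # Try to acquire the distributed lock for this computation;
        # yield None so the caller can short-circuit if another
        # worker already holds it.
        lock = await lock_manager.lock(unique_id)
        if not lock.valid:
            yield None
            return
        try:
            yield lock
        finally:
            # Always release the lock, even if the computation raises.
            await lock_manager.unlock(lock)

Each endpoint body would then collapse to something like:

    async with exclusive_computation(shared_resources.lock_manager, unique_id) as lock:
        if lock is None:
            return {"status": "already processing"}
        return await get_or_compute_embedding(request, req, client_ip, document_file_hash)

One further design consideration: building `unique_id` from the raw request text produces arbitrarily long lock keys for long inputs; hashing the text first (e.g., `hashlib.sha256(request.text.encode()).hexdigest()`) would bound the key length while preserving the same deduplication behavior.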