Skip to content

Commit

Permalink
Added optional resource monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Dicklesworthstone committed May 20, 2024
1 parent 1199b8d commit 83a994e
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 17 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ RETRY_DELAY_BASE_SECONDS=1
JITTER_FACTOR=0.1
USE_RAMDISK=0
USE_VERBOSE=0
USE_RESOURCE_MONITORING=1
RAMDISK_PATH = "/mnt/ramdisk"
RAMDISK_SIZE_IN_GB=50
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ httpx
langchain
langchain-community
llama-cpp-python
mutagen
nvgpu
pandas
psutil
Expand Down
114 changes: 113 additions & 1 deletion service_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from decouple import config
from faster_whisper import WhisperModel
from llama_cpp import Llama, LlamaGrammar
from mutagen import File as MutagenFile

logger = setup_logger()

SWISS_ARMY_LLAMA_SERVER_LISTEN_PORT = config("SWISS_ARMY_LLAMA_SERVER_LISTEN_PORT", default=8089, cast=int)
Expand All @@ -45,6 +47,7 @@
MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS = config("MAX_CONCURRENT_PARALLEL_INFERENCE_TASKS", default=10, cast=int)
USE_RAMDISK = config("USE_RAMDISK", default=False, cast=bool)
USE_VERBOSE = config("USE_VERBOSE", default=False, cast=bool)
USE_RESOURCE_MONITORING = config("USE_RESOURCE_MONITORING", default=1, cast=bool)
RAMDISK_PATH = config("RAMDISK_PATH", default="/mnt/ramdisk", cast=str)
BASE_DIRECTORY = os.path.dirname(os.path.abspath(__file__))

Expand Down Expand Up @@ -733,4 +736,113 @@ async def download_file(url: str, expected_size: int, expected_hash: str) -> str
if hash_obj.hexdigest() != expected_hash:
os.remove(temp_file_path)
raise HTTPException(status_code=400, detail="File hash mismatch")
return temp_file_path
return temp_file_path

def get_audio_duration_seconds(file_path: str) -> float:
audio = MutagenFile(file_path)
if audio is None or not hasattr(audio.info, 'length'):
raise ValueError("Could not determine the length of the audio file.")
return audio.info.length

def start_resource_monitoring(endpoint_name: str, input_data: Dict[str, Any], client_ip: str) -> Dict[str, Any]:
if not USE_RESOURCE_MONITORING:
return {}
# Capture initial system resource usage
initial_memory = psutil.virtual_memory().used
initial_cpu_times = psutil.cpu_times_percent(interval=None)
start_time = time.time()
# Placeholder for input-specific details
request_details = {}
# Extract endpoint-specific input details
if endpoint_name == "get_embedding_vector_for_string":
text = input_data.get("text", "")
request_details = {
"num_words": len(text.split()),
"num_characters": len(text)
}
elif endpoint_name == "get_all_embedding_vectors_for_document":
sentences = input_data.get("sentences", [])
file_size_mb = input_data.get("file_size_mb", 0)
mime_type = input_data.get("mime_type", "")
request_details = {
"num_sentences": len(sentences),
"total_words": sum(len(sentence.split()) for sentence in sentences),
"total_characters": sum(len(sentence) for sentence in sentences),
"file_size_mb": file_size_mb,
"mime_type": mime_type
}
elif endpoint_name == "compute_transcript_with_whisper_from_audio":
transcript_details = input_data.get("transcript_details", {})
file_size_mb = input_data.get("file_size_mb", 0)
audio_duration_seconds = input_data.get("audio_duration_seconds", 0)
request_details = {
"file_size_mb": file_size_mb,
"audio_duration_seconds": audio_duration_seconds,
"num_sentences": len(transcript_details.get("sentences", [])),
"total_words": sum(len(sentence.split()) for sentence in transcript_details.get("sentences", [])),
"total_characters": sum(len(sentence) for sentence in transcript_details.get("sentences", []))
}
elif endpoint_name == "get_text_completions_from_input_prompt":
input_prompt = input_data.get("input_prompt", "")
request_details = {
"num_words": len(input_prompt.split()),
"num_characters": len(input_prompt),
"llm_model_name": input_data.get("llm_model_name", ""),
"temperature": input_data.get("temperature", 0.7),
"grammar_file_string": input_data.get("grammar_file_string", ""),
"number_of_completions_to_generate": input_data.get("number_of_completions_to_generate", 1),
"number_of_tokens_to_generate": input_data.get("number_of_tokens_to_generate", 1000)
}
# Store initial state and request details in the context
context = {
"endpoint_name": endpoint_name,
"start_time": start_time,
"initial_memory": initial_memory,
"initial_cpu_times": initial_cpu_times,
"request_details": request_details,
"client_ip": client_ip,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(start_time))
}
return context

def end_resource_monitoring(context: Dict[str, Any]):
if not USE_RESOURCE_MONITORING or not context:
return
# Retrieve initial state from context
endpoint_name = context["endpoint_name"]
start_time = context["start_time"]
initial_memory = context["initial_memory"]
initial_cpu_times = context["initial_cpu_times"]
request_details = context["request_details"]
client_ip = context["client_ip"]
timestamp = context["timestamp"]
# Capture final system resource usage
end_time = time.time()
final_memory = psutil.virtual_memory().used
final_cpu_times = psutil.cpu_times_percent(interval=None)
# Calculate the metrics
memory_used = final_memory - initial_memory
cpu_used = {
"user": final_cpu_times.user - initial_cpu_times.user,
"system": final_cpu_times.system - initial_cpu_times.system,
"idle": final_cpu_times.idle - initial_cpu_times.idle
}
time_taken = end_time - start_time
# Combine all metrics into a result dictionary
result = {
"timestamp": timestamp,
"client_ip": client_ip,
"endpoint_name": endpoint_name,
"request_details": request_details,
"memory_used": memory_used,
"cpu_used": cpu_used,
"time_taken": time_taken
}
# Append the result to the log file
log_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource_monitoring_logs.json")
try:
with open(log_file_path, "a") as log_file:
log_file.write(json.dumps(result) + "\n")
except Exception as e:
logger.error(f"Failed to write resource monitoring log: {e}")
logger.info(f"Request data and system resources used: {result}")
52 changes: 36 additions & 16 deletions swiss_army_llama.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from embeddings_data_models import EmbeddingRequest, SemanticSearchRequest, AdvancedSemanticSearchRequest, SimilarityRequest, TextCompletionRequest, AddGrammarRequest
from embeddings_data_models import EmbeddingResponse, SemanticSearchResponse, AdvancedSemanticSearchResponse, SimilarityResponse, AllStringsResponse, AllDocumentsResponse, TextCompletionResponse, AddGrammarResponse
from embeddings_data_models import ShowLogsIncrementalModel
from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, get_or_compute_token_level_embedding_bundle_combined_feature_vector, calculate_token_level_embeddings, download_file
from service_functions import parse_submitted_document_file_into_sentence_strings_func, compute_embeddings_for_document, store_document_embeddings_in_db, generate_completion_from_llm, validate_bnf_grammar_func, convert_document_to_sentences_func
from service_functions import get_or_compute_embedding, get_or_compute_transcript, add_model_url, get_or_compute_token_level_embedding_bundle_combined_feature_vector, calculate_token_level_embeddings, download_file, start_resource_monitoring, end_resource_monitoring
from service_functions import parse_submitted_document_file_into_sentence_strings_func, compute_embeddings_for_document, store_document_embeddings_in_db, generate_completion_from_llm, validate_bnf_grammar_func, convert_document_to_sentences_func, get_audio_duration_seconds
from grammar_builder import GrammarBuilder
from log_viewer_functions import show_logs_incremental_func, show_logs_func
from uvicorn_config import option
Expand Down Expand Up @@ -80,6 +80,7 @@ async def lifespan(app: FastAPI):
USE_SECURITY_TOKEN = False
DEFAULT_MODEL_NAME = config("DEFAULT_MODEL_NAME", default="Meta-Llama-3-8B-Instruct.Q3_K_S", cast=str)
USE_RAMDISK = config("USE_RAMDISK", default=False, cast=bool)
USE_RESOURCE_MONITORING = config("USE_RESOURCE_MONITORING", default=1, cast=bool)
RAMDISK_PATH = config("RAMDISK_PATH", default="/mnt/ramdisk", cast=str)
BASE_DIRECTORY = os.path.dirname(os.path.abspath(__file__))

Expand Down Expand Up @@ -318,8 +319,12 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques
lock = await shared_resources.lock_manager.lock(unique_id)
if lock.valid:
try:
input_data = {"text": request.text}
context = start_resource_monitoring("get_embedding_vector_for_string", input_data, req.client.host if req else "localhost")

return await get_or_compute_embedding(request, req, client_ip, document_file_hash)
finally:
end_resource_monitoring(context)
await shared_resources.lock_manager.unlock(lock)
else:
return {"status": "already processing"}
Expand All @@ -329,6 +334,7 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Reques
raise HTTPException(status_code=500, detail="Internal Server Error")



@app.post("/get_token_level_embeddings_matrix_and_combined_feature_vector_for_string/",
summary="Retrieve Token-Level Embeddings and Combined Feature Vector for a Given Input String",
description="""Retrieve the token-level embeddings and combined feature vector for a given input text using the specified model.
Expand Down Expand Up @@ -803,10 +809,8 @@ async def get_all_embedding_vectors_for_document(
):
client_ip = req.client.host if req else "localhost"
request_time = datetime.utcnow()

if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN):
raise HTTPException(status_code=403, detail="Unauthorized")

if file:
_, extension = os.path.splitext(file.filename)
temp_file_path = tempfile.NamedTemporaryFile(suffix=extension, delete=False).name
Expand All @@ -820,20 +824,16 @@ async def get_all_embedding_vectors_for_document(
temp_file_path = await download_file(url, size, hash)
else:
raise HTTPException(status_code=400, detail="Invalid input. Provide either a file or URL with hash and size.")

hash_obj = hashlib.sha3_256()
with open(temp_file_path, 'rb') as buffer:
for chunk in iter(lambda: buffer.read(chunk_size), b''):
hash_obj.update(chunk)
file_hash = hash_obj.hexdigest()
logger.info(f"SHA3-256 hash of submitted file: {file_hash}")

if corpus_identifier_string is None:
corpus_identifier_string = file_hash

unique_id = f"document_embedding_{file_hash}_{llm_model_name}"
lock = await shared_resources.lock_manager.lock(unique_id)

if lock.valid:
try:
async with AsyncSession() as session:
Expand All @@ -846,8 +846,17 @@ async def get_all_embedding_vectors_for_document(
mime = Magic(mime=True)
mime_type = mime.from_file(temp_file_path)
logger.info(f"Received request to extract embeddings for document {file.filename if file else url} with MIME type: {mime_type} and size: {os.path.getsize(temp_file_path)} bytes from IP address: {client_ip}")
strings = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type)
results = await compute_embeddings_for_document(strings, llm_model_name, client_ip, file_hash)
sentences = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type)
input_data = {
"sentences": sentences,
"file_size_mb": os.path.getsize(temp_file_path) / (1024 * 1024),
"mime_type": mime_type
}
context = start_resource_monitoring("get_all_embedding_vectors_for_document", input_data, client_ip)
try:
results = await compute_embeddings_for_document(sentences, llm_model_name, client_ip, file_hash)
finally:
end_resource_monitoring(context)
df = pd.DataFrame(results, columns=['text', 'embedding'])
json_content = df.to_json(orient=json_format or 'records').encode()
with open(temp_file_path, 'rb') as file_buffer:
Expand Down Expand Up @@ -949,20 +958,24 @@ async def get_text_completions_from_input_prompt(request: TextCompletionRequest,
if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN):
logger.warning(f"Unauthorized request from client IP {client_ip}")
raise HTTPException(status_code=403, detail="Unauthorized")
context = start_resource_monitoring("get_text_completions_from_input_prompt", request.dict(), client_ip)
try:
unique_id = f"text_completion_{hash(request.input_prompt)}_{request.llm_model_name}"
lock = await shared_resources.lock_manager.lock(unique_id)
lock = await shared_resources.lock_manager.lock(unique_id)
if lock.valid:
try:
return await generate_completion_from_llm(request, req, client_ip)
try:
response = await generate_completion_from_llm(request, req, client_ip)
return response
finally:
await shared_resources.lock_manager.unlock(lock)
else:
return {"status": "already processing"}
except Exception as e:
logger.error(f"An error occurred while processing the request: {e}")
logger.error(traceback.format_exc()) # Print the traceback
logger.error(traceback.format_exc()) # Print the traceback
raise HTTPException(status_code=500, detail="Internal Server Error")
finally:
end_resource_monitoring(context)



Expand Down Expand Up @@ -1124,16 +1137,23 @@ async def compute_transcript_with_whisper_from_audio(
temp_file_path = await download_file(url, size, hash)
else:
raise HTTPException(status_code=400, detail="Invalid input. Provide either a file or URL with hash and size.")
audio_file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024)
input_data = {
"file_size_mb": audio_file_size_mb,
"audio_duration_seconds": get_audio_duration_seconds(temp_file_path)
}
context = start_resource_monitoring("compute_transcript_with_whisper_from_audio", input_data, req.client.host if req else "localhost")
try:
audio_transcript = await get_or_compute_transcript(temp_file_path, compute_embeddings_for_resulting_transcript_document, llm_model_name, req, corpus_identifier_string)
os.remove(temp_file_path)
return JSONResponse(content=audio_transcript)
except Exception as e:
os.remove(temp_file_path)
logger.error(f"An error occurred while processing the request: {e}")
logger.error(traceback.format_exc()) # Print the traceback
raise HTTPException(status_code=500, detail="Internal Server Error")

finally:
os.remove(temp_file_path)
end_resource_monitoring(context)


@app.post("/add_new_grammar_definition_file/",
Expand Down

0 comments on commit 83a994e

Please sign in to comment.