More Redis-based lock controls
Dicklesworthstone committed Oct 3, 2023
1 parent 8103c4a commit b5a05ed
Showing 1 changed file (swiss_army_llama.py) with 112 additions and 89 deletions.
@@ -273,7 +273,15 @@ async def get_embedding_vector_for_string(request: EmbeddingRequest, req: Request
         logger.warning(f"Unauthorized request from client IP {client_ip}")
         raise HTTPException(status_code=403, detail="Unauthorized")
     try:
-        return await get_or_compute_embedding(request, req, client_ip, document_file_hash)
+        unique_id = f"get_embedding_{request.text}_{request.llm_model_name}"
+        lock = await shared_resources.lock_manager.lock(unique_id)
+        if lock.valid:
+            try:
+                return await get_or_compute_embedding(request, req, client_ip, document_file_hash)
+            finally:
+                await shared_resources.lock_manager.unlock(lock)
+        else:
+            return {"status": "already processing"}
     except Exception as e:
         logger.error(f"An error occurred while processing the request: {e}")
         logger.error(traceback.format_exc()) # Print the traceback
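Note: the lock calls above (lock(unique_id), checking lock.valid, and unlock(lock)) match the aioredlock API. The diff never shows how shared_resources.lock_manager is constructed, so the following is only a minimal setup sketch, assuming aioredlock and a single Redis instance on localhost:

from aioredlock import Aioredlock

class SharedResources:
    def __init__(self):
        # aioredlock accepts a list of Redis endpoints; a single local
        # instance is enough to coordinate workers on one deployment.
        self.lock_manager = Aioredlock([{"host": "localhost", "port": 6379}])

shared_resources = SharedResources()

With a lock manager wired up this way, a concurrent duplicate request falls through to the else branch and gets {"status": "already processing"} back instead of recomputing the same embedding.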
@@ -369,57 +377,65 @@ async def get_token_level_embeddings_matrix_and_combined_feature_vector_for_string
             'combined_feature_vector': combined_feature_vector
         }
         return JSONResponse(content=response_content)
-    logger.info("No cached result found. Calculating token-level embeddings now...")
-    try:
-        embedding_bundle = TokenLevelEmbeddingBundle(
-            input_text=request.text,
-            llm_model_name=request.llm_model_name,
-            ip_address=client_ip,
-            request_time=request_time
-        )
-        token_embeddings = await calculate_token_level_embeddings(request.text, request.llm_model_name, client_ip, embedding_bundle.id)
-        tokens = re.findall(r'\b\w+\b', request.text)
-        logger.info(f"Tokenized text into {len(tokens)} tokens. Organizing results.")
-        df = pd.DataFrame({
-            'token': tokens,
-            'embedding': [embedding.tolist() for embedding in token_embeddings]
-        })
-        json_content = df.to_json(orient=json_format or 'records')
-        response_time=datetime.utcnow()
-        total_time = (response_time - request_time).total_seconds()
-        embedding_bundle.token_level_embeddings_bundle_json = json_content
-        embedding_bundle.response_time = response_time
-        embedding_bundle.total_time = total_time
-        combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(embedding_bundle.id, json_content, db_writer)
-        response_content = {
-            'input_text': request.text,
-            'token_level_embedding_bundle': json.loads(embedding_bundle.token_level_embeddings_bundle_json),
-            'combined_feature_vector': combined_feature_vector
-        }
-        logger.info(f"Done getting token-level embedding matrix and combined feature vector for input text string {request.text} and model {request.llm_model_name}")
-        json_content = embedding_bundle.token_level_embeddings_bundle_json
-        json_content_length = len(json.dumps(response_content))
-        overall_total_time = (datetime.utcnow() - request_time).total_seconds()
-        if len(embedding_bundle.token_level_embeddings_bundle_json) > 0:
-            tokens = re.findall(r'\b\w+\b', request.text)
-            logger.info(f"The response took {overall_total_time} seconds to generate, or {overall_total_time / (float(len(tokens))/1000.0)} seconds per thousand input tokens and {overall_total_time / (float(json_content_length)/1000000.0)} seconds per million output characters.")
-        if send_back_json_or_zip_file == 'json': # Assume 'json' response should be sent back
-            logger.info(f"Now sending back JSON response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of JSON response out of {len(json_content)} total characters: {json_content[:100]}")
-            return JSONResponse(content=response_content)
-        else: # Assume 'zip' file should be sent back
-            output_file_name_without_extension = f"token_level_embeddings_and_combined_feature_vector_for_input_hash_{input_text_hash}_and_model_name__{request.llm_model_name}"
-            json_file_path = f"/tmp/{output_file_name_without_extension}.json"
-            with open(json_file_path, 'w') as json_file:
-                json.dump(response_content, json_file)
-            zip_file_path = f"/tmp/{output_file_name_without_extension}.zip"
-            with zipfile.ZipFile(zip_file_path, 'w') as zipf:
-                zipf.write(json_file_path, os.path.basename(json_file_path))
-            logger.info(f"Now sending back ZIP file response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of zipped JSON file out of {len(json_content)} total characters: {json_content[:100]}")
-            return FileResponse(zip_file_path, headers={"Content-Disposition": f"attachment; filename={output_file_name_without_extension}.zip"})
-    except Exception as e:
-        logger.error(f"An error occurred while processing the request: {e}")
-        logger.error(traceback.format_exc())
-        raise HTTPException(status_code=500, detail="Internal Server Error")
+    unique_id = f"get_token_level_embeddings_{request.text}_{request.llm_model_name}"
+    lock = await shared_resources.lock_manager.lock(unique_id)
+    if lock.valid:
+        try:
+            logger.info("No cached result found. Calculating token-level embeddings now...")
+            try:
+                embedding_bundle = TokenLevelEmbeddingBundle(
+                    input_text=request.text,
+                    llm_model_name=request.llm_model_name,
+                    ip_address=client_ip,
+                    request_time=request_time
+                )
+                token_embeddings = await calculate_token_level_embeddings(request.text, request.llm_model_name, client_ip, embedding_bundle.id)
+                tokens = re.findall(r'\b\w+\b', request.text)
+                logger.info(f"Tokenized text into {len(tokens)} tokens. Organizing results.")
+                df = pd.DataFrame({
+                    'token': tokens,
+                    'embedding': [embedding.tolist() for embedding in token_embeddings]
+                })
+                json_content = df.to_json(orient=json_format or 'records')
+                response_time=datetime.utcnow()
+                total_time = (response_time - request_time).total_seconds()
+                embedding_bundle.token_level_embeddings_bundle_json = json_content
+                embedding_bundle.response_time = response_time
+                embedding_bundle.total_time = total_time
+                combined_feature_vector = await get_or_compute_token_level_embedding_bundle_combined_feature_vector(embedding_bundle.id, json_content, db_writer)
+                response_content = {
+                    'input_text': request.text,
+                    'token_level_embedding_bundle': json.loads(embedding_bundle.token_level_embeddings_bundle_json),
+                    'combined_feature_vector': combined_feature_vector
+                }
+                logger.info(f"Done getting token-level embedding matrix and combined feature vector for input text string {request.text} and model {request.llm_model_name}")
+                json_content = embedding_bundle.token_level_embeddings_bundle_json
+                json_content_length = len(json.dumps(response_content))
+                overall_total_time = (datetime.utcnow() - request_time).total_seconds()
+                if len(embedding_bundle.token_level_embeddings_bundle_json) > 0:
+                    tokens = re.findall(r'\b\w+\b', request.text)
+                    logger.info(f"The response took {overall_total_time} seconds to generate, or {overall_total_time / (float(len(tokens))/1000.0)} seconds per thousand input tokens and {overall_total_time / (float(json_content_length)/1000000.0)} seconds per million output characters.")
+                if send_back_json_or_zip_file == 'json': # Assume 'json' response should be sent back
+                    logger.info(f"Now sending back JSON response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of JSON response out of {len(json_content)} total characters: {json_content[:100]}")
+                    return JSONResponse(content=response_content)
+                else: # Assume 'zip' file should be sent back
+                    output_file_name_without_extension = f"token_level_embeddings_and_combined_feature_vector_for_input_hash_{input_text_hash}_and_model_name__{request.llm_model_name}"
+                    json_file_path = f"/tmp/{output_file_name_without_extension}.json"
+                    with open(json_file_path, 'w') as json_file:
+                        json.dump(response_content, json_file)
+                    zip_file_path = f"/tmp/{output_file_name_without_extension}.zip"
+                    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
+                        zipf.write(json_file_path, os.path.basename(json_file_path))
+                    logger.info(f"Now sending back ZIP file response for input text string {request.text} and model {request.llm_model_name}; First 100 characters of zipped JSON file out of {len(json_content)} total characters: {json_content[:100]}")
+                    return FileResponse(zip_file_path, headers={"Content-Disposition": f"attachment; filename={output_file_name_without_extension}.zip"})
+            except Exception as e:
+                logger.error(f"An error occurred while processing the request: {e}")
+                logger.error(traceback.format_exc())
+                raise HTTPException(status_code=500, detail="Internal Server Error")
+        finally:
+            await shared_resources.lock_manager.unlock(lock)
+    else:
+        return {"status": "already processing"}


 @app.post("/compute_similarity_between_strings/",
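The acquire / try / finally / unlock shape above is now repeated verbatim in each endpoint this commit touches. A possible follow-up refactor (a sketch only; redis_lock is a hypothetical name, not part of this diff) folds the pattern into an async context manager:

from contextlib import asynccontextmanager

@asynccontextmanager
async def redis_lock(lock_manager, unique_id: str):
    # Yield the lock when acquired, or None when another request already
    # holds it, mirroring the two branches added in this commit.
    lock = await lock_manager.lock(unique_id)
    if not lock.valid:
        yield None
        return
    try:
        yield lock
    finally:
        await lock_manager.unlock(lock)

Each handler body would then shrink to an async with redis_lock(shared_resources.lock_manager, unique_id) as lock: block plus a single early return of {"status": "already processing"} when lock is None.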
@@ -453,43 +469,50 @@ async def compute_similarity_between_strings(request: SimilarityRequest, req: Request
     similarity_measure = request.similarity_measure.lower()
     if USE_SECURITY_TOKEN and use_hardcoded_security_token and (token is None or token != SECURITY_TOKEN):
         raise HTTPException(status_code=403, detail="Unauthorized")
-    try:
-        client_ip = req.client.host if req else "localhost"
-        embedding_request1 = EmbeddingRequest(text=request.text1, llm_model_name=request.llm_model_name)
-        embedding_request2 = EmbeddingRequest(text=request.text2, llm_model_name=request.llm_model_name)
-        embedding1_response = await get_or_compute_embedding(embedding_request1, client_ip=client_ip)
-        embedding2_response = await get_or_compute_embedding(embedding_request2, client_ip=client_ip)
-        embedding1 = np.array(embedding1_response["embedding"])
-        embedding2 = np.array(embedding2_response["embedding"])
-        if embedding1.size == 0 or embedding2.size == 0:
-            raise HTTPException(status_code=400, detail="Could not calculate embeddings for the given texts")
-        params = {
-            "vector_1": embedding1.tolist(),
-            "vector_2": embedding2.tolist(),
-            "similarity_measure": similarity_measure
-        }
-        similarity_stats_str = fvs.py_compute_vector_similarity_stats(json.dumps(params))
-        similarity_stats_json = json.loads(similarity_stats_str)
-        if similarity_measure == 'all':
-            similarity_score = similarity_stats_json
-        else:
-            similarity_score = similarity_stats_json.get(similarity_measure, None)
-            if similarity_score is None:
-                raise HTTPException(status_code=400, detail="Invalid similarity measure specified")
-        response_time = datetime.utcnow()
-        total_time = (response_time - request_time).total_seconds()
-        logger.info(f"Computed similarity using {similarity_measure} in {total_time} seconds; similarity score: {similarity_score}")
-        return {
-            "text1": request.text1,
-            "text2": request.text2,
-            "similarity_measure": similarity_measure,
-            "similarity_score": similarity_score,
-            "embedding1": embedding1.tolist(),
-            "embedding2": embedding2.tolist()
-        }
-    except Exception as e:
-        logger.error(f"An error occurred while processing the request: {e}")
-        raise HTTPException(status_code=500, detail="Internal Server Error")
+    unique_id = f"compute_similarity_{request.text1}_{request.text2}_{request.llm_model_name}_{similarity_measure}"
+    lock = await shared_resources.lock_manager.lock(unique_id)
+    if lock.valid:
+        try:
+            client_ip = req.client.host if req else "localhost"
+            embedding_request1 = EmbeddingRequest(text=request.text1, llm_model_name=request.llm_model_name)
+            embedding_request2 = EmbeddingRequest(text=request.text2, llm_model_name=request.llm_model_name)
+            embedding1_response = await get_or_compute_embedding(embedding_request1, client_ip=client_ip)
+            embedding2_response = await get_or_compute_embedding(embedding_request2, client_ip=client_ip)
+            embedding1 = np.array(embedding1_response["embedding"])
+            embedding2 = np.array(embedding2_response["embedding"])
+            if embedding1.size == 0 or embedding2.size == 0:
+                raise HTTPException(status_code=400, detail="Could not calculate embeddings for the given texts")
+            params = {
+                "vector_1": embedding1.tolist(),
+                "vector_2": embedding2.tolist(),
+                "similarity_measure": similarity_measure
+            }
+            similarity_stats_str = fvs.py_compute_vector_similarity_stats(json.dumps(params))
+            similarity_stats_json = json.loads(similarity_stats_str)
+            if similarity_measure == 'all':
+                similarity_score = similarity_stats_json
+            else:
+                similarity_score = similarity_stats_json.get(similarity_measure, None)
+                if similarity_score is None:
+                    raise HTTPException(status_code=400, detail="Invalid similarity measure specified")
+            response_time = datetime.utcnow()
+            total_time = (response_time - request_time).total_seconds()
+            logger.info(f"Computed similarity using {similarity_measure} in {total_time} seconds; similarity score: {similarity_score}")
+            return {
+                "text1": request.text1,
+                "text2": request.text2,
+                "similarity_measure": similarity_measure,
+                "similarity_score": similarity_score,
+                "embedding1": embedding1.tolist(),
+                "embedding2": embedding2.tolist()
+            }
+        except Exception as e:
+            logger.error(f"An error occurred while processing the request: {e}")
+            raise HTTPException(status_code=500, detail="Internal Server Error")
+        finally:
+            await shared_resources.lock_manager.unlock(lock)
+    else:
+        return {"status": "already processing"}



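One caveat with the new keys: unique_id interpolates the raw request text (for example f"get_embedding_{request.text}_{request.llm_model_name}"), so a long input produces an equally long Redis key. A sketch of one mitigation, using a hypothetical make_lock_key helper that is not in this commit, hashes the variable parts so keys stay a fixed length while remaining unique per input:

import hashlib

def make_lock_key(prefix: str, *parts: str) -> str:
    # Collapse arbitrarily long inputs into a fixed-length digest so the
    # Redis key stays short regardless of request size.
    digest = hashlib.sha256("_".join(parts).encode("utf-8")).hexdigest()
    return f"{prefix}_{digest}"

# Usage (hypothetical): make_lock_key("get_embedding", request.text, request.llm_model_name)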