
Commit 2441f8a

Fix
Dicklesworthstone committed May 21, 2024
1 parent c084065 commit 2441f8a
Showing 1 changed file with 73 additions and 67 deletions.
swiss_army_llama.py: 140 changes (73 additions & 67 deletions)
@@ -825,74 +825,80 @@ async def get_all_embedding_vectors_for_document(
         temp_file_path = await download_file(url, size, hash)
     else:
         raise HTTPException(status_code=400, detail="Invalid input. Provide either a file or URL with hash and size.")
-    # Verify file integrity
-    hash_obj = sha3_256()
-    with open(temp_file_path, 'rb') as buffer:
-        for chunk in iter(lambda: buffer.read(chunk_size), b''):
-            hash_obj.update(chunk)
-    file_hash = hash_obj.hexdigest()
-    logger.info(f"SHA3-256 hash of submitted file: {file_hash}")
-    if corpus_identifier_string is None:
-        corpus_identifier_string = file_hash
-    unique_id = f"document_embedding_{file_hash}_{llm_model_name}"
-    lock = await shared_resources.lock_manager.lock(unique_id)
-    if lock.valid:
-        try:
-            async with AsyncSession() as session:
-                result = await session.execute(select(DocumentEmbedding).filter(DocumentEmbedding.file_hash == file_hash, DocumentEmbedding.llm_model_name == llm_model_name))
-                existing_document_embedding = result.scalar_one_or_none()
-                if existing_document_embedding:
-                    logger.info(f"Document {file.filename if file else url} has been processed before, returning existing result")
-                    json_content = json.dumps(existing_document_embedding.document_embedding_results_json).encode()
+    try:
+        # Verify file integrity
+        hash_obj = sha3_256()
+        with open(temp_file_path, 'rb') as buffer:
+            for chunk in iter(lambda: buffer.read(chunk_size), b''):
+                hash_obj.update(chunk)
+        file_hash = hash_obj.hexdigest()
+        logger.info(f"SHA3-256 hash of submitted file: {file_hash}")
+        if corpus_identifier_string is None:
+            corpus_identifier_string = file_hash
+        unique_id = f"document_embedding_{file_hash}_{llm_model_name}"
+        lock = await shared_resources.lock_manager.lock(unique_id)
+        if lock.valid:
+            try:
+                async with AsyncSession() as session:
+                    result = await session.execute(select(DocumentEmbedding).filter(DocumentEmbedding.file_hash == file_hash, DocumentEmbedding.llm_model_name == llm_model_name))
+                    existing_document_embedding = result.scalar_one_or_none()
+                    if existing_document_embedding:
+                        logger.info(f"Document {file.filename if file else url} has been processed before, returning existing result")
+                        json_content = json.dumps(existing_document_embedding.document_embedding_results_json).encode()
-                else:
-                    await read_and_rewrite_file_with_safe_encoding(temp_file_path)
-                    with open(temp_file_path, 'rb') as file:
-                        input_data_binary = file.read()
-                    result = magika.identify_bytes(input_data_binary)
-                    mime_type = result.output.mime_type
-                    logger.info(f"Received request to extract embeddings for document {file.filename if file else url} with MIME type: {mime_type} and size: {os.path.getsize(temp_file_path)} bytes from IP address: {client_ip}")
-                    sentences = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type)
-                    input_data = {
-                        "sentences": sentences,
-                        "file_size_mb": os.path.getsize(temp_file_path) / (1024 * 1024),
-                        "mime_type": mime_type
-                    }
-                    context = start_resource_monitoring("get_all_embedding_vectors_for_document", input_data, client_ip)
-                    try:
-                        results = await compute_embeddings_for_document(sentences, llm_model_name, client_ip, file_hash)
-                    except Exception as e:
-                        logger.error(f"Error while computing embeddings for document: {e}")
-                        traceback.print_exc()
-                        raise HTTPException(status_code=400, detail="Error while computing embeddings for document")
-                    finally:
-                        end_resource_monitoring(context)
-                    df = pd.DataFrame(results, columns=['text', 'embedding'])
-                    json_content = df.to_json(orient=json_format or 'records').encode()
-                    with open(temp_file_path, 'rb') as file_buffer:
-                        original_file_content = file_buffer.read()
-                    await store_document_embeddings_in_db(file, file_hash, original_file_content, json_content, results, llm_model_name, client_ip, request_time, corpus_identifier_string)
-                overall_total_time = (datetime.utcnow() - request_time).total_seconds()
-                logger.info(f"Done getting all embeddings for document {file.filename if file else url} containing {len(sentences)} sentences with model {llm_model_name}")
-                json_content_length = len(json_content)
-                if json_content_length > 0:
-                    logger.info(f"The response took {overall_total_time} seconds to generate, or {overall_total_time / (len(sentences) / 1000.0)} seconds per thousand input tokens and {overall_total_time / (float(json_content_length) / 1000000.0)} seconds per million output characters.")
-                if send_back_json_or_zip_file == 'json':
-                    logger.info(f"Returning JSON response for document {file.filename if file else url} containing {len(sentences)} sentences with model {llm_model_name}; first 100 characters out of {json_content_length} total of JSON response: {json_content[:100]}")
-                    return JSONResponse(content=json.loads(json_content.decode()))
+                    else:
+                        await read_and_rewrite_file_with_safe_encoding(temp_file_path)
+                        with open(temp_file_path, 'rb') as file:
+                            input_data_binary = file.read()
+                        result = magika.identify_bytes(input_data_binary)
+                        mime_type = result.output.mime_type
+                        logger.info(f"Received request to extract embeddings for document {file.filename if file else url} with MIME type: {mime_type} and size: {os.path.getsize(temp_file_path)} bytes from IP address: {client_ip}")
+                        sentences = await parse_submitted_document_file_into_sentence_strings_func(temp_file_path, mime_type)
+                        input_data = {
+                            "sentences": sentences,
+                            "file_size_mb": os.path.getsize(temp_file_path) / (1024 * 1024),
+                            "mime_type": mime_type
+                        }
+                        context = start_resource_monitoring("get_all_embedding_vectors_for_document", input_data, client_ip)
+                        try:
+                            results = await compute_embeddings_for_document(sentences, llm_model_name, client_ip, file_hash)
+                        except Exception as e:
+                            logger.error(f"Error while computing embeddings for document: {e}")
+                            traceback.print_exc()
+                            raise HTTPException(status_code=400, detail="Error while computing embeddings for document")
+                        finally:
+                            end_resource_monitoring(context)
+                        df = pd.DataFrame(results, columns=['text', 'embedding'])
+                        json_content = df.to_json(orient=json_format or 'records').encode()
+                        with open(temp_file_path, 'rb') as file_buffer:
+                            original_file_content = file_buffer.read()
+                        await store_document_embeddings_in_db(file, file_hash, original_file_content, json_content, results, llm_model_name, client_ip, request_time, corpus_identifier_string)
+                    overall_total_time = (datetime.utcnow() - request_time).total_seconds()
+                    logger.info(f"Done getting all embeddings for document {file.filename if file else url} containing {len(sentences)} sentences with model {llm_model_name}")
+                    json_content_length = len(json_content)
+                    if json_content_length > 0:
+                        logger.info(f"The response took {overall_total_time} seconds to generate, or {overall_total_time / (len(sentences) / 1000.0)} seconds per thousand input tokens and {overall_total_time / (float(json_content_length) / 1000000.0)} seconds per million output characters.")
+                    if send_back_json_or_zip_file == 'json':
+                        logger.info(f"Returning JSON response for document {file.filename if file else url} containing {len(sentences)} sentences with model {llm_model_name}; first 100 characters out of {json_content_length} total of JSON response: {json_content[:100]}")
+                        return JSONResponse(content=json.loads(json_content.decode()))
-                else:
-                    original_filename_without_extension, _ = os.path.splitext(file.filename if file else os.path.basename(url))
-                    json_file_path = f"/tmp/{original_filename_without_extension}.json"
-                    with open(json_file_path, 'wb') as json_file:
-                        json_file.write(json_content)
-                    zip_file_path = f"/tmp/{original_filename_without_extension}.zip"
-                    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
-                        zipf.write(json_file_path, os.path.basename(json_file_path))
-                    logger.info(f"Returning ZIP response for document {file.filename if file else url} containing {len(sentences)} sentences with model {llm_model_name}; first 100 characters out of {json_content_length} total of JSON response: {json_content[:100]}")
-                    return FileResponse(zip_file_path, headers={"Content-Disposition": f"attachment; filename={original_filename_without_extension}.zip"})
-        finally:
-            await shared_resources.lock_manager.unlock(lock)
-    else:
-        return {"status": "already processing"}
+                    else:
+                        original_filename_without_extension, _ = os.path.splitext(file.filename if file else os.path.basename(url))
+                        json_file_path = f"/tmp/{original_filename_without_extension}.json"
+                        with open(json_file_path, 'wb') as json_file:
+                            json_file.write(json_content)
+                        zip_file_path = f"/tmp/{original_filename_without_extension}.zip"
+                        with zipfile.ZipFile(zip_file_path, 'w') as zipf:
+                            zipf.write(json_file_path, os.path.basename(json_file_path))
+                        logger.info(f"Returning ZIP response for document {file.filename if file else url} containing {len(sentences)} sentences with model {llm_model_name}; first 100 characters out of {json_content_length} total of JSON response: {json_content[:100]}")
+                        return FileResponse(zip_file_path, headers={"Content-Disposition": f"attachment; filename={original_filename_without_extension}.zip"})
+            finally:
+                await shared_resources.lock_manager.unlock(lock)
+        else:
+            return {"status": "already processing"}
+    except Exception as e:
+        logger.error(f"Error in processing: {e}")
+        traceback.print_exc()
+        raise HTTPException(status_code=500, detail="An error occurred while processing the request.")
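
The integrity check at the top of the hunk streams the submitted file through SHA3-256 in fixed-size chunks, so even large documents are hashed without being read into memory at once. A minimal standalone sketch of that technique (the function name and default chunk size here are illustrative, not taken from the repository):

import hashlib

def sha3_256_of_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    # Stream the file through SHA3-256 one chunk at a time and return the hex digest.
    hash_obj = hashlib.sha3_256()
    with open(path, 'rb') as f:
        # iter() with a b'' sentinel keeps calling f.read(chunk_size) until EOF.
        for chunk in iter(lambda: f.read(chunk_size), b''):
            hash_obj.update(chunk)
    return hash_obj.hexdigest()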

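The change itself is the outer try/except wrapper: the lock is still released in the finally block, but any unexpected failure in the pipeline is now logged with a traceback and surfaced as an HTTP 500 instead of escaping the handler. A stripped-down sketch of that shape, assuming a lock manager with lock()/unlock() coroutines and a .valid flag like the one used above (all other names are illustrative):

import logging
import traceback

from fastapi import HTTPException

logger = logging.getLogger(__name__)

async def guarded_processing(unique_id: str, lock_manager, do_work):
    # Run do_work() under a per-document lock; convert unexpected errors into HTTP 500s.
    try:
        lock = await lock_manager.lock(unique_id)
        if not lock.valid:
            return {"status": "already processing"}  # another request holds the lock
        try:
            return await do_work()  # the actual embedding pipeline
        finally:
            await lock_manager.unlock(lock)  # always release the lock, even on failure
    except Exception as e:
        logger.error(f"Error in processing: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="An error occurred while processing the request.")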

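For a freshly processed document, the embedding results are packed into a pandas DataFrame and serialized with to_json, which with orient='records' produces a JSON array of {"text": ..., "embedding": ...} objects. A tiny illustration with made-up values standing in for compute_embeddings_for_document() output:

import pandas as pd

results = [("first sentence", [0.1, 0.2, 0.3]), ("second sentence", [0.4, 0.5, 0.6])]
df = pd.DataFrame(results, columns=['text', 'embedding'])
json_content = df.to_json(orient='records').encode()
print(json_content)  # b'[{"text":"first sentence","embedding":[0.1,0.2,0.3]}, ...]'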

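When the caller asks for a ZIP instead of inline JSON, the handler writes the JSON payload to /tmp, zips it, and returns it as a download with a Content-Disposition header. A minimal sketch of that branch using the same standard-library and FastAPI pieces (the helper name and base_name parameter are illustrative):

import os
import zipfile

from fastapi.responses import FileResponse

def zip_json_response(json_content: bytes, base_name: str) -> FileResponse:
    # Write the JSON payload to /tmp, wrap it in a ZIP archive, and return it as a file download.
    json_file_path = f"/tmp/{base_name}.json"
    with open(json_file_path, 'wb') as json_file:
        json_file.write(json_content)
    zip_file_path = f"/tmp/{base_name}.zip"
    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
        # Store the JSON file under its bare filename inside the archive.
        zipf.write(json_file_path, os.path.basename(json_file_path))
    return FileResponse(zip_file_path, headers={"Content-Disposition": f"attachment; filename={base_name}.zip"})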
