feat: update anyparser to async #35

Merged · 2 commits · Jun 7, 2024

Changes from all commits
155 changes: 95 additions & 60 deletions any_parser/base.py
@@ -1,48 +1,58 @@
 import json
+import time
+from datetime import datetime, timedelta
 
 import requests
 
-CAMBIO_UPLOAD_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/upload"
-)
-CAMBIO_EXTRACT_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/extract"
-)
-CAMBIO_PARSE_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/parse"
-)
-CAMBIO_INSTRUCT_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/instruction"
-)
+CAMBIO_UPLOAD_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/upload"
+CAMBIO_REQUEST_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/request"
+CAMBIO_QUERY_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/query"
 
 
 class AnyParser:
     def __init__(self, apiKey) -> None:
         self._uploadurl = CAMBIO_UPLOAD_URL
-        self._extracturl = CAMBIO_EXTRACT_URL
-        self._parseurl = CAMBIO_PARSE_URL
-        self._instructurl = CAMBIO_INSTRUCT_URL
-        self._request_header = {"x-api-key": apiKey}
+        self._requesturl = CAMBIO_REQUEST_URL
+        self._queryurl = CAMBIO_QUERY_URL
+        self._request_header = {
+            "authorizationtoken": "-",
+            "apikey": apiKey,
+        }
+        self.timeout = 60
 
-    def setAPIKey(self, apiKey):
-        self._request_header = {"x-api-key": apiKey}
+    def query_result(self, payload):
+        time.sleep(5)
+        query_timeout = datetime.now() + timedelta(seconds=self.timeout)
+
+        while datetime.now() < query_timeout:
+            query_response = requests.post(
+                self._queryurl, headers=self._request_header, json=payload
+            )
+            assert (
+                query_response.status_code == 200 or query_response.status_code == 202
+            )
+
+            if query_response.status_code == 200:
+                break
+            elif query_response.status_code == 202:
+                time.sleep(5)
+                continue
+
+        return query_response
 
     def extract(self, file_path):
-        user_id, job_id, s3_key = self._request_and_upload_by_apiKey(file_path)
-        result = self._request_file_extraction(user_id, job_id, s3_key)
-        return json.loads(result)["result"]
+        user_id, file_id = self._request_and_upload_by_apiKey(file_path)
+        result = self._request_file_extraction(user_id, file_id)
+        return result
 
     def parse(self, file_path, prompt="", mode="advanced"):
-        user_id, job_id, s3_key = self._request_and_upload_by_apiKey(file_path)
-        result = self._request_info_extraction(user_id, job_id, s3_key, mode, prompt)
-        return json.loads(result)["result"]
+        user_id, file_id = self._request_and_upload_by_apiKey(file_path)
+        result = self._request_info_extraction(user_id, file_id)
+        return result
 
     def instruct(self, file_path, prompt="", mode="advanced"):
-        user_id, job_id, s3_key = self._request_and_upload_by_apiKey(file_path)
-        result = self._request_instruction_extraction(
-            user_id, job_id, s3_key, mode, prompt
-        )
-        return json.loads(result)["result"]
+        user_id, file_id = self._request_and_upload_by_apiKey(file_path)
+        result = self._request_instruction_extraction(user_id, file_id)
+        return result
 
     def _error_handler(self, response):
         if response.status_code == 403:
@@ -59,68 +69,93 @@ def _request_and_upload_by_apiKey(self, file_path):
         )
 
         if response.status_code == 200:
-            url_info = response.json()["presignedUrl"]
-            uid = response.json()["userId"]
-            jid = response.json()["jobId"]
+            user_id = response.json().get("userId")
+            file_id = response.json().get("fileId")
+            presigned_url = response.json().get("presignedUrl")
             with open(file_path, "rb") as file_to_upload:
                 files = {"file": (file_path, file_to_upload)}
-                upload_response = requests.post(
-                    url_info["url"], data=url_info["fields"], files=files
+                requests.post(
+                    presigned_url["url"],
+                    data=presigned_url["fields"],
+                    files=files,
+                    timeout=30,  # Add a timeout argument to prevent the program from hanging indefinitely
                 )
-            # print(f"Upload response: {upload_response.status_code}")
-            return uid, jid, url_info["fields"]["key"]
+            return user_id, file_id
 
         self._error_handler(response)
 
-    def _request_file_extraction(self, user_id, job_id, s3_key):
+    def _request_file_extraction(self, user_id, file_id):
         payload = {
             "userId": user_id,
-            "jobId": job_id,
-            "fileKey": s3_key,
+            "files": [{"sourceType": "s3", "fileId": file_id}],
+            "jobType": "file_extraction",
         }
         response = requests.post(
-            self._extracturl, headers=self._request_header, json=payload
+            self._requesturl, headers=self._request_header, json=payload
         )
+        print(response.json())
 
         if response.status_code == 200:
+            file_extraction_job_id = response.json().get("jobId")
+            payload = {
+                "userId": user_id,
+                "jobId": file_extraction_job_id,
+                "queryType": "job_result",
+            }
+
+            query_response = self.query_result(payload)
+
             # print("Extraction success.")
-            return response.text
+            return query_response.json()
 
         self._error_handler(response)
 
-    def _request_info_extraction(self, user_id, job_id, s3_key, mode, prompt=""):
-        if mode not in ["advanced", "basic"]:
-            raise ValueError("Invalid mode. Choose either 'advanced' or 'basic'.")
+    def _request_info_extraction(self, user_id, file_id):
 
         payload = {
             "userId": user_id,
-            "jobId": job_id,
-            "fileKey": s3_key,
-            "userPrompt": prompt,
+            "files": [{"sourceType": "s3", "fileId": file_id}],
+            "jobType": "info_extraction",
        }
         response = requests.post(
-            self._parseurl, headers=self._request_header, json=payload
+            self._requesturl, headers=self._request_header, json=payload
         )
 
         if response.status_code == 200:
+            info_extraction_job_id = response.json().get("jobId")
+            payload = {
+                "userId": user_id,
+                "jobId": info_extraction_job_id,
+                "queryType": "job_result",
+            }
+
+            query_response = self.query_result(payload)
+
             # print("Extraction success.")
-            return response.text
+            return query_response.json()
 
         self._error_handler(response)
 
-    def _request_instruction_extraction(self, user_id, job_id, s3_key, mode, prompt=""):
-        if mode not in ["advanced", "basic"]:
-            raise ValueError("Invalid mode. Choose either 'advanced' or 'basic'.")
+    def _request_instruction_extraction(self, user_id, file_id, prompt=""):
         payload = {
             "userId": user_id,
-            "jobId": job_id,
-            "fileKey": s3_key,
-            "userPrompt": prompt,
+            "files": [{"sourceType": "s3", "fileId": file_id}],
+            "jobType": "instruction_extraction",
+            "jobParams": {"userPrompt": prompt},
         }
         response = requests.post(
-            self._instructurl, headers=self._request_header, json=payload
+            self._requesturl, headers=self._request_header, json=payload
         )
 
         if response.status_code == 200:
-            return response.text
+            instruction_extraction_job_id = response.json().get("jobId")
+            payload = {
+                "userId": user_id,
+                "jobId": instruction_extraction_job_id,
+                "queryType": "job_result",
+            }
+
+            query_response = self.query_result(payload)
+
+            # print("Extraction success.")
+            return query_response.json()
 
         self._error_handler(response)
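
Taken together, the diff swaps the three one-shot endpoints (cambio_api/extract, parse, instruction) for a single /request endpoint that returns a jobId immediately, plus a /query endpoint that is polled until the job completes. The sketch below shows that submit-then-poll lifecycle outside the AnyParser class, using the payload shapes visible in the methods above; the API key, userId, and fileId values are placeholders, and this is only an illustration of the flow, not documented usage.

import time

import requests

CAMBIO_REQUEST_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/request"
CAMBIO_QUERY_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/query"

headers = {"authorizationtoken": "-", "apikey": "your-api-key"}  # placeholder key

# 1) Submit the job; the async API answers right away with a jobId.
submit = requests.post(
    CAMBIO_REQUEST_URL,
    headers=headers,
    json={
        "userId": "your-user-id",  # placeholder
        "files": [{"sourceType": "s3", "fileId": "your-file-id"}],  # placeholder
        "jobType": "file_extraction",
    },
    timeout=30,
)
job_id = submit.json().get("jobId")

# 2) Poll the query endpoint while it returns 202 (job still running),
#    giving up after roughly 60 seconds, mirroring AnyParser.timeout.
result = None
for _ in range(12):
    poll = requests.post(
        CAMBIO_QUERY_URL,
        headers=headers,
        json={"userId": "your-user-id", "jobId": job_id, "queryType": "job_result"},
        timeout=30,
    )
    if poll.status_code == 200:
        result = poll.json()
        break
    time.sleep(5)

print(result)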
9 changes: 7 additions & 2 deletions examples/test_example.py
@@ -24,7 +24,12 @@
 print(content_result)
 
 print("information extraction test:")
-example_prompt = "Return table under Investor Metrics in JSON format with year as the key and the column as subkeys."
-qa_result = op.parse(example_local_file, example_prompt, mode="basic")
+qa_result = op.parse(example_local_file)
 print(type(qa_result))
 print(qa_result)
+
+print("instruction extraction test:")
+example_prompt = "Return table under Investor Metrics in JSON format with year as the key and the column as subkeys."
+instruction_result = op.instruct(example_local_file, prompt=example_prompt)
+print(type(instruction_result))
+print(instruction_result)
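
The example hunk starts at line 24 of examples/test_example.py, so the client setup sits above the visible diff. A minimal driver consistent with the updated class might look like the following; the API key and file path are placeholders, and the import path simply points at the module changed above.

from any_parser.base import AnyParser

op = AnyParser(apiKey="your-api-key")  # placeholder API key
example_local_file = "./examples/sample.pdf"  # placeholder input file

print("file extraction test:")
# extract() uploads the file, submits a file_extraction job, then polls for the result.
content_result = op.extract(example_local_file)
print(content_result)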