Skip to content

Commit

Permalink
Merge pull request #35 from SeisSerenata/main
Browse files Browse the repository at this point in the history
feat: update anyparser to async
  • Loading branch information
Cambio ML authored Jun 7, 2024
2 parents 7573a26 + 67b586b commit 0b0862a
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 62 deletions.
155 changes: 95 additions & 60 deletions any_parser/base.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,58 @@
import json
import time
from datetime import datetime, timedelta

import requests

# NOTE(review): this listing carries two generations of endpoint constants.
# The four `cambio_api/*` URLs below look like the per-operation endpoints
# that predate the request/query flow — confirm nothing still reads them.
CAMBIO_UPLOAD_URL = (
"https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/upload"
)
CAMBIO_EXTRACT_URL = (
"https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/extract"
)
CAMBIO_PARSE_URL = (
"https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/parse"
)
CAMBIO_INSTRUCT_URL = (
"https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/instruction"
)
# Endpoints for the upload -> submit job -> poll result flow.
# CAMBIO_UPLOAD_URL is assigned a second time here, so this later value is the
# one in effect after the module has been imported.
CAMBIO_UPLOAD_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/upload"
CAMBIO_REQUEST_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/request"
CAMBIO_QUERY_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/query"


class AnyParser:
def __init__(self, apiKey) -> None:
    """Record the service endpoints and build the request headers.

    Args:
        apiKey: Key sent in the ``apikey`` header of every request made
            with ``self._request_header``.
    """
    self._uploadurl = CAMBIO_UPLOAD_URL
    self._requesturl = CAMBIO_REQUEST_URL
    self._queryurl = CAMBIO_QUERY_URL
    self._request_header = {
        "authorizationtoken": "-",
        "apikey": apiKey,
    }
    # Length, in seconds, of the polling window used by query_result.
    self.timeout = 60

def query_result(self, payload):
    """Poll the query endpoint until the job described by `payload` is done.

    Waits 5 seconds, then POSTs `payload` to ``self._queryurl``. While the
    service answers 202 the call is repeated every 5 seconds until
    ``self.timeout`` seconds have elapsed. At least one request is always
    made, even when ``self.timeout`` is zero or negative.

    Args:
        payload: JSON-serialisable dict sent as the body of each query.

    Returns:
        The last response received: status 200 once the service reports a
        result, or the most recent 202 response if the polling window ran
        out first. Callers that need to tell the two apart should check
        ``status_code``.

    Raises:
        RuntimeError: If the service answers with a status other than
            200 or 202.
    """
    poll_interval = 5  # seconds between attempts, and before the first one

    time.sleep(poll_interval)
    # A monotonic clock keeps the window correct if the wall clock is adjusted.
    deadline = time.monotonic() + self.timeout

    while True:
        query_response = requests.post(
            self._queryurl,
            headers=self._request_header,
            json=payload,
            timeout=30,  # never block forever on a single poll
        )
        status = query_response.status_code

        if status == 200:
            return query_response
        if status != 202:
            # An explicit raise survives `python -O`, unlike an assert.
            raise RuntimeError(
                f"Unexpected status {status} from query endpoint: "
                f"{query_response.text}"
            )

        time.sleep(poll_interval)
        if time.monotonic() >= deadline:
            # Window exhausted: hand back the last 202 response.
            return query_response

def extract(self, file_path):
    """Upload `file_path` and run a file-extraction job on it.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        Whatever ``_request_file_extraction`` returns for the uploaded file.
    """
    user_id, file_id = self._request_and_upload_by_apiKey(file_path)
    return self._request_file_extraction(user_id, file_id)

def parse(self, file_path, prompt="", mode="advanced"):
    """Upload `file_path` and run an info-extraction job on it.

    Args:
        file_path: Path of the local file to upload.
        prompt: Accepted for backward compatibility; not used by this
            method.
        mode: Accepted for backward compatibility; not used by this method.

    Returns:
        Whatever ``_request_info_extraction`` returns for the uploaded file.
    """
    user_id, file_id = self._request_and_upload_by_apiKey(file_path)
    return self._request_info_extraction(user_id, file_id)

def instruct(self, file_path, prompt="", mode="advanced"):
    """Upload `file_path` and run an instruction-extraction job on it.

    Args:
        file_path: Path of the local file to upload.
        prompt: Instruction text forwarded to
            ``_request_instruction_extraction``.
        mode: Accepted for backward compatibility; not used by this method.

    Returns:
        Whatever ``_request_instruction_extraction`` returns for the
        uploaded file.
    """
    user_id, file_id = self._request_and_upload_by_apiKey(file_path)
    # The prompt must be passed on explicitly; otherwise the helper falls
    # back to its default of "" and the caller's instruction is lost.
    return self._request_instruction_extraction(user_id, file_id, prompt=prompt)

def _error_handler(self, response):
if response.status_code == 403:
Expand All @@ -59,68 +69,93 @@ def _request_and_upload_by_apiKey(self, file_path):
)

if response.status_code == 200:
url_info = response.json()["presignedUrl"]
uid = response.json()["userId"]
jid = response.json()["jobId"]
user_id = response.json().get("userId")
file_id = response.json().get("fileId")
presigned_url = response.json().get("presignedUrl")
with open(file_path, "rb") as file_to_upload:
files = {"file": (file_path, file_to_upload)}
upload_response = requests.post(
url_info["url"], data=url_info["fields"], files=files
requests.post(
presigned_url["url"],
data=presigned_url["fields"],
files=files,
timeout=30, # Add a timeout argument to prevent the program from hanging indefinitely
)
# print(f"Upload response: {upload_response.status_code}")
return uid, jid, url_info["fields"]["key"]
return user_id, file_id

self._error_handler(response)

def _request_file_extraction(self, user_id, file_id):
    """Submit a ``file_extraction`` job for an uploaded file and wait for it.

    Args:
        user_id: Sent as ``userId`` in both the job request and the query.
        file_id: Sent as ``fileId`` of the single S3-sourced file entry.

    Returns:
        The decoded JSON body of the response returned by ``query_result``
        when the job request is answered with status 200. For any other
        status the response is handed to ``self._error_handler``; if that
        call returns, this method returns None.
    """
    job_request = {
        "userId": user_id,
        "files": [{"sourceType": "s3", "fileId": file_id}],
        "jobType": "file_extraction",
    }
    response = requests.post(
        self._requesturl, headers=self._request_header, json=job_request
    )

    # The body is only decoded after the status check, so an error response
    # with a non-JSON body still reaches the error handler.
    if response.status_code != 200:
        self._error_handler(response)
        return None

    job_query = {
        "userId": user_id,
        "jobId": response.json().get("jobId"),
        "queryType": "job_result",
    }
    return self.query_result(job_query).json()

def _request_info_extraction(self, user_id, file_id):
    """Submit an ``info_extraction`` job for an uploaded file and wait for it.

    Args:
        user_id: Sent as ``userId`` in both the job request and the query.
        file_id: Sent as ``fileId`` of the single S3-sourced file entry.

    Returns:
        The decoded JSON body of the response returned by ``query_result``
        when the job request is answered with status 200. For any other
        status the response is handed to ``self._error_handler``; if that
        call returns, this method returns None.
    """
    job_request = {
        "userId": user_id,
        "files": [{"sourceType": "s3", "fileId": file_id}],
        "jobType": "info_extraction",
    }
    response = requests.post(
        self._requesturl, headers=self._request_header, json=job_request
    )

    if response.status_code != 200:
        self._error_handler(response)
        return None

    job_query = {
        "userId": user_id,
        "jobId": response.json().get("jobId"),
        "queryType": "job_result",
    }
    return self.query_result(job_query).json()

def _request_instruction_extraction(self, user_id, file_id, prompt=""):
    """Submit an ``instruction_extraction`` job and wait for its result.

    Args:
        user_id: Sent as ``userId`` in both the job request and the query.
        file_id: Sent as ``fileId`` of the single S3-sourced file entry.
        prompt: Sent as ``jobParams.userPrompt`` in the job request.

    Returns:
        The decoded JSON body of the response returned by ``query_result``
        when the job request is answered with status 200. For any other
        status the response is handed to ``self._error_handler``; if that
        call returns, this method returns None.
    """
    job_request = {
        "userId": user_id,
        "files": [{"sourceType": "s3", "fileId": file_id}],
        "jobType": "instruction_extraction",
        "jobParams": {"userPrompt": prompt},
    }
    response = requests.post(
        self._requesturl, headers=self._request_header, json=job_request
    )

    if response.status_code != 200:
        self._error_handler(response)
        return None

    job_query = {
        "userId": user_id,
        "jobId": response.json().get("jobId"),
        "queryType": "job_result",
    }
    return self.query_result(job_query).json()
9 changes: 7 additions & 2 deletions examples/test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
print(content_result)

print("information extraction test:")
# Information extraction is called with the file only.
qa_result = op.parse(example_local_file)
print(type(qa_result))
print(qa_result)

print("instruction extraction test:")
# Instruction extraction is the prompt-driven call.
example_prompt = "Return table under Investor Metrics in JSON format with year as the key and the column as subkeys."
instruction_result = op.instruct(example_local_file, prompt=example_prompt)
print(type(instruction_result))
print(instruction_result)

0 comments on commit 0b0862a

Please sign in to comment.