diff --git a/any_parser/base.py b/any_parser/base.py
index 7b0cd83..d9f634e 100644
--- a/any_parser/base.py
+++ b/any_parser/base.py
@@ -1,48 +1,58 @@
-import json
+import time
+from datetime import datetime, timedelta
 
 import requests
 
-CAMBIO_UPLOAD_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/upload"
-)
-CAMBIO_EXTRACT_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/extract"
-)
-CAMBIO_PARSE_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/parse"
-)
-CAMBIO_INSTRUCT_URL = (
-    "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/cambio_api/instruction"
-)
+CAMBIO_UPLOAD_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/upload"
+CAMBIO_REQUEST_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/request"
+CAMBIO_QUERY_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/query"
 
 
 class AnyParser:
     def __init__(self, apiKey) -> None:
         self._uploadurl = CAMBIO_UPLOAD_URL
-        self._extracturl = CAMBIO_EXTRACT_URL
-        self._parseurl = CAMBIO_PARSE_URL
-        self._instructurl = CAMBIO_INSTRUCT_URL
-        self._request_header = {"x-api-key": apiKey}
+        self._requesturl = CAMBIO_REQUEST_URL
+        self._queryurl = CAMBIO_QUERY_URL
+        self._request_header = {
+            "authorizationtoken": "-",
+            "apikey": apiKey,
+        }
+        self.timeout = 60
+
+    def query_result(self, payload):
+        time.sleep(5)
+        query_timeout = datetime.now() + timedelta(seconds=self.timeout)
+
+        while datetime.now() < query_timeout:
+            query_response = requests.post(
+                self._queryurl, headers=self._request_header, json=payload
+            )
+            assert (
+                query_response.status_code == 200 or query_response.status_code == 202
+            )
+
+            if query_response.status_code == 200:
+                break
+            elif query_response.status_code == 202:
+                time.sleep(5)
+                continue
 
-    def setAPIKey(self, apiKey):
-        self._request_header = {"x-api-key": apiKey}
+        return query_response
 
     def extract(self, file_path):
-        user_id, job_id, s3_key = self._request_and_upload_by_apiKey(file_path)
-        result = self._request_file_extraction(user_id, job_id, s3_key)
-        return json.loads(result)["result"]
+        user_id, file_id = self._request_and_upload_by_apiKey(file_path)
+        result = self._request_file_extraction(user_id, file_id)
+        return result
 
     def parse(self, file_path, prompt="", mode="advanced"):
-        user_id, job_id, s3_key = self._request_and_upload_by_apiKey(file_path)
-        result = self._request_info_extraction(user_id, job_id, s3_key, mode, prompt)
-        return json.loads(result)["result"]
+        user_id, file_id = self._request_and_upload_by_apiKey(file_path)
+        result = self._request_info_extraction(user_id, file_id)
+        return result
 
     def instruct(self, file_path, prompt="", mode="advanced"):
-        user_id, job_id, s3_key = self._request_and_upload_by_apiKey(file_path)
-        result = self._request_instruction_extraction(
-            user_id, job_id, s3_key, mode, prompt
-        )
-        return json.loads(result)["result"]
+        user_id, file_id = self._request_and_upload_by_apiKey(file_path)
+        result = self._request_instruction_extraction(user_id, file_id)
+        return result
 
     def _error_handler(self, response):
         if response.status_code == 403:
@@ -59,68 +69,93 @@ def _request_and_upload_by_apiKey(self, file_path):
         )
 
         if response.status_code == 200:
-            url_info = response.json()["presignedUrl"]
-            uid = response.json()["userId"]
-            jid = response.json()["jobId"]
+            user_id = response.json().get("userId")
+            file_id = response.json().get("fileId")
+            presigned_url = response.json().get("presignedUrl")
 
             with open(file_path, "rb") as file_to_upload:
                 files = {"file": (file_path, file_to_upload)}
-                upload_response = requests.post(
-                    url_info["url"], data=url_info["fields"], files=files
+                requests.post(
+                    presigned_url["url"],
+                    data=presigned_url["fields"],
+                    files=files,
+                    timeout=30,  # Add a timeout argument to prevent the program from hanging indefinitely
                 )
                 # print(f"Upload response: {upload_response.status_code}")
-            return uid, jid, url_info["fields"]["key"]
+            return user_id, file_id
         self._error_handler(response)
 
-    def _request_file_extraction(self, user_id, job_id, s3_key):
+    def _request_file_extraction(self, user_id, file_id):
         payload = {
-            "userId": user_id,
-            "jobId": job_id,
-            "fileKey": s3_key,
+            "files": [{"sourceType": "s3", "fileId": file_id}],
+            "jobType": "file_extraction",
         }
 
         response = requests.post(
-            self._extracturl, headers=self._request_header, json=payload
+            self._requesturl, headers=self._request_header, json=payload
        )
+        print(response.json())
 
         if response.status_code == 200:
+            file_extraction_job_id = response.json().get("jobId")
+            payload = {
+                "userId": user_id,
+                "jobId": file_extraction_job_id,
+                "queryType": "job_result",
+            }
+
+            query_response = self.query_result(payload)
+
             # print("Extraction success.")
-            return response.text
+            return query_response.json()
         self._error_handler(response)
 
-    def _request_info_extraction(self, user_id, job_id, s3_key, mode, prompt=""):
-        if mode not in ["advanced", "basic"]:
-            raise ValueError("Invalid mode. Choose either 'advanced' or 'basic'.")
+    def _request_info_extraction(self, user_id, file_id):
+
         payload = {
-            "userId": user_id,
-            "jobId": job_id,
-            "fileKey": s3_key,
-            "userPrompt": prompt,
+            "files": [{"sourceType": "s3", "fileId": file_id}],
+            "jobType": "info_extraction",
         }
 
         response = requests.post(
-            self._parseurl, headers=self._request_header, json=payload
+            self._requesturl, headers=self._request_header, json=payload
        )
 
         if response.status_code == 200:
+            info_extraction_job_id = response.json().get("jobId")
+            payload = {
+                "userId": user_id,
+                "jobId": info_extraction_job_id,
+                "queryType": "job_result",
+            }
+
+            query_response = self.query_result(payload)
+
             # print("Extraction success.")
-            return response.text
+            return query_response.json()
         self._error_handler(response)
 
-    def _request_instruction_extraction(self, user_id, job_id, s3_key, mode, prompt=""):
-        if mode not in ["advanced", "basic"]:
-            raise ValueError("Invalid mode. Choose either 'advanced' or 'basic'.")
+    def _request_instruction_extraction(self, user_id, file_id, prompt=""):
         payload = {
-            "userId": user_id,
-            "jobId": job_id,
-            "fileKey": s3_key,
-            "userPrompt": prompt,
+            "files": [{"sourceType": "s3", "fileId": file_id}],
+            "jobType": "instruction_extraction",
+            "jobParams": {"userPrompt": prompt},
        }
 
         response = requests.post(
-            self._instructurl, headers=self._request_header, json=payload
+            self._requesturl, headers=self._request_header, json=payload
        )
 
         if response.status_code == 200:
-            return response.text
+            instruction_extraction_job_id = response.json().get("jobId")
+            payload = {
+                "userId": user_id,
+                "jobId": instruction_extraction_job_id,
+                "queryType": "job_result",
+            }
+
+            query_response = self.query_result(payload)
+
+            # print("Extraction success.")
+            return query_response.json()
         self._error_handler(response)
diff --git a/examples/test_example.py b/examples/test_example.py
index 3e0b254..029aeee 100755
--- a/examples/test_example.py
+++ b/examples/test_example.py
@@ -24,7 +24,12 @@
     print(content_result)
 
     print("information extraction test:")
-    example_prompt = "Return table under Investor Metrics in JSON format with year as the key and the column as subkeys."
-    qa_result = op.parse(example_local_file, example_prompt, mode="basic")
+    qa_result = op.parse(example_local_file)
     print(type(qa_result))
     print(qa_result)
+
+    print("instruction extraction test:")
+    example_prompt = "Return table under Investor Metrics in JSON format with year as the key and the column as subkeys."
+    instruction_result = op.instruct(example_local_file, prompt=example_prompt)
+    print(type(instruction_result))
+    print(instruction_result)
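Note on the new flow: extraction is now asynchronous. The `_request_*` helpers submit a job to the `/request` endpoint and `query_result` polls `/query`, where HTTP 202 means the job is still running and HTTP 200 carries the result. The sketch below shows one way a caller could poll that contract directly; `user_id` and `job_id` are assumed to come from earlier `/upload` and `/request` responses, the API key is a placeholder, and raising on errors and timeouts (rather than asserting on the status code and returning the last 202 response, as `query_result` does) is an illustrative choice, not part of this change.

```python
import time
from datetime import datetime, timedelta

import requests

CAMBIO_QUERY_URL = "https://qreije6m7l.execute-api.us-west-2.amazonaws.com/v1/query"


def poll_job_result(user_id, job_id, api_key, timeout_s=60, interval_s=5):
    """Poll the /query endpoint until the job finishes or the deadline passes.

    Assumes user_id and job_id were returned by the upload and request steps.
    """
    headers = {"authorizationtoken": "-", "apikey": api_key}
    payload = {"userId": user_id, "jobId": job_id, "queryType": "job_result"}
    deadline = datetime.now() + timedelta(seconds=timeout_s)

    while datetime.now() < deadline:
        response = requests.post(
            CAMBIO_QUERY_URL, headers=headers, json=payload, timeout=30
        )
        if response.status_code == 202:  # job still running: wait and retry
            time.sleep(interval_s)
            continue
        response.raise_for_status()      # 4xx/5xx: raise an HTTPError
        return response.json()           # 200: the job result is in the body

    raise TimeoutError(f"Job {job_id} did not finish within {timeout_s}s")
```

Compared with `query_result`, this variant fails loudly on unexpected statuses and on timeout, which can be easier to debug in scripts than inspecting the returned response object.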