Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor any parser classes and update testcases #70

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 35 additions & 91 deletions any_parser/any_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@

from any_parser.async_parser import AsyncParser
from any_parser.constants import ProcessType
from any_parser.sync_parser import SyncParser
from any_parser.sync_parser import (
ExtractKeyValueSyncParser,
ExtractPIISyncParser,
ExtractResumeKeyValueSyncParser,
ExtractTablesSyncParser,
ParseSyncParser,
)
from any_parser.utils import validate_file_inputs

PUBLIC_SHARED_BASE_URL = "https://public-api.cambio-ai.com"
Expand Down Expand Up @@ -119,8 +125,14 @@ def __init__(self, api_key: str, base_url: str = PUBLIC_SHARED_BASE_URL) -> None
api_key: Authentication key for API access
base_url: API endpoint URL, defaults to public endpoint
"""
self._sync_parser = SyncParser(api_key, base_url)
self._async_parser = AsyncParser(api_key, base_url)
self._sync_parse = ParseSyncParser(api_key, base_url)
self._sync_extract_key_value = ExtractKeyValueSyncParser(api_key, base_url)
self._sync_extract_resume_key_value = ExtractResumeKeyValueSyncParser(
api_key, base_url
)
self._sync_extract_pii = ExtractPIISyncParser(api_key, base_url)
self._sync_extract_tables = ExtractTablesSyncParser(api_key, base_url)

@handle_file_processing
def parse(
Expand All @@ -141,23 +153,13 @@ def parse(
Returns:
tuple: (result, timing_info) or (error_message, "")
"""
response, info = self._sync_parser.get_sync_response(
self._sync_parser._sync_parse_url,
file_content=file_content, # type: ignore
file_type=file_type, # type: ignore
return self._sync_parse.parse(
file_path=file_path,
file_content=file_content,
file_type=file_type,
extract_args=extract_args,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: why do we need extract_args for parse?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`parse` uses VQAProcessor, which also takes some extract args (VqaProcessorArgs).

)

if response is None:
return info, ""

try:
response_data = response.json()
result = response_data["markdown"]
return result, f"Time Elapsed: {info}"
except json.JSONDecodeError:
return f"Error: Invalid JSON response: {response.text}", ""

@handle_file_processing
def extract_pii(
self,
Expand All @@ -168,23 +170,12 @@ def extract_pii(
"""
Extract PII data from a file synchronously.
"""
response, info = self._sync_parser.get_sync_response(
self._sync_parser._sync_extract_pii,
file_content=file_content, # type: ignore
file_type=file_type, # type: ignore
extract_args=None,
return self._sync_extract_pii.extract(
file_path=file_path,
file_content=file_content,
file_type=file_type,
)

if response is None:
return info, ""

try:
response_data = response.json()
result = response_data["pii_extraction"]
return result, f"Time Elapsed: {info}"
except json.JSONDecodeError:
return f"Error: Invalid JSON response: {response.text}", ""

@handle_file_processing
def extract_tables(
self,
Expand All @@ -199,23 +190,12 @@ def extract_tables(
Returns:
tuple(str, str): The extracted data and the time taken.
"""
response, info = self._sync_parser.get_sync_response(
self._sync_parser._sync_extract_tables,
file_content=file_content, # type: ignore
file_type=file_type, # type: ignore
extract_args=None,
return self._sync_extract_tables.extract(
file_path=file_path,
file_content=file_content,
file_type=file_type,
)

if response is None:
return info, ""

try:
response_data = response.json()
result = response_data["markdown"]
return result, f"Time Elapsed: {info}"
except json.JSONDecodeError:
return f"Error: Invalid JSON response: {response.text}", ""

@handle_file_processing
def extract_key_value(
self,
Expand All @@ -233,23 +213,13 @@ def extract_key_value(
Returns:
tuple(str, str): The extracted data and the time taken.
"""
response, info = self._sync_parser.get_sync_response(
self._sync_parser._sync_extract_key_value,
file_content=file_content, # type: ignore
file_type=file_type, # type: ignore
return self._sync_extract_key_value.extract(
file_path=file_path,
file_content=file_content,
file_type=file_type,
extract_args={"extract_instruction": extract_instruction},
)

if response is None:
return info, ""

try:
response_data = response.json()
result = response_data["json"]
return result, f"Time Elapsed: {info}"
except json.JSONDecodeError:
return f"Error: Invalid JSON response: {response.text}", ""

@handle_file_processing
def extract_resume_key_value(
self, file_path=None, file_content=None, file_type=None
Expand All @@ -270,23 +240,12 @@ def extract_resume_key_value(
- "pii": Personally Identifiable Information - includes
only name, email, and phone
"""
response, info = self._sync_parser.get_sync_response(
self._sync_parser._sync_extract_resume_key_value,
file_content=file_content, # type: ignore
file_type=file_type, # type: ignore
extract_args=None,
return self._sync_extract_resume_key_value.extract(
file_path=file_path,
file_content=file_content,
file_type=file_type,
)

if response is None:
return info, ""

try:
response_data = response.json()
result = response_data["extraction_result"]
return result, f"Time Elapsed: {info}"
except json.JSONDecodeError:
return f"Error: Invalid JSON response: {response.text}", ""

# Example of decorated methods:
@handle_file_processing
def async_parse(
Expand Down Expand Up @@ -425,19 +384,4 @@ def async_fetch(
timeout=TIMEOUT,
)

if response is None:
return "Error: timeout, no response received"
if response.status_code == 200:
result = response.json()
if "json" in result:
return result["json"]
elif "resume_extraction" in result:
return result["resume_extraction"]
elif "pii_extraction" in result:
return result["pii_extraction"]
elif "markdown" in result:
return result["markdown"]
return f"Error: Invalid response format\n {result}"
if response.status_code == 202:
return ""
return f"Error: {response.status_code} {response.text}"
return self._async_parser.handle_async_response(response)
59 changes: 58 additions & 1 deletion any_parser/async_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,48 @@
TIMEOUT = 60


class BasePostProcessor:
    """Base link in a chain of JSON response post-processors.

    Each processor either handles the response itself or forwards it to its
    ``successor``. This base class handles nothing on its own: it only
    forwards, and returns an error string once the chain is exhausted.
    """

    def __init__(self, successor=None) -> None:
        """Store the next processor in the chain.

        Args:
            successor: Object exposing ``process(json_response)`` to delegate
                to, or ``None`` when this is the last link of the chain.
        """
        self.successor = successor

    def process(self, json_response: Dict) -> str:
        """Forward ``json_response`` to the successor, or report failure.

        Args:
            json_response: Decoded JSON payload to post-process.

        Returns:
            Whatever the successor's ``process`` returns, or an
            ``"Error: Invalid JSON response: ..."`` string when there is no
            successor left to try.
        """
        # Compare against None instead of relying on truthiness, so that a
        # successor which happens to be falsy (e.g. defines __bool__ or
        # __len__) is still consulted rather than silently skipped.
        if self.successor is not None:
            return self.successor.process(json_response)
        return f"Error: Invalid JSON response: {json_response}"


class ParsePostProcessor(BasePostProcessor):
    """Post-processor that picks the ``"markdown"`` entry out of a response."""

    def process(self, json_response: Dict) -> str:
        """Return ``json_response["markdown"]`` if present, else defer to the base class."""
        # Guard clause: without the key this link cannot handle the response.
        if "markdown" not in json_response:
            return super().process(json_response)
        return json_response["markdown"]


class KeyValuePostProcessor(BasePostProcessor):
    """Post-processor that picks the ``"json"`` entry out of a response."""

    def process(self, json_response: Dict) -> str:
        """Return ``json_response["json"]`` if present, else defer to the base class."""
        # Guard clause: without the key this link cannot handle the response.
        if "json" not in json_response:
            return super().process(json_response)
        return json_response["json"]


class ExtractPIIPostProcessor(BasePostProcessor):
    """Post-processor that picks the ``"pii_extraction"`` entry out of a response."""

    def process(self, json_response: Dict) -> str:
        """Return ``json_response["pii_extraction"]`` if present, else defer to the base class."""
        # Guard clause: without the key this link cannot handle the response.
        if "pii_extraction" not in json_response:
            return super().process(json_response)
        return json_response["pii_extraction"]


class ExtractResumeKeyValuePostProcessor(BasePostProcessor):
    """Post-processor that picks the ``"resume_extraction"`` entry out of a response."""

    def process(self, json_response: Dict) -> str:
        """Return ``json_response["resume_extraction"]`` if present, else defer to the base class."""
        # Guard clause: without the key this link cannot handle the response.
        if "resume_extraction" not in json_response:
            return super().process(json_response)
        return json_response["resume_extraction"]


class AsyncParser(BaseParser):
def _setup_endpoints(self) -> None:
def __init__(self, api_key: str, base_url: str) -> None:
super().__init__(api_key, base_url)
self._async_upload_url = f"{self._base_url}/async/upload"
self._async_fetch_url = f"{self._base_url}/async/fetch"

Expand Down Expand Up @@ -58,3 +98,20 @@ def send_async_request(

# If response successful, upload the file
return upload_file_to_presigned_url(file_content, response)

def handle_async_response(self, response) -> str:
if response is None:
return "Error: timeout, no response received"
if response.status_code == 202:
return ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we might want better logging here for 202 to indicate that the request is still in progress.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK

if response.status_code == 200:
extract_resume_processor = ExtractResumeKeyValuePostProcessor()
key_value_processor = KeyValuePostProcessor(extract_resume_processor)
extract_pii_processor = ExtractPIIPostProcessor(key_value_processor)
handler = ParsePostProcessor(extract_pii_processor)
Comment on lines +108 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting way to handle all possible cases in the json_response.

try:
return handler.process(response.json())
except json.JSONDecodeError:
return f"Error: Invalid JSON response: {response.text}"

return f"Error: {response.status_code} {response.text}"
5 changes: 0 additions & 5 deletions any_parser/base_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,3 @@ def __init__(self, api_key: str, base_url: str) -> None:
"Content-Type": "application/json",
"x-api-key": self._api_key,
}
self._setup_endpoints()

def _setup_endpoints(self) -> None:
"""Setup API endpoints - to be implemented by child classes."""
raise NotImplementedError
Loading
Loading