feat: add batch api #71

Merged · 2 commits · Dec 8, 2024
13 changes: 13 additions & 0 deletions README.md
@@ -65,6 +65,19 @@ file_id = ap.async_parse(file_path="./data/test.pdf")
markdown = ap.async_fetch(file_id=file_id)
```

### 5. Run Batch Extraction (Beta)
For batch extraction, send the file to begin processing and fetch results later:
```python
# Send the file to begin batch extraction
response = ap.batches.create(file_path="./data/test.pdf")
request_id = response.requestId

# Fetch the extracted content using the request ID
markdown = ap.batches.retrieve(request_id)
```

> ⚠️ **Note:** Batch extraction is currently in beta. Processing may take up to 12 hours to complete.
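Since a batch job can run for hours, callers will typically poll `retrieve` until the request finishes. A minimal polling helper might look like the sketch below; note the terminal status strings (`"COMPLETED"`, `"FAILED"`) are assumptions, not values confirmed by this PR:

```python
import time


def wait_for_markdown(ap, request_id, poll_interval=300, max_wait=12 * 3600):
    """Poll the batch API until the request finishes (sketch).

    ap: an AnyParser instance; request_id: ID returned by ap.batches.create().
    """
    deadline = time.time() + max_wait
    while time.time() < deadline:
        status = ap.batches.retrieve(request_id)
        # "COMPLETED" / "FAILED" are assumed terminal statuses
        if status.requestStatus == "COMPLETED":
            return status.result
        if status.requestStatus == "FAILED":
            raise RuntimeError(f"Batch request failed: {status.error}")
        time.sleep(poll_interval)
    raise TimeoutError(f"Request {request_id} did not finish within {max_wait}s")
```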
Comment on lines +68 to +79
Member: Good job in updating the README.

Member: Also update the README to note that API keys created on the cambioml.com website are not automatically added to the batch API usage group; at this moment, users should contact us to have batch-processing permission manually added to their key.

Collaborator (Author): Got it. Let me update the README here.


## :scroll: Examples
Check out these examples to see how you can utilize **AnyParser** to extract text, numbers, and symbols in fewer than 10 lines of code!

6 changes: 6 additions & 0 deletions any_parser/any_parser.py
@@ -9,6 +9,7 @@
import requests

from any_parser.async_parser import AsyncParser
from any_parser.batch_parser import BatchParser
from any_parser.constants import ProcessType
from any_parser.sync_parser import (
ExtractKeyValueSyncParser,
@@ -20,6 +21,10 @@
from any_parser.utils import validate_file_inputs

PUBLIC_SHARED_BASE_URL = "https://public-api.cambio-ai.com"
# TODO: Update this to the correct batch endpoint
PUBLIC_BATCH_BASE_URL = (
"http://AnyPar-ApiCo-cuKOBXasmUF1-1986145995.us-west-2.elb.amazonaws.com"
Member: let me link this to our DNS.

Member: [screenshot] I just created a batch-api domain name forward for batch-api.cambio-ai.com. However, one thing I am not 100% sure about: for API Gateway, we used to set up custom domain names in it. This is a directly public ELB, so do we have to do anything on the AWS side besides the Squarespace domains work I have already done? I can merge this in first and you can address it in a future PR.

Collaborator (Author): I just tested http://batch-api.cambio-ai.com, and the domain name forward is working.

)
TIMEOUT = 60


@@ -133,6 +138,7 @@ def __init__(self, api_key: str, base_url: str = PUBLIC_SHARED_BASE_URL) -> None
)
self._sync_extract_pii = ExtractPIISyncParser(api_key, base_url)
self._sync_extract_tables = ExtractTablesSyncParser(api_key, base_url)
self.batches = BatchParser(api_key, PUBLIC_BATCH_BASE_URL)
Member: nit: use `_batch` to indicate this is private.

Member: Also, can we add another `batch_url` input and pass in `PUBLIC_BATCH_BASE_URL` as the argument, to improve readability?

Collaborator (Author): The initial design is that users can call `ap.batches.retrieve(request_id)` to invoke the API, similar to what OpenAI does. Let's sync later offline.

Collaborator (Author):
> Also, can we add another `batch_url` input and pass in `PUBLIC_BATCH_BASE_URL` as the argument, to improve readability?

Let me update this one.
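One way the reviewer's suggestion could look: an explicit `batch_url` parameter that defaults to the batch base URL. This is a sketch with stubbed classes, not the merged implementation; the parameter name and the final DNS value are assumptions.

```python
PUBLIC_SHARED_BASE_URL = "https://public-api.cambio-ai.com"
PUBLIC_BATCH_BASE_URL = "http://batch-api.cambio-ai.com"  # assumed final DNS name


class BatchParser:  # stub standing in for any_parser.batch_parser.BatchParser
    def __init__(self, api_key: str, base_url: str) -> None:
        self._api_key = api_key
        self._base_url = base_url


class AnyParser:
    def __init__(
        self,
        api_key: str,
        base_url: str = PUBLIC_SHARED_BASE_URL,
        batch_url: str = PUBLIC_BATCH_BASE_URL,  # explicit argument, per review
    ) -> None:
        # keeping the attribute public preserves ap.batches.retrieve(...) usage
        self.batches = BatchParser(api_key, batch_url)
```

This keeps the OpenAI-style `ap.batches.*` call shape while making the batch endpoint visible (and overridable) at the constructor.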


@handle_file_processing
def parse(
113 changes: 113 additions & 0 deletions any_parser/batch_parser.py
@@ -0,0 +1,113 @@
"""Batch parser implementation."""

from typing import List, Optional

import requests
from pydantic import BaseModel, Field

from any_parser.base_parser import BaseParser

TIMEOUT = 60


class UploadResponse(BaseModel):
fileName: str
requestId: str
requestStatus: str


class UsageResponse(BaseModel):
pageLimit: int
pageRemaining: int


class FileStatusResponse(BaseModel):
fileName: str
fileType: str
requestId: str
requestStatus: str
uploadTime: str
completionTime: Optional[str] = None
result: Optional[List[str]] = Field(default_factory=list)
error: Optional[List[str]] = Field(default_factory=list)
Comment on lines +13 to +32
Member: nit: add a docstring and describe what each attribute means, to ease the review process.

Collaborator (Author): Docstring added. Thanks!
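For reference, the requested style might look like this on one of the models; the attribute descriptions are inferred from how the fields are used elsewhere in this PR:

```python
from pydantic import BaseModel


class UploadResponse(BaseModel):
    """Response returned after a file is queued for batch extraction.

    Attributes:
        fileName: Name of the uploaded file.
        requestId: ID used later with retrieve() to fetch results.
        requestStatus: Initial processing status, e.g. "UPLOADED".
    """

    fileName: str
    requestId: str
    requestStatus: str
```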



class BatchParser(BaseParser):
def __init__(self, api_key: str, base_url: str) -> None:
super().__init__(api_key, base_url)
self._file_upload_url = f"{self._base_url}/files/"
self._processing_status_url = f"{self._base_url}/files/" + "{request_id}"
self._usage_url = f"{self._base_url}/users/current/usage"

# remove "Content-Type" from headers
self._headers.pop("Content-Type")
Comment on lines +42 to +43
Member: qq: why do we need to remove this?

Collaborator (Author): We need to upload a file in the request when using the batch API in the AnyParser SDK, and the Content-Type header set in the base parser does not work for this scenario.
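The underlying reason: when `files=` is passed, `requests` must generate a `multipart/form-data` Content-Type carrying a per-request boundary, and a preset `Content-Type` header (such as `application/json` from a base class) takes precedence and would break the upload. A small demonstration (the URL is a placeholder and is never contacted):

```python
import requests

# No Content-Type in the headers: requests generates the multipart one itself
req = requests.Request(
    "POST",
    "https://example.com/files/",  # placeholder URL, never contacted
    headers={"Authorization": "Bearer <key>"},
    files={"file": ("test.pdf", b"%PDF-1.4 dummy bytes")},
).prepare()

# Content-Type now carries the boundary the server needs to split the parts
assert req.headers["Content-Type"].startswith("multipart/form-data; boundary=")
```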


def create(self, file_path: str) -> UploadResponse:
"""Upload a single file for batch processing.

Args:
file_path: Path to the file to upload

Returns:
FileUploadResponse object containing upload details
"""
with open(file_path, "rb") as f:
files = {"file": f}
response = requests.post(
self._file_upload_url,
headers=self._headers,
files=files,
timeout=TIMEOUT,
)
print(response.json())
Member: nit: let's not use print, but logger.info.
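A module-level logger, as suggested. This is a sketch of the pattern rather than the merged change; `%s`-style lazy formatting defers the string work until a handler actually emits the record.

```python
import logging

logger = logging.getLogger("any_parser.batch_parser")


def log_upload_response(data: dict) -> None:
    # replaces print(response.json()); formatting happens only if INFO is enabled
    logger.info("Batch upload response: %s", data)
```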


if response.status_code != 200:
raise Exception(f"Upload failed: {response.text}")

data = response.json()
return UploadResponse(
fileName=data["fileName"],
requestId=data["requestId"],
requestStatus=data["requestStatus"],
)

def retrieve(self, request_id: str) -> FileStatusResponse:
"""Get the processing status of a file.

Args:
request_id: The ID of the file processing request

Returns:
FileProcessingStatus object containing status details
"""
response = requests.get(
self._processing_status_url.format(request_id=request_id),
headers=self._headers,
timeout=TIMEOUT,
)

if response.status_code != 200:
raise Exception(f"Status check failed: {response.text}")

data = response.json()
return FileStatusResponse(**data)

def get_usage(self) -> UsageResponse:
"""Get current usage information.

Returns:
UsageResponse object containing usage details
"""
response = requests.get(
self._usage_url,
headers=self._headers,
timeout=TIMEOUT,
)

if response.status_code != 200:
raise Exception(f"Usage check failed: {response.text}")

data = response.json()
return UsageResponse(
pageLimit=data["pageLimit"], pageRemaining=data["pageRemaining"]
)
1 change: 1 addition & 0 deletions pyproject.toml
@@ -10,6 +10,7 @@ readme = "README.md"
python = ">=3.9,<3.13"
requests = "^2.25.0"
python-dotenv = "^1.0.0"
pydantic = "^2.10.3"

[tool.poetry.group.dev.dependencies]
Levenshtein = [
37 changes: 37 additions & 0 deletions tests/test_batch_api.py
@@ -0,0 +1,37 @@
"""Testing Batch API Extraction"""

import os
import sys
import unittest

from dotenv import load_dotenv

sys.path.append(".")
load_dotenv(override=True)
from any_parser import AnyParser # noqa: E402


class TestAnyParserBatchAPI(unittest.TestCase):
"""Testing Any Parser Batch API"""

def setUp(self):
self.api_key = os.environ.get("CAMBIO_API_KEY")
if not self.api_key:
raise ValueError("CAMBIO_API_KEY is not set")
self.ap = AnyParser(self.api_key)

def test_batch_api_create(self):
"""Batch API Create"""
working_file = "./examples/sample_data/stoxx_index_guide_0003.pdf"

response = self.ap.batches.create(working_file)

self.assertIsNotNone(response)
self.assertEqual(response.requestStatus, "UPLOADED")

request_id = response.requestId
status = self.ap.batches.retrieve(request_id)
self.assertEqual(status.requestStatus, "UPLOADED")

quota = self.ap.batches.get_usage()
self.assertGreaterEqual(quota.pageRemaining, 0)