
Merge pull request #74 from CambioML/1219
feat: Add input folder support for batch api
boqiny authored Dec 31, 2024
2 parents 651e7df + 6e24a85 commit f225a4b
Showing 5 changed files with 185 additions and 21 deletions.
21 changes: 21 additions & 0 deletions README.md
@@ -76,6 +76,27 @@ request_id = response.requestId
markdown = ap.batches.retrieve(request_id)
```

Batch API for folder input:
```python
# Send the folder to begin batch extraction
WORKING_FOLDER = "./sample_data"
# This generates a JSONL file with each file's name and request ID
response = ap.batches.create(WORKING_FOLDER)
```

Each response in the JSONL file contains:
- The filename
- A unique request ID
- Additional processing metadata

You can later use these request IDs to retrieve the extracted content for each file:

```python
# Fetch the extracted content using the request ID from the jsonl file
markdown = ap.batches.retrieve(request_id)
```

For more details on the batch API implementation, see
[examples/parse_batch_upload.py](examples/parse_batch_upload.py) and [examples/parse_batch_fetch.py](examples/parse_batch_fetch.py).

> ⚠️ **Note:** Batch extraction is currently in beta testing. Processing may take up to 12 hours to complete.
>
> ⚠️ **Important:** API keys generated from cambioml.com do not automatically have batch processing permissions. Please contact [email protected] to request batch processing access for your API key.
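
Because a batch job can run for hours, a client typically polls `retrieve` until the job completes. Below is a minimal polling sketch; the 30-minute interval is arbitrary, and the `requestStatus` attribute and `"COMPLETED"` value are assumptions based on the fetch example in this change:

```python
import time

from any_parser import AnyParser

ap = AnyParser("your-api-key")
response = ap.batches.create("./sample.pdf")  # hypothetical input file

# Poll until the job finishes; the interval and status value are assumptions.
while True:
    status = ap.batches.retrieve(response.requestId)
    if status and status.requestStatus == "COMPLETED":
        print(status.result[0] if status.result else "")
        break
    time.sleep(1800)  # batch jobs may take up to 12 hours
```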
60 changes: 55 additions & 5 deletions any_parser/batch_parser.py
@@ -1,14 +1,20 @@
"""Batch parser implementation."""

import logging
import os
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Union

import requests
from pydantic import BaseModel, Field

from any_parser.base_parser import BaseParser

TIMEOUT = 60
MAX_WORKERS = 10

logger = logging.getLogger(__name__)


class UploadResponse(BaseModel):
@@ -55,15 +61,26 @@ def __init__(self, api_key: str, base_url: str) -> None:
        # remove "Content-Type" from headers
        self._headers.pop("Content-Type")

    def create(self, file_path: str) -> UploadResponse:
        """Upload a single file for batch processing.
    def create(self, file_path: str) -> Union[UploadResponse, List[UploadResponse]]:
        """Upload a single file or folder for batch processing.

        Args:
            file_path: Path to the file to upload
            file_path: Path to the file or folder to upload

        Returns:
            FileUploadResponse object containing upload details
            If file: Single UploadResponse object containing upload details
            If folder: List of UploadResponse objects for each file
        """
        path = Path(file_path)
        if path.is_file():
            return self._upload_single_file(path)
        elif path.is_dir():
            return self._upload_folder(path)
        else:
            raise ValueError(f"Path {file_path} does not exist")

    def _upload_single_file(self, file_path: Path) -> UploadResponse:
        """Upload a single file for batch processing."""
        if not os.path.isfile(file_path):
            raise FileNotFoundError(f"The file path '{file_path}' does not exist.")

@@ -86,6 +103,39 @@ def create(self, file_path: str) -> UploadResponse:
            requestStatus=data["requestStatus"],
        )

    def _upload_folder(self, folder_path: Path) -> List[UploadResponse]:
        """Upload all files in a folder for batch processing.

        Args:
            folder_path: Path to the folder containing files to upload

        Returns:
            List of UploadResponse objects for each uploaded file
        """
        # Get all files in folder and subfolders
        files = []
        for root, _, filenames in os.walk(folder_path):
            for filename in filenames:
                files.append(Path(root) / filename)

        # Upload files concurrently using thread pool
        responses = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_file = {
                executor.submit(self._upload_single_file, file_path): file_path
                for file_path in files
            }

            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    response = future.result()
                    responses.append(response)
                except Exception as e:
                    logger.error(f"Failed to upload {file_path}: {str(e)}")

        return responses

    def retrieve(self, request_id: str) -> FileStatusResponse:
        """Get the processing status of a file.
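With this change, `create` dispatches on the path type, so the return type depends on the argument. A short usage sketch (the file and folder paths are hypothetical):

```python
from any_parser import AnyParser
from any_parser.batch_parser import UploadResponse

ap = AnyParser("your-api-key")

# A single file yields one UploadResponse...
single = ap.batches.create("./sample.pdf")
assert isinstance(single, UploadResponse)

# ...while a folder yields a list, one entry per uploaded file.
batch = ap.batches.create("./sample_data")
for resp in batch:
    print(resp.requestId, resp.requestStatus)
```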
65 changes: 65 additions & 0 deletions examples/parse_batch_fetch.py
@@ -0,0 +1,65 @@
"""Test batch API folder fetch response"""

import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from dotenv import load_dotenv

from any_parser import AnyParser

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv(override=True)

MAX_WORKER = 10

# Get API key and create parser
api_key = os.environ.get("CAMBIO_API_KEY")
if not api_key:
    raise ValueError("CAMBIO_API_KEY is not set")
ap = AnyParser(api_key)

# Read responses from JSONL file
# Replace with the actual output JSONL generated by parse_batch_upload.py
response_file = "./sample_data_20241219190049.jsonl"
with open(response_file, "r") as f:
    responses = [json.loads(line) for line in f]


def process_response(response):
"""Process a single response by retrieving markdown content"""
request_id = response["requestId"]
try:
markdown = ap.batches.retrieve(request_id)
if markdown:
response["result"] = [markdown.result[0] if markdown.result else ""]
response["requestStatus"] = "COMPLETED"
response["completionTime"] = markdown.completionTime
except Exception as e:
logger.error(f"Error processing {request_id}: {str(e)}")
response["error"] = [str(e)]
return response


# Process responses concurrently
with ThreadPoolExecutor(max_workers=MAX_WORKER) as executor:
    future_to_response = {
        executor.submit(process_response, response): response for response in responses
    }

    updated_responses = []
    for future in as_completed(future_to_response):
        updated_response = future.result()
        updated_responses.append(updated_response)

# Write all updated responses back to file
with open(response_file, "w") as f:
    for response in updated_responses:
        f.write(json.dumps(response) + "\n")

print(f"Updated all responses in {response_file} with markdown content")
32 changes: 32 additions & 0 deletions examples/parse_batch_upload.py
@@ -0,0 +1,32 @@
"""Batch API Folder Processing Upload Example"""

import json
import os
from datetime import datetime

from dotenv import load_dotenv

from any_parser import AnyParser

# Load environment variables
load_dotenv(override=True)

# Get API key and create parser
api_key = os.environ.get("CAMBIO_API_KEY")
if not api_key:
    raise ValueError("CAMBIO_API_KEY is not set")
ap = AnyParser(api_key)

# Upload folder for batch processing
WORKING_FOLDER = "./sample_data"
responses = ap.batches.create(WORKING_FOLDER)

# Save responses to JSONL file with timestamp
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_file = f"./sample_data_{timestamp}.jsonl"

with open(output_file, "w") as f:
    for response in responses:
        f.write(json.dumps(response.model_dump()) + "\n")

print(f"Upload responses saved to: {output_file}")
28 changes: 12 additions & 16 deletions tests/test_data.py
@@ -7,14 +7,12 @@
            "first_name": "the first name of the employee",
            "last_name": "the last name of the employee",
        },
        "correct_output": [
            {
                "social_security_number": ["758-58-5787"],
                "ein": ["78-8778788"],
                "first_name": ["Jesan"],
                "last_name": ["Rahaman"],
            }
        ],
        "correct_output": {
            "social_security_number": ["758-58-5787"],
            "ein": ["78-8778788"],
            "first_name": ["Jesan"],
            "last_name": ["Rahaman"],
        },
    },
    # {
    #     "working_file": "./examples/sample_data/test_w2.pptx",
@@ -58,13 +56,11 @@
"first_name": "the first name of the employee",
"last_name": "the last name of the employee",
},
"correct_output": [
{
"social_security_number": ["758-58-5787"],
"ein": ["78-8778788"],
"first_name": ["Jesan"],
"last_name": ["Rahaman"],
}
],
"correct_output": {
"social_security_number": ["758-58-5787"],
"ein": ["78-8778788"],
"first_name": ["Jesan"],
"last_name": ["Rahaman"],
},
},
]
