
feat: Add input folder support for batch api #74

Merged Dec 31, 2024 · 8 commits · Changes from 5 commits
13 changes: 13 additions & 0 deletions README.md
@@ -76,6 +76,19 @@ request_id = response.requestId
markdown = ap.batches.retrieve(request_id)
```

Batch API for folder input:
```python
# Send the folder to begin batch extraction
WORKING_FOLDER = "./sample_data"
# This will generate a jsonl with filename and requestID
response = ap.batches.create(WORKING_FOLDER)

# Fetch the extracted content for one file using its request ID
request_id = response[0].requestId
markdown = ap.batches.retrieve(request_id)
```

Member: nit: be a bit more specific regarding how to get a single request_id and then check its status.
For more details on the batch API implementation, refer to
[examples/parse_batch_upload.py](examples/parse_batch_upload.py) and [examples/parse_batch_fetch.py](examples/parse_batch_fetch.py).

> ⚠️ **Note:** Batch extraction is currently in beta testing. Processing may take up to 12 hours.
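The reviewer's nit above asks how to obtain a single request ID from a folder upload and then check its status. A minimal sketch, assuming the list returned by `ap.batches.create(folder)` contains `UploadResponse` objects with `requestId` fields, and that `ap.batches.retrieve(request_id)` returns an object exposing `result` and `completionTime` as used in [examples/parse_batch_fetch.py](examples/parse_batch_fetch.py):

```python
import time

# Upload a folder; each entry in the returned list is one UploadResponse.
responses = ap.batches.create("./sample_data")

# Pick one file's request ID and poll until its markdown is available.
request_id = responses[0].requestId
while True:
    status = ap.batches.retrieve(request_id)
    if status and status.result:          # extraction finished
        print(status.result[0])
        print("completed at:", status.completionTime)
        break
    time.sleep(60)                        # batch jobs can take hours; poll sparingly
```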

## :scroll: Examples
62 changes: 56 additions & 6 deletions any_parser/batch_parser.py
@@ -1,13 +1,20 @@
"""Batch parser implementation."""

from typing import List, Optional
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Union

import requests
from pydantic import BaseModel, Field

from any_parser.base_parser import BaseParser

TIMEOUT = 60
MAX_WORKERS = 10

logger = logging.getLogger(__name__)


class UploadResponse(BaseModel):
@@ -42,15 +49,26 @@ def __init__(self, api_key: str, base_url: str) -> None:
# remove "Content-Type" from headers
self._headers.pop("Content-Type")

def create(self, file_path: str) -> UploadResponse:
"""Upload a single file for batch processing.
def create(self, file_path: str) -> Union[UploadResponse, List[UploadResponse]]:
"""Upload a single file or folder for batch processing.

Args:
file_path: Path to the file to upload
file_path: Path to the file or folder to upload

Returns:
FileUploadResponse object containing upload details
If file: Single UploadResponse object containing upload details
If folder: List of UploadResponse objects for each file
"""
path = Path(file_path)
if path.is_file():
return self._upload_single_file(path)
elif path.is_dir():
return self._upload_folder(path)
else:
raise ValueError(f"Path {file_path} does not exist")

def _upload_single_file(self, file_path: Path) -> UploadResponse:
"""Upload a single file for batch processing."""
with open(file_path, "rb") as f:
files = {"file": f}
response = requests.post(
@@ -59,7 +77,6 @@ def create(self, file_path: str) -> UploadResponse:
files=files,
timeout=TIMEOUT,
)
print(response.json())

if response.status_code != 200:
raise Exception(f"Upload failed: {response.text}")
@@ -71,6 +88,39 @@ def create(self, file_path: str) -> UploadResponse:
requestStatus=data["requestStatus"],
)

def _upload_folder(self, folder_path: Path) -> List[UploadResponse]:
"""Upload all files in a folder for batch processing.

Args:
folder_path: Path to the folder containing files to upload

Returns:
List of UploadResponse objects for each uploaded file
"""
# Get all files in folder and subfolders
files = []
for root, _, filenames in os.walk(folder_path):
for filename in filenames:
files.append(Path(root) / filename)

# Upload files concurrently using thread pool
responses = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_file = {
executor.submit(self._upload_single_file, file_path): file_path
for file_path in files
}

for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
response = future.result()
responses.append(response)
except Exception as e:
logger.error(f"Failed to upload {file_path}: {str(e)}")

return responses

def retrieve(self, request_id: str) -> FileStatusResponse:
"""Get the processing status of a file.

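The updated `BatchParser.create` now dispatches on the path type: a file path returns a single `UploadResponse`, while a folder path walks the directory tree and returns a list of them. A minimal usage sketch (the file name is a placeholder, and only the `requestId` and `requestStatus` fields visible in this diff are assumed):

```python
from any_parser import AnyParser

ap = AnyParser("YOUR_CAMBIO_API_KEY")  # placeholder API key

# Single file -> one UploadResponse
single = ap.batches.create("./sample_data/report.pdf")  # hypothetical file
print(single.requestId, single.requestStatus)

# Folder -> list of UploadResponse, uploaded concurrently (up to MAX_WORKERS threads)
batch = ap.batches.create("./sample_data")
for item in batch:
    print(item.requestId, item.requestStatus)

# Any other path raises ValueError("Path ... does not exist")
```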
60 changes: 60 additions & 0 deletions examples/parse_batch_fetch.py
@@ -0,0 +1,60 @@
"""Test batch API folder fetch response"""

import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from dotenv import load_dotenv

from any_parser import AnyParser

# Load environment variables
load_dotenv(override=True)

MAX_WORKER = 10

# Get API key and create parser
api_key = os.environ.get("CAMBIO_API_KEY")
if not api_key:
raise ValueError("CAMBIO_API_KEY is not set")
ap = AnyParser(api_key)

# Read responses from the JSONL file
# Replace with the actual output JSONL produced by parse_batch_upload.py
response_file = "./sample_data_20241219190049.jsonl"
with open(response_file, "r") as f:
responses = [json.loads(line) for line in f]


def process_response(response):
"""Process a single response by retrieving markdown content"""
request_id = response["requestId"]
try:
markdown = ap.batches.retrieve(request_id)
if markdown:
response["result"] = [markdown.result[0] if markdown.result else ""]
response["requestStatus"] = "COMPLETED"
response["completionTime"] = markdown.completionTime
except Exception as e:
print(f"Error processing {request_id}: {str(e)}")
Copilot AI (Dec 23, 2024): Replace the print statement with a proper logging mechanism for error handling.
Suggested change:
-        print(f"Error processing {request_id}: {str(e)}")
+        logging.error(f"Error processing {request_id}: {str(e)}")

Member: @boqiny please address this comment.

Copilot AI (Dec 29, 2024): Use logger.error instead of print for error handling.
Suggested change:
-        print(f"Error processing {request_id}: {str(e)}")
+        logger.error(f"Error processing {request_id}: {str(e)}")

Member: @boqiny please address this comment.

response["error"] = [str(e)]
return response


# Process responses concurrently
with ThreadPoolExecutor(max_workers=MAX_WORKER) as executor:
future_to_response = {
executor.submit(process_response, response): response for response in responses
}

updated_responses = []
for future in as_completed(future_to_response):
updated_response = future.result()
updated_responses.append(updated_response)

# Write all updated responses back to file
with open(response_file, "w") as f:
for response in updated_responses:
f.write(json.dumps(response) + "\n")

print(f"Updated all responses in {response_file} with markdown content")
32 changes: 32 additions & 0 deletions examples/parse_batch_upload.py
@@ -0,0 +1,32 @@
"""Batch API Folder Processing Upload Example"""

import json
import os
from datetime import datetime

from dotenv import load_dotenv

from any_parser import AnyParser

# Load environment variables
load_dotenv(override=True)

# Get API key and create parser
api_key = os.environ.get("CAMBIO_API_KEY")
if not api_key:
raise ValueError("CAMBIO_API_KEY is not set")
ap = AnyParser(api_key)

# Upload folder for batch processing
WORKING_FOLDER = "./sample_data"
responses = ap.batches.create(WORKING_FOLDER)

# Save responses to JSONL file with timestamp
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_file = f"./sample_data_{timestamp}.jsonl"

with open(output_file, "w") as f:
for response in responses:
f.write(json.dumps(response.model_dump()) + "\n")

print(f"Upload responses saved to: {output_file}")