
feat: Add input folder support for batch api #74

Merged · 8 commits · Dec 31, 2024
13 changes: 13 additions & 0 deletions README.md
@@ -76,6 +76,19 @@ request_id = response.requestId
markdown = ap.batches.retrieve(request_id)
```

Batch API for folder input:
```python
# Send the folder to begin batch extraction
WORKING_FOLDER = "./sample_data"
# This will generate a jsonl with filename and requestID
response = ap.batches.create(WORKING_FOLDER)

# Fetch the extracted content using the request ID
markdown = ap.batches.retrieve(request_id)
```

Review comment (Member): nit: be a bit more specific regarding how to get a single request_id and then check its status.

For more details about the code implementation of the batch API, refer to
[examples/parse_batch_upload.py](examples/parse_batch_upload.py) and [examples/parse_batch_fetch.py](examples/parse_batch_fetch.py).

> ⚠️ **Note:** Batch extraction is currently in beta testing. Processing time may take up to 12 hours to complete.
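To make the folder flow concrete, here is a minimal sketch of pulling a single request_id out of the generated JSONL before polling. The field names are assumed from this PR's UploadResponse model, and the retrieve call is left commented out because it needs a live API key:

```python
import json

# One line of the generated JSONL: one JSON object per uploaded file
# (field names assumed from the UploadResponse model in this PR).
line = '{"requestId": "req-123", "requestStatus": "UPLOADED"}'
entry = json.loads(line)
request_id = entry["requestId"]

# With a live key you would then poll the batch status, e.g.:
# markdown = ap.batches.retrieve(request_id)
print(request_id, entry["requestStatus"])
```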

## :scroll: Examples
74 changes: 70 additions & 4 deletions any_parser/batch_parser.py
@@ -1,5 +1,10 @@
"""Batch parser implementation."""

import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional

import requests
@@ -8,6 +13,8 @@
from any_parser.base_parser import BaseParser

TIMEOUT = 60
MAX_FILES = 1000
Review comment (Member): There is no need to restrict this. For the batch API, the logic is:

  • there is a cron job every 2 hours
  • there is a scan every 5 minutes that checks whether the batch queue size exceeds 1000

  If either condition is met, a batch run is triggered.

MAX_WORKERS = 10
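The cron/queue trigger rule described in the review comment above can be sketched as follows. This is a hypothetical illustration of the server-side logic, not the actual service code:

```python
def should_trigger_batch(queue_size: int, seconds_since_last_run: float,
                         max_queue: int = 1000,
                         cron_interval_s: float = 2 * 60 * 60) -> bool:
    """Hypothetical sketch of the server-side rule from the review comment:
    a batch run starts when the 2-hour cron fires, or when the 5-minute
    scan finds more than 1000 files queued."""
    cron_due = seconds_since_last_run >= cron_interval_s
    queue_full = queue_size > max_queue
    return cron_due or queue_full

print(should_trigger_batch(5, 7200))    # cron interval elapsed -> True
print(should_trigger_batch(1500, 60))   # queue over threshold -> True
print(should_trigger_batch(5, 60))      # neither condition -> False
```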


class UploadResponse(BaseModel):
@@ -43,14 +50,25 @@ def __init__(self, api_key: str, base_url: str) -> None:
self._headers.pop("Content-Type")

def create(self, file_path: str) -> UploadResponse:
"""Upload a single file for batch processing.
"""Upload a single file or folder for batch processing.

Args:
file_path: Path to the file to upload
file_path: Path to the file or folder to upload

Returns:
FileUploadResponse object containing upload details
If file: UploadResponse object containing upload details
If folder: Path to the JSONL file containing upload responses
"""
path = Path(file_path)
if path.is_file():
return self._upload_single_file(path)
elif path.is_dir():
return self._upload_folder(path)
else:
raise ValueError(f"Path {file_path} does not exist")

def _upload_single_file(self, file_path: Path) -> UploadResponse:
"""Upload a single file for batch processing."""
with open(file_path, "rb") as f:
files = {"file": f}
response = requests.post(
@@ -59,7 +77,6 @@ def create(self, file_path: str) -> UploadResponse:
files=files,
timeout=TIMEOUT,
)
print(response.json())

if response.status_code != 200:
raise Exception(f"Upload failed: {response.text}")
@@ -71,6 +88,55 @@ def create(self, file_path: str) -> UploadResponse:
requestStatus=data["requestStatus"],
)

def _upload_folder(self, folder_path: Path) -> str:
"""Upload all files in a folder for batch processing.

Args:
folder_path: Path to the folder containing files to upload

Returns:
Path to the JSONL file containing upload responses
"""
# Get all files in folder and subfolders
files = []
for root, _, filenames in os.walk(folder_path):
for filename in filenames:
files.append(Path(root) / filename)

if len(files) > MAX_FILES:
raise ValueError(
f"Found {len(files)} files. Maximum allowed is {MAX_FILES}"
)
Review comment (Member): No need for this.

# Upload files concurrently using thread pool
responses = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_file = {
executor.submit(self._upload_single_file, file_path): file_path
for file_path in files
}

for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
response = future.result()
responses.append(response.dict())
except Exception as e:
print(f"Failed to upload {file_path}: {str(e)}")

# Save responses to JSONL file in parallel folder
folder_name = folder_path.name
folder_size = len(files)
current_time = time.strftime("%Y%m%d%H%M%S")
output_filename = f"{folder_name}_{folder_size}_{current_time}.jsonl"
output_path = folder_path.parent / output_filename

with open(output_path, "w") as f:
for response in responses:
f.write(json.dumps(response) + "\n")

return str(output_path)
Review comment (Member): Rule of thumb: it is better to return a list of UploadResponse than an awkward file path that callers have to know the meaning of and read back in.

Reply (Author): Modified and moved this logic to the upload Python script.

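The reviewer's suggestion (return the responses rather than a JSONL path) could look roughly like this. FolderUploader and its dict payloads are illustrative stand-ins for BatchParser and UploadResponse, not the merged code:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List


class FolderUploader:
    """Illustrative stand-in showing the reviewer's suggestion: return
    the upload responses as a list rather than a JSONL file path."""

    def _upload_single_file(self, file_path: Path) -> dict:
        # Stub: a real implementation would POST the file and build an
        # UploadResponse; here we just echo the file name.
        return {"fileName": file_path.name, "requestStatus": "UPLOADED"}

    def upload_folder(self, folder_path: Path) -> List[dict]:
        # Collect files recursively, then upload them concurrently.
        files = [p for p in folder_path.rglob("*") if p.is_file()]
        responses = []
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = {executor.submit(self._upload_single_file, p): p
                       for p in files}
            for future in as_completed(futures):
                responses.append(future.result())
        return responses
```

Callers can then filter, retry, or serialize the list themselves, instead of having to know what an opaque file path means.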

def retrieve(self, request_id: str) -> FileStatusResponse:
"""Get the processing status of a file.

60 changes: 60 additions & 0 deletions examples/parse_batch_fetch.py
@@ -0,0 +1,60 @@
"""Test batch API folder fetch response"""

import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from dotenv import load_dotenv

from any_parser import AnyParser

# Load environment variables
load_dotenv(override=True)

MAX_WORKER = 10

# Get API key and create parser
api_key = os.environ.get("CAMBIO_API_KEY")
if not api_key:
raise ValueError("CAMBIO_API_KEY is not set")
ap = AnyParser(api_key)

# Read responses from JSONL file
# Change to your real output json from parse_batch_upload.py
response_file = "./sample_data_20241219190049.jsonl"
with open(response_file, "r") as f:
responses = [json.loads(line) for line in f]


def process_response(response):
"""Process a single response by retrieving markdown content"""
request_id = response["requestId"]
try:
markdown = ap.batches.retrieve(request_id)
if markdown:
response["result"] = [markdown.result[0] if markdown.result else ""]
response["requestStatus"] = "COMPLETED"
response["completionTime"] = markdown.completionTime
except Exception as e:
print(f"Error processing {request_id}: {str(e)}")
Review comment (Copilot, Dec 23, 2024): Replace the print statement with a proper logging mechanism for error handling. Suggested change:
  - print(f"Error processing {request_id}: {str(e)}")
  + logging.error(f"Error processing {request_id}: {str(e)}")

Review comment (Member): @boqiny please address this comment.

Review comment (Copilot, Dec 29, 2024): Use logger.error instead of print for error handling. Suggested change:
  - print(f"Error processing {request_id}: {str(e)}")
  + logger.error(f"Error processing {request_id}: {str(e)}")

Review comment (Member): @boqiny please address this comment.

response["error"] = [str(e)]
return response


# Process responses concurrently
with ThreadPoolExecutor(max_workers=MAX_WORKER) as executor:
future_to_response = {
executor.submit(process_response, response): response for response in responses
}

updated_responses = []
for future in as_completed(future_to_response):
updated_response = future.result()
updated_responses.append(updated_response)

# Write all updated responses back to file
with open(response_file, "w") as f:
for response in updated_responses:
f.write(json.dumps(response) + "\n")

print(f"Updated all responses in {response_file} with markdown content")
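The Copilot comments above ask for logging instead of print. A minimal variant of process_response along those lines might look like this; the retrieve parameter stands in for ap.batches.retrieve so the sketch runs without an API key:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("parse_batch_fetch")


def process_response_logged(response, retrieve):
    """Like process_response, but reports failures via the logging module
    (as suggested in review) instead of print."""
    request_id = response["requestId"]
    try:
        markdown = retrieve(request_id)
        if markdown:
            response["result"] = [markdown.result[0] if markdown.result else ""]
            response["requestStatus"] = "COMPLETED"
            response["completionTime"] = markdown.completionTime
    except Exception as e:
        logger.error("Error processing %s: %s", request_id, e)
        response["error"] = [str(e)]
    return response
```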
22 changes: 22 additions & 0 deletions examples/parse_batch_upload.py
@@ -0,0 +1,22 @@
"""Batch API Folder Processing Upload Example"""

import os

from dotenv import load_dotenv

from any_parser import AnyParser

# Load environment variables
load_dotenv(override=True)

# Get API key and create parser
api_key = os.environ.get("CAMBIO_API_KEY")
if not api_key:
raise ValueError("CAMBIO_API_KEY is not set")
ap = AnyParser(api_key)

# Upload folder for batch processing
WORKING_FOLDER = "./sample_data"
response = ap.batches.create(WORKING_FOLDER)

print(f"Upload response saved to: {response}")