
feat: Add input folder support for batch api #74

Merged: 8 commits into main on Dec 31, 2024
Conversation

boqiny (Member) commented on Dec 19, 2024

User description

Description

The previous batch API accepted only a single file as input, which made it behave like one very long async call. Following OpenAI's batch API design (https://platform.openai.com/docs/guides/batch/getting-started?lang=node), this PR adds folder support:

  1. First upload the contents of the folder and save each filename and requestId as JSONL (shown in examples/parse_batch_upload.py).
  2. Use the generated JSONL to fetch the markdown for each element once processing is finished (shown in examples/parse_batch_fetch.py).

For now, MAX_WORKERS is set to 10 and the maximum input folder size to 1000 files. The sketch below walks through the two steps end to end.
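A minimal sketch of the two-step workflow, assuming a client built as in the example scripts (the import path and constructor arguments here are assumptions, not shown in this PR):

    import json
    from datetime import datetime

    from any_parser import AnyParser  # import path assumed from the package name

    ap = AnyParser(api_key="...")  # hypothetical constructor arguments

    # Step 1: upload every file in the folder and persist filename/requestId pairs.
    responses = ap.batches.create("./sample_data")
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    jsonl_path = f"./sample_data_{timestamp}.jsonl"
    with open(jsonl_path, "w") as f:
        for response in responses:
            f.write(json.dumps(response.model_dump()) + "\n")

    # Step 2 (later, once processing is finished): fetch markdown per request.
    with open(jsonl_path) as f:
        for line in f:
            record = json.loads(line)
            markdown = ap.batches.retrieve(record["requestId"])  # field name per the PR description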

Related Issue

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring
  • Performance improvement

How Has This Been Tested?

Test scripts were added to the examples folder: parse_batch_upload.py and parse_batch_fetch.py.

Screenshots (if applicable)

Generated JSONL after uploading the input folder (screenshot omitted).

Fetch results, with markdown contents added after processing finished (screenshot omitted).

Checklist

  • My code follows the project's style guidelines
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

Additional Notes


PR Type

Enhancement, Documentation


Description

  • Enhanced the batch API to support folder uploads in addition to single file uploads.
  • Introduced concurrent processing for folder uploads using a thread pool, improving efficiency.
  • Added example scripts (parse_batch_upload.py and parse_batch_fetch.py) to demonstrate folder upload and response fetching functionalities.
  • Updated the README to document the new folder upload feature and provide references to the example scripts.
  • Improved error handling and logging for batch processing operations.

Changes walkthrough 📝

Relevant files

Enhancement

any_parser/batch_parser.py: Add folder upload support and concurrent processing in batch parser (+56/-6)

  • Added support for uploading folders in addition to single files for batch processing.
  • Introduced _upload_folder method to handle concurrent file uploads using a thread pool.
  • Enhanced error handling and logging for file uploads.
  • Refactored create method to handle both files and folders.

Documentation

examples/parse_batch_fetch.py: Add example script for fetching batch API responses (+60/-0)

  • Added a script to fetch and process batch API responses.
  • Implemented concurrent processing of responses using a thread pool.
  • Enhanced error handling for response processing.
  • Outputs updated responses with markdown content to a JSONL file.

examples/parse_batch_upload.py: Add example script for uploading folders to batch API (+32/-0)

  • Added a script to upload folders for batch processing.
  • Saves upload responses to a timestamped JSONL file.
  • Demonstrates usage of the new folder upload functionality.

README.md: Update README with batch API folder input usage (+13/-0)

  • Updated documentation to include usage of the batch API for folder input.
  • Added references to example scripts for batch upload and fetch.
  • Highlighted beta status and potential processing time for batch extraction.


@lingjiekong requested a review from Copilot on December 23, 2024.

Copilot AI left a comment:

    Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (2)

examples/parse_batch_fetch.py:14

  • The variable name MAX_WORKER should be renamed to MAX_WORKERS for consistency.

        MAX_WORKER = 10

README.md:87

  • The example code is missing the step to read the request ID from the response. It should be updated for clarity.

        markdown = ap.batches.retrieve(request_id)

examples/parse_batch_fetch.py (quoted lines):

        response["requestStatus"] = "COMPLETED"
        response["completionTime"] = markdown.completionTime
    except Exception as e:
        print(f"Error processing {request_id}: {str(e)}")
Copilot AI commented on Dec 23, 2024:

Replace the print statement with a proper logging mechanism for error handling.

Suggested change:

    - print(f"Error processing {request_id}: {str(e)}")
    + logging.error(f"Error processing {request_id}: {str(e)}")

Member:

@boqiny please address this comment.

any_parser/batch_parser.py:

    @@ -8,6 +13,8 @@
    from any_parser.base_parser import BaseParser

    TIMEOUT = 60
    MAX_FILES = 1000
Member:

There is no need to restrict this. For the batch API, the logic is:

  • there is a cron job every 2 hours
  • there is a scan every 5 minutes that checks whether the batch queue size exceeds 1000

If either condition is met, a batch run is triggered (see the sketch below).
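A hypothetical sketch of that trigger logic; the intervals and threshold come from the comment above, but the structure and names are illustrative, not the actual server implementation:

    import time

    CRON_INTERVAL_S = 2 * 60 * 60   # cron job every 2 hours
    SCAN_INTERVAL_S = 5 * 60        # queue scan every 5 minutes
    QUEUE_THRESHOLD = 1000          # early trigger once the queue grows past this

    def scheduler_loop(queue, run_batch, now=time.time):
        """Run a batch when the cron interval elapses or the queue is large."""
        last_run = now()
        while True:
            time.sleep(SCAN_INTERVAL_S)
            if now() - last_run >= CRON_INTERVAL_S or len(queue) > QUEUE_THRESHOLD:
                run_batch(queue)
                last_run = now()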

Comment on lines 106 to 109 (any_parser/batch_parser.py):

    if len(files) > MAX_FILES:
        raise ValueError(
            f"Found {len(files)} files. Maximum allowed is {MAX_FILES}"
        )
Member:

No need for this.

Comment on lines 132 to 138 (any_parser/batch_parser.py):

    output_path = folder_path.parent / output_filename

    with open(output_path, "w") as f:
        for response in responses:
            f.write(json.dumps(response) + "\n")

    return str(output_path)
Member:

Rule of thumb: it is better to return a list of UploadResponse objects than an opaque file path that callers have to interpret and read back in.

Member Author (boqiny):

Modified and moved this logic to the upload Python script.

@boqiny requested a review from lingjiekong on December 23, 2024.

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
    🧪 No relevant tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Error Handling

The _upload_folder method logs errors for failed uploads but does not provide a mechanism to retry or handle partial failures. Consider adding retry logic (a sketch follows the code below) or a way to handle partially uploaded folders.

    def _upload_folder(self, folder_path: Path) -> List[UploadResponse]:
        """Upload all files in a folder for batch processing.
    
        Args:
            folder_path: Path to the folder containing files to upload
    
        Returns:
            List of UploadResponse objects for each uploaded file
        """
        # Get all files in folder and subfolders
        files = []
        for root, _, filenames in os.walk(folder_path):
            for filename in filenames:
                files.append(Path(root) / filename)
    
        # Upload files concurrently using thread pool
        responses = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_file = {
                executor.submit(self._upload_single_file, file_path): file_path
                for file_path in files
            }
    
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    response = future.result()
                    responses.append(response)
                except Exception as e:
                    logger.error(f"Failed to upload {file_path}: {str(e)}")
    
        return responses
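One possible retry wrapper around the single-file upload shown above; the attempt count, backoff policy, and helper name are illustrative, not part of the PR:

    import logging
    import time

    logger = logging.getLogger(__name__)

    def upload_with_retry(upload_fn, file_path, max_attempts=3, backoff_s=1.0):
        """Retry a single upload with linear backoff before giving up."""
        for attempt in range(1, max_attempts + 1):
            try:
                return upload_fn(file_path)
            except Exception as e:
                if attempt == max_attempts:
                    raise
                logger.warning(
                    "Upload of %s failed (attempt %d): %s; retrying", file_path, attempt, e
                )
                time.sleep(backoff_s * attempt)

With this helper, the thread pool would submit executor.submit(upload_with_retry, self._upload_single_file, file_path), so only uploads that fail every attempt surface in the error log.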
Hardcoded File Paths

The script uses a hardcoded path for response_file and assumes a specific file structure, which could break in other environments. Consider parameterizing the path or adding configuration options (a minimal sketch follows the snippet below).

    response_file = "./sample_data_20241219190049.jsonl"
    with open(response_file, "r") as f:
        responses = [json.loads(line) for line in f]
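A minimal way to parameterize that path, assuming argparse is acceptable for the example script (the flag name is illustrative):

    import argparse
    import json

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--response-file",
        default="./sample_data_20241219190049.jsonl",
        help="JSONL file produced by the upload script",
    )
    args = parser.parse_args()

    with open(args.response_file, "r") as f:
        responses = [json.loads(line) for line in f]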
Lack of Error Handling

The script does not handle errors during the folder upload. If the create method fails, there is no fallback or error reporting mechanism (a possible wrapper is sketched after the snippet below).

    # Upload folder for batch processing
    WORKING_FOLDER = "./sample_data"
    responses = ap.batches.create(WORKING_FOLDER)
    
    # Save responses to JSONL file with timestamp
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    output_file = f"./sample_data_{timestamp}.jsonl"
    
    with open(output_file, "w") as f:
        for response in responses:
            f.write(json.dumps(response.model_dump()) + "\n")
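A sketch of basic error reporting around the upload step; the logger setup and exit behavior are illustrative additions, not part of the PR's script:

    import logging
    import sys

    logger = logging.getLogger(__name__)

    try:
        responses = ap.batches.create(WORKING_FOLDER)
    except Exception as e:
        logger.error(f"Folder upload failed for {WORKING_FOLDER}: {e}")
        sys.exit(1)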

PR Code Suggestions ✨

Explore these optional code suggestions:
    General
    Validate that MAX_WORKERS is a positive integer before using it in ThreadPoolExecutor

    Add a check to ensure that the MAX_WORKERS value is a positive integer to prevent
    runtime errors in the ThreadPoolExecutor.

any_parser/batch_parser.py [108-109]

    + if MAX_WORKERS <= 0:
    +     raise ValueError("MAX_WORKERS must be a positive integer")
      with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:

Suggestion importance[1-10]: 9

Why: Adding a validation for MAX_WORKERS prevents runtime errors and ensures the stability of the thread pool execution. This is a critical improvement for handling concurrency safely.
    Filter out non-file entries when collecting files for batch upload

    Ensure that the _upload_folder method filters out non-file entries (e.g.,
    directories or hidden files) to avoid unnecessary processing or errors during
    upload.

any_parser/batch_parser.py [103-104]

    - files.append(Path(root) / filename)
    + if (Path(root) / filename).is_file():
    +     files.append(Path(root) / filename)

Suggestion importance[1-10]: 8

Why: This suggestion ensures that only valid files are processed, avoiding potential errors or unnecessary processing of directories or hidden files. It directly improves the robustness of the _upload_folder method.
    Validate the existence and type of the WORKING_FOLDER path before processing

    Validate that the WORKING_FOLDER path exists and is a directory before attempting to
    upload files to prevent runtime errors.

examples/parse_batch_upload.py [22-23]

    + if not os.path.isdir(WORKING_FOLDER):
    +     raise ValueError(f"WORKING_FOLDER {WORKING_FOLDER} does not exist or is not a directory")
      responses = ap.batches.create(WORKING_FOLDER)

Suggestion importance[1-10]: 8

Why: Checking the existence and type of WORKING_FOLDER prevents runtime errors and ensures the program behaves predictably. This validation is essential for reliable file handling.
    Use a logging mechanism instead of print for error reporting

    Replace the use of print for error logging with a proper logging mechanism to ensure
    consistent and configurable error handling.

examples/parse_batch_fetch.py [38-39]

    - print(f"Error processing {request_id}: {str(e)}")
    + logger.error(f"Error processing {request_id}: {str(e)}")

Suggestion importance[1-10]: 7

Why: Replacing print with a logging mechanism enhances error reporting by making it configurable and consistent with the rest of the application. This is a good practice for maintainability and debugging.

@lingjiekong requested a review from Copilot on December 29, 2024.

Copilot AI left a comment:

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

examples/parse_batch_fetch.py (quoted lines):

        response["requestStatus"] = "COMPLETED"
        response["completionTime"] = markdown.completionTime
    except Exception as e:
        print(f"Error processing {request_id}: {str(e)}")
Copilot AI commented on Dec 29, 2024:

Use logger.error instead of print for error handling.

Suggested change:

    - print(f"Error processing {request_id}: {str(e)}")
    + logger.error(f"Error processing {request_id}: {str(e)}")

Member:

@boqiny please address this comment.

examples/parse_batch_fetch.py (quoted lines):

        response["requestStatus"] = "COMPLETED"
        response["completionTime"] = markdown.completionTime
    except Exception as e:
        print(f"Error processing {request_id}: {str(e)}")
Member:

@boqiny please address this comment.

README.md (outdated), comment on lines 83 to 87:

    # This will generate a jsonl with filename and requestID
    response = ap.batches.create(WORKING_FOLDER)

    # Fetch the extracted content using the request ID
    markdown = ap.batches.retrieve(request_id)
Member:

nit: be a bit more specific about how to get a single request_id and then check its status.
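One way the README example could be made more specific, assuming the generated JSONL rows carry a requestId field (as described in the PR) and that the retrieve result exposes completionTime once processing finishes (as the fetch example suggests):

    import json

    # Pick a single request from the JSONL written after ap.batches.create(...)
    with open("./sample_data_20241219190049.jsonl") as f:
        first = json.loads(f.readline())
    request_id = first["requestId"]

    # Check its status: the fetch example treats a populated completionTime
    # as the signal that extraction has finished.
    markdown = ap.batches.retrieve(request_id)
    if getattr(markdown, "completionTime", None):
        print(f"{request_id} COMPLETED")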

examples/parse_batch_fetch.py (quoted lines):

        response["requestStatus"] = "COMPLETED"
        response["completionTime"] = markdown.completionTime
    except Exception as e:
        print(f"Error processing {request_id}: {str(e)}")
Member:

@boqiny please address this comment.

lingjiekong (Member) left a comment:

LGTM with minor comment.

lingjiekong (Member):

Please incorporate this #72, if you have not done so in this PR.

@boqiny merged commit f225a4b into main on Dec 31, 2024.

5 checks passed