💎 feat(release): Dockerfile #93

Open · wants to merge 2 commits into base: master
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ experiments
test_data
training
wandb
example.pdf

# Byte-compiled / optimized / DLL files
__pycache__/
50 changes: 50 additions & 0 deletions Dockerfile
@@ -0,0 +1,50 @@
# Use an official Python 3.9 slim image (Debian-based) as the parent image
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /usr/src/app

# Copy the current directory contents into the container at /usr/src/app
COPY . .

# Make the scripts executable
RUN chmod +x scripts/install/tesseract_5_install.sh \
&& chmod +x scripts/install/ghostscript_install.sh


RUN apt-get update && apt-get install -y apt-transport-https \
&& . /etc/os-release \
&& echo "deb https://notesalexp.org/tesseract-ocr5/${VERSION_CODENAME} ${VERSION_CODENAME} main" > /etc/apt/sources.list.d/notesalexp.list \
&& apt-get update -oAcquire::AllowInsecureRepositories=true \
&& apt-get install -y notesalexp-keyring -oAcquire::AllowInsecureRepositories=true --allow-unauthenticated \
&& apt-get update \
&& apt-get install gcc python3-dev -y \
&& apt-get install -y tesseract-ocr \
# Install additional requirements from apt-requirements.txt
&& apt-get install -y $(cat scripts/install/apt-requirements.txt | xargs) \
&& rm -rf /var/lib/apt/lists/*

# Install Python dependencies
RUN pip install poetry \
&& poetry config virtualenvs.create false \
&& poetry install

# Set up environment variables
# Replace these with your actual paths or configuration
ENV TESSDATA_PREFIX=/usr/local/share/tessdata
ENV TORCH_DEVICE=cpu
# For GPU setup, adjust the TORCH_DEVICE accordingly, e.g., cuda or mps
# And ensure INFERENCE_RAM is set based on your GPU's VRAM
# ENV INFERENCE_RAM=16
# Additional environment variables can be set here or passed at runtime
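# Worked example (assumed values, not part of this PR): with VRAM_PER_TASK=4 (GB)
# and INFERENCE_RAM=16 (GB), convert.py gives each Ray conversion task
# gpu_frac = 4 / 16 = 0.25 of the GPU, i.e. four tasks share one GPU.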

# Copy the build context again (note: everything was already copied above, so this step is redundant)
COPY . .

# Reinstall PyTorch - this pulls in the CPU build by default; adjust the install
# command (for example, point pip at a CUDA wheel index) for a GPU build
RUN poetry run pip uninstall -y torch && poetry run pip install torch

# The command to run when the container starts.
# This CMD is a placeholder; note that the third positional argument is the output
# file path that convert_single.py opens for writing, so it should point at a file
# (shown here as ./output.md) rather than a directory.
CMD ["poetry", "run", "python", "convert_single.py", "0.0.0.0", "5000", "./output.md", "--parallel_factor", "2", "--max_pages", "10"]
117 changes: 40 additions & 77 deletions convert.py
@@ -1,20 +1,29 @@
import argparse
import os
import socket
import tempfile
from typing import Dict, Optional

import math
import ray
from tqdm import tqdm

import json
from marker.convert import convert_single_pdf, get_length_of_text
from marker.models import load_all_models
from marker.settings import settings
from marker.logger import configure_logging
import traceback

configure_logging()

def receive_file_from_socket(sock, buffer_size=4096):
"""Receive file data from the socket and write it to a temporary file."""
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
while True:
data = sock.recv(buffer_size)
if not data:
break # No more data, transfer is complete.
temp_file.write(data)
return temp_file.name

@ray.remote(num_cpus=settings.RAY_CORES_PER_WORKER, num_gpus=.05 if settings.CUDA else 0)
def process_single_pdf(fname: str, out_folder: str, model_refs, metadata: Optional[Dict] = None, min_length: Optional[int] = None):
@@ -44,84 +53,38 @@ def process_single_pdf(fname: str, out_folder: str, model_refs, metadata: Option
print(f"Error converting {fname}: {e}")
print(traceback.format_exc())


def main():
parser = argparse.ArgumentParser(description="Convert multiple pdfs to markdown.")
parser.add_argument("in_folder", help="Input folder with pdfs.")
parser.add_argument("out_folder", help="Output folder")
parser.add_argument("--chunk_idx", type=int, default=0, help="Chunk index to convert")
parser.add_argument("--num_chunks", type=int, default=1, help="Number of chunks being processed in parallel")
parser.add_argument("--max", type=int, default=None, help="Maximum number of pdfs to convert")
parser.add_argument("--workers", type=int, default=5, help="Number of worker processes to use")
parser.add_argument("--metadata_file", type=str, default=None, help="Metadata json file to use for filtering")
parser.add_argument("--min_length", type=int, default=None, help="Minimum length of pdf to convert")

args = parser.parse_args()

in_folder = os.path.abspath(args.in_folder)
out_folder = os.path.abspath(args.out_folder)
files = [os.path.join(in_folder, f) for f in os.listdir(in_folder)]
os.makedirs(out_folder, exist_ok=True)

# Handle chunks if we're processing in parallel
# Ensure we get all files into a chunk
chunk_size = math.ceil(len(files) / args.num_chunks)
start_idx = args.chunk_idx * chunk_size
end_idx = start_idx + chunk_size
files_to_convert = files[start_idx:end_idx]

# Limit files converted if needed
if args.max:
files_to_convert = files_to_convert[:args.max]

metadata = {}
if args.metadata_file:
metadata_file = os.path.abspath(args.metadata_file)
with open(metadata_file, "r") as f:
metadata = json.load(f)

total_processes = min(len(files_to_convert), args.workers)

ray.init(
num_cpus=total_processes,
num_gpus=1 if settings.CUDA else 0,
storage=settings.RAY_CACHE_PATH,
_temp_dir=settings.RAY_CACHE_PATH,
log_to_driver=settings.DEBUG
)
def start_socket_server(host, port, out_folder, workers, metadata: Optional[Dict] = None, min_length: Optional[int] = None):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen(5)
print(f"Socket server listening on {host}:{port}")

model_lst = load_all_models()
model_refs = ray.put(model_lst)

# Dynamically set GPU allocation per task based on GPU ram
gpu_frac = settings.VRAM_PER_TASK / settings.INFERENCE_RAM if settings.CUDA else 0

print(f"Converting {len(files_to_convert)} pdfs in chunk {args.chunk_idx + 1}/{args.num_chunks} with {total_processes} processes, and storing in {out_folder}")
futures = [
process_single_pdf.options(num_gpus=gpu_frac).remote(
filename,
out_folder,
model_refs,
metadata=metadata.get(os.path.basename(filename)),
min_length=args.min_length
) for filename in files_to_convert
]
try:
while True:
client_socket, addr = server_socket.accept()
print(f"Received connection from {addr}")

# Run all ray conversion tasks
progress_bar = tqdm(total=len(futures))
while len(futures) > 0:
finished, futures = ray.wait(
futures, timeout=7.0
)
finished_lst = ray.get(finished)
if isinstance(finished_lst, list):
progress_bar.update(len(finished_lst))
else:
progress_bar.update(1)
fname = receive_file_from_socket(client_socket)
client_socket.close()

# Shutdown ray to free resources
ray.shutdown()
# Immediately queue the file for processing. In production, you would likely
# want some form of rate limiting or queue management here.
process_single_pdf.remote(
fname, out_folder, model_refs,
metadata=metadata, min_length=min_length
)

finally:
server_socket.close()

if __name__ == "__main__":
main()
def main():
parser = argparse.ArgumentParser(description="Convert received pdfs to markdown over a socket connection.")
parser.add_argument("host", help="Host to listen on")
parser.add_argument("port", type=int, help="Port to listen on")
parser.add_argument("out_folder", help="Output folder")
parser.add_argument("--workers", type=int, default=5, help="Number of worker processes to use")
parser.add_argument("--metadata_file", type=str, default=None, help="Metadata json file to use for filtering")
parser.add_argument("--min_length", type=int, default=None, help="Minimum length of pdf to convert")
56 changes: 45 additions & 11 deletions convert_single.py
@@ -1,32 +1,66 @@
import argparse

import json
import socket
import tempfile
from marker.convert import convert_single_pdf
from marker.logger import configure_logging
from marker.models import load_all_models

configure_logging()

def receive_file_from_socket(sock, buffer_size=1024):
"""Receive file data from the socket and write it to a temporary file."""
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
file_name = temp_file.name
while True:
data = sock.recv(buffer_size)
if not data:
break # No more data, transfer is complete.
temp_file.write(data)
return file_name

def main():
parser = argparse.ArgumentParser()
parser.add_argument("filename", help="PDF file to parse")
parser.add_argument("host", help="Host to listen on")
parser.add_argument("port", type=int, help="Port to listen on")
parser.add_argument("output", help="Output file name")
parser.add_argument("--max_pages", type=int, default=None, help="Maximum number of pages to parse")
parser.add_argument("--parallel_factor", type=int, default=1, help="How much to multiply default parallel OCR workers and model batch sizes by.")
args = parser.parse_args()

fname = args.filename
model_lst = load_all_models()
full_text, out_meta = convert_single_pdf(fname, model_lst, max_pages=args.max_pages, parallel_factor=args.parallel_factor)
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Bind the socket to the address given on the command line
server_address = (args.host, args.port)
sock.bind(server_address)
sock.listen(1)

print(f"Listening for incoming connections on {args.host}:{args.port}")

while True:
print("Waiting for a connection")
connection, client_address = sock.accept()
try:
print("Connection from", client_address)

# Receive the file over the socket
fname = receive_file_from_socket(connection)

# Load models and convert the PDF (note: the models are reloaded on every
# connection, which is slow; loading them once before the loop would avoid that)
model_lst = load_all_models()
full_text, out_meta = convert_single_pdf(fname, model_lst, max_pages=args.max_pages, parallel_factor=args.parallel_factor)

with open(args.output, "w+", encoding='utf-8') as f:
f.write(full_text)

out_meta_filename = args.output.rsplit(".", 1)[0] + "_meta.json"
with open(out_meta_filename, "w+") as f:
f.write(json.dumps(out_meta, indent=4))

finally:
# Clean up the connection
connection.close()

if __name__ == "__main__":
main()
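As a small worked example of the output naming above (the path is illustrative): with output set to out/example.md, the metadata file lands next to it as out/example_meta.json.

# Derivation of the metadata filename used in convert_single.py (illustrative path)
output = "out/example.md"
out_meta_filename = output.rsplit(".", 1)[0] + "_meta.json"
assert out_meta_filename == "out/example_meta.json"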