Skip to content

Commit

Permalink
Remove ray to enable python 3.12 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
VikParuchuri committed May 28, 2024
1 parent 5ec2b46 commit 32ecfbf
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 405 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Peak GPU memory usage during the benchmark is `4.2GB` for nougat, and `4.1GB` fo

**Throughput**

Marker takes about 4.5GB of VRAM on average per task, so you can convert 10 documents in parallel on an A6000.
Marker takes about 4GB of VRAM on average per task, so you can convert 12 documents in parallel on an A6000.

![Benchmark results](data/images/per_doc.png)

Expand Down
74 changes: 35 additions & 39 deletions convert.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import argparse
import os
from typing import Dict, Optional

import ray
os.environ["IN_STREAMLIT"] = "true" # Avoid multiprocessing inside surya
os.environ["PDFTEXT_CPU_WORKERS"] = "1" # Avoid multiprocessing inside pdftext

import pypdfium2 # Needs to be at the top to avoid warnings
import argparse
import torch.multiprocessing as mp
from tqdm import tqdm
import math

Expand All @@ -19,8 +22,14 @@
configure_logging()


@ray.remote(num_cpus=settings.RAY_CORES_PER_WORKER, num_gpus=.05 if settings.CUDA else 0)
def process_single_pdf(filepath: str, out_folder: str, model_refs, metadata: Optional[Dict] = None, min_length: Optional[int] = None):
def worker_init(shared_model):
global model_refs
model_refs = shared_model


def process_single_pdf(args):
filepath, out_folder, metadata, min_length = args

fname = os.path.basename(filepath)
if markdown_exists(out_folder, fname):
return
Expand All @@ -46,6 +55,11 @@ def process_single_pdf(filepath: str, out_folder: str, model_refs, metadata: Opt
except Exception as e:
print(f"Error converting {filepath}: {e}")
print(traceback.format_exc())
finally:
# Release shared memory
for model in model_refs:
if model:
del model


def main():
Expand Down Expand Up @@ -86,45 +100,27 @@ def main():

total_processes = min(len(files_to_convert), args.workers)

ray.init(
num_cpus=total_processes,
num_gpus=1 if settings.CUDA else 0,
storage=settings.RAY_CACHE_PATH,
_temp_dir=settings.RAY_CACHE_PATH,
log_to_driver=settings.DEBUG
)
# Dynamically set GPU allocation per task based on GPU ram
if settings.CUDA:
tasks_per_gpu = settings.INFERENCE_RAM // settings.VRAM_PER_TASK if settings.CUDA else 0
total_processes = min(tasks_per_gpu, total_processes)

mp.set_start_method('spawn') # Required for CUDA, forkserver doesn't work
model_lst = load_all_models()
model_refs = ray.put(model_lst)

# Dynamically set GPU allocation per task based on GPU ram
gpu_frac = settings.VRAM_PER_TASK / settings.INFERENCE_RAM if settings.CUDA else 0
for model in model_lst:
if model:
model.share_memory()

print(f"Converting {len(files_to_convert)} pdfs in chunk {args.chunk_idx + 1}/{args.num_chunks} with {total_processes} processes, and storing in {out_folder}")
futures = [
process_single_pdf.options(num_gpus=gpu_frac).remote(
filepath,
out_folder,
model_refs,
metadata=metadata.get(os.path.basename(filepath)),
min_length=args.min_length
) for filepath in files_to_convert
]

# Run all ray conversion tasks
progress_bar = tqdm(total=len(futures))
while len(futures) > 0:
finished, futures = ray.wait(
futures, timeout=7.0
)
finished_lst = ray.get(finished)
if isinstance(finished_lst, list):
progress_bar.update(len(finished_lst))
else:
progress_bar.update(1)
task_args = [(f, out_folder, metadata.get(os.path.basename(f)), args.min_length) for f in files_to_convert]

with mp.Pool(processes=total_processes, initializer=worker_init, initargs=(model_lst,)) as pool:
list(tqdm(pool.imap(process_single_pdf, task_args), total=len(task_args), desc="Processing PDFs", unit="pdf"))

# Shutdown ray to free resources
ray.shutdown()
# Delete all CUDA tensors
for model in model_lst:
if model:
del model


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions convert_single.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pypdfium2 # Needs to be at the top to avoid warnings
import argparse
import os

Expand Down
6 changes: 3 additions & 3 deletions marker/convert.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import warnings
warnings.filterwarnings("ignore", category=UserWarning) # Filter torch pytree user warnings

import pypdfium2 as pdfium
import pypdfium2 as pdfium # Needs to be at the top to avoid warnings
from PIL import Image

from marker.utils import flush_cuda_memory
Expand Down Expand Up @@ -33,8 +33,8 @@
def convert_single_pdf(
fname: str,
model_lst: List,
max_pages=None,
metadata: Optional[Dict]=None,
max_pages: int = None,
metadata: Optional[Dict] = None,
langs: Optional[List[str]] = None,
batch_multiplier: int = 1
) -> Tuple[str, Dict[str, Image.Image], Dict]:
Expand Down
2 changes: 1 addition & 1 deletion marker/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def load_all_models(langs=None, device=None, dtype=None, force_load_ocr=False):
edit = load_editing_model(device, dtype)

# Only load recognition model if we'll need it for all pdfs
ocr = setup_recognition_model(langs, device, dtype) if ((settings.OCR_ENGINE == "surya" and settings.OCR_ALL_PAGES) or force_load_ocr) else None
ocr = setup_recognition_model(langs, device, dtype)
texify = setup_texify_model(device, dtype)
model_lst = [texify, layout, order, edit, detection, ocr]
return model_lst
12 changes: 8 additions & 4 deletions marker/ocr/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@


def should_ocr_page(page: Page, no_text: bool):
detected_lines_found = detected_line_coverage(page)
detected_lines_found, total_lines = detected_line_coverage(page)

# No reason to OCR page if it has no text lines
if total_lines == 0:
return False

# OCR page if we got minimal text, or if we got too many spaces
conditions = [
Expand Down Expand Up @@ -55,7 +59,6 @@ def no_text_found(pages: List[Page]):
def detected_line_coverage(page: Page, intersect_thresh=.5, detection_thresh=.4):
found_lines = 0
for detected_line in page.text_lines.bboxes:

# Get bbox and rescale to match dimensions of original page
detected_bbox = detected_line.bbox
detected_bbox = rescale_bbox(page.text_lines.image_bbox, page.bbox, detected_bbox)
Expand All @@ -70,5 +73,6 @@ def detected_line_coverage(page: Page, intersect_thresh=.5, detection_thresh=.4)

total_lines = len(page.text_lines.bboxes)
if total_lines == 0:
return False
return found_lines / total_lines > detection_thresh
return True, 0

return found_lines / total_lines > detection_thresh, total_lines
10 changes: 0 additions & 10 deletions marker/ocr/recognition.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,7 @@ def run_ocr(doc, pages: List[Page], langs: List[str], rec_model, batch_multiplie
if ocr_method is None:
return pages, {"ocr_pages": 0, "ocr_failed": 0, "ocr_success": 0, "ocr_engine": "none"}
elif ocr_method == "surya":
# Load model just in time if we're not OCRing everything
del_rec_model = False
if rec_model is None:
lang_tokens = langs_to_ids(langs)
rec_model = setup_recognition_model(lang_tokens)
del_rec_model = True

new_pages = surya_recognition(doc, ocr_idxs, langs, rec_model, pages, batch_multiplier=batch_multiplier)

if del_rec_model:
del rec_model
elif ocr_method == "ocrmypdf":
new_pages = tesseract_recognition(doc, ocr_idxs, langs)
else:
Expand Down
Loading

0 comments on commit 32ecfbf

Please sign in to comment.