diff --git a/docs/assets/images/multiprocessing.svg b/docs/assets/images/multiprocessing.svg
new file mode 100644
index 00000000..594b0d04
--- /dev/null
+++ b/docs/assets/images/multiprocessing.svg
@@ -0,0 +1,3 @@
+
+
+
diff --git a/docs/assets/stylesheets/extra.css b/docs/assets/stylesheets/extra.css
index 97c90e53..e2b927df 100644
--- a/docs/assets/stylesheets/extra.css
+++ b/docs/assets/stylesheets/extra.css
@@ -155,6 +155,6 @@ body, input {
   margin-top: 1.5rem;
 }
 
-.references {
-
+.doc td > code {
+  word-break: normal;
 }
diff --git a/docs/inference.md b/docs/inference.md
new file mode 100644
index 00000000..d849ba86
--- /dev/null
+++ b/docs/inference.md
@@ -0,0 +1,61 @@
+# Inference
+
+Once you have obtained a pipeline, either by composing rule-based components, training a model, or loading a model from disk, you can use it to make predictions on documents. This is referred to as inference.
+
+## Inference on a single document
+
+In EDS-PDF, computing the prediction on a single document is done by calling the pipeline on the document. The input can be either:
+
+- a sequence of bytes
+- or a [PDFDoc][edspdf.structures.PDFDoc] object
+
+```python
+from pathlib import Path
+
+pipeline = ...
+content = Path("path/to/.pdf").read_bytes()
+doc = pipeline(content)
+```
+
+If you're lucky enough to have a GPU, you can use it to speed up inference by moving the model to the GPU before calling the pipeline. To leverage multiple GPUs, refer to the [multiprocessing accelerator][edspdf.accelerators.multiprocessing.MultiprocessingAccelerator] description below.
+
+```python
+pipeline.to("cuda")  # same semantics as pytorch
+doc = pipeline(content)
+```
+
+## Inference on multiple documents
+
+When processing multiple documents, it is usually more efficient to use the `pipeline.pipe(...)` method, especially when using deep learning components, since this allows matrix multiplications to be batched together. Depending on your computational resources and requirements, EDS-PDF comes with various "accelerators" to speed up inference (see the [Accelerators](#accelerators) section for more details). By default, the `.pipe()` method uses the [`simple` accelerator][edspdf.accelerators.simple.SimpleAccelerator], but you can switch to a different one by passing the `accelerator` argument.
+
+```python
+pipeline = ...
+docs = pipeline.pipe(
+    [content1, content2, ...],
+    batch_size=16,  # optional, defaults to the one defined in the pipeline
+    accelerator=my_accelerator,
+)
+```
+
+The `pipe` method supports the following arguments:
+
+::: edspdf.pipeline.Pipeline.pipe
+    options:
+        heading_level: 3
+        only_parameters: true
+
+## Accelerators
+
+### Simple accelerator {: #edspdf.accelerators.simple.SimpleAccelerator }
+
+::: edspdf.accelerators.simple.SimpleAccelerator
+    options:
+        heading_level: 3
+        only_class_level: true
+
+### Multiprocessing accelerator {: #edspdf.accelerators.multiprocessing.MultiprocessingAccelerator }
+
+::: edspdf.accelerators.multiprocessing.MultiprocessingAccelerator
+    options:
+        heading_level: 3
+        only_class_level: true
diff --git a/docs/pipeline.md b/docs/pipeline.md
index 23b0042c..8d1d1b00 100644
--- a/docs/pipeline.md
+++ b/docs/pipeline.md
@@ -57,6 +57,8 @@ model(pdf_bytes)
 model.pipe([pdf_bytes, ...])
 ```
 
+For more information on how to use the pipeline, refer to the [Inference](../inference) page.
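The `to_doc` and `from_doc` converters validated in `edspdf/accelerators/base.py` below let `pipe` consume plain dicts and return plain dicts instead of `PDFDoc` objects. Here is a minimal sketch, assuming `pipe` forwards these arguments to the accelerator as the `Accelerator.__call__` signature suggests; the `pdf_bytes`/`texts` field names and the `aggregated_texts` attribute are illustrative, not taken from the diff.

```python
# Sketch only: field names and the `aggregated_texts` attribute are assumptions.
pipeline = ...
records = [{"pdf_bytes": b"%PDF-..."}, {"pdf_bytes": b"%PDF-..."}]

results = pipeline.pipe(
    records,
    # dict -> PDFDoc, handled by FromDictFieldsToDoc(content_field="pdf_bytes")
    to_doc={"content_field": "pdf_bytes"},
    # PDFDoc -> dict, handled by FromDocToDictFields({"texts": "aggregated_texts"})
    from_doc={"texts": "aggregated_texts"},
)
```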
+ ## Hybrid models EDS-PDF was designed to facilitate the training and inference of hybrid models that diff --git a/edspdf/accelerators/base.py b/edspdf/accelerators/base.py index a72650a2..07bc01b2 100644 --- a/edspdf/accelerators/base.py +++ b/edspdf/accelerators/base.py @@ -25,23 +25,18 @@ def __get_validators__(cls): @classmethod def validate(cls, value, config=None): if isinstance(value, str): - return FromDictFieldsToDoc(value) - elif isinstance(value, dict): - return FromDictFieldsToDoc(**value) - elif callable(value): + value = {"content_field": value} + if isinstance(value, dict): + value = FromDictFieldsToDoc(**value) + if callable(value): return value - else: - raise TypeError( - f"Invalid entry {value} ({type(value)}) for ToDoc, " - f"expected string, a dict or a callable." - ) - + raise TypeError( + f"Invalid entry {value} ({type(value)}) for ToDoc, " + f"expected string, a dict or a callable." + ) -def identity(x): - return x - -FROM_DOC_TO_DICT_FIELDS_TEMPLATE = """\ +FROM_DOC_TO_DICT_FIELDS_TEMPLATE = """ def fn(doc): return {X} """ @@ -50,8 +45,10 @@ def fn(doc): class FromDocToDictFields: def __init__(self, mapping): self.mapping = mapping - dict_fields = ", ".join(f"{k}: doc.{v}" for k, v in mapping.items()) - self.fn = eval(FROM_DOC_TO_DICT_FIELDS_TEMPLATE.replace("X", dict_fields)) + dict_fields = ", ".join(f"{repr(k)}: doc.{v}" for k, v in mapping.items()) + local_vars = {} + exec(FROM_DOC_TO_DICT_FIELDS_TEMPLATE.replace("X", dict_fields), local_vars) + self.fn = local_vars["fn"] def __reduce__(self): return FromDocToDictFields, (self.mapping,) @@ -75,14 +72,13 @@ def __get_validators__(cls): @classmethod def validate(cls, value, config=None): if isinstance(value, dict): - return FromDocToDictFields(value) - elif callable(value): + value = FromDocToDictFields(value) + if callable(value): return value - else: - raise TypeError( - f"Invalid entry {value} ({type(value)}) for ToDoc, " - f"expected dict or callable" - ) + raise TypeError( + f"Invalid entry {value} ({type(value)}) for ToDoc, " + f"expected dict or callable" + ) class Accelerator: @@ -92,7 +88,6 @@ def __call__( model: Any, to_doc: ToDoc = FromDictFieldsToDoc("content"), from_doc: FromDoc = lambda doc: doc, - component_cfg: Dict[str, Dict[str, Any]] = None, ): raise NotImplementedError() diff --git a/edspdf/accelerators/multiprocessing.py b/edspdf/accelerators/multiprocessing.py new file mode 100644 index 00000000..6802a2ee --- /dev/null +++ b/edspdf/accelerators/multiprocessing.py @@ -0,0 +1,545 @@ +import gc +import signal +from multiprocessing.connection import wait +from random import shuffle +from typing import Any, Iterable, List, Optional, Union + +import torch +import torch.multiprocessing as mp + +from .. 
import TrainablePipe +from ..registry import registry +from ..utils.collections import batchify +from .base import Accelerator, FromDictFieldsToDoc, FromDoc, ToDoc + +DEBUG = True + +debug = ( + (lambda *args, flush=False, **kwargs: print(*args, **kwargs, flush=True)) + if DEBUG + else lambda *args, **kwargs: None +) + + +class Exchanger: + def __init__( + self, + num_stages, + num_gpu_workers, + num_cpu_workers, + gpu_worker_devices, + ): + # queue for cpu input tasks + self.gpu_worker_devices = gpu_worker_devices + # We add prioritized queue at the end for STOP signals + self.cpu_inputs_queues = [ + [mp.SimpleQueue()] + [mp.SimpleQueue() for _ in range(num_stages + 1)] + # The input queue is not shared between processes, since calling `wait` + # on a queue reader from multiple processes may lead to a deadlock + for _ in range(num_cpu_workers) + ] + self.gpu_inputs_queues = [ + [mp.SimpleQueue() for _ in range(num_stages + 1)] + for _ in range(num_gpu_workers) + ] + self.outputs_queue = mp.Queue() + + def get_cpu_tasks(self, idx): + while True: + queue_readers = wait( + [queue._reader for queue in self.cpu_inputs_queues[idx]] + ) + stage, queue = next( + (stage, q) + for stage, q in reversed(list(enumerate(self.cpu_inputs_queues[idx]))) + if q._reader in queue_readers + ) + try: + item = queue.get() + except BaseException: + continue + if item is None: + return + yield stage, item + + def put_cpu(self, item, stage, idx): + return self.cpu_inputs_queues[idx][stage].put(item) + + def get_gpu_tasks(self, idx): + while True: + queue_readers = wait( + [queue._reader for queue in self.gpu_inputs_queues[idx]] + ) + stage, queue = next( + (stage, q) + for stage, q in reversed(list(enumerate(self.gpu_inputs_queues[idx]))) + if q._reader in queue_readers + ) + try: + item = queue.get() + except BaseException: # pragma: no cover + continue + if item is None: + return + yield stage, item + + def put_gpu(self, item, stage, idx): + return self.gpu_inputs_queues[idx][stage].put(item) + + def put_results(self, items): + self.outputs_queue.put(items) + + def iter_results(self): + for out in iter(self.outputs_queue.get, None): + yield out + + +class CPUWorker(mp.Process): + def __init__( + self, + cpu_idx: int, + exchanger: Exchanger, + gpu_pipe_names: List[str], + model: Any, + device: Union[str, torch.device], + ): + super(CPUWorker, self).__init__() + + self.cpu_idx = cpu_idx + self.exchanger = exchanger + self.gpu_pipe_names = gpu_pipe_names + self.model = model + self.device = device + + def _run(self): + # Cannot pass torch tensor during init i think ? 
otherwise i get + # ValueError: bad value(s) in fds_to_keep + mp._prctl_pr_set_pdeathsig(signal.SIGINT) + + model = self.model.to(self.device) + stages = [{"cpu_components": [], "gpu_component": None}] + for name, component in model.pipeline: + if name in self.gpu_pipe_names: + stages[-1]["gpu_component"] = component + stages.append({"cpu_components": [], "gpu_component": None}) + else: + stages[-1]["cpu_components"].append(component) + + next_batch_id = 0 + active_batches = {} + debug( + f"CPU worker {self.cpu_idx} is ready", + next(model.parameters()).device, + flush=True, + ) + + had_error = False + with torch.no_grad(): + for stage, task in self.exchanger.get_cpu_tasks(self.cpu_idx): + if had_error: + continue # pragma: no cover + try: + if stage == 0: + gpu_idx = None + batch_id = next_batch_id + debug("preprocess start for", batch_id) + next_batch_id += 1 + docs = task + else: + gpu_idx, batch_id, result = task + debug("postprocess start for", batch_id) + docs = active_batches.pop(batch_id) + gpu_pipe = stages[stage - 1]["gpu_component"] + docs = gpu_pipe.postprocess(docs, result) # type: ignore + + for component in stages[stage]["cpu_components"]: + if hasattr(component, "batch_process"): + docs = component.batch_process(docs) + else: + docs = [component(doc) for doc in docs] + + gpu_pipe = stages[stage]["gpu_component"] + if gpu_pipe is not None: + preprocessed = gpu_pipe.make_batch(docs) # type: ignore + active_batches[batch_id] = docs + if gpu_idx is None: + gpu_idx = batch_id % len(self.exchanger.gpu_worker_devices) + collated = gpu_pipe.collate( # type: ignore + preprocessed, + device=self.exchanger.gpu_worker_devices[gpu_idx], + ) + self.exchanger.put_gpu( + item=(self.cpu_idx, batch_id, collated), + idx=gpu_idx, + stage=stage, + ) + batch_id += 1 + debug("preprocess end for", batch_id) + else: + self.exchanger.put_results((docs, self.cpu_idx, gpu_idx)) + debug("postprocess end for", batch_id) + except BaseException as e: + had_error = True + import traceback + + print(traceback.format_exc(), flush=True) + self.exchanger.put_results((e, self.cpu_idx, None)) + # We need to drain the queues of GPUWorker fed inputs (pre-moved to GPU) + # to ensure no tensor allocated on producer processes (CPUWorker via + # collate) are left in consumer processes + debug("Start draining CPU worker", self.cpu_idx) + [None for _ in self.exchanger.get_cpu_tasks(self.cpu_idx)] + debug(f"CPU worker {self.cpu_idx} is about to stop") + + def run(self): + self._run() + self.model = None + gc.collect() + torch.cuda.empty_cache() + + +class GPUWorker(mp.Process): + def __init__( + self, + gpu_idx, + exchanger: Exchanger, + gpu_pipe_names: List[str], + model: Any, + device: Union[str, torch.device], + ): + super().__init__() + + self.device = device + self.gpu_idx = gpu_idx + self.exchanger = exchanger + + self.gpu_pipe_names = gpu_pipe_names + self.model = model + self.device = device + + def _run(self): + debug("GPU worker", self.gpu_idx, "started") + mp._prctl_pr_set_pdeathsig(signal.SIGINT) + had_error = False + + model = self.model.to(self.device) + stage_components = [model.get_pipe(name) for name in self.gpu_pipe_names] + del model + with torch.no_grad(): + for stage, task in self.exchanger.get_gpu_tasks(self.gpu_idx): + if had_error: + continue # pragma: no cover + try: + cpu_idx, batch_id, batch = task + debug("forward start for", batch_id) + component = stage_components[stage] + res = component.module_forward(batch) + del batch, task + # TODO set non_blocking=True here + res = { + key: 
val.to("cpu") if not isinstance(val, int) else val
+                        for key, val in res.items()
+                    }
+                    self.exchanger.put_cpu(
+                        item=(self.gpu_idx, batch_id, res),
+                        stage=stage + 1,
+                        idx=cpu_idx,
+                    )
+                    debug("forward end for", batch_id)
+                except BaseException as e:
+                    had_error = True
+                    self.exchanger.put_results((e, None, self.gpu_idx))
+                    import traceback
+
+                    print(traceback.format_exc(), flush=True)
+                task = batch = res = None  # noqa
+        # We need to drain the queues of CPUWorker fed inputs (pre-moved to GPU)
+        # to ensure no tensor allocated on producer processes (CPUWorker via
+        # collate) are left in consumer processes
+        debug("Start draining GPU worker", self.gpu_idx)
+        [None for _ in self.exchanger.get_gpu_tasks(self.gpu_idx)]
+        debug(f"GPU worker {self.gpu_idx} is about to stop")
+
+    def run(self):
+        self._run()
+        self.model = None
+        gc.collect()
+        torch.cuda.empty_cache()
+
+
+DEFAULT_MAX_CPU_WORKERS = 4
+
+
+@registry.accelerator.register("multiprocessing")
+class MultiprocessingAccelerator(Accelerator):
+    """
+    If you have multiple CPU cores, and optionally multiple GPUs, we provide a
+    `multiprocessing` accelerator that allows running the inference on multiple
+    processes.
+
+    This accelerator dispatches the batches between multiple workers
+    (data-parallelism), and distributes the computation of a given batch over one
+    or two workers (model-parallelism). This is done by creating two types of
+    workers:
+
+    - a `CPUWorker`, which handles the non-deep-learning components and the
+      preprocessing, collating and postprocessing of deep-learning components
+    - a `GPUWorker`, which handles the forward call of the deep-learning components
+
+    The advantage of dedicating a worker to the deep-learning components is that it
+    allows multiple batches to be prepared in parallel by multiple `CPUWorker`s, and
+    ensures that the `GPUWorker` never waits for a batch to be ready.
+
+    The overall architecture is described in the following figure, for 3 CPU
+    workers and 2 GPU workers.
+
+    
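Below is a hedged usage sketch for this accelerator. The registered name `"multiprocessing"` comes from the decorator above; the constructor arguments are not shown in this excerpt, so `num_cpu_workers` and `num_gpu_workers` are assumptions inferred from the worker classes and `DEFAULT_MAX_CPU_WORKERS`.

```python
# Usage sketch only: constructor kwargs are assumptions, not confirmed by this diff.
from edspdf.accelerators.multiprocessing import MultiprocessingAccelerator

pipeline = ...
accelerator = MultiprocessingAccelerator(
    num_cpu_workers=4,  # assumed kwarg: one CPUWorker per process
    num_gpu_workers=2,  # assumed kwarg: one GPUWorker per GPU
)
docs = list(pipeline.pipe([content1, content2, ...], accelerator=accelerator))
```

Since the class is registered under `registry.accelerator`, it should also be selectable through the `accelerator` argument documented in `docs/inference.md` above, but the exact string or config syntax is not shown in this excerpt.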