From 723e844848323794c24efdaab91191a14391a0fb Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sun, 12 May 2024 16:26:26 -0700 Subject: [PATCH] add dispatch.worker and dispatch.batch Signed-off-by: Achille Roussel --- src/dispatch/__init__.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/dispatch/__init__.py b/src/dispatch/__init__.py index f51106c..a098953 100644 --- a/src/dispatch/__init__.py +++ b/src/dispatch/__init__.py @@ -3,9 +3,10 @@ from __future__ import annotations import os +import threading from concurrent import futures from http.server import ThreadingHTTPServer -from typing import Any, Callable, Coroutine, Optional, TypeVar, overload +from typing import Any, Callable, Coroutine, List, Optional, TypeVar, overload from urllib.parse import urlsplit from typing_extensions import ParamSpec, TypeAlias @@ -31,6 +32,7 @@ "Status", "all", "any", + "batch", "call", "function", "gather", @@ -44,7 +46,8 @@ T = TypeVar("T") _registry: Optional[Registry] = None - +_workers: List[Callable[None, None]] = [] +_threads: List[threading.Thread] = [] def default_registry(): global _registry @@ -89,6 +92,18 @@ def run(init: Optional[Callable[P, None]] = None, *args: P.args, **kwargs: P.kwa parsed_url = urlsplit("//" + address) server_address = (parsed_url.hostname or "", parsed_url.port or 0) server = ThreadingHTTPServer(server_address, Dispatch(default_registry())) + + for worker in _workers: + def entrypoint(): + try: + worker() + finally: + server.shutdown() + _threads.append(threading.Thread(target=entrypoint)) + + for thread in _threads: + thread.start() + try: if init is not None: init(*args, **kwargs) @@ -96,3 +111,16 @@ def run(init: Optional[Callable[P, None]] = None, *args: P.args, **kwargs: P.kwa finally: server.shutdown() server.server_close() + + for thread in _threads: + thread.join() + +def batch() -> Batch: + """Create a new batch object.""" + return default_registry().batch() + + +def worker(fn: Callable[None, None]) -> Callable[None, None]: + """Decorator declaring workers that will be started when dipatch.run is called.""" + _workers.append(fn) + return fn