From 6b864a62df119e3f47b9e472b75b7deefdaf48d7 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 16 Oct 2024 20:32:03 +0100 Subject: [PATCH] str --- streamable/functions.py | 2 +- streamable/iters.py | 2 +- streamable/stream.py | 17 ++++++++++------- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/streamable/functions.py b/streamable/functions.py index 205d8a2..3b7270c 100644 --- a/streamable/functions.py +++ b/streamable/functions.py @@ -106,7 +106,7 @@ def map( iterator: Iterator[T], concurrency: int = 1, ordered: bool = True, - via: Literal["thread", "process"] = "thread", + via: "Literal['thread', 'process']" = "thread", ) -> Iterator[U]: validate_iterator(iterator) validate_concurrency(concurrency) diff --git a/streamable/iters.py b/streamable/iters.py index 39a1fde..e5b1fe9 100644 --- a/streamable/iters.py +++ b/streamable/iters.py @@ -418,7 +418,7 @@ def __init__( concurrency: int, buffer_size: int, ordered: bool, - via: Literal["thread", "process"], + via: "Literal['thread', 'process']", ) -> None: super().__init__(iterator, buffer_size, ordered) self.transformation = transformation diff --git a/streamable/stream.py b/streamable/stream.py index a3bf200..07dc05d 100644 --- a/streamable/stream.py +++ b/streamable/stream.py @@ -1,3 +1,4 @@ +from contextlib import suppress import datetime import logging from multiprocessing import get_logger @@ -11,7 +12,6 @@ Iterable, Iterator, List, - Literal, Optional, Sequence, Set, @@ -22,6 +22,9 @@ overload, ) +with suppress(ImportError): + from typing import Literal + from streamable.util.constants import NO_REPLACEMENT from streamable.util.validationtools import ( validate_concurrency, @@ -237,7 +240,7 @@ def foreach( effect: Callable[[T], Any], concurrency: int = 1, ordered: bool = True, - via: Literal["thread", "process"] = "thread", + via: "Literal['thread', 'process']" = "thread", ) -> "Stream[T]": """ For each upstream element, yields it after having called `effect` on it. @@ -247,7 +250,7 @@ def foreach( effect (Callable[[T], Any]): The function to be applied to each element as a side effect. concurrency (int): Represents both the number of threads used to concurrently apply the `effect` and the size of the buffer containing not-yet-yielded elements. If the buffer is full, the iteration over the upstream is stopped until some elements are yielded out of the buffer. (default is 1, meaning no multithreading). ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order). - via (Literal["thread", "process"]): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads). + via ("thread" or "process"): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads). Returns: Stream[T]: A stream of upstream elements, unchanged. """ @@ -305,7 +308,7 @@ def map( transformation: Callable[[T], U], concurrency: int = 1, ordered: bool = True, - via: Literal["thread", "process"] = "thread", + via: "Literal['thread', 'process']" = "thread", ) -> "Stream[U]": """ Applies `transformation` on upstream elements and yields the results. @@ -314,7 +317,7 @@ def map( transformation (Callable[[T], R]): The function to be applied to each element. concurrency (int): Represents both the number of threads used to concurrently apply `transformation` and the size of the buffer containing not-yet-yielded results. If the buffer is full, the iteration over the upstream is stopped until some results are yielded out of the buffer. (default is 1, meaning no multithreading). ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order). - via (Literal["thread", "process"]): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads). + via ("thread" or "process"): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads). Returns: Stream[R]: A stream of transformed elements. """ @@ -468,7 +471,7 @@ def __init__( effect: Callable[[T], Any], concurrency: int, ordered: bool, - via: Literal["thread", "process"], + via: "Literal['thread', 'process']", ) -> None: super().__init__(upstream) self._effect = effect @@ -521,7 +524,7 @@ def __init__( transformation: Callable[[T], U], concurrency: int, ordered: bool, - via: Literal["thread", "process"], + via: "Literal['thread', 'process']", ) -> None: super().__init__(upstream) self._transformation = transformation