Skip to content

Commit

Permalink
str
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Oct 16, 2024
1 parent 4668a2f commit 6b864a6
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
2 changes: 1 addition & 1 deletion streamable/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def map(
iterator: Iterator[T],
concurrency: int = 1,
ordered: bool = True,
via: Literal["thread", "process"] = "thread",
via: "Literal['thread', 'process']" = "thread",
) -> Iterator[U]:
validate_iterator(iterator)
validate_concurrency(concurrency)
Expand Down
2 changes: 1 addition & 1 deletion streamable/iters.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def __init__(
concurrency: int,
buffer_size: int,
ordered: bool,
via: Literal["thread", "process"],
via: "Literal['thread', 'process']",
) -> None:
super().__init__(iterator, buffer_size, ordered)
self.transformation = transformation
Expand Down
17 changes: 10 additions & 7 deletions streamable/stream.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from contextlib import suppress
import datetime
import logging
from multiprocessing import get_logger
Expand All @@ -11,7 +12,6 @@
Iterable,
Iterator,
List,
Literal,
Optional,
Sequence,
Set,
Expand All @@ -22,6 +22,9 @@
overload,
)

with suppress(ImportError):
from typing import Literal

from streamable.util.constants import NO_REPLACEMENT
from streamable.util.validationtools import (
validate_concurrency,
Expand Down Expand Up @@ -237,7 +240,7 @@ def foreach(
effect: Callable[[T], Any],
concurrency: int = 1,
ordered: bool = True,
via: Literal["thread", "process"] = "thread",
via: "Literal['thread', 'process']" = "thread",
) -> "Stream[T]":
"""
For each upstream element, yields it after having called `effect` on it.
Expand All @@ -247,7 +250,7 @@ def foreach(
effect (Callable[[T], Any]): The function to be applied to each element as a side effect.
concurrency (int): Represents both the number of threads used to concurrently apply the `effect` and the size of the buffer containing not-yet-yielded elements. If the buffer is full, the iteration over the upstream is stopped until some elements are yielded out of the buffer. (default is 1, meaning no multithreading).
ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order).
via (Literal["thread", "process"]): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads).
via ("thread" or "process"): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads).
Returns:
Stream[T]: A stream of upstream elements, unchanged.
"""
Expand Down Expand Up @@ -305,7 +308,7 @@ def map(
transformation: Callable[[T], U],
concurrency: int = 1,
ordered: bool = True,
via: Literal["thread", "process"] = "thread",
via: "Literal['thread', 'process']" = "thread",
) -> "Stream[U]":
"""
Applies `transformation` on upstream elements and yields the results.
Expand All @@ -314,7 +317,7 @@ def map(
transformation (Callable[[T], R]): The function to be applied to each element.
concurrency (int): Represents both the number of threads used to concurrently apply `transformation` and the size of the buffer containing not-yet-yielded results. If the buffer is full, the iteration over the upstream is stopped until some results are yielded out of the buffer. (default is 1, meaning no multithreading).
ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order).
via (Literal["thread", "process"]): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads).
via ("thread" or "process"): If `concurrency` > 1, whether to apply `transformation` using processes or threads (default via threads).
Returns:
Stream[R]: A stream of transformed elements.
"""
Expand Down Expand Up @@ -468,7 +471,7 @@ def __init__(
effect: Callable[[T], Any],
concurrency: int,
ordered: bool,
via: Literal["thread", "process"],
via: "Literal['thread', 'process']",
) -> None:
super().__init__(upstream)
self._effect = effect
Expand Down Expand Up @@ -521,7 +524,7 @@ def __init__(
transformation: Callable[[T], U],
concurrency: int,
ordered: bool,
via: Literal["thread", "process"],
via: "Literal['thread', 'process']",
) -> None:
super().__init__(upstream)
self._transformation = transformation
Expand Down

0 comments on commit 6b864a6

Please sign in to comment.