diff --git a/docs/source/tutorials/quickstart.rst b/docs/source/tutorials/quickstart.rst index 367515b..8a47947 100644 --- a/docs/source/tutorials/quickstart.rst +++ b/docs/source/tutorials/quickstart.rst @@ -59,8 +59,8 @@ with the :py:func:`~parfun.entry_point.set_parallel_backend` or within a Python set_parallel_backend("local_multiprocessing") # Set the parallel backend with a Python context. - with set_parallel_backend_context("dask_remote"): - ... # Will run with the "dask_remote" backend. + with set_parallel_backend_context("scaler_remote", scheduler_address="tcp://scaler.cluster:1243"): + ... # Will run the parallel tasks over Scaler. See :py:func:`~parfun.entry_point.set_parallel_backend` for a description of the available backend options. diff --git a/parfun/about.py b/parfun/about.py index 65422e9..b378fc6 100644 --- a/parfun/about.py +++ b/parfun/about.py @@ -1 +1 @@ -__version__ = "6.0.8" +__version__ = "7.0.0" diff --git a/parfun/backend/dask.py b/parfun/backend/dask.py index 265789e..0200346 100644 --- a/parfun/backend/dask.py +++ b/parfun/backend/dask.py @@ -145,11 +145,5 @@ def executor(self) -> Generator[ClientExecutor, None, None]: with worker_client() as client: yield client.get_executor() - def get_scheduler_address(self) -> None: - return None - - def disconnect(self): - pass - def shutdown(self): pass diff --git a/parfun/backend/local_multiprocessing.py b/parfun/backend/local_multiprocessing.py index 54b9bdf..3440996 100644 --- a/parfun/backend/local_multiprocessing.py +++ b/parfun/backend/local_multiprocessing.py @@ -26,14 +26,6 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: return None def submit(self, fn, *args, **kwargs) -> ProfiledFuture: - """ - Submits a task to one of the available worker. Returns a future that will complete when the computation - finishes. - - :param fn: function to run - :return: The future. - """ - with profile() as submit_duration: future = ProfiledFuture() @@ -76,8 +68,7 @@ def on_done_callback(underlying_future: Future): @attrs.define(init=False) class LocalMultiprocessingBackend(BackendEngine): """ - A concurrent engine that shares a similar interface to :py:class:`concurrent.futures.Executor`, but that blocks when - submitting tasks when no worker is available. + Concurrent engine that uses Python's built-in :mod:`multiprocessing` module.
""" _underlying_executor: Executor = attrs.field(validator=instance_of(Executor), init=False) @@ -94,12 +85,6 @@ def __init__(self, max_workers: int = psutil.cpu_count(logical=False) - 1, is_pr def session(self) -> LocalMultiprocessingSession: return LocalMultiprocessingSession(self._underlying_executor) - def get_scheduler_address(self) -> None: - return None - - def disconnect(self): - pass - def shutdown(self, wait=True): self._underlying_executor.shutdown(wait=wait) diff --git a/parfun/backend/local_single_process.py b/parfun/backend/local_single_process.py index 77702bf..f7481df 100644 --- a/parfun/backend/local_single_process.py +++ b/parfun/backend/local_single_process.py @@ -40,12 +40,6 @@ class LocalSingleProcessBackend(BackendEngine): def session(self) -> BackendSession: return LocalSingleProcessSession() - def get_scheduler_address(self) -> None: - return None - - def disconnect(self): - pass - def shutdown(self): pass diff --git a/parfun/backend/mixins.py b/parfun/backend/mixins.py index 1152a82..ae969d8 100644 --- a/parfun/backend/mixins.py +++ b/parfun/backend/mixins.py @@ -1,15 +1,13 @@ import abc -import logging -from concurrent.futures import wait from contextlib import AbstractContextManager -from typing import Any, Callable, Optional +from typing import Any, Callable from parfun.backend.profiled_future import ProfiledFuture class BackendSession(AbstractContextManager, metaclass=abc.ABCMeta): """ - An task submitting session to a backend engine that manages the lifecycle of the task objects (preloaded values, + A task submitting session to a backend engine that manages the lifecycle of the task objects (preloaded values, argument values and future objects). """ @@ -55,30 +53,10 @@ def session(self) -> BackendSession: """ raise NotImplementedError() - def submit(self, fn: Callable, *args, **kwargs) -> ProfiledFuture: - logging.warning("`submit()` will be removed in a future version, use `session()` instead.") - - with self.session() as session: - future = session.submit(fn, *args, **kwargs) - wait([future]) - - return future - - @abc.abstractmethod - def get_scheduler_address(self) -> Optional[str]: - raise NotImplementedError() - - @abc.abstractmethod - def disconnect(self): - """ - Disconnects from schedulers in backend engine - """ - raise NotImplementedError() - @abc.abstractmethod def shutdown(self): """ - Shutdowns schedulers in backend engine + Shutdowns all resources required by the backend engine. 
""" raise NotImplementedError() diff --git a/parfun/backend/scaler.py b/parfun/backend/scaler.py index 97e3b9b..b7a1b90 100644 --- a/parfun/backend/scaler.py +++ b/parfun/backend/scaler.py @@ -178,6 +178,7 @@ def cluster(self) -> SchedulerClusterCombo: def shutdown(self): if self._cluster is not None: self._cluster.shutdown() + self._cluster = None @staticmethod def __get_constructor_arg_names(class_: type) -> Set: diff --git a/parfun/combine/collection.py b/parfun/combine/collection.py index 8d14f0d..b18c511 100644 --- a/parfun/combine/collection.py +++ b/parfun/combine/collection.py @@ -2,9 +2,8 @@ A collection of pre-define APIs to help users combine collection data, like list, array, tuple """ -import logging -from itertools import chain, tee -from typing import Iterable, List, Tuple, TypeVar +from itertools import chain +from typing import Iterable, List, TypeVar ListValue = TypeVar("ListValue") @@ -19,63 +18,3 @@ def list_concat(values: Iterable[List[ListValue]]) -> List[ListValue]: """ return list(chain.from_iterable(values)) - - -def lists_concat(values: Iterable[List[ListValue]]) -> List[ListValue]: - logging.warning( - f"`{lists_concat.__name__}` will be removed in a future version, use `{list_concat.__name__}` instead." - ) - - return list_concat(values) - - -def concat_lists(values: Iterable[List[ListValue]]) -> List[ListValue]: - logging.warning( - f"`{concat_lists.__name__}` will be removed in a future version, use `{list_concat.__name__}` instead." - ) - - return list_concat(values) - - -def unzip(iterable: Iterable[Tuple]) -> Tuple[Iterable, ...]: - """ - Opposite of zip(). - - .. code:: python - - ls_1 = [1, 2, 3] - ls_2 = [2, 4, 6] - - zipped = zip(ls_1, ls_2) # [(1, 2), (2, 4), (3, 6)] - - ls_1_out, ls_2_out = unzip(zipped) - print(ls_1_out) # [1, 2, 3] - print(ls_2_out) # [2, 4, 6] - - """ - - logging.warning(f"`{unzip.__name__}` will be removed in a future version.") - - # Fetches the first item to deduce the number of nested values. - it = iter(iterable) - try: - first_values = next(it) - n_values = len(first_values) - except StopIteration: - return () - - def tupled_generator(): - yield first_values - while True: - try: - yield next(it) - except StopIteration: - return - - teed_iterators = tee(tupled_generator(), n_values) - - # Captures the i variable as an function argument, as variables are captured by reference in Python's closures. - def map_function(i_local): - return lambda v: v[i_local] - - return tuple(map(map_function(i), gen) for i, gen in enumerate(teed_iterators)) diff --git a/parfun/combine/dataframe.py b/parfun/combine/dataframe.py index dc37641..436ca04 100644 --- a/parfun/combine/dataframe.py +++ b/parfun/combine/dataframe.py @@ -2,7 +2,6 @@ A collection of pre-define APIs to help users combine Pandas' Dataframe data """ -import logging from typing import Iterable try: @@ -32,17 +31,3 @@ def df_concat(dfs: Iterable[pd.DataFrame]) -> pd.DataFrame: """ return pd.concat(dfs, ignore_index=True) - - -def dfs_concat(dfs: Iterable[pd.DataFrame]) -> pd.DataFrame: - logging.warning(f"`{dfs_concat.__name__}` will be removed in a future version, use `{df_concat.__name__}` instead.") - - return df_concat(dfs) - - -def concat_list_of_dfs(df_list: Iterable[pd.DataFrame]) -> pd.DataFrame: - logging.warning( - f"`{concat_list_of_dfs.__name__}` will be removed in a future version, use `{df_concat.__name__}` instead." 
- ) - - return df_concat(df_list) diff --git a/parfun/decorators.py b/parfun/decorators.py index 440e41a..24bf6e5 100644 --- a/parfun/decorators.py +++ b/parfun/decorators.py @@ -8,17 +8,15 @@ from parfun.kernel.function_signature import NamedArguments from parfun.kernel.parallel_function import ParallelFunction -from parfun.object import FunctionInputType, FunctionOutputType, PartitionType -from parfun.partition.object import PartitionFunction, PartitionGenerator +from parfun.object import FunctionInputType, FunctionOutputType +from parfun.partition.object import PartitionGenerator from parfun.partition_size_estimator.linear_regression_estimator import LinearRegessionEstimator from parfun.partition_size_estimator.mixins import PartitionSizeEstimator def parfun( + split: Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]], combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType], - split: Optional[Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]] = None, - partition_on: Optional[Union[str, Tuple[str, ...]]] = None, - partition_with: Optional[PartitionFunction[PartitionType]] = None, initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, profile: bool = False, @@ -52,24 +50,6 @@ def multiply_by_constant(values: Iterable[int], constant: int): See :py:mod:`~parfun.partition.api` for the list of predefined partitioning functions. - Cannot be used with ``partition_on`` or ``partition_with``. - - :param partition_on: - Only partition the data on the provided fields or fields. To be used with ``partition_with``. - - There are three constraints: - - * the values should be consistent with some names of arguments in ``function`` - - * the numbers of ``partition_on`` should be equal to the number of arguments in ``partition_with`` - - * the order of ``partition_on`` should be consistent with the ones in ``partition_on`` - - :type partition_on: Tuple | str - - :param partition_with: distributes the computation by running the function on the input. - :type partition_with: Callable - :param combine_with: aggregates the results by running the function. :type combine_with: Callable :param initial_partition_size: @@ -102,8 +82,6 @@ def decorator(function: Callable[[FunctionInputType], FunctionOutputType]): function=function, function_name=function.__name__, split=split, - partition_on=partition_on, - partition_with=partition_with, combine_with=combine_with, initial_partition_size=initial_partition_size, fixed_partition_size=fixed_partition_size, diff --git a/parfun/entry_point.py b/parfun/entry_point.py index 01ced74..f6a3362 100644 --- a/parfun/entry_point.py +++ b/parfun/entry_point.py @@ -51,102 +51,46 @@ def set_parallel_backend(backend: Union[str, BackendEngine], *args, **kwargs) -> :param backend: Supported backend options: - * ``"none"``: unsets the current parallel backend. - The parallel functions will be entirely disabled. + * ``"none"``: disables the current parallel backend. - When used with :py:func:`~parfun.decorators.parfun` will run the function sequentially and ignore partition - and combine functions. + Functions decorated with :py:func:`~parfun.decorators.parfun` will run sequentially as if not decorated. - * ``"local_single_process"``: runs the parallel tasks inside the calling Python process. For debug purposes. + Partitioning and combining functions will be ignored.
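Since ``partition_on``/``partition_with`` no longer exist on the decorator, existing call sites have to move to the ``split`` argument. A hedged migration sketch based on the equivalent updates to ``tests/test_decorators.py`` further down in this diff (function names and bodies are illustrative):

.. code:: python

    from typing import Iterable, List

    from parfun.combine.collection import list_concat
    from parfun.decorators import parfun
    from parfun.partition.api import multiple_arguments, per_argument
    from parfun.partition.collection import list_by_chunk


    # 6.x: @parfun(partition_on="values", partition_with=list_by_chunk, combine_with=list_concat)
    @parfun(split=per_argument(values=list_by_chunk), combine_with=list_concat)
    def double_all(values: List[int]) -> List[int]:
        return [v * 2 for v in values]


    # 6.x: @parfun(partition_on=("xs", "ys"), partition_with=list_by_chunk, combine_with=list_concat)
    @parfun(split=multiple_arguments(("xs", "ys"), list_by_chunk), combine_with=list_concat)
    def pairwise_sums(xs: Iterable[int], ys: Iterable[int]) -> List[int]:
        return [x + y for x, y in zip(xs, ys)]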
- When used with :py:func:`~parfun.decorators.parfun`: + * ``"local_single_process"``: runs the tasks inside the calling Python process. - 1. First, the input data will be cut partitioned into multiple chunks using the provided function + Functions decorated with :py:func:`~parfun.decorators.parfun` will still partition the input data and run the + combining function on the output data, but will execute the function itself inside the calling Python process. - 2. Then, the single process would interpret these partitions using the function one by one to generate a - list of results accordingly - - 3. Finally, the list of results will be merged into one using the combine function + Mostly intended for debugging purposes. See :py:mod:`~parfun.backend.local_single_process.LocalSingleProcessBackend`. - * ``"local_multiprocessing"``: runs the parallel tasks in parallel using Python's ``multiprocessing``. - - When used with :py:func:`~parfun.decorators.parfun`: - - 1. First, the input data will be cut partitioned into multiple chunks using the provided function - - 2. Then, the multiprocessing worker will interpret these partitions using the function in parallel to - generate a list of results accordingly - - 3. Finally, the list of results will be merged into one using the combine function + * ``"local_multiprocessing"``: runs the tasks in parallel using Python ``multiprocessing`` processes. See :py:mod:`~parfun.backend.local_multiprocessing.LocalMultiprocessingBackend`. - * ``"dask_local"``: runs the parallel tasks in parallel using a local Dask cluster. + * ``"scaler_local"``: runs the tasks in parallel using an internally managed Scaler cluster. - When used with :py:func:`~parfun.decorators.parfun`: + See :py:mod:`~parfun.backend.scaler.ScalerLocalBackend`. - 1. First, the input data will be cut partitioned into multiple chunks using the provided function + * ``"scaler_remote"``: runs the tasks in parallel using an externally managed Scaler cluster. - 2. Then, the Dask local workers would interpret these partitions using the function one by one to generate - a list of results accordingly + See :py:mod:`~parfun.backend.scaler.ScalerRemoteBackend`. - 3. Finally, the list of results are merged into one using the combine function + * ``"dask_local"``: runs the tasks in parallel using an internally managed Dask cluster. See :py:mod:`~parfun.backend.dask_local.DaskLocalClusterBackend`. - * ``"dask_remote"``: runs the parallel tasks in parallel using a remote Dask cluster. - - When used with :py:func:`~parfun.decorators.parfun`: - - 1. First, the input data will be cut partitioned into multiple chunks using the provided function - - 2. Then, the Dask remote workers would interpret these partitions using the function one by one to generate - a list of results accordingly - - 3. Finally, the list of results are merged into one using the combine function + * ``"dask_remote"``: runs the tasks in parallel using an externally managed Dask cluster. See :py:mod:`~parfun.backend.dask_remote.DaskRemoteClusterBackend`. - * ``"dask_current"``: runs the parallel tasks in parallel using currently process defined Dask cluster. - - When used with :py:func:`~parfun.decorators.parfun`: - - 1. First, the input data will be cut partitioned into multiple chunks using the provided function - - 2. Then, the Dask workers would interpret these partitions using the function one by one to generate - a list of results accordingly - - 3.
Finally, the list of results are merged into one using the combine function + * ``"dask_current"``: runs the tasks in parallel using the currently running Dask client + (:py:func:`~distributed.get_client`). See :py:mod:`~parfun.backend.dask_current.DaskCurrentBackend`. - * ``"scaler_local"``: runs the parallel tasks in parallel using a local Scaler cluster. - - When used with :py:func:`~parfun.decorators.parfun`: - - 1. First, the input data will be cut partitioned into multiple chunks using the provided function - - 2. Then, the Scaler local workers would interpret these partitions using the function one by one to generate - a list of results accordingly - - 3. Finally, the list of results are merged into one using the combine function - - See :py:mod:`~parfun.backend.scaler.ScalerLocalBackend`. - - * ``"scaler_remote"``: runs the parallel tasks in parallel using a remote Scaler cluster. - - When used with :py:func:`~parfun.decorators.parfun`: - - 1. First, the input data will be cut partitioned into multiple chunks using the provided function - - 2. Then, the Scaler remote workers would interpret these partitions using the function one by one to generate - a list of results accordingly - - 3. Finally, the list of results are merged into one using the combine function - - See :py:mod:`~parfun.backend.scaler.ScalerRemoteBackend`. :type backend: Union[str, BackendEngine] :param args: Additional positional parameters for the backend constructor @@ -176,7 +120,6 @@ def set_parallel_backend_context(backend: Union[str, BackendEngine], *args, **kw engine = _backend_engine.get() if engine is not None: - engine.disconnect() engine.shutdown() _backend_engine.reset(token) diff --git a/parfun/kernel/parallel_function.py b/parfun/kernel/parallel_function.py index 79dd1e0..1e46da8 100644 --- a/parfun/kernel/parallel_function.py +++ b/parfun/kernel/parallel_function.py @@ -10,8 +10,7 @@ from parfun.functions import parallel_timed_map from parfun.kernel.function_signature import FunctionSignature, NamedArguments from parfun.object import FunctionInputType, FunctionOutputType, PartitionType -from parfun.partition.api import multiple_arguments -from parfun.partition.object import PartitionFunction, PartitionGenerator +from parfun.partition.object import PartitionGenerator from parfun.partition_size_estimator.linear_regression_estimator import LinearRegessionEstimator from parfun.partition_size_estimator.mixins import PartitionSizeEstimator from parfun.profiler.functions import export_task_trace, print_profile_trace, timed_combine_with, timed_partition @@ -27,11 +26,11 @@ class ParallelFunction: function: Callable[[FunctionInputType], FunctionOutputType] = attrs.field() - combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType] = attrs.field() + function_name: str = attrs.field() split: Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]] = attrs.field() - function_name: str = attrs.field() + combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType] = attrs.field() initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = attrs.field() fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = attrs.field() @@ -48,39 +47,19 @@ def __init__( self, function: Callable[[FunctionInputType], FunctionOutputType], function_name: str, + split: Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]], combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType], 
- split: Optional[Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]] = None, - partition_on: Optional[Union[str, Tuple[str, ...]]] = None, - partition_with: Optional[PartitionFunction[PartitionType]] = None, initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, profile: bool = False, trace_export: Optional[str] = None, partition_size_estimator_factory: Callable[[], PartitionSizeEstimator] = LinearRegessionEstimator, ): - if (partition_on is None) != (partition_with is None): - raise ValueError("`partition_on` and `partition_with` should be both simultaneously set or None.") - - if partition_on is not None: - assert partition_with is not None - - if split is not None: - raise ValueError("`split` cannot be set with `partition_on` or `partition_with`.") - - if isinstance(partition_on, str): - partition_on = (partition_on,) - - # Implements the legacy `partition_on` and `partition_with` API using the newer `split` API. - - split = ParallelFunction._legacy_partition_with(partition_on, partition_with) - initial_partition_size = ParallelFunction._legacy_partition_size(partition_on, initial_partition_size) - fixed_partition_size = ParallelFunction._legacy_partition_size(partition_on, fixed_partition_size) - self.__attrs_init__( # type: ignore[attr-defined] function=function, function_name=function_name, - combine_with=combine_with, split=split, + combine_with=combine_with, initial_partition_size=initial_partition_size, fixed_partition_size=fixed_partition_size, profile=profile, @@ -194,36 +173,6 @@ def _get_user_partition_sizes(self, args, kwargs) -> Tuple[Optional[int], Option return initial_partition_size, fixed_partition_size - @staticmethod - def _legacy_partition_with( - partition_on: Union[str, Tuple[str, ...]], partition_with: PartitionFunction[PartitionType] - ) -> Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]: - """Implements the legacy `partition_on` and `partition_with` API using the newer `split` interface.""" - - return multiple_arguments(partition_on, partition_with) - - @staticmethod - def _legacy_partition_size( - partition_on: Tuple[str, ...], partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] - ) -> Optional[Union[int, Callable[[FunctionInputType], int]]]: - """ - Implements the legacy behaviour of `initial_partition_size` and `fixed_partition_size` when used with - `partition_on` and `partition_with` API. - """ - - if not callable(partition_size): - return partition_size - - # When the partition size argument is a callable, the old API only passes the `partition_on` values as - # positional arguments. 
- - def legacy_partition_size(*args, **kwargs): - assert len(args) == 0 - partition_args = [kwargs[arg_name] for arg_name in partition_on] - return partition_size(*partition_args) - - return legacy_partition_size - def is_nested_parallelism(): """Returns True if there is any call to `_apply_function()` in the current call stack.""" diff --git a/parfun/partition/collection.py b/parfun/partition/collection.py index cf2c641..b405d4c 100644 --- a/parfun/partition/collection.py +++ b/parfun/partition/collection.py @@ -2,7 +2,6 @@ A collection of pre-define APIs to help users partition collection data, like list, array, tuple """ -import logging from typing import Iterable, Tuple from parfun.partition.object import PartitionGenerator, PartitionType @@ -39,23 +38,3 @@ def list_by_chunk(*iterables: Iterable[PartitionType]) -> PartitionGenerator[Tup if partition: yield len(partition), tuple(zip(*partition)) - - -def lists_by_chunk(*iterables: Iterable[PartitionType]) -> PartitionGenerator[Tuple[Iterable[PartitionType], ...]]: - logging.warning( - f"`{lists_by_chunk.__name__}` will be removed in a future version, use " - + f"`{list_by_chunk.__name__}` instead." - ) - - return list_by_chunk(*iterables) - - -def zip_partition_on_args( - *iterable: Iterable[PartitionType], -) -> PartitionGenerator[Tuple[Iterable[PartitionType], ...]]: - logging.warning( - f"`{zip_partition_on_args.__name__}` will be removed in a future version, use " - + f"`{list_by_chunk.__name__}` instead." - ) - - return list_by_chunk(*iterable) diff --git a/parfun/partition/dataframe.py b/parfun/partition/dataframe.py index 39ce4c7..5ccd2b0 100644 --- a/parfun/partition/dataframe.py +++ b/parfun/partition/dataframe.py @@ -2,8 +2,7 @@ A collection of pre-define APIs to help users partition dataframe data """ -import logging -from typing import Iterable, List, Literal, Tuple +from typing import Iterable, List, Tuple try: import pandas as pd @@ -141,44 +140,6 @@ def concat_chunked_group_dfs(chunked_group: Tuple[List[pd.DataFrame], ...]): return generator -def dfs_by_row(*dfs: pd.DataFrame) -> PartitionGenerator[Tuple[pd.DataFrame, ...]]: - logging.warning(f"`{dfs_by_row.__name__}` will be removed in a future version, use `{df_by_row.__name__}` instead.") - - return df_by_row(*dfs) - - -def partition_dfs_by_chunk(*dfs: pd.DataFrame) -> PartitionGenerator[Tuple[pd.DataFrame, ...]]: - logging.warning( - f"`{partition_dfs_by_chunk.__name__}` will be removed in a future version, use `{df_by_row.__name__}` instead." - ) - - return df_by_row(*dfs) - - -def dfs_by_group(*args, **kwargs) -> PartitionFunction: - logging.warning( - f"`{dfs_by_group.__name__}` will be removed in a future version, use `{df_by_group.__name__}` instead." - ) - - return df_by_group(*args, **kwargs) - - -def partition_dfs_group_by( - *dfs: pd.DataFrame, - by, - axis: Literal["index", "columns"] = "index", - as_index: bool = True, - sort: bool = True, - group_keys: bool = False, -) -> PartitionGenerator[Tuple[pd.DataFrame, ...]]: - logging.warning( - f"`{partition_dfs_group_by.__name__}` will be removed in a future version, use " - + f"`{df_by_group.__name__}` instead." 
- ) - - return df_by_group(by=by, axis=axis, as_index=as_index, sort=sort, group_keys=group_keys)(*dfs) - - def __validate_dfs_parameter(*dfs: pd.DataFrame) -> None: if len(dfs) < 1: raise ValueError("missing `dfs` parameter.") diff --git a/parfun/partition/nested.py b/parfun/partition/nested.py deleted file mode 100644 index a2d951b..0000000 --- a/parfun/partition/nested.py +++ /dev/null @@ -1,82 +0,0 @@ -from typing import Any, Callable, Dict, Sequence, Tuple, Union - -from parfun.partition.object import PartitionFunction, PartitionGenerator -from parfun.partition.primitives import partition_flatmap, partition_map - - -def partition_nested( - *columns_partitions: Tuple[Union[Tuple[str, ...], str], PartitionFunction[Tuple]] -) -> Callable[..., PartitionGenerator[Dict[str, Any]]]: - """ - Creates a new partitioning function from a collection of nested partitioning functions that are individually applied - to some of the input arguments. - - .. code: python - - # Applies `df_by_row` on `col_1` and `col_2`, then `df_by_group` on `col_3`, and finally - # `df_by_row` on the previously partitioned `col_3`. - partition_columns( - (("col_1", "col_2"), df_by_row), - ("col_3", df_by_group(by="year")), - ("col_3", df_by_row), - ) - - """ - - # Validates the input. - - if len(columns_partitions) < 1: - raise ValueError("empty partition generator collection.") - - for columns, _ in columns_partitions: - if not isinstance(columns, (str, tuple)): - raise ValueError("column values should be either strings or tuples of strings.") - - if isinstance(columns, str): - columns = (columns,) - - # Builds the generator from the nested-most partition using partition_flatmap calls. - - return _partition_nested_build_generator(columns_partitions) - - -def _partition_nested_build_generator( - columns_partitions: Sequence[Tuple[Union[Tuple[str, ...], str], Callable[..., PartitionGenerator[Tuple]]]] -) -> Callable[..., PartitionGenerator[Dict[str, Any]]]: - assert len(columns_partitions) >= 1 - - current_arg_names, partition_function = columns_partitions[0] - remaining_columns_partitions = columns_partitions[1:] - - if isinstance(current_arg_names, str): - current_arg_names = (current_arg_names,) - - def generator(**kwargs) -> PartitionGenerator[Dict[str, Any]]: - missing_args = [p for p in current_arg_names if p not in kwargs] - if len(missing_args) > 0: - missing_arg_str = ", ".join(missing_args) - raise ValueError(f"missing partition argument(s): {missing_arg_str}") - - current_args = (kwargs[p] for p in current_arg_names) - current_generator = partition_function(*current_args) - - if len(columns_partitions) > 1: - return partition_flatmap( - lambda *partitioned_values: _partition_nested_build_generator(remaining_columns_partitions)( - **_updated_partitioned_kwargs(kwargs, current_arg_names, partitioned_values) - ), - current_generator, - ) # type: ignore[type-var, return-value] - else: - return partition_map( - lambda *partitioned_values: _updated_partitioned_kwargs(kwargs, current_arg_names, partitioned_values), - current_generator, - ) # type: ignore[type-var, return-value] - - return generator - - -def _updated_partitioned_kwargs( - kwargs: Dict[str, Any], arg_names: Tuple[str, ...], partitioned_values: Any -) -> Dict[str, Any]: - return {**kwargs, **dict(zip(arg_names, partitioned_values))} diff --git a/parfun/partition/object.py b/parfun/partition/object.py index 2c0885f..0279049 100644 --- a/parfun/partition/object.py +++ b/parfun/partition/object.py @@ -14,7 +14,6 @@ * Use regular Python generators 
(prefered) or iterators, returning partitioned values: - .. code:: python def partition_list_by_chunks(values: List): PartitionGenerator[List]: @@ -28,7 +27,6 @@ def partition_list_by_chunks(values: List): PartitionGenerator[List]: size aware generators must get a suggested partition size through the return value of the ``yield`` statement, and yield partition sizes with its partitioned values: - .. code:: python def partition_list_by_chunks(values: List, constant: int) -> PartitionGenerator[Tuple[List, int]]: diff --git a/parfun/profiler/functions.py b/parfun/profiler/functions.py index 9d1181c..8698012 100644 --- a/parfun/profiler/functions.py +++ b/parfun/profiler/functions.py @@ -18,7 +18,6 @@ def profile(timer_function: Callable[[], TraceTime] = time.process_time_ns): """ Provides a Python ``with`` context that measures the execution time of the enclosing block. - .. code:: python with profile() as duration: diff --git a/tests/combine/test_collection.py b/tests/combine/test_collection.py index 80e8ed1..dedc461 100644 --- a/tests/combine/test_collection.py +++ b/tests/combine/test_collection.py @@ -1,6 +1,6 @@ import unittest -from parfun.combine.collection import list_concat, unzip +from parfun.combine.collection import list_concat class TestCombineCollection(unittest.TestCase): @@ -9,14 +9,6 @@ def test_list_concat(self): output_data = list_concat(input_data) self.assertListEqual(output_data, list(range(0, 10))) - def test_unzip(self): - input_data_1 = [1, 2, 3, 4] - input_data_2 = ["a", "b", "c"] - - output_data_1, output_data_2 = unzip(zip(input_data_1, input_data_2)) - self.assertSequenceEqual(input_data_1[:3], list(output_data_1)) - self.assertSequenceEqual(input_data_2, list(output_data_2)) - if __name__ == "__main__": unittest.main() diff --git a/tests/kernel/test_parallel_function.py b/tests/kernel/test_parallel_function.py index e43dd01..c1811cd 100644 --- a/tests/kernel/test_parallel_function.py +++ b/tests/kernel/test_parallel_function.py @@ -1,6 +1,7 @@ import unittest from parfun.entry_point import set_parallel_backend +from parfun.partition.api import all_arguments, per_argument from parfun.kernel.parallel_function import ParallelFunction @@ -14,37 +15,17 @@ def test_validate_signature(self): ParallelFunction( function=lambda x, y: x + y, # type: ignore[misc, arg-type] function_name="lambda", - partition_on=("x",), - partition_with=lambda x: [(x,)], # type: ignore[arg-type, return-value] + split=per_argument(x=lambda x: [(x,)]), # type: ignore[arg-type, return-value] combine_with=sum, ) ParallelFunction( function=lambda *args, **kwargs: tuple(), # type: ignore[misc, arg-type] function_name="lambda", - partition_on=("x", "y"), - partition_with=lambda x, y: [(x, y)], # type: ignore[arg-type, return-value] + split=all_arguments(lambda x, y: [(x, y)]), # type: ignore[arg-type, return-value] combine_with=sum, ) - with self.assertRaises(ValueError): - ParallelFunction( - function=lambda x, y: x + y, # type: ignore[misc, arg-type] - function_name="lambda", - partition_on=(), - partition_with=lambda: [()], # type: ignore[arg-type, return-value] - combine_with=sum, - ) - - with self.assertRaises(ValueError): - ParallelFunction( - function=lambda x, y: x + y, # type: ignore[misc, arg-type] - function_name="lambda", - partition_on=["x", "z"], # type: ignore[arg-type] - partition_with=lambda x, z: [(x, z)], # type: ignore[arg-type, return-value] - combine_with=sum, - ) - if __name__ == "__main__": unittest.main() diff --git a/tests/partition/test_nested.py 
b/tests/partition/test_nested.py deleted file mode 100644 index 8ebda44..0000000 --- a/tests/partition/test_nested.py +++ /dev/null @@ -1,53 +0,0 @@ -import unittest - -try: - import pandas as pd -except ImportError: - raise ImportError("Pandas dependency missing. Use `pip install 'parfun[pandas]'` to install Pandas.") - -from parfun.partition.collection import list_by_chunk -from parfun.partition.dataframe import df_by_group -from parfun.partition.nested import partition_nested -from parfun.partition.utility import with_partition_size - - -class TestPartitionNested(unittest.TestCase): - def test_partition_nested(self): - arg_1 = list(range(2010, 2014)) - arg_2 = pd.DataFrame({"industry": [1, 2, 1, 3, 1, 2], "score": [212.1, 331.1, 18.2, 98.2, 23.1, 12.3]}) - arg_3 = ["USA", "Belgium", "China", "Poland"] - - def custom_partition_generator(df: pd.DataFrame): - for row in range(0, df.shape[0]): - yield df.iloc[row : row + 1], - - partition_function = partition_nested( - (("arg_1", "arg_3"), list_by_chunk), - ("arg_2", df_by_group(by="industry")), - ("arg_2", custom_partition_generator), - ) - - partitions = list( - with_partition_size(partition_function(arg_1=arg_1, arg_2=arg_2, arg_3=arg_3), partition_size=1) - ) - - n_partitions = len(arg_1) * arg_2.shape[0] - self.assertEqual(len(partitions), n_partitions) - - i = 0 - for partition_arg_1, partition_arg_3 in zip(arg_1, arg_3): - for _, partition_arg_2 in arg_2.groupby(by="industry"): - for i_row_arg_2 in range(0, len(partition_arg_2)): - row_arg_2 = partition_arg_2.iloc[i_row_arg_2 : i_row_arg_2 + 1] - - partition = partitions[i] - - self.assertSequenceEqual(partition["arg_1"], [partition_arg_1]) - self.assertTrue(partition["arg_2"].equals(row_arg_2)) - self.assertSequenceEqual(partition["arg_3"], [partition_arg_3]) - - i += 1 - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_decorators.py b/tests/test_decorators.py index 4286dc0..174374d 100644 --- a/tests/test_decorators.py +++ b/tests/test_decorators.py @@ -16,7 +16,7 @@ from parfun.entry_point import ( BACKEND_REGISTRY, get_parallel_backend, set_parallel_backend, set_parallel_backend_context ) -from parfun.partition.api import per_argument +from parfun.partition.api import multiple_arguments, per_argument from parfun.partition.collection import list_by_chunk from parfun.partition.dataframe import df_by_row from parfun.partition.object import PartitionGenerator @@ -165,7 +165,7 @@ def test_per_argument(self): self.assertTrue(sequential.equals(parallel)) -@parfun(partition_on=("col1", "col2", "col3"), partition_with=list_by_chunk, combine_with=sum, fixed_partition_size=100) +@parfun(split=multiple_arguments(("col1", "col2", "col3"), list_by_chunk), combine_with=sum, fixed_partition_size=100) def _sum_horizontally(col1: Iterable[int], col2: Iterable[int], col3: Iterable[int], constant: int) -> int: result = 0 for i in zip(col1, col2, col3): @@ -174,12 +174,12 @@ def _sum_horizontally(col1: Iterable[int], col2: Iterable[int], col3: Iterable[i return result -@parfun(partition_on="values", partition_with=df_by_row, combine_with=df_concat) +@parfun(split=per_argument(values=df_by_row), combine_with=df_concat) def _find_all_nth_primes(values: pd.DataFrame) -> pd.DataFrame: return values.apply(lambda series: series.apply(find_nth_prime)) -@parfun(partition_on=("a", "b"), partition_with=list_by_chunk, combine_with=df_concat) +@parfun(split=multiple_arguments(("a", "b"), list_by_chunk), combine_with=df_concat) def _calculate_some_df(a: List[int], b: List[float], 
constant_df: pd.DataFrame) -> pd.DataFrame: list_of_df = [] for i, j in zip(a, b): @@ -207,7 +207,7 @@ def _delayed_combine(values: Iterable[float]) -> float: return result -@parfun(partition_on="values", partition_with=_delayed_partition, combine_with=_delayed_combine) +@parfun(split=per_argument(values=_delayed_partition), combine_with=_delayed_combine) def _delayed_sum(values: Iterable[float]) -> float: logging.debug("start delayed sum") result = sum(values) @@ -229,7 +229,7 @@ def _nested_child_function(parent_pids: List[int]) -> List[Tuple[int, int]]: return [(parent_pid, child_pid) for parent_pid in parent_pids] -@parfun(partition_on="values", partition_with=list_by_chunk, combine_with=list_concat, fixed_partition_size=10) +@parfun(split=per_argument(values=list_by_chunk), combine_with=list_concat, fixed_partition_size=10) def _fixed_partition_size(values: List) -> List: if len(values) != 10: raise ValueError("invalid fixed partition size.") @@ -251,10 +251,5 @@ def _per_argument_sum(a: List, b: pd.DataFrame) -> pd.DataFrame: return result -def _custom_partition_lists(*args: List) -> PartitionGenerator[Tuple[List, ...]]: - for values in zip(*args): - yield tuple([v] for v in values) - - if __name__ == "__main__": unittest.main()
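A hedged end-to-end sketch combining the pieces touched by this diff: the ``split``/``combine_with`` decorator API, the Pandas partitioning helpers, and the remote Scaler backend (the scheduler address is the same placeholder as in the quickstart change above; the dataframe and function are illustrative):

.. code:: python

    import pandas as pd

    from parfun.combine.dataframe import df_concat
    from parfun.decorators import parfun
    from parfun.entry_point import set_parallel_backend_context
    from parfun.partition.api import per_argument
    from parfun.partition.dataframe import df_by_row


    @parfun(split=per_argument(values=df_by_row), combine_with=df_concat)
    def add_total_column(values: pd.DataFrame) -> pd.DataFrame:
        # Row-wise computation, so per-partition results match the sequential result.
        return values.assign(total=values["a"] + values["b"])


    if __name__ == "__main__":
        df = pd.DataFrame({"a": range(100), "b": range(100, 200)})

        # Runs the partitions over an externally managed Scaler cluster.
        with set_parallel_backend_context("scaler_remote", scheduler_address="tcp://scaler.cluster:1243"):
            print(add_total_column(df))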