Skip to content

Commit

Permalink
Remove deprecated APIs. (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafa-be authored Sep 12, 2024
1 parent bfe257b commit 3b720bb
Show file tree
Hide file tree
Showing 21 changed files with 45 additions and 529 deletions.
4 changes: 2 additions & 2 deletions docs/source/tutorials/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ with the :py:func:`~parfun.entry_point.set_parallel_backend` or within a Python
set_parallel_backend("local_multiprocessing")
# Set the parallel backend with a Python context.
with set_parallel_backend_context("dask_remote"):
... # Will run with the "dask_remote" backend.
with set_parallel_backend_context("scaler_remote", scheduler_address="tcp://scaler.cluster:1243"):
... # Will run with parallel task over Scaler.
See :py:func:`~parfun.entry_point.set_parallel_backend` for a description of the available backend options.
Expand Down
2 changes: 1 addition & 1 deletion parfun/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "6.0.8"
__version__ = "7.0.0"
6 changes: 0 additions & 6 deletions parfun/backend/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,5 @@ def executor(self) -> Generator[ClientExecutor, None, None]:
with worker_client() as client:
yield client.get_executor()

def get_scheduler_address(self) -> None:
return None

def disconnect(self):
pass

def shutdown(self):
pass
17 changes: 1 addition & 16 deletions parfun/backend/local_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
return None

def submit(self, fn, *args, **kwargs) -> ProfiledFuture:
"""
Submits a task to one of the available worker. Returns a future that will complete when the computation
finishes.
:param fn: function to run
:return: The future.
"""

with profile() as submit_duration:
future = ProfiledFuture()

Expand Down Expand Up @@ -76,8 +68,7 @@ def on_done_callback(underlying_future: Future):
@attrs.define(init=False)
class LocalMultiprocessingBackend(BackendEngine):
"""
A concurrent engine that shares a similar interface to :py:class:`concurrent.futures.Executor`, but that blocks when
submitting tasks when no worker is available.
Concurrent engine that uses Python builtin :mod:`multiprocessing` module.
"""

_underlying_executor: Executor = attrs.field(validator=instance_of(Executor), init=False)
Expand All @@ -94,12 +85,6 @@ def __init__(self, max_workers: int = psutil.cpu_count(logical=False) - 1, is_pr
def session(self) -> LocalMultiprocessingSession:
return LocalMultiprocessingSession(self._underlying_executor)

def get_scheduler_address(self) -> None:
return None

def disconnect(self):
pass

def shutdown(self, wait=True):
self._underlying_executor.shutdown(wait=wait)

Expand Down
6 changes: 0 additions & 6 deletions parfun/backend/local_single_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ class LocalSingleProcessBackend(BackendEngine):
def session(self) -> BackendSession:
return LocalSingleProcessSession()

def get_scheduler_address(self) -> None:
return None

def disconnect(self):
pass

def shutdown(self):
pass

Expand Down
28 changes: 3 additions & 25 deletions parfun/backend/mixins.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import abc
import logging
from concurrent.futures import wait
from contextlib import AbstractContextManager
from typing import Any, Callable, Optional
from typing import Any, Callable

from parfun.backend.profiled_future import ProfiledFuture


class BackendSession(AbstractContextManager, metaclass=abc.ABCMeta):
"""
An task submitting session to a backend engine that manages the lifecycle of the task objects (preloaded values,
A task submitting session to a backend engine that manages the lifecycle of the task objects (preloaded values,
argument values and future objects).
"""

Expand Down Expand Up @@ -55,30 +53,10 @@ def session(self) -> BackendSession:
"""
raise NotImplementedError()

def submit(self, fn: Callable, *args, **kwargs) -> ProfiledFuture:
logging.warning("`submit()` will be removed in a future version, use `session()` instead.")

with self.session() as session:
future = session.submit(fn, *args, **kwargs)
wait([future])

return future

@abc.abstractmethod
def get_scheduler_address(self) -> Optional[str]:
raise NotImplementedError()

@abc.abstractmethod
def disconnect(self):
"""
Disconnects from schedulers in backend engine
"""
raise NotImplementedError()

@abc.abstractmethod
def shutdown(self):
"""
Shutdowns schedulers in backend engine
Shutdowns all resources required by the backend engine.
"""
raise NotImplementedError()

Expand Down
1 change: 1 addition & 0 deletions parfun/backend/scaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def cluster(self) -> SchedulerClusterCombo:
def shutdown(self):
if self._cluster is not None:
self._cluster.shutdown()
self._cluster = None

@staticmethod
def __get_constructor_arg_names(class_: type) -> Set:
Expand Down
65 changes: 2 additions & 63 deletions parfun/combine/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
A collection of pre-define APIs to help users combine collection data, like list, array, tuple
"""

import logging
from itertools import chain, tee
from typing import Iterable, List, Tuple, TypeVar
from itertools import chain
from typing import Iterable, List, TypeVar

ListValue = TypeVar("ListValue")

Expand All @@ -19,63 +18,3 @@ def list_concat(values: Iterable[List[ListValue]]) -> List[ListValue]:
"""
return list(chain.from_iterable(values))


def lists_concat(values: Iterable[List[ListValue]]) -> List[ListValue]:
logging.warning(
f"`{lists_concat.__name__}` will be removed in a future version, use `{list_concat.__name__}` instead."
)

return list_concat(values)


def concat_lists(values: Iterable[List[ListValue]]) -> List[ListValue]:
logging.warning(
f"`{concat_lists.__name__}` will be removed in a future version, use `{list_concat.__name__}` instead."
)

return list_concat(values)


def unzip(iterable: Iterable[Tuple]) -> Tuple[Iterable, ...]:
"""
Opposite of zip().
.. code:: python
ls_1 = [1, 2, 3]
ls_2 = [2, 4, 6]
zipped = zip(ls_1, ls_2) # [(1, 2), (2, 4), (3, 6)]
ls_1_out, ls_2_out = unzip(zipped)
print(ls_1_out) # [1, 2, 3]
print(ls_2_out) # [2, 4, 6]
"""

logging.warning(f"`{unzip.__name__}` will be removed in a future version.")

# Fetches the first item to deduce the number of nested values.
it = iter(iterable)
try:
first_values = next(it)
n_values = len(first_values)
except StopIteration:
return ()

def tupled_generator():
yield first_values
while True:
try:
yield next(it)
except StopIteration:
return

teed_iterators = tee(tupled_generator(), n_values)

# Captures the i variable as an function argument, as variables are captured by reference in Python's closures.
def map_function(i_local):
return lambda v: v[i_local]

return tuple(map(map_function(i), gen) for i, gen in enumerate(teed_iterators))
15 changes: 0 additions & 15 deletions parfun/combine/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
A collection of pre-define APIs to help users combine Pandas' Dataframe data
"""

import logging
from typing import Iterable

try:
Expand Down Expand Up @@ -32,17 +31,3 @@ def df_concat(dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
"""

return pd.concat(dfs, ignore_index=True)


def dfs_concat(dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
logging.warning(f"`{dfs_concat.__name__}` will be removed in a future version, use `{df_concat.__name__}` instead.")

return df_concat(dfs)


def concat_list_of_dfs(df_list: Iterable[pd.DataFrame]) -> pd.DataFrame:
logging.warning(
f"`{concat_list_of_dfs.__name__}` will be removed in a future version, use `{df_concat.__name__}` instead."
)

return df_concat(df_list)
28 changes: 3 additions & 25 deletions parfun/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@

from parfun.kernel.function_signature import NamedArguments
from parfun.kernel.parallel_function import ParallelFunction
from parfun.object import FunctionInputType, FunctionOutputType, PartitionType
from parfun.partition.object import PartitionFunction, PartitionGenerator
from parfun.object import FunctionInputType, FunctionOutputType
from parfun.partition.object import PartitionGenerator
from parfun.partition_size_estimator.linear_regression_estimator import LinearRegessionEstimator
from parfun.partition_size_estimator.mixins import PartitionSizeEstimator


def parfun(
split: Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]],
combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType],
split: Optional[Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]] = None,
partition_on: Optional[Union[str, Tuple[str, ...]]] = None,
partition_with: Optional[PartitionFunction[PartitionType]] = None,
initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None,
fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None,
profile: bool = False,
Expand Down Expand Up @@ -52,24 +50,6 @@ def multiply_by_constant(values: Iterable[int], constant: int):
See :py:mod:`~parfun.partition.api` for the list of predefined partitioning functions.
Cannot be used with ``partition_on`` or ``partition_with``.
:param partition_on:
Only partition the data on the provided fields or fields. To be used with ``partition_with``.
There are three constraints:
* the values should be consistent with some names of arguments in ``function``
* the numbers of ``partition_on`` should be equal to the number of arguments in ``partition_with``
* the order of ``partition_on`` should be consistent with the ones in ``partition_on``
:type partition_on: Tuple | str
:param partition_with: distributes the computation by running the function on the input.
:type partition_with: Callable
:param combine_with: aggregates the results by running the function.
:type combine_with: Callable
:param initial_partition_size:
Expand Down Expand Up @@ -102,8 +82,6 @@ def decorator(function: Callable[[FunctionInputType], FunctionOutputType]):
function=function,
function_name=function.__name__,
split=split,
partition_on=partition_on,
partition_with=partition_with,
combine_with=combine_with,
initial_partition_size=initial_partition_size,
fixed_partition_size=fixed_partition_size,
Expand Down
Loading

0 comments on commit 3b720bb

Please sign in to comment.