Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated APIs. #14

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/source/tutorials/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ with the :py:func:`~parfun.entry_point.set_parallel_backend` or within a Python
set_parallel_backend("local_multiprocessing")

# Set the parallel backend with a Python context.
with set_parallel_backend_context("dask_remote"):
... # Will run with the "dask_remote" backend.
with set_parallel_backend_context("scaler_remote", scheduler_address="tcp://scaler.cluster:1243"):
... # Will run parallel tasks over Scaler.


See :py:func:`~parfun.entry_point.set_parallel_backend` for a description of the available backend options.
Expand Down
2 changes: 1 addition & 1 deletion parfun/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "6.0.8"
__version__ = "7.0.0"
6 changes: 0 additions & 6 deletions parfun/backend/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,5 @@ def executor(self) -> Generator[ClientExecutor, None, None]:
with worker_client() as client:
yield client.get_executor()

def get_scheduler_address(self) -> None:
return None

def disconnect(self):
pass

def shutdown(self):
pass
17 changes: 1 addition & 16 deletions parfun/backend/local_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
return None

def submit(self, fn, *args, **kwargs) -> ProfiledFuture:
"""
Submits a task to one of the available worker. Returns a future that will complete when the computation
finishes.

:param fn: function to run
:return: The future.
"""

with profile() as submit_duration:
future = ProfiledFuture()

Expand Down Expand Up @@ -76,8 +68,7 @@ def on_done_callback(underlying_future: Future):
@attrs.define(init=False)
class LocalMultiprocessingBackend(BackendEngine):
"""
A concurrent engine that shares a similar interface to :py:class:`concurrent.futures.Executor`, but that blocks when
submitting tasks when no worker is available.
Concurrent engine that uses Python's built-in :mod:`multiprocessing` module.
"""

_underlying_executor: Executor = attrs.field(validator=instance_of(Executor), init=False)
Expand All @@ -94,12 +85,6 @@ def __init__(self, max_workers: int = psutil.cpu_count(logical=False) - 1, is_pr
def session(self) -> LocalMultiprocessingSession:
return LocalMultiprocessingSession(self._underlying_executor)

def get_scheduler_address(self) -> None:
return None

def disconnect(self):
pass

def shutdown(self, wait=True):
self._underlying_executor.shutdown(wait=wait)

Expand Down
6 changes: 0 additions & 6 deletions parfun/backend/local_single_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ class LocalSingleProcessBackend(BackendEngine):
def session(self) -> BackendSession:
return LocalSingleProcessSession()

def get_scheduler_address(self) -> None:
return None

def disconnect(self):
pass

def shutdown(self):
pass

Expand Down
28 changes: 3 additions & 25 deletions parfun/backend/mixins.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import abc
import logging
from concurrent.futures import wait
from contextlib import AbstractContextManager
from typing import Any, Callable, Optional
from typing import Any, Callable

from parfun.backend.profiled_future import ProfiledFuture


class BackendSession(AbstractContextManager, metaclass=abc.ABCMeta):
"""
An task submitting session to a backend engine that manages the lifecycle of the task objects (preloaded values,
A task submitting session to a backend engine that manages the lifecycle of the task objects (preloaded values,
argument values and future objects).
"""

Expand Down Expand Up @@ -55,30 +53,10 @@ def session(self) -> BackendSession:
"""
raise NotImplementedError()

def submit(self, fn: Callable, *args, **kwargs) -> ProfiledFuture:
logging.warning("`submit()` will be removed in a future version, use `session()` instead.")

with self.session() as session:
future = session.submit(fn, *args, **kwargs)
wait([future])

return future

@abc.abstractmethod
def get_scheduler_address(self) -> Optional[str]:
raise NotImplementedError()

@abc.abstractmethod
def disconnect(self):
"""
Disconnects from schedulers in backend engine
"""
raise NotImplementedError()

@abc.abstractmethod
def shutdown(self):
"""
Shutdowns schedulers in backend engine
Shuts down all resources required by the backend engine.
"""
raise NotImplementedError()

Expand Down
1 change: 1 addition & 0 deletions parfun/backend/scaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def cluster(self) -> SchedulerClusterCombo:
def shutdown(self):
if self._cluster is not None:
self._cluster.shutdown()
self._cluster = None

@staticmethod
def __get_constructor_arg_names(class_: type) -> Set:
Expand Down
65 changes: 2 additions & 63 deletions parfun/combine/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
A collection of pre-defined APIs to help users combine collection data, such as lists, arrays and tuples
"""

import logging
from itertools import chain, tee
from typing import Iterable, List, Tuple, TypeVar
from itertools import chain
from typing import Iterable, List, TypeVar

ListValue = TypeVar("ListValue")

Expand All @@ -19,63 +18,3 @@ def list_concat(values: Iterable[List[ListValue]]) -> List[ListValue]:

"""
return list(chain.from_iterable(values))


def lists_concat(values: Iterable[List[ListValue]]) -> List[ListValue]:
logging.warning(
f"`{lists_concat.__name__}` will be removed in a future version, use `{list_concat.__name__}` instead."
)

return list_concat(values)


def concat_lists(values: Iterable[List[ListValue]]) -> List[ListValue]:
logging.warning(
f"`{concat_lists.__name__}` will be removed in a future version, use `{list_concat.__name__}` instead."
)

return list_concat(values)


def unzip(iterable: Iterable[Tuple]) -> Tuple[Iterable, ...]:
"""
Opposite of zip().

.. code:: python

ls_1 = [1, 2, 3]
ls_2 = [2, 4, 6]

zipped = zip(ls_1, ls_2) # [(1, 2), (2, 4), (3, 6)]

ls_1_out, ls_2_out = unzip(zipped)
print(ls_1_out) # [1, 2, 3]
print(ls_2_out) # [2, 4, 6]

"""

logging.warning(f"`{unzip.__name__}` will be removed in a future version.")

# Fetches the first item to deduce the number of nested values.
it = iter(iterable)
try:
first_values = next(it)
n_values = len(first_values)
except StopIteration:
return ()

def tupled_generator():
yield first_values
while True:
try:
yield next(it)
except StopIteration:
return

teed_iterators = tee(tupled_generator(), n_values)

# Captures the i variable as an function argument, as variables are captured by reference in Python's closures.
def map_function(i_local):
return lambda v: v[i_local]

return tuple(map(map_function(i), gen) for i, gen in enumerate(teed_iterators))
15 changes: 0 additions & 15 deletions parfun/combine/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
A collection of pre-defined APIs to help users combine Pandas' DataFrame data
"""

import logging
from typing import Iterable

try:
Expand Down Expand Up @@ -32,17 +31,3 @@ def df_concat(dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
"""

return pd.concat(dfs, ignore_index=True)


def dfs_concat(dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
logging.warning(f"`{dfs_concat.__name__}` will be removed in a future version, use `{df_concat.__name__}` instead.")

return df_concat(dfs)


def concat_list_of_dfs(df_list: Iterable[pd.DataFrame]) -> pd.DataFrame:
logging.warning(
f"`{concat_list_of_dfs.__name__}` will be removed in a future version, use `{df_concat.__name__}` instead."
)

return df_concat(df_list)
28 changes: 3 additions & 25 deletions parfun/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@

from parfun.kernel.function_signature import NamedArguments
from parfun.kernel.parallel_function import ParallelFunction
from parfun.object import FunctionInputType, FunctionOutputType, PartitionType
from parfun.partition.object import PartitionFunction, PartitionGenerator
from parfun.object import FunctionInputType, FunctionOutputType
from parfun.partition.object import PartitionGenerator
from parfun.partition_size_estimator.linear_regression_estimator import LinearRegessionEstimator
from parfun.partition_size_estimator.mixins import PartitionSizeEstimator


def parfun(
split: Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]],
combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType],
split: Optional[Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]] = None,
partition_on: Optional[Union[str, Tuple[str, ...]]] = None,
partition_with: Optional[PartitionFunction[PartitionType]] = None,
initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None,
fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None,
profile: bool = False,
Expand Down Expand Up @@ -52,24 +50,6 @@ def multiply_by_constant(values: Iterable[int], constant: int):

See :py:mod:`~parfun.partition.api` for the list of predefined partitioning functions.

Cannot be used with ``partition_on`` or ``partition_with``.

:param partition_on:
Only partition the data on the provided fields or fields. To be used with ``partition_with``.

There are three constraints:

* the values should be consistent with some names of arguments in ``function``

* the numbers of ``partition_on`` should be equal to the number of arguments in ``partition_with``

* the order of ``partition_on`` should be consistent with the ones in ``partition_on``

:type partition_on: Tuple | str

:param partition_with: distributes the computation by running the function on the input.
:type partition_with: Callable

:param combine_with: aggregates the results by running the function.
:type combine_with: Callable
:param initial_partition_size:
Expand Down Expand Up @@ -102,8 +82,6 @@ def decorator(function: Callable[[FunctionInputType], FunctionOutputType]):
function=function,
function_name=function.__name__,
split=split,
partition_on=partition_on,
partition_with=partition_with,
combine_with=combine_with,
initial_partition_size=initial_partition_size,
fixed_partition_size=fixed_partition_size,
Expand Down
Loading
Loading