diff --git a/fugue_ml/__init__.py b/fugue_ml/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fugue_tune/__init__.py b/fugue_tune/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fugue_tune/convert.py b/fugue_tune/convert.py new file mode 100644 index 0000000..2d24ae3 --- /dev/null +++ b/fugue_tune/convert.py @@ -0,0 +1,139 @@ +import copy +import inspect +from typing import Any, Callable, Dict, List, Optional, no_type_check + +from fugue import ExecutionEngine +from fugue._utils.interfaceless import ( + FunctionWrapper, + _ExecutionEngineParam, + _FuncParam, + _OtherParam, + is_class_method, +) +from triad import assert_or_throw +from triad.utils.convert import get_caller_global_local_vars, to_function + +from fugue_tune.exceptions import FugueTuneCompileError +from fugue_tune.tunable import SimpleTunable, Tunable + + +def tunable(distributable: bool = True) -> Callable[[Any], "_FuncAsTunable"]: + def deco(func: Callable) -> "_FuncAsTunable": + assert_or_throw( + not is_class_method(func), + NotImplementedError("tunable decorator can't be used on class methods"), + ) + return _FuncAsTunable.from_func(func) + + return deco + + +def _to_tunable( + obj: Any, + global_vars: Optional[Dict[str, Any]] = None, + local_vars: Optional[Dict[str, Any]] = None, + distributable: Optional[bool] = None, +) -> Tunable: + global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars) + + def get_tunable() -> Tunable: + if isinstance(obj, Tunable): + return copy.copy(obj) + try: + f = to_function(obj, global_vars=global_vars, local_vars=local_vars) + # this is for string expression of function with decorator + if isinstance(f, Tunable): + return copy.copy(f) + # this is for functions without decorator + return _FuncAsTunable.from_func(f, distributable) + except Exception as e: + exp = e + raise FugueTuneCompileError(f"{obj} is not a valid tunable function", exp) + + t = get_tunable() + if distributable is None: + distributable = t.distributable + elif distributable: + assert_or_throw( + t.distributable, FugueTuneCompileError(f"{t} is not distributable") + ) + return t + + +class _SingleParam(_FuncParam): + def __init__(self, param: Optional[inspect.Parameter]): + super().__init__(param, "float", "s") + + +class _DictParam(_FuncParam): + def __init__(self, param: Optional[inspect.Parameter]): + super().__init__(param, "Dict[str,Any]", "d") + + +class _TunableWrapper(FunctionWrapper): + def __init__(self, func: Callable): + super().__init__(func, "^e?[^e]+$", "^[sd]$") + + def _parse_param( + self, + annotation: Any, + param: Optional[inspect.Parameter], + none_as_other: bool = True, + ) -> _FuncParam: + if annotation is float: + return _SingleParam(param) + elif annotation is Dict[str, Any]: + return _DictParam(param) + elif annotation is ExecutionEngine: + return _ExecutionEngineParam(param) + else: + return _OtherParam(param) + + @property + def single(self) -> bool: + return isinstance(self._rt, _SingleParam) + + @property + def needs_engine(self) -> bool: + return isinstance(self._params.get_value_by_index(0), _ExecutionEngineParam) + + +class _FuncAsTunable(SimpleTunable): + @no_type_check + def tune(self, **kwargs: Any) -> Dict[str, Any]: + # pylint: disable=no-member + args: List[Any] = [self.execution_engine] if self._needs_engine else [] + if self._single: + return dict(error=self._func(*args, **kwargs)) + else: + return self._func(*args, **kwargs) + + @no_type_check + def __call__(self, *args: Any, **kwargs: Any) -> Any: + return self._func(*args, **kwargs) + + @property + def distributable(self) -> bool: + return self._distributable # type: ignore + + @no_type_check + @staticmethod + def from_func( + func: Callable, distributable: Optional[bool] = None + ) -> "_FuncAsTunable": + t = _FuncAsTunable() + tw = _TunableWrapper(func) + t._func = tw._func + t._single = tw.single + t._needs_engine = tw.needs_engine + if distributable is None: + t._distributable = not tw.needs_engine + else: + if distributable: + assert_or_throw( + not tw.needs_engine, + "function with ExecutionEngine can't be distributable", + ) + t._distributable = distributable + + return t diff --git a/fugue_tune/exceptions.py b/fugue_tune/exceptions.py new file mode 100644 index 0000000..a645cb5 --- /dev/null +++ b/fugue_tune/exceptions.py @@ -0,0 +1,13 @@ +from typing import Any + +from fugue.exceptions import FugueWorkflowCompileError, FugueWorkflowRuntimeError + + +class FugueTuneCompileError(FugueWorkflowCompileError): + def __init__(self, *args: Any): + super().__init__(*args) + + +class FugueTuneRuntimeError(FugueWorkflowRuntimeError): + def __init__(self, *args: Any): + super().__init__(*args) diff --git a/fugue_tune/iter.py b/fugue_tune/iter.py new file mode 100644 index 0000000..3c5ad1e --- /dev/null +++ b/fugue_tune/iter.py @@ -0,0 +1,57 @@ +import itertools +from typing import Any, Dict, Iterable, List, Tuple + + +def dict_product( + d: Dict[str, Iterable[Any]], safe: bool = True +) -> Iterable[Dict[str, Any]]: + keys = d.keys() + arrays = list(d.values()) + if len(arrays) == 0: + if safe: + yield {} + return + for element in _safe_product(arrays, safe): + yield {k: v for k, v in zip(keys, element) if v is not _EMPTY_ITER} + + +def product( + arrays: List[Iterable[Any]], safe: bool = False, remove_empty: bool = True +) -> Iterable[List[Any]]: + if len(arrays) == 0: + if safe: + yield [] + return + if remove_empty: + for x in _safe_product(arrays, safe): + yield [xx for xx in x if xx is not _EMPTY_ITER] + else: + for x in _safe_product(arrays, safe): + yield [None if xx is _EMPTY_ITER else xx for xx in x] + + +def safe_iter(it: Iterable[Any], safe: bool = True) -> Iterable[Any]: + if not safe: + yield from it + else: + n = 0 + for x in it: + yield x + n += 1 + if n == 0: + yield _EMPTY_ITER + + +def _safe_product(arrays: List[Iterable[Any]], safe: bool = True) -> Iterable[Tuple]: + if not safe: + yield from itertools.product(*arrays) + else: + arr = [safe_iter(t) for t in arrays] + yield from itertools.product(*arr) + + +class _EmptyIter(object): + pass + + +_EMPTY_ITER = _EmptyIter() diff --git a/fugue_tune/space.py b/fugue_tune/space.py new file mode 100644 index 0000000..e9622ac --- /dev/null +++ b/fugue_tune/space.py @@ -0,0 +1,135 @@ +from copy import deepcopy +from typing import Any, Dict, Iterable, List, Tuple, no_type_check + +from fugue_tune.iter import dict_product, product + + +class Grid(object): + def __init__(self, *args: Any): + self._values = list(args) + + def __iter__(self) -> Iterable[Any]: + yield from self._values + + +class Choice(object): + def __init__(self, *args: Any): + self._values = list(args) + + def __iter__(self) -> Iterable[Any]: + yield from self._values + + +class Rand(object): + def __init__(self, start: float, end: float, q: float, log: bool, normal: bool): + self._start = start + self._end = end + self._q = q + self._log = log + self._normal = normal + + +# TODO: make this inherit from iterable? +class Space(object): + def __init__(self, **kwargs: Any): + self._value = deepcopy(kwargs) + self._grid: List[List[Tuple[Any, Any, Any]]] = [] + for k in self._value.keys(): + self._search(self._value, k) + + def __iter__(self) -> Iterable[Dict[str, Any]]: + for tps in product(self._grid, safe=True, remove_empty=True): # type: ignore + for tp in tps: + tp[0][tp[1]] = tp[2] + yield deepcopy(self._value) + + def __mul__(self, other: Any) -> "HorizontalSpace": + return HorizontalSpace(self, other) + + def __add__(self, other: Any) -> "VerticalSpace": + return VerticalSpace(self, other) + + def _search(self, parent: Any, key: Any) -> None: + node = parent[key] + if isinstance(node, Grid): + self._grid.append(self._grid_wrapper(parent, key)) + elif isinstance(node, dict): + for k in node.keys(): + self._search(node, k) + elif isinstance(node, list): + for i in range(len(node)): + self._search(node, i) + + def _grid_wrapper(self, parent: Any, key: Any) -> List[Tuple[Any, Any, Any]]: + return [(parent, key, x) for x in parent[key]] + + +class HorizontalSpace(Space): + def __init__(self, *args: Any, **kwargs: Any): + self._groups: List[VerticalSpace] = [] + for x in args: + if isinstance(x, HorizontalSpace): + self._groups.append(VerticalSpace(x)) + elif isinstance(x, VerticalSpace): + self._groups.append(x) + elif isinstance(x, Space): + self._groups.append(VerticalSpace(x)) + elif isinstance(x, dict): + self._groups.append(VerticalSpace(HorizontalSpace(**x))) + elif isinstance(x, list): + self._groups.append(VerticalSpace(*x)) + else: + raise ValueError(f"{x} is invalid") + self._dict = {k: _SpaceValue(v) for k, v in kwargs.items()} + + @no_type_check # TODO: remove this? + def __iter__(self) -> Iterable[Dict[str, Any]]: + dicts = list(dict_product(self._dict, safe=True)) + for spaces in product( + [g.spaces for g in self._groups], safe=True, remove_empty=True + ): + for comb in product(list(spaces) + [dicts], safe=True, remove_empty=True): + res: Dict[str, Any] = {} + for d in comb: + res.update(d) + yield res + + +class VerticalSpace(Space): + def __init__(self, *args: Any): + self._spaces: List[Space] = [] + for x in args: + if isinstance(x, Space): + self._spaces.append(x) + elif isinstance(x, dict): + self._spaces.append(Space(**x)) + elif isinstance(x, list): + self._spaces.append(VerticalSpace(*x)) + else: + raise ValueError(f"{x} is invalid") + + @property + def spaces(self) -> List[Space]: + return self._spaces + + def __iter__(self) -> Iterable[Dict[str, Any]]: + for space in self._spaces: + yield from space # type: ignore + + +class _SpaceValue(object): + def __init__(self, value: Any): + self.value = value + + @no_type_check # TODO: remove this? + def __iter__(self) -> Iterable[Any]: + if isinstance(self.value, (HorizontalSpace, VerticalSpace)): + yield from self.value + elif isinstance(self.value, dict): + yield from dict_product( + {k: _SpaceValue(v) for k, v in self.value.items()}, safe=True + ) + elif isinstance(self.value, list): + yield from product([_SpaceValue(v) for v in self.value], safe=True) + else: + yield self.value diff --git a/fugue_tune/tunable.py b/fugue_tune/tunable.py new file mode 100644 index 0000000..a3682d9 --- /dev/null +++ b/fugue_tune/tunable.py @@ -0,0 +1,60 @@ +from typing import Any, Dict + +from fugue import ExecutionEngine +from triad import ParamDict + +from fugue_tune.exceptions import FugueTuneRuntimeError + + +class Tunable(object): + def run(self, **kwargs: Any) -> None: # pragma: no cover + raise NotImplementedError + + def report(self, result: Dict[str, Any]) -> None: + self._error = float(result["error"]) + self._hp = ParamDict(result.get("hp", None)) + self._metadata = ParamDict(result.get("metadata", None)) + + @property + def error(self) -> float: + try: + return self._error + except Exception: + raise FugueTuneRuntimeError("error is not set") + + @property + def hp(self) -> ParamDict: + try: + return self._hp + except Exception: + raise FugueTuneRuntimeError("hp is not set") + + @property + def metadata(self) -> ParamDict: + try: + return self._metadata + except Exception: + raise FugueTuneRuntimeError("metadata is not set") + + @property + def distributable(self) -> bool: # pragma: no cover + return True + + @property + def execution_engine(self) -> ExecutionEngine: + # pylint: disable=no-member + try: + return self._execution_engine # type: ignore + except Exception: + raise FugueTuneRuntimeError("execution_engine is not set") + + +class SimpleTunable(Tunable): + def tune(self, **kwargs: Any) -> Dict[str, Any]: # pragma: no cover + raise NotImplementedError + + def run(self, **kwargs: Any) -> None: + res = self.tune(**kwargs) + if "hp" not in res: + res["hp"] = kwargs + self.report(res) diff --git a/fugue_tune/tuner.py b/fugue_tune/tuner.py new file mode 100644 index 0000000..6d7fd02 --- /dev/null +++ b/fugue_tune/tuner.py @@ -0,0 +1,137 @@ +import json +import os +import random +from typing import Any, Dict, Iterable, List, Optional, Set +from uuid import uuid4 + +import pandas as pd +from fugue import ( + ArrayDataFrame, + DataFrame, + ExecutionEngine, + FugueWorkflow, + IterableDataFrame, + LocalDataFrame, + Transformer, + WorkflowDataFrame, +) +from triad import assert_or_throw +from triad.utils.convert import get_caller_global_local_vars + +from fugue_tune.convert import _to_tunable +from fugue_tune.space import Choice, Grid, Rand, Space + + +class Tuner(object): + def tune( # noqa: C901 + self, + params_df: WorkflowDataFrame, + tunable: Any, + distributable: Optional[bool] = None, + ) -> WorkflowDataFrame: + t = _to_tunable( # type: ignore + tunable, *get_caller_global_local_vars(), distributable + ) + if distributable is None: + distributable = t.distributable + + # input_has: __fmin_params__:str + # schema: *,__fmin_value__:double,__fmin_metadata__:str + def compute_transformer( + df: Iterable[Dict[str, Any]], load: Any = None + ) -> Iterable[Dict[str, Any]]: + for row in df: + dfs: Dict[str, Any] = {} + dfs_keys: Set[str] = set() + for k, v in row.items(): + if k.startswith("__df_"): + key = k[len("__df_") :] + if v is not None: + dfs[key] = pd.read_parquet(v) + dfs_keys.add(key) + for params in json.loads(row["__fmin_params__"]): + t.run(**dfs, **params) + res = dict(row) + res["__fmin_params__"] = json.dumps( + {pk: pv for pk, pv in t.hp.items() if pk not in dfs_keys} + ) + res["__fmin_value__"] = t.error + res["__fmin_metadata__"] = json.dumps(t.metadata) + yield res + + # input_has: __fmin_params__:str + def compute_processor(engine: ExecutionEngine, df: DataFrame) -> DataFrame: + def get_rows() -> Iterable[Any]: + keys = list(df.schema.names) + ["__fmin_value__", "__fmin_metadata__"] + for row in compute_transformer(df.as_dict_iterable()): + yield [row[k] for k in keys] + + t._execution_engine = engine # type:ignore + return ArrayDataFrame( + get_rows(), df.schema + "__fmin_value__:double,__fmin_metadata__:str" + ) + + if not distributable: + return params_df.process(compute_processor) + else: + return params_df.partition(num="ROWCOUNT", algo="even").transform( + compute_transformer + ) + + def serialize_df( + self, df: WorkflowDataFrame, name: str, path: str + ) -> WorkflowDataFrame: + pre_partition = df.partition_spec + if len(pre_partition.partition_by) == 0: + + def save_single_file(e: ExecutionEngine, input: DataFrame) -> DataFrame: + fp = os.path.join(path, str(uuid4()) + ".parquet") + e.save_df(input, fp, force_single=True) + return ArrayDataFrame([[fp]], f"__df_{name}:str") + + return df.process(save_single_file) + else: + + class SavePartition(Transformer): + def get_output_schema(self, df: DataFrame) -> Any: + dfn = self.params.get_or_throw("name", str) + return self.key_schema + f"__df_{dfn}:str" + + def transform(self, df: LocalDataFrame) -> LocalDataFrame: + fp = os.path.join( + self.params.get_or_throw("path", str), str(uuid4()) + ".parquet" + ) + df.as_pandas().to_parquet(fp) + return ArrayDataFrame( + [self.cursor.key_value_array + [fp]], self.output_schema + ) + + return df.transform(SavePartition, params={"path": path, "name": name}) + + def space_to_df( + self, wf: FugueWorkflow, space: Space, batch_size: int = 1, shuffle: bool = True + ) -> WorkflowDataFrame: + def get_data() -> Iterable[List[Any]]: + it = list(space) # type: ignore + if shuffle: + random.seed(0) + random.shuffle(it) + res: List[Any] = [] + for a in it: + res.append(self._convert_hp(a)) + if batch_size == len(res): + yield [json.dumps(res)] + res = [] + if len(res) > 0: + yield [json.dumps(res)] + + return wf.df(IterableDataFrame(get_data(), "__fmin_params__:str")) + + def _convert_hp(self, params: Dict[str, Any]) -> Dict[str, Any]: + return {k: self._convert_single(v) for k, v in params.items()} + + def _convert_single(self, param: Any) -> Any: + assert_or_throw( + not isinstance(param, (Grid, Rand, Choice)), NotImplementedError(param) + ) + return param diff --git a/setup.cfg b/setup.cfg index 5100588..78af340 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,11 +4,12 @@ description-file = README.md [tool:pytest] addopts = --cov=fugue_incubator + --cov=fugue_tune --cov-report=term-missing:skip-covered -vvv [flake8] -ignore = E24,E203,W503 +ignore = E24,E203,W503,C408,A002 max-line-length = 88 format = pylint exclude = .svc,CVS,.bzr,.hg,.git,__pycache__,venv,tests/*,docs/* diff --git a/setup.py b/setup.py index 92ced4f..bddb6e4 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ def get_version() -> str: author_email="goodwanghan@gmail.com", keywords="fugue incubator experiment", url="http://github.com/fugue-project/fugue-incubator", - install_requires=["fugue"], + install_requires=["fugue==0.5.0.dev1"], extras_require={}, classifiers=[ # "3 - Alpha", "4 - Beta" or "5 - Production/Stable" diff --git a/tests/fugue_tune/__init__.py b/tests/fugue_tune/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fugue_tune/test_convert.py b/tests/fugue_tune/test_convert.py new file mode 100644 index 0000000..6e14658 --- /dev/null +++ b/tests/fugue_tune/test_convert.py @@ -0,0 +1,88 @@ +from typing import Any, Dict, List, Tuple + +from fugue import ExecutionEngine +from pytest import raises + +from fugue_tune.convert import _to_tunable, tunable +from fugue_tune.exceptions import FugueTuneCompileError, FugueTuneRuntimeError + + +def test_to_tunable(): + def t1(a: int) -> float: + pass + + def t2(b: str) -> Dict[str, Any]: + pass + + t22 = _to_tunable(t2) + + def t3() -> Dict[str, Any]: + pass + + def t4(a: int) -> Tuple[float, List[str]]: + pass + + def t5(e: ExecutionEngine, a: int) -> float: + pass + + def t6(a: int, e: ExecutionEngine) -> float: + pass + + assert t1 is _to_tunable(t1)._func + assert _to_tunable(t1).distributable + assert _to_tunable(t1, distributable=True).distributable + assert not _to_tunable(t1, distributable=False).distributable + assert t2 is _to_tunable(_to_tunable(t2))._func + assert t2 is _to_tunable("t22")._func + assert t1 is _to_tunable("t1")._func + assert t2 is _to_tunable("t2")._func + assert t5 is _to_tunable(t5)._func + assert not _to_tunable(t5).distributable + assert not _to_tunable(t5, distributable=False).distributable + # with execution engine, distributable can't be true + raises(FugueTuneCompileError, lambda: _to_tunable(t5, distributable=True)) + + # return type must be float or Tuple[float,Dict[str,Any]] + # input must not be empty + with raises(FugueTuneCompileError): + _to_tunable(t3) + + with raises(FugueTuneCompileError): + _to_tunable("t3") + + with raises(FugueTuneCompileError): + _to_tunable(t4) + + with raises(FugueTuneCompileError): + _to_tunable(t6) + + +def test_deco(): + @tunable() + def t1(a: int, b: int) -> float: + return a + b + + @tunable() + def t2(a: int, b: int) -> Dict[str, Any]: + return dict(error=a + b, metadata={"x": 1}, hp={"y": 2}) + + t11 = _to_tunable(t1) # it will copy, t1 change will not affect t11 + t12 = _to_tunable("t1") # it will copy, t1 change will not affect t11 + + assert 5 == t1(2, 3) + t1.run(a=3, b=4) + assert 7 == t1.error + assert t1.metadata == {} + assert t1.hp == dict(a=3, b=4) + assert t1.distributable + + raises(FugueTuneRuntimeError, lambda: t11.hp) + t11.run(a=4, b=5) + assert t11.hp == dict(a=4, b=5) + raises(FugueTuneRuntimeError, lambda: t12.hp) + + assert t2(2, 3) == {"error": 5, "hp": {"y": 2}, "metadata": {"x": 1}} + t2.run(a=3, b=4) + assert 7 == t2.error + assert t2.metadata == {"x": 1} + assert t2.hp == {"y": 2} diff --git a/tests/fugue_tune/test_iter.py b/tests/fugue_tune/test_iter.py new file mode 100644 index 0000000..034728a --- /dev/null +++ b/tests/fugue_tune/test_iter.py @@ -0,0 +1,64 @@ +from fugue_tune.iter import product, dict_product, safe_iter, _EMPTY_ITER + + +def test_safe_iter(): + assert [1] == list(safe_iter([1])) + assert [1] == list(safe_iter(safe_iter([1]))) + assert [None] == list(safe_iter(safe_iter([None]))) + assert [1] == list(safe_iter([1], safe=False)) + + assert [_EMPTY_ITER] == list(safe_iter([])) + assert [_EMPTY_ITER] == list(safe_iter(safe_iter([]))) + assert [] == list(safe_iter([], safe=False)) + + +def test_product(): + res = list(product([], safe=False)) + assert 0 == len(res) + res = list(product([], safe=True)) + assert 1 == len(res) + + res = list(product([[]], safe=False)) + assert 0 == len(res) + res = list(product([[]], safe=True)) + assert [[]] == res + + res = list(product([[], []])) + assert 0 == len(res) + res = list(product([[], []], safe=True)) + assert [[]] == res + res = list(product([[], []], safe=True, remove_empty=False)) + assert [[None, None]] == res + + for safe in [True, False]: + for remove_empty in [True, False]: + res = list(product([[1], [2]], safe=safe, remove_empty=remove_empty)) + assert 1 == len(res) + assert [1, 2] == res[0] + + res = list(product([[1], [2, 3]], safe=safe, remove_empty=remove_empty)) + assert [[1, 2], [1, 3]] == res + + res = list(product([[1], []])) + assert [] == res + res = list(product([[None], [], [2, 3]], safe=True, remove_empty=False)) + assert [[None, None, 2], [None, None, 3]] == res + res = list(product([[None], [], [2, 3]], safe=True, remove_empty=True)) + assert [[None, 2], [None, 3]] == res + + res = list(product([[1], safe_iter([])], safe=False, remove_empty=True)) + assert [[1]] == res + res = list(product([[1], safe_iter([])], safe=False, remove_empty=False)) + assert [[1, None]] == res + + +def test_dict_product(): + res = list(dict_product({}, safe=True)) + assert [{}] == res + res = list(dict_product({"a": []}, safe=True)) + assert [{}] == res + + res = list(dict_product({}, safe=False)) + assert [] == res + res = list(dict_product({"a": []}, safe=False)) + assert [] == res diff --git a/tests/fugue_tune/test_space.py b/tests/fugue_tune/test_space.py new file mode 100644 index 0000000..8963b63 --- /dev/null +++ b/tests/fugue_tune/test_space.py @@ -0,0 +1,133 @@ +from pytest import raises +from fugue_tune.space import Grid, Space, HorizontalSpace, VerticalSpace + + +def test_single_space(): + dicts = list(Space(a=1, b=Grid(2, 3, 4))) + assert 3 == len(dicts) + assert dict(a=1, b=2) == dicts[0] + assert dict(a=1, b=3) == dicts[1] + + dicts = list(Space(a=Grid(None, "x"), b=Grid(2, 3))) + assert 4 == len(dicts) + + dicts = list(Space(a=1, b=[Grid(2, 3), Grid(4, 5)])) + assert 4 == len(dicts) + assert dict(a=1, b=[2, 4]) == dicts[0] + assert dict(a=1, b=[2, 5]) == dicts[1] + assert dict(a=1, b=[3, 4]) == dicts[2] + assert dict(a=1, b=[3, 5]) == dicts[3] + + dicts = list(Space(a=1, b=dict(x=Grid(2, 3), y=Grid(4, 5)))) + assert 4 == len(dicts) + assert dict(a=1, b=dict(x=2, y=4)) == dicts[0] + assert dict(a=1, b=dict(x=2, y=5)) == dicts[1] + assert dict(a=1, b=dict(x=3, y=4)) == dicts[2] + assert dict(a=1, b=dict(x=3, y=5)) == dicts[3] + + +def test_space_simple_dict(): + spaces = list(HorizontalSpace()) + assert 1 == len(spaces) + assert {} == spaces[0] + + spaces = list(HorizontalSpace(a=10, b=[1, 2], c=dict(x=1))) + assert 1 == len(spaces) + assert dict(a=10, b=[1, 2], c=dict(x=1)) == spaces[0] + + spaces = list(HorizontalSpace(dict(a=10, b=[1, 2], c=dict(x=1)))) + assert 1 == len(spaces) + assert dict(a=10, b=[1, 2], c=dict(x=1)) == spaces[0] + + spaces = list(HorizontalSpace(dict(a=10), dict(b=[1, 2], c=dict(x=1)))) + assert 1 == len(spaces) + assert dict(a=10, b=[1, 2], c=dict(x=1)) == spaces[0] + + raises(ValueError, lambda: HorizontalSpace(10)) + + +def test_spaces(): + spaces = list(VerticalSpace()) + assert 0 == len(spaces) + + spaces = list(VerticalSpace(dict(a=10))) + assert [dict(a=10)] == spaces + + spaces = list(VerticalSpace(dict(a=10), [dict(b=11), dict(c=12)])) + assert [dict(a=10), dict(b=11), dict(c=12)] == spaces + + spaces = list(VerticalSpace(HorizontalSpace(a=10), dict(b=10))) + assert [dict(a=10), dict(b=10)] == spaces + + raises(ValueError, lambda: VerticalSpace(10)) + + +def test_space_combo(): + spaces = list(HorizontalSpace(dict(a=10), [])) + assert [dict(a=10)] == spaces + + spaces = list(HorizontalSpace(dict(a=10), [dict(b=20), dict(c=30, a=11)])) + assert 2 == len(spaces) + assert dict(a=10, b=20) == spaces[0] + assert dict(a=11, c=30) == spaces[1] + + spaces = list( + HorizontalSpace( + HorizontalSpace(a=10), + VerticalSpace(dict(b=20), HorizontalSpace(c=30, a=None)), + ) + ) + assert 2 == len(spaces) + assert dict(a=10, b=20) == spaces[0] + assert dict(a=None, c=30) == spaces[1] + + spaces = list( + HorizontalSpace( + dict(a=HorizontalSpace(dict(aa=10), VerticalSpace(dict(), dict(cc=12)))), + VerticalSpace(dict(b=20), HorizontalSpace(c=30)), + ) + ) + assert 4 == len(spaces) + assert dict(a=dict(aa=10), b=20) == spaces[0] + assert dict(a=dict(aa=10, cc=12), b=20) == spaces[1] + assert dict(a=dict(aa=10), c=30) == spaces[2] + assert dict(a=dict(aa=10, cc=12), c=30) == spaces[3] + + spaces = list( + HorizontalSpace(a=VerticalSpace(HorizontalSpace(x=[1, 2]), dict(y=None))) + ) + assert 2 == len(spaces) + assert dict(a=dict(x=[1, 2])) == spaces[0] + assert dict(a=dict(y=None)) == spaces[1] + + +def test_operators(): + s1 = Space(a=1, b=Grid(2, 3)) + s2 = Space(c=Grid("a", "b")) + assert [ + dict(a=1, b=2, c="a"), + dict(a=1, b=2, c="b"), + dict(a=1, b=3, c="a"), + dict(a=1, b=3, c="b"), + ] == list(s1 * s2) + + assert [ + dict(a=1, b=2), + dict(a=1, b=3), + dict(c="a"), + dict(c="b"), + ] == list(s1 + s2) + + assert [ + dict(a=1, b=2, c="a"), + dict(a=1, b=3, c="a"), + dict(a=1, b=2, c="b"), + dict(a=1, b=3, c="b"), + ] == list(s1 * [dict(c="a"), dict(c="b")]) + + assert [ + dict(a=1, b=2), + dict(a=1, b=3), + dict(c="a"), + dict(c="b"), + ] == list(s1 + [dict(c="a"), dict(c="b")]) diff --git a/tests/fugue_tune/test_tunable.py b/tests/fugue_tune/test_tunable.py new file mode 100644 index 0000000..1ed5df0 --- /dev/null +++ b/tests/fugue_tune/test_tunable.py @@ -0,0 +1,38 @@ +from typing import Any, Dict, Tuple + +import numpy as np +from pytest import raises + +from fugue_tune.exceptions import FugueTuneRuntimeError +from fugue_tune.tunable import SimpleTunable + + +def test_tunable(): + t = _MockTunable() + t.run(a=1, b=2) + assert 3.0 == t.error + assert t.metadata == {} + assert t.hp == {"a": 1, "b": 2} + + t = _MockTunable() + t.run(a=1, m=2, x=2) + assert 5.0 == t.error + assert t.metadata == {"m": 2} + assert t.hp == {"x": 2} + + t = _MockTunable() + raises(FugueTuneRuntimeError, lambda: t.error) + raises(FugueTuneRuntimeError, lambda: t.hp) + raises(FugueTuneRuntimeError, lambda: t.metadata) + raises(FugueTuneRuntimeError, lambda: t.execution_engine) + + +class _MockTunable(SimpleTunable): + def tune(self, **kwargs: Any) -> Dict[str, Any]: + error = np.double(sum(kwargs.values())) + res = {"error": error} + if "m" in kwargs: + res["metadata"] = {"m": kwargs["m"]} + if "x" in kwargs: + res["hp"] = {"x": kwargs["x"]} + return res diff --git a/tests/fugue_tune/test_tuner.py b/tests/fugue_tune/test_tuner.py new file mode 100644 index 0000000..5ada543 --- /dev/null +++ b/tests/fugue_tune/test_tuner.py @@ -0,0 +1,83 @@ +import json +from typing import Any, Iterable, List + +import pandas as pd +from fugue import ExecutionEngine, FugueWorkflow, IterableDataFrame, WorkflowDataFrame +from pytest import raises + +from fugue_tune.exceptions import FugueTuneCompileError +from fugue_tune.space import Grid, Space +from fugue_tune.tuner import Tuner + + +def test_space_to_df(): + tuner = Tuner() + + with FugueWorkflow() as dag: + df = tuner.space_to_df(dag, Space(a=Grid(0, 1), b=Grid(2, 3))) + df.assert_eq( + dag.df( + [ + ['[{"a": 0, "b": 2}]'], + ['[{"a": 0, "b": 3}]'], + ['[{"a": 1, "b": 2}]'], + ['[{"a": 1, "b": 3}]'], + ], + "__fmin_params__:str", + ) + ) + + with FugueWorkflow() as dag: + df = tuner.space_to_df( + dag, Space(a=Grid(0, 1), b=Grid(2, 3)), batch_size=3, shuffle=False + ) + df.assert_eq( + dag.df( + [ + ['[{"a": 0, "b": 2}, {"a": 0, "b": 3}, {"a": 1, "b": 2}]'], + ['[{"a": 1, "b": 3}]'], + ], + "__fmin_params__:str", + ) + ) + + +def test_tune_simple(): + tuner = Tuner() + + def t1(a: int, b: int) -> float: + return a + b + + for distributable in [True, False, None]: + with FugueWorkflow() as dag: + df = tuner.space_to_df(dag, Space(a=Grid(0, 1), b=Grid(2, 3))) + tuner.tune(df, t1, distributable=distributable).show() + + def t2(e: ExecutionEngine, a: int, b: int) -> float: + assert isinstance(e, ExecutionEngine) + return a + b + + for distributable in [False, None]: + with FugueWorkflow() as dag: + df = tuner.space_to_df(dag, Space(a=Grid(0, 1), b=Grid(2, 3))) + tuner.tune(df, t2, distributable=distributable).show() + + with raises(FugueTuneCompileError): + with FugueWorkflow() as dag: + df = tuner.space_to_df(dag, Space(a=Grid(0, 1), b=Grid(2, 3))) + tuner.tune(df, t2, distributable=True).show() + + +def test_tune_df(tmpdir): + tuner = Tuner() + + def t1(a: int, p: pd.DataFrame, b: int) -> float: + return float(a + b + p["y"].sum()) + + for distributable in [True, False, None]: + with FugueWorkflow() as dag: + s = tuner.space_to_df(dag, Space(a=Grid(0, 1), b=Grid(2, 3)), batch_size=3) + t = dag.df([[0, 1], [1, 2], [0, 2]], "x:int,y:int") + df = tuner.serialize_df(t, "p", str(tmpdir)).cross_join(s.broadcast()) + tuner.tune(df, t1, distributable=distributable).show() + diff --git a/tests/test_dummy.py b/tests/test_dummy.py deleted file mode 100644 index 10cf3ad..0000000 --- a/tests/test_dummy.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_dummy(): - pass