diff --git a/fugue_tune/sklearn.py b/fugue_tune/sklearn.py
index 3b9387f..52628e9 100644
--- a/fugue_tune/sklearn.py
+++ b/fugue_tune/sklearn.py
@@ -1,12 +1,14 @@
+import json
 import os
 import pickle
 from importlib import import_module
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Tuple
 from uuid import uuid4
 
 import numpy as np
 import pandas as pd
 from fugue import (
+    DataFrame,
     ExecutionEngine,
     FugueWorkflow,
     NativeExecutionEngine,
@@ -16,9 +18,18 @@
 from triad.utils.convert import get_full_type_path, to_instance, to_type
 
 from fugue_tune.space import Space
-from fugue_tune.tune import ObjectiveRunner, TunableWithSpace, select_best, tunable
+from fugue_tune.tune import (
+    ObjectiveRunner,
+    TunableWithSpace,
+    select_best,
+    serialize_df,
+    space_to_df,
+    tunable,
+    tune,
+)
+from sklearn.ensemble import StackingClassifier, StackingRegressor
+from sklearn.model_selection import cross_val_score
 from sklearn.base import is_classifier, is_regressor
-from sklearn.model_selection import KFold, cross_val_score
 
 _EMPTY_DF = pd.DataFrame()
 _EMPTY_LIST: List[str] = []
@@ -48,7 +59,7 @@ def suggest_sk_model(
         df = df.partition(by=partition_keys)
     skcv = build_sk_cv(
         space=space,
-        train_df=dag.df(df),
+        train_df=df,
         scoring=scoring,
         cv=cv,
         feature_prefix=feature_prefix,
@@ -66,6 +77,79 @@ def suggest_sk_model(
     return list(best.result.as_dict_iterable())
 
 
+def suggest_sk_stacking_model(
+    space: Space,
+    stack_space: Space,
+    train_df: Any,
+    scoring: str,
+    serialize_path: str,
+    cv: int = 5,
+    feature_prefix: str = "",
+    label_col: str = "label",
+    save_model: bool = False,
+    partition_keys: List[str] = _EMPTY_LIST,
+    top_n: int = 1,
+    objective_runner: Optional[ObjectiveRunner] = None,
+    distributable: Optional[bool] = None,
+    execution_engine: Any = NativeExecutionEngine,
+    stack_cv: int = 2,
+    stack_method: str = "auto",
+    stack_passthrough: bool = False,
+) -> List[Dict[str, Any]]:
+    e = to_instance(execution_engine, ExecutionEngine)
+    model_path = serialize_path if save_model else ""
+
+    dag = FugueWorkflow()
+    df = dag.df(train_df)
+    if len(partition_keys) > 0:
+        df = df.partition(by=partition_keys)
+    skcv = build_sk_cv(
+        space=space,
+        train_df=df,
+        scoring=scoring,
+        cv=cv,
+        feature_prefix=feature_prefix,
+        label_col=label_col,
+    )
+    result = skcv.tune(
+        objective_runner=objective_runner,
+        distributable=distributable,
+        serialize_path=serialize_path,
+        shuffle=True,
+    ).persist()
+    best_models = select_best(result.transform(_extract_model), top=1)
+    if top_n > 0:
+        best_models = select_best(best_models.drop(["_sk__model"]), top=top_n)
+    kwargs = Space(
+        _sk__scoring=scoring,
+        _sk__cv=cv,
+        _sk__feature_prefix=feature_prefix,
+        _sk__label_col=label_col,
+        _sk__save_path=model_path,
+        _sk__stack_cv=stack_cv,
+        _sk__method=stack_method,
+        _sk__passthrough=stack_passthrough,
+    )
+    space_df = best_models.process(
+        _process_stack_space,
+        params=dict(keys=partition_keys, space=stack_space * kwargs),
+    )
+    data = serialize_df(df, name="_sk__train_df", path=serialize_path)
+    best = tune(
+        data.join(space_df.broadcast(), how="inner"),
+        tunable=tunable(_sk_stack_cv),
+        distributable=distributable,
+        objective_runner=objective_runner,
+    )
+    best = select_best(best, top=1)
+    dag.run(e)
+    return list(best.result.as_dict_iterable())
+
+
+def sk_space(model: Any, **kwargs: Any) -> Space:
+    return Space(_sk__model=_to_model_str(model), **kwargs)
+
+
 def build_sk_cv(
     space: Space,
     train_df: WorkflowDataFrame,
@@ -86,10 +170,6 @@ def build_sk_cv(
     return tunable(_sk_cv).space(space, **kwargs)  # type:ignore
 
 
-def sk_space(model: Any, **kwargs: Any) -> Space:
-    return Space(_sk__model=_to_model_str(model), **kwargs)
-
-
 def _sk_cv(
     _sk__model: str,
     _sk__train_df: pd.DataFrame,
@@ -101,13 +181,14 @@ def _sk_cv(
     **kwargs: Any,
 ) -> Dict[str, Any]:
     model = _to_model(_sk__model)(**kwargs)
-    train_x = _sk__train_df.drop([_sk__label_col], axis=1)
+    train_df = _sk__train_df.sample(frac=1, random_state=0).reset_index(drop=True)
+
+    train_x = train_df.drop([_sk__label_col], axis=1)
     cols = [x for x in train_x.columns if x.startswith(_sk__feature_prefix)]
     train_x = train_x[cols]
-    train_y = _sk__train_df[_sk__label_col]
+    train_y = train_df[_sk__label_col]
 
-    kf = KFold(n_splits=_sk__cv, random_state=0, shuffle=True)
-    s = cross_val_score(model, train_x, train_y, cv=kf, scoring=_sk__scoring)
+    s = cross_val_score(model, train_x, train_y, cv=_sk__cv, scoring=_sk__scoring)
     metadata = dict(sk_model=_sk__model, cv_scores=[float(x) for x in s])
     if _sk__save_path != "":
         model.fit(train_x, train_y)
@@ -120,6 +201,74 @@ def _sk_cv(
     )
 
 
+def _sk_stack_cv(
+    _sk__model: str,
+    _sk__estimators: str,
+    _sk__train_df: pd.DataFrame,
+    _sk__scoring: Any,
+    _sk__stack_cv: int = 2,
+    _sk__method: str = "auto",
+    _sk__passthrough: bool = False,
+    _sk__cv: int = 5,
+    _sk__feature_prefix: str = "",
+    _sk__label_col: str = "label",
+    _sk__save_path: str = "",
+    **kwargs: Any,
+) -> Dict[str, Any]:
+    final_estimator = _to_model(_sk__model)(**kwargs)
+    estimators: List[Tuple[str, Any]] = []
+    for i, d in enumerate(json.loads(_sk__estimators)):
+        key = f"_{i}"
+        m = _to_model(d.pop("_sk__model"))
+        estimators.append((key, m(**d)))
+    if is_classifier(final_estimator):
+        model = StackingClassifier(
+            estimators,
+            final_estimator,
+            cv=_sk__stack_cv,
+            stack_method=_sk__method,
+            passthrough=_sk__passthrough,
+            n_jobs=kwargs.get("n_jobs", 1),
+        )
+    else:
+        model = StackingRegressor(
+            estimators,
+            final_estimator,
+            cv=_sk__stack_cv,
+            passthrough=_sk__passthrough,
+            n_jobs=kwargs.get("n_jobs", 1),
+        )
+    train_df = _sk__train_df.sample(frac=1, random_state=0).reset_index(drop=True)
+
+    train_x = train_df.drop([_sk__label_col], axis=1)
+    cols = [x for x in train_x.columns if x.startswith(_sk__feature_prefix)]
+    train_x = train_x[cols]
+    train_y = train_df[_sk__label_col]
+
+    s = cross_val_score(model, train_x, train_y, cv=_sk__cv, scoring=_sk__scoring)
+    metadata = dict(sk_model=get_full_type_path(model), cv_scores=[float(x) for x in s])
+    if _sk__save_path != "":
+        model.fit(train_x, train_y)
+        fp = os.path.join(_sk__save_path, str(uuid4()) + ".pkl")
+        with FileSystem().openbin(fp, mode="wb") as f:
+            pickle.dump(model, f)
+        metadata["model_path"] = fp
+    return dict(
+        error=-np.mean(s),
+        hp=dict(
+            _sk__model=get_full_type_path(model),
+            _sk__estimators=dict(
+                **{f"_{i}": d for i, d in enumerate(json.loads(_sk__estimators))},
+                stacking=dict(_sk__model=_sk__model, **kwargs),
+            ),
+            _sk__stack_cv=_sk__stack_cv,
+            _sk__method=_sk__method,
+            _sk__passthrough=_sk__passthrough,
+        ),
+        metadata=metadata,
+    )
+
+
 def _to_model(obj: Any) -> Any:
     if isinstance(obj, str):
         parts = obj.split(".")
@@ -137,3 +286,44 @@ def _to_model_str(model: Any) -> Any:
     if isinstance(model, str):
         model = _to_model(model)
     return get_full_type_path(model)
+
+
+# schema: *,_sk__model:str
+def _extract_model(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
+    for row in df:
+        row["_sk__model"] = json.loads(row["__fmin_params__"])["_sk__model"]
+        yield row
+
+
+def _process_stack_space(
+    engine: ExecutionEngine, df: DataFrame, keys: List[str], space: Space
+) -> DataFrame:
+    fe_schema = df.schema.extract(keys) + "__fmin_fe__:str"
+
+    def _merge_space(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
+        p = json.dumps([json.loads(row["__fmin_params__"]) for row in df])
+        res = df[0]
+        res["__fmin_fe__"] = p
+        yield res
+
+    # schema: *-__fmin_fe__
+    def _construct_final_space(
+        df: Iterable[Dict[str, Any]]
+    ) -> Iterable[Dict[str, Any]]:
+        for row in df:
+            op = json.loads(row["__fmin_params__"])
+            for o in op:
+                o["_sk__estimators"] = row["__fmin_fe__"]
+            row["__fmin_params__"] = json.dumps(op)
+            yield row
+
+    with FugueWorkflow(engine) as dag:
+        ddf = dag.df(df)
+        space_df = space_to_df(dag, space).broadcast()
+        if len(keys) == 0:
+            fe = ddf.process(_merge_space, schema=fe_schema)
+        else:
+            fe = ddf.partition(by=keys).transform(_merge_space, schema=fe_schema)
+        result = fe.cross_join(space_df).transform(_construct_final_space)
+
+    return result.result
diff --git a/fugue_tune/tune.py b/fugue_tune/tune.py
index 095f9b0..86e1d28 100644
--- a/fugue_tune/tune.py
+++ b/fugue_tune/tune.py
@@ -264,6 +264,7 @@ def tune(
         serialize_path: str = "",
         batch_size: int = 1,
         shuffle: bool = True,
+        data_space_join_type: str = "cross",
     ) -> WorkflowDataFrame:
         if len(self.dfs) > 0:
             data = serialize_dfs(self.dfs, path=serialize_path, how=how)
@@ -274,7 +275,7 @@ def tune(
             shuffle=shuffle,
         ).broadcast()
         return tune(
-            data.cross_join(space_df),
+            data.join(space_df, how=data_space_join_type),
             tunable=self.tunable,
             distributable=distributable,
             objective_runner=objective_runner,
diff --git a/tests/fugue_tune/test_sklearn.py b/tests/fugue_tune/test_sklearn.py
index c7e3557..033aded 100644
--- a/tests/fugue_tune/test_sklearn.py
+++ b/tests/fugue_tune/test_sklearn.py
@@ -5,12 +5,21 @@
 from fugue import FugueWorkflow
 from pytest import raises
 from sklearn.base import is_classifier, is_regressor
-from sklearn.linear_model import LinearRegression
+from sklearn.linear_model import LinearRegression, Ridge, LogisticRegression
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.ensemble import StackingRegressor
 
 from fugue_tune import Space
-from fugue_tune.sklearn import _sk_cv, _to_model, _to_model_str, build_sk_cv
+from fugue_tune.sklearn import (
+    _process_stack_space,
+    _sk_cv,
+    _sk_stack_cv,
+    _to_model,
+    _to_model_str,
+    build_sk_cv,
+)
 from fugue_tune.sklearn import sk_space as ss
-from fugue_tune.sklearn import suggest_sk_model
+from fugue_tune.sklearn import suggest_sk_model, suggest_sk_stacking_model
 from fugue_tune.space import Grid
 
 
@@ -60,6 +69,32 @@ def test_tunable_sk_cv(tmpdir):
     assert isinstance(obj, LinearRegression)
 
 
+def test_tunable_sk_stack_cv(tmpdir):
+    res = _sk_stack_cv(
+        "sklearn.linear_model.LinearRegression",
+        '[{"_sk__model": "sklearn.linear_model._base.LinearRegression", "normalize": true},'
+        '{"_sk__model": "sklearn.linear_model._base.LinearRegression", "normalize": false}]',
+        _create_mock_data(),
+        _sk__scoring="neg_mean_absolute_error",
+        _sk__label_col="l",
+        _sk__feature_prefix="f_",
+        fit_intercept=True,
+        _sk__save_path=str(tmpdir),
+    )
+    print(res)
+    assert res["error"] < 0.1
+    assert _to_model(res["hp"]["_sk__model"]) is StackingRegressor
+    assert (
+        _to_model(res["hp"]["_sk__estimators"]["stacking"]["_sk__model"])
+        is LinearRegression
+    )
+    assert res["hp"]["_sk__estimators"]["stacking"]["fit_intercept"]
+    assert isinstance(res["metadata"]["cv_scores"], list)
+
+    obj = pickle.load(open(res["metadata"]["model_path"], mode="rb"))
mode="rb")) + assert isinstance(obj, StackingRegressor) + + def test_build_sk_cv(tmpdir): space = sum( [ @@ -101,10 +136,96 @@ def test_suggest_sk_model(tmpdir): print(res) -def _create_mock_data(): +def test_suggest_sk_stacking_model(tmpdir): + space = sum( + [ + ss(LinearRegression, fit_intercept=Grid(True, False)), + ss(Ridge, alpha=Grid(0.1, 0.2)), + ] + ) + space2 = sum( + [ + ss(LinearRegression, normalize=Grid(True, False)), + ] + ) + res = suggest_sk_stacking_model( + space, + space2, + _create_mock_data(), + scoring="neg_mean_absolute_error", + serialize_path=str(tmpdir), + label_col="l", + feature_prefix="f_", + save_model=True, + partition_keys=["p"], + top_n=2, + ) + assert len(res) == 4 + + space = sum( + [ + ss(LogisticRegression), + ss(RandomForestClassifier), + ] + ) + space2 = sum( + [ + ss(LogisticRegression), + ] + ) + res = suggest_sk_stacking_model( + space, + space2, + _create_mock_data(regression=False), + scoring="neg_mean_absolute_error", + serialize_path=str(tmpdir), + label_col="l", + feature_prefix="f_", + save_model=True, + partition_keys=["p"], + top_n=2, + ) + assert len(res) == 4 + print(res) + + +def test_process_stack_space(tmpdir): + space1 = ss(LinearRegression, normalize=Grid(True, False)) + space2 = ss(LinearRegression, fit_intercept=Grid(True, False)) + dag = FugueWorkflow() + result0 = build_sk_cv( + space1, + dag.df(_create_mock_data()), + scoring="neg_mean_absolute_error", + cv=2, + label_col="l", + feature_prefix="f_", + ).tune(distributable=False, serialize_path=str(tmpdir)) + res0 = result0.process(_process_stack_space, params=dict(keys=[], space=space2)) + res0.show() + + result1 = build_sk_cv( + space1, + dag.df(_create_mock_data()).partition(by=["p"]), + scoring="neg_mean_absolute_error", + cv=2, + label_col="l", + feature_prefix="f_", + ).tune(distributable=False, serialize_path=str(tmpdir)) + res1 = result1.process(_process_stack_space, params=dict(keys=["p"], space=space2)) + dag.run() + + assert 2 == len(res0.result.as_array()) + assert 8 == len(res1.result.as_array()) + + +def _create_mock_data(regression=True): np.random.seed(0) df = pd.DataFrame(np.random.rand(100, 3), columns=["f_a", "f_b", "f_c"]) df["d"] = "x" - df["l"] = df["f_a"] * 3 + df["f_b"] * 4 + df["f_c"] * 5 + 100 + if regression: + df["l"] = df["f_a"] * 3 + df["f_b"] * 4 + df["f_c"] * 5 + 100 + else: + df["l"] = (df["f_a"] * 3 - df["f_b"] * 4 + df["f_c"] * 5) > 0.5 df["p"] = np.random.randint(low=0, high=4, size=(100, 1)) return df