diff --git a/examples/sklearn-hpo-cv.py b/examples/sklearn-hpo-cv.py
new file mode 100644
index 00000000..04d6d6c4
--- /dev/null
+++ b/examples/sklearn-hpo-cv.py
@@ -0,0 +1,277 @@
+"""Random Search with CVEvaluation.
+
+This example demonstrates the [`CVEvaluation`][amltk.sklearn.CVEvaluation] class,
+which builds a custom cross-validation task that can be used to evaluate
+[`pipelines`](../guides/pipelines.md), using
+[`RandomSearch`][amltk.optimization.optimizers.random_search.RandomSearch].
+"""
+from collections.abc import Mapping
+from pathlib import Path
+from typing import Any
+
+import numpy as np
+import openml
+import pandas as pd
+from ConfigSpace import Categorical, Integer
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.impute import SimpleImputer
+from sklearn.metrics import get_scorer
+from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
+
+from amltk.optimization.optimizers.random_search import RandomSearch
+from amltk.optimization.trial import Metric, Trial
+from amltk.pipeline import Choice, Component, Node, Sequential, Split, request
+from amltk.sklearn import CVEvaluation
+
+
+def get_fold(
+    openml_task_id: int,
+    fold: int,
+) -> tuple[
+    pd.DataFrame,
+    pd.DataFrame,
+    pd.DataFrame | pd.Series,
+    pd.DataFrame | pd.Series,
+]:
+    """Get the data for a specific fold of an OpenML task.
+
+    Args:
+        openml_task_id: The OpenML task id.
+        fold: The fold number.
+    """
+    task = openml.tasks.get_task(
+        openml_task_id,
+        download_splits=True,
+        download_data=True,
+        download_qualities=True,
+        download_features_meta_data=True,
+    )
+    train_idx, test_idx = task.get_train_test_split_indices(fold=fold)
+    X, y = task.get_X_and_y(dataset_format="dataframe")  # type: ignore
+    X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
+    X_test, y_test = X.iloc[test_idx], y.iloc[test_idx]
+    return X_train, X_test, y_train, y_test
+
+
+preprocessing = Split(
+    {
+        "numerical": Component(SimpleImputer, space={"strategy": ["mean", "median"]}),
+        "categorical": [
+            Component(
+                OrdinalEncoder,
+                config={
+                    "categories": "auto",
+                    "handle_unknown": "use_encoded_value",
+                    "unknown_value": -1,
+                    "encoded_missing_value": -2,
+                },
+            ),
+            Choice(
+                "passthrough",
+                Component(
+                    OneHotEncoder,
+                    space={"max_categories": (2, 20)},
+                    config={
+                        "categories": "auto",
+                        "drop": None,
+                        "sparse_output": False,
+                        "handle_unknown": "infrequent_if_exist",
+                    },
+                ),
+                name="one_hot",
+            ),
+        ],
+    },
+    name="preprocessing",
+)
+
+
+def rf_config_transform(config: Mapping[str, Any], _: Any) -> dict[str, Any]:
+    new_config = dict(config)
+    if new_config["class_weight"] == "None":
+        new_config["class_weight"] = None
+    return new_config
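+
+
+# Illustrative sketch (not part of the original example): `config_transform`
+# receives the raw config sampled by the optimizer plus an optional context
+# argument and returns the config actually passed to the estimator. ConfigSpace
+# cannot encode a literal `None`, so the string "None" is mapped back to `None`:
+_example_config = {"class_weight": "None", "criterion": "gini"}
+assert rf_config_transform(_example_config, None)["class_weight"] is None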
+
+
+# NOTE: This space should not be used for evaluating how good this RF is
+# vs other algorithms
+rf_classifier = Component(
+    item=RandomForestClassifier,
+    config_transform=rf_config_transform,
+    space={
+        "criterion": ["gini", "entropy"],
+        "max_features": Categorical(
+            "max_features",
+            list(np.logspace(0.1, 1, base=10, num=10) / 10),
+            ordered=True,
+        ),
+        "min_samples_split": Integer("min_samples_split", bounds=(2, 20), default=2),
+        "min_samples_leaf": Integer("min_samples_leaf", bounds=(1, 20), default=1),
+        "bootstrap": Categorical("bootstrap", [True, False], default=True),
+        "class_weight": ["balanced", "balanced_subsample", "None"],
+        "min_impurity_decrease": (1e-9, 1e-1),
+    },
+    config={
+        "random_state": request(
+            "random_state",
+            default=None,
+        ),  # Will be provided later by the `Trial`
+        "n_estimators": 512,
+        "max_depth": None,
+        "min_weight_fraction_leaf": 0.0,
+        "max_leaf_nodes": None,
+        "warm_start": False,  # False due to no iterative fit used here
+        "n_jobs": 1,
+    },
+)
+
+rf_pipeline = Sequential(preprocessing, rf_classifier, name="rf_pipeline")
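+
+# Illustrative sketch (not part of the original example): roughly what the
+# optimizer and evaluator do with `rf_pipeline` for every sampled config.
+# `search_space(parser=...)`, `configure(...)` and `build("sklearn")` are
+# assumed from the amltk pipeline API:
+#
+#   space = rf_pipeline.search_space(parser="configspace")
+#   config = space.sample_configuration()
+#   sklearn_pipeline = rf_pipeline.configure(dict(config)).build("sklearn")
+#   sklearn_pipeline.fit(X_train, y_train)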
+
+
+def do_something_after_a_split_was_evaluated(
+    trial: Trial,
+    fold: int,
+    info: CVEvaluation.PostSplitInfo,
+) -> CVEvaluation.PostSplitInfo:
+    return info
+
+
+def do_something_after_a_complete_trial_was_evaluated(
+    report: Trial.Report,
+    pipeline: Node,
+    info: CVEvaluation.CompleteEvalInfo,
+) -> Trial.Report:
+    return report
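+
+
+# The two callbacks above are no-ops and only illustrate the hooks' signatures.
+# As a sketch (not part of the original example), a `post_split` hook could
+# attach extra per-split information to the trial; `trial.summary` being a
+# plain dict is an assumption about the amltk `Trial` API:
+#
+#   def record_split(trial: Trial, fold: int, info: CVEvaluation.PostSplitInfo):
+#       trial.summary[f"split_{fold}:evaluated"] = True
+#       return info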
+
+
+def main() -> None:
+    random_seed = 42
+    openml_task_id = 31  # credit-g dataset, classification
+    task_hint = "classification"
+    outer_fold_number = (
+        0  # Only run the first outer fold; wrap this in a loop if need be,
+        # with a unique history file for each one
+    )
+    optimizer_cls = RandomSearch
+    working_dir = Path("example-sklearn-hpo-cv").absolute()
+    results_to = working_dir / "results.parquet"
+    inner_fold_seed = random_seed + outer_fold_number
+    metric_definition = Metric(
+        "accuracy",
+        minimize=False,
+        bounds=(0, 1),
+        fn=get_scorer("accuracy"),
+    )
+
+    per_process_memory_limit = None  # (4, "GB")  # NOTE: May have issues on Mac
+    per_process_walltime_limit = None  # (60, "s")
+
+    debugging = False
+    if debugging:
+        max_trials = 1
+        max_time = 30
+        n_workers = 1
+        # Raise an error with traceback, something went wrong
+        on_trial_exception = "raise"
+        display = True
+        wait_for_all_workers_to_finish = True
+    else:
+        max_trials = 10
+        max_time = 300
+        n_workers = 4
+        # Just mark the trial as failed and move on to the next one
+        on_trial_exception = "continue"
+        display = True
+        wait_for_all_workers_to_finish = False
+
+    X, X_test, y, y_test = get_fold(
+        openml_task_id=openml_task_id,
+        fold=outer_fold_number,
+    )
+
+    # The object below is a highly customizable class that creates the function
+    # we use for evaluating pipelines.
+    evaluator = CVEvaluation(
+        # Provide data, number of times to split, cross-validation and a hint of the task type
+        X,
+        y,
+        splitter="cv",
+        n_splits=8,
+        task_hint=task_hint,
+        # Seeding for reproducibility
+        random_state=inner_fold_seed,
+        # Provide test data to get test scores
+        X_test=X_test,
+        y_test=y_test,
+        # Record training scores
+        train_score=True,
+        # Where to store things
+        working_dir=working_dir,
+        # What to do when something goes wrong.
+        on_error="raise" if on_trial_exception == "raise" else "fail",
+        # Whether you want models to be stored on disk under working_dir
+        store_models=False,
+        # A callback to be called at the end of each split
+        post_split=do_something_after_a_split_was_evaluated,
+        # A callback that is called at the end of all fold evaluations
+        post_processing=do_something_after_a_complete_trial_was_evaluated,
+        # Whether the post_processing callback requires the fold models, e.g.
+        # to compute some bagged average over all fold models. If `False`, models
+        # are discarded eagerly to save space.
+        post_processing_requires_models=False,
+        # This handles edge cases related to stratified splitting when there are too
+        # few instances of a specific class. You may wish to disable it if you're
+        # passing extra fit params.
+        rebalance_if_required_for_stratified_splitting=True,
+        # Extra parameters requested by sklearn models/group splitters or metrics,
+        # such as `sample_weight`
+        params=None,
+    )
+
+    # Here we just use the `optimize` method to set up and run an optimization loop
+    # with `n_workers`. Please either look at the source code for `optimize` or
+    # refer to the `Scheduler` and `Optimizer` guide if you need more fine-grained control.
+    # If you need to evaluate a certain configuration, you can create your own `Trial` object.
+    #
+    # trial = Trial.create(name=...., info=None, config=..., bucket=..., seed=..., metrics=metric_def)
+    # report = evaluator.evaluate(trial, rf_pipeline)
+    # print(report)
+    #
+    history = rf_pipeline.optimize(
+        target=evaluator.fn,
+        metric=metric_definition,
+        optimizer=optimizer_cls,
+        seed=inner_fold_seed,
+        process_memory_limit=per_process_memory_limit,
+        process_walltime_limit=per_process_walltime_limit,
+        working_dir=working_dir,
+        max_trials=max_trials,
+        timeout=max_time,
+        display=display,
+        wait=wait_for_all_workers_to_finish,
+        n_workers=n_workers,
+        on_trial_exception=on_trial_exception,
+    )
+
+    df = history.df()
+
+    # Attach some extra information to the dataframe
+    df = df.assign(
+        outer_fold=outer_fold_number,
+        inner_fold_seed=inner_fold_seed,
+        task_id=openml_task_id,
+        max_trials=max_trials,
+        max_time=max_time,
+        optimizer=optimizer_cls.__name__,
+        n_workers=n_workers,
+    )
+    print(df)
+    print(f"Saving dataframe of results to path: {results_to}")
+    df.to_parquet(results_to)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/amltk/optimization/optimizers/random_search.py b/src/amltk/optimization/optimizers/random_search.py
new file mode 100644
index 00000000..8ef3dd60
--- /dev/null
+++ b/src/amltk/optimization/optimizers/random_search.py
@@ -0,0 +1,150 @@
+"""An optimizer that uses ConfigSpace for random search."""
+from __future__ import annotations
+
+from collections.abc import Iterable, Sequence
+from datetime import datetime
+from pathlib import Path
+from typing import TYPE_CHECKING, Literal, overload
+from typing_extensions import override
+
+from amltk.optimization import Metric, Optimizer, Trial
+from amltk.pipeline import Node
+from amltk.randomness import as_int, randuid
+from amltk.store import PathBucket
+
+if TYPE_CHECKING:
+    from typing_extensions import Self
+
+    from ConfigSpace import ConfigurationSpace
+
+    from amltk.types import Seed
+
+
+class RandomSearch(Optimizer[None]):
+    """An optimizer that uses ConfigSpace for random search."""
+
+    def __init__(
+        self,
+        *,
+        space: ConfigurationSpace,
+        bucket: PathBucket | None = None,
+        metrics: Metric | Sequence[Metric],
+        seed: Seed | None = None,
+    ) -> None:
+        """Initialize the optimizer.
+
+        Args:
+            space: The search space to search over.
+            bucket: The bucket given to trials generated by this optimizer.
+            metrics: The metrics to optimize. Unused for RandomSearch.
+            seed: The seed to use for the optimization.
+        """
+        metrics = metrics if isinstance(metrics, Sequence) else [metrics]
+        super().__init__(metrics=metrics, bucket=bucket)
+        seed = as_int(seed)
+        space.seed(seed)
+        self._counter = 0
+        self.seed = seed
+        self.space = space
+
+    @override
+    @classmethod
+    def create(
+        cls,
+        *,
+        space: ConfigurationSpace | Node,
+        metrics: Metric | Sequence[Metric],
+        bucket: PathBucket | str | Path | None = None,
+        seed: Seed | None = None,
+    ) -> Self:
+        """Create a random search optimizer.
+
+        Args:
+            space: The search space or node to optimize.
+            metrics: The metrics to optimize.
+            bucket: The bucket to store the results in.
+            seed: The seed to use for the optimization.
+        """
+        seed = as_int(seed)
+        match bucket:
+            case None:
+                bucket = PathBucket(
+                    f"{cls.__name__}-{datetime.now().isoformat()}",
+                )
+            case str() | Path():
+                bucket = PathBucket(bucket)
+            case bucket:
+                bucket = bucket  # noqa: PLW0127
+
+        if isinstance(space, Node):
+            space = space.search_space(parser=cls.preferred_parser())
+
+        return cls(
+            space=space,
+            seed=seed,
+            bucket=bucket,
+            metrics=metrics,
+        )
+
+    @overload
+    def ask(self, n: int) -> Iterable[Trial[None]]:
+        ...
+
+    @overload
+    def ask(self, n: None = None) -> Trial[None]:
+        ...
+
+    @override
+    def ask(
+        self,
+        n: int | None = None,
+    ) -> Trial[None] | Iterable[Trial[None]]:
+        """Ask the optimizer for a new config.
+
+        Args:
+            n: The number of configs to ask for. If `None`, ask for a single config.
+
+        Returns:
+            A new trial, or an iterable of trials if `n` is given.
+        """
+        if n is None:
+            configs = [self.space.sample_configuration()]
+        else:
+            configs = self.space.sample_configuration(n)
+
+        trials: list[Trial[None]] = []
+        for config in configs:
+            self._counter += 1
+            randuid_seed = self.seed + self._counter
+            unique_name = f"trial-{randuid(4, seed=randuid_seed)}-{self._counter}"
+            trial: Trial[None] = Trial.create(
+                name=unique_name,
+                config=dict(config),
+                info=None,
+                seed=self.seed,
+                bucket=self.bucket / unique_name,
+                metrics=self.metrics,
+            )
+            trials.append(trial)
+
+        if n is None:
+            return trials[0]
+
+        return trials
+
+    @override
+    def tell(self, report: Trial.Report[None]) -> None:
+        """Tell the optimizer about the result of a trial.
+
+        Does nothing for random search.
+
+        Args:
+            report: The report of the trial.
+        """
+
+    @override
+    @classmethod
+    def preferred_parser(cls) -> Literal["configspace"]:
+        """The preferred parser for this optimizer."""
+        return "configspace"
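+
+
+# Illustrative usage sketch (not part of the original module). `create`, `ask`
+# and `tell` are defined above; `trial.success(...)` producing a report is
+# assumed from the amltk `Trial` API, and `evaluate` is a hypothetical
+# user-defined function:
+#
+#   optimizer = RandomSearch.create(space=pipeline, metrics=Metric("accuracy", minimize=False))
+#   for _ in range(10):
+#       trial = optimizer.ask()
+#       accuracy = evaluate(trial.config)
+#       optimizer.tell(trial.success(accuracy=accuracy))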
+ """ + if y_train.ndim != 1: + raise NotImplementedError( + "Rebalancing for multi-output classification is not yet supported.", + ) + + # If we are in binary/multilclass setting and there is not enough instances + # with a given label to perform stratified sampling with `n_splits`, we first + # find these labels, take the first N instances which have these labels and allows + # us to reach `n_splits` instances for each label. + indices_to_resample = None + label_counts = y_train.value_counts() + under_represented_labels = label_counts[label_counts < n_splits] # type: ignore + + collected_indices = [] + if any(under_represented_labels): + if _warning_if_occurs is not None: + warnings.warn(_warning_if_occurs, UserWarning, stacklevel=2) + under_rep_instances = y_train[y_train.isin(under_represented_labels.index)] # type: ignore + + grouped_by_label = under_rep_instances.to_frame("label").groupby( # type: ignore + "label", + observed=True, # Handles categoricals + ) + for _label, instances_with_label in grouped_by_label: + n_to_take = n_splits - len(instances_with_label) + + need_to_sample_repeatedly = n_to_take > len(instances_with_label) + resampled_instances = instances_with_label.sample( + n=n_to_take, + random_state=seed, # type: ignore + # It could be that we have to repeat sample if there are not enough + # instances to hit `n_splits` for a given label. + replace=need_to_sample_repeatedly, + ) + collected_indices.append(np.asarray(resampled_instances.index)) + + indices_to_resample = np.concatenate(collected_indices) + + if indices_to_resample is not None: + # Give the new samples a new index to not overlap with the original data. + new_start_idx = X_train.index.max() + 1 # type: ignore + new_end_idx = new_start_idx + len(indices_to_resample) + new_idx = pd.RangeIndex(start=new_start_idx, stop=new_end_idx) + resampled_X = X_train.loc[indices_to_resample].set_index(new_idx) + resampled_y = y_train.loc[indices_to_resample].set_axis(new_idx) + X_train = pd.concat([X_train, resampled_X]) + y_train = pd.concat([y_train, resampled_y]) + + return X_train, y_train + + def _check_valid_scores( scores: Mapping[str, float] | Number, split: str, @@ -1305,7 +1380,7 @@ def _on_comm_request_ask_whether_to_continue(self, msg: Comm.Msg) -> None: " discuss use cases and how to handle this.", ) - def __init__( # noqa: PLR0913 + def __init__( # noqa: PLR0913, C901 self, X: XLike, # noqa: N803 y: YLike, @@ -1319,6 +1394,7 @@ def __init__( # noqa: PLR0913 holdout_size: float = 0.33, train_score: bool = False, store_models: bool = False, + rebalance_if_required_for_stratified_splitting: bool | None = None, additional_scorers: Mapping[str, _Scorer] | None = None, random_state: Seed | None = None, # Only used if cv is an int/float params: Mapping[str, Any] | None = None, @@ -1368,6 +1444,14 @@ def __init__( # noqa: PLR0913 will take extra time as predictions will be made on the training data as well. store_models: Whether to store the trained models in the trial. + rebalance_if_required_for_stratified_splitting: Whether the CVEvaluator + should rebalance the training data to allow for stratified splitting. + * If `True`, rebalancing will be done if required. That is when + the `splitter=` is `"cv"` or a `StratifiedKFold` and + there are fewer instances of a minority class than `n_splits=`. + * If `None`, rebalancing will be done if required it. Same + as `True` but raises a warning if it occurs. + * If `False`, rebalancing will never be done. additional_scorers: Additional scorers to use. 
+
+
 def _check_valid_scores(
     scores: Mapping[str, float] | Number,
     split: str,
@@ -1305,7 +1380,7 @@ def _on_comm_request_ask_whether_to_continue(self, msg: Comm.Msg) -> None:
             " discuss use cases and how to handle this.",
         )
 
-    def __init__(  # noqa: PLR0913
+    def __init__(  # noqa: PLR0913, C901
         self,
         X: XLike,  # noqa: N803
         y: YLike,
@@ -1319,6 +1394,7 @@
         holdout_size: float = 0.33,
         train_score: bool = False,
         store_models: bool = False,
+        rebalance_if_required_for_stratified_splitting: bool | None = None,
         additional_scorers: Mapping[str, _Scorer] | None = None,
         random_state: Seed | None = None,  # Only used if cv is an int/float
        params: Mapping[str, Any] | None = None,
@@ -1368,6 +1444,14 @@
                 will take extra time as predictions will be made on the
                 training data as well.
             store_models: Whether to store the trained models in the trial.
+            rebalance_if_required_for_stratified_splitting: Whether the CVEvaluation
+                should rebalance the training data to allow for stratified splitting.
+                * If `True`, rebalancing will be done if required. That is when
+                  the `splitter=` is `"cv"` or a `StratifiedKFold` and
+                  there are fewer instances of a minority class than `n_splits=`.
+                * If `None`, rebalancing will be done if required. Same
+                  as `True` but a warning is raised if it occurs.
+                * If `False`, rebalancing will never be done.
             additional_scorers: Additional scorers to use.
             random_state: The random state to use for the cross-validation
                 `splitter=`. If a custom splitter is provided, this will be
@@ -1571,6 +1655,7 @@ def my_post_processing(
                     n_splits=n_splits,
                     random_state=random_state,
                 )
+
            case "holdout":
                splitter = _default_holdout(
                    task_type,
@@ -1580,6 +1665,43 @@
             case _:
                 splitter = splitter  # noqa: PLW0127
 
+        # This whole block checks whether we should resample for stratified
+        # splitting, in the case of a low-count minority class.
+        if (
+            isinstance(splitter, StratifiedKFold)
+            and rebalance_if_required_for_stratified_splitting is not False
+            and task_type in ("binary", "multiclass")
+        ):
+            if rebalance_if_required_for_stratified_splitting is None:
+                _warning = (
+                    f"Some labels have fewer than `{n_splits=}` instances. Resampling data"
+                    " to ensure it's possible to have one of each label in each fold."
+                    " Note that this may cause things to crash if you've provided extra"
+                    " `params` as the `X` data will have gotten slightly larger. Please"
+                    " set `rebalance_if_required_for_stratified_splitting=False` if you"
+                    " do not wish this to be enabled automatically, in which case you"
+                    " may either perform resampling yourself or choose a smaller"
+                    " `n_splits=`."
+                )
+            else:
+                _warning = None
+
+            x_is_frame = isinstance(X, pd.DataFrame)
+            y_is_frame = isinstance(y, pd.Series | pd.DataFrame)
+
+            X, y = resample_if_minority_class_too_few_for_n_splits(  # type: ignore
+                X if x_is_frame else pd.DataFrame(X),
+                y if y_is_frame else pd.Series(y),  # type: ignore
+                n_splits=n_splits,
+                seed=random_state,
+                _warning_if_occurs=_warning,
+            )
+
+            if not x_is_frame:
+                X = X.to_numpy()  # type: ignore
+            if not y_is_frame:
+                y = y.to_numpy()  # type: ignore
+
         self.task_type = task_type
         self.additional_scorers = additional_scorers
         self.bucket = bucket
diff --git a/src/amltk/store/stored.py b/src/amltk/store/stored.py
index f3a47ed3..9383f0b1 100644
--- a/src/amltk/store/stored.py
+++ b/src/amltk/store/stored.py
@@ -41,6 +41,7 @@
 from collections.abc import Callable
 from typing import Generic, TypeVar
+from typing_extensions import override
 
 K = TypeVar("K")
 V = TypeVar("V")
@@ -63,3 +64,7 @@ def __init__(self, key: K, read: Callable[[K], V]):
     def load(self) -> V:
         """Get the value."""
         return self.read(self.key)
+
+    @override
+    def __repr__(self) -> str:
+        return f"Stored({self.key})"
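+
+# Illustrative example (not part of the original patch), assuming a JSON file
+# on disk:
+#
+#   stored = Stored(Path("scores.json"), read=lambda p: json.loads(p.read_text()))
+#   repr(stored)   # "Stored(scores.json)"
+#   stored.load()  # reads the file and returns the decoded JSON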