Commit 5496c5d
Merge pull request #941 from mindsdb/staging

Release 22.7.3.0

paxcema authored Jul 20, 2022
2 parents: a590bec + be93122
Showing 23 changed files with 481 additions and 412 deletions.
540 changes: 270 additions & 270 deletions docssrc/source/tutorials/tutorial_time_series/tutorial_time_series.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lightwood/__about__.py

@@ -1,6 +1,6 @@
 __title__ = 'lightwood'
 __package_name__ = 'lightwood'
-__version__ = '22.7.2.2'
+__version__ = '22.7.3.0'
 __description__ = "Lightwood is a toolkit for automatic machine learning model building"
 __email__ = "[email protected]"
 __author__ = 'MindsDB Inc'
9 changes: 3 additions & 6 deletions lightwood/analysis/explain.py

@@ -54,12 +54,9 @@ def explain(data: pd.DataFrame,
         for col in timeseries_settings.group_by:
             row_insights[f'group_{col}'] = data[col]

-        for col in timeseries_settings.order_by:
-            row_insights[f'order_{col}'] = data[col]
-
-        for col in timeseries_settings.order_by:
-            row_insights[f'order_{col}'] = get_inferred_timestamps(
-                row_insights, col, ts_analysis['deltas'], timeseries_settings)
+        row_insights[f'order_{timeseries_settings.order_by}'] = data[timeseries_settings.order_by]
+        row_insights[f'order_{timeseries_settings.order_by}'] = get_inferred_timestamps(
+            row_insights, timeseries_settings.order_by, ts_analysis['deltas'], timeseries_settings)

     kwargs = {
         'data': data,
2 changes: 1 addition & 1 deletion lightwood/analysis/helpers/feature_importance.py

@@ -38,7 +38,7 @@ def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:
         else:
             empty_input_accuracy = {}
         ignorable_input_cols = [x for x in ns.input_cols if (not ns.tss.is_timeseries or
-                                                             (x not in ns.tss.order_by and
+                                                             (x != ns.tss.order_by and
                                                               x not in ns.tss.historical_columns))]
         for col in ignorable_input_cols:
             partial_data = deepcopy(ns.encoded_val_data)
13 changes: 8 additions & 5 deletions lightwood/analysis/nc/calibrate.py

@@ -9,7 +9,7 @@

 from lightwood.api.dtype import dtype
 from lightwood.api.types import PredictionArguments
-from lightwood.helpers.ts import add_tn_conf_bounds
+from lightwood.helpers.ts import add_tn_num_conf_bounds, add_tn_cat_conf_bounds

 from lightwood.data import EncodedDs
 from lightwood.analysis.base import BaseAnalysisBlock

@@ -120,7 +120,7 @@ def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:
         # fit additional ICPs in time series tasks with grouped columns
         if ns.tss.is_timeseries and ns.tss.group_by:
             # generate a multiindex
-            midx = pd.MultiIndex.from_frame(icp_df[[*ns.tss.group_by, f'__mdb_original_{ns.tss.order_by[0]}']])
+            midx = pd.MultiIndex.from_frame(icp_df[[*ns.tss.group_by, f'__mdb_original_{ns.tss.order_by}']])
             icp_df.index = midx

             # create an ICP for each possible group

@@ -157,7 +157,7 @@ def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:

             # add all predictions to DF
             icps_df = deepcopy(ns.data)
-            midx = pd.MultiIndex.from_frame(icps_df[[*ns.tss.group_by, f'__mdb_original_{ns.tss.order_by[0]}']])
+            midx = pd.MultiIndex.from_frame(icps_df[[*ns.tss.group_by, f'__mdb_original_{ns.tss.order_by}']])
             icps_df.index = midx
             if ns.is_multi_ts or pred_is_list:
                 icps_df[f'__predicted_{ns.target}'] = np.array([p[0] for p in ns.normal_predictions['prediction']])

@@ -380,8 +380,11 @@ def explain(self, row_insights: pd.DataFrame, global_insights: Dict[str, object]
                                                  cooldown=ns.pred_args.anomaly_cooldown)
             row_insights['anomaly'] = anomalies

-        if ns.tss.is_timeseries and ns.tss.horizon > 1 and is_numerical:
-            row_insights = add_tn_conf_bounds(row_insights, ns.tss)
+        if ns.tss.is_timeseries and ns.tss.horizon > 1:
+            if is_numerical:
+                row_insights = add_tn_num_conf_bounds(row_insights, ns.tss)
+            else:
+                row_insights = add_tn_cat_conf_bounds(row_insights, ns.tss)

         # clip bounds if necessary
         if is_numerical:
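
With `order_by` now a single column name rather than a list, the f-string resolves directly to one `__mdb_original_<order_by>` column instead of indexing `order_by[0]`. For illustration, the same multiindex construction in isolation (toy frame; the column names are assumptions, not lightwood's actual schema):

    import pandas as pd

    # one group column plus the original order column, as in icp_df above
    icp_df = pd.DataFrame({'store': ['a', 'a', 'b'],
                           '__mdb_original_saledate': [1, 2, 1]})
    midx = pd.MultiIndex.from_frame(icp_df[['store', '__mdb_original_saledate']])
    icp_df.index = midx
    print(icp_df.loc[('a',)])  # selects both rows of group 'a'
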
41 changes: 22 additions & 19 deletions lightwood/api/json_ai.py

@@ -135,7 +135,7 @@ def lookup_encoder(
     # Time-series representations require more advanced flags
     if tss.is_timeseries:
         gby = tss.group_by if tss.group_by is not None else []
-        if col_name in tss.order_by:
+        if col_name == tss.order_by:
             encoder_dict["module"] = "ArrayEncoder"
             encoder_dict["args"]["original_type"] = f'"{tss.target_type}"'
             encoder_dict["args"]["window"] = f"{tss.window}"

@@ -575,6 +575,7 @@ def _add_implicit_values(json_ai: JsonAI) -> JsonAI:
             mixers[i]["args"]["tss"] = mixers[i]["args"].get("tss", "$problem_definition.timeseries_settings")
             mixers[i]["args"]["ts_analysis"] = mixers[i]["args"].get("ts_analysis", "$ts_analysis")
             mixers[i]["args"]["fit_on_dev"] = mixers[i]["args"].get("fit_on_dev", "True")
+            mixers[i]["args"]["use_stl"] = mixers[i]["args"].get("use_stl", "False")

         elif mixers[i]["module"] == "NHitsMixer":
             mixers[i]["args"]["target"] = mixers[i]["args"].get("target", "$target")

@@ -752,27 +753,29 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
         encoder_dict[col_name] = call(encoder)

     # Populate time-series specific details
+    # TODO: consider moving this to a `JsonAI override` phase
     tss = json_ai.problem_definition.timeseries_settings
-    if tss.is_timeseries and tss.use_previous_target:
-        col_name = f"__mdb_ts_previous_{json_ai.problem_definition.target}"
-        target_type = json_ai.dtype_dict[json_ai.problem_definition.target]
-        json_ai.problem_definition.timeseries_settings.target_type = target_type
-        encoder_dict[col_name] = call(
-            lookup_encoder(
-                target_type,
-                col_name,
-                False,
-                json_ai.problem_definition,
-                False,
-                None,
-            )
-        )
-
-        dtype_dict[col_name] = target_type
-        # @TODO: Is populating the json_ai at this stage even necessary?
-        json_ai.encoders[col_name] = encoder_dict[col_name]
-        json_ai.dtype_dict[col_name] = target_type
-        json_ai.dependency_dict[col_name] = []
+    if tss.is_timeseries:
+        if tss.use_previous_target:
+            col_name = f"__mdb_ts_previous_{json_ai.problem_definition.target}"
+            target_type = json_ai.dtype_dict[json_ai.problem_definition.target]
+            json_ai.problem_definition.timeseries_settings.target_type = target_type
+            encoder_dict[col_name] = call(
+                lookup_encoder(
+                    target_type,
+                    col_name,
+                    False,
+                    json_ai.problem_definition,
+                    False,
+                    None,
+                )
+            )
+
+            dtype_dict[col_name] = target_type
+            # @TODO: Is populating the json_ai at this stage even necessary?
+            json_ai.encoders[col_name] = encoder_dict[col_name]
+            json_ai.dtype_dict[col_name] = target_type
+            json_ai.dependency_dict[col_name] = []

     # ----------------- #
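
The added `use_stl` line follows the defaulting pattern used throughout `_add_implicit_values`: an argument is filled in only when the user has not set it, and `"$..."` strings are lightwood's late-binding references. A minimal sketch of that pattern (the mixer entry below is hypothetical):

    mixer = {"module": "SomeTsMixer", "args": {"fit_on_dev": "False"}}  # hypothetical entry

    mixer["args"]["fit_on_dev"] = mixer["args"].get("fit_on_dev", "True")  # user value kept
    mixer["args"]["use_stl"] = mixer["args"].get("use_stl", "False")       # default injected

    print(mixer["args"])  # {'fit_on_dev': 'False', 'use_stl': 'False'}
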
12 changes: 8 additions & 4 deletions lightwood/api/types.py

@@ -110,11 +110,11 @@ class TimeseriesSettings:
     :param is_timeseries: Whether the input data should be treated as time series; if true, this flag is checked in \
     subsequent internal steps to ensure processing is appropriate for time-series data.
-    :param order_by: A list of columns by which the data should be ordered.
+    :param order_by: Column by which the data should be ordered.
     :param group_by: Optional list of columns by which the data should be grouped. Each different combination of values\
     for these columns will yield a different series.
     :param window: The temporal horizon (number of rows) that a model intakes to "look back" into when making a\
-    prediction, after the rows are ordered by order_by columns and split into groups if applicable.
+    prediction, after the rows are ordered by the order_by column and split into groups if applicable.
     :param horizon: The number of points in the future that predictions should be made for, defaults to 1. Once \
     trained, the model will be able to predict up to this many points into the future.
     :param historical_columns: The temporal dynamics of these columns will be used as additional context to train the \

@@ -128,7 +128,7 @@
     """  # noqa

     is_timeseries: bool
-    order_by: List[str] = None
+    order_by: str = None
     window: int = None
     group_by: List[str] = None
     use_previous_target: bool = True

@@ -152,11 +152,15 @@ def from_dict(obj: Dict):
         :returns: A populated ``TimeseriesSettings`` object.
         """  # noqa
         if len(obj) > 0:
-            for mandatory_setting in ["order_by", "window"]:
+            for mandatory_setting, etype in zip(["order_by", "window"], [str, int]):
                 if mandatory_setting not in obj:
                     err = f"Missing mandatory timeseries setting: {mandatory_setting}"
                     log.error(err)
                     raise Exception(err)
+                if obj[mandatory_setting] and not isinstance(obj[mandatory_setting], etype):
+                    err = f"Wrong type for mandatory timeseries setting '{mandatory_setting}': found '{type(obj[mandatory_setting])}', expected '{etype}'"  # noqa
+                    log.error(err)
+                    raise Exception(err)

         timeseries_settings = TimeseriesSettings(
             is_timeseries=True,
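
Based on the validation above, a minimal sketch of the new behaviour (assuming lightwood is installed and the remaining settings take their defaults; earlier releases accepted a list for `order_by`):

    from lightwood.api.types import TimeseriesSettings

    tss = TimeseriesSettings.from_dict({'order_by': 'saledate', 'window': 8})
    print(tss.order_by)  # 'saledate'

    try:
        TimeseriesSettings.from_dict({'order_by': ['saledate'], 'window': 8})
    except Exception as e:
        print(e)  # Wrong type for mandatory timeseries setting 'order_by': ...
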
2 changes: 1 addition & 1 deletion lightwood/data/cleaner.py

@@ -394,7 +394,7 @@ def clean_timeseries(df: pd.DataFrame, tss: TimeseriesSettings) -> pd.DataFrame:
     invalid_rows = []

     for idx, row in df.iterrows():
-        if pd.isna(row[tss.order_by[0]]):
+        if pd.isna(row[tss.order_by]):
             invalid_rows.append(idx)

     df = df.drop(invalid_rows)
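
For illustration, the effect of this loop on a toy frame (column names assumed): rows whose order-by value is missing are collected and dropped.

    import pandas as pd

    df = pd.DataFrame({'saledate': [1.0, None, 3.0], 'target': [10, 20, 30]})
    invalid_rows = [idx for idx, row in df.iterrows() if pd.isna(row['saledate'])]
    df = df.drop(invalid_rows)
    print(df.index.tolist())  # [0, 2] -- the row with a missing order value is gone
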
15 changes: 10 additions & 5 deletions lightwood/data/splitter.py

@@ -93,7 +93,8 @@ def stratify(data: pd.DataFrame,
              pct_test: float,
              stratify_on: List[str],
              seed: int,
-             reshuffle: bool) -> List[pd.DataFrame]:
+             reshuffle: bool,
+             atol: float = 0.05) -> List[pd.DataFrame]:
     """
     Stratified data splitter.

@@ -109,6 +110,7 @@
     :param stratify_on: Columns to consider when stratifying
     :param seed: Random state for pandas data-frame shuffling
     :param reshuffle: specify if reshuffling should be done post-split
+    :param atol: absolute tolerance for difference in stratification percentages. If violated, reverts to a non-stratified split.
     :returns Stratified train, dev, test dataframes
     """  # noqa

@@ -136,10 +138,13 @@
                               for df in [train_st, dev_st, test_st]]

     # check that stratified lengths conform to expected percentages
-    if not np.isclose(len(train_st) / len(data), pct_train, atol=0.01) or \
-            not np.isclose(len(dev_st) / len(data), pct_dev, atol=0.01) or \
-            not np.isclose(len(test_st) / len(data), pct_test, atol=0.01):
-        log.info("Could not stratify; reverting to simple split")
+    emp_tr = len(train_st) / len(data)
+    emp_dev = len(dev_st) / len(data)
+    emp_te = len(test_st) / len(data)
+    if not np.isclose(emp_tr, pct_train, atol=atol) or \
+            not np.isclose(emp_dev, pct_dev, atol=atol) or \
+            not np.isclose(emp_te, pct_test, atol=atol):
+        log.warning(f"Stratification is outside of imposed tolerance ({atol}) ({emp_tr} train - {emp_dev} dev - {emp_te} test), reverting to a simple split.")  # noqa
         train_st, dev_st, test_st = simple_split(data, pct_train, pct_dev, pct_test)

     return [train_st, dev_st, test_st]
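
A self-contained sketch of the new tolerance check (the counts below are made up; in lightwood the fallback calls `simple_split`):

    import numpy as np

    n = 1000
    n_train, n_dev, n_test = 870, 60, 70  # stratification drifted from the requested 80/10/10
    pct_train, pct_dev, pct_test, atol = 0.8, 0.1, 0.1, 0.05

    emp = [n_train / n, n_dev / n, n_test / n]  # [0.87, 0.06, 0.07]
    within_tol = all(np.isclose(e, p, atol=atol)
                     for e, p in zip(emp, [pct_train, pct_dev, pct_test]))
    print(within_tol)  # False -> revert to a simple (non-stratified) split
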
45 changes: 26 additions & 19 deletions lightwood/data/timeseries_analyzer.py

@@ -10,7 +10,7 @@

 from lightwood.api.types import TimeseriesSettings
 from lightwood.api.dtype import dtype
-from lightwood.helpers.ts import get_ts_groups, get_delta, get_group_matches, Differencer
+from lightwood.helpers.ts import get_ts_groups, get_delta, get_group_matches, Differencer, max_pacf
 from lightwood.helpers.log import log
 from lightwood.encoder.time_series.helpers.common import generate_target_group_normalizers

@@ -41,6 +41,7 @@ def timeseries_analyzer(data: Dict[str, pd.DataFrame], dtype_dict: Dict[str, str
     normalizers = generate_target_group_normalizers(data['train'], target, dtype_dict, groups, tss)

     if dtype_dict[target] in (dtype.integer, dtype.float, dtype.num_tsarray):
+        periods = max_pacf(data['train'], groups, target, tss)  # override with PACF output
         naive_forecast_residuals, scale_factor = get_grouped_naive_residuals(data['dev'],
                                                                              target,
                                                                              tss,

@@ -96,9 +97,10 @@ def get_grouped_naive_residuals(
     group_scale_factors = {}
     for group in group_combinations:
         idxs, subset = get_group_matches(info, group, tss.group_by)
-        residuals, scale_factor = get_naive_residuals(subset[target])  # @TODO: pass m once we handle seasonality
-        group_residuals[group] = residuals
-        group_scale_factors[group] = scale_factor
+        if subset.shape[0] > 1:
+            residuals, scale_factor = get_naive_residuals(subset[target])  # @TODO: pass m once we handle seasonality
+            group_residuals[group] = residuals
+            group_scale_factors[group] = scale_factor
     return group_residuals, group_scale_factors
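
The new `subset.shape[0] > 1` guard exists because a naive (lag-1) forecast needs at least two observations to produce any residual. An illustrative computation in that spirit (not lightwood's verbatim `get_naive_residuals`):

    import pandas as pd

    y = pd.Series([3.0, 5.0, 4.0])
    residuals = (y - y.shift(1)).dropna().abs()  # |5 - 3|, |4 - 5|
    print(residuals.tolist())  # [2.0, 1.0]

    y_single = pd.Series([3.0])  # a one-row group: nothing to calibrate on
    print((y_single - y_single.shift(1)).dropna().abs().tolist())  # []
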


@@ -119,34 +121,38 @@ def get_stls(train_df: pd.DataFrame,
              groups: list,
              tss: TimeseriesSettings
              ) -> Dict[str, object]:
-    stls = {}
+    stls = {'__default': None}
     for group in groups:
-        _, tr_subset = get_group_matches(train_df, group, tss.group_by)
-        _, dev_subset = get_group_matches(dev_df, group, tss.group_by)
-        group_freq = tr_subset['__mdb_inferred_freq'].iloc[0]
-        tr_subset = deepcopy(tr_subset)[target]
-        dev_subset = deepcopy(dev_subset)[target]
-        tr_subset.index = pd.date_range(start=tr_subset.iloc[0], freq=group_freq, periods=len(tr_subset)).to_period()
-        dev_subset.index = pd.date_range(start=dev_subset.iloc[0], freq=group_freq, periods=len(dev_subset)).to_period()
-        stl = _pick_ST(tr_subset, dev_subset, sps[group])
-        log.info(f'Best STL decomposition params for group {group} are: {stl["best_params"]}')
-        stls[group] = stl
+        if group != '__default':
+            _, tr_subset = get_group_matches(train_df, group, tss.group_by)
+            _, dev_subset = get_group_matches(dev_df, group, tss.group_by)
+            if tr_subset.shape[0] > 0 and dev_subset.shape[0] > 0 and sps.get(group, False):
+                group_freq = tr_subset['__mdb_inferred_freq'].iloc[0]
+                tr_subset = deepcopy(tr_subset)[target]
+                dev_subset = deepcopy(dev_subset)[target]
+                tr_subset.index = pd.date_range(start=tr_subset.iloc[0], freq=group_freq,
+                                                periods=len(tr_subset)).to_period()
+                dev_subset.index = pd.date_range(start=dev_subset.iloc[0], freq=group_freq,
+                                                 periods=len(dev_subset)).to_period()
+                stl = _pick_ST(tr_subset, dev_subset, sps[group])
+                log.info(f'Best STL decomposition params for group {group} are: {stl["best_params"]}')
+                stls[group] = stl
     return stls


-def _pick_ST(tr_subset: pd.Series, dev_subset: pd.Series, sp: int):
+def _pick_ST(tr_subset: pd.Series, dev_subset: pd.Series, sp: list):
     """
     Perform hyperparam search with optuna to find best combination of ST transforms for a time series.
     :param tr_subset: training series used for fitting blocks. Index should be datetime, and values are the actual time series.
     :param dev_subset: dev series used for computing loss. Index should be datetime, and values are the actual time series.
-    :param sp: seasonal period
+    :param sp: list of candidate seasonal periods
     :return: best deseasonalizer and detrender combination based on dev_loss
     """  # noqa

     def _ST_objective(trial: optuna.Trial):
         trend_degree = trial.suggest_categorical("trend_degree", [1, 2])
-        ds_sp = trial.suggest_categorical("ds_sp", [sp])  # seasonality period to use in deseasonalizer
+        ds_sp = trial.suggest_categorical("ds_sp", sp)  # seasonality period to use in deseasonalizer
         if min(min(tr_subset), min(dev_subset)) <= 0:
             decomp_type = trial.suggest_categorical("decomp_type", ['additive'])
         else:

@@ -161,7 +167,8 @@ def _ST_objective(trial: optuna.Trial):
         trial.set_user_attr("transformer", transformer)
         return np.power(residuals, 2).sum()

-    study = optuna.create_study()
+    space = {"trend_degree": [1, 2], "ds_sp": sp, "decomp_type": ['additive', 'multiplicative']}
+    study = optuna.create_study(sampler=optuna.samplers.GridSampler(space))
     study.optimize(_ST_objective, n_trials=8)

     return {
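
Switching from optuna's default sampler to `GridSampler` makes the trials enumerate a fixed grid instead of sampling it stochastically; with a 2x2x2 space, eight trials cover it exactly. A runnable toy version of that setup (the objective below is a stand-in, not lightwood's):

    import optuna

    space = {"trend_degree": [1, 2], "ds_sp": [7, 30],
             "decomp_type": ["additive", "multiplicative"]}

    def objective(trial):
        d = trial.suggest_categorical("trend_degree", [1, 2])
        sp = trial.suggest_categorical("ds_sp", [7, 30])
        kind = trial.suggest_categorical("decomp_type", ["additive", "multiplicative"])
        return d + sp + (0.5 if kind == "multiplicative" else 0.0)  # stand-in loss

    study = optuna.create_study(sampler=optuna.samplers.GridSampler(space))
    study.optimize(objective, n_trials=8)  # exactly the 8 grid points
    print(study.best_params)  # {'trend_degree': 1, 'ds_sp': 7, 'decomp_type': 'additive'}
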
(Diffs for the remaining 13 changed files are not shown.)