Merge pull request #1160 from mindsdb/staging
Release 23.6.4.0
paxcema authored Jun 26, 2023
2 parents 51a3f03 + 7432c7a commit 3262859
Showing 19 changed files with 261 additions and 1,279 deletions.
750 changes: 27 additions & 723 deletions docssrc/source/tutorials/custom_cleaner/custom_cleaner.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lightwood/__about__.py
@@ -1,6 +1,6 @@
 __title__ = 'lightwood'
 __package_name__ = 'lightwood'
-__version__ = '23.6.2.0'
+__version__ = '23.6.4.0'
 __description__ = "Lightwood is a toolkit for automatic machine learning model building"
 __email__ = "[email protected]"
 __author__ = 'MindsDB Inc'
13 changes: 11 additions & 2 deletions lightwood/api/json_ai.py
@@ -741,7 +741,6 @@ def _add_implicit_values(json_ai: JsonAI) -> JsonAI:
"dtype_dict": "$dtype_dict",
"target": "$target",
"mode": "$mode",
"ts_analysis": "$ts_analysis",
"pred_args": "$pred_args",
},
},
@@ -1336,8 +1335,18 @@ def predict(self, data: pd.DataFrame, args: Dict = {{}}) -> pd.DataFrame:
         black = None

     if black is not None:
-        predictor_code = black.format_str(predictor_code, mode=black.FileMode())
+        try:
+            formatted_predictor_code = black.format_str(predictor_code, mode=black.FileMode())
+
+            if type(predictor_from_code(formatted_predictor_code)).__name__ == 'Predictor':
+                predictor_code = formatted_predictor_code
+            else:
+                log.info('Black formatter output is invalid, predictor code might be a bit ugly')
+
+        except Exception:
+            log.info('Black formatter failed to run, predictor code might be a bit ugly')
     else:
         log.info('Unable to import black formatter, predictor code might be a bit ugly.')

     return predictor_code

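Net effect of this hunk: black formatting is now best-effort, and its output is only adopted when the formatted code still loads as a valid `Predictor`. A minimal standalone sketch of the same guard pattern, with `is_valid` standing in for lightwood's `predictor_from_code` check:

try:
    import black
except ImportError:
    black = None

def format_safely(code: str, is_valid) -> str:
    # Format with black only if it is importable, and keep the original
    # code whenever formatting fails or its output no longer validates.
    if black is None:
        return code
    try:
        formatted = black.format_str(code, mode=black.FileMode())
        return formatted if is_valid(formatted) else code
    except Exception:
        return code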
132 changes: 15 additions & 117 deletions lightwood/data/timeseries_analyzer.py
@@ -1,17 +1,11 @@
-from copy import deepcopy
-from typing import Dict, Tuple, List, Union
+from typing import Dict, Tuple, List

-import optuna
 import numpy as np
 import pandas as pd
-from sktime.transformations.series.detrend import Detrender
-from sktime.forecasting.trend import PolynomialTrendForecaster
-from sktime.transformations.series.detrend import ConditionalDeseasonalizer
-from type_infer.dtype import dtype

 from lightwood.api.types import TimeseriesSettings
-from lightwood.helpers.ts import get_ts_groups, get_delta, get_group_matches, Differencer
-from lightwood.helpers.log import log
+from type_infer.dtype import dtype
+from lightwood.helpers.ts import get_ts_groups, get_delta, Differencer
 from lightwood.encoder.time_series.helpers.common import generate_target_group_normalizers


@@ -36,18 +30,16 @@ def timeseries_analyzer(data: Dict[str, pd.DataFrame], dtype_dict: Dict[str, str
""" # noqa
tss = timeseries_settings
groups = get_ts_groups(data['train'], tss)
deltas, periods, freqs = get_delta(data['train'], dtype_dict, groups, target, tss)
deltas, periods, freqs = get_delta(data['train'], tss)

normalizers = generate_target_group_normalizers(data['train'], target, dtype_dict, groups, tss)
normalizers = generate_target_group_normalizers(data['train'], target, dtype_dict, tss)

if dtype_dict[target] in (dtype.integer, dtype.float, dtype.num_tsarray):
naive_forecast_residuals, scale_factor = get_grouped_naive_residuals(data['dev'], target, tss, groups)
differencers = get_differencers(data['train'], target, groups, tss.group_by)
stl_transforms = get_stls(data['train'], data['dev'], target, periods, groups, tss)
naive_forecast_residuals, scale_factor = get_grouped_naive_residuals(data['dev'], target, tss)
differencers = get_differencers(data['train'], target, tss.group_by)
else:
naive_forecast_residuals, scale_factor = {}, {}
differencers = {}
stl_transforms = {}

return {'target_normalizers': normalizers,
'deltas': deltas,
@@ -57,7 +49,7 @@ def timeseries_analyzer(data: Dict[str, pd.DataFrame], dtype_dict: Dict[str, str
             'ts_naive_mae': scale_factor,
             'periods': periods,
             'sample_freqs': freqs,
-            'stl_transforms': stl_transforms,
+            'stl_transforms': {},  # TODO: remove, or provide from outside as user perhaps
             'differencers': differencers
             }

@@ -87,121 +79,27 @@ def get_naive_residuals(target_data: pd.DataFrame, m: int = 1) -> Tuple[List, fl
 def get_grouped_naive_residuals(
         info: pd.DataFrame,
         target: str,
-        tss: TimeseriesSettings,
-        group_combinations: List) -> Tuple[Dict, Dict]:
+        tss: TimeseriesSettings
+) -> Tuple[Dict, Dict]:
     """
     Wraps `get_naive_residuals` for a dataframe with multiple co-existing time series.
     """  # noqa
     group_residuals = {}
     group_scale_factors = {}
-    for group in group_combinations:
-        idxs, subset = get_group_matches(info, group, tss.group_by)
+    grouped = info.groupby(by=tss.group_by) if tss.group_by else info.groupby(lambda x: '__default')
+    for group, subset in grouped:
         if subset.shape[0] > 1:
             residuals, scale_factor = get_naive_residuals(subset[target])  # @TODO: pass m once we handle seasonality
             group_residuals[group] = residuals
             group_scale_factors[group] = scale_factor
     return group_residuals, group_scale_factors


-def get_differencers(data: pd.DataFrame, target: str, groups: List, group_cols: List):
+def get_differencers(data: pd.DataFrame, target: str, group_cols: List):
     differencers = {}
-    for group in groups:
-        idxs, subset = get_group_matches(data, group, group_cols)
+    grouped = data.groupby(by=group_cols) if group_cols else data.groupby(lambda x: True)
+    for group, subset in grouped:
         differencer = Differencer()
         differencer.fit(subset[target].values)
         differencers[group] = differencer
     return differencers


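Both helpers above now lean on pandas' native groupby instead of lightwood's manual get_group_matches scan, falling back to a single dummy group when no group_by columns are configured. A toy illustration of the pattern (dataframe and column names assumed):

import pandas as pd

df = pd.DataFrame({
    'store': ['a', 'a', 'b'],
    'sales': [10.0, 12.0, 7.0],
})
group_by = ['store']  # or None for a single, ungrouped series

# yields one (key, sub-dataframe) pair per group, as in the hunks above
grouped = df.groupby(by=group_by) if group_by else df.groupby(lambda x: '__default')
for group, subset in grouped:
    print(group, subset['sales'].tolist())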
-def get_stls(train_df: pd.DataFrame,
-             dev_df: pd.DataFrame,
-             target: str,
-             sps: Dict,
-             groups: list,
-             tss: TimeseriesSettings
-             ) -> Dict[str, object]:
-    stls = {'__default': None}
-    for group in groups:
-        if group != '__default':
-            _, tr_subset = get_group_matches(train_df, group, tss.group_by)
-            _, dev_subset = get_group_matches(dev_df, group, tss.group_by)
-            if tr_subset.shape[0] > 0 and dev_subset.shape[0] > 0 and sps.get(group, False):
-                group_freq = tr_subset['__mdb_inferred_freq'].iloc[0]
-                tr_subset = deepcopy(tr_subset)[target]
-                dev_subset = deepcopy(dev_subset)[target]
-                tr_subset.index = pd.date_range(start=tr_subset.iloc[0], freq=group_freq,
-                                                periods=len(tr_subset)).to_period()
-                dev_subset.index = pd.date_range(start=dev_subset.iloc[0], freq=group_freq,
-                                                 periods=len(dev_subset)).to_period()
-                stl = _pick_ST(tr_subset, dev_subset, sps[group])
-                log.info(f'Best STL decomposition params for group {group} are: {stl["best_params"]}')
-                stls[group] = stl
-    return stls
-
-
-def _pick_ST(tr_subset: pd.Series, dev_subset: pd.Series, sp: list):
-    """
-    Perform hyperparam search with optuna to find best combination of ST transforms for a time series.
-    :param tr_subset: training series used for fitting blocks. Index should be datetime, and values are the actual time series.
-    :param dev_subset: dev series used for computing loss. Index should be datetime, and values are the actual time series.
-    :param sp: list of candidate seasonal periods
-    :return: best deseasonalizer and detrender combination based on dev_loss
-    """  # noqa
-
-    def _ST_objective(trial: optuna.Trial):
-        trend_degree = trial.suggest_categorical("trend_degree", [1])
-        ds_sp = trial.suggest_categorical("ds_sp", sp)  # seasonality period to use in deseasonalizer
-        if min(min(tr_subset), min(dev_subset)) <= 0:
-            decomp_type = trial.suggest_categorical("decomp_type", ['additive'])
-        else:
-            decomp_type = trial.suggest_categorical("decomp_type", ['additive', 'multiplicative'])
-
-        detrender = Detrender(forecaster=PolynomialTrendForecaster(degree=trend_degree))
-        deseasonalizer = ConditionalDeseasonalizer(sp=ds_sp, model=decomp_type)
-        transformer = STLTransformer(detrender=detrender, deseasonalizer=deseasonalizer, type=decomp_type)
-        transformer.fit(tr_subset)
-        residuals = transformer.transform(dev_subset)
-
-        trial.set_user_attr("transformer", transformer)
-        return np.power(residuals, 2).sum()
-
-    space = {"trend_degree": [1, 2], "ds_sp": sp, "decomp_type": ['additive', 'multiplicative']}
-    study = optuna.create_study(sampler=optuna.samplers.GridSampler(space))
-    study.optimize(_ST_objective, n_trials=8)
-
-    return {
-        "transformer": study.best_trial.user_attrs['transformer'],
-        "best_params": study.best_params
-    }
-
-
-class STLTransformer:
-    def __init__(self, detrender: Detrender, deseasonalizer: ConditionalDeseasonalizer, type: str = 'additive'):
-        """
-        Class that handles STL transformation and inverse, given specific detrender and deseasonalizer instances.
-        :param detrender: Already initialized.
-        :param deseasonalizer: Already initialized.
-        :param type: Either 'additive' or 'multiplicative'.
-        """  # noqa
-        self._type = type
-        self.detrender = detrender
-        self.deseasonalizer = deseasonalizer
-        self.op = {
-            'additive': lambda x, y: x - y,
-            'multiplicative': lambda x, y: x / y
-        }
-        self.iop = {
-            'additive': lambda x, y: x + y,
-            'multiplicative': lambda x, y: x * y
-        }
-
-    def fit(self, x: Union[pd.DataFrame, pd.Series]):
-        self.deseasonalizer.fit(x)
-        self.detrender.fit(self.op[self._type](x, self.deseasonalizer.transform(x)))
-
-    def transform(self, x: Union[pd.DataFrame, pd.Series]):
-        return self.detrender.transform(self.deseasonalizer.transform(x))
-
-    def inverse_transform(self, x: Union[pd.DataFrame, pd.Series]):
-        return self.deseasonalizer.inverse_transform(self.detrender.inverse_transform(x))
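For reference, the removed STLTransformer boiled down to composing a detrender and a deseasonalizer through the op/iop maps above. A self-contained sketch of that compose-then-invert pattern, with plain numpy arrays standing in for the fitted sktime components:

import numpy as np

# op removes a component (trend/seasonality); iop is its exact inverse.
OPS = {
    'additive': (lambda x, y: x - y, lambda x, y: x + y),
    'multiplicative': (lambda x, y: x / y, lambda x, y: x * y),
}

def remove_and_restore(x, component, kind):
    op, iop = OPS[kind]
    residual = op(x, component)      # e.g. the deseasonalized series
    return iop(residual, component)  # inverse transform recovers x

series = np.array([10.0, 12.0, 14.0, 16.0])
seasonal = np.array([1.0, 2.0, 1.0, 2.0])
assert np.allclose(remove_and_restore(series, seasonal, 'additive'), series)
assert np.allclose(remove_and_restore(series, seasonal, 'multiplicative'), series)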
106 changes: 30 additions & 76 deletions lightwood/data/timeseries_transform.py
@@ -5,15 +5,14 @@
 import numpy as np
 import pandas as pd
 from lightwood.helpers.parallelism import get_nr_procs
-from lightwood.helpers.ts import get_ts_groups, get_delta, get_group_matches

 from type_infer.dtype import dtype
 from lightwood.api.types import TimeseriesSettings, PredictionArguments
 from lightwood.helpers.log import log


 def transform_timeseries(
-        data: pd.DataFrame, dtype_dict: Dict[str, str], ts_analysis: dict,
+        data: pd.DataFrame, dtype_dict: Dict[str, str],
         timeseries_settings: TimeseriesSettings, target: str, mode: str,
         pred_args: Optional[PredictionArguments] = None
 ) -> pd.DataFrame:
@@ -29,7 +28,6 @@ def transform_timeseries(
     :param data: Dataframe with data to transform.
     :param dtype_dict: Dictionary with the types of each column.
-    :param ts_analysis: dictionary with various insights into each series passed as training input.
     :param timeseries_settings: A `TimeseriesSettings` object.
     :param target: The name of the target column to forecast.
     :param mode: Either "train" or "predict", depending on what phase is calling this procedure.
@@ -43,6 +41,7 @@ def transform_timeseries(
     gb_arr = tss.group_by if tss.group_by is not None else []
     oby = tss.order_by
     window = tss.window
+    oby_col = tss.order_by

     if tss.use_previous_target and target not in data.columns:
         raise Exception(f"Cannot transform. Missing historical values for target column {target} (`use_previous_target` is set to True).")  # noqa
@@ -51,37 +50,32 @@ def transform_timeseries(
         if hcol not in data.columns or data[hcol].isna().any():
             raise Exception(f"Cannot transform. Missing values in historical column {hcol}.")

-    # infer frequency with get_delta
-    oby_col = tss.order_by
-    groups = get_ts_groups(data, tss)
-
-    # initial stable sort and per-partition deduplication
+    # initial stable sort and per-partition deduplication TODO: slowish, add a top-level param to disable if needed
     data = data.sort_values(by=oby_col, kind='mergesort')
     data = data.drop_duplicates(subset=[oby_col, *gb_arr], keep='first')

-    if not ts_analysis:
-        _, periods, freqs = get_delta(data, dtype_dict, groups, target, tss)
-    else:
-        periods = ts_analysis['periods']
-        freqs = ts_analysis['sample_freqs']
-
-    # pass seconds to timestamps according to each group's inferred freq, and force this freq on index
-    subsets = []
-    for group in groups:
-        if (tss.group_by and group != '__default') or not tss.group_by:
-            idxs, subset = get_group_matches(data, group, tss.group_by, copy=True)
-            if subset.shape[0] > 0:
-                if periods.get(group, periods['__default']) == 0 and subset.shape[0] > 1:
-                    raise Exception(
-                        f"Partition is not valid, faulty group {group}. Please make sure you group by a set of columns that ensures unique measurements for each grouping through time.")  # noqa
-
-                index = pd.to_datetime(subset[oby_col], unit='s')
-                subset.index = pd.date_range(start=index.iloc[0],
-                                             freq=freqs.get(group, freqs['__default']),
-                                             periods=len(subset))
-                subset['__mdb_inferred_freq'] = subset.index.freq  # sets constant column because pd.concat forgets freq (see: https://github.com/pandas-dev/pandas/issues/3232)  # noqa
-                subsets.append(subset)
-    original_df = pd.concat(subsets).sort_values(by='__mdb_original_index')
+    grouped = data.groupby(by=tss.group_by) if tss.group_by else data.groupby(lambda x: True)
+    reindexed = []
+    # TODO: introduce MP here
+    for name, group in grouped:
+        name = name if tss.group_by and len(tss.group_by) > 1 else (name, )  # guaranteed tuple type
+        if group.shape[0] > 0:
+            if group[tss.order_by].value_counts().max() > 1 and group.shape[0] > 1:
+                raise Exception(f"Partition is not valid, faulty group {name}. Please make sure you group by a set of columns that ensures unique measurements for each grouping through time.")  # noqa
+
+            index = pd.to_datetime(group[oby_col], unit='s', utc=True)
+            group.index = pd.date_range(start=index.iloc[0], end=index.iloc[-1], periods=len(group))
+            resampled = group
+            group['__mdb_inferred_freq'] = None
+            if len(group) > 2:
+                freq = pd.infer_freq(group.index)
+                if freq is not None:
+                    group['__mdb_inferred_freq'] = freq  # sets constant column because pd.concat forgets freq (see: https://github.com/pandas-dev/pandas/issues/3232)  # noqa
+                    resampled = group.resample(freq).first()
+            reindexed.append(resampled)
+
+    original_df = pd.concat(reindexed).sort_values(by='__mdb_original_index')

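Frequency handling in this hunk now relies on pandas directly: each group's frequency is inferred from its own timestamps (which needs at least three points, hence the `len(group) > 2` guard) and the series is regularized onto that frequency by resampling. A toy example of the mechanism (data assumed):

import pandas as pd

# daily series with one missing day
idx = pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-05'])
s = pd.Series([1.0, 2.0, 3.0, 5.0], index=idx)

freq = pd.infer_freq(idx[:3])  # 'D'; pd.infer_freq requires at least 3 timestamps
if freq is not None:
    regular = s.resample(freq).first()  # 2023-01-04 shows up as a NaN row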
if '__mdb_forecast_offset' in original_df.columns:
""" This special column can be either None or an integer. If this column is passed, then the TS transformation will react to the values within:
@@ -103,18 +97,12 @@ def transform_timeseries(
         offset = 0
         cutoff_mode = False

-    original_index_list = []
-    idx = 0
-    for row in original_df.itertuples():
-        if _make_pred(row) or cutoff_mode:
-            original_df.at[row.Index, '__make_predictions'] = True
-            original_index_list.append(idx)
-            idx += 1
-        else:
-            original_df.at[row.Index, '__make_predictions'] = False
-            original_index_list.append(None)
-
-    original_df['original_index'] = original_index_list
+    if '__mdb_forecast_offset' in original_df.columns or cutoff_mode:
+        original_df['__make_predictions'] = True
+        original_df['original_index'] = np.arange(len(original_df))
+    else:
+        original_df['__make_predictions'] = False
+        original_df['original_index'] = None

secondary_type_dict = {}
if dtype_dict[oby] in (dtype.date, dtype.integer, dtype.float):
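The per-row itertuples() bookkeeping above is replaced by two whole-column writes. A quick toy check (assumed data) that the vectorized form matches the old .at[]-based loop in the all-rows-predicted case:

import numpy as np
import pandas as pd

df = pd.DataFrame({'y': [3.0, 1.0, 4.0]})

# old style: one .at[] write per row
loop_df = df.copy()
for i, row in enumerate(loop_df.itertuples()):
    loop_df.at[row.Index, '__make_predictions'] = True
    loop_df.at[row.Index, 'original_index'] = i

# new style: a single vectorized write per column
vec_df = df.copy()
vec_df['__make_predictions'] = True
vec_df['original_index'] = np.arange(len(vec_df))

assert (loop_df['original_index'] == vec_df['original_index']).all()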
@@ -191,39 +179,12 @@ def transform_timeseries(
     else:
         raise Exception(f'Not enough historical context to make a timeseries prediction (`allow_incomplete_history` is set to False). Please provide a number of rows greater or equal to the window size - currently (number_rows, window_size) = ({min(group_lengths)}, {tss.window}). If you can\'t get enough rows, consider lowering your window size. If you want to force timeseries predictions lacking historical context please set the `allow_incomplete_history` timeseries setting to `True`, but this might lead to subpar predictions depending on the mixer.')  # noqa

-    df_gb_map = None
-    if n_groups > 1:
-        df_gb_list = list(combined_df.groupby(tss.group_by))
-        df_gb_map = {}
-        for gb, df in df_gb_list:
-            df_gb_map['_' + '_'.join(str(gb))] = df
-
-    timeseries_row_mapping = {}
-    idx = 0
-
-    if df_gb_map is None:
-        for i in range(len(combined_df)):
-            row = combined_df.iloc[i]
-            if not cutoff_mode:
-                timeseries_row_mapping[idx] = int(
-                    row['original_index']) if row['original_index'] is not None and not np.isnan(
-                    row['original_index']) else None
-            else:
-                timeseries_row_mapping[idx] = idx
-            idx += 1
-    else:
-        for gb in df_gb_map:
-            for i in range(len(df_gb_map[gb])):
-                row = df_gb_map[gb].iloc[i]
-                if not cutoff_mode:
-                    timeseries_row_mapping[idx] = int(
-                        row['original_index']) if row['original_index'] is not None and not np.isnan(
-                        row['original_index']) else None
-                else:
-                    timeseries_row_mapping[idx] = idx
-
-                idx += 1
-
     del combined_df['original_index']

     return combined_df
@@ -256,13 +217,6 @@ def _ts_infer_next_row(df: pd.DataFrame, ob: str) -> pd.DataFrame:
     return new_df


-def _make_pred(row) -> bool:
-    """
-    Indicates whether a prediction should be made for `row` or not.
-    """
-    return not hasattr(row, '__mdb_forecast_offset') or row.make_predictions
-
-
 def _ts_to_obj(df: pd.DataFrame, historical_columns: list) -> pd.DataFrame:
     """
     Casts all historical columns in a dataframe to `object` type.
(Diffs for the remaining 14 changed files are not shown.)
