Merge pull request #525 from mindsdb/fix_508
Fix #508 - `allow_incomplete_history` for time series predictors
paxcema authored Sep 22, 2021
2 parents 54170c6 + e7bebc2 commit 6475193
Showing 4 changed files with 50 additions and 33 deletions.
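For reference, a minimal usage sketch of the new setting (not part of the diff), modeled on the updated integration test further down. The CSV paths are placeholders and the import paths are assumed from lightwood's public API at the time of this commit:

```python
import pandas as pd

from lightwood.api.high_level import predictor_from_problem  # assumed import path
from lightwood.api.types import ProblemDefinition

# Placeholder datasets; column names mirror the integration test (Traffic / T / Country).
train = pd.read_csv('traffic_train.csv')
test = pd.read_csv('traffic_test.csv')

pdef = ProblemDefinition.from_dict({
    'target': 'Traffic',
    'timeseries_settings': {
        'order_by': ['T'],
        'group_by': ['Country'],
        'window': 5,
        'nr_predictions': 2,
        'use_previous_target': True,
        'allow_incomplete_history': True,  # new flag added by this commit
    }
})

predictor = predictor_from_problem(train, pdef)
predictor.learn(train)

# With the flag enabled, predicting on fewer rows than `window` logs a warning
# instead of raising an exception.
preds = predictor.predict(test[:4])
```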
49 changes: 23 additions & 26 deletions lightwood/api/json_ai.py
@@ -151,6 +151,7 @@ def generate_json_ai(
):
input_cols.append(col_name)

tss = problem_definition.timeseries_settings
is_target_predicting_encoder = False
# Single text column classification
if (
@@ -179,8 +180,7 @@ def generate_json_ai(

}]

if not problem_definition.timeseries_settings.is_timeseries or \
problem_definition.timeseries_settings.nr_predictions <= 1:
if not tss.is_timeseries or tss.nr_predictions == 1:
mixers.extend([{
'module': 'LightGBM',
'args': {
@@ -195,7 +195,7 @@
}
}
])
elif problem_definition.timeseries_settings.nr_predictions > 1:
elif tss.nr_predictions > 1:
mixers.extend([{
'module': 'LightGBMArray',
'args': {
@@ -205,7 +205,7 @@
}
}])

if problem_definition.timeseries_settings.use_previous_target:
if tss.use_previous_target:
mixers.extend([
{
'module': 'SkTime',
@@ -229,8 +229,7 @@
)}

if (
problem_definition.timeseries_settings.is_timeseries
and problem_definition.timeseries_settings.nr_predictions > 1
tss.is_timeseries and tss.nr_predictions > 1
):
list(outputs.values())[0].data_dtype = dtype.tsarray

@@ -247,15 +246,12 @@
)

for encoder_name in ts_encoders:
if (
problem_definition.timeseries_settings.is_timeseries
and encoder_name == encoder['module'].split(".")[1]
):
if problem_definition.timeseries_settings.group_by is not None:
for group in problem_definition.timeseries_settings.group_by:
if tss.is_timeseries and encoder_name == encoder['module'].split(".")[1]:
if tss.group_by is not None:
for group in tss.group_by:
dependency.append(group)

if problem_definition.timeseries_settings.use_previous_target:
if tss.use_previous_target:
dependency.append(f"__mdb_ts_previous_{target}")

if len(dependency) > 0:
@@ -265,15 +261,15 @@
features[col_name] = feature

# Decide on the accuracy functions to use
if list(outputs.values())[0].data_dtype in [dtype.integer, dtype.float, dtype.date, dtype.datetime]:
output_dtype = list(outputs.values())[0].data_dtype
if output_dtype in [dtype.integer, dtype.float, dtype.date, dtype.datetime]:
accuracy_functions = ['r2_score']
elif list(outputs.values())[0].data_dtype in [dtype.categorical, dtype.tags, dtype.binary]:
elif output_dtype in [dtype.categorical, dtype.tags, dtype.binary]:
accuracy_functions = ['balanced_accuracy_score']
elif list(outputs.values())[0].data_dtype in (dtype.array, dtype.tsarray):
elif output_dtype in (dtype.array, dtype.tsarray):
accuracy_functions = ['evaluate_array_accuracy']
else:
data_dtype = list(outputs.values())[0].data_dtype
raise Exception(f'Please specify a custom accuracy function for output type {data_dtype}')
raise Exception(f'Please specify a custom accuracy function for output type {output_dtype}')

if problem_definition.time_aim is None and (
problem_definition.seconds_per_mixer is None or problem_definition.seconds_per_encoder is None):
@@ -324,6 +320,8 @@ def add_implicit_values(json_ai: JsonAI) -> JsonAI:
:returns: ``JSONAI`` object with all necessary parameters that were previously left unmentioned filled in.
"""
problem_definition = json_ai.problem_definition
tss = problem_definition.timeseries_settings

imports = [
'from lightwood.mixer import Neural', 'from lightwood.mixer import LightGBM',
'from lightwood.mixer import LightGBMArray', 'from lightwood.mixer import SkTime',
@@ -351,7 +349,7 @@ def add_implicit_values(json_ai: JsonAI) -> JsonAI:
continue
imports.append(f"from lightwood.encoder import {encoder_import}")

if problem_definition.timeseries_settings.use_previous_target:
if tss.use_previous_target:
imports.append('from lightwood.encoder import ArrayEncoder')

# Add implicit arguments
@@ -368,8 +366,8 @@ def add_implicit_values(json_ai: JsonAI) -> JsonAI:
mixers[i]['args']['timeseries_settings'] = mixers[i]['args'].get(
'timeseries_settings', '$problem_definition.timeseries_settings')
mixers[i]['args']['net'] = mixers[i]['args'].get(
'net', '"DefaultNet"' if not problem_definition.timeseries_settings.is_timeseries
or not problem_definition.timeseries_settings.use_previous_target
'net', '"DefaultNet"' if not tss.is_timeseries
or not tss.use_previous_target
else '"ArNet"')

elif mixers[i]['module'] == 'LightGBM':
@@ -462,15 +460,13 @@ def add_implicit_values(json_ai: JsonAI) -> JsonAI:
"encoded_data": "encoded_data",
"predictions": "df",
"analysis": "$runtime_analyzer",
"ts_analysis": "$ts_analysis"
if problem_definition.timeseries_settings.is_timeseries
else None,
"ts_analysis": "$ts_analysis" if tss.is_timeseries else None,
"target_name": "$target",
"target_dtype": "$dtype_dict[self.target]",
},
}

if problem_definition.timeseries_settings.is_timeseries:
if tss.is_timeseries:
if json_ai.timeseries_transformer is None:
json_ai.timeseries_transformer = {
"module": "transform_timeseries",
@@ -521,7 +517,8 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
dtype_dict[col_name] = f"""'{feature.data_dtype}'"""

# @TODO: Move into json-ai creation function (I think? Maybe? Let's discuss)
if json_ai.problem_definition.timeseries_settings.use_previous_target:
tss = json_ai.problem_definition.timeseries_settings
if tss.is_timeseries and tss.use_previous_target:
col_name = f'__mdb_ts_previous_{json_ai.problem_definition.target}'
json_ai.problem_definition.timeseries_settings.target_type = list(json_ai.outputs.values())[0].data_dtype
encoder_dict[col_name] = call(lookup_encoder(list(json_ai.outputs.values())[0].data_dtype,
4 changes: 3 additions & 1 deletion lightwood/api/types.py
@@ -211,12 +211,13 @@ class TimeseriesSettings:
order_by: List[str] = None
window: int = None
group_by: List[str] = None
use_previous_target: bool = False
use_previous_target: bool = True
nr_predictions: int = None
historical_columns: List[str] = None
target_type: str = (
"" # @TODO: is the current setter (outside of initialization) a sane option?
)
allow_incomplete_history: bool = False

@staticmethod
def from_dict(obj: Dict):
@@ -241,6 +242,7 @@ def from_dict(obj: Dict):
use_previous_target=obj.get("use_previous_target", True),
historical_columns=[],
nr_predictions=obj.get("nr_predictions", 1),
allow_incomplete_history=obj.get('allow_incomplete_history', False)
)
for setting in obj:
timeseries_settings.__setattr__(setting, obj[setting])
15 changes: 11 additions & 4 deletions lightwood/data/timeseries_transform.py
@@ -74,13 +74,17 @@ def transform_timeseries(
for oby in tss.order_by:
original_df[f'__mdb_original_{oby}'] = original_df[oby]

group_lengths = []
if len(gb_arr) > 0:
df_arr = []
for _, df in original_df.groupby(gb_arr):
df_arr.append(df.sort_values(by=ob_arr))
group_lengths.append(len(df))
else:
df_arr = [original_df]
group_lengths.append(len(original_df))

n_groups = len(df_arr)
last_index = original_df['original_index'].max()
for i, subdf in enumerate(df_arr):
if '__mdb_make_predictions' in subdf.columns and mode == 'predict':
@@ -115,7 +119,7 @@ def transform_timeseries(
pool.close()
pool.join()
else:
for i in range(len(df_arr)):
for i in range(n_groups):
df_arr[i] = _ts_to_obj(df_arr[i], historical_columns=ob_arr + tss.historical_columns)
df_arr[i] = _ts_order_col_to_cell_lists(df_arr[i], historical_columns=ob_arr + tss.historical_columns)
df_arr[i] = _ts_add_previous_rows(df_arr[i],
@@ -132,11 +136,14 @@ def transform_timeseries(
combined_df = pd.DataFrame(combined_df[combined_df['__mdb_make_predictions'].astype(bool).isin([True])])
del combined_df['__mdb_make_predictions']

if len(combined_df) == 0:
raise Exception(f'Not enough historical context to make a timeseries prediction. Please provide a number of rows greater or equal to the window size. If you can\'t get enough rows, consider lowering your window size. If you want to force timeseries predictions lacking historical context please set the `allow_incomplete_history` advanced argument to `True`, but this might lead to subpar predictions.') # noqa
if not infer_mode and any([i < tss.window for i in group_lengths]):
if tss.allow_incomplete_history:
log.warning("Forecasting with incomplete historical context, predictions might be subpar")
else:
raise Exception(f'Not enough historical context to make a timeseries prediction. Please provide a number of rows greater or equal to the window size. If you can\'t get enough rows, consider lowering your window size. If you want to force timeseries predictions lacking historical context please set the `allow_incomplete_history` timeseries setting to `True`, but this might lead to subpar predictions.') # noqa

df_gb_map = None
if len(df_arr) > 1: # @TODO: and (transaction.lmd['quick_learn'] or transaction.lmd['quick_predict']):
if n_groups > 1:
df_gb_list = list(combined_df.groupby(tss.group_by))
df_gb_map = {}
for gb, df in df_gb_list:
15 changes: 13 additions & 2 deletions tests/integration/advanced/test_timeseries.py
@@ -52,22 +52,28 @@ def test_0_time_series_grouped_regression(self):
target = 'Traffic'
order_by = 'T'
nr_preds = 2
window = 5
pred = predictor_from_problem(train,
ProblemDefinition.from_dict({'target': target,
'time_aim': 30,
'nsubsets': 10,
'anomaly_detection': True,
'timeseries_settings': {
'use_previous_target': True,
'allow_incomplete_history': True,
'group_by': ['Country'],
'nr_predictions': nr_preds,
'order_by': [order_by],
'window': 5
'window': window
}}))
pred.learn(train)
preds = pred.predict(test)
self.check_ts_prediction_df(preds, nr_preds, [order_by])

# test allowed incomplete history
preds = pred.predict(test[:window - 1])
self.check_ts_prediction_df(preds, nr_preds, [order_by])

# test inferring mode
test['__mdb_make_predictions'] = False
preds = pred.predict(test)
@@ -85,20 +91,25 @@ def test_1_time_series_regression(self):
target = 'Traffic'
order_by = 'T'
nr_preds = 2
window = 5
pred = predictor_from_problem(data,
ProblemDefinition.from_dict({'target': target,
'nsubsets': 10,
'anomaly_detection': False,
'timeseries_settings': {
'use_previous_target': False,
'allow_incomplete_history': False,
'nr_predictions': nr_preds,
'order_by': [order_by],
'window': 5}
'window': window}
}))
pred.learn(data)
preds = pred.predict(data[0:10])
self.check_ts_prediction_df(preds, nr_preds, [order_by])

# test incomplete history, should not be possible
self.assertRaises(Exception, pred.predict, test[:window - 1])

# test inferring mode
test['__mdb_make_predictions'] = False
preds = pred.predict(test)
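Taken together, the transform change and the tests above define the runtime contract: with `allow_incomplete_history=True` a prediction call on fewer rows than `window` proceeds with a warning, while the default (`False`) keeps the original exception. Continuing the hypothetical sketch near the top of this page (`predictor`, `test`, and `window=5` as defined there):

```python
short_history = test[:4]  # fewer rows than window=5

# Trained with allow_incomplete_history=True: logs
# "Forecasting with incomplete historical context, predictions might be subpar"
# and still returns a forecast dataframe.
preds = predictor.predict(short_history)

# A predictor trained with allow_incomplete_history=False (the default) would
# instead raise an Exception asking for at least `window` rows of history.
```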
