forked from microsoft/qlib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
workflow.py
305 lines (262 loc) · 11.8 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from pathlib import Path
from qlib.model.meta.task import MetaTask
from qlib.contrib.meta.data_selection.model import MetaModelDS
from qlib.contrib.meta.data_selection.dataset import InternalData, MetaDatasetDS
from qlib.data.dataset.handler import DataHandlerLP
import pandas as pd
import fire
import sys
import pickle
from typing import Optional
from qlib import auto_init
from qlib.model.trainer import TrainerR
from qlib.typehint import Literal
from qlib.utils import init_instance_by_config
from qlib.workflow import R
from qlib.tests.data import GetData
DIRNAME = Path(__file__).absolute().resolve().parent
sys.path.append(str(DIRNAME.parent / "baseline"))
from rolling_benchmark import RollingBenchmark # NOTE: sys.path is changed for import RollingBenchmark
class DDGDA:
    """
    Workflow of the DDG-DA experiment (meta-model based data selection / re-weighting).

    please run `python workflow.py run_all` to run the full workflow of the experiment

    **NOTE**
    before running the example, please clean your previous results with following command
    - `rm -r mlruns`
    """

    def __init__(
        self,
        sim_task_model: Literal["linear", "gbdt"] = "gbdt",
        forecast_model: Literal["linear", "gbdt"] = "linear",
        h_path: Optional[str] = None,
        test_end: Optional[str] = None,
        train_start: Optional[str] = None,
        meta_1st_train_end: Optional[str] = None,
        task_ext_conf: Optional[dict] = None,
        alpha: float = 0.01,
        proxy_hd: str = "handler_proxy.pkl",
    ):
        """
        Parameters
        ----------
        sim_task_model: Literal["linear", "gbdt"]
            the model type used to capture the distribution of data (it produces the input of the meta model)
        forecast_model: Literal["linear", "gbdt"]
            the type of the downstream forecasting model
        h_path: Optional[str]
            forwarded unchanged to `RollingBenchmark` (presumably the path of a dumped handler; confirm there)
        train_start: Optional[str]
            the start datetime for data. It is used in training start time (for both tasks & meta learning)
        test_end: Optional[str]
            the end datetime for data. It is used in test end time
        meta_1st_train_end: Optional[str]
            the datetime of training end of the first meta_task
        task_ext_conf: Optional[dict]
            forwarded unchanged to `RollingBenchmark`
        alpha: float
            Setting the L2 regularization for ridge
            The `alpha` is only passed to MetaModelDS (it is not passed to sim_task_model currently..)
        proxy_hd: str
            file name (relative to this script's directory) of the pickled handler for the proxy forecasting model
        """
        self.step = 20
        # NOTE:
        # the horizon must match the meaning in the base task template
        self.horizon = 20
        self.meta_exp_name = "DDG-DA"
        self.sim_task_model = sim_task_model  # The model to capture the distribution of data.
        self.forecast_model = forecast_model  # downstream forecasting models' type
        # keyword arguments shared by every `RollingBenchmark` created in this workflow
        self.rb_kwargs = {
            "h_path": h_path,
            "test_end": test_end,
            "train_start": train_start,
            "task_ext_conf": task_ext_conf,
        }
        self.alpha = alpha
        self.meta_1st_train_end = meta_1st_train_end
        self.proxy_hd = proxy_hd

    @staticmethod
    def _dump_pickle(obj, path: Path) -> None:
        """Pickle `obj` into the file at `path` (the file is overwritten)."""
        with path.open("wb") as f:
            pickle.dump(obj, f)

    @staticmethod
    def _load_pickle(path: Path, produced_by: str):
        """
        Load a pickled intermediate result of this workflow.

        Parameters
        ----------
        path: Path
            the file to load
        produced_by: str
            name of the workflow step that creates `path`; only used in the error message

        Raises
        ------
        FileNotFoundError
            if `path` does not exist, i.e. the producing step has not been run yet
        """
        try:
            f = path.open("rb")
        except FileNotFoundError as err:
            raise FileNotFoundError(f"{path} does not exist; please run `{produced_by}` first.") from err
        # NOTE: pickle can execute arbitrary code; only load files produced by this workflow itself.
        with f:
            return pickle.load(f)

    @staticmethod
    def _name_feature_importance(fi: pd.Series, cols: pd.Index) -> pd.Series:
        """
        Map positional importance keys of the form `<prefix>_<idx>` to the real column names.

        The integer after the first underscore of each key of `fi` is used as the position in `cols`.
        """
        return pd.Series({cols[int(key.split("_")[1])]: imp for key, imp in fi.to_dict().items()})

    def get_feature_importance(self) -> pd.Series:
        """
        Train the basic "gbdt" task and return its feature importance indexed by feature name.
        """
        # this must be lightGBM, because it needs to get the feature importance
        rb = RollingBenchmark(model_type="gbdt", **self.rb_kwargs)
        task = rb.basic_task()

        with R.start(experiment_name="feature_importance"):
            model = init_instance_by_config(task["model"])
            dataset = init_instance_by_config(task["dataset"])
            model.fit(dataset)

        fi = model.get_feature_importance()

        # Because the model uses numpy instead of dataframe for training lightgbm,
        # the importance is keyed by position; the following extra steps recover the feature names
        df = dataset.prepare(segments=slice(None), col_set="feature", data_key=DataHandlerLP.DK_R)
        return self._name_feature_importance(fi, df.columns)

    def dump_data_for_proxy_model(self, topk: int = 30):
        """
        Dump data for training meta model.
        The meta model will be trained upon the proxy forecasting model.
        This dataset is for the proxy forecasting model.

        Parameters
        ----------
        topk: int
            the number of most important features (according to `get_feature_importance`) that are kept

        Two files are written next to this script: `fea_label_df.pkl` (the raw dataframe) and
        `self.proxy_hd` (the same data wrapped in a handler).
        """
        fi = self.get_feature_importance()
        col_selected = fi.nlargest(topk)

        rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs)
        task = rb.basic_task()
        dataset = init_instance_by_config(task["dataset"])
        prep_ds = dataset.prepare(slice(None), col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)

        feature_df = prep_ds["feature"]
        label_df = prep_ds["label"]

        feature_selected = feature_df.loc[:, col_selected.index]

        # z-score the selected features within each datetime group
        feature_selected = feature_selected.groupby("datetime", group_keys=False).apply(
            lambda df: (df - df.mean()).div(df.std())
        )
        feature_selected = feature_selected.fillna(0.0)

        df_all = {
            "label": label_df.reindex(feature_selected.index),
            "feature": feature_selected,
        }
        df_all = pd.concat(df_all, axis=1)
        fea_label_path = DIRNAME / "fea_label_df.pkl"
        df_all.to_pickle(fea_label_path)

        # dump data in handler format for aligning the interface
        handler = DataHandlerLP(
            data_loader={
                "class": "qlib.data.dataset.loader.StaticDataLoader",
                "kwargs": {"config": fea_label_path},
            }
        )
        handler.to_pickle(DIRNAME / self.proxy_hd, dump_all=True)

    @property
    def _internal_data_path(self):
        """Path of the pickled `InternalData` (the input of the meta model)."""
        return DIRNAME / f"internal_data_s{self.step}.pkl"

    def dump_meta_ipt(self):
        """
        Dump data for training meta model.
        This function will dump the input data for meta model
        """
        # According to the experiments, the choice of the model type is very important for achieving good results
        rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs)
        sim_task = rb.basic_task()

        if self.sim_task_model == "gbdt":
            # train a fixed number of rounds without early stopping
            sim_task["model"].setdefault("kwargs", {}).update({"early_stopping_rounds": None, "num_boost_round": 150})

        exp_name_sim = f"data_sim_s{self.step}"

        internal_data = InternalData(sim_task, self.step, exp_name=exp_name_sim)
        internal_data.setup(trainer=TrainerR)

        self._dump_pickle(internal_data, self._internal_data_path)

    def train_meta_model(self, fill_method="max"):
        """
        training a meta model based on a simplified linear proxy model;

        Parameters
        ----------
        fill_method:
            forwarded to `MetaDatasetDS` and logged with the other parameters so that
            `meta_inference` can reuse it

        Requires the files produced by `dump_data_for_proxy_model` and `dump_meta_ipt`.
        """
        # 1) leverage the simplified proxy forecasting model to train meta model.
        # - Only the dataset part of the task is needed here (the `model` and `record` entries are
        #   left commented out below); presumably the meta model integrates the proxy model itself.
        rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs)
        sim_task = rb.basic_task()
        # the train_start for training meta model does not necessarily align with final rolling
        train_start = self.rb_kwargs.get("train_start")
        if train_start is None:
            train_start = "2008-01-01"
        train_end = "2010-12-31" if self.meta_1st_train_end is None else self.meta_1st_train_end
        # the test segment starts on the day right after the training segment ends
        test_start = (pd.Timestamp(train_end) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
        proxy_forecast_model_task = {
            # "model": "qlib.contrib.model.linear.LinearModel",
            "dataset": {
                "class": "qlib.data.dataset.DatasetH",
                "kwargs": {
                    "handler": f"file://{(DIRNAME / self.proxy_hd).absolute()}",
                    "segments": {
                        "train": (train_start, train_end),
                        "test": (test_start, sim_task["dataset"]["kwargs"]["segments"]["test"][1]),
                    },
                },
            },
            # "record": ["qlib.workflow.record_temp.SignalRecord"]
        }
        # the proxy_forecast_model_task will be used to create meta tasks.
        # With the default `meta_1st_train_end`, the test date of first task will be 2011-01-01.
        # Each test segment will be about 20days
        # The tasks include all training tasks and test tasks.

        # 2) preparing meta dataset
        kwargs = dict(
            task_tpl=proxy_forecast_model_task,
            step=self.step,
            segments=0.62,  # keep test period consistent with the dataset yaml
            trunc_days=1 + self.horizon,
            hist_step_n=30,
            fill_method=fill_method,
            rolling_ext_days=0,
        )
        # NOTE:
        # the input of meta model (internal data) are shared between proxy model and final forecasting model
        # but their task test segment are not aligned! It worked in my previous experiment.
        # So the misalignment will not affect the effectiveness of the method.
        internal_data = self._load_pickle(self._internal_data_path, "dump_meta_ipt")

        md = MetaDatasetDS(exp_name=internal_data, **kwargs)

        # 3) train and logging meta model
        with R.start(experiment_name=self.meta_exp_name):
            # the logged parameters are read back in `meta_inference`
            R.log_params(**kwargs)
            mm = MetaModelDS(
                step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=30, seed=43, alpha=self.alpha
            )
            mm.fit(md)
            R.save_objects(model=mm)

    @property
    def _task_path(self):
        """Path of the pickled tasks generated by the meta model."""
        return DIRNAME / f"tasks_s{self.step}.pkl"

    def meta_inference(self):
        """
        Leverage meta-model for inference:
        - Given
            - baseline tasks
            - input for meta model(internal data)
            - meta model (its learnt knowledge on proxy forecasting model is expected to transfer to normal forecasting model)

        The resulting tasks are pickled to `self._task_path`.

        Raises
        ------
        IndexError
            if the meta experiment contains no recorder, i.e. `train_meta_model` has not been run
        """
        # 1) get meta model
        exp = R.get_exp(experiment_name=self.meta_exp_name)
        recorders = exp.list_recorders(rtype=exp.RT_L)
        if not recorders:
            # IndexError is what the unguarded `[0]` raised before; only the message is new
            raise IndexError(
                f"No recorder found in experiment {self.meta_exp_name!r}; please run `train_meta_model` first."
            )
        # NOTE(review): the first listed recorder is used; with several runs in the experiment it may not
        # be the intended one (the class docstring asks to clean `mlruns` beforehand).
        rec = recorders[0]
        meta_model: MetaModelDS = rec.load_object("model")

        # 2)
        # we are transferring the knowledge of meta model to final forecasting tasks.
        # Create MetaTaskDataset for the final forecasting tasks
        # Aligning the setting of it to the MetaTaskDataset when training Meta model is necessary

        # 2.1) get previous config
        param = rec.list_params()
        trunc_days = int(param["trunc_days"])
        step = int(param["step"])
        hist_step_n = int(param["hist_step_n"])
        fill_method = param.get("fill_method", "max")

        rb = RollingBenchmark(model_type=self.forecast_model, **self.rb_kwargs)
        task_l = rb.create_rolling_tasks()

        # 2.2) create meta dataset for final dataset
        kwargs = dict(
            task_tpl=task_l,
            step=step,
            segments=0.0,  # all the tasks are for testing
            trunc_days=trunc_days,
            hist_step_n=hist_step_n,
            fill_method=fill_method,
            task_mode=MetaTask.PROC_MODE_TRANSFER,
        )

        internal_data = self._load_pickle(self._internal_data_path, "dump_meta_ipt")
        mds = MetaDatasetDS(exp_name=internal_data, **kwargs)

        # 3) meta model make inference and get new qlib task
        new_tasks = meta_model.inference(mds)
        self._dump_pickle(new_tasks, self._task_path)

    def train_and_eval_tasks(self):
        """
        Training the tasks generated by meta model
        Then evaluate it
        """
        tasks = self._load_pickle(self._task_path, "meta_inference")
        rb = RollingBenchmark(rolling_exp="rolling_ds", model_type=self.forecast_model, **self.rb_kwargs)
        rb.train_rolling_tasks(tasks)
        rb.ens_rolling()
        rb.update_rolling_rec()

    def run_all(self):
        """Run every step of the workflow in order; each step consumes the output of the previous ones."""
        # 1) file: handler_proxy.pkl (self.proxy_hd)
        self.dump_data_for_proxy_model()
        # 2)
        # file: internal_data_s20.pkl
        # mlflow: data_sim_s20, models for calculating meta_ipt
        self.dump_meta_ipt()
        # 3) meta model will be stored in `DDG-DA`
        self.train_meta_model()
        # 4) new_tasks are saved in "tasks_s20.pkl" (reweighter is added)
        self.meta_inference()
        # 5) load the saved tasks and train model
        self.train_and_eval_tasks()
if __name__ == "__main__":
    # Script entry point; see the `DDGDA` docstring, e.g. `python workflow.py run_all`.
    # presumably fetches the qlib data unless it already exists locally (`exists_skip=True`); confirm in GetData
    GetData().qlib_data(exists_skip=True)
    # presumably initialises qlib with an automatically detected configuration; confirm in `qlib.auto_init`
    auto_init()
    # hand the class to python-fire so that its methods can be invoked from the command line
    fire.Fire(DDGDA)