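# main.py
"""
Entry point for time-series classification experiments: fits scikit-learn models and
consensus baselines on time-series datasets, reports train/test metrics, and saves the
results, with optional multiprocessing and replication.
"""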
from optparse import OptionParser
import multiprocessing as mp
from pathlib import Path
from time import sleep
import uuid
import os
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier as KNN
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.dummy import DummyClassifier
from sklearn.calibration import CalibratedClassifierCV
from dataset import build_datasets, TimeSeriesDataset
from trainer import ScikitModelTrainer
from utils import OptionWithModel, align_data, compute_consensus, encode_results, save_results
from constants import ConsensusBaseline, DataSplit, Model, ReportDict, Metric
from constants import CONSENSUS_BASELINES, POLLING_RATE, CLR_KEYS


def fit_single_model(model_trainer: ScikitModelTrainer, dataset: TimeSeriesDataset, report_dict: ReportDict, replication: int = None):
    """
    Fit a single model on the provided dataset and report results
    :param model_trainer: model trainer to use
    :param dataset: dataset to fit to
    :param report_dict: dictionary to save results in
    :param replication: replication identifier for this experiment (None indicates unreplicated)
    """
    print(f'Fitting {model_trainer.name} on {dataset.name}'
          f'{" using GridSearchCV" if model_trainer.use_grid_search else ""}'
          f' ({f"Replication {replication}, " if replication is not None else ""}PID {os.getpid()})...')

    # Train/fit provided model trainer on the provided dataset
    clf = model_trainer.train(dataset.X_train, dataset.y_train)

    # https://stackoverflow.com/questions/26478000/converting-linearsvcs-decision-function-to-probabilities-scikit-learn-python
    if model_trainer.name == Model.SUPPORT_VECTOR_MACHINE:  # Workaround for LinearSVC not implementing predict_proba
        clf = CalibratedClassifierCV(clf, cv='prefit')
        clf.fit(dataset.X_train, dataset.y_train)  # calibrate on training data; calibrating on the test set would leak it

    predicted_y_train = clf.predict(dataset.X_train)  # Get prediction of fitted model on training set (in-sample)
    predicted_y_test = clf.predict(dataset.X_test)  # Get prediction of fitted model on test set (out-of-sample)
    y_train_score = clf.predict_proba(dataset.X_train)  # Get probabilities for predictions on training set (for ROC)
    y_test_score = clf.predict_proba(dataset.X_test)  # Get probabilities for predictions on test set (for ROC)

    train_clr = classification_report(dataset.y_train, predicted_y_train,
                                      zero_division=0, output_dict=True)
    test_clr = classification_report(dataset.y_test, predicted_y_test,
                                     zero_division=0, output_dict=True)
    train_roc_auc = roc_auc_score(dataset.y_train, y_train_score[:, -1], average='macro')
    test_roc_auc = roc_auc_score(dataset.y_test, y_test_score[:, -1], average='macro')

    # Update report dictionary with results
    # CLR_KEYS maps each Metric to a classification_report key (or an (outer, inner) key pair);
    # ROC AUC is computed separately above
    report_dict[model_trainer.name][dataset.name] = {
        DataSplit.TRAIN: {metric: (
            train_roc_auc if metric == Metric.ROC_AUC
            else train_clr[keys[0]][keys[1]] if isinstance(keys, tuple)
            else train_clr[keys])
            for metric, keys in CLR_KEYS.items()},
        DataSplit.TEST: {metric: (
            test_roc_auc if metric == Metric.ROC_AUC
            else test_clr[keys[0]][keys[1]] if isinstance(keys, tuple)
            else test_clr[keys])
            for metric, keys in CLR_KEYS.items()}
    }

    print(f'Done fitting {model_trainer.name} on {dataset.name} '
          f'({f"Replication {replication}, " if replication is not None else ""}PID {os.getpid()})')


def fit_consensus_baseline(dataset_list: list[TimeSeriesDataset],
                           report_dict: dict[str, dict],
                           baseline: ConsensusBaseline,
                           replication: int = None):
    """
    Fit and record results for the Consensus Baseline (the Previous Baseline is the special case period=1)
    :param dataset_list: list of datasets to fit on
    :param report_dict: dictionary to save reports to
    :param baseline: desired baseline
    :param replication: replication identifier for this experiment (None indicates unreplicated)
    """
    print(f'Generating {baseline}{f" (Replication {replication}, PID {os.getpid()})" if replication is not None else ""}...')
    report_dict[baseline] = {}

    for dataset in dataset_list:
        # Set period based on desired baseline (Consensus uses the existing dataset's period, Previous uses 1)
        period = dataset.period if baseline == 'ConsensusBaseline' else 1

        # Generate consensus predictions from the shifted (previous) labels
        train_consensus = compute_consensus(dataset.y_train.shift(1).iloc[1:], period)
        test_consensus = compute_consensus(dataset.y_test.shift(1).iloc[1:], period)

        train_clr = classification_report(*align_data(train_consensus, dataset.y_train),
                                          zero_division=0, output_dict=True)
        test_clr = classification_report(*align_data(test_consensus, dataset.y_test),
                                         zero_division=0, output_dict=True)

        # Consensus baselines produce hard labels only, so ROC AUC is undefined and recorded as NaN
        report_dict[baseline][dataset.name] = {
            DataSplit.TRAIN: {metric: (
                np.nan if metric == Metric.ROC_AUC
                else train_clr[keys[0]][keys[1]] if isinstance(keys, tuple)
                else train_clr[keys])
                for metric, keys in CLR_KEYS.items()},
            DataSplit.TEST: {metric: (
                np.nan if metric == Metric.ROC_AUC
                else test_clr[keys[0]][keys[1]] if isinstance(keys, tuple)
                else test_clr[keys])
                for metric, keys in CLR_KEYS.items()}
        }

    print(f'Done generating {baseline}{f" (Replication {replication}, PID {os.getpid()})" if replication is not None else ""}')


def start_new_model_process(model_trainer: ScikitModelTrainer,
                            dataset: TimeSeriesDataset,
                            process_list: list[mp.Process],
                            reports: ReportDict,
                            replication: int = None):
    """
    Starts a new model-fitting process
    NOTE: adds the newly created process to process_list
    :param model_trainer: ScikitModelTrainer to use
    :param dataset: dataset to fit model on
    :param process_list: list of processes to add to
    :param reports: dictionary of reports to save results in
    :param replication: replication identifier for this experiment (None indicates unreplicated)
    """
    # Create a daemonic process to fit a single model (daemon=True: terminated automatically if the parent exits)
    new_process = mp.Process(target=fit_single_model,
                             args=(model_trainer, dataset, reports, replication),
                             daemon=True)
    process_list.append(new_process)  # add process to process list
    new_process.start()  # start process


def wait_for_processes(process_list: list[mp.Process], wait_threshold: int, polling_rate: int):
    """
    Wait until the number of outstanding processes drops below a specified amount
    NOTE: closes any processes that terminate and removes them from process_list
    NOTE: WILL ALWAYS CHECK FOR AND CLOSE ANY DEAD PROCESSES AT LEAST ONCE
    :param process_list: list of processes to wait on
    :param wait_threshold: minimum number of outstanding processes required to keep waiting (never waits if -1)
    :param polling_rate: rate (in seconds) at which to poll whether any processes have terminated
    """
    wait = True  # flag to continue waiting. Used to emulate a do-while loop
    while wait:  # While we need to wait for processes to finish
        for process in process_list.copy():  # iterate over a copy so removal doesn't skip elements
            if not process.is_alive():  # if process finished
                print(f'Closing process {process.pid}')
                process.close()  # release resources
                process_list.remove(process)  # remove process from process list
        # update wait flag based on number of active processes (always False if threshold is -1)
        wait = wait_threshold != -1 and len(process_list) >= wait_threshold
        if wait:  # sleep if we still need to wait
            sleep(polling_rate)


def run_experiment(model_params: dict[Model, dict], datasets: list[TimeSeriesDataset], selected_model: Model,
                   processes: int, out_dir: str, replication: int = None):
    """
    Run a single experiment replication
    :param model_params: model parameters to use
    :param datasets: datasets to run experiment on
    :param selected_model: specific model to test (tests all models if None)
    :param processes: maximum number of concurrent processes to use (None disables multiprocessing)
    :param out_dir: directory to save results to
    :param replication: replication identifier for this experiment (None indicates unreplicated)
    """
    print(f'Beginning experiment{f" (Replication {replication}, PID {os.getpid()})" if replication is not None else ""}...')

    # Specific model selected
    if selected_model is not None:
        if selected_model in CONSENSUS_BASELINES:  # consensus baselines require manual calculation, no parameters needed
            model_params = {}
        else:  # only provide parameters for the desired model
            model_params = {selected_model: model_params[selected_model]}

    # Dictionary which stores result data from experiments
    reports = {}  # ReportDict (see constants.py), organized as reports[Model][dataset name][DataSplit][Metric]
    process_list = []  # List of processes (used for multiprocessing)

    # Model experimentation
    for model, trainer_params in model_params.items():  # For each model
        trainer = ScikitModelTrainer(**trainer_params, name=model)  # Initialize a trainer for the model
        reports[trainer.name] = mp.Manager().dict()  # Managed dict for this model's reports (shared so child processes can write results back)
        for data in datasets:  # For each dataset
            if processes is None:  # multiprocessing not requested
                fit_single_model(trainer, data, reports, replication)  # Fit a single model in the current process
            else:  # multiprocessing enabled
                wait_for_processes(process_list, processes,
                                   POLLING_RATE)  # Wait until an acceptable number of processes is running
                start_new_model_process(trainer, data, process_list, reports, replication)  # Start a new fitting process

    wait_for_processes(process_list, 1, POLLING_RATE)  # Wait for any still-running processes to terminate

    # Consensus baselines
    if selected_model is None:  # no model selected
        for baseline in CONSENSUS_BASELINES:  # fit all consensus baselines
            fit_consensus_baseline(datasets, reports, baseline, replication)
    elif selected_model in CONSENSUS_BASELINES:  # consensus baseline selected as model
        fit_consensus_baseline(datasets, reports, selected_model, replication)  # fit desired baseline

    # Result reporting
    results = encode_results(reports)

    # Save metrics
    save_results(results, selected_model, out_dir=out_dir, prefix=replication)
    print(f'Experiment completed!{f" (Replication {replication}, PID {os.getpid()})" if replication is not None else ""}')


def get_model_trainer_params(n_jobs: int = -1):
    """
    Initialize model trainer parameters
    :param n_jobs: n_jobs parameter to pass to models which support parallel processing
    :return: dictionary of model trainer parameters
    """
    return {
        Model.DECISION_TREE: dict(estimator=DecisionTreeClassifier(),
                                  param_grid=dict(splitter=['best', 'random'],
                                                  max_depth=[5, 10, 25, None],
                                                  min_samples_split=[2, 5, 10, 50],
                                                  min_samples_leaf=[1, 5, 10]),
                                  n_jobs=n_jobs),
        Model.RANDOM_FOREST: dict(estimator=RandomForestClassifier(n_jobs=n_jobs),
                                  param_grid=dict(n_estimators=[50, 100, 500],
                                                  criterion=['gini', 'entropy'],
                                                  max_depth=[5, 10, 25, None],
                                                  min_samples_split=[2, 5, 10, 50],
                                                  min_samples_leaf=[1, 5, 10]),
                                  n_jobs=n_jobs),
        Model.LOGISTIC_REGRESSION: dict(estimator=LogisticRegression(max_iter=10_000),  # max_iter must be an int
                                        param_grid=dict(penalty=['l1', 'l2'],
                                                        C=np.logspace(-3, 3, 7),
                                                        solver=['newton-cg', 'lbfgs', 'liblinear']),
                                        error_score=0,  # invalid penalty/solver combinations score 0 instead of raising
                                        n_jobs=n_jobs),
        Model.SUPPORT_VECTOR_MACHINE: dict(estimator=LinearSVC(max_iter=1_000_000),
                                           param_grid=dict(penalty=['l1', 'l2'],
                                                           C=[1, 4, 9, 16, 25],
                                                           loss=['hinge', 'squared_hinge']),
                                           error_score=0,  # invalid penalty/loss combinations score 0 instead of raising
                                           n_jobs=n_jobs),
        Model.K_NEAREST_NEIGHBORS: dict(estimator=KNN(n_jobs=n_jobs),
                                        param_grid=dict(n_neighbors=[5, 10, 15, 20],
                                                        weights=['uniform', 'distance'],
                                                        metric=['l1', 'l2', 'cosine']),
                                        n_jobs=n_jobs),
        Model.CONSTANT_BASELINE: dict(estimator=DummyClassifier(strategy='prior')),
        Model.RANDOM_BASELINE: dict(estimator=DummyClassifier(strategy='uniform', random_state=0)),
    }


def initialize_option_parser():
    """
    Initializes the option parser for the main script
    :return: the option parser
    """
    parser = OptionParser(option_class=OptionWithModel)
    parser.add_option('-p', '--processes',
                      action='store',
                      type='int',
                      dest='processes',
                      help='Maximum number of concurrent processes to use PER REPLICATION '
                           '(-1 for unlimited). '
                           'If this option is not present, models are fitted sequentially '
                           '(1 process per replication).')
    parser.add_option('-m', '--model',
                      action='store',
                      type='model_name',
                      dest='model',
                      help='Singular model to train')
    parser.add_option('-o', '--out-dir',
                      action='store',
                      type='str',
                      default='./out',
                      dest='out_dir',
                      help='Path to directory to save output files to')
    parser.add_option('-u', '--use-uuid',
                      action='store_true',
                      default=False,
                      dest='use_uuid',
                      help='Appends a unique identifier to the output directory')
    parser.add_option('-r', '--replications',
                      action='store',
                      type='int',
                      dest='replications',
                      help='Number of replications to perform. '
                           'Note: Each replication is given a dedicated process. '
                           'As a result, the n_jobs parameter for supported scikit-learn models will be set to 1. '
                           'Therefore combining this option with -p/--processes is recommended.')
    return parser
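

# Example invocations:
#   python main.py                  -> fit every model and baseline sequentially
#   python main.py -m <model>       -> fit only the selected model
#   python main.py -r 10 -p 4 -u    -> 10 replications, up to 4 fitting processes each, unique output dir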
if __name__ == '__main__':
    # Initialize option parser and parse command-line options
    parser = initialize_option_parser()
    options, _ = parser.parse_args()

    if options.use_uuid:
        options.out_dir += f'_{uuid.uuid4()}'
        print(f'Unique output directory requested. Using {options.out_dir}')
    Path(options.out_dir).mkdir(parents=True, exist_ok=True)  # create output directory if it doesn't exist

    # n_jobs parameter for scikit-learn models (must be 1 when using multiprocessing)
    n_jobs = 1 if options.processes is not None or options.replications is not None else -1

    # Initialize estimators and parameters to use for experiments
    model_params = get_model_trainer_params(n_jobs)

    # Construct datasets to experiment on
    datasets = build_datasets(period=5,
                              rand_features=5,
                              test_size=0.2,
                              zero_col_thresh=0.25,
                              replace_zero=-1)

    if options.replications is None:  # no replications requested
        run_experiment(model_params, datasets, options.model, options.processes, options.out_dir)
    else:  # replications requested
        replication_processes = []  # list of processes for each replication
        for replication in range(options.replications):  # for each replication
            new_experiment = mp.Process(target=run_experiment,  # create a process for the replication
                                        args=(model_params, datasets, options.model, options.processes,
                                              options.out_dir, replication),
                                        daemon=False)
            replication_processes.append(new_experiment)  # add replication process to list
            new_experiment.start()  # run replication

        # wait for ALL replications to complete
        wait_for_processes(replication_processes, 1, POLLING_RATE)

    print('All experiments completed!\nExiting...')