-
Notifications
You must be signed in to change notification settings - Fork 52
/
main.py
187 lines (154 loc) · 7.13 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import numpy
import pandas
import matplotlib.pyplot as plt
from keras.layers import Dense, LSTM
from keras.models import Sequential
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from tqdm import trange
# Fix the seed of NumPy's global random generator so repeated runs draw the
# same NumPy random numbers. Only NumPy is seeded here.
# NOTE(review): the Keras backend may use its own RNG that this call does not
# seed - confirm if bit-for-bit reproducible training is required.
numpy.random.seed(7)
def load_dataset(datasource: str) -> (numpy.ndarray, MinMaxScaler):
    """
    Load a single-column time series from a CSV file and scale it to [0, 1].

    Only the CSV column at position 1 is read. Missing values are replaced by
    the last preceding value (forward fill). The unscaled series is plotted
    with matplotlib before the scaling is applied.

    :param datasource: file name of data source
    :return: tuple of the scaled dataset (float32 input, one column) and the
             fitted MinMaxScaler, which is needed later to invert the scaling
    """
    # load the dataset (second column only)
    dataframe = pandas.read_csv(datasource, usecols=[1])
    # Forward-fill gaps. `ffill()` is the supported spelling of the deprecated
    # `fillna(method='pad')` and produces the same result.
    dataframe = dataframe.ffill()
    dataset = dataframe.values.astype('float32')

    # show the raw series before it is normalized
    plt.plot(dataset)
    plt.show()

    # normalize the dataset
    scaler = MinMaxScaler(feature_range=(0, 1))
    dataset = scaler.fit_transform(dataset)
    return dataset, scaler
def create_dataset(dataset: numpy.ndarray, look_back: int=1) -> (numpy.ndarray, numpy.ndarray):
    """
    Convert a series into supervised-learning samples with a sliding window.

    Every sample consists of `look_back` consecutive values taken from column 0
    of `dataset` (the input) and the value that immediately follows them (the
    target). All `len(dataset) - look_back` possible windows are produced,
    including the one whose target is the very last row.

    :param dataset: two-dimensional numpy dataset; only column 0 is used
    :param look_back: number of previous time steps as int, defaults to 1
    :return: tuple of input windows, shape (samples, look_back), and targets,
             shape (samples,); both are empty when the dataset is not longer
             than `look_back`
    """
    # The last window starts at len(dataset) - look_back - 1 and predicts the
    # final row, so the number of samples is len(dataset) - look_back.
    n_samples = len(dataset) - look_back
    data_x = [dataset[i:i + look_back, 0] for i in range(n_samples)]
    data_y = [dataset[i + look_back, 0] for i in range(n_samples)]
    return numpy.array(data_x), numpy.array(data_y)
def split_dataset(dataset: numpy.ndarray, train_size: int, look_back: int) -> (numpy.ndarray, numpy.ndarray):
    """
    Split a dataset into a training part and a test part.

    The first `train_size` rows form the training data. The test data starts
    `look_back` rows before the end of the training data, so those overlapping
    rows can serve as the look-back window for the first test sample. The
    sizes of both parts are printed.

    :param dataset: two-dimensional source dataset
    :param train_size: number of rows in the training data
    :param look_back: number of previous time steps as int
    :return: tuple of training dataset and test dataset
    :raises ValueError: if `train_size` is not larger than `look_back`
    """
    if train_size <= look_back:
        raise ValueError('train_size must be larger than look_back')
    train = dataset[:train_size, :]
    # begin `look_back` rows early: the tail of the training data seeds the
    # first test window
    test = dataset[train_size - look_back:, :]
    print('train_dataset: {}, test_dataset: {}'.format(len(train), len(test)))
    return train, test
def build_model(look_back: int, batch_size: int=1) -> Sequential:
    """
    Assemble and compile the forecasting network.

    The network is a stateful LSTM layer with 64 units and ReLU activation
    that returns only its last output, followed by a single linear Dense unit.
    The input shape is fixed to (batch_size, look_back, 1). The model is
    compiled with mean squared error loss and the Adam optimizer.

    :param look_back: number of previous time steps as int
    :param batch_size: batch_size as int, defaults to 1
    :return: compiled keras Sequential model
    """
    network = Sequential()

    # recurrent layer: the full batch shape is given because the layer is
    # stateful
    recurrent = LSTM(
        64,
        activation='relu',
        batch_input_shape=(batch_size, look_back, 1),
        stateful=True,
        return_sequences=False,
    )
    # read-out layer: one value per input window
    readout = Dense(1, activation='linear')

    for layer in (recurrent, readout):
        network.add(layer)

    network.compile(loss='mean_squared_error', optimizer='adam')
    return network
def plot_data(dataset: numpy.ndarray,
              look_back: int,
              train_predict: numpy.ndarray,
              test_predict: numpy.ndarray,
              forecast_predict: numpy.ndarray):
    """
    Plots baseline and predictions.

    Each prediction series is shifted to the right so that it starts where the
    previous one ends; the training predictions start after the first
    `look_back` points. The colors are set explicitly so they do not depend on
    matplotlib's default color cycle:

    blue: baseline
    green: prediction with training data
    red: prediction with test data
    cyan: prediction based on predictions

    :param dataset: dataset used for predictions
    :param look_back: number of previous time steps as int
    :param train_predict: predicted values based on training data
    :param test_predict: predicted values based on test data
    :param forecast_predict: predicted values based on previous predictions
    :return: None
    """
    train_start = look_back
    test_start = train_start + len(train_predict)
    forecast_start = test_start + len(test_predict)

    plt.plot(dataset, color='blue')
    plt.plot(_pad_front(train_predict, train_start), color='green')
    plt.plot(_pad_front(test_predict, test_start), color='red')
    plt.plot(_pad_front(forecast_predict, forecast_start), color='cyan')
    plt.show()


def _pad_front(values, offset: int) -> numpy.ndarray:
    """Flatten `values` to 1-D floats and prepend `offset` NaN placeholders."""
    # A flat float array with NaN gaps avoids handing matplotlib a list that
    # mixes None with length-1 arrays; NaN points are simply not drawn.
    flat = numpy.asarray(values, dtype=float).ravel()
    return numpy.concatenate([numpy.full(offset, numpy.nan), flat])
def make_forecast(model: Sequential, look_back_buffer: numpy.ndarray, timesteps: int=1, batch_size: int=1):
    """
    Forecast future values by feeding the model its own predictions.

    In every step the model predicts from the current look-back buffer, the
    oldest time step is dropped from the buffer and the new prediction is
    appended as the newest time step.

    :param model: model used for predicting; its `predict` is called with the
                  buffer and `batch_size`
    :param look_back_buffer: initial input of shape (rows, look_back, 1)
    :param timesteps: number of prediction steps as int, defaults to 1
    :param batch_size: batch_size passed to `model.predict`, defaults to 1
    :return: all predictions stacked along axis 0, one column wide; an empty
             float32 array of shape (0, 1) when `timesteps` is 0
    """
    # seed with an empty (0, 1) array so the result has the same shape and
    # dtype as before even when no step is run
    predictions = [numpy.empty((0, 1), dtype=numpy.float32)]
    for _ in trange(timesteps, desc='predicting data\t', mininterval=1.0):
        # make prediction with current lookback buffer
        cur_predict = model.predict(look_back_buffer, batch_size)
        predictions.append(cur_predict)
        # (rows, 1) -> (rows, 1, 1): insert the time axis in the middle so the
        # prediction lines up with the buffer for any number of rows, not only
        # when there is a single row
        newest_step = cur_predict[:, numpy.newaxis, :]
        # drop the oldest time step and append the newest prediction
        look_back_buffer = numpy.concatenate([look_back_buffer[:, 1:], newest_step], axis=1)
    # join once at the end instead of re-copying the result in every step
    return numpy.concatenate(predictions, axis=0)
def main():
    """
    Run the complete demo: load the data, train the model, predict and plot.

    The airline passenger series is loaded and scaled, split into training and
    test parts, and turned into look-back windows. A stateful LSTM is trained
    for 100 passes over the training data, with its state reset after each
    pass. Predictions for the training data, the test data and a 100-step
    forecast are converted back to the original scale, the RMSE of the
    training and test predictions is printed, and everything is plotted.

    :return: None
    """
    datasource = 'international-airline-passengers.csv'
    dataset, scaler = load_dataset(datasource)

    # split into train and test sets
    look_back = int(len(dataset) * 0.20)
    train_size = int(len(dataset) * 0.70)
    train, test = split_dataset(dataset, train_size, look_back)

    # reshape into X=t and Y=t+1
    train_x, train_y = create_dataset(train, look_back)
    test_x, test_y = create_dataset(test, look_back)

    # reshape input to be [samples, time steps, features]
    train_x = numpy.reshape(train_x, (train_x.shape[0], train_x.shape[1], 1))
    test_x = numpy.reshape(test_x, (test_x.shape[0], test_x.shape[1], 1))

    # create and fit the stateful LSTM model
    batch_size = 1
    model = build_model(look_back, batch_size=batch_size)
    for _ in trange(100, desc='fitting model\t', mininterval=1.0):
        # one pass per call so the LSTM state can be reset between passes;
        # `epochs` is the current name of the former `nb_epoch` keyword
        model.fit(train_x, train_y, epochs=1, batch_size=batch_size, verbose=0, shuffle=False)
        model.reset_states()

    # generate predictions for training and test data
    train_predict = model.predict(train_x, batch_size)
    test_predict = model.predict(test_x, batch_size)

    # generate forecast predictions, starting from the last test window
    forecast_predict = make_forecast(model, test_x[-1:], timesteps=100, batch_size=batch_size)

    # invert dataset and predictions back to the original scale
    dataset = scaler.inverse_transform(dataset)
    train_predict = scaler.inverse_transform(train_predict)
    train_y = scaler.inverse_transform([train_y])
    test_predict = scaler.inverse_transform(test_predict)
    test_y = scaler.inverse_transform([test_y])
    forecast_predict = scaler.inverse_transform(forecast_predict)

    # calculate root mean squared error
    train_score = numpy.sqrt(mean_squared_error(train_y[0], train_predict[:, 0]))
    print('Train Score: %.2f RMSE' % train_score)
    test_score = numpy.sqrt(mean_squared_error(test_y[0], test_predict[:, 0]))
    print('Test Score: %.2f RMSE' % test_score)

    plot_data(dataset, look_back, train_predict, test_predict, forecast_predict)
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    main()