# convolutional.py
# Forked from chen0040/keras-anomaly-detection.
'''
Based on work by Xianshun Chen
https://github.com/chen0040/keras-anomaly-detection
'''
from keras.layers import Conv1D, GlobalMaxPool1D, Dense, Flatten
from keras.models import Sequential
from keras.callbacks import ModelCheckpoint
import numpy as np
class Conv1DAutoEncoder(object):
    """Conv1D auto-encoder that scores time-series windows by reconstruction error.

    The network is trained to reproduce its own input. The anomaly score of a
    window is the Euclidean distance between the window and the network's
    reconstruction of it; windows whose score reaches ``threshold`` are
    reported as anomalies.
    """

    model_name = 'con1d-auto-encoder'
    VERBOSE = 1

    def __init__(self):
        self.model = None
        self.time_window_size = None
        self.metric = None
        # Default cut-off; replaced by the estimate computed in fit() or by the
        # value stored in the config file read by load_model().
        self.threshold = 5.0
        self.config = None

    @staticmethod
    def create_model(time_window_size, metric):
        """Build and compile the network.

        Args:
            time_window_size: Number of time steps per window; used both as
                the input length and as the width of the output layer.
            metric: Metric passed to ``compile`` alongside the MSE loss.

        Returns:
            The compiled ``Sequential`` model.
        """
        model = Sequential()
        model.add(Conv1D(filters=256, kernel_size=5, padding='same', activation='relu',
                         input_shape=(time_window_size, 1)))
        model.add(GlobalMaxPool1D())
        model.add(Dense(units=time_window_size, activation='linear'))
        model.compile(optimizer='adam', loss='mean_squared_error', metrics=[metric])
        # summary() prints the table itself and returns None, so wrapping it in
        # print() only added a stray "None" line.
        model.summary()
        return model

    @staticmethod
    def get_config_file(model_dir_path):
        """Return the path of the config file inside ``model_dir_path``."""
        return model_dir_path + '/' + Conv1DAutoEncoder.model_name + '-config.npy'

    @staticmethod
    def get_weight_file(model_dir_path):
        """Return the path of the weight file inside ``model_dir_path``."""
        return model_dir_path + '/' + Conv1DAutoEncoder.model_name + '-weights.h5'

    @staticmethod
    def get_architecture_file(model_dir_path):
        """Return the path of the architecture file inside ``model_dir_path``."""
        return model_dir_path + '/' + Conv1DAutoEncoder.model_name + '-architecture.json'

    @staticmethod
    def _load_config(config_file_path):
        """Read the config dict written by fit() from ``config_file_path``."""
        # The dict is stored as a pickled 0-d object array, which np.load only
        # reads when allow_pickle=True.
        # NOTE(security): unpickling executes arbitrary code; only load config
        # files that come from a trusted source.
        return np.load(config_file_path, allow_pickle=True).item()

    @staticmethod
    def _estimate_threshold(scores, estimated_negative_sample_ratio):
        """Return the score found at the given fraction of the sorted scores.

        The index is clamped to the valid range so that a ratio of 1.0 selects
        the largest score instead of indexing one past the end.
        """
        ordered = np.sort(np.asarray(scores))
        last_index = len(ordered) - 1
        cut_point = int(estimated_negative_sample_ratio * len(ordered))
        cut_point = max(min(cut_point, last_index), 0)
        return ordered[cut_point]

    def load_model(self, model_dir_path):
        """Restore config, architecture and weights saved by fit().

        Args:
            model_dir_path: Directory holding the config and weight files.
        """
        self.config = self._load_config(self.get_config_file(model_dir_path))
        self.metric = self.config['metric']
        self.time_window_size = self.config['time_window_size']
        self.threshold = self.config['threshold']
        # The architecture is rebuilt from code; only the weights come from disk.
        self.model = self.create_model(self.time_window_size, self.metric)
        self.model.load_weights(self.get_weight_file(model_dir_path))

    def fit(self, dataset, model_dir_path, batch_size=8, epochs=100, validation_split=0.1,
            metric='mean_absolute_error', estimated_negative_sample_ratio=0.9):
        """Train on ``dataset``, estimate the threshold and persist everything.

        Args:
            dataset: 2-D array; ``dataset.shape[1]`` is taken as the window size.
            model_dir_path: Directory receiving the weight, architecture and
                config files.
            batch_size: Passed through to the Keras ``fit`` call.
            epochs: Passed through to the Keras ``fit`` call.
            validation_split: Passed through to the Keras ``fit`` call.
            metric: Metric the model is compiled with; also stored in the config.
            estimated_negative_sample_ratio: Fraction of the sorted training
                scores at which the anomaly threshold is taken.

        Returns:
            The ``history`` dict of the Keras training run.
        """
        self.time_window_size = dataset.shape[1]
        self.metric = metric

        # Conv1D expects a trailing channel axis: (samples, steps, 1).
        input_timeseries_dataset = np.expand_dims(dataset, axis=2)

        weight_file_path = self.get_weight_file(model_dir_path=model_dir_path)
        architecture_file_path = self.get_architecture_file(model_dir_path)
        checkpoint = ModelCheckpoint(weight_file_path)

        self.model = self.create_model(self.time_window_size, metric=self.metric)
        # Context manager guarantees the handle is flushed and closed.
        with open(architecture_file_path, 'w') as architecture_file:
            architecture_file.write(self.model.to_json())

        history = self.model.fit(x=input_timeseries_dataset, y=dataset,
                                 batch_size=batch_size, epochs=epochs,
                                 verbose=self.VERBOSE, validation_split=validation_split,
                                 callbacks=[checkpoint]).history
        self.model.save_weights(weight_file_path)

        self.threshold = self._estimate_threshold(self.predict(dataset),
                                                  estimated_negative_sample_ratio)
        print('estimated threshold is ' + str(self.threshold))

        self.config = dict()
        self.config['time_window_size'] = self.time_window_size
        self.config['metric'] = self.metric
        self.config['threshold'] = self.threshold
        np.save(self.get_config_file(model_dir_path=model_dir_path), self.config)

        return history

    def predict(self, timeseries_dataset):
        """Return one reconstruction-error score per row of ``timeseries_dataset``.

        Args:
            timeseries_dataset: 2-D array of windows, laid out like the
                ``dataset`` given to fit().

        Returns:
            1-D array holding, for each row, the Euclidean norm of the
            difference between the row and the model's reconstruction.
        """
        input_timeseries_dataset = np.expand_dims(timeseries_dataset, axis=2)
        target_timeseries_dataset = self.model.predict(x=input_timeseries_dataset)
        return np.linalg.norm(timeseries_dataset - target_timeseries_dataset, axis=-1)

    def anomaly(self, timeseries_dataset, threshold=None):
        """Flag each window whose score reaches the threshold.

        Args:
            timeseries_dataset: 2-D array of windows to score.
            threshold: Optional override. When given it is stored on the
                instance, so it also applies to later calls.

        Returns:
            A ``zip`` of ``(is_anomaly, score)`` pairs, one per row. On
            Python 3 this is a one-shot iterator; wrap it in ``list`` to
            reuse it.
        """
        if threshold is not None:
            self.threshold = threshold

        dist = self.predict(timeseries_dataset)
        return zip(dist >= self.threshold, dist)