-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBasicModel.py
154 lines (116 loc) · 5.38 KB
/
BasicModel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
Class file to create the basic classifier model
"""
import matplotlib.pyplot as plt
import tensorflow.keras as keras
import numpy as np
from sklearn.metrics import confusion_matrix
class BasicModel:
def __init__(self, num_input):
in_ = keras.layers.Input(shape=(num_input+1,))
fc1 = keras.layers.Dense(num_input // 2, activation="relu")(in_)
fc2 = keras.layers.Dense(num_input // 4, activation="relu")(fc1)
fc3 = keras.layers.Dense(num_input // 8, activation="relu")(fc2)
out = keras.layers.Dense(1, activation="sigmoid")(fc3)
classifier = keras.models.Model(in_, out, name='classifier_model')
classifier.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
self.model = classifier
# Variables for tracking information
self.epoch_loss = np.array([])
self.epoch_accuracy = np.array([])
self.num_trains = 0
def display_models(self):
print("Model:")
print(self.model.summary())
def train(self, data, protected, labels, batch_size=128):
self.num_trains += 1
num_batches = data.shape[0] // batch_size
batch_loss = np.array([])
batch_accuracy = np.array([])
for batch_idx in range(num_batches):
idx = np.random.randint(0, data.shape[0], batch_size) # Look at better batching
# get values from data
x = data[idx]
y = labels[idx]
z = protected[idx]
# Since only interested in classifier, we can use protected
z = np.reshape(z, (len(z), 1))
x = np.append(x, z, 1)
loss, acc = self.model.train_on_batch(x, y)
batch_loss = np.append(batch_loss, loss)
batch_accuracy = np.append(batch_accuracy, acc)
loss = batch_loss.mean()
acc = batch_accuracy.mean()
self.__update_epoch_vars(loss, acc)
def test(self, data, protected, labels, batch_size=128):
protected = np.reshape(protected, (len(protected), 1))
data = np.append(data, protected, 1)
results = self.model.evaluate(data, labels, batch_size=batch_size)
return results
def confusion_matrix(self, data, protected, labels, batch_size=128):
cms = self.__gender_confusion_matrix(data.copy(), protected.copy(), labels.copy())
protected = np.reshape(protected, (len(protected), 1))
data = np.append(data, protected, 1)
raw_prediction = self.model.predict(data, batch_size=batch_size)
predictions = np.where(raw_prediction > .5, 1, 0)
labels = np.reshape(labels, (len(labels), 1))
cm = confusion_matrix(labels, predictions)
return np.append(cm.flatten() / labels.shape[0], cms).flatten()
def __gender_confusion_matrix(self, data, protected, labels, batch_size=128):
male_idx = protected == 0
female_idx = protected == 1
male_data = data[male_idx]
male_protected = protected[male_idx]
male_labels = labels[male_idx]
female_data = data[female_idx]
female_protected = protected[female_idx]
female_labels = labels[female_idx]
male_protected = np.reshape(male_protected, (len(male_protected), 1))
female_protected = np.reshape(female_protected, (len(female_protected), 1))
male_data = np.append(male_data, male_protected, 1)
female_data = np.append(female_data, female_protected, 1)
raw_male = self.model.predict(male_data, batch_size=batch_size)
raw_female = self.model.predict(female_data, batch_size=batch_size)
male_predictions = np.where(raw_male > .5, 1, 0)
female_predictions = np.where(raw_female > .5, 1, 0)
cm_male = confusion_matrix(male_labels, male_predictions)
cm_female = confusion_matrix(female_labels, female_predictions)
cms = np.append(cm_male.flatten() / male_data.shape[0],
cm_female.flatten() / female_data.shape[0]).flatten()
return cms
def __update_epoch_vars(self, loss, acc):
self.epoch_loss = np.append(self.epoch_loss, loss)
self.epoch_accuracy = np.append(self.epoch_accuracy, acc)
@staticmethod
def __format_data(data, labels, idxs):
print(data.shape)
print(labels.shape)
print(idxs.shape)
print(data[idxs])
# print(labels[idxs])
return data[idxs, :], labels[idxs]
@staticmethod
def __reshape_1d(arr):
return np.reshape(arr, (len(arr), 1))
def create_figs(self, epoch, folder):
# Loss figure
loss_path = folder + 'basic_model_loss_epoch{}.png'.format(epoch)
plt.figure(figsize=(10, 15))
plt.plot(self.epoch_loss)
plt.title("Basic Model Loss for {} Epochs".format(epoch))
plt.savefig(loss_path)
plt.close()
# Accuracy figure
acc_path = folder + 'basic_model_acc_epoch{}.png'.format(epoch)
plt.figure(figsize=(10, 15))
plt.plot(self.epoch_accuracy)
plt.title("Basic Model Accuracy for {} Epochs".format(epoch))
plt.savefig(acc_path)
plt.close()
def model_save(self, path, epoch):
self.model.save(path+'basic_at_{}.h5'.format(epoch))
@staticmethod
def result_graph_info():
result_names = ["Basic Model Loss", "Basic Model Accuracy"]
plot_paths = ['overall_loss.png', 'overall_acc.png']
return result_names, plot_paths