-
Notifications
You must be signed in to change notification settings - Fork 1
/
poisoning.py
242 lines (196 loc) · 9.51 KB
/
poisoning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
from typing import Optional, Tuple
import numpy as np
from tqdm.auto import tqdm
from attacks.attack import PoisoningAttackWhiteBox
from models.model import ScikitlearnClassifierSVC
# from art.utils import compute_success
logger = logging.getLogger(__name__)
class PoisoningAttackSVM(PoisoningAttackWhiteBox):
    """
    Close implementation of poisoning attack on Support Vector Machines (SVM) by Biggio et al.

    The attack starts from user-supplied points and moves each of them by gradient steps, refitting the
    wrapped scikit-learn SVC with the candidate poison point appended to the training data.
    """

    # attack_params = PoisoningAttackWhiteBox.attack_params + [
    #     "classifier",
    #     "step",
    #     "eps",
    #     "x_train",
    #     "y_train",
    #     "x_val",
    #     "y_val",
    #     "verbose",
    # ]
    _estimator_requirements = (ScikitlearnClassifierSVC,)

    def __init__(
        self,
        classifier: "ScikitlearnClassifierSVC",
        step: Optional[float],
        eps: Optional[float],
        x_train: Optional[np.ndarray],
        y_train: Optional[np.ndarray],
        x_val: Optional[np.ndarray],
        y_val: Optional[np.ndarray],
        max_iter: int,
        verbose: bool = True,
    ) -> None:
        """
        Initialize an SVM poisoning attack.

        :param classifier: A trained :class:`.ScikitlearnClassifierSVC` classifier.
        :param step: The step size of the classifier.
        :param eps: The minimum difference in loss before convergence of the classifier.
        :param x_train: The training data used for classification.
        :param y_train: The training labels used for classification.
        :param x_val: The validation data used to test the attack.
        :param y_val: The validation labels used to test the attack.
        :param max_iter: The maximum number of iterations for the attack.
        :param verbose: Show progress bars.
        :raises NotImplementedError: If the wrapped model is neither a `LinearSVC` nor an `SVC`.
        """
        # pylint: disable=W0212
        from sklearn.svm import LinearSVC, SVC

        self.classifier = classifier
        super().__init__(classifier=classifier)

        if isinstance(self.estimator.model, LinearSVC):
            # The attack reads `support_vectors_` and `dual_coef_`, so a LinearSVC is replaced by an SVC with a
            # linear kernel and the same C, then fitted on the supplied training data.
            self._estimator = ScikitlearnClassifierSVC(
                model=SVC(C=self.estimator.model.C, kernel="linear"),
                clip_values=self.estimator.clip_values,
                nb_classes=self.nb_classes,
            )
            self.estimator.fit(x_train, y_train)
        elif not isinstance(self.estimator.model, SVC):
            raise NotImplementedError(f"Model type '{type(self.estimator.model)}' not yet supported")

        self.step = step
        self.eps = eps
        self.x_train = x_train
        self.y_train = y_train
        self.x_val = x_val
        self.y_val = y_val
        self.max_iter = max_iter
        self.verbose = verbose
        # NOTE: validation is not run automatically here; call `_check_params()` explicitly to validate.

    def poison(self, x: np.ndarray, y: Optional[np.ndarray] = None, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
        """
        Iteratively finds optimal attack points starting at values at `x`.

        :param x: An array with the points that initialize attack points.
        :param y: The target labels for the attack.
        :return: A tuple holding the `(poisoning_examples, poisoning_labels)`.
        :raises ValueError: If `y` is None or `x` is empty.
        """
        if y is None:  # pragma: no cover
            raise ValueError("Target labels `y` need to be provided for a targeted attack.")

        y_attack = np.copy(y)
        num_poison = len(x)
        if num_poison == 0:  # pragma: no cover
            raise ValueError("Must input at least one poison point")
        num_features = len(x[0])

        # Each start point is optimised independently against the original training set.
        progress = tqdm(
            zip(x, y_attack),
            desc="SVM poisoning",
            total=num_poison,
            disable=not self.verbose,
        )
        all_poison = [
            self.generate_attack_point(attack_point, attack_label) for attack_point, attack_label in progress
        ]

        x_adv = np.array(all_poison).reshape((num_poison, num_features))
        return x_adv, y_attack

    def generate_attack_point(self, x_attack: np.ndarray, y_attack: np.ndarray) -> np.ndarray:
        """
        Generate a single poison attack point, using `x_val` and `y_val` as validation points.

        The attack begins at `x_attack` and is labelled with `argmax(y_attack)`. The wrapped model is refitted
        in place several times and is left fitted on the training data plus the final attack point.

        :param x_attack: The initial attack point. It is copied and never modified.
        :param y_attack: The initial attack label (one-hot style; its argmax is used as the class index).
        :return: The final attack point, with a leading axis of length one.
        :raises ValueError: If `x_train` or `y_train` is None.
        """
        # pylint: disable=W0212
        from sklearn.preprocessing import normalize

        if self.y_train is None or self.x_train is None:
            raise ValueError("`x_train` and `y_train` cannot be None for generating an attack point.")

        poisoned_model = self.estimator.model
        y_t = np.argmax(self.y_train, axis=1)
        poisoned_model.fit(self.x_train, y_t)
        y_a = np.argmax(y_attack)

        # Work on a private copy: `np.expand_dims` alone returns a view, and the in-place gradient step below
        # would otherwise write into the caller's array.
        attack_point = np.array(x_attack, copy=True)
        if not np.issubdtype(attack_point.dtype, np.floating):
            # An in-place float update cannot be stored in an integer/bool array.
            attack_point = attack_point.astype(float)
        attack_point = np.expand_dims(attack_point, axis=0)

        # Objective: sum of the decision values over the validation points with a negative decision value.
        var_g = poisoned_model.decision_function(self.x_val)
        k_values = np.where(-var_g > 0)
        new_p = np.sum(var_g[k_values])
        old_p = np.copy(new_p)

        poisoned_labels = np.append(y_t, y_a)  # does not change between iterations
        clip_values = self.estimator.clip_values
        i = 0

        while new_p - old_p < self.eps and i < self.max_iter:
            old_p = new_p
            poisoned_input = np.vstack([self.x_train, attack_point])
            poisoned_model.fit(poisoned_input, poisoned_labels)

            unit_grad = normalize(self.attack_gradient(attack_point))
            attack_point += self.step * unit_grad
            if clip_values is not None:
                lower, upper = clip_values
                attack_point = np.clip(attack_point, lower, upper)

            # The model has not been refitted since the step, so this evaluates the fit made above.
            new_g = poisoned_model.decision_function(self.x_val)
            k_values = np.where(-new_g > 0)
            new_p = np.sum(new_g[k_values])
            i += 1

        # Leave the model fitted on the training data plus the final attack point.
        poisoned_input = np.vstack([self.x_train, attack_point])
        poisoned_model.fit(poisoned_input, poisoned_labels)
        return attack_point

    def predict_sign(self, vec: np.ndarray) -> np.ndarray:
        """
        Predicts the inputs by binary classifier and outputs -1 and 1 instead of 0 and 1.

        :param vec: An input array.
        :return: An array of -1/1 predictions.
        """
        # pylint: disable=W0212
        preds = self.estimator.model.predict(vec)
        return 2 * preds - 1

    def attack_gradient(self, attack_point: np.ndarray, tol: float = 0.0001) -> np.ndarray:
        """
        Calculates the attack gradient, or dP for this attack.
        See equation 8 in Biggio et al. Ch. 14

        :param attack_point: The current attack point.
        :param tol: Tolerance level; added to the upper bound of the random noise used before inverting `qss`.
        :return: The attack gradient, of shape `(1, x_val.shape[1])`. It is all zeros when no support vector
                 matches the attack point.
        :raises ValueError: If `x_val` or `y_val` is None.
        """
        # pylint: disable=W0212
        if self.x_val is None or self.y_val is None:  # pragma: no cover
            raise ValueError("The values of `x_val` and `y_val` are required for computing the gradients.")

        art_model = self.estimator
        model = self.estimator.model
        grad = np.zeros((1, self.x_val.shape[1]))
        support_vectors = model.support_vectors_
        num_support = len(support_vectors)
        support_labels = np.expand_dims(self.predict_sign(support_vectors), axis=1)

        # Element-wise membership test: a support vector matches when every one of its values occurs in
        # `attack_point`.
        is_attack_sv = np.isin(support_vectors, attack_point).all(axis=1)
        if not is_attack_sv.any():  # pragma: no cover
            return grad

        c_idx = np.where(is_attack_sv > 0)[0][0]
        alpha_c = model.dual_coef_[0, c_idx]

        assert support_labels.shape == (num_support, 1)
        qss = art_model.q_submatrix(support_vectors, support_vectors)
        # Random noise is added to `qss` before it is inverted.
        qss_inv = np.linalg.inv(qss + np.random.uniform(0, 0.01 * np.min(qss) + tol, (num_support, num_support)))
        nu_k = np.matmul(qss_inv, support_labels)
        zeta = np.matmul(support_labels.T, nu_k)

        for x_k, y_k in zip(self.x_val, self.y_val):
            # Map the class index of the validation label from {0, 1} to {-1, 1}.
            y_k = 2 * np.expand_dims(np.argmax(y_k), axis=0) - 1

            q_ks = art_model.q_submatrix(np.array([x_k]), support_vectors)
            m_k = (1.0 / zeta) * np.matmul(q_ks, zeta * qss_inv - np.matmul(nu_k, nu_k.T)) + np.matmul(y_k, nu_k.T)

            d_q_sc = np.fromfunction(
                lambda i: art_model._get_kernel_gradient_sv(i, attack_point),
                (len(support_vectors),),
                dtype=int,
            )
            d_q_kc = art_model._kernel_grad(x_k, attack_point)
            grad += (np.matmul(m_k, d_q_sc) + d_q_kc) * alpha_c

        return grad

    def _check_params(self) -> None:
        """
        Validate the attack parameters stored on the instance.

        :raises ValueError: If `step`, `eps` or `max_iter` is not strictly positive, or `verbose` is not a bool.
        """
        if self.step is not None and self.step <= 0:
            raise ValueError("Step size must be strictly positive.")
        if self.eps is not None and self.eps <= 0:
            raise ValueError("Value of eps must be strictly positive.")
        if self.max_iter <= 0:
            raise ValueError("Value of max_iter must be strictly positive.")
        if not isinstance(self.verbose, bool):
            raise ValueError("The argument `verbose` has to be of type bool.")