-
Notifications
You must be signed in to change notification settings - Fork 0
/
diagnostics_branin2d.py
169 lines (144 loc) · 5.38 KB
/
diagnostics_branin2d.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python
# coding: utf-8
import argparse
import os
import re
from functools import partial
import matplotlib.pyplot as plt
import numpy as np
import pyDOE
from tqdm.rich import tqdm
from tqdm import trange, TqdmExperimentalWarning
import warnings
warnings.filterwarnings("ignore", category=TqdmExperimentalWarning)
from sklearn.gaussian_process.kernels import Matern
import robustGP.tools as tools
# Imports: robustGP
from robustGP.SURmodel import AdaptiveStrategy
from robustGP.test_functions import branin_2d
# from adaptive_article import initialize_branin, fname
NDIM = 2
# log_folder = os.path.join(os.getcwd(), "logs")
def initialize_function(
function, NDIM, idxU=[1], name=None, initial_design=None, save=True
):
"""
Create new instance of AdaptiveStrategy of the Branin 2d function
with LHS as initial design
"""
bounds = np.asarray([(0, 1)] * NDIM)
if initial_design is None:
initial_design = 5 * NDIM
if isinstance(initial_design, int):
initial_design = pyDOE.lhs(
n=NDIM, samples=initial_design, criterion="maximin", iterations=50
)
else:
pass # initial_design is already a set of samples
ada_strat = AdaptiveStrategy(bounds, function, name)
ada_strat.fit_gp(
initial_design,
ada_strat.evaluate_function(initial_design),
Matern(np.ones(NDIM)),
n_restarts_optimizer=50,
)
ada_strat.set_idxU(idxU, ndim=NDIM)
if save:
ada_strat.save_design(last=False)
return ada_strat
bounds = np.asarray([(0, 1)] * NDIM)
# For plots
npts = 2**3
x, y = np.linspace(0, 1, npts), np.linspace(0, 1, npts)
(XY, (xmg, ymg)) = tools.pairify((x, y))
xl, yl = np.linspace(0, 1, 2**6), np.linspace(0, 1, 2**6)
(XYl, (xmgl, ymgl)) = tools.pairify((xl, yl))
Niter = 50
# hartmann = initialize_function(hartmann_3d, 3, idxU=[2], name="test")
# branin = initialize_function(branin_2d, 2, idxU=[1])
def probability_regret(strat, nx=100, nu=100, alpha=2.0, truth=False):
if truth:
get_cond_mini = lambda u: strat.get_conditional_minimiser_true(u)
get_fun = lambda x_, u: strat.function(
np.concatenate([np.array([x_]), np.array([u])])
)
else:
get_cond_mini = lambda u: strat.get_conditional_minimiser(u)
get_fun = lambda x_, u: strat.gp.predict(
np.concatenate([np.array([x_]), np.array([u])]).reshape(-1, 2)
)[0]
optim_gp = {"theta_star": np.empty((nu,)), "J_star": np.empty((nu,))}
Delta = np.empty((nx, nu))
for i, u in tqdm(list(enumerate(np.linspace(0, 1, nu)))):
opt = get_cond_mini(u)
Jstar = opt.fun
optim_gp["theta_star"][i] = opt.x
optim_gp["J_star"][i] = Jstar
for j, xy_ in enumerate(np.linspace(0, 1, nx)):
J = get_fun(xy_, u)
Delta[j, i] = J - alpha * Jstar
return Delta.reshape(nx, nu), optim_gp
def error_probability_regret(design_file, log_folder, nU, freq=10):
design = np.genfromtxt(log_folder + "/" + design_file + "_design.txt")
sur_strat = initialize_function(
branin_2d, NDIM, initial_design=design[:15, :NDIM], save=False
)
Delta_truth, optim_true = probability_regret(
sur_strat, nx=100, nu=nU, alpha=2.0, truth=True
)
prob_Delta_leq_0 = []
norm_Jstar_iters = []
norm_theta_star_iters = []
for npoints in trange(15, 115, freq):
sur_strat = initialize_function(
branin_2d, NDIM, initial_design=design[:npoints, :NDIM], save=False
)
Delta_gp, optim_gp = probability_regret(
sur_strat, nx=100, nu=nU, alpha=2.0, truth=False
)
error_prob_ = np.sum(
((Delta_gp < 0).mean(-1) - (Delta_truth < 0).mean(-1)) ** 2
)
prob_Delta_leq_0.append((npoints, error_prob_))
norm_Jstar = np.mean((optim_gp["J_star"] - optim_true["J_star"]) ** 2)
norm_Jstar_iters.append(norm_Jstar)
norm_theta_star = np.mean(
(optim_gp["theta_star"] - optim_true["theta_star"]) ** 2
)
norm_theta_star_iters.append(norm_theta_star)
with open(
os.path.join(f"{log_folder}", f"{design_file}_diagnostic.txt"),
"a+",
) as diag_file:
diag_file.write(
f"{npoints}, {error_prob_}, {norm_Jstar}, {norm_theta_star}\n"
)
return None
def get_experiments_files(exp_name, log_folder):
files = os.listdir(log_folder)
reg = rf"{exp_name}_[\d]+"
exp_files = sorted(list(set(re.findall(reg, " ".join(files)))))
return exp_files
## Make experiments
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Make diagnostics of SUR experiment")
parser.add_argument(
"-e",
"--exp-list",
nargs="+",
default=[],
help="Name of experiments",
)
parser.add_argument("--log_folder", type=str, help="folder containing the logs")
parser.add_argument("--freq", type=int, help="frequency of the diagnostics")
parser.add_argument("--nU", type=int, help="Number of u points")
parsed_args = parser.parse_args()
exp_list = parsed_args.exp_list
for exp in exp_list:
print(exp)
list_files = get_experiments_files(exp, parsed_args.log_folder)
for fi in list_files:
print(f"-- {fi}")
_ = error_probability_regret(
fi, parsed_args.log_folder, parsed_args.nU, parsed_args.freq
)