-
Notifications
You must be signed in to change notification settings - Fork 1
/
synthetic_generator.py
304 lines (266 loc) · 11.9 KB
/
synthetic_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import scipy, h5py
import numpy as np
from scipy.signal import resample
from utils.cov_util import calc_cross_cov_mats_from_data
from utils.plotting.fig1 import lorenz_fig_axes, plot_3d, plot_lorenz_3d, plot_traces, plot_dca_demo, plot_r2, plot_cov
import matplotlib.pyplot as plt
from dca import DynamicalComponentsAnalysis as DCA
from utils.plotting import style
import pickle
def gen_lorenz_system(T, integration_dt=0.005):
"""
Period ~ 1 unit of time (total time is T)
So make sure integration_dt << 1
Known-to-be-good chaotic parameters
See sussillo LFADS paper
"""
rho = 28.0
sigma = 10.0
beta = 8 / 3.
def dx_dt(state, t):
x, y, z = state
x_dot = sigma * (y - x)
y_dot = x * (rho - z) - y
z_dot = x * y - beta * z
return (x_dot, y_dot, z_dot)
x_0 = np.ones(3)
t = np.arange(0, T, integration_dt)
X = scipy.integrate.odeint(dx_dt, x_0, t)
return X
def gen_lorenz_data(num_samples, normalize=True):
integration_dt = 0.005
data_dt = 0.025
skipped_samples = 1000
T = (num_samples + skipped_samples) * data_dt
X = gen_lorenz_system(T, integration_dt)
if normalize:
X -= X.mean(axis=0)
X /= X.std(axis=0)
X_dwn = resample(X, num_samples + skipped_samples, axis=0)
X_dwn = X_dwn[skipped_samples:, :]
return X_dwn
def random_basis(N, D, rng):
return scipy.stats.ortho_group.rvs(N, random_state=rng)[:, :D]
def median_subspace(N, D, rng, num_samples=5000, V_0=None):
subspaces = np.zeros((num_samples, N, D))
angles = np.zeros((num_samples, min(D, V_0.shape[1])))
if V_0 is None:
V_0 = np.eye(N)[:, :D]
for i in range(num_samples):
subspaces[i] = random_basis(N, D, rng)
angles[i] = np.rad2deg(scipy.linalg.subspace_angles(V_0, subspaces[i]))
median_angles = np.median(angles, axis=0)
median_subspace_idx = np.argmin(np.sum((angles - median_angles)**2, axis=1))
median_subspace = subspaces[median_subspace_idx]
return median_subspace
def gen_noise_cov(N, D, var, rng, V_noise=None):
noise_spectrum = var * np.exp(-2 * np.arange(N) / D)
if V_noise is None:
V_noise = scipy.stats.ortho_group.rvs(N, random_state=rng)
noise_cov = np.dot(V_noise, np.dot(np.diag(noise_spectrum), V_noise.T))
return noise_cov
def embedded_lorenz_cross_cov_mats(N, T, snr=1., noise_dim=7, return_samples=False,
num_lorenz_samples=10000, num_subspace_samples=5000,
V_dynamics=None, V_noise=None, X_dynamics=None, seed=20200326):
"""Embed the Lorenz system into high dimensions with additive spatially
structued white noise. Signal and noise subspaces are oriented with the
median subspace angle.
Parameters
----------
N : int
Embedding dimension.
T : int
Number of timesteps (2 * T_pi)
snr : float
Signal-to-noise ratio. Specifically it is the ratio of the largest
eigenvalue of the signal covariance to the largest eigenvalue of the
noise covariance.
noise_dim : int
Dimension at which noise eigenvalues fall to 1/e. If noise_dim is
np.inf then a flat spectrum is used.
return_samples : bool
Whether to return cross_cov_mats or data samples.
num_lorenz_samples : int
Number of data samples to use.
num_subspace_samples : int
Number of random subspaces used to calculate the median relative angle.
seed : int
Seed for Numpy random state.
"""
rng = np.random.RandomState(seed)
# Generate Lorenz dynamics
if X_dynamics is None:
X_dynamics = gen_lorenz_data(num_lorenz_samples)
dynamics_var = np.max(scipy.linalg.eigvalsh(np.cov(X_dynamics.T)))
noise_var = dynamics_var / snr
# Generate dynamics embedding matrix (will remain fixed)
if V_dynamics is None:
if N == 3:
V_dynamics = np.eye(3)
else:
V_dynamics = random_basis(N, 3, rng)
if noise_dim == np.inf:
noise_cov = np.eye(N) * noise_var
else:
# Generate a subspace with median principal angles w.r.t. dynamics subspace
if V_noise is None:
V_noise = median_subspace(N, noise_dim, rng, num_samples=num_subspace_samples,
V_0=V_dynamics)
# Extend V_noise to a basis for R^N
if V_noise.shape[1] < N:
V_noise_comp = scipy.linalg.orth(np.eye(N) - np.dot(V_noise, V_noise.T))
V_noise = np.concatenate((V_noise, V_noise_comp), axis=1)
# Add noise covariance
noise_cov = gen_noise_cov(N, noise_dim, noise_var, rng, V_noise=V_noise)
# Generate actual samples of high-D data
cross_cov_mats = calc_cross_cov_mats_from_data(X_dynamics, T)
cross_cov_mats = np.array([V_dynamics.dot(C).dot(V_dynamics.T) for C in cross_cov_mats])
cross_cov_mats[0] += noise_cov
if return_samples:
X_samples = (np.dot(X_dynamics, V_dynamics.T) +
rng.multivariate_normal(mean=np.zeros(N),
cov=noise_cov, size=len(X_dynamics)))
return cross_cov_mats, X_samples
else:
return cross_cov_mats
def generate_syn(T, N, noise_dim, snr_vals, num_samples=10000, random_seed=42):
"""
:param T: window size of DCA
:param N: dimension of observations
:param noise_dim: dimension of subspace for noise
:param snr_vals: signal-noise ratio
:param num_samples: number os time stamps
:param random_seed: random seed
:return:
"""
np.random.seed(random_seed)
# Save params
with h5py.File(RESULTS_FILENAME, "w") as f:
f.attrs["T"] = T
f.attrs["N"] = N
f.attrs["noise_dim"] = noise_dim
f.attrs["snr_vals"] = snr_vals
# Generate Lorenz dynamics
X_dynamics = gen_lorenz_data(num_samples)
dynamics_var = np.max(scipy.linalg.eigvalsh(np.cov(X_dynamics.T)))
# Save dynamics
f.create_dataset("X_dynamics", data=X_dynamics)
f.attrs["dynamics_var"] = dynamics_var
# Generate dynamics embedding matrix (will remain fixed)
V_dynamics = random_basis(N, 3, np.random)
X = np.dot(X_dynamics, V_dynamics.T)
# Generate a subspace with median principal angles w.r.t. dynamics subspace
V_noise = median_subspace(N, noise_dim, num_samples=5000, V_0=V_dynamics, rng=np.random)
# ... and extend V_noise to a basis for R^N
V_noise_comp = scipy.linalg.orth(np.eye(N) - np.dot(V_noise, V_noise.T))
V_noise = np.concatenate((V_noise, V_noise_comp), axis=1)
# Save embeded dynamics and embedding matrices
f.create_dataset("X", data=X)
f.attrs["V_dynamics"] = V_dynamics
f.attrs["V_noise"] = V_noise
# Run DCA
opt = DCA(T=T, d=3)
opt.fit(X)
V_dca = opt.coef_
# Run PCA
V_pca = scipy.linalg.eigh(np.cov(X.T))[1][:, ::-1][:, :3]
# Project data onto DCA and PCA bases
X_dca = np.dot(X, V_dca)
X_pca = np.dot(X, V_pca)
# Linearly trasnform projected data to be close to original Lorenz attractor
beta_pca = np.linalg.lstsq(X_pca, X_dynamics, rcond=None)[0]
beta_dca = np.linalg.lstsq(X_dca, X_dynamics, rcond=None)[0]
X_pca_trans = np.dot(X_pca, beta_pca)
X_dca_trans = np.dot(X_dca, beta_dca)
# Save transformed projections
X_pca_trans_dset_true = X_pca_trans
X_dca_trans_dset_true = X_dca_trans
f.create_dataset("X_pca_trans_true", data=X_pca_trans_dset_true)
f.create_dataset("X_dca_trans_true", data=X_dca_trans_dset_true)
# To-save: noisy data, reconstructed PCA, reconstructed DCA
X_noisy_dset = f.create_dataset("X_noisy", (len(snr_vals), num_samples, N))
X_pca_trans_dset = f.create_dataset("X_pca_trans", (len(snr_vals), num_samples, 3))
X_dca_trans_dset = f.create_dataset("X_dca_trans", (len(snr_vals), num_samples, 3))
# Loop over SNR vals
for snr_idx in range(len(snr_vals)):
snr = snr_vals[snr_idx]
print("snr =", snr)
_, X_noisy = embedded_lorenz_cross_cov_mats(N, T, snr, noise_dim, return_samples=True,
V_dynamics=V_dynamics, V_noise=V_noise,
X_dynamics=X_dynamics)
X_noisy = X_noisy - X_noisy.mean(axis=0)
# Save noisy data
X_noisy_dset[snr_idx] = X_noisy
# Run DCA
opt = DCA(T=T, d=3)
opt.fit(X_noisy)
V_dca = opt.coef_
# Run PCA
V_pca = scipy.linalg.eigh(np.cov(X_noisy.T))[1][:, ::-1][:, :3]
# Project data onto DCA and PCA bases
X_dca = np.dot(X_noisy, V_dca)
X_pca = np.dot(X_noisy, V_pca)
# Linearly trasnform projected data to be close to original Lorenz attractor
beta_pca = np.linalg.lstsq(X_pca, X_dynamics, rcond=None)[0]
beta_dca = np.linalg.lstsq(X_dca, X_dynamics, rcond=None)[0]
X_pca_trans = np.dot(X_pca, beta_pca)
X_dca_trans = np.dot(X_dca, beta_dca)
# Save transformed projections
X_pca_trans_dset[snr_idx] = X_pca_trans
X_dca_trans_dset[snr_idx] = X_dca_trans
if __name__ == "__main__":
seed = 22 # original seed = 42
np.random.seed(seed)
# RESULTS_FILENAME = "../data/lorenz/lorenz_results.hdf5"
RESULTS_FILENAME = "../data/lorenz/lorenz_exploration.hdf5"
do_vis = False
#Set parameters
T = 4
N = 30
noise_dim = 5
if RESULTS_FILENAME == "../data/lorenz/lorenz_results.hdf5":
snr_vals = np.array([0.01, 0.02, 0.05, 0.1, 1])
if RESULTS_FILENAME == "../data/lorenz/lorenz_exploration.hdf5":
snr_vals = np.logspace(-3.0, -1.0, num=10)
# generate data
generate_syn(T, N, noise_dim, snr_vals, random_seed=seed)
# load data and plot
with h5py.File(RESULTS_FILENAME, "r") as f:
snr_vals = f.attrs["snr_vals"][:]
X = f["X"][:]
X_noisy_dset = f["X_noisy"][:]
X_pca_trans_dset = f["X_pca_trans"][:]
X_dca_trans_dset = f["X_dca_trans"][:]
X_dynamics = f["X_dynamics"][:]
r2_vals = np.zeros((len(snr_vals), 2))
for snr_idx in range(len(snr_vals)):
X_pca_trans = X_pca_trans_dset[snr_idx]
X_dca_trans = X_dca_trans_dset[snr_idx]
r2_pca = 1 - np.sum((X_pca_trans - X_dynamics) ** 2) / np.sum(
(X_dynamics - np.mean(X_dynamics, axis=0)) ** 2)
r2_dca = 1 - np.sum((X_dca_trans - X_dynamics) ** 2) / np.sum(
(X_dynamics - np.mean(X_dynamics, axis=0)) ** 2)
r2_vals[snr_idx] = [r2_pca, r2_dca]
# ax: Lorenz 3D Plot
T_to_show_3d = 500
linewidth_3d = 0.5
if do_vis:
fig = plt.figure()
ax = plt.axes(projection='3d')
plot_lorenz_3d(ax, X_dynamics[:T_to_show_3d], linewidth_3d)
plt.title("True dynamics")
plt.show()
ax = plt.axes(projection='3d')
plot_lorenz_3d(ax, X_pca_trans[:T_to_show_3d], linewidth_3d)
plt.title("Embedded dynamics by PCA, SNR={}".format(snr_vals[snr_idx]))
plt.show()
ax = plt.axes(projection='3d')
plot_lorenz_3d(ax, X_dca_trans[:T_to_show_3d], linewidth_3d)
plt.title("Embedded dynamics by DCA, SNR={}".format(snr_vals[snr_idx]))
plt.show()
# plot R2 scores
r2_dca = r2_vals[:, 1]
fig = plt.figure()
plt.plot(snr_vals, r2_dca, label="DCA")
plt.legend()
plt.show()