Skip to content

Commit

Permalink
DELETE toeplitz convolution class, ADD make_scaled_wave_basis function
Browse files Browse the repository at this point in the history
  • Loading branch information
RichieHakim committed Feb 4, 2024
1 parent 4169b93 commit 07d40e1
Showing 1 changed file with 98 additions and 162 deletions.
260 changes: 98 additions & 162 deletions bnpm/featurization.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,171 +377,107 @@ def mspline(x, k, i, T):
(x - T[i]) * mspline(x, k - 1, i, T)
+ (T[i + k] - x) * mspline(x, k - 1, i + 1, T)
) / ((k-1) * (T[i + k] - T[i]))



class Toeplitz_convolution2d:
def make_scaled_wave_basis(
mother,
lens_waves,
lens_windows=None,
interp_kind='cubic',
fill_value=0,
):
"""
Convolve a 2D array with a 2D kernel using the Toeplitz matrix
multiplication method.
Allows for SPARSE 'x' inputs. 'k' should remain dense.
Ideal when 'x' is very sparse (density<0.01), 'x' is small
(shape <(1000,1000)), 'k' is small (shape <(100,100)), and
the batch size is large (e.g. 1000+).
Generally faster than scipy.signal.convolve2d when convolving mutliple
arrays with the same kernel. Maintains low memory footprint by
storing the toeplitz matrix as a sparse matrix.
See: https://stackoverflow.com/a/51865516 and https://github.com/alisaaalehi/convolution_as_multiplication
for a nice illustration.
See: https://docs.scipy.org/doc/scipy/reference/generated/scipy.linalg.convolution_matrix.html
for 1D version.
See: https://docs.scipy.org/doc/scipy/reference/generated/scipy.linalg.matmul_toeplitz.html#scipy.linalg.matmul_toeplitz
for potential ways to make this implementation faster.
Test with: tests.test_toeplitz_convolution2d()
RH 2022
Generates a set of wavelet-like basis functions by scaling a mother wavelet
to different sizes. \n
Note that this does not necessarily result in a true
orthogonal 'wavelet' basis set. This function uses interpolation to adjust
the mother wavelet's size, making it suitable for creating filter banks with
various frequency resolutions.
RH 2024
Parameters:
- mother (np.ndarray):
A 1D numpy array representing the mother wavelet used as the basis for scaling.
- lens_waves (int, list, tuple, np.ndarray):
The lengths of the output waves. Can be a single integer or a list/array of integers.
- lens_windows (int, list, tuple, np.ndarray, None):
The window lengths for each of the output waves. If None, defaults to
the values in lens_waves. Can be a single integer (applied to all waves)
or a list/array of integers corresponding to each wave length.
- interp_kind (str):
Specifies the kind of interpolation as a string ('linear', 'nearest',
'zero', 'slinear', 'quadratic', 'cubic', where 'zero', 'slinear',
'quadratic' and 'cubic' refer to a spline interpolation of zeroth,
first, second or third order).
- fill_value (float):
Value used to fill in for requested points outside of the domain of the
x_mother. Can be anything from scipy.interpolate.interp1d. If not
provided, defaults to 0.
Returns:
(tuple):
- waves (list):
List of the scaled wavelets.
- xs (list):
List of the x-values for each of the scaled wavelets.
Example:
```
mother_wave = np.cos(np.linspace(-2*np.pi, 2*np.pi, 10000, endpoint=True))
lens_waves = [50, 100, 200]
lens_windows = [100, 200, 400]
waves, xs = make_scaled_wave_basis(mother_wave, lens_waves, lens_windows)
```
"""
def __init__(
self,
x_shape,
k,
mode='same',
dtype=None,
):
"""
Initialize the convolution object.
Makes the Toeplitz matrix and stores it.
Args:
x_shape (tuple):
The shape of the 2D array to be convolved.
k (np.ndarray):
2D kernel to convolve with
mode (str):
'full', 'same' or 'valid'
see scipy.signal.convolve2d for details
dtype (np.dtype):
The data type to use for the Toeplitz matrix.
Ideally, this matches the data type of the input array.
If None, then the data type of the kernel is used.
"""
self.k = k = np.flipud(k.copy())
self.mode = mode
self.x_shape = x_shape
self.dtype = k.dtype if dtype is None else dtype

if mode == 'valid':
assert x_shape[0] >= k.shape[0] and x_shape[1] >= k.shape[1], "x must be larger than k in both dimensions for mode='valid'"

self.so = so = size_output_array = ( (k.shape[0] + x_shape[0] -1), (k.shape[1] + x_shape[1] -1)) ## 'size out' is the size of the output array

## make the toeplitz matrices
t = toeplitz_matrices = [scipy.sparse.diags(
diagonals=np.ones((k.shape[1], x_shape[1]), dtype=self.dtype) * k_i[::-1][:,None],
offsets=np.arange(-k.shape[1]+1, 1),
shape=(so[1], x_shape[1]),
dtype=self.dtype,
) for k_i in k[::-1]] ## make the toeplitz matrices for the rows of the kernel
tc = toeplitz_concatenated = scipy.sparse.vstack(t + [scipy.sparse.dia_matrix((t[0].shape), dtype=self.dtype)]*(x_shape[0]-1)) ## add empty matrices to the bottom of the block due to padding, then concatenate

## make the double block toeplitz matrix
self.dt = double_toeplitz = scipy.sparse.hstack([self._roll_sparse(
x=tc,
shift=(ii>0)*ii*(so[1]) ## shift the blocks by the size of the output array
) for ii in range(x_shape[0])]).tocsr()
assert isinstance(mother, np.ndarray), "mother must be a 1D array"
assert mother.ndim == 1, "mother must be a 1D array"

def __call__(
self,
x,
batching=True,
mode=None,
):
"""
Convolve the input array with the kernel.
Args:
x (np.ndarray or scipy.sparse.csc_matrix or scipy.sparse.csr_matrix):
Input array(s) (i.e. image(s)) to convolve with the kernel
If batching==False: Single 2D array to convolve with the kernel.
shape: (self.x_shape[0], self.x_shape[1])
type: np.ndarray or scipy.sparse.csc_matrix or scipy.sparse.csr_matrix
If batching==True: Multiple 2D arrays that have been flattened
into row vectors (with order='C').
shape: (n_arrays, self.x_shape[0]*self.x_shape[1])
type: np.ndarray or scipy.sparse.csc_matrix or scipy.sparse.csr_matrix
batching (bool):
If False, x is a single 2D array.
If True, x is a 2D array where each row is a flattened 2D array.
mode (str):
'full', 'same' or 'valid'
see scipy.signal.convolve2d for details
Overrides the mode set in __init__.
Returns:
out (np.ndarray or scipy.sparse.csr_matrix):
If batching==True: Multiple convolved 2D arrays that have been flattened
into row vectors (with order='C').
shape: (n_arrays, height*width)
type: np.ndarray or scipy.sparse.csc_matrix
If batching==False: Single convolved 2D array of shape (height, width)
"""
if mode is None:
mode = self.mode ## use the mode that was set in the init if not specified
issparse = scipy.sparse.issparse(x)

if batching:
x_v = x.T ## transpose into column vectors
arraylikes = (list, tuple, np.ndarray)
if isinstance(lens_waves, arraylikes):
lens_waves = np.array(lens_waves, dtype=int)
if lens_windows is None:
lens_windows = lens_waves
if isinstance(lens_windows, int):
lens_windows = np.array([lens_windows] * len(lens_waves), dtype=int)
if isinstance(lens_windows, arraylikes):
assert len(lens_waves) == len(lens_windows), "lens_waves and lens_windows must have the same length"
lens_windows = np.array(lens_windows, dtype=int)
else:
x_v = x.reshape(-1, 1) ## reshape 2D array into a column vector

if issparse:
x_v = x_v.tocsc()

out_v = self.dt @ x_v ## if sparse, then 'out_v' will be a csc matrix

## crop the output to the correct size
if mode == 'full':
p_t = 0
p_b = self.so[0]+1
p_l = 0
p_r = self.so[1]+1
if mode == 'same':
p_t = (self.k.shape[0]-1)//2
p_b = -(self.k.shape[0]-1)//2
p_l = (self.k.shape[1]-1)//2
p_r = -(self.k.shape[1]-1)//2

p_b = self.x_shape[0]+1 if p_b==0 else p_b
p_r = self.x_shape[1]+1 if p_r==0 else p_r
if mode == 'valid':
p_t = (self.k.shape[0]-1)
p_b = -(self.k.shape[0]-1)
p_l = (self.k.shape[1]-1)
p_r = -(self.k.shape[1]-1)

p_b = self.x_shape[0]+1 if p_b==0 else p_b
p_r = self.x_shape[1]+1 if p_r==0 else p_r

if batching:
idx_crop = np.zeros((self.so), dtype=np.bool_)
idx_crop[p_t:p_b, p_l:p_r] = True
idx_crop = idx_crop.reshape(-1)
out = out_v[idx_crop,:].T
raise ValueError("lens_windows must be an int or an array-like")
elif isinstance(lens_waves, int):
if lens_windows is None:
lens_windows = lens_waves
if isinstance(lens_windows, int):
lens_waves = np.array([lens_waves], dtype=int)
lens_windows = np.array([lens_windows], dtype=int)
if isinstance(lens_windows, arraylikes):
lens_waves = np.array([lens_waves] * len(lens_windows), dtype=int)
lens_windows = np.array(lens_windows, dtype=int)
else:
if issparse:
out = out_v.reshape((self.so)).tocsc()[p_t:p_b, p_l:p_r]
else:
out = out_v.reshape((self.so))[p_t:p_b, p_l:p_r] ## reshape back into 2D array and crop
return out

def _roll_sparse(
self,
x,
shift,
):
"""
Roll columns of a sparse matrix.
"""
out = x.copy()
out.row += shift
return out
raise ValueError("lens_windows must be an int or an array-like")
else:
raise ValueError("lens_waves must be an int or an array-like")


x_mother = np.linspace(start=0, stop=1, num=len(mother), endpoint=True) - 0.5

interpolator = scipy.interpolate.interp1d(
x=x_mother,
y=mother,
kind=interp_kind,
fill_value=fill_value,
bounds_error=False,
assume_sorted=True,
)

waves = []
xs = []
for i_wave, (l_wave, l_window) in enumerate(zip(lens_waves, lens_windows)):
x_wave = (np.linspace(start=0, stop=1, num=l_window, endpoint=True) - 0.5) * (l_window / l_wave)
wave = interpolator(x_wave)
waves.append(wave)
xs.append(x_wave)

return waves, xs

0 comments on commit 07d40e1

Please sign in to comment.