diff --git a/examples/wfs_demonstration_experimental.py b/examples/wfs_demonstration_experimental.py index e4998a2..b2a16f2 100644 --- a/examples/wfs_demonstration_experimental.py +++ b/examples/wfs_demonstration_experimental.py @@ -27,11 +27,7 @@ # at a gray value of 142 slm.lookup_table = range(142) alg = FourierDualReference( - feedback=roi_detector, - slm=slm, - slm_shape=[800, 800], - k_angles_min=-5, - k_angles_max=5, + feedback=roi_detector, slm=slm, slm_shape=[800, 800], k_radius=7 ) result = alg.execute() diff --git a/openwfs/algorithms/__init__.py b/openwfs/algorithms/__init__.py index dd7e333..00bee96 100644 --- a/openwfs/algorithms/__init__.py +++ b/openwfs/algorithms/__init__.py @@ -1,5 +1,4 @@ +from .basic_fourier import FourierDualReference +from .dual_reference import DualReference from .ssa import StepwiseSequential -from .fourier import FourierBase -from .basic_fourier import FourierDualReference, FourierDualReferenceCircle from .troubleshoot import troubleshoot -from .custom_iter_dual_reference import CustomIterativeDualReference diff --git a/openwfs/algorithms/basic_fourier.py b/openwfs/algorithms/basic_fourier.py index a8c2695..2b1f32b 100644 --- a/openwfs/algorithms/basic_fourier.py +++ b/openwfs/algorithms/basic_fourier.py @@ -1,146 +1,42 @@ from typing import Optional import numpy as np -import matplotlib.pyplot as plt +from .dual_reference import DualReference from .utilities import analyze_phase_stepping -from .fourier import FourierBase from ..core import Detector, PhaseSLM +from ..utilities import tilt -def build_square_k_space(k_min, k_max, k_step=1.0): +class FourierDualReference(DualReference): """ - Constructs the k-space by creating a set of (k_x, k_y) coordinates. - Fills the k_left and k_right matrices with the same k-space. (k_x, k_y) denote the k-space coordinates of the whole - pupil. Only half SLM (and thus pupil) is modulated at a time, hence k_y (axis=1) must make steps of 2. + Fourier double reference algorithm, based on Mastiani et al. [1]. - Args: - k_min: Minimum value for k_x and k_y, without k_step scaling applied. - k_max: Maximum value for k_x and k_y, without k_step scaling applied. - k_step: Scaling factor for the steps in k-space. - - Returns: - k_space (np.ndarray): A 2xN array of k-space coordinates. - """ - # Generate kx and ky coordinates - kx_angles = np.arange(k_min, k_max + 1, 1) - k_angles_min_even = k_min if k_min % 2 == 0 else k_min + 1 # Must be even - ky_angles = np.arange(k_angles_min_even, k_max + 1, 2) # Steps of 2 - - # Combine kx and ky coordinates into pairs - k_x = np.repeat( - np.array(kx_angles)[np.newaxis, :], len(ky_angles), axis=0 - ).flatten() - k_y = np.repeat( - np.array(ky_angles)[:, np.newaxis], len(kx_angles), axis=1 - ).flatten() - k_space = np.vstack((k_x, k_y)) * k_step - return k_space - - -class FourierDualReference(FourierBase): - """Fourier double reference algorithm, based on Mastiani et al. [1]. - - It constructs a square k-space coordinate grid for the algorithm. For custom k-spaces, you should use the - FourierBase class. The k-space coordinates denote the entire pupil, not just one half. The k-space is normalized - such that (1, 0) yields a -π to π gradient over the entire pupil. - diffraction limit. + Improvements over [1]: + * The set of plane waves is taken from a disk in k-space instead of a square. + * No overlap between the two halves is needed, instead the final stitching step is done using measurements already in the data set. + * When only a single target is optimized, the algorithm can be used in an iterative version to increase SNR during the measurument, + similar to [2]. [1]: Bahareh Mastiani, Gerwin Osnabrugge, and Ivo M. Vellekoop, "Wavefront shaping for forward scattering," Opt. Express 30, 37436-37445 (2022) - """ - - def __init__( - self, - feedback: Detector, - slm: PhaseSLM, - slm_shape=(500, 500), - phase_steps=4, - k_angles_min: int = -3, - k_angles_max: int = 3, - analyzer: Optional[callable] = analyze_phase_stepping, - ): - """ - Args: - feedback (Detector): Source of feedback - slm (PhaseSLM): The spatial light modulator - slm_shape (tuple of two ints): The shape that the SLM patterns & transmission matrices are calculated for, - does not necessarily have to be the actual pixel dimensions as the SLM. - phase_steps (int): The number of phase steps. - k_angles_min (int): The minimum k-angle. - k_angles_max (int): The maximum k-angle. - """ - super().__init__( - feedback, - slm, - slm_shape, - np.array((0, 0)), - np.array((0, 0)), - phase_steps=phase_steps, - analyzer=analyzer, - ) - self._k_angles_min = k_angles_min - self._k_angles_max = k_angles_max - self._build_k_space() - - def _build_k_space(self): - """ - Constructs the k-space by creating Cartesian products of k_x and k_y angles. - Fills the k_left and k_right matrices with the same k-space. (k_x, k_y) denote the k-space coords of the whole - SLM. Only half the SLM is modulated at a time, hence ky must make steps of 2. - - Returns: - None: The function updates the instance attributes. - """ - k_space = build_square_k_space(self.k_angles_min, self.k_angles_max) - self.k_left = k_space - self.k_right = k_space - - @property - def k_angles_min(self) -> int: - """The lower bound of the range of angles in x and y direction""" - return self._k_angles_min - - @k_angles_min.setter - def k_angles_min(self, value): - """Sets the lower bound of the range of angles in x and y direction, triggers the building of the internal - k-space properties. - """ - self._k_angles_min = value - self._build_k_space() - - @property - def k_angles_max(self) -> int: - """The higher bound of the range of angles in x and y direction""" - return self._k_angles_max - - @k_angles_max.setter - def k_angles_max(self, value): - """Sets the higher bound of the range of angles in x and y direction, triggers the building of the internal - k-space properties.""" - self._k_angles_max = value - self._build_k_space() - - -class FourierDualReferenceCircle(FourierBase): - """ - Slightly altered version of Fourier double reference algorithm, based on Mastiani et al. [1]. - In this version, the k-space coordinates are restricted to lie within a circle of chosen radius. - - [1]: Bahareh Mastiani, Gerwin Osnabrugge, and Ivo M. Vellekoop, - "Wavefront shaping for forward scattering," Opt. Express 30, 37436-37445 (2022) + [2]: X. Tao, T. Lam, B. Zhu, et al., “Three-dimensional focusing through scattering media using conjugate adaptive + optics with remote focusing (CAORF),” Opt. Express 25, 10368–10383 (2017). """ def __init__( self, + *, feedback: Detector, slm: PhaseSLM, slm_shape=(500, 500), phase_steps=4, k_radius: float = 3.2, k_step: float = 1.0, + iterations: int = 2, analyzer: Optional[callable] = analyze_phase_stepping, + optimized_reference: Optional[bool] = None ): """ Args: @@ -152,44 +48,51 @@ def __init__( k_step (float): Make steps in k-space of this value. 1 corresponds to diffraction limited tilt. phase_steps (int): The number of phase steps. """ - # TODO: Could be rewritten more compactly if we ditch the settable properties: - # first build the k_space, then call super().__init__ with k_left=k_space, k_right=k_space. - # TODO: Add custom grid spacing - + self._k_radius = k_radius + self.k_step = k_step + self._slm_shape = slm_shape + group_mask = np.zeros(slm_shape, dtype=bool) + group_mask[:, slm_shape[1] // 2 :] = True super().__init__( feedback=feedback, slm=slm, - slm_shape=slm_shape, - k_left=np.array((0, 0)), - k_right=np.array((0, 0)), + phase_patterns=None, + group_mask=group_mask, phase_steps=phase_steps, + iterations=iterations, + optimized_reference=optimized_reference, analyzer=analyzer, ) + self._update_modes() + + def _update_modes(self): + """Constructs the set of plane wave modes.""" + + # start with a grid of k-values + # then filter out the ones that are outside the circle + # in the grid, the spacing in the kx direction is twice the spacing in the ky direction + # because we subdivide the SLM into two halves along the x direction, + # which effectively doubles the number of kx values + int_radius_x = np.ceil(self.k_radius / (self.k_step * 2)) + int_radius_y = np.ceil(self.k_radius / self.k_step) + kx, ky = np.meshgrid( + np.arange(-int_radius_x, int_radius_x + 1) * (self.k_step * 2), + np.arange(-int_radius_y, int_radius_y + 1) * self.k_step, + ) - self._k_radius = k_radius - self.k_step = k_step - self._build_k_space() - - def _build_k_space(self): - """ - Constructs the k-space by creating Cartesian products of k_x and k_y angles. - Fills the k_left and k_right matrices with the same k-space. (k_x, k_y) denote the k-space coordinates of the - whole SLM. Only half the SLM is modulated at a time, hence k_y must make steps of 2. - - Returns: - None: The function updates the instance attributes k_left and k_right. - """ - k_radius = self.k_radius - k_step = self.k_step - k_max = int(np.floor(k_radius)) - k_space_square = build_square_k_space(-k_max, k_max, k_step=k_step) + # only keep the points within the circle + mask = kx**2 + ky**2 <= self.k_radius**2 + k = np.stack((ky[mask], kx[mask])).T - # Filter out k-space coordinates that are outside the circle of radius k_radius - k_mask = np.linalg.norm(k_space_square, axis=0) <= k_radius - k_space = k_space_square[:, k_mask] + # construct the modes for these kx ky values + modes = np.zeros((*self._slm_shape, len(k)), dtype=np.float32) + for i, k_i in enumerate(k): + # tilt generates a pattern from -2.0 to 2.0 (The convention for Zernike modes normalized to an RMS of 1). + # The natural step to take is the Abbe diffraction limit of the modulated part, which corresponds to a gradient + # from -π to π over the modulated part. + modes[..., i] = tilt(self._slm_shape, g=k_i * 0.5 * np.pi) - self.k_left = k_space - self.k_right = k_space + self.phase_patterns = (modes, modes) @property def k_radius(self) -> float: @@ -200,16 +103,4 @@ def k_radius(self) -> float: def k_radius(self, value): """Sets the maximum radius of the k-space circle, triggers the building of the internal k-space properties.""" self._k_radius = value - self._build_k_space() - - def plot_k_space(self): - """Plots the k-space coordinates. Useful for debugging.""" - phi = np.linspace(0, 2 * np.pi, 200) - x = self.k_radius * np.cos(phi) - y = self.k_radius * np.sin(phi) - plt.plot(x, y, "k") - plt.plot(self.k_left[0, :], self.k_left[1, :], "ob", label="k_left") - plt.plot(self.k_right[0, :], self.k_right[1, :], ".r", label="k_right") - plt.xlabel("k_x") - plt.ylabel("k_y") - plt.gca().set_aspect("equal") + self._update_modes() diff --git a/openwfs/algorithms/custom_iter_dual_reference.py b/openwfs/algorithms/custom_iter_dual_reference.py deleted file mode 100644 index ec26fdf..0000000 --- a/openwfs/algorithms/custom_iter_dual_reference.py +++ /dev/null @@ -1,283 +0,0 @@ -from typing import Optional - -import matplotlib.pyplot as plt -import numpy as np -from numpy import ndarray as nd -from tqdm import tqdm - -from .utilities import analyze_phase_stepping, WFSResult -from ..core import Detector, PhaseSLM - - -def weighted_average(a, b, wa, wb): - """ - Compute the weighted average of two values. - - Args: - a: The first value. - b: The second value. - wa: The weight of the first value. - wb: The weight of the second value. - """ - return (a * wa + b * wb) / (wa + wb) - - -class CustomIterativeDualReference: - """ - A generic iterative dual reference WFS algorithm, which can use a custom set of basis functions. - - Similar to the Fourier Dual Reference algorithm [1], the SLM is divided in two large segments (e.g. both halves, - split in the middle). The blind focusing dual reference algorithm switches back and forth multiple times between two - large segments of the SLM (A and B). The segment shape is defined with a binary mask. Each segment has a - corresponding set of phase patterns to measure. With these measurements, a correction pattern for one segment can - be computed. To achieve convergence or 'blind focusing' [2], in each iteration we use the previously constructed - correction pattern as reference. This makes this algorithm suitable for non-linear feedback, such as multi-photon - excitation fluorescence, and unsuitable for multi-target optimization. - - This algorithm assumes a phase-only SLM. Hence, the input modes are defined by passing the corresponding phase - patterns as input argument. - - [1]: Bahareh Mastiani, Gerwin Osnabrugge, and Ivo M. Vellekoop, - "Wavefront shaping for forward scattering", Optics Express 30, 37436-37445 (2022) - [2]: Gerwin Osnabrugge, Lyubov V. Amitonova, and Ivo M. Vellekoop. "Blind focusing through strongly scattering media - using wavefront shaping with nonlinear feedback", Optics Express, 27(8):11673–11688, 2019. - https://opg.optica.org/oe/ abstract.cfm?uri=oe-27-8-1167 - """ - - def __init__( - self, - feedback: Detector, - slm: PhaseSLM, - slm_shape: tuple[int, int], - phases: tuple[nd, nd], - set1_mask: nd, - phase_steps: int = 4, - iterations: int = 4, - analyzer: Optional[callable] = analyze_phase_stepping, - do_try_full_patterns=False, - do_progress_bar=True, - progress_bar_kwargs={}, - ): - """ - Args: - feedback (Detector): The feedback source, usually a detector that provides measurement data. - slm (PhaseSLM): slm object. - The slm may have the `extent` property set to indicate the extent of the back pupil of the microscope - objective in slm coordinates. By default, a value of 2.0, 2.0 is used (indicating that the pupil - corresponds to a circle of radius 1.0 on the SLM). However, to prevent artefacts at the edges of the - SLM,it may be overfilled, such that the `phases` image is mapped to an extent of e. g. (2.2, 2.2), - i. e. 10% larger than the back pupil. - slm_shape (tuple[int, int]): The shape of the SLM patterns and transmission matrices. - phases (tuple): A tuple of two 3D arrays. We will refer to these as set A and B (phases[0] and - phases[1] respectively). The 3D arrays contain the set of phases to measure set A and B. With both of - these 3D arrays, axis 0 and 1 are used as spatial axes. Axis 2 is used as phase pattern index. - E.g. phases[1][:, :, 4] is the 4th phase pattern of set B. - set1_mask: A 2D array of that defines the elements used by set A (= modes[0]) with 0s and elements used by - set B (= modes[1]) with 1s. - phase_steps (int): The number of phase steps for each mode (default is 4). Depending on the type of - non-linear feedback and the SNR, more might be required. - iterations (int): Number of times to measure a mode set, e.g. when iterations = 5, the measurements are - A, B, A, B, A. - analyzer (callable): The function used to analyze the phase stepping data. Must return a WFSResult object. - do_try_full_patterns (bool): Whether to measure feedback from the full patterns each iteration. This can - be useful to determine how many iterations are needed to converge to an optimal pattern. - do_progress_bar (bool): Whether to print a tqdm progress bar during algorithm execution. - progress_bar_kwargs (dict): Dictionary containing keyword arguments for the tqdm progress bar. - """ - self.slm = slm - self.feedback = feedback - self.phase_steps = phase_steps - self.iterations = iterations - self.slm_shape = slm_shape - self.analyzer = analyzer - self.do_try_full_patterns = do_try_full_patterns - self.do_progress_bar = do_progress_bar - self.progress_bar_kwargs = progress_bar_kwargs - - assert (phases[0].shape[0] == phases[1].shape[0]) and ( - phases[0].shape[1] == phases[1].shape[1] - ) - self.phases = (phases[0].astype(np.float32), phases[1].astype(np.float32)) - - # Pre-compute set0 mask - mask1 = set1_mask.astype(dtype=np.float32) - mask0 = 1.0 - mask1 - self.set_masks = (mask0, mask1) - - # Pre-compute the conjugate modes for reconstruction - modes0 = np.exp(-1j * self.phases[0]) * np.expand_dims(mask0, axis=2) - modes1 = np.exp(-1j * self.phases[1]) * np.expand_dims(mask1, axis=2) - self.modes = (modes0, modes1) - - def execute(self) -> WFSResult: - """ - Executes the blind focusing dual reference algorithm and compute the SLM transmission matrix. - - Returns: - WFSResult: An object containing the computed SLM transmission matrix and related data. The amplitude profile - of each mode is assumed to be 1. If a different amplitude profile is desired, this can be obtained by - multiplying that amplitude profile with this transmission matrix. - """ - - # Initial transmission matrix for reference is constant phase - t_full = np.zeros(shape=self.modes[0].shape[0:2]) - - # Initialize storage lists - t_set = t_full - t_set_all = [None] * self.iterations - results_all = [None] * self.iterations # List to store all results - results_latest = [ - None, - None, - ] # The two latest results. Used for computing fidelity factors. - full_pattern_feedback = np.zeros( - self.iterations - ) # List to store feedback from full patterns - - # Prepare progress bar - if self.do_progress_bar: - num_measurements = ( - np.ceil(self.iterations / 2) * self.modes[0].shape[2] - + np.floor(self.iterations / 2) * self.modes[1].shape[2] - ) - progress_bar = tqdm(total=num_measurements, **self.progress_bar_kwargs) - else: - progress_bar = None - - # Switch the phase sets back and forth multiple times - for it in range(self.iterations): - s = it % 2 # Set id: 0 or 1. Used to pick set A or B for phase stepping - mod_mask = self.set_masks[s] - t_prev = t_set - ref_phases = -np.angle( - t_prev - ) # Shaped reference phase pattern from transmission matrix - - # Measure and compute - result = self._single_side_experiment( - mod_phases=self.phases[s], - ref_phases=ref_phases, - mod_mask=mod_mask, - progress_bar=progress_bar, - ) - t_set = self.compute_t_set( - result, self.modes[s] - ) # Compute transmission matrix from measurements - - # Store results - t_full = t_prev + t_set - t_set_all[it] = t_set # Store transmission matrix - results_all[it] = result # Store result - results_latest[s] = result # Store latest result for this set - - # Try full pattern - if self.do_try_full_patterns: - self.slm.set_phases(-np.angle(t_full)) - full_pattern_feedback[it] = self.feedback.read() - - # Compute average fidelity factors - fidelity_noise = weighted_average( - results_latest[0].fidelity_noise, - results_latest[1].fidelity_noise, - results_latest[0].n, - results_latest[1].n, - ) - fidelity_amplitude = weighted_average( - results_latest[0].fidelity_amplitude, - results_latest[1].fidelity_amplitude, - results_latest[0].n, - results_latest[1].n, - ) - fidelity_calibration = weighted_average( - results_latest[0].fidelity_calibration, - results_latest[1].fidelity_calibration, - results_latest[0].n, - results_latest[1].n, - ) - - result = WFSResult( - t=t_full, - t_f=None, - n=self.modes[0].shape[2] + self.modes[1].shape[2], - axis=2, - fidelity_noise=fidelity_noise, - fidelity_amplitude=fidelity_amplitude, - fidelity_calibration=fidelity_calibration, - ) - - # TODO: This is a dirty way to add attributes. Find better way. - result.t_set_all = t_set_all - result.results_all = results_all - result.full_pattern_feedback = full_pattern_feedback - return result - - def _single_side_experiment( - self, - mod_phases: nd, - ref_phases: nd, - mod_mask: nd, - progress_bar: Optional[tqdm] = None, - ) -> WFSResult: - """ - Conducts experiments on one part of the SLM. - - Args: - mod_phases: 3D array containing the phase patterns of each mode. Axis 0 and 1 are used as spatial axis. - Axis 2 is used for the 'phase pattern index' or 'mode index'. - ref_phases: 2D array containing the reference phase pattern. - mod_mask: 2D array containing a mask of 1s and 0s, where 1s indicate the modulated part of the SLM. - progress_bar: An optional tqdm progress bar. - - Returns: - WFSResult: An object containing the computed SLM transmission matrix and related data. - - Note: In order to speed up calculations, I used np.float32 phases with a mask, instead of adding complex128 - fields and taking the np.angle. I did a quick test for this (on AMD Ryzen 7 3700X) for a 1000x1000 array. This - brings down the phase pattern computation time from ~26ms to ~2ms. - Code comparison: - With complex128: phase_pattern = np.angle(field_B + field_A * np.exp(step)) ~26ms per phase pattern - With float32: phase_pattern = phases_B + (phases_A + step) * mask ~2ms per phase pattern - """ - num_of_modes = mod_phases.shape[2] - measurements = np.zeros( - (num_of_modes, self.phase_steps, *self.feedback.data_shape) - ) - ref_phases_masked = ( - 1.0 - mod_mask - ) * ref_phases # Pre-compute masked reference phase pattern - - for m in range(num_of_modes): - for p in range(self.phase_steps): - phase_step = p * 2 * np.pi / self.phase_steps - phase_pattern = ref_phases_masked + mod_mask * ( - mod_phases[:, :, m] + phase_step - ) - self.slm.set_phases(phase_pattern) - self.feedback.trigger(out=measurements[m, p, ...]) - - if progress_bar is not None: - progress_bar.update() - - self.feedback.wait() - return self.analyzer(measurements, axis=1) - - @staticmethod - def compute_t_set(wfs_result: WFSResult, mode_set: nd) -> nd: - """ - Compute the transmission matrix in SLM space from transmission matrix in input mode space. - - Note 1: This function computes the transmission matrix for one mode set, and thus returns one part of the full - transmission matrix. The elements that are not part of the mode set will be 0. The full transmission matrix can - be obtained by simply adding the parts, i.e. t_full = t_set0 + t_set1. - - Note 2: As this is a blind focusing WFS algorithm, there may be only one target or 'output mode'. - - Args: - wfs_result (WFSResult): The result of the WFS algorithm. This contains the transmission matrix in the space - of input modes. - mode_set: 3D array with set of modes. - """ - t = wfs_result.t.squeeze().reshape((1, 1, mode_set.shape[2])) - norm_factor = np.prod(mode_set.shape[0:2]) - t_set = (t * mode_set).sum(axis=2) / norm_factor - return t_set diff --git a/openwfs/algorithms/dual_reference.py b/openwfs/algorithms/dual_reference.py new file mode 100644 index 0000000..1f6fd92 --- /dev/null +++ b/openwfs/algorithms/dual_reference.py @@ -0,0 +1,299 @@ +from typing import Optional + +import numpy as np +from numpy import ndarray as nd + +from .utilities import analyze_phase_stepping, WFSResult +from ..core import Detector, PhaseSLM + + +class DualReference: + """ + A generic iterative dual reference WFS algorithm, which can use a custom set of basis functions. + + This algorithm is adapted from [1], with the addition of the ability to use custom basis functions and specify the number of iterations. + + In this algorithm, the SLM pixels are divided into two groups: A and B, as indicated by the boolean group_mask argument. + The algorithm first keeps the pixels in group B fixed, and displays a sequence on patterns on the pixels of group A. + It uses these measurements to construct an optimized wavefront that is displayed on the pixels of group A. + This process is then repeated for the pixels of group B, now using the *optimized* wavefront on group A as reference. + Optionally, the process can be repeated for a number of iterations, which each iteration using the current correction + pattern as a reference. This makes this algorithm suitable for non-linear feedback, such as multi-photon + excitation fluorescence [2]. + + This algorithm assumes a phase-only SLM. Hence, the input modes are defined by passing the corresponding phase + patterns (in radians) as input argument. + + [1]: X. Tao, T. Lam, B. Zhu, et al., “Three-dimensional focusing through scattering media using conjugate adaptive + optics with remote focusing (CAORF),” Opt. Express 25, 10368–10383 (2017). + + [2]: Gerwin Osnabrugge, Lyubov V. Amitonova, and Ivo M. Vellekoop. "Blind focusing through strongly scattering media + using wavefront shaping with nonlinear feedback", Optics Express, 27(8):11673–11688, 2019. + https://opg.optica.org/oe/ abstract.cfm?uri=oe-27-8-1167 + """ + + def __init__( + self, + *, + feedback: Detector, + slm: PhaseSLM, + phase_patterns: Optional[tuple[nd, nd]], + group_mask: nd, + phase_steps: int = 4, + iterations: int = 2, + analyzer: Optional[callable] = analyze_phase_stepping, + optimized_reference: Optional[bool] = None + ): + """ + Args: + feedback: The feedback source, usually a detector that provides measurement data. + slm: Spatial light modulator object. + phase_patterns: A tuple of two 3D arrays, containing the phase patterns for group A and group B, respectively. + The first two dimensions are the spatial dimensions, and should match the size of group_mask. + The 3rd dimension in the array is index of the phase pattern. The number of phase patterns in A and B may be different. + When None, the phase_patterns attribute must be set before executing the algorithm. + group_mask: A 2D bool array of that defines the pixels used by group A with False and elements used by + group B with True. + phase_steps: The number of phase steps for each mode (default is 4). Depending on the type of + non-linear feedback and the SNR, more might be required. + iterations: Number of times to optimize a mode set, e.g. when iterations = 5, the measurements are + A, B, A, B, A. + optimized_reference: When `True`, during each iteration the other half of the SLM displays the optimized pattern so far (as in [1]). + When `False`, the algorithm optimizes A with a flat wavefront on B, and then optimizes B with a flat wavefront on A. + This mode also allows for multi-target optimization, where the algorithm optimizes multiple targets in parallel. + The two halves are then combined (stitched) to form the full transmission matrix. + In this mode, it is essential that both A and B include a flat wavefront as mode 0. The measurement for + mode A0 and for B0 both give contain relative phase between group A and B, so there is a slight redundancy. + These two measurements are combined to find the final phase for stitching. + When set to `None` (default), the algorithm uses True if there is a single target, and False if there are multiple targets. + + analyzer: The function used to analyze the phase stepping data. + Must return a WFSResult object. Defaults to `analyze_phase_stepping` + + [1]: X. Tao, T. Lam, B. Zhu, et al., “Three-dimensional focusing through scattering media using conjugate adaptive + optics with remote focusing (CAORF),” Opt. Express 25, 10368–10383 (2017). + + """ + if optimized_reference is None: # 'auto' mode + optimized_reference = np.prod(feedback.data_shape) == 1 + elif optimized_reference and np.prod(feedback.data_shape) != 1: + raise ValueError( + "When using an optimized reference, the feedback detector should return a single scalar value." + ) + + if iterations < 2: + raise ValueError("The number of iterations must be at least 2.") + if not optimized_reference and iterations != 2: + raise ValueError( + "When not using an optimized reference, the number of iterations must be 2." + ) + + self.slm = slm + self.feedback = feedback + self.phase_steps = phase_steps + self.optimized_reference = optimized_reference + self.iterations = iterations + self._analyzer = analyzer + self._phase_patterns = None + self._shape = group_mask.shape + mask = group_mask.astype(bool) + self.masks = ( + ~mask, + mask, + ) # mask[0] is True for group A, mask[1] is True for group B + self.phase_patterns = phase_patterns + + @property + def phase_patterns(self) -> tuple[nd, nd]: + return self._phase_patterns + + @phase_patterns.setter + def phase_patterns(self, value): + """Sets the phase patterns for group A and group B. This also updates the conjugate modes.""" + if value is None: + self._phase_patterns = None + return + + if not self.optimized_reference: + # find the modes in A and B that correspond to flat wavefronts with phase 0 + try: + a0_index = next( + i + for i in range(value[0].shape[2]) + if np.allclose(value[0][:, :, i], 0) + ) + b0_index = next( + i + for i in range(value[1].shape[2]) + if np.allclose(value[1][:, :, i], 0) + ) + self.zero_indices = (a0_index, b0_index) + except StopIteration: + raise ( + "For multi-target optimization, the both sets must contain a flat wavefront with phase 0." + ) + + if (value[0].shape[0:2] != self._shape) or (value[1].shape[0:2] != self._shape): + raise ValueError( + "The phase patterns and group mask must all have the same shape." + ) + + self._phase_patterns = ( + value[0].astype(np.float32), + value[1].astype(np.float32), + ) + + def execute( + self, capture_intermediate_results: bool = False, progress_bar=None + ) -> WFSResult: + """ + Executes the blind focusing dual reference algorithm and compute the SLM transmission matrix. + capture_intermediate_results: When True, measures the feedback from the optimized wavefront after each iteration. + This can be useful to determine how many iterations are needed to converge to an optimal pattern. + This data is stored as the 'intermediate_results' field in the results + progress_bar: Optional progress bar object. Following the convention for tqdm progress bars, + this object should have a `total` attribute and an `update()` function. + + Returns: + WFSResult: An object containing the computed SLM transmission matrix and related data. The amplitude profile + of each mode is assumed to be 1. If a different amplitude profile is desired, this can be obtained by + multiplying that amplitude profile with this transmission matrix. + """ + + # Current estimate of the transmission matrix (start with all 0) + cobasis = [ + np.exp(-1j * self.phase_patterns[side]) + * np.expand_dims(self.masks[side], axis=2) + for side in range(2) + ] + + ref_phases = np.zeros(self._shape) + + # Initialize storage lists + results_all = [None] * self.iterations # List to store all results + intermediate_results = np.zeros( + self.iterations + ) # List to store feedback from full patterns + + # Prepare progress bar + if progress_bar: + num_measurements = ( + np.ceil(self.iterations / 2) * self.phase_patterns[0].shape[2] + + np.floor(self.iterations / 2) * self.phase_patterns[1].shape[2] + ) + progress_bar.total = num_measurements + + # Switch the phase sets back and forth multiple times + for it in range(self.iterations): + side = it % 2 # pick set A or B for phase stepping + side_mask = self.masks[side] + # Perform WFS experiment on one side, keeping the other side sized at the ref_phases + results_all[it] = self._single_side_experiment( + mod_phases=self.phase_patterns[side], + ref_phases=ref_phases, + mod_mask=side_mask, + progress_bar=progress_bar, + ) + + # Compute transmission matrix for the current side and update + # estimated transmission matrix + + if self.optimized_reference: + # use the best estimate so far to construct an optimized reference + t_this_side = self.compute_t_set( + results_all[it].t, cobasis[side] + ).squeeze() + ref_phases[self.masks[side]] = -np.angle(t_this_side[self.masks[side]]) + + # Try full pattern + if capture_intermediate_results: + self.slm.set_phases(ref_phases) + intermediate_results[it] = self.feedback.read() + + if self.optimized_reference: + factor = 1.0 + else: + # when not using optimized reference, we need to stitch the + # two halves of the wavefront together. For that, we need the + # relative phase between the two sides, which we extract from + # the measurements of the flat wavefronts. + relative = results_all[0].t[self.zero_indices[0], ...] + np.conjugate( + results_all[1].t[self.zero_indices[1], ...] + ) + factor = (relative / np.abs(relative)).reshape( + (1, *self.feedback.data_shape) + ) + + t_full = self.compute_t_set(results_all[0].t, cobasis[0]) + self.compute_t_set( + factor * results_all[1].t, cobasis[1] + ) + + # Compute average fidelity factors + # subtract 1 from n, because both sets (usually) contain a flat wavefront, + # so there is one redundant measurement + result = WFSResult.combine(results_all[-2:]) + result.n = result.n - 1 + result.t = t_full + + # TODO: document the results_all attribute + result.results_all = results_all + result.intermediate_results = intermediate_results + return result + + def _single_side_experiment( + self, mod_phases: nd, ref_phases: nd, mod_mask: nd, progress_bar=None + ) -> WFSResult: + """ + Conducts experiments on one part of the SLM. + + Args: + mod_phases: 3D array containing the phase patterns of each mode. Axis 0 and 1 are used as spatial axis. + Axis 2 is used for the 'phase pattern index' or 'mode index'. + ref_phases: 2D array containing the reference phase pattern. + mod_mask: 2D array containing a boolean mask, where True indicates the modulated part of the SLM. + progress_bar: Optional progress bar object. Following the convention for tqdm progress bars, + this object should have a `total` attribute and an `update()` function. + + Returns: + WFSResult: An object containing the computed SLM transmission matrix and related data. + """ + num_modes = mod_phases.shape[2] + measurements = np.zeros( + (num_modes, self.phase_steps, *self.feedback.data_shape) + ) + + for m in range(num_modes): + phases = ref_phases.copy() + modulated = mod_phases[:, :, m] + for p in range(self.phase_steps): + phi = p * 2 * np.pi / self.phase_steps + # set the modulated pixel values to the values corresponding to mode m and phase offset phi + phases[mod_mask] = modulated[mod_mask] + phi + self.slm.set_phases(phases) + self.feedback.trigger(out=measurements[m, p, ...]) + + if progress_bar is not None: + progress_bar.update() + + self.feedback.wait() + return self._analyzer(measurements, axis=1) + + @staticmethod + def compute_t_set(t, cobasis: nd) -> nd: + """ + Compute the transmission matrix in SLM space from transmission matrix in input mode space. + + Note 1: This function computes the transmission matrix for one mode set, and thus returns one part of the full + transmission matrix. The elements that are not part of the mode set will be 0. The full transmission matrix can + be obtained by simply adding the parts, i.e. t_full = t_set0 + t_set1. + + Note 2: As this is a blind focusing WFS algorithm, there may be only one target or 'output mode'. + + Args: + t: transmission matrix in mode-index space. The first axis corresponds to the input modes. + cobasis: 3D array with set of modes (conjugated) + Returns: + nd: The transmission matrix in SLM space. The last two axes correspond to SLM coordinates + """ + norm_factor = np.prod(cobasis.shape[0:2]) + return np.tensordot(cobasis, t, 1) / norm_factor diff --git a/openwfs/algorithms/fourier.py b/openwfs/algorithms/fourier.py deleted file mode 100644 index 65b0228..0000000 --- a/openwfs/algorithms/fourier.py +++ /dev/null @@ -1,239 +0,0 @@ -from typing import Optional - -import numpy as np - -from .utilities import analyze_phase_stepping, WFSResult -from ..core import Detector, PhaseSLM -from ..utilities.patterns import tilt - - -class FourierBase: - """Base class definition for the Fourier algorithms as described in [1]. - - This algorithm optimises the wavefront in a Fourier-basis. The modes that are tested are provided into a 'k-space' - of which each 'k-vector' represents a certain angled wavefront that will be tested. (more detailed explanation is - found in _get_phase_pattern). - - As described in [1], these modes are measured by interfering a certain mode on one half of the SLM with a - 'reference beam'. This is done by not modulating the other half of the SLM. In order to find a full corrective - wavefront therefore, the experiment has to be repeated twice for each side of the SLM. Finally, the two wavefronts - are combined. - - [1]: Bahareh Mastiani, Gerwin Osnabrugge, and Ivo M. Vellekoop, - "Wavefront shaping for forward scattering," Opt. Express 30, 37436-37445 (2022) - """ - - def __init__( - self, - feedback: Detector, - slm: PhaseSLM, - slm_shape: tuple[int, int], - k_left: np.ndarray, - k_right: np.ndarray, - phase_steps: int = 4, - analyzer: Optional[callable] = analyze_phase_stepping, - ): - """ - - Args: - feedback (Detector): The feedback source, usually a detector that provides measurement data. - slm (PhaseSLM): slm object. - The slm may have the `extent` property set to indicate the extent of the back pupil of the microscope - objective in slm coordinates. By default, a value of 2.0, 2.0 is used (indicating that the pupil - corresponds to a circle of radius 1.0 on the SLM). However, to prevent artefacts at the edges of the - SLM,it may be overfilled, such that the `phases` image is mapped to an extent of e. g. (2.2, 2.2), i. e. - 10% larger than the back pupil. - slm_shape (tuple[int, int]): The shape of the SLM patterns and transmission matrices. - k_left (numpy.ndarray): 2-row matrix containing the y, and x components of the spatial frequencies - used as a basis for the left-hand side of the SLM. - The frequencies are defined such that a frequency of (1,0) or (0,1) corresponds to - a phase gradient of -π to π over the back pupil of the microscope objective, which results in - a displacement in the focal plane of exactly a distance corresponding to the Abbe diffraction limit. - k_right (numpy.ndarray): 2-row matrix containing the y and x components of the spatial frequencies - for the right-hand side of the SLM. - The number of frequencies need not be equal for k_left and k_right. - phase_steps (int): The number of phase steps for each mode (default is 4). - analyzer (callable): The function used to analyze the phase stepping data. Must return a WFSResult object. - """ - self._execute_button = False - self.slm = slm - self.feedback = feedback - self.phase_steps = phase_steps - self.k_left = k_left - self.k_right = k_right - self.slm_shape = slm_shape - self.analyzer = analyzer - - def execute(self) -> WFSResult: - """ - Executes the FourierDualRef algorithm, computing the SLM transmission matrix. - - Returns: - WFSResult: An object containing the computed SLM transmission matrix and related data. - """ - # left side experiment - data_left = self._single_side_experiment(self.k_left, 0) - - # right side experiment - data_right = self._single_side_experiment(self.k_right, 1) - - # Compute transmission matrix (=field at SLM), as well as noise statistics - results = self.compute_t(data_left, data_right, self.k_left, self.k_right) - results.left = data_left - results.right = data_right - results.k_left = self.k_left - results.k_right = self.k_right - return results - - def _single_side_experiment(self, k_set: np.ndarray, side: int) -> WFSResult: - """ - Conducts experiments on one side of the SLM, generating measurements for each spatial frequency and phase step. - - Args: - k_set (np.ndarray): An array of spatial frequencies to use in the experiment. - side (int): Indicates which side of the SLM to use (0 for the left hand side, 1 for right hand side). - - Returns: - WFSResult: An object containing the computed SLM transmission matrix and related data. - """ - measurements = np.zeros( - (k_set.shape[1], self.phase_steps, *self.feedback.data_shape) - ) - - for i in range(k_set.shape[1]): - for p in range(self.phase_steps): - phase_offset = p * 2 * np.pi / self.phase_steps - phase_pattern = self._get_phase_pattern(k_set[:, i], phase_offset, side) - self.slm.set_phases(phase_pattern) - self.feedback.trigger(out=measurements[i, p, ...]) - - self.feedback.wait() - return self.analyzer(measurements, axis=1) - - def _get_phase_pattern( - self, k: np.ndarray, phase_offset: float, side: int - ) -> np.ndarray: - """ - Generates a phase pattern for the SLM based on the given spatial frequency, phase offset, and side. - - Args: - k (np.ndarray): A 2-element array representing the spatial frequency over the whole pupil plane. - phase_offset (float): The phase offset to apply to the pattern. - side (int): Indicates the side of the SLM for the pattern (0 for left, 1 for right). - - Returns: - np.ndarray: The generated phase pattern. - """ - # tilt generates a pattern from -2.0 to 2.0 (The convention for Zernike modes normalized to an RMS of 1). - # The natural step to take is the Abbe diffraction limit of the modulated part, which corresponds to a gradient - # from -π to π over the modulated part. - num_columns = self.slm_shape[1] // 2 - tilted_front = tilt( - [self.slm_shape[0], num_columns], - k * (0.5 * np.pi), - extent=(2.0, 1.0), - phase_offset=phase_offset, - ) - - # Handle side-dependent pattern - - empty_part = np.zeros((self.slm_shape[0], self.slm_shape[1] - num_columns)) - - # Concatenate based on the side - if side == 0: - # Place the pattern on the left - result = np.concatenate((tilted_front, empty_part), axis=1) - else: - # Place the pattern on the right - result = np.concatenate((empty_part, tilted_front), axis=1) - - return result - - def compute_t( - self, left: WFSResult, right: WFSResult, k_left, k_right - ) -> WFSResult: - """ - Computes the SLM transmission matrix by combining the Fourier transmission matrices from both sides of the SLM. - - Args: - left (WFSResult): The wavefront shaping result for the left side. - right (WFSResult): The wavefront shaping result for the right side. - k_left (np.ndarray): The spatial frequency matrix for the left side. - k_right (np.ndarray): The spatial frequency matrix for the right side. - - Returns: - WFSResult: An object containing the combined transmission matrix and related statistics. - """ - - # TODO: determine noise - # Initialize transmission matrices - t1 = np.zeros((*self.slm_shape, *self.feedback.data_shape), dtype="complex128") - t2 = np.zeros((*self.slm_shape, *self.feedback.data_shape), dtype="complex128") - - # Calculate phase difference between the two halves - # We have two phase stepping measurements where both halves are flat (k=0) - # Locate these measurements, and use the phase difference between them to connect the two halves - # of the corrected wavefront. - - # Find the index of the (0,0) mode in k_left and k_right - index_0_left = np.argmin(k_left[0] ** 2 + k_left[1] ** 2) - index_0_right = np.argmin(k_right[0] ** 2 + k_left[1] ** 2) - if not np.all(k_left[:, index_0_left] == 0.0) or not np.all( - k_right[:, index_0_right] == 0.0 - ): - raise Exception( - "k=(0,0) component missing from the measurement set, cannot determine relative phase." - ) - - # average the measurements for better accuracy - # TODO: absolute values are not the same in simulation, 'A' scaling is off? - relative = 0.5 * ( - left.t[index_0_left, ...] + np.conjugate(right.t[index_0_right, ...]) - ) - - # Apply phase correction to the right side - phase_correction = relative / np.abs(relative) - - # Construct the transmission matrices - normalisation = 1.0 / (0.5 * self.slm_shape[0] * self.slm_shape[1]) - for n, t in enumerate(left.t): - phi = self._get_phase_pattern(k_left[:, n], 0, 0) - t1 += np.tensordot(np.exp(-1j * phi), t * normalisation, 0) - - for n, t in enumerate(right.t): - phi = self._get_phase_pattern(k_right[:, n], 0, 1) - t2 += np.tensordot( - np.exp(-1j * phi), t * (normalisation * phase_correction), 0 - ) - - # Combine the left and right sides - t_full = np.concatenate( - [ - t1[:, : self.slm_shape[0] // 2, ...], - t2[:, self.slm_shape[0] // 2 :, ...], - ], - axis=1, - ) - t_f_full = np.concatenate( - [left.t_f, right.t_f], axis=1 - ) # also store raw data (not normalized or corrected yet!) - - # return combined result, along with a course estimate of the snr and expected enhancement - # TODO: not accurate yet - # for the estimated_improvement, first convert to field improvement, then back to intensity improvement - def weighted_average(x_left, x_right): - return (left.n * x_left + right.n * x_right) / (left.n + right.n) - - return WFSResult( - t=t_full, - t_f=t_f_full, - n=left.n + right.n, - axis=2, - fidelity_noise=weighted_average(left.fidelity_noise, right.fidelity_noise), - fidelity_amplitude=weighted_average( - left.fidelity_amplitude, right.fidelity_amplitude - ), - fidelity_calibration=weighted_average( - left.fidelity_calibration, right.fidelity_calibration - ), - ) diff --git a/openwfs/algorithms/ssa.py b/openwfs/algorithms/ssa.py index 7403c31..4e3eeee 100644 --- a/openwfs/algorithms/ssa.py +++ b/openwfs/algorithms/ssa.py @@ -1,6 +1,7 @@ import numpy as np -from ..core import Detector, PhaseSLM + from .utilities import analyze_phase_stepping, WFSResult +from ..core import Detector, PhaseSLM class StepwiseSequential: diff --git a/openwfs/algorithms/utilities.py b/openwfs/algorithms/utilities.py index ec2d040..f79b7f8 100644 --- a/openwfs/algorithms/utilities.py +++ b/openwfs/algorithms/utilities.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Optional +from typing import Optional, Sequence import numpy as np from numpy.typing import ArrayLike @@ -130,6 +130,37 @@ def select_target(self, b) -> "WFSResult": n=self.n, ) + @staticmethod + def combine(results: Sequence["WFSResult"]): + """Merges the results for several sub-experiments. + + Currently, this just computes the average of the fidelities, weighted + by the number of segments used in each sub-experiment. + + Note: the matrix t is also averaged, but this is not always meaningful. + The caller can replace the `.t` attribute of the result with a more meaningful value. + """ + n = sum(r.n for r in results) + axis = results[0].axis + if any(r.axis != axis for r in results): + raise ValueError("All results must have the same axis") + + def weighted_average(attribute): + data = getattr(results[0], attribute) * results[0].n / n + for r in results[1:]: + data += getattr(r, attribute) * r.n / n + return data + + return WFSResult( + t=weighted_average("t"), + t_f=weighted_average("t_f"), + n=n, + axis=axis, + fidelity_noise=weighted_average("fidelity_noise"), + fidelity_amplitude=weighted_average("fidelity_amplitude"), + fidelity_calibration=weighted_average("fidelity_calibration"), + ) + def analyze_phase_stepping( measurements: np.ndarray, axis: int, A: Optional[float] = None @@ -212,15 +243,20 @@ def analyze_phase_stepping( # (which occurs twice, ideally in the +1 and -1 components of the Fourier transform), # but this factor of two is already included in the 'signal_energy' calculation. # an offset, and the rest is noise. - offset_energy = np.sum(np.take(t_f, 0, axis=axis) ** 2) - total_energy = np.sum(np.abs(t_f) ** 2) - + # average over all targets to get the most accurate result (assuming all targets are similar) + axes = tuple([i for i in range(t_f.ndim) if i != axis]) + energies = np.sum(np.abs(t_f) ** 2, axis=axes) + offset_energy = energies[0] + total_energy = np.sum(energies) + signal_energy = energies[1] + energies[-1] if phase_steps > 3: + # estimate the noise energy as the energy that is not explained + # by the signal or the offset. noise_energy = (total_energy - signal_energy - offset_energy) / ( phase_steps - 3 ) noise_factor = np.abs( - np.maximum(signal_energy - noise_energy, 0.0) / signal_energy + np.maximum(signal_energy - 2 * noise_energy, 0.0) / signal_energy ) else: noise_factor = 1.0 # cannot estimate reliably diff --git a/openwfs/core.py b/openwfs/core.py index 4b2756f..3151216 100644 --- a/openwfs/core.py +++ b/openwfs/core.py @@ -559,6 +559,11 @@ class Processor(Detector, ABC): The `latency` and `duration` properties are computed from the latency and duration of the inputs and cannot be set. By default, the `pixel_size` and `data_shape` are the same as the `pixel_size` and `data_shape` of the first input. To override this behavior, override the `pixel_size` and `data_shape` properties. + + Args: + multi_threaded: If True, `_fetch` is called from a worker thread. Otherwise, `_fetch` is called + directly from `trigger`. If the device is not thread-safe, or threading provides no benefit, + or for easy debugging, set this to False. """ def __init__(self, *args, multi_threaded: bool): diff --git a/openwfs/devices/nidaq_gain.py b/openwfs/devices/nidaq_gain.py index b3c6dea..b0cb23c 100644 --- a/openwfs/devices/nidaq_gain.py +++ b/openwfs/devices/nidaq_gain.py @@ -1,9 +1,9 @@ -import nidaqmx as ni -from nidaqmx.constants import LineGrouping -from typing import Annotated import time + import astropy.units as u +import nidaqmx as ni from astropy.units import Quantity +from nidaqmx.constants import LineGrouping class Gain: diff --git a/openwfs/simulation/mockdevices.py b/openwfs/simulation/mockdevices.py index 4dedeb1..1117e61 100644 --- a/openwfs/simulation/mockdevices.py +++ b/openwfs/simulation/mockdevices.py @@ -215,6 +215,11 @@ def digital_max(self) -> int: """ return self._digital_max + @property + def conversion_factor(self) -> float: + """Conversion factor between analog and digital values.""" + return self.digital_max / self.analog_max + @digital_max.setter def digital_max(self, value): if value < 0 or value > 0xFFFF: @@ -383,3 +388,36 @@ def open(self, value: bool): def _fetch(self, source: np.ndarray) -> np.ndarray: # noqa return source if self._open else 0.0 * source + + +class GaussianNoise(Processor): + """Adds gaussian noise of a specified standard deviation to the signal + Args: + source (Detector): The source detector object to process the data from. + std (float): The standard deviation of the gaussian noise. + multi_threaded: Whether to perform processing in a worker thread. + """ + + def __init__(self, source: Detector, std: float, multi_threaded: bool = True): + super().__init__(source, multi_threaded=multi_threaded) + self._std = std + + @property + def std(self) -> float: + return self._std + + @std.setter + def std(self, value: float): + if value < 0.0: + raise ValueError("Standard deviation must be non-negative") + self._std = float(value) + + def _fetch(self, data: np.ndarray) -> np.ndarray: # noqa + """ + Args: + data (ndarray): source data + + Returns: the out array containing the image with added noise. + + """ + return data + np.random.normal(0.0, self.std, data.shape) diff --git a/openwfs/simulation/transmission.py b/openwfs/simulation/transmission.py index 6691fa2..d3b3ec4 100644 --- a/openwfs/simulation/transmission.py +++ b/openwfs/simulation/transmission.py @@ -50,13 +50,13 @@ def __init__( """ # transmission matrix (normalized so that the maximum transmission is 1) - self._t = ( + self.t = ( t if t is not None else np.exp(1.0j * aberrations) / (aberrations.shape[0] * aberrations.shape[1]) ) - self.slm = slm if slm is not None else SLM(self._t.shape[0:2]) + self.slm = slm if slm is not None else SLM(self.t.shape[0:2]) super().__init__(self.slm.field, multi_threaded=multi_threaded) self.beam_amplitude = beam_amplitude @@ -76,9 +76,9 @@ def _fetch(self, incident_field): # noqa np.ndarray: A numpy array containing the calculated intensity in the focus. """ - field = np.tensordot(incident_field * self.beam_amplitude, self._t, 2) + field = np.tensordot(incident_field * self.beam_amplitude, self.t, 2) return np.abs(field) ** 2 @property def data_shape(self): - return self._t.shape[2:] + return self.t.shape[2:] diff --git a/pyproject.toml b/pyproject.toml index ee43077..094e472 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,14 @@ classifiers = [ 'License :: OSI Approved :: BSD License', 'Operating System :: OS Independent', ] +packages = [ + { include = "openwfs" }, + { include = "openwfs.algorithms" }, + { include = "openwfs.devices" }, + { include = "openwfs.processors" }, + { include = "openwfs.simulation" }, + { include = "openwfs.utilities" } +] [build-system] requires = ["poetry-core"] @@ -25,7 +33,7 @@ numpy = ">=1.25.2" astropy = ">=5.3.4" glfw = ">=2.5.9" opencv-python = ">=4.9.0.80" -matplotlib = ">=3.7.3" +matplotlib = ">=3.7.3" # TODO: remove dependency? scipy = ">=1.11.3" annotated-types = "~0.7.0" tqdm = "^4.66.2" # TODO: remove dependency @@ -51,6 +59,7 @@ optional = true scikit-image = ">=0.21.0" pytest = "~7.0.0" nidaq = "nidaqmx >=0.8.0" # we can test without the hardware, but still need the package +black = ">=24.0.0" # code formatter [tool.poetry.group.docs] optional = true diff --git a/tests/test_core.py b/tests/test_core.py index 89502d8..a089f27 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,11 +1,13 @@ import logging import time + +import astropy.units as u +import numpy as np import pytest + +from ..openwfs.processors import CropProcessor from ..openwfs.simulation import StaticSource, NoiseSource, SLM from ..openwfs.utilities import set_pixel_size, get_pixel_size -from ..openwfs.processors import CropProcessor -import numpy as np -import astropy.units as u def test_set_pixel_size(): diff --git a/tests/test_processors.py b/tests/test_processors.py index 4a847a4..591a2c5 100644 --- a/tests/test_processors.py +++ b/tests/test_processors.py @@ -1,9 +1,10 @@ -import pytest +import astropy.units as u import numpy as np -from ..openwfs.simulation.mockdevices import StaticSource -from ..openwfs.processors import SingleRoi, select_roi, Roi, MultipleRoi +import pytest import skimage as sk -import astropy.units as u + +from ..openwfs.processors import SingleRoi, select_roi, Roi, MultipleRoi +from ..openwfs.simulation.mockdevices import StaticSource @pytest.mark.skip( diff --git a/tests/test_utilities.py b/tests/test_utilities.py index ea5d698..f6766cc 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -1,4 +1,6 @@ +import astropy.units as u import numpy as np + from ..openwfs.utilities import ( set_pixel_size, get_pixel_size, @@ -6,7 +8,6 @@ Transform, project, ) -import astropy.units as u def test_to_matrix(): diff --git a/tests/test_wfs.py b/tests/test_wfs.py index 10244fe..f134287 100644 --- a/tests/test_wfs.py +++ b/tests/test_wfs.py @@ -2,207 +2,160 @@ import numpy as np import pytest import skimage -from numpy import ndarray as nd from scipy.linalg import hadamard from scipy.ndimage import zoom -from skimage.transform import resize - -from ..openwfs.algorithms import ( - StepwiseSequential, - FourierDualReference, - FourierDualReferenceCircle, - CustomIterativeDualReference, - troubleshoot, -) + +from ..openwfs.algorithms import StepwiseSequential, FourierDualReference, DualReference from ..openwfs.algorithms.troubleshoot import field_correlation from ..openwfs.algorithms.utilities import WFSController from ..openwfs.processors import SingleRoi -from ..openwfs.simulation import ( - SimulatedWFS, - StaticSource, - SLM, - Microscope, - ADCProcessor, - Shutter, -) -from ..openwfs.utilities import set_pixel_size, tilt - - -def assert_enhancement(slm, feedback, wfs_results, t_correct=None): - """Helper function to check if the intensity in the target focus increases as much as expected""" - optimised_wf = -np.angle(wfs_results.t) - slm.set_phases(0.0) - before = feedback.read() - slm.set_phases(optimised_wf) - after = feedback.read() - ratio = after / before - estimated_ratio = wfs_results.estimated_optimized_intensity / before - print(f"expected: {estimated_ratio}, actual: {ratio}") - assert ( - estimated_ratio * 0.5 <= ratio <= estimated_ratio * 2.0 - ), f""" - The SSA algorithm did not enhance the focus as much as expected. - Expected at least 0.5 * {estimated_ratio}, got {ratio}""" - - if t_correct is not None: - # Check if we correctly measured the transmission matrix. - # The correlation will be less for fewer segments, hence an (ad hoc) factor of 2/sqrt(n) - t = wfs_results.t[:] - corr = np.abs( - np.vdot(t_correct, t) - / np.sqrt(np.vdot(t_correct, t_correct) * np.vdot(t, t)) - ) - assert corr > 1.0 - 2.0 / np.sqrt(wfs_results.n) +from ..openwfs.simulation import SimulatedWFS, StaticSource, SLM, Microscope +from ..openwfs.simulation.mockdevices import GaussianNoise -def half_plane_wave_basis(N1: int, N2: int) -> nd: +@pytest.mark.parametrize("shape", [(4, 7), (10, 7), (20, 31)]) +@pytest.mark.parametrize("noise", [0.0, 0.1]) +@pytest.mark.parametrize("algorithm", ["ssa", "fourier"]) +def test_multi_target_algorithms(shape, noise: float, algorithm: str): """ - Create a plane wave basis for one half of the SLM. + Test the multi-target capable algorithms (SSA and Fourier dual ref). - N1: shape[0] of SLM pattern half - N2: shape[1] of SLM pattern half + This tests checks if the algorithm achieves the theoretical enhancement, + and it also verifies that the enhancement and noise fidelity + are estimated correctly by the algorithm. """ - M = N1 * N2 - return np.fft.fft2(np.eye(M).reshape((N1, N2, M)), axes=(0, 1)) + np.random.seed(42) # for reproducibility + M = 100 # number of targets + phase_steps = 6 -def half_hadamard_basis(N1: int, N2: int) -> nd: - """ - Create a Hadamard basis for one half of the SLM. N1 and N2 must be powers of 2. - - N1: shape[0] of SLM pattern half - N2: shape[1] of SLM pattern half - """ - M = N1 * N2 - return hadamard(M, dtype=np.complex128).reshape((N1, N2, M)) - - -def half_split_mask(N1: int, N2: int) -> nd: - """ - Create a mask that splits the slm in the center in a left and right half. - - N1: shape[0] of slm pattern half - N2: shape[1] of slm pattern half - """ - return np.concatenate((np.zeros((N1, N2)), np.ones((N1, N2))), axis=1) - - -@pytest.mark.parametrize("n_y, n_x", [(5, 5), (7, 11), (6, 4), (30, 20)]) -def test_ssa(n_y, n_x): - """ - Test the enhancement performance of the SSA algorithm. - Note, for low N, the improvement estimate is not accurate, - and the test may sometimes fail due to statistical fluctuations. - """ - aberrations = np.random.uniform(0.0, 2 * np.pi, (n_y, n_x)) - sim = SimulatedWFS(aberrations=aberrations) - alg = StepwiseSequential(feedback=sim, slm=sim.slm, n_x=n_x, n_y=n_y, phase_steps=4) + # create feedback object, with noise if needed + sim = SimulatedWFS(t=random_transmission_matrix((*shape, M))) + sim.slm.set_phases(0.0) + I_0 = np.mean(sim.read()) + feedback = GaussianNoise(sim, std=I_0 * noise) + + if algorithm == "ssa": + alg = StepwiseSequential( + feedback=feedback, + slm=sim.slm, + n_x=shape[1], + n_y=shape[0], + phase_steps=phase_steps, + ) + N = np.prod(shape) # number of input modes + alg_fidelity = (N - 1) / N # SSA is inaccurate if N is low + signal = (N - 1) / N**2 # for estimating SNR + else: # 'fourier': + alg = FourierDualReference( + feedback=feedback, + slm=sim.slm, + slm_shape=shape, + k_radius=(np.min(shape) - 1) // 2, + phase_steps=phase_steps, + ) + N = ( + alg.phase_patterns[0].shape[2] + alg.phase_patterns[1].shape[2] + ) # number of input modes + alg_fidelity = 1.0 # Fourier is accurate for any N + signal = 1 / 2 # for estimating SNR. + + # Execute the algorithm to get the optimized wavefront + # for all targets simultaneously result = alg.execute() - print(np.mean(np.abs(result.t))) - assert_enhancement(sim.slm, sim, result, np.exp(1j * aberrations)) + # Determine the optimized intensities in each of the targets individually + # Also estimate the fidelity of the transmission matrix reconstruction + # This fidelity is determined row by row, since we need to compensate + # the unknown phases. The normalization of the correlation function + # is performed on all rows together, not per row, to increase + # the accuracy of the estimate. + I_opt = np.zeros((M,)) + t_correlation = 0.0 + t_norm = 0.0 + for b in range(M): + sim.slm.set_phases(-np.angle(result.t[:, :, b])) + I_opt[b] = feedback.read()[b] + t_correlation += abs(np.vdot(result.t[:, :, b], sim.t[:, :, b])) ** 2 + t_norm += abs( + np.vdot(result.t[:, :, b], result.t[:, :, b]) + * np.vdot(sim.t[:, :, b], sim.t[:, :, b]) + ) + t_correlation /= t_norm -@pytest.mark.parametrize("n_y, n_x", [(5, 5), (7, 11), (6, 4)]) -def test_ssa_noise(n_y, n_x): - """ - Test the enhancement prediction with noisy SSA. + # a correlation of 1 means optimal reconstruction of the N modulated modes, which may be less than the total number of inputs in the transmission matrix + t_correlation *= np.prod(shape) / N - Note: this test fails if a smooth image is shown, indicating that the estimators - only work well for strong scattering at the moment. - """ - generator = np.random.default_rng(seed=12345) - aberrations = generator.uniform(0.0, 2 * np.pi, (n_y, n_x)) - sim_no_noise = SimulatedWFS(aberrations=aberrations) - slm = sim_no_noise.slm - scale = np.max(sim_no_noise.read()) - sim = ADCProcessor( - sim_no_noise, - analog_max=scale * 200.0, - digital_max=10000, - shot_noise=True, - generator=generator, + # Check the enhancement, noise fidelity and + # the fidelity of the transmission matrix reconstruction + theoretical_noise_fidelity = signal / (signal + noise**2 / phase_steps) + enhancement = I_opt.mean() / I_0 + theoretical_enhancement = ( + np.pi / 4 * theoretical_noise_fidelity * alg_fidelity * (N - 1) + 1 ) - alg = StepwiseSequential(feedback=sim, slm=slm, n_x=n_x, n_y=n_y, phase_steps=10) - result = alg.execute() - print(result.fidelity_noise) - - assert_enhancement(slm, sim, result) - - -def test_ssa_enhancement(): - input_shape = (40, 40) - output_shape = (200, 200) # todo: resize - rng = np.random.default_rng(seed=12345) - - def get_random_aberrations(): - return resize(rng.uniform(size=input_shape) * 2 * np.pi, output_shape, order=0) - - # Define mock hardware and algorithm - slm = SLM(shape=output_shape) - - # Find average background intensity - unshaped_intensities = np.zeros((30,)) - for n in range(len(unshaped_intensities)): - signal = SimulatedWFS(aberrations=get_random_aberrations(), slm=slm) - unshaped_intensities[n] = signal.read() + estimated_enhancement = result.estimated_enhancement.mean() * alg_fidelity + theoretical_t_correlation = theoretical_noise_fidelity * alg_fidelity + estimated_t_correlation = ( + result.fidelity_noise * result.fidelity_calibration * alg_fidelity + ) + tolerance = 2.0 / np.sqrt(M) + print( + f"\nenhancement: \ttheoretical= {theoretical_enhancement},\testimated={estimated_enhancement},\tactual: {enhancement}" + ) + print( + f"t-matrix fidelity:\ttheoretical = {theoretical_t_correlation},\testimated = {estimated_t_correlation},\tactual = {t_correlation}" + ) + print( + f"noise fidelity: \ttheoretical = {theoretical_noise_fidelity},\testimated = {result.fidelity_noise}" + ) + print(f"comparing at relative tolerance: {tolerance}") - num_runs = 10 - shaped_intensities_ssa = np.zeros(num_runs) - for r in range(num_runs): - sim = SimulatedWFS(aberrations=get_random_aberrations(), slm=slm) + assert np.allclose( + enhancement, theoretical_enhancement, rtol=tolerance + ), f""" + The SSA algorithm did not enhance the focus as much as expected. + Theoretical {theoretical_enhancement}, got {enhancement}""" - # SSA - print(f"SSA run {r + 1}/{num_runs}") - alg_ssa = StepwiseSequential( - feedback=sim, slm=sim.slm, n_x=13, n_y=13, phase_steps=6 - ) - wfs_result_ssa = alg_ssa.execute() - sim.slm.set_phases(-np.angle(wfs_result_ssa.t)) - shaped_intensities_ssa[r] = sim.read() + assert np.allclose( + estimated_enhancement, enhancement, rtol=tolerance + ), f""" + The SSA algorithm did not estimate the enhancement correctly. + Estimated {estimated_enhancement}, got {enhancement}""" - # Compute enhancements and error margins - enhancement_ssa = shaped_intensities_ssa.mean() / unshaped_intensities.mean() - enhancement_ssa_std = shaped_intensities_ssa.std() / unshaped_intensities.mean() + assert np.allclose( + t_correlation, theoretical_t_correlation, rtol=tolerance + ), f""" + The SSA algorithm did not measure the transmission matrix correctly. + Expected {theoretical_t_correlation}, got {t_correlation}""" - print( - f"SSA enhancement (squared signal): {enhancement_ssa:.2f}, std={enhancement_ssa_std:.2f}, with {wfs_result_ssa.n} modes" - ) + assert np.allclose( + estimated_t_correlation, theoretical_t_correlation, rtol=tolerance + ), f""" + The SSA algorithm did not estimate the fidelity of the transmission matrix correctly. + Expected {theoretical_t_correlation}, got {estimated_t_correlation}""" - assert enhancement_ssa > 100.0 + assert np.allclose( + result.fidelity_noise, theoretical_noise_fidelity, rtol=tolerance + ), f""" + The SSA algorithm did not estimate the noise correctly. + Expected {theoretical_noise_fidelity}, got {result.fidelity_noise}""" -@pytest.mark.parametrize("n_x", [2, 3]) -def test_fourier(n_x): +def random_transmission_matrix(shape): """ - Test the enhancement performance of the Fourier-based algorithm. - Use the 'cameraman' test image since it is relatively smooth. + Create a random transmission matrix with the given shape. """ - aberrations = skimage.data.camera() * (2.0 * np.pi / 255.0) - sim = SimulatedWFS(aberrations=aberrations) - alg = FourierDualReference( - feedback=sim, - slm=sim.slm, - slm_shape=np.shape(aberrations), - k_angles_min=-n_x, - k_angles_max=n_x, - phase_steps=4, - ) - results = alg.execute() - assert_enhancement(sim.slm, sim, results, np.exp(1j * aberrations)) + return np.random.normal(size=shape) + 1j * np.random.normal(size=shape) +@pytest.mark.skip("Not implemented") def test_fourier2(): """Test the Fourier dual reference algorithm using WFSController.""" slm_shape = (1000, 1000) aberrations = skimage.data.camera() * ((2 * np.pi) / 255.0) sim = SimulatedWFS(aberrations=aberrations) alg = FourierDualReference( - feedback=sim, - slm=sim.slm, - slm_shape=slm_shape, - k_angles_min=-5, - k_angles_max=5, - phase_steps=3, + feedback=sim, slm=sim.slm, slm_shape=slm_shape, k_radius=7.5, phase_steps=3 ) controller = WFSController(alg) controller.wavefront = WFSController.State.SHAPED_WAVEFRONT @@ -210,50 +163,7 @@ def test_fourier2(): assert_enhancement(sim.slm, sim, controller._result, np.exp(1j * scaled_aberration)) -@pytest.mark.skip( - reason="This test is is not passing yet and needs further inspection to see if the test itself is " - "correct." -) -def test_fourier3(): - """Test the Fourier dual reference algorithm using WFSController.""" - slm_shape = (32, 32) - aberrations = np.random.uniform(0.0, 2 * np.pi, slm_shape) - sim = SimulatedWFS(aberrations=aberrations) - alg = FourierDualReference( - feedback=sim, - slm=sim.slm, - slm_shape=slm_shape, - k_angles_min=-32, - k_angles_max=32, - phase_steps=3, - ) - controller = WFSController(alg) - controller.wavefront = WFSController.State.SHAPED_WAVEFRONT - scaled_aberration = zoom(aberrations, np.array(slm_shape) / aberrations.shape) - assert_enhancement(sim.slm, sim, controller._result, np.exp(1j * scaled_aberration)) - - -@pytest.mark.parametrize( - "k_radius, g", - [[2.5, (1.0, 0.0)], [2.5, (0.0, 2.0)]], -) -def test_fourier_circle(k_radius, g): - """ - Test Fourier dual reference algorithm with a circular k-space, with a tilt 'aberration'. - """ - aberrations = tilt(shape=(100, 100), extent=(2, 2), g=g, phase_offset=0.5) - sim = SimulatedWFS(aberrations=aberrations) - alg = FourierDualReferenceCircle( - feedback=sim, - slm=sim.slm, - slm_shape=np.shape(aberrations), - k_radius=k_radius, - phase_steps=4, - ) - results = alg.execute() - assert_enhancement(sim.slm, sim, results, np.exp(1j * aberrations)) - - +@pytest.mark.skip("Not implemented") def test_fourier_microscope(): aberration_phase = skimage.data.camera() * ((2 * np.pi) / 255.0) + np.pi aberration = StaticSource( @@ -277,12 +187,7 @@ def test_fourier_microscope(): cam = sim.get_camera(analog_max=100) roi_detector = SingleRoi(cam, pos=(250, 250)) # Only measure that specific point alg = FourierDualReference( - feedback=roi_detector, - slm=slm, - slm_shape=slm_shape, - k_angles_min=-1, - k_angles_max=1, - phase_steps=3, + feedback=roi_detector, slm=slm, slm_shape=slm_shape, k_radius=1.5, phase_steps=3 ) controller = WFSController(alg) controller.wavefront = WFSController.State.FLAT_WAVEFRONT @@ -309,8 +214,7 @@ def test_fourier_correction_field(): feedback=sim, slm=sim.slm, slm_shape=np.shape(aberrations), - k_angles_min=-2, - k_angles_max=2, + k_radius=3.0, phase_steps=3, ) t = alg.execute().t @@ -328,6 +232,7 @@ def test_phase_shift_correction(): """ Test the effect of shifting the found correction of the Fourier-based algorithm. Without the bug, a phase shift of the entire correction should not influence the measurement. + TODO: move to test of SimulatedWFS, since it is not testing the WFS algorithm itself """ aberrations = skimage.data.camera() * (2.0 * np.pi / 255.0) sim = SimulatedWFS(aberrations=aberrations) @@ -335,8 +240,7 @@ def test_phase_shift_correction(): feedback=sim, slm=sim.slm, slm_shape=np.shape(aberrations), - k_angles_min=-1, - k_angles_max=1, + k_radius=1.5, phase_steps=3, ) t = alg.execute().t @@ -356,35 +260,43 @@ def test_phase_shift_correction(): signals.append(signal) assert ( - np.std(signals) < 0.0001 * before - ), f"""The simulated response of the Fourier algorithm is sensitive to a - flat - phase-shift. This is incorrect behaviour""" + np.std(signals) / np.mean(signals) < 0.001 + ), f"""The response of SimulatedWFS is sensitive to a flat + phase shift. This is incorrect behaviour""" -def test_flat_wf_response_fourier(): +@pytest.mark.parametrize("optimized_reference", [True, False]) +@pytest.mark.parametrize("step", [True, False]) +def test_flat_wf_response_fourier(optimized_reference, step): """ Test the response of the Fourier-based WFS method when the solution is flat A flat solution means that the optimal correction is no correction. + Also tests if stitching is done correctly by having an aberration pattern which is flat (but different) on the two halves. test the optimized wavefront by checking if it has irregularities. """ - aberrations = np.zeros(shape=(512, 512)) + aberrations = np.ones(shape=(4, 4)) + if step: + aberrations[:, 2:] = 2.0 sim = SimulatedWFS(aberrations=aberrations.reshape((*aberrations.shape, 1))) alg = FourierDualReference( feedback=sim, slm=sim.slm, slm_shape=np.shape(aberrations), - k_angles_min=-1, - k_angles_max=1, + k_radius=1.5, phase_steps=3, + optimized_reference=optimized_reference, ) t = alg.execute().t # test the optimized wavefront by checking if it has irregularities. - assert np.std(t) < 0.001 # The measured wavefront is not flat. + measured_aberrations = np.squeeze(np.angle(t)) + measured_aberrations += aberrations[0, 0] - measured_aberrations[0, 0] + assert np.allclose( + measured_aberrations, aberrations, atol=0.02 + ) # The measured wavefront is not flat. def test_flat_wf_response_ssa(): @@ -437,9 +349,7 @@ def test_multidimensional_feedback_fourier(): sim = SimulatedWFS(aberrations=aberrations) # input the camera as a feedback object, such that it is multidimensional - alg = FourierDualReference( - feedback=sim, slm=sim.slm, k_angles_min=-1, k_angles_max=1, phase_steps=3 - ) + alg = FourierDualReference(feedback=sim, slm=sim.slm, k_radius=3.5, phase_steps=3) t = alg.execute().t # compute the phase pattern to optimize the intensity in target 0 @@ -459,129 +369,34 @@ def test_multidimensional_feedback_fourier(): Expected at least 3.0, got {enhancement}""" -@pytest.mark.parametrize("gaussian_noise_std", (0.0, 0.1, 0.5, 3.0)) -def test_ssa_fidelity(gaussian_noise_std): - """Test fidelity prediction for WFS simulation with various noise levels.""" - # === Define virtual devices for a WFS simulation === - # Define aberration as a pattern of random phases at the pupil plane - aberrations = np.random.uniform(size=(80, 80)) * 2 * np.pi - - # Define specimen as an image with several bright pixels - specimen_img = np.zeros((240, 240)) - specimen_img[120, 120] = 2e5 - specimen = set_pixel_size(specimen_img, pixel_size=100 * u.nm) - - # The SLM is conjugated to the back pupil plane - slm = SLM(shape=(80, 80)) - # Also simulate a shutter that can turn off the light - shutter = Shutter(slm.field) - - # Simulate a WFS microscope looking at the specimen - sim = Microscope( - source=specimen, - incident_field=shutter, - aberrations=aberrations, - wavelength=800 * u.nm, - ) - - # Simulate a camera device with gaussian noise and shot noise - cam = sim.get_camera( - analog_max=1e4, shot_noise=False, gaussian_noise_std=gaussian_noise_std - ) - - # Define feedback as circular region of interest in the center of the frame - roi_detector = SingleRoi(cam, radius=1) - - # === Run wavefront shaping experiment === - # Use the stepwise sequential (SSA) WFS algorithm - n_x = 10 - n_y = 10 - alg = StepwiseSequential( - feedback=roi_detector, slm=slm, n_x=n_x, n_y=n_y, phase_steps=8 - ) - - # Define a region of interest to determine average speckle intensity - roi_background = SingleRoi(cam, radius=50) - - # Run WFS troubleshooter and output a report to the console - trouble = troubleshoot( - algorithm=alg, - background_feedback=roi_background, - frame_source=cam, - shutter=shutter, - ) - - assert np.isclose( - trouble.measured_enhancement, trouble.expected_enhancement, rtol=0.2 - ) - - -def test_ssa_aberration_reconstruction(): - """Test if SSA can closely reconstruct the ground truth aberrations.""" - do_debug = False - - n_x = 6 - n_y = 5 - - # Create aberrations - x = np.linspace(-1, 1, n_x).reshape((1, -1)) - y = np.linspace(-1, 1, n_y).reshape((-1, 1)) - aberrations = ( - np.sin(0.8 * np.pi * x) - * np.cos(1.3 * np.pi * y) - * (0.8 * np.pi + 0.4 * x + 0.4 * y) - ) % (2 * np.pi) - aberrations[0:1, :] = 0 - aberrations[:, 0:1] = 0 - - # Initialize simulation and algorithm - sim = SimulatedWFS(aberrations=aberrations.reshape((*aberrations.shape, 1))) - - alg = StepwiseSequential(feedback=sim, slm=sim.slm, n_x=n_x, n_y=n_y, phase_steps=4) - - result = alg.execute() - - if do_debug: - import matplotlib.pyplot as plt - - plt.figure() - plt.imshow( - np.angle(np.exp(1j * aberrations)), vmin=-np.pi, vmax=np.pi, cmap="hsv" - ) - plt.title("Aberrations") - - plt.figure() - plt.imshow(np.angle(result.t), vmin=-np.pi, vmax=np.pi, cmap="hsv") - plt.title("t") - plt.colorbar() - plt.show() - - assert np.abs(field_correlation(np.exp(1j * aberrations), result.t)) > 0.99 - - -@pytest.mark.parametrize( - "construct_basis", (half_plane_wave_basis, half_hadamard_basis) -) -def test_custom_blind_dual_reference_ortho_split(construct_basis: callable): - """Test custom blind dual reference with an orthonormal phase-only basis.""" +@pytest.mark.parametrize("type", ("plane_wave", "hadamard")) +@pytest.mark.parametrize("shape", ((8, 8), (16, 4))) +def test_custom_blind_dual_reference_ortho_split(type: str, shape): + """Test custom blind dual reference with an orthonormal phase-only basis. + Two types of bases are tested: plane waves and Hadamard""" do_debug = False - - # Create set of phase-only orthonormal modes - N1 = 8 - N2 = 4 - M = N1 * N2 - mode_set_half = construct_basis(N1, N2) - mode_set = np.concatenate((mode_set_half, np.zeros(shape=(N1, N2, M))), axis=1) + N = shape[0] * (shape[1] // 2) + modes_shape = (shape[0], shape[1] // 2, N) + if type == "plane_wave": + # Create a full plane wave basis for one half of the SLM. + modes = np.fft.fft2(np.eye(N).reshape(modes_shape), axes=(0, 1)) + else: # type == 'hadamard': + modes = hadamard(N).reshape(modes_shape) + + mask = np.concatenate( + (np.zeros(modes_shape[0:2], dtype=bool), np.ones(modes_shape[0:2], dtype=bool)), + axis=1, + ) + mode_set = np.concatenate((modes, np.zeros(shape=modes_shape)), axis=1) phases_set = np.angle(mode_set) - mask = half_split_mask(N1, N2) if do_debug: # Plot the modes import matplotlib.pyplot as plt plt.figure(figsize=(12, 7)) - for m in range(M): - plt.subplot(N2, N1, m + 1) + for m in range(N): + plt.subplot(*modes_shape[0:1], m + 1) plt.imshow(np.angle(mode_set[:, :, m]), vmin=-np.pi, vmax=np.pi) plt.title(f"m={m}") plt.xticks([]) @@ -589,25 +404,13 @@ def test_custom_blind_dual_reference_ortho_split(construct_basis: callable): plt.pause(0.1) # Create aberrations - x = np.linspace(-1, 1, 1 * N1).reshape((1, -1)) - y = np.linspace(-1, 1, 1 * N1).reshape((-1, 1)) - aberrations = ( - np.sin(0.8 * np.pi * x) - * np.cos(1.3 * np.pi * y) - * (0.8 * np.pi + 0.4 * x + 0.4 * y) - ) % (2 * np.pi) - aberrations[0:2, :] = 0 - aberrations[:, 0:2] = 0 - - sim = SimulatedWFS(aberrations=aberrations.reshape((*aberrations.shape, 1))) + sim = SimulatedWFS(t=random_transmission_matrix(shape)) - alg = CustomIterativeDualReference( + alg = DualReference( feedback=sim, slm=sim.slm, - slm_shape=aberrations.shape, - phases=(phases_set, np.flip(phases_set, axis=1)), - set1_mask=mask, - phase_steps=4, + phase_patterns=(phases_set, np.flip(phases_set, axis=1)), + group_mask=mask, iterations=4, ) @@ -615,9 +418,7 @@ def test_custom_blind_dual_reference_ortho_split(construct_basis: callable): if do_debug: plt.figure() - plt.imshow( - np.angle(np.exp(1j * aberrations)), vmin=-np.pi, vmax=np.pi, cmap="hsv" - ) + plt.imshow(np.angle(sim.t), vmin=-np.pi, vmax=np.pi, cmap="hsv") plt.title("Aberrations") plt.figure() @@ -626,14 +427,16 @@ def test_custom_blind_dual_reference_ortho_split(construct_basis: callable): plt.colorbar() plt.show() - assert np.abs(field_correlation(np.exp(1j * aberrations), result.t)) > 0.999 + assert ( + np.abs(field_correlation(sim.t, result.t)) > 0.99 + ) # todo: find out why this is not higher def test_custom_blind_dual_reference_non_ortho(): """ Test custom blind dual reference with a non-orthogonal basis. """ - do_debug = True + do_debug = False # Create set of modes that are barely linearly independent N1 = 6 @@ -653,7 +456,7 @@ def test_custom_blind_dual_reference_non_ortho(): plt.figure(figsize=(12, 7)) for m in range(M): plt.subplot(N2, N1, m + 1) - plt.imshow(np.angle(mode_set[:, :, m]), vmin=-np.pi, vmax=np.pi) + plt.imshow(phases_set[:, :, m], vmin=-np.pi, vmax=np.pi) plt.title(f"m={m}") plt.xticks([]) plt.yticks([]) @@ -671,14 +474,13 @@ def test_custom_blind_dual_reference_non_ortho(): aberrations[0:1, :] = 0 aberrations[:, 0:2] = 0 - sim = SimulatedWFS(aberrations=aberrations.reshape((*aberrations.shape, 1))) + sim = SimulatedWFS(aberrations=aberrations) - alg = CustomIterativeDualReference( + alg = DualReference( feedback=sim, slm=sim.slm, - slm_shape=aberrations.shape, - phases=(phases_set, np.flip(phases_set, axis=1)), - set1_mask=mask, + phase_patterns=(phases_set, np.flip(phases_set, axis=1)), + group_mask=mask, phase_steps=4, iterations=4, )