diff --git a/openwfs/algorithms/custom_iter_dual_reference.py b/openwfs/algorithms/custom_iter_dual_reference.py index 73b083b..f5c9643 100644 --- a/openwfs/algorithms/custom_iter_dual_reference.py +++ b/openwfs/algorithms/custom_iter_dual_reference.py @@ -45,8 +45,16 @@ class IterativeDualReference: https://opg.optica.org/oe/ abstract.cfm?uri=oe-27-8-1167 """ - def __init__(self, feedback: Detector, slm: PhaseSLM, phase_patterns: tuple[nd, nd], group_mask: nd, - phase_steps: int = 4, iterations: int = 4, analyzer: Optional[callable] = analyze_phase_stepping): + def __init__( + self, + feedback: Detector, + slm: PhaseSLM, + phase_patterns: tuple[nd, nd], + group_mask: nd, + phase_steps: int = 4, + iterations: int = 4, + analyzer: Optional[callable] = analyze_phase_stepping, + ): """ Args: feedback: The feedback source, usually a detector that provides measurement data. @@ -62,7 +70,9 @@ def __init__(self, feedback: Detector, slm: PhaseSLM, phase_patterns: tuple[nd, A, B, A, B, A. Should be at least 2 analyzer: The function used to analyze the phase stepping data. Must return a WFSResult object. Defaults to `analyze_phase_stepping` """ - if (phase_patterns[0].shape[0:2] != group_mask.shape) or (phase_patterns[1].shape[0:2] != group_mask.shape): + if (phase_patterns[0].shape[0:2] != group_mask.shape) or ( + phase_patterns[1].shape[0:2] != group_mask.shape + ): raise ValueError("The phase patterns and group mask must all have the same shape.") if iterations < 2: raise ValueError("The number of iterations must be at least 2.") @@ -74,13 +84,18 @@ def __init__(self, feedback: Detector, slm: PhaseSLM, phase_patterns: tuple[nd, self.phase_steps = phase_steps self.iterations = iterations self.analyzer = analyzer - self.phase_patterns = (phase_patterns[0].astype(np.float32), phase_patterns[1].astype(np.float32)) + self.phase_patterns = ( + phase_patterns[0].astype(np.float32), + phase_patterns[1].astype(np.float32), + ) mask = group_mask.astype(bool) self.masks = (~mask, mask) # masks[0] is True for group A, mask[1] is True for group B # Pre-compute the conjugate modes for reconstruction - self.modes = [np.exp(-1j * self.phase_patterns[side]) * np.expand_dims(self.masks[side], axis=2) for side in - range(2)] + self.modes = [ + np.exp(-1j * self.phase_patterns[side]) * np.expand_dims(self.masks[side], axis=2) + for side in range(2) + ] def execute(self, capture_intermediate_results: bool = False, progress_bar=None) -> WFSResult: """ @@ -104,23 +119,36 @@ def execute(self, capture_intermediate_results: bool = False, progress_bar=None) # Initialize storage lists t_set_all = [None] * self.iterations results_all = [None] * self.iterations # List to store all results - results_latest = [None, None] # The two latest results. Used for computing fidelity factors. - intermediate_results = np.zeros(self.iterations) # List to store feedback from full patterns + results_latest = [ + None, + None, + ] # The two latest results. Used for computing fidelity factors. + intermediate_results = np.zeros( + self.iterations + ) # List to store feedback from full patterns # Prepare progress bar if progress_bar: - num_measurements = np.ceil(self.iterations / 2) * self.modes[0].shape[2] \ - + np.floor(self.iterations / 2) * self.modes[1].shape[2] + num_measurements = ( + np.ceil(self.iterations / 2) * self.modes[0].shape[2] + + np.floor(self.iterations / 2) * self.modes[1].shape[2] + ) progress_bar.total = num_measurements # Switch the phase sets back and forth multiple times for it in range(self.iterations): side = it % 2 # pick set A or B for phase stepping - ref_phases = -np.angle(t_full) # use the best estimate so far to construct an optimized reference + ref_phases = -np.angle( + t_full + ) # use the best estimate so far to construct an optimized reference side_mask = self.masks[side] # Perform WFS experiment on one side, keeping the other side sized at the ref_phases - result = self._single_side_experiment(mod_phases=self.phase_patterns[side], ref_phases=ref_phases, - mod_mask=side_mask, progress_bar=progress_bar) + result = self._single_side_experiment( + mod_phases=self.phase_patterns[side], + ref_phases=ref_phases, + mod_mask=side_mask, + progress_bar=progress_bar, + ) # Compute transmission matrix for the current side and update # estimated transmission matrix @@ -139,23 +167,33 @@ def execute(self, capture_intermediate_results: bool = False, progress_bar=None) intermediate_results[it] = self.feedback.read() # Compute average fidelity factors - fidelity_noise = weighted_average(results_latest[0].fidelity_noise, - results_latest[1].fidelity_noise, results_latest[0].n, - results_latest[1].n) - fidelity_amplitude = weighted_average(results_latest[0].fidelity_amplitude, - results_latest[1].fidelity_amplitude, results_latest[0].n, - results_latest[1].n) - fidelity_calibration = weighted_average(results_latest[0].fidelity_calibration, - results_latest[1].fidelity_calibration, results_latest[0].n, - results_latest[1].n) - - result = WFSResult(t=t_full, - t_f=None, - n=self.modes[0].shape[2] + self.modes[1].shape[2], - axis=2, - fidelity_noise=fidelity_noise, - fidelity_amplitude=fidelity_amplitude, - fidelity_calibration=fidelity_calibration) + fidelity_noise = weighted_average( + results_latest[0].fidelity_noise, + results_latest[1].fidelity_noise, + results_latest[0].n, + results_latest[1].n, + ) + fidelity_amplitude = weighted_average( + results_latest[0].fidelity_amplitude, + results_latest[1].fidelity_amplitude, + results_latest[0].n, + results_latest[1].n, + ) + fidelity_calibration = weighted_average( + results_latest[0].fidelity_calibration, + results_latest[1].fidelity_calibration, + results_latest[0].n, + results_latest[1].n, + ) + + result = WFSResult( + t=t_full, + n=self.modes[0].shape[2] + self.modes[1].shape[2], + axis=2, + fidelity_noise=fidelity_noise, + fidelity_amplitude=fidelity_amplitude, + fidelity_calibration=fidelity_calibration, + ) # TODO: document the t_set_all and results_all attributes result.t_set_all = t_set_all @@ -163,8 +201,9 @@ def execute(self, capture_intermediate_results: bool = False, progress_bar=None) result.intermediate_results = intermediate_results return result - def _single_side_experiment(self, mod_phases: nd, ref_phases: nd, mod_mask: nd, - progress_bar=None) -> WFSResult: + def _single_side_experiment( + self, mod_phases: nd, ref_phases: nd, mod_mask: nd, progress_bar=None + ) -> WFSResult: """ Conducts experiments on one part of the SLM. diff --git a/openwfs/algorithms/genetic.py b/openwfs/algorithms/genetic.py index 519aa54..360d690 100644 --- a/openwfs/algorithms/genetic.py +++ b/openwfs/algorithms/genetic.py @@ -99,7 +99,7 @@ def execute(self, *, progress_bar=None) -> WFSResult: # Terminate after the specified number of generations, return the best wavefront if i >= self.generations: - return WFSResult(t=np.exp(-1.0j * population[sorted_indices[-1]]), t_f=None, axis=2) + return WFSResult(t=np.exp(-1.0j * population[sorted_indices[-1]]), axis=2) # We keep the elite individuals, and regenerate the rest by mixing the elite # For this mixing, the probability of selecting an individual is proportional to its measured intensity. diff --git a/openwfs/algorithms/utilities.py b/openwfs/algorithms/utilities.py index 545cd1d..2a323a8 100644 --- a/openwfs/algorithms/utilities.py +++ b/openwfs/algorithms/utilities.py @@ -30,7 +30,6 @@ class WFSResult: def __init__( self, t: np.ndarray, - t_f: np.ndarray, axis: int, fidelity_noise: ArrayLike = 1.0, fidelity_amplitude: ArrayLike = 1.0, @@ -60,7 +59,6 @@ def __init__( """ self.t = t - self.t_f = t_f self.axis = axis self.fidelity_noise = np.atleast_1d(fidelity_noise) self.n = np.prod(t.shape[0:axis]) if n is None else n @@ -119,7 +117,6 @@ def select_target(self, b) -> "WFSResult": """ return WFSResult( t=self.t.reshape((*self.t.shape[0:2], -1))[:, :, b], - t_f=self.t_f.reshape((*self.t_f.shape[0:2], -1))[:, :, b], axis=self.axis, intensity_offset=self.intensity_offset[:][b], fidelity_noise=self.fidelity_noise[:][b], @@ -151,7 +148,6 @@ def weighted_average(attribute): return WFSResult( t=weighted_average("t"), - t_f=weighted_average("t_f"), n=n, axis=axis, fidelity_noise=weighted_average("fidelity_noise"), @@ -255,7 +251,6 @@ def analyze_phase_stepping(measurements: np.ndarray, axis: int, A: Optional[floa return WFSResult( t, - t_f=t_f, axis=axis, fidelity_amplitude=amplitude_factor, fidelity_noise=noise_factor,