From cebfaa22a0d51d82c8a67413cf4c599c42a5188e Mon Sep 17 00:00:00 2001 From: Ivo Vellekoop Date: Wed, 9 Oct 2024 11:26:39 +0200 Subject: [PATCH] cleaning up algorithm and WFScontroller --- STYLEGUIDE.md | 10 +- docs/source/conf.py | 2 +- openwfs/algorithms/basic_fourier.py | 4 - openwfs/algorithms/dual_reference.py | 12 +- openwfs/algorithms/utilities.py | 176 ++++++++++++++------------- tests/test_wfs.py | 23 ++-- 6 files changed, 118 insertions(+), 109 deletions(-) diff --git a/STYLEGUIDE.md b/STYLEGUIDE.md index 29236a3..1a0aa7a 100644 --- a/STYLEGUIDE.md +++ b/STYLEGUIDE.md @@ -13,6 +13,11 @@ line limit can be very cumbersome. - PEP 8:E203 whitespace before ':'. May be disabled. This is already checked by (and conflicts with) black. +# Layout + +- Line length of code is limited to 120 characters by `black`. +- Use soft wrapping for Markdown (`.md`) and reStructuredTest (`.rst`) files. + # Tests - Tests must *not* plot figures. @@ -39,4 +44,7 @@ Common warnings: images, use ``len(images)`` instead of ``images.shape(0)``. But to access the number of rows in an image, use ``image.shape(0)``. - + # Properties +- Document properties in the getter method only, not in the setter method. Also describe what happens if the property is + set. +- \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index 6a05b01..16a13ec 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -137,7 +137,7 @@ # Hide some classes that are not production ready yet def skip(_app, _what, name, _obj, do_skip, _options): - if name in ("WFSController", "Gain"): + if name in ("Gain"): return True return do_skip diff --git a/openwfs/algorithms/basic_fourier.py b/openwfs/algorithms/basic_fourier.py index 53d302d..3c26a41 100644 --- a/openwfs/algorithms/basic_fourier.py +++ b/openwfs/algorithms/basic_fourier.py @@ -37,7 +37,6 @@ def __init__( k_step: float = 1.0, iterations: int = 2, amplitude: np.ndarray = 1.0, - analyzer: Optional[callable] = analyze_phase_stepping, optimized_reference: Optional[bool] = None ): """ @@ -51,8 +50,6 @@ def __init__( k_step: Make steps in k-space of this value. 1 corresponds to diffraction limited tilt. iterations: Number of ping-pong iterations. Defaults to 2. amplitude: Amplitude profile over the SLM. Defaults to 1.0 (flat) - analyzer: The function used to analyze the phase stepping data. - Must return a WFSResult object. Defaults to `analyze_phase_stepping` optimized_reference: When `True`, during each iteration the other half of the SLM displays the optimized pattern so far (as in [1]). When `False`, the algorithm optimizes A with a flat wavefront on B, @@ -77,7 +74,6 @@ def __init__( iterations=iterations, amplitude=amplitude, optimized_reference=optimized_reference, - analyzer=analyzer, ) def _construct_modes(self) -> tuple[np.ndarray, np.ndarray]: diff --git a/openwfs/algorithms/dual_reference.py b/openwfs/algorithms/dual_reference.py index 0eb882d..05c21ca 100644 --- a/openwfs/algorithms/dual_reference.py +++ b/openwfs/algorithms/dual_reference.py @@ -46,7 +46,6 @@ def __init__( amplitude: nd = 1.0, phase_steps: int = 4, iterations: int = 2, - analyzer: Optional[callable] = analyze_phase_stepping, optimized_reference: Optional[bool] = None ): """ @@ -79,19 +78,13 @@ def __init__( When set to `None` (default), the algorithm uses True if there is a single target, and False if there are multiple targets. - analyzer: The function used to analyze the phase stepping data. - Must return a WFSResult object. Defaults to `analyze_phase_stepping` - [1]: X. Tao, T. Lam, B. Zhu, et al., “Three-dimensional focusing through scattering media using conjugate adaptive optics with remote focusing (CAORF),” Opt. Express 25, 10368–10383 (2017). """ if optimized_reference is None: # 'auto' mode optimized_reference = np.prod(feedback.data_shape) == 1 elif optimized_reference and np.prod(feedback.data_shape) != 1: - raise ValueError( - "When using an optimized reference, the feedback detector should return a single scalar value." - ) - + raise ValueError("In optimized_reference mode, only scalar (single target) feedback signals can be used.") if iterations < 2: raise ValueError("The number of iterations must be at least 2.") if not optimized_reference and iterations != 2: @@ -102,7 +95,6 @@ def __init__( self.phase_steps = phase_steps self.optimized_reference = optimized_reference self.iterations = iterations - self._analyzer = analyzer self._phase_patterns = None self._gram = None self._shape = group_mask.shape @@ -298,7 +290,7 @@ def _single_side_experiment(self, mod_phases: nd, ref_phases: nd, mod_mask: nd, progress_bar.update() self.feedback.wait() - return self._analyzer(measurements, axis=1) + return analyze_phase_stepping(measurements, axis=1) def compute_t_set(self, t, side) -> nd: """ diff --git a/openwfs/algorithms/utilities.py b/openwfs/algorithms/utilities.py index 86e46c1..5908135 100644 --- a/openwfs/algorithms/utilities.py +++ b/openwfs/algorithms/utilities.py @@ -143,6 +143,10 @@ def weighted_average(attribute): fidelity_calibration=weighted_average("fidelity_calibration"), ) + @property + def snr(self): + return 1.0 / (1.0 / self.fidelity_noise - 1.0) + def analyze_phase_stepping(measurements: np.ndarray, axis: int): """Analyzes the result of phase stepping measurements, returning matrix `t` and noise statistics @@ -240,88 +244,93 @@ def analyze_phase_stepping(measurements: np.ndarray, axis: int): class WFSController: """ - Controller for Wavefront Shaping (WFS) operations using a specified algorithm in the MicroManager environment. + EXPERIMENTAL - Controller for Wavefront Shaping (WFS) operations using a specified algorithm in the Micro-Manager environment. + + Usage: + + .. code-block:: python + + # not wrapped: + alg = FourierDualReference(feedback, slm) + + # wrapped + alg = WFSController(FourierDualReference, feedback, slm) + + Under the hood, a dynamic class is created that inherits both ``WFSController`` and ``FourierDualReference)``. + Effectively this is similar to having ``class WFSController(FourierDualReference)`` inheritance. + + Since Micro-Manager / PyDevice does not yet support buttons to activate actions, a WFS experiment is started by setting + the trigger attribute :attr:`wavefront` to the value State.OPTIMIZED + It adds attributes for inspecting the statistics of the last WFS optimization. Manages the state of the wavefront and executes the algorithm to optimize and apply wavefront corrections, while exposing all these parameters to MicroManager. """ class State(Enum): - FLAT_WAVEFRONT = 0 - SHAPED_WAVEFRONT = 1 + FLAT = 0 + OPTIMIZED = 1 + REOPTIMIZE = 2 - def __init__(self, algorithm): + def __init__(self, _algorithm_class, *args, **kwargs): """ Args: algorithm: An instance of a wavefront shaping algorithm. """ - self.algorithm = algorithm - self._wavefront = WFSController.State.FLAT_WAVEFRONT - self._result = None - self._noise_factor = None - self._amplitude_factor = None - self._estimated_enhancement = None - self._calibration_fidelity = None - self._estimated_optimized_intensity = None - self._snr = None # Average SNR. Computed when wavefront is computed. - self._optimized_wavefront = None - self._recompute_wavefront = False - self._feedback_enhancement = None + super().__init__(*args, **kwargs) + self._wavefront = WFSController.State.FLAT + self._result: Optional[WFSResult] = None + self._feedback_ratio = 0.0 self._test_wavefront = False # Trigger to test the optimized wavefront - self._run_troubleshooter = False # Trigger troubleshooter - self.dark_frame = None - self.before_frame = None + + def __new__(cls, algorithm_class, *args, **kwargs): + """Dynamically creates a class of type `class X(WFSController, algorithm_class` and returns an instance of that class""" + + # Dynamically create the class using type() + class_name = "WFSController_" + algorithm_class.__name__ + DynamicClass = type(class_name, (cls, algorithm_class), {}) + instance = super(WFSController, cls).__new__(DynamicClass) + return instance @property def wavefront(self) -> State: """ - Gets the current wavefront state. - - Returns: - State: The current state of the wavefront, either FLAT_WAVEFRONT or SHAPED_WAVEFRONT. + Enables switching between FLAT or OPTIMIZED wavefront on the SLM. + Setting this state to OPTIMIZED causes the algorithm execute if the optimized wavefront is not yet computed. + Setting this state to REOPTIMIZE always causes the algorithm to recompute the wavefront. The state switches to OPTIMIZED after executioin of the algorithm. + For multi-target optimizations, OPTIMIZED shows the wavefront for the first target. """ return self._wavefront @wavefront.setter def wavefront(self, value): - """ - Sets the wavefront state and applies the corresponding phases to the SLM. - - Args: - value (State): The desired state of the wavefront to set. - """ - self._wavefront = value - if value == WFSController.State.FLAT_WAVEFRONT: - self.algorithm.slm.set_phases(0.0) - else: - if self._recompute_wavefront or self._optimized_wavefront is None: - # select only the wavefront and statistics for the first target - result = self.algorithm.execute().select_target(0) - self._optimized_wavefront = -np.angle(result.t) - self._noise_factor = result.fidelity_noise - self._amplitude_factor = result.fidelity_amplitude - self._estimated_enhancement = result.estimated_enhancement - self._calibration_fidelity = result.fidelity_calibration - self._estimated_optimized_intensity = result.estimated_optimized_intensity - self._snr = 1.0 / (1.0 / result.fidelity_noise - 1.0) - self._result = result - self.algorithm.slm.set_phases(self._optimized_wavefront) + self._wavefront = WFSController.State(value) + if value == WFSController.State.FLAT: + self.slm.set_phases(0.0) + elif value == WFSController.State.OPTIMIZED: + if self._result is None: + # run the algorithm + self._result = self.execute().select_target(0) + self.slm.set_phases(self.optimized_wavefront) + else: # value == WFSController.State.REOPTIMIZE: + self._result = None # remove stored result + self.wavefront = WFSController.State.OPTIMIZED # recompute the wavefront @property - def noise_factor(self) -> float: + def fidelity_noise(self) -> float: """ Returns: - float: noise factor: the estimated loss in fidelity caused by the limited snr. + float: the estimated loss in fidelity caused by the limited snr. """ - return self._noise_factor + return self._result.fidelity_noise if self._result is not None else 0.0 @property - def amplitude_factor(self) -> float: + def fidelity_amplitude(self) -> float: """ Returns: - float: amplitude factor: estimated reduction of the fidelity due to phase-only + float: estimated reduction of the fidelity due to phase-only modulation (≈ π/4 for fully developed speckle) """ - return self._amplitude_factor + return self._result.fidelity_amplitude if self._result is not None else 0.0 @property def estimated_enhancement(self) -> float: @@ -330,15 +339,15 @@ def estimated_enhancement(self) -> float: float: estimated enhancement: estimated ratio / (with <> denoting ensemble average) """ - return self._estimated_enhancement + return self._result.estimated_enhancement if self._result is not None else 0.0 @property - def calibration_fidelity(self) -> float: + def fidelity_calibration(self) -> float: """ Returns: float: non-linearity. """ - return self._calibration_fidelity + return self._result.fidelity_calibration if self._result is not None else 0.0 @property def estimated_optimized_intensity(self) -> float: @@ -346,52 +355,47 @@ def estimated_optimized_intensity(self) -> float: Returns: float: estimated optimized intensity. """ - return self._estimated_optimized_intensity + return self._estimated_optimized_intensity if self._result is not None else 0.0 @property def snr(self) -> float: """ - Gets the signal-to-noise ratio (SNR) of the optimized wavefront. - Returns: - float: The average SNR computed during wavefront optimization. + float: The average signal-to-noise ratio (SNR) of the wavefront optimization measurements. """ - return self._snr + return self._result.snr if self._result is not None else 0.0 @property - def recompute_wavefront(self) -> bool: - """Returns: bool that indicates whether the wavefront needs to be recomputed.""" - return self._recompute_wavefront - - @recompute_wavefront.setter - def recompute_wavefront(self, value): - """Sets the bool that indicates whether the wavefront needs to be recomputed.""" - self._recompute_wavefront = value + def optimized_wavefront(self) -> np.ndarray: + return -np.angle(self._result.t) if self._result is not None else 0.0 @property - def feedback_enhancement(self) -> float: - """Returns: the average enhancement of the feedback, returns none if no such enhancement was measured.""" - return self._feedback_enhancement + def feedback_ratio(self) -> float: + """The ratio of average feedback signals after and before optimization. + + This value is calculated when the :attr:`test_wavefront` trigger is set to True. + + Note: this is *not* the enhancement factor, because the 'before' signal is not ensemble averaged. + Therefore, this value should be used with caution. + + Returns: + float: average enhancement of the feedback, 0.0 none if no such enhancement was measured.""" + return self._feedback_ratio @property def test_wavefront(self) -> bool: - """Returns: bool that indicates whether test_wavefront will be performed if set.""" - return self._test_wavefront + """Trigger to test the wavefront. - @test_wavefront.setter - def test_wavefront(self, value): + Set this value `True` to measure feedback signals with a flat and an optimized wavefront and compute the :attr:`feedback_ratio`. + This value is reset to `False` after the test is performed. """ - Calculates the feedback enhancement between the flat and shaped wavefronts by measuring feedback for both - cases. + return False - Args: - value (bool): True to enable test mode, False to disable. - """ + @test_wavefront.setter + def test_wavefront(self, value): if value: - self.wavefront = WFSController.State.FLAT_WAVEFRONT - feedback_flat = self.algorithm.feedback.read().copy() - self.wavefront = WFSController.State.SHAPED_WAVEFRONT - feedback_shaped = self.algorithm.feedback.read().copy() - self._feedback_enhancement = float(feedback_shaped.sum() / feedback_flat.sum()) - - self._test_wavefront = value + self.wavefront = WFSController.State.FLAT + feedback_flat = self.feedback.read().sum() + self.wavefront = WFSController.State.OPTIMIZED + feedback_shaped = self.feedback.read().sum() + self._feedback_ratio = float(feedback_shaped / feedback_flat) diff --git a/tests/test_wfs.py b/tests/test_wfs.py index d0fe69f..999ffea 100644 --- a/tests/test_wfs.py +++ b/tests/test_wfs.py @@ -192,17 +192,26 @@ def t_fidelity( return fidelity / norm -@pytest.mark.skip("Not implemented") def test_fourier2(): """Test the Fourier dual reference algorithm using WFSController.""" - slm_shape = (1000, 1000) + slm_shape = (10, 10) aberrations = skimage.data.camera() * ((2 * np.pi) / 255.0) sim = SimulatedWFS(aberrations=aberrations) - alg = FourierDualReference(feedback=sim, slm=sim.slm, slm_shape=slm_shape, k_radius=7.5, phase_steps=3) - controller = WFSController(alg) - controller.wavefront = WFSController.State.SHAPED_WAVEFRONT - scaled_aberration = zoom(aberrations, np.array(slm_shape) / aberrations.shape) - assert_enhancement(sim.slm, sim, controller._result, np.exp(1j * scaled_aberration)) + alg = WFSController( + FourierDualReference, feedback=sim, slm=sim.slm, slm_shape=slm_shape, k_radius=3.5, phase_steps=3 + ) + + # check if the attributes of the algorithm were passed through correctly + assert alg.k_radius == 3.5 + alg.k_radius = 2.5 + assert alg.k_radius == 2.5 + before = sim.read() + alg.wavefront = WFSController.State.OPTIMIZED # this will trigger the algorithm to optimize the wavefront + after = sim.read() + alg.wavefront = WFSController.State.FLAT # this set the wavefront back to flat + before2 = sim.read() + assert before == before2 + assert after / before > 3.0 @pytest.mark.skip("Not implemented")