Skip to content

Commit

Permalink
cleaning up algorithm and WFScontroller
Browse files Browse the repository at this point in the history
  • Loading branch information
IvoVellekoop committed Oct 9, 2024
1 parent 452057a commit cebfaa2
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 109 deletions.
10 changes: 9 additions & 1 deletion STYLEGUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
line limit can be very cumbersome.
- PEP 8:E203 whitespace before ':'. May be disabled. This is already checked by (and conflicts with) black.

# Layout

- Line length of code is limited to 120 characters by `black`.
- Use soft wrapping for Markdown (`.md`) and reStructuredTest (`.rst`) files.

# Tests

- Tests must *not* plot figures.
Expand All @@ -39,4 +44,7 @@ Common warnings:
images, use ``len(images)`` instead of ``images.shape(0)``. But to access the number of rows in an image, use
``image.shape(0)``.


# Properties
- Document properties in the getter method only, not in the setter method. Also describe what happens if the property is
set.
-
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@

# Hide some classes that are not production ready yet
def skip(_app, _what, name, _obj, do_skip, _options):
if name in ("WFSController", "Gain"):
if name in ("Gain"):
return True
return do_skip

Expand Down
4 changes: 0 additions & 4 deletions openwfs/algorithms/basic_fourier.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def __init__(
k_step: float = 1.0,
iterations: int = 2,
amplitude: np.ndarray = 1.0,
analyzer: Optional[callable] = analyze_phase_stepping,
optimized_reference: Optional[bool] = None
):
"""
Expand All @@ -51,8 +50,6 @@ def __init__(
k_step: Make steps in k-space of this value. 1 corresponds to diffraction limited tilt.
iterations: Number of ping-pong iterations. Defaults to 2.
amplitude: Amplitude profile over the SLM. Defaults to 1.0 (flat)
analyzer: The function used to analyze the phase stepping data.
Must return a WFSResult object. Defaults to `analyze_phase_stepping`
optimized_reference:
When `True`, during each iteration the other half of the SLM displays the optimized pattern so far (as in [1]).
When `False`, the algorithm optimizes A with a flat wavefront on B,
Expand All @@ -77,7 +74,6 @@ def __init__(
iterations=iterations,
amplitude=amplitude,
optimized_reference=optimized_reference,
analyzer=analyzer,
)

def _construct_modes(self) -> tuple[np.ndarray, np.ndarray]:
Expand Down
12 changes: 2 additions & 10 deletions openwfs/algorithms/dual_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def __init__(
amplitude: nd = 1.0,
phase_steps: int = 4,
iterations: int = 2,
analyzer: Optional[callable] = analyze_phase_stepping,
optimized_reference: Optional[bool] = None
):
"""
Expand Down Expand Up @@ -79,19 +78,13 @@ def __init__(
When set to `None` (default), the algorithm uses True if there is a single target,
and False if there are multiple targets.
analyzer: The function used to analyze the phase stepping data.
Must return a WFSResult object. Defaults to `analyze_phase_stepping`
[1]: X. Tao, T. Lam, B. Zhu, et al., “Three-dimensional focusing through scattering media using conjugate adaptive
optics with remote focusing (CAORF),” Opt. Express 25, 10368–10383 (2017).
"""
if optimized_reference is None: # 'auto' mode
optimized_reference = np.prod(feedback.data_shape) == 1
elif optimized_reference and np.prod(feedback.data_shape) != 1:
raise ValueError(
"When using an optimized reference, the feedback detector should return a single scalar value."
)

raise ValueError("In optimized_reference mode, only scalar (single target) feedback signals can be used.")
if iterations < 2:
raise ValueError("The number of iterations must be at least 2.")
if not optimized_reference and iterations != 2:
Expand All @@ -102,7 +95,6 @@ def __init__(
self.phase_steps = phase_steps
self.optimized_reference = optimized_reference
self.iterations = iterations
self._analyzer = analyzer
self._phase_patterns = None
self._gram = None
self._shape = group_mask.shape
Expand Down Expand Up @@ -298,7 +290,7 @@ def _single_side_experiment(self, mod_phases: nd, ref_phases: nd, mod_mask: nd,
progress_bar.update()

self.feedback.wait()
return self._analyzer(measurements, axis=1)
return analyze_phase_stepping(measurements, axis=1)

def compute_t_set(self, t, side) -> nd:
"""
Expand Down
176 changes: 90 additions & 86 deletions openwfs/algorithms/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ def weighted_average(attribute):
fidelity_calibration=weighted_average("fidelity_calibration"),
)

@property
def snr(self):
return 1.0 / (1.0 / self.fidelity_noise - 1.0)


def analyze_phase_stepping(measurements: np.ndarray, axis: int):
"""Analyzes the result of phase stepping measurements, returning matrix `t` and noise statistics
Expand Down Expand Up @@ -240,88 +244,93 @@ def analyze_phase_stepping(measurements: np.ndarray, axis: int):

class WFSController:
"""
Controller for Wavefront Shaping (WFS) operations using a specified algorithm in the MicroManager environment.
EXPERIMENTAL - Controller for Wavefront Shaping (WFS) operations using a specified algorithm in the Micro-Manager environment.
Usage:
.. code-block:: python
# not wrapped:
alg = FourierDualReference(feedback, slm)
# wrapped
alg = WFSController(FourierDualReference, feedback, slm)
Under the hood, a dynamic class is created that inherits both ``WFSController`` and ``FourierDualReference)``.
Effectively this is similar to having ``class WFSController(FourierDualReference)`` inheritance.
Since Micro-Manager / PyDevice does not yet support buttons to activate actions, a WFS experiment is started by setting
the trigger attribute :attr:`wavefront` to the value State.OPTIMIZED
It adds attributes for inspecting the statistics of the last WFS optimization.
Manages the state of the wavefront and executes the algorithm to optimize and apply wavefront corrections, while
exposing all these parameters to MicroManager.
"""

class State(Enum):
FLAT_WAVEFRONT = 0
SHAPED_WAVEFRONT = 1
FLAT = 0
OPTIMIZED = 1
REOPTIMIZE = 2

def __init__(self, algorithm):
def __init__(self, _algorithm_class, *args, **kwargs):
"""
Args:
algorithm: An instance of a wavefront shaping algorithm.
"""
self.algorithm = algorithm
self._wavefront = WFSController.State.FLAT_WAVEFRONT
self._result = None
self._noise_factor = None
self._amplitude_factor = None
self._estimated_enhancement = None
self._calibration_fidelity = None
self._estimated_optimized_intensity = None
self._snr = None # Average SNR. Computed when wavefront is computed.
self._optimized_wavefront = None
self._recompute_wavefront = False
self._feedback_enhancement = None
super().__init__(*args, **kwargs)
self._wavefront = WFSController.State.FLAT
self._result: Optional[WFSResult] = None
self._feedback_ratio = 0.0
self._test_wavefront = False # Trigger to test the optimized wavefront
self._run_troubleshooter = False # Trigger troubleshooter
self.dark_frame = None
self.before_frame = None

def __new__(cls, algorithm_class, *args, **kwargs):
"""Dynamically creates a class of type `class X(WFSController, algorithm_class` and returns an instance of that class"""

# Dynamically create the class using type()
class_name = "WFSController_" + algorithm_class.__name__
DynamicClass = type(class_name, (cls, algorithm_class), {})
instance = super(WFSController, cls).__new__(DynamicClass)
return instance

@property
def wavefront(self) -> State:
"""
Gets the current wavefront state.
Returns:
State: The current state of the wavefront, either FLAT_WAVEFRONT or SHAPED_WAVEFRONT.
Enables switching between FLAT or OPTIMIZED wavefront on the SLM.
Setting this state to OPTIMIZED causes the algorithm execute if the optimized wavefront is not yet computed.
Setting this state to REOPTIMIZE always causes the algorithm to recompute the wavefront. The state switches to OPTIMIZED after executioin of the algorithm.
For multi-target optimizations, OPTIMIZED shows the wavefront for the first target.
"""
return self._wavefront

@wavefront.setter
def wavefront(self, value):
"""
Sets the wavefront state and applies the corresponding phases to the SLM.
Args:
value (State): The desired state of the wavefront to set.
"""
self._wavefront = value
if value == WFSController.State.FLAT_WAVEFRONT:
self.algorithm.slm.set_phases(0.0)
else:
if self._recompute_wavefront or self._optimized_wavefront is None:
# select only the wavefront and statistics for the first target
result = self.algorithm.execute().select_target(0)
self._optimized_wavefront = -np.angle(result.t)
self._noise_factor = result.fidelity_noise
self._amplitude_factor = result.fidelity_amplitude
self._estimated_enhancement = result.estimated_enhancement
self._calibration_fidelity = result.fidelity_calibration
self._estimated_optimized_intensity = result.estimated_optimized_intensity
self._snr = 1.0 / (1.0 / result.fidelity_noise - 1.0)
self._result = result
self.algorithm.slm.set_phases(self._optimized_wavefront)
self._wavefront = WFSController.State(value)
if value == WFSController.State.FLAT:
self.slm.set_phases(0.0)
elif value == WFSController.State.OPTIMIZED:
if self._result is None:
# run the algorithm
self._result = self.execute().select_target(0)
self.slm.set_phases(self.optimized_wavefront)
else: # value == WFSController.State.REOPTIMIZE:
self._result = None # remove stored result
self.wavefront = WFSController.State.OPTIMIZED # recompute the wavefront

@property
def noise_factor(self) -> float:
def fidelity_noise(self) -> float:
"""
Returns:
float: noise factor: the estimated loss in fidelity caused by the limited snr.
float: the estimated loss in fidelity caused by the limited snr.
"""
return self._noise_factor
return self._result.fidelity_noise if self._result is not None else 0.0

@property
def amplitude_factor(self) -> float:
def fidelity_amplitude(self) -> float:
"""
Returns:
float: amplitude factor: estimated reduction of the fidelity due to phase-only
float: estimated reduction of the fidelity due to phase-only
modulation (≈ π/4 for fully developed speckle)
"""
return self._amplitude_factor
return self._result.fidelity_amplitude if self._result is not None else 0.0

@property
def estimated_enhancement(self) -> float:
Expand All @@ -330,68 +339,63 @@ def estimated_enhancement(self) -> float:
float: estimated enhancement: estimated ratio <after>/<before> (with <> denoting
ensemble average)
"""
return self._estimated_enhancement
return self._result.estimated_enhancement if self._result is not None else 0.0

@property
def calibration_fidelity(self) -> float:
def fidelity_calibration(self) -> float:
"""
Returns:
float: non-linearity.
"""
return self._calibration_fidelity
return self._result.fidelity_calibration if self._result is not None else 0.0

@property
def estimated_optimized_intensity(self) -> float:
"""
Returns:
float: estimated optimized intensity.
"""
return self._estimated_optimized_intensity
return self._estimated_optimized_intensity if self._result is not None else 0.0

@property
def snr(self) -> float:
"""
Gets the signal-to-noise ratio (SNR) of the optimized wavefront.
Returns:
float: The average SNR computed during wavefront optimization.
float: The average signal-to-noise ratio (SNR) of the wavefront optimization measurements.
"""
return self._snr
return self._result.snr if self._result is not None else 0.0

@property
def recompute_wavefront(self) -> bool:
"""Returns: bool that indicates whether the wavefront needs to be recomputed."""
return self._recompute_wavefront

@recompute_wavefront.setter
def recompute_wavefront(self, value):
"""Sets the bool that indicates whether the wavefront needs to be recomputed."""
self._recompute_wavefront = value
def optimized_wavefront(self) -> np.ndarray:
return -np.angle(self._result.t) if self._result is not None else 0.0

@property
def feedback_enhancement(self) -> float:
"""Returns: the average enhancement of the feedback, returns none if no such enhancement was measured."""
return self._feedback_enhancement
def feedback_ratio(self) -> float:
"""The ratio of average feedback signals after and before optimization.
This value is calculated when the :attr:`test_wavefront` trigger is set to True.
Note: this is *not* the enhancement factor, because the 'before' signal is not ensemble averaged.
Therefore, this value should be used with caution.
Returns:
float: average enhancement of the feedback, 0.0 none if no such enhancement was measured."""
return self._feedback_ratio

@property
def test_wavefront(self) -> bool:
"""Returns: bool that indicates whether test_wavefront will be performed if set."""
return self._test_wavefront
"""Trigger to test the wavefront.
@test_wavefront.setter
def test_wavefront(self, value):
Set this value `True` to measure feedback signals with a flat and an optimized wavefront and compute the :attr:`feedback_ratio`.
This value is reset to `False` after the test is performed.
"""
Calculates the feedback enhancement between the flat and shaped wavefronts by measuring feedback for both
cases.
return False

Args:
value (bool): True to enable test mode, False to disable.
"""
@test_wavefront.setter
def test_wavefront(self, value):
if value:
self.wavefront = WFSController.State.FLAT_WAVEFRONT
feedback_flat = self.algorithm.feedback.read().copy()
self.wavefront = WFSController.State.SHAPED_WAVEFRONT
feedback_shaped = self.algorithm.feedback.read().copy()
self._feedback_enhancement = float(feedback_shaped.sum() / feedback_flat.sum())

self._test_wavefront = value
self.wavefront = WFSController.State.FLAT
feedback_flat = self.feedback.read().sum()
self.wavefront = WFSController.State.OPTIMIZED
feedback_shaped = self.feedback.read().sum()
self._feedback_ratio = float(feedback_shaped / feedback_flat)
23 changes: 16 additions & 7 deletions tests/test_wfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,26 @@ def t_fidelity(
return fidelity / norm


@pytest.mark.skip("Not implemented")
def test_fourier2():
"""Test the Fourier dual reference algorithm using WFSController."""
slm_shape = (1000, 1000)
slm_shape = (10, 10)
aberrations = skimage.data.camera() * ((2 * np.pi) / 255.0)
sim = SimulatedWFS(aberrations=aberrations)
alg = FourierDualReference(feedback=sim, slm=sim.slm, slm_shape=slm_shape, k_radius=7.5, phase_steps=3)
controller = WFSController(alg)
controller.wavefront = WFSController.State.SHAPED_WAVEFRONT
scaled_aberration = zoom(aberrations, np.array(slm_shape) / aberrations.shape)
assert_enhancement(sim.slm, sim, controller._result, np.exp(1j * scaled_aberration))
alg = WFSController(
FourierDualReference, feedback=sim, slm=sim.slm, slm_shape=slm_shape, k_radius=3.5, phase_steps=3
)

# check if the attributes of the algorithm were passed through correctly
assert alg.k_radius == 3.5
alg.k_radius = 2.5
assert alg.k_radius == 2.5
before = sim.read()
alg.wavefront = WFSController.State.OPTIMIZED # this will trigger the algorithm to optimize the wavefront
after = sim.read()
alg.wavefront = WFSController.State.FLAT # this set the wavefront back to flat
before2 = sim.read()
assert before == before2
assert after / before > 3.0


@pytest.mark.skip("Not implemented")
Expand Down

0 comments on commit cebfaa2

Please sign in to comment.