diff --git a/doc/conf.py b/doc/conf.py index 6358698..6ef69b1 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -20,6 +20,8 @@ # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration +myst_enable_extensions = ["dollarmath"] + extensions = [ "myst_parser", "sphinx.ext.autodoc", @@ -46,3 +48,4 @@ html_static_path = ["_static"] autoclass_content = "both" +myst_heading_anchors = 3 diff --git a/doc/dev/set_up_dev_environment.md b/doc/dev/set_up_dev_environment.md index deedd36..b7e3aa7 100644 --- a/doc/dev/set_up_dev_environment.md +++ b/doc/dev/set_up_dev_environment.md @@ -19,6 +19,7 @@ python -m venv .venv ``` python -m pip install -e .[dev] ``` +To run tests based on changes not yet in a release in the GUI PyDev console you will need to run this command using your global python install rather than in the venv. ## Unit tests ``` diff --git a/doc/fitting/fitting.md b/doc/fitting/fitting.md new file mode 100644 index 0000000..12e9bc6 --- /dev/null +++ b/doc/fitting/fitting.md @@ -0,0 +1,204 @@ +# Fitting Callback + +Similar to [`LivePlot`](../callbacks/plotting.md), `ibex_bluesky_core` provides a thin wrapper around Bluesky's LiveFit class, enhancing it with additional functionality to better support real-time data fitting. This wrapper not only offers a wide selection of models to fit your data on, but also introduces guess generation for fit parameters. As new data points are acquired, the wrapper refines these guesses dynamically, improving the accuracy of the fit with each additional piece of data, allowing for more efficient and adaptive real-time fitting workflows. + +In order to use the wrapper, import `LiveFit` from `ibex_bluesky_core` rather than +`bluesky` directly: +```py +from ibex_bluesky_core.callbacks.fitting import LiveFit +``` +**Note:** that you do not *need* `LivePlot` for `LiveFit` to work but it may be useful to know visaully how well the model fits to the raw data. + +## Configuration + +Below is a full example showing how to use standard `matplotlib` & `bluesky` functionality to apply fitting to a scan, using LivePlot and LiveFit. The fitting callback is set to expect data to take the form of a gaussian. +```py +import matplotlib.pyplot as plt +from ibex_bluesky_core.callbacks.plotting import LivePlot +from ibex_bluesky_core.callbacks.fitting import LiveFit +from bluesky.callbacks import LiveFitPlot + +# Create a new figure to plot onto. +plt.figure() +# Make a new set of axes on that figure +ax = plt.gca() +# ax is shared by fit_callback and plot_callback + +plot_callback = LivePlot(y="y_variable", x="x_variable", ax=ax) +fit_callback = LiveFit(Gaussian.fit(), y="y_variable", x="x_variable", update_every=0.5) +# update_every = in seconds, how often to recompute the fit. If `None`, do not compute until the end. Default is 1. +fit_plot_callback = LiveFitPlot(fit_callback, ax=ax, color="r") +``` + +**Note:** that the `LiveFit` callback doesn't directly do the plotting, it will return function parameters of the model its trying to fit to; a `LiveFit` object must be passed to `LiveFitPlot` which can then be subscribed to the `RunEngine`. See the [Bluesky Documentation](https://blueskyproject.io/bluesky/main/callbacks.html#livefitplot) for information on the various arguments that can be passed to the `LiveFitPlot` class. + +The `plot_callback` and `fit_plot_callback` objects can then be subscribed to the `RunEngine`, using the same methods as described in [`LivePlot`](../callbacks/plotting.md). See the following example using `@subs_decorator`: + +```py +@subs_decorator( + [ + fit_plot_callback, + plot_callback + ] +) + +def plan() -> ... +``` + +## Models + +We support **standard fits** for the following trends in data. See [Standard Fits](./standard_fits.md) for more infomation on the behaviour of these fits. + +| Trend | Class Name in fitting_utils | Arguments | +| ----- | -------------------------| ----------| +| Linear | [Linear](./standard_fits.md#linear) | None | +| Polynomial | [Polynomial](./standard_fits.md#polynomial) | Polynomial Degree (int) | +| Gaussian | [Gaussian](./standard_fits.md#gaussian) | None | +| Lorentzian | [Lorentzian](./standard_fits.md#lorentzian) | None | +| Damped Oscillator | [DampedOsc](./standard_fits.md#damped-oscillator-dampedosc) | None | +| Slit Scan Fit | [SlitScan](./standard_fits.md#slit-scan-slitscan) | Max Slit Size (int) | +| Error Function | [ERF](./standard_fits.md#error-function-erf) | None | +| Complementary Error Function | [ERFC](./standard_fits.md/#complementary-error-function-erfc) | None | +| Top Hat | [TopHat](./standard_fits.md#top-hat-tophat) | None | +| Trapezoid | [Trapezoid](./standard_fits.md#trapezoid) | None | +| PeakStats (COM) **\*** | - | - + +\* Native to Bluesky there is support for `PeakStats` which "computes peak statsitics after a run finishes." See [Bluesky docs](https://blueskyproject.io/bluesky/main/callbacks.html#peakstats) for more information on this. Similar to `LiveFit` and `LiveFitPLot`, `PeakStats` is a callback and must be passed to `PeakStatsPlot` to be plotted on a set of axes, which is subscribed to by the `RunEngine`. + +------- + +Each of the above fit classes has a `.fit()` which returns an object of type `FitMethod`. This tells `LiveFit` how to perform fitting on the data. `FitMethod` is defined in `ibex_bluesky_core.callbacks.fitting`. + +There are *two* ways that you can choose how to fit a model to your data: + +### Option 1: Use the standard fits +When only using the standard fits provided by `ibex_bluesky_core`, the following syntax can be used, replacing `[FIT]` with your chosen one from `ibex_bluesky_core.callbacks.fitting.fitting_utils`: + +```py +from bluesky.callbacks import LiveFitPlot +from ibex_bluesky_core.callbacks.fitting.fitting_utils import [FIT] + +# Pass [FIT].fit() to the first parameter of LiveFit +lf = LiveFit([FIT].fit(), y="y_variable", x="x_variable", update_every=0.5) + +# Then subscribe to LiveFitPlot(lf, ...) +``` + +The `[FIT].fit()` function will pass the `FitMethod` object straight to the `LiveFit` class. + +**Note:** that for the fits in the above table that require parameters, you will need to pass value(s) to their `.fit` method. For example Polynomial fitting: + +```py +lf = LiveFit(Polynomial.fit(3), y="y_variable", x="x_variable", update_every=0.5) +# For a polynomial of degree 3 +``` + +### Option 2: Use custom fits + +If you wish, you can define your own non-standard `FitMethod` object. The `FitMethod` class takes two function arguments as follows: + +- `model` + - A function representing the behaviour of the model. + - Returns the `y` value (`float`) at the given `x` value and model parameters. +- `guess` + - A function that must take two `np.array` arrays, representing `x` data and respective `y` data, of type `float` as arguments and must return a `dict` in the form `dict[str, lmfit.Parameter]`. + - This will be called to guess the properties of the model, given the data already collected in the Bluesky run. + +See the following example on how to define these. + +```py +# Linear Fitting + +import lmfit + +def model(x: float, m: float, c: float) -> float: + + return m * x + c # y = mx + c + +def guess(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> dict[str, lmfit.Parameter]: + + # Linear regression calculation + # x = set of x data + # y = set of respective y data + # x[n] makes a pair with y[n] + + numerator = sum(x * y) - sum(x) * sum(y) + denominator = sum(x**2) - sum(x) ** 2 + + m = numerator / denominator + c = (sum(y) - m * sum(x)) / len(x) + + init_guess = { + "m": lmfit.Parameter("m", m), # gradient + "c": lmfit.Parameter("c", c), # y - intercept + } + + return init_guess + +fit_method = FitMethod(model, guess) +#Pass the model and guess function to FitMethod + +lf = LiveFit(fit_method, y="y_variable", x="x_variable", update_every=0.5) + +# Then subscribe to LiveFitPlot(lf, ...) +``` + +**Note:** that the parameters returned from the guess function must allocate to the arguments to the model function, ignoring the independant variable e.g `x` in this case. Array-like structures are not allowed. See the [lmfit documentation](https://lmfit.github.io/lmfit-py/parameters.html) for more information. + +#### Option 2: Continued + +Each `Fit` in `ibex_bluesky_core.callbacks.fitting` has a `.model()` and `.guess()`, which make up their fitting method. These are publically accessible class methods. + +This means that aslong as the parameters returned from the guess function match to the arguments of the model function, you may mix and match user-made and standard, models and guess functions in the following manner: + +```py +import lmfit +from ibex_bluesky_core.callbacks.fitting.fitting_utils import Linear + +def different_model(x: float, m: float, c: float) -> float: + + return m * x + c ** 2 # y = mx + (c ** 2) + + +fit_method = FitMethod(different_model, Linear.guess()) +# Uses the user defined model and the standard Guessing. function for linear models + +lf = LiveFit(fit_method, y="y_variable", x="x_variable", update_every=0.5) + +# Then subscribe to LiveFitPlot(lf, ...) +``` +... or the other way round ... + +```py +import lmfit +from ibex_bluesky_core.callbacks.fitting.fitting_utils import Linear + +# This Guessing. function isn't very good because it's return values don't change on the data already collected in the Bluesky run +# It always guesses that the linear function is y = x + +def different_guess(x: float, m: float, c: float) -> float: + + init_guess = { + "m": lmfit.Parameter("m", 1), # gradient + "c": lmfit.Parameter("c", 0), # y - intercept + } + + return init_guess + +fit_method = FitMethod(Linear.model(), different_guess) +# Uses the standard linear model and the user defined Guessing. function + +lf = LiveFit(fit_method, y="y_variable", x="x_variable", update_every=0.5) + +# Then subscribe to LiveFitPlot(lf, ...) +``` + +Or you can create a completely user-defined fitting method. + +**Note:** that for fits that require arguments, you will need to pass values to their respecitive `.model` and `.guess` functions. E.g for `Polynomial` fitting: + +```py +fit_method = FitMethod(Polynomial.model(3), different_guess) # If using a custom guess function +lf = LiveFit(fit_method, ...) +``` +See the [standard fits](#models) list above for standard fits which require parameters. It gets more complicated if you want to define your own custom model or guess which you want to pass parameters to. You will have to define a function that takes these parameters and returns the model / guess function with the subsituted values. diff --git a/doc/fitting/images_fits/dampedosc.PNG b/doc/fitting/images_fits/dampedosc.PNG new file mode 100644 index 0000000..7ba0bc8 Binary files /dev/null and b/doc/fitting/images_fits/dampedosc.PNG differ diff --git a/doc/fitting/images_fits/erf.png b/doc/fitting/images_fits/erf.png new file mode 100644 index 0000000..bbe087b Binary files /dev/null and b/doc/fitting/images_fits/erf.png differ diff --git a/doc/fitting/images_fits/erfc.png b/doc/fitting/images_fits/erfc.png new file mode 100644 index 0000000..3a1c6dd Binary files /dev/null and b/doc/fitting/images_fits/erfc.png differ diff --git a/doc/fitting/images_fits/gaussian.png b/doc/fitting/images_fits/gaussian.png new file mode 100644 index 0000000..14f5516 Binary files /dev/null and b/doc/fitting/images_fits/gaussian.png differ diff --git a/doc/fitting/images_fits/lorentzian.png b/doc/fitting/images_fits/lorentzian.png new file mode 100644 index 0000000..01bb1e9 Binary files /dev/null and b/doc/fitting/images_fits/lorentzian.png differ diff --git a/doc/fitting/images_fits/slitscan.png b/doc/fitting/images_fits/slitscan.png new file mode 100644 index 0000000..55f8ec9 Binary files /dev/null and b/doc/fitting/images_fits/slitscan.png differ diff --git a/doc/fitting/images_fits/tophat.png b/doc/fitting/images_fits/tophat.png new file mode 100644 index 0000000..22858c4 Binary files /dev/null and b/doc/fitting/images_fits/tophat.png differ diff --git a/doc/fitting/images_fits/trapezoid.png b/doc/fitting/images_fits/trapezoid.png new file mode 100644 index 0000000..30f02f9 Binary files /dev/null and b/doc/fitting/images_fits/trapezoid.png differ diff --git a/doc/fitting/standard_fits.md b/doc/fitting/standard_fits.md new file mode 100644 index 0000000..70f7f9d --- /dev/null +++ b/doc/fitting/standard_fits.md @@ -0,0 +1,143 @@ +# Standard Fitting Models + +## Linear + +- `m` - Gradient +- `c` - (y) Intercept + +```{math} +y = mx + c +``` + +## Polynomial + +- `a` ... `d` - Polynomial coefficients + +For a polynomial degree `n`: +```{math} +y = ax^n + bx^n-1 + ... + cx^1 + d +``` + +## Gaussian + +- `amp` - The maximum height of the Gaussian above `background` +- `sigma` - A scalar for Gaussian width +- `x0` - The centre (x) of the Gaussian +- `background` - The minimum value (y) of the Gaussian + +```{math} +y = \text{amp} * e^{-\frac{(x - x0) ^ 2}{2 * \text{sigma}^2}} + \text{background} +``` + +![GaussianModel](./images_fits/gaussian.png) + +## Lorentzian + +- `amp` - The maximum height of the Lorentzian above `background` +- `sigma` - A scalar for Lorentzian width +- `center` - The centre (x) of the Lorentzian +- `background` - The minimum value (y) of the Lorentzian + +```{math} +y = \frac{\text{amp}}{1 + \frac{x - \text{center}}{\text{sigma}}^2} + \text{background} +``` + +![LorentzianModel](./images_fits/lorentzian.png) + +## Damped Oscillator (DampedOsc) + +- `center` - The centre (x) of the oscillation +- `amp` - The maximum height of the curve above 0 +- `freq` - The frequency of the oscillation +- `width` - How far away from the centre will oscillations last for + +```{math} +y = \text{amp} * \cos((x - \text{center}) * \text{freq}) * e^{-\frac{x - \text{center}}{\text{width}^ 2}} +``` + +![DampedOscModel](./images_fits/dampedosc.png) + +## Slit Scan (SlitScan) + +- `background` $b$ - The minimum value (y) of the model +- `inflection0` $i_0$ - The x coord of the first inflection point +- `gradient` $g$ - The gradient of the sloped-linear section of the model +- `inflections_diff` $i_{\Delta}$ - The x displacement between the two inflection points +- `height_above_inflection1` $h_1$ - The y displacement between inflection 1 and the model's asymptote + +```{math} +\text{exp_seg} = h_1 \cdot \text{erf} \left( g \cdot \frac{\sqrt{\pi}}{2h_1} \cdot (x - i_0 - \Delta i) \right) + g \cdot \Delta i + b +``` + +```{math} +\text{lin_seg} = \max(b + g * (x - i_0), b) +``` + +```{math} +y = \min(\text{lin_seg}, \text{exp_seg}) +``` + +![SlitScanModel](./images_fits/slitscan.png) + +## Error Function (ERF) + +- `cen` - The centre (x) of the model +- `stretch` - A horizontal stretch factor for the model +- `scale` - A vertical stretch factor for the model +- `background` - The minimum value (y) of the model + +```{math} +y = background + scale * erf(stretch * (x - cen)) +``` + +![ERFModel](./images_fits/erf.png) + +## Complementary Error Function (ERFC) + +- `cen` - The centre (x) of the model +- `stretch` - A horizontal stretch factor for the model +- `scale` - A vertical stretch factor for the model +- `background` - The minimum value (y) of the model + +```{math} +y = background + scale * erfc(stretch * (x - cen)) +``` + +![ERFCModel](./images_fits/erfc.png) + +## Top Hat (TopHat) + +- `cen` - The centre (x) of the model +- `width` - How wide the 'hat' is +- `height` - The maximum height of the model above `background` +- `background` - The minimum value (y) of the model + +```{math} +y = +\begin{cases} +\text{background} + \text{height}, & \text{if } |x - \text{cen}| < \frac{\text{width}}{2} \\ +\text{background}, & \text{otherwise} +\end{cases} +``` + +![TopHatModel](./images_fits/tophat.png) + +## Trapezoid + +- `cen` - The centre (x) of the model +- `gradient` - How steep the edges of the trapezoid are +- `height` - The maximum height of the model above `background` +- `background` - The minimum value (y) of the model +- `y_offset` - Acts as a width factor for the trapezoid. If you extrapolate the sides of the trapezoid until they meet above the top, this value represents the y coord of this point minus height and background. + +```{math} +f(x) = \text{y_offset} + \text{height} + \text{background} - \text{gradient} * |x - \text{cen}| +``` +```{math} +g(x) = \max(f(x), \text{background}) +``` +```{math} +y = \min(g(x), \text{background} + \text{height}) +``` + +![TrapezoidModel](./images_fits/trapezoid.png) \ No newline at end of file diff --git a/doc/index.rst b/doc/index.rst index 84cd58b..f26d47d 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -12,7 +12,6 @@ This documentation contains information for the bluesky plan stubs & devices for devices/* - .. toctree:: :maxdepth: 2 :caption: Callbacks @@ -20,6 +19,13 @@ This documentation contains information for the bluesky plan stubs & devices for callbacks/* +.. toctree:: + :maxdepth: 2 + :caption: Fitting + :glob: + + fitting/* + .. toctree:: :maxdepth: 2 :caption: Developer information diff --git a/pyproject.toml b/pyproject.toml index 17dfb9a..04130ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,8 @@ dependencies = [ "bluesky", "ophyd-async[ca]", "matplotlib", + "lmfit", + "scipy", "numpy", "scipp", ] diff --git a/src/ibex_bluesky_core/callbacks/fitting/__init__.py b/src/ibex_bluesky_core/callbacks/fitting/__init__.py new file mode 100644 index 0000000..6ef3bd9 --- /dev/null +++ b/src/ibex_bluesky_core/callbacks/fitting/__init__.py @@ -0,0 +1,94 @@ +"""For IBEX Bluesky scan fitting.""" + +import logging +from typing import Callable + +import lmfit +import numpy as np +import numpy.typing as npt +from bluesky.callbacks import LiveFit as _DefaultLiveFit +from bluesky.callbacks.core import make_class_safe + + +class FitMethod: + """Tell LiveFit how to fit to a scan. Has a Model function and a Guess function. + + Model - Takes x values and a set of parameters to return y values. + Guess - Takes x and y values and returns a rough 'guess' of the original parameters. + """ + + model: lmfit.Model + guess: Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]] + + def __init__( + self, + model: lmfit.Model | Callable[[npt.NDArray[np.float64]], npt.NDArray[np.float64]], + guess: Callable[ + [npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter] + ], + ) -> None: + """Assign model and guess functions. + + Args: + model (lmfit.Model | Callable): The model function to use. + guess (Callable): The guess function to use. + + """ + if callable(model): + self.model = lmfit.Model(model) + else: + self.model = model + + self.guess = guess + + +logger = logging.getLogger(__name__) + + +@make_class_safe(logger=logger) # pyright: ignore (pyright doesn't understand this decorator) +class LiveFit(_DefaultLiveFit): + """Live fit, customized for IBEX.""" + + def __init__( + self, + method: FitMethod, + y: str, + x: str, + *, + update_every: int = 1, + ) -> None: + """Call Bluesky LiveFit with assumption that there is only one independant variable. + + Args: + method (FitMethod): The FitMethod (Model & Guess) to use when fitting. + y (str): The name of the dependant variable. + x (str): The name of the independant variable. + update_every (int): How often to update the fit. (seconds) + + """ + self.method = method + + super().__init__( + model=method.model, + y=y, + independent_vars={"x": x}, + update_every=update_every, + ) + + def update_fit(self) -> None: + """Use the provided guess function with the most recent x and y values after every update. + + Args: + None + + Returns: + None + + """ + self.init_guess = self.method.guess( + np.array(next(iter(self.independent_vars_data.values()))), + np.array(self.ydata), + # Calls the guess function on the set of data already collected in the run + ) + + super().update_fit() diff --git a/src/ibex_bluesky_core/callbacks/fitting/fitting_utils.py b/src/ibex_bluesky_core/callbacks/fitting/fitting_utils.py new file mode 100644 index 0000000..0fc0957 --- /dev/null +++ b/src/ibex_bluesky_core/callbacks/fitting/fitting_utils.py @@ -0,0 +1,564 @@ +"""Defines the standard fits. The model and guess functions for each fit.""" + +from abc import ABC, abstractmethod +from typing import Callable + +import lmfit +import numpy as np +import numpy.typing as npt +import scipy +import scipy.special +from lmfit.models import PolynomialModel +from numpy import polynomial as p + +from ibex_bluesky_core.callbacks.fitting import FitMethod + + +class Fit(ABC): + """Base class for all fits.""" + + @classmethod + @abstractmethod + def model(cls, *args: int) -> lmfit.Model: + """Outline base model function. + + Args: + *args (int): Any extra parameters required for fitting. + + Returns: + lmfit.Model: Model function + (x-values: NDArray, parameters: np.float64 -> y-values: NDArray) + + """ + pass + + @classmethod + @abstractmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Outline base Guessing. method. + + Args: + *args (int): Any extra parameters required for fitting. + + Returns: + Callable: Guess function + (x-values: NDArray, y-values: NDArray -> parameters: Dict[str, lmfit.Parameter]) + + """ + pass + + @classmethod + def fit(cls, *args: int) -> FitMethod: + """Return a FitMethod given model and guess functions to pass to LiveFit.""" + return FitMethod(model=cls.model(*args), guess=cls.guess(*args)) + + +class Gaussian(Fit): + """Gaussian Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Gaussian Model.""" + + def model( + x: npt.NDArray[np.float64], amp: float, sigma: float, x0: float, background: float + ) -> npt.NDArray[np.float64]: + if sigma == 0: + sigma = 1 + + return amp * np.exp(-((x - x0) ** 2) / (2 * sigma**2)) + background + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Gaussian Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + mean = np.sum(x * y) / np.sum(y) + sigma = np.sqrt(np.sum(y * (x - mean) ** 2) / np.sum(y)) + background = np.min(y) + + if np.max(y) > abs(np.min(y)): + amp = np.max(y) - background + else: + amp = np.min(y) + background + + init_guess = { + "amp": lmfit.Parameter("amp", amp), + "sigma": lmfit.Parameter("sigma", sigma, min=0), + "x0": lmfit.Parameter("x0", mean), + "background": lmfit.Parameter("background", background), + } + + return init_guess + + return guess + + +class Lorentzian(Fit): + """Lorentzian Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Lorentzian Model.""" + + def model( + x: npt.NDArray[np.float64], amp: float, sigma: float, center: float, background: float + ) -> npt.NDArray[np.float64]: + if sigma == 0: + sigma = 1 + + return amp / (1 + ((x - center) / sigma) ** 2) + background + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Lorentzian Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + background = np.min(y) + + if np.max(y) > abs(np.min(y)): + amp_index = np.argmax(y) + amp = y[amp_index] - background + + else: + amp_index = np.argmin(y) + amp = y[amp_index] + background + + center = x[amp_index] + + # Guessing. sigma using FWHM + + half_max = amp / 2 + left_side = np.where(x < center)[0] # x-values left of the peak + right_side = np.where(x > center)[0] # x-values right of the peak + + # Left side + x1_index = ( + left_side[np.argmin(np.abs(y[left_side] - half_max))] if len(left_side) > 0 else 0 + ) + + # Right side + x2_index = ( + right_side[np.argmin(np.abs(y[right_side] - half_max))] + if len(right_side) > 0 + else -1 + ) + + sigma = (x[x2_index] - x[x1_index]) / 2 + + init_guess = { + "amp": lmfit.Parameter("amp", amp), + "sigma": lmfit.Parameter("sigma", sigma, min=0), + "center": lmfit.Parameter("center", center), + "background": lmfit.Parameter("background", background), + } + + return init_guess + + return guess + + +class Linear(Fit): + """Linear Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Linear Model.""" + + def model(x: npt.NDArray[np.float64], m: float, c: float) -> npt.NDArray[np.float64]: + return m * x + c + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Linear Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + # Linear Regression + numerator = sum(x * y) - sum(x) * sum(y) + denominator = sum(x**2) - sum(x) ** 2 + + m = numerator / denominator + c = (sum(y) - m * sum(x)) / len(x) + + init_guess = { + "m": lmfit.Parameter("m", m), + "c": lmfit.Parameter("c", c), + } + + return init_guess + + return guess + + +class Polynomial(Fit): + """Polynomial Fitting.""" + + @classmethod + def _check_degree(cls, args: tuple[int, ...]) -> int: + """Check that polynomial degree is valid.""" + degree = args[0] if args else 7 + if not (0 <= degree <= 7): + raise ValueError("The polynomial degree should be at least 0 and smaller than 8.") + return degree + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Polynomial Model.""" + degree = cls._check_degree(args) + return PolynomialModel(degree=degree) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Polynomial Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + init_guess = {} + degree = cls._check_degree(args) + + coeffs = p.polynomial.polyfit(x, y, degree) + + for i in range(degree + 1): + init_guess[f"c{i}"] = coeffs[i] + + return init_guess + + return guess + + +class DampedOsc(Fit): + """Damped Oscillator Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Damped Oscillator Model.""" + + def model( + x: npt.NDArray[np.float64], center: float, amp: float, freq: float, width: float + ) -> npt.NDArray[np.float64]: + return amp * np.cos((x - center) * freq) * np.exp(-(((x - center) / width) ** 2)) + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Damped Oscillator Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + peak = x[np.argmax(y)] + valley = x[np.argmin(y)] + + init_guess = { + "center": lmfit.Parameter("center", peak), + "amp": lmfit.Parameter("amp", np.max(y)), + "freq": lmfit.Parameter("freq", np.pi / np.abs(peak - valley)), + "width": lmfit.Parameter("width", max(x) - min(x)), + } + + return init_guess + + return guess + + +class SlitScan(Fit): + """Slit Scan Fitting.""" + + @classmethod + def _check_input(cls, args: tuple[int, ...]) -> int: + """Check that provided maximum slit size is atleast 0.""" + max_slit_gap = args[0] if args else 1 + if not (0 <= max_slit_gap): + raise ValueError("The slit gap should be atleast 0.") + return max_slit_gap + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Slit Scan Model.""" + + def model( + x: npt.NDArray[np.float64], + background: float, + inflection0: float, + gradient: float, + inflections_diff: float, + height_above_inflection1: float, + ) -> npt.NDArray[np.float64]: + linear_seg = background + gradient * (x - inflection0) + + if height_above_inflection1 == 0: + exp_seg = gradient * inflections_diff + background + else: + exp_seg = ( + height_above_inflection1 + * scipy.special.erf( + gradient + * (np.sqrt(np.pi) / (2 * height_above_inflection1)) + * (x - inflection0 - inflections_diff) + ) + + gradient * inflections_diff + + background + ) + + linear_seg = np.maximum(linear_seg, background) + exp_seg = np.maximum(exp_seg, background) + + y = np.minimum(linear_seg, exp_seg) + + return y + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Slit Scan Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + max_slit_size = cls._check_input(args) + + # Guessing. gradient of linear-slope part of function + dy = np.gradient(y) # Return array of differences in y + max_dy = np.max(dy) # Return max y difference, this will always be on the upwards slope + dx = x[1] - x[0] # Find x step + gradient = max_dy / dx + + d2y = np.diff(dy) # Double differentiate y to find how gradients change + inflection0 = x[np.argmax(d2y)] # Where there is positive gradient change + + background = min(y) # The lowest y value is the background + inflections_diff = -(background - y[np.argmax(y)]) / gradient + # As linear, using y - y1 = m(x - x1) -> x = (y - y1) / gradient - x1 + + # The highest y value + slightly more to account for further convergence + # - y distance travelled from inflection0 to inflection1 + height_above_inflection1 = np.max(y) + (y[-1] - y[-2]) - (gradient * inflections_diff) + + init_guess = { + "background": lmfit.Parameter("background", background), + "inflection0": lmfit.Parameter("inflection0", inflection0), + "gradient": lmfit.Parameter("gradient", gradient, min=0), + "inflections_diff": lmfit.Parameter( + "inflections_diff", inflections_diff, min=max_slit_size + ), + "height_above_inflection1": lmfit.Parameter( + "height_above_inflection1", height_above_inflection1, min=0 + ), + } + + return init_guess + + return guess + + +class ERF(Fit): + """Error Function Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Error Function Model.""" + + def model( + x: npt.NDArray[np.float64], cen: float, stretch: float, scale: float, background: float + ) -> npt.NDArray[np.float64]: + return background + scale * scipy.special.erf(stretch * (x - cen)) + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Error Function Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + init_guess = { + "cen": lmfit.Parameter("cen", np.mean(x)), + "stretch": lmfit.Parameter("stretch", (max(x) - min(x)) / 2), + "scale": lmfit.Parameter("scale", (max(y) - min(y)) / 2), + "background": lmfit.Parameter("background", np.mean(y)), + } + + return init_guess + + return guess + + +class ERFC(Fit): + """Complementary Error Function Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Complementary Error Function Model.""" + + def model( + x: npt.NDArray[np.float64], cen: float, stretch: float, scale: float, background: float + ) -> npt.NDArray[np.float64]: + return background + scale * scipy.special.erfc(stretch * (x - cen)) + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Complementary Error Function Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + init_guess = { + "cen": lmfit.Parameter("cen", np.mean(x)), + "stretch": lmfit.Parameter("stretch", (max(x) - min(x)) / 2), + "scale": lmfit.Parameter("scale", (max(y) - min(y)) / 2), + "background": lmfit.Parameter("background", np.min(y)), + } + + return init_guess + + return guess + + +class TopHat(Fit): + """Top Hat Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Top Hat Model.""" + + def model( + x: npt.NDArray[np.float64], cen: float, width: float, height: float, background: float + ) -> npt.NDArray[np.float64]: + y = x * 0 + y[np.abs(x - cen) < width / 2] = height + return background + y + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Top Hat Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + top = np.where(y > np.mean(y))[0] + # Guess that any value above the mean is the top part + + if len(top) > 0: + width = x[np.max(top)] - x[np.min(top)] + else: + width = (max(x) - min(x)) / 2 + + init_guess = { + "cen": lmfit.Parameter("cen", np.mean(x)), + "width": lmfit.Parameter("width", width), + "height": lmfit.Parameter("height", (max(y) - min(y))), + "background": lmfit.Parameter("background", min(y)), + } + + return init_guess + + return guess + + +class Trapezoid(Fit): + """Trapezoid Fitting.""" + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + """Trapezoid Model.""" + + def model( + x: npt.NDArray[np.float64], + cen: float, + gradient: float, + height: float, + background: float, + y_offset: float, # Acts as a width multiplier + ) -> npt.NDArray[np.float64]: + y = y_offset + height + background - gradient * np.abs(x - cen) + y = np.maximum(y, background) + y = np.minimum(y, background + height) + return y + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + """Trapezoid Guessing.""" + + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + top = np.where(y > np.mean(y))[0] + # Guess that any value above the y mean is the top part + + cen = np.mean(x) + background = np.min(y) + height = np.max(y) - background + + if top.size > 0: + i = np.min(top) + x1 = x[i] # x1 is the left of the top part + else: + width_top = (np.max(x) - np.min(x)) / 2 + x1 = cen - width_top / 2 + + x0 = 0.5 * (np.min(x) + x1) # Guess that x0 is half way between min(x) and x1 + + if height == 0.0: + gradient = 0.0 + else: + gradient = height / (x1 - x0) + + y_intercept0 = np.max(y) - gradient * x1 # To find the slope function + y_tip = gradient * cen + y_intercept0 + y_offset = y_tip - height - background + + init_guess = { + "cen": lmfit.Parameter("cen", cen), + "gradient": lmfit.Parameter("gradient", gradient, min=0), + "height": lmfit.Parameter("height", height, min=0), + "background": lmfit.Parameter("background", background), + "y_offset": lmfit.Parameter("y_offset", y_offset, min=0), + } + + return init_guess + + return guess diff --git a/tests/callbacks/fitting/__init__.py b/tests/callbacks/fitting/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/callbacks/fitting/test_fitting_callback.py b/tests/callbacks/fitting/test_fitting_callback.py new file mode 100644 index 0000000..a59da49 --- /dev/null +++ b/tests/callbacks/fitting/test_fitting_callback.py @@ -0,0 +1,94 @@ +from unittest import mock + +import lmfit +import numpy as np +import numpy.typing as npt + +from ibex_bluesky_core.callbacks.fitting import FitMethod, LiveFit + + +def test_guess_called(): + guess = mock.MagicMock() + + def model(x: npt.NDArray[np.float64]): + return x + + fit = FitMethod(model=lmfit.Model(model), guess=guess) + lf = LiveFit(fit, y="y", x="x") + + x = 1 + y = 2 + + # Fake just enough of an event document. + lf.event( + { + "data": { # type: ignore + "y": y, + "x": x, + } + } + ) + + # Assert that guess_impl was called with the correct arguments + guess.assert_called_with(x, y) + + +def test_lmfit_model_called(): + model_mock = mock.MagicMock() + + def guess(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> dict[str, lmfit.Parameter]: + return {} + + def model(x: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + model_mock() + return x + + fit = FitMethod(model=lmfit.Model(model), guess=guess) + + lf = LiveFit(fit, y="y", x="x") + + x = 1 + y = 2 + + # Fake just enough of an event document. + lf.event( + { + "data": { # type: ignore + "y": y, + "x": x, + } + } + ) + + model_mock.assert_called() + + +def test_model_called(): + model_mock = mock.MagicMock() + + def guess(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> dict[str, lmfit.Parameter]: + return {} + + def model(x: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + model_mock() + return x + + fit = FitMethod(model=model, guess=guess) + # Does not pass the function to lmfit before FitMethod + + lf = LiveFit(fit, y="y", x="x") + + x = 1 + y = 2 + + # Fake just enough of an event document. + lf.event( + { + "data": { # type: ignore + "y": y, + "x": x, + } + } + ) + + model_mock.assert_called() diff --git a/tests/callbacks/fitting/test_fitting_methods.py b/tests/callbacks/fitting/test_fitting_methods.py new file mode 100644 index 0000000..17d2787 --- /dev/null +++ b/tests/callbacks/fitting/test_fitting_methods.py @@ -0,0 +1,703 @@ +import warnings +from typing import Callable +from unittest import mock + +import lmfit +import numpy as np +import numpy.typing as npt +import pytest +import scipy.signal as scsi + +from ibex_bluesky_core.callbacks.fitting import LiveFit +from ibex_bluesky_core.callbacks.fitting.fitting_utils import ( + ERF, + ERFC, + DampedOsc, + Fit, + Gaussian, + Linear, + Lorentzian, + Polynomial, + SlitScan, + TopHat, + Trapezoid, +) + + +class MockFit(Fit): + mock_model = mock.MagicMock() + mock_guess = mock.MagicMock() + + @classmethod + def mocks_called(cls) -> bool: + return cls.mock_model.called and cls.mock_guess.called + + @classmethod + def model(cls, *args: int) -> lmfit.Model: + def model(x: npt.NDArray[np.float64], offset: float) -> npt.NDArray[np.float64]: + cls.mock_model() + return x + offset + + return lmfit.Model(model) + + @classmethod + def guess( + cls, *args: int + ) -> Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], dict[str, lmfit.Parameter]]: + def guess( + x: npt.NDArray[np.float64], y: npt.NDArray[np.float64] + ) -> dict[str, lmfit.Parameter]: + cls.mock_guess() + init_guess = {"offset": lmfit.Parameter("offset", 1)} + return init_guess + + return guess + + +def test_fit_method_uses_respective_model_and_guess(): + # Checks that model and guess are called atleast once each + lf = LiveFit(MockFit.fit(), y="y", x="x") + + x = 1 + y = 2 + + # Fake just enough of an event document. + lf.event( + { + "data": { # type: ignore + "y": y, + "x": x, + } + } + ) + + assert MockFit.mocks_called() + + +class TestGaussian: + class TestGaussianModel: + def test_gaussian_model(self): + x = np.arange(-5.0, 5.0, 1.0) + + background = 1.0 + amp = 1.0 + sigma = 1 + x0 = 0.0 + + outp = Gaussian.model().func(x, amp, sigma, x0=x0, background=background) + outp1 = Gaussian.model().func(x, amp, sigma + 1, x0=x0, background=background) + + # Check the output starts and ends at background level + assert pytest.approx(outp[0], rel=1e-2) == background + assert pytest.approx(outp[-1], rel=1e-2) == background + + # Check the peak is at x0 + peak_index = np.argmin(np.abs(x - x0)) + assert pytest.approx(outp[peak_index], rel=1e-2) == amp + background + + # Check that as sigma increases, steepness of gaussian decreases + assert np.max(np.diff(outp)) > np.max(np.diff(outp1)) + + def test_invalid_gaussian_model(self): + x = np.arange(-5.0, 5.0, 1.0) + + outp = Gaussian.model().func(x, amp=1.0, sigma=0.0, x0=0.0, background=1.0) + outp1 = Gaussian.model().func(x, amp=1.0, sigma=1.0, x0=0.0, background=1.0) + # if sigma = 0.0 then gets changed to 1.0 in model func, this asserts that it does this + assert np.allclose(outp, outp1) + + class TestGaussianGuess: + def test_background(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 2.0, 1.0]) + outp = Gaussian.guess()(x, y) + + assert pytest.approx(y[0], rel=1e-2) == outp["background"].value + + def test_amp_x0(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 2.0, 1.0]) + outp = Gaussian.guess()(x, y) + + assert y[1] == pytest.approx(outp["amp"].value + outp["background"].value, rel=1e-2) # type: ignore + + def test_neg_amp_x0(self): + # upside down gaussian + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([-1.0, -2.0, -1.0]) + outp = Gaussian.guess()(x, y) + + assert outp["amp"] < 0 + + def test_sigma(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 2.0, 1.0]) + # y1 is "wider" so must have higher sigma + y1 = np.array([1.5, 1.75, 1.5]) + + outp = Gaussian.guess()(x, y) + outp1 = Gaussian.guess()(x, y1) + + assert outp["sigma"].value < outp1["sigma"].value # type: ignore + + +class TestLorentzian: + class TestLorentzianModel: + def test_lorentzian_model(self): + x = np.arange(-5.0, 5.0, 1.0) + + background = 1.0 + amp = 1.0 + sigma = 1 + center = 0.0 + + outp = Lorentzian.model().func( + x, amp=amp, sigma=sigma, center=center, background=background + ) + outp1 = Lorentzian.model().func( + x, amp=amp, sigma=sigma + 1, center=center, background=background + ) + + # Check the output starts and ends at background level + assert pytest.approx(outp[0], rel=1e-1) == background + assert pytest.approx(outp[-1], rel=1e-1) == background + + # Check the peak is at center + peak_index = np.argmin(np.abs(x - center)) + assert pytest.approx(outp[peak_index], rel=1e-2) == amp + background + + # Check that as sigma increases, steepness of gaussian decreases + assert np.max(np.diff(outp)) > np.max(np.diff(outp1)) + + def test_invalid_lorentzian_model(self): + x = np.arange(-5.0, 5.0, 1.0) + + outp = Lorentzian.model().func(x, amp=1.0, sigma=0.0, center=0.0, background=1.0) + outp1 = Lorentzian.model().func(x, amp=1.0, sigma=1.0, center=0.0, background=1.0) + # if sigma = 0.0 then gets changed to 1.0 in model func, this asserts that it does this + assert np.allclose(outp, outp1) + + class TestLorentzianGuess: + def test_background(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 2.0, 1.0]) + outp = Lorentzian.guess()(x, y) + + assert pytest.approx(1.0, rel=1e-1) == outp["background"].value + + def test_amp_center(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 2.0, 1.0]) + outp = Lorentzian.guess()(x, y) + + assert 2.0 == pytest.approx(outp["amp"].value + outp["background"].value, rel=1e-2) # type: ignore + + def test_neg_amp_x0(self): + # upside down lorentzian + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([-1.0, -2.0, -1.0]) + outp = Lorentzian.guess()(x, y) + + assert outp["amp"] < 0 + + def test_sigma(self): + x = np.array([-1.0, 0.0, 1.0, 2.0]) + y = np.array([0.0, 2.0, 0.0, 0.0]) + # y1 is "wider" so must have higher sigma + y1 = np.array([1.9, 2.0, 1.9, 1.8]) + + outp = Lorentzian.guess()(x, y) + outp1 = Lorentzian.guess()(x, y1) + + assert outp["sigma"].value < outp1["sigma"].value # type: ignore + + +class TestLinear: + class TestLinearModel: + def test_linear_model(self): + x = np.arange(-5.0, 5.0, 1.0) + grad = 3 + y_intercept = 2 + + outp = Linear.model().func(x, m=grad, c=y_intercept) + + # Check for constant gradient of grad + outp_m = np.diff(outp) + assert np.all(outp_m == grad) + + # Check that when x = 0 that y = y intercept + assert outp[5] == y_intercept + + class TestLinearGuess: + def test_gradient_guess(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([-1.0, 0.0, 1.0]) + outp = Linear.guess()(x, y) + + assert pytest.approx(outp["m"]) == 1.0 + + y = np.array([-2.0, 0.0, 2.0]) + outp1 = Linear.guess()(x, y) + # check with a graph with steeper gradient + assert outp["m"] < outp1["m"] + + def test_y_intercept_guess(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([-1.0, 0.0, 1.0]) + outp = Linear.guess()(x, y) + + assert pytest.approx(outp["c"]) == 0.0 + + +class TestPolynomial: + class TestPolynomialModel: + # Polynomial model provided by lmfit so no need to test extensively + # Just test that polynomial order <= 7 and >= 0 + + @pytest.mark.parametrize("deg", [-1, 8]) + def test_polynomial_model_order(self, deg: int): + # -1 and 8 are both invalid polynomial degrees + x = np.zeros(3) + + with pytest.raises(ValueError): + Polynomial.model(deg).func(x) + + def test_polynomial_model(self): + # check no problems + x = np.zeros(3) + Polynomial.model(7).func(x) + + class TestPolynomialGuess: + # Uses numpy polyfit so no need to test much + # Check that params and values are allocated correctly + + @pytest.mark.parametrize("deg", [2, 7]) + def test_polynomial_guess(self, deg: int): + warnings.filterwarnings("ignore") + # Suppress a rank warning, but np.RankWarning is deprecated + + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 0.0, 1.0]) + + outp = Polynomial.guess(deg)(x, y) + + assert len(outp) == deg + 1 + + for i in range(deg + 1): + assert outp[f"c{i}"] is not None + # checks that param values are allocated to param names + # and that 2 and 7 are both valid polynomial degrees + + @pytest.mark.parametrize("deg", [-1, 8]) + def test_invalid_polynomial_guess(self, deg: int): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 0.0, 1.0]) + + # -1 and 8 are both invalid polynomial degrees + with pytest.raises(ValueError): + Polynomial.guess(deg)(x, y) + + +class TestDampedOsc: + class TestDampedOscModel: + def test_damped_osc_model(self): + x = np.arange(-5.0, 5.0, 1.0) + + amp = 1 + freq = 1 + center = 0 + width = 2 + + outp = DampedOsc.model().func(x, center=center, amp=amp, freq=freq, width=width) + + # Check that the model produces no values further away from x-axis than amp + assert amp >= np.max(outp) + assert -amp <= np.min(outp) + + # Check that the centre is at a peak + assert x[np.argmax(outp)] == center or x[np.argmin(outp)] == center + + outp1 = DampedOsc.model().func( + x, center=center, amp=amp, freq=freq + 3, width=width + 10 + ) + + peaks, _ = scsi.find_peaks(outp) + peaks1, _ = scsi.find_peaks(outp1) + + # Check that the model with the higher freq has more peaks + assert peaks.size < peaks1.size + + # Check that the model with the greater width will have taller sides + assert abs(outp[0]) < abs(outp1[0]) + assert abs(outp[-1]) < abs(outp1[-1]) + + class TestDampedOscGuess: + def test_guess_amp_center(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([1.0, 2.0, 1.0]) + outp = DampedOsc.guess()(x, y) + + assert 2.0 == pytest.approx(outp["amp"].value, rel=1e-2) + + def test_guess_width(self): + x = np.array([-2.0, -1.0, 0.0, 1.0, 2.0]) + y = np.array([0.0, 1.0, 1.0, 1.0, 0.0]) + outp = DampedOsc.guess()(x, y) + + # Checks that the width guess is max x - min x + assert pytest.approx(outp["width"].value, rel=1e-2) == 4.0 + + def test_guess_freq(self): + x = np.array([-2.0, -1.0, 0.0, 1.0, 2.0]) + y = np.array([0.0, 2.0, 0.0, -2.0, 0.0]) + outp = DampedOsc.guess()(x, y) + + # freq = pi / (peak_x - valley_x) = pi / (1 - -1) + assert pytest.approx(outp["freq"].value, rel=1e-2) == np.pi / 2 + + +class TestSlitScan: + def test_max_slit_width(self): + x = np.arange(-5.0, 5.0, 1.0) + y = np.zeros(10) + + # -1 is not a valid max slit gap + with pytest.raises(ValueError): + SlitScan.guess(-1)(x, y) + + class TestSlitScanModel: + def test_slit_scan_model(self): + x = np.arange(-5.0, 5.0, 1.0) + background = 1 + height_above_inflection1 = 5 + gradient = 1 + inflections_diff = 6 + inflection0 = -3 + + outp = SlitScan.model().func( + x, + background=background, + inflection0=inflection0, + gradient=gradient, + inflections_diff=inflections_diff, + height_above_inflection1=height_above_inflection1, + ) + + # Check that the max outp value is not greater than + # the highest point of model given params + assert ( + np.max(outp) <= background + gradient * inflections_diff + height_above_inflection1 + ) + + # Check that any points past x = inflection0 + # do not have a y value equal to or below background + assert np.all(outp[np.where(x > inflection0)] > background) + + def test_slit_scan_model_no_exp(self): + x = np.arange(-5.0, 5.0, 1.0) + background = 1 + height_above_inflection1 = 0 + gradient = 1 + inflections_diff = 3 + inflection0 = -3 + # inflection1 at x=0 y=4 + + outp = SlitScan.model().func( + x, + background=background, + inflection0=inflection0, + gradient=gradient, + inflections_diff=inflections_diff, + height_above_inflection1=height_above_inflection1, + ) + + # Check that second half of array, + # starting from inflection1, is a flat line + # and that all values before are below this line + line = background + gradient * inflections_diff + assert np.all(outp[5:] == line) + assert np.all(outp[:4] < line) + + class TestSlitScanGuess: + def test_guess_background(self): + x = np.array([-1.0, 0.0, 1.0, 2.0]) + y = np.array([1.0, 1.0, 2.0, 3.0]) + outp = SlitScan.guess()(x, y) + + assert 1.0 == pytest.approx(outp["background"].value, rel=1e-2) + + def test_guess_gradient(self): + x = np.array([-1.0, 0.0, 1.0, 2.0, 3.0, 4.0]) + y = np.array([1.0, 1.0, 2.0, 3.0, 4.0, 4.0]) + outp = SlitScan.guess()(x, y) + + assert 1.0 == pytest.approx(outp["gradient"].value, rel=1e-2) + + def test_guess_inflections_diff(self): + x = np.array([-1.0, 0.0, 1.0, 2.0, 3.0, 4.0]) + y = np.array([1.0, 1.0, 2.0, 3.0, 4.0, 4.0]) + outp = SlitScan.guess()(x, y) + + assert 3.0 == pytest.approx(outp["inflections_diff"].value, rel=1e-2) + + def test_guess_height_above_inflection1(self): + x = np.array([-1.0, 0.0, 1.0, 2.0, 3.0, 4.0]) + y = np.array([1.0, 1.0, 2.0, 3.0, 4.0, 4.0]) + outp = SlitScan.guess()(x, y) + + assert 1.0 == pytest.approx(outp["height_above_inflection1"].value, rel=1e-2) + + def test_guess_inflection0(self): + x = np.arange(-5.0, 5.0, 1.0) + y = np.array([0, 0, 0, 0, 0, 1, 2, 3, 4, 5]) + outp = SlitScan.guess()(x, y) + + assert -2.0 == pytest.approx(outp["inflection0"].value, rel=1e-2) + + +class TestERF: + class TestERFModel: + # only need to test for background and scale + # as using scipy sci model + def test_erf_model(self): + x = np.arange(-5.0, 6.0, 1.0) + cen = 0 + stretch = 1 + scale = 1 + background = 1 + + outp = ERF.model().func(x, background=background, cen=cen, stretch=stretch, scale=scale) + + assert background == pytest.approx(np.mean(outp), rel=1e-2) + + outp1 = ERF.model().func( + x, background=background, cen=cen, stretch=stretch, scale=scale + 5 + ) + + # an erf with a greater y-scale should mean greater y mean on absolute values + assert np.mean(np.abs(outp)) < np.mean(np.abs(outp1)) + + class TestERFGuess: + def test_guess_background(self): + x = np.array([-1.0, 0.0, 1.0]) + y = x + + outp = ERF.guess()(x, y) + + assert pytest.approx(outp["background"], rel=1e-2) == 0.0 + + def test_guess_scale(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([-2.0, 0.0, 2.0]) + + outp = ERF.guess()(x, y) + + assert pytest.approx(outp["scale"], rel=1e-2) == 2.0 + + +class TestERFC: + class TestERFCModel: + # only need to test for background and scale + # as using scipy sci model + def test_erfc_model(self): + x = np.arange(-5.0, 6.0, 1.0) + cen = 0 + stretch = 1 + scale = 1 + background = 1 + + outp = ERFC.model().func( + x, background=background, cen=cen, stretch=stretch, scale=scale + ) + + assert background == pytest.approx(np.min(outp), rel=1e-2) + + outp1 = ERFC.model().func( + x, background=background, cen=cen, stretch=stretch, scale=scale + 5 + ) + + # an erf with a greater y-scale should mean greater y mean on absolute values + assert np.mean(np.abs(outp)) < np.mean(np.abs(outp1)) + + class TestERFCGuess: + def test_guess_background(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([2.0, 1.0, 0.0]) + + outp = ERFC.guess()(x, y) + + assert pytest.approx(outp["background"], rel=1e-2) == 0.0 + + def test_guess_scale(self): + x = np.array([-1.0, 0.0, 1.0]) + y = np.array([4.0, 2.0, 0.0]) + + outp = ERFC.guess()(x, y) + + assert pytest.approx(outp["scale"], rel=1e-2) == 2.0 + + +class TestTopHat: + class TestTopHatModel: + def test_top_hat_model(self): + x = np.arange(-5.0, 6.0, 1.0) + cen = 0 + width = 1 + height = 1 + background = 1 + + outp = TopHat.model().func( + x, background=background, cen=cen, width=width, height=height + ) + + assert background == pytest.approx(np.min(outp), rel=1e-2) + assert height + background == pytest.approx(np.max(outp), rel=1e-2) + + outp1 = TopHat.model().func( + x, background=background, cen=cen + 3, width=width + 3, height=height + ) + + # check width + assert np.mean(outp) < np.mean(outp1) + + # check centre + assert np.mean(x[np.where(outp > background)]) < np.mean( + x[np.where(outp1 > background)] + ) + + class TestTopHatGuess: + def test_background_guess(self): + x = np.array([-1.0, 0.0, 1.0, 2.0, 3.0]) + y = np.array([1.0, 2.0, 2.0, 2.0, 1.0]) + + outp = TopHat.guess()(x, y) + + assert outp["background"] == pytest.approx(1.0, rel=1e-2) + + def test_cen_height_guess(self): + x = np.array([-1.0, 0.0, 1.0, 2.0, 3.0]) + y = np.array([1.0, 1.0, 2.0, 1.0, 1.0]) + + outp = TopHat.guess()(x, y) + + assert outp["cen"] == pytest.approx(1.0, rel=1e-2) + assert outp["height"] == pytest.approx(1.0, rel=1e-2) + + def test_width_guess(self): + x = np.array([-2.0, -1.0, 0.0, 1.0, 2.0, 3.0]) + y = np.array([1.0, 1.0, 2.0, 2.0, 1.0, 1.0]) + + outp = TopHat.guess()(x, y) + + assert outp["width"] == pytest.approx(1.0, rel=1e-2) + + def test_guess_given_flat_data(self): + x = np.arange(-5.0, 5.0, 1.0) + y = np.zeros_like(x) + 1 + + outp = TopHat.guess()(x, y) + assert outp["width"] == pytest.approx((max(x) - min(x)) / 2, rel=1e-2) + + +class TestTrapezoid: + class TestTrapezoidModel: + def test_trapezoid_model(self): + x = np.arange(-5.0, 6.0, 1.0) + cen = 0 + y_offset = 1 + height = 1 + background = 1 + gradient = 1 + + outp = Trapezoid.model().func( + x, + cen=cen, + y_offset=y_offset, + height=height, + background=background, + gradient=gradient, + ) + + assert background == pytest.approx(np.min(outp), rel=1e-2) + assert height + background == pytest.approx(np.max(outp), rel=1e-2) + + outp1 = Trapezoid.model().func( + x, + cen=cen + 3, + y_offset=y_offset, + height=height, + background=background, + gradient=gradient - 0.5, + ) + + # check centre moves when data is shifted + assert np.mean(x[np.where(outp > background)]) < np.mean( + x[np.where(outp1 > background)] + ) + + # check gradient: a greater gradient means greater average y values as wider + assert np.mean(outp) < np.mean(outp1) + + outp2 = Trapezoid.model().func( + x, + cen=cen, + y_offset=y_offset + 5, + height=height, + background=background, + gradient=gradient, + ) + + # check y_offset: a greater y_offset means greater average y values as wider + assert np.mean(outp) < np.mean(outp2) + + class TestTrapezoidGuess: + def test_background_guess(self): + x = np.array([-1.0, 0.0, 1.0, 2.0, 3.0]) + y = np.array([1.0, 2.0, 2.0, 2.0, 1.0]) + + outp = Trapezoid.guess()(x, y) + + assert outp["background"] == pytest.approx(1.0, rel=1e-2) + + def test_cen_height_guess(self): + x = np.array([-1.0, 0.0, 1.0, 2.0, 3.0]) + y = np.array([1.0, 1.0, 2.0, 1.0, 1.0]) + + outp = Trapezoid.guess()(x, y) + + assert outp["cen"] == pytest.approx(1.0, rel=1e-2) + assert outp["height"] == pytest.approx(1.0, rel=1e-2) + + def test_gradient_guess(self): + x = np.array([-4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0]) + y = np.array([1.0, 2.0, 4.0, 8.0, 16.0, 8.0, 4.0, 2.0, 1.0]) + + # Should choose x = -1.0 as x1 and x = -2.5 as x0 + # height = 16 - 1 = 15 + # gradient = 15 / (-1 - -2.5) = 10 + + outp = Trapezoid.guess()(x, y) + + assert outp["gradient"] == pytest.approx(10.0, rel=1e-2) + + def test_y_offset_guess(self): + x = np.arange(-5.0, 5.0, 1.0) + y = np.array([1.0, 1.0, 1.0, 2.0, 3.0, 3.0, 3.0, 2.0, 1.0, 1.0, 1.0]) + + outp = Trapezoid.guess()(x, y) + + x1 = np.arange(-6.0, 6.0, 1.0) + y1 = np.array([1.0, 1.0, 1.0, 2.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 1.0, 1.0, 1.0]) + + outp1 = Trapezoid.guess()(x1, y1) + + # Assert that with a greater top width, y_offset increases + assert outp["y_offset"] < outp1["y_offset"] + + def test_guess_given_flat_data(self): + x = np.arange(-5.0, 5.0, 1.0) + y = np.zeros_like(x) + 1 + + outp = Trapezoid.guess()(x, y) + # check that with flat data gradient guess is 0 + assert outp["gradient"] == pytest.approx(0.0, rel=1e-2)