diff --git a/ophyd/sim.py b/ophyd/sim.py index 632fbecc4..a0d6971a6 100644 --- a/ophyd/sim.py +++ b/ophyd/sim.py @@ -14,6 +14,7 @@ from types import SimpleNamespace import numpy as np +import scipy from .areadetector.base import EpicsSignalWithRBV from .areadetector.paths import EpicsPathSignal @@ -648,6 +649,108 @@ def exposure_time(self, v): self.val.exposure_time = v + +class SynFunc(Device): + """ + Evaluate a point on an arbitrary function based on the value of a motor. + + Parameters + ---------- + name : string + motor : Device + motor_field : string + response_function : function from R -> R + noise : {'poisson', 'uniform', None}, optional + Add noise to the function peak. + noise_multiplier : float, optional + Only relevant for 'uniform' noise. Multiply the random amount of + noise by 'noise_multiplier' + random_state : numpy random state object, optional + np.random.RandomState(0), to generate random number with given seed + + Example + ------- + motor = SynAxis(name='motor') + response_function = -scipy.special.erf + det = SynFunc('det', motor, 'motor', response_function) + """ + + def _compute(self): + m = self._motor.read()[self._motor_field]["value"] + # we need to do this one at a time because + # - self.read() may be screwed with by the user + # - self.get() would cause infinite recursion + noise = self.noise.get() + noise_multiplier = self.noise_multiplier.get() + v = self._response_function(m) + if noise == "poisson": + v = int(self.random_state.poisson(np.round(v), 1)) + elif noise == "uniform": + v += self.random_state.uniform(-1, 1) * noise_multiplier + return v + + val = Cpt(SynSignal, kind="hinted") + noise = Cpt( + EnumSignal, + value="none", + kind="config", + enum_strings=("none", "poisson", "uniform"), + ) + noise_multiplier = Cpt(Signal, value=1, kind="config") + + def __init__( + self, name, motor, motor_field, response_function, *, random_state=None, **kwargs + ): + set_later = {} + for k in ("noise", "noise_multiplier"): + v = kwargs.pop(k, None) + if v is not None: + set_later[k] = v + super().__init__(name=name, **kwargs) + self._motor = motor + self._motor_field = motor_field + self._response_function = response_function + + self.random_state = random_state or np.random + self.val.name = self.name + self.val.sim_set_func(self._compute) + for k, v in set_later.items(): + getattr(self, k).put(v) + + self.trigger() + + def subscribe(self, *args, **kwargs): + return self.val.subscribe(*args, **kwargs) + + def clear_sub(self, cb, event_type=None): + return self.val.clear_sub(cb, event_type=event_type) + + def unsubscribe(self, cid): + return self.val.unsubscribe(cid) + + def unsubscribe_all(self): + return self.val.unsubscribe_all() + + def trigger(self, *args, **kwargs): + return self.val.trigger(*args, **kwargs) + + @property + def precision(self): + return self.val.precision + + @precision.setter + def precision(self, v): + self.val.precision = v + + @property + def exposure_time(self): + return self.val.exposure_time + + @exposure_time.setter + def exposure_time(self, v): + self.val.exposure_time = v + + class Syn2DGauss(Device): """ Evaluate a point on a Gaussian based on the value of a motor. @@ -1551,6 +1654,17 @@ def hw(save_path=None): noise_multiplier=0.1, labels={"detectors"}, ) + def det_func(x): + return -scipy.special.erf(x) + 1.1 + noisy_func_det = SynFunc( + "noisy_func_det", + motor, + "motor", + det_func, + noise="uniform", + noise_multiplier=0.1, + labels={"detectors"}, + ) det = SynGauss( "det", motor, "motor", center=0, Imax=1, sigma=1, labels={"detectors"} ) @@ -1642,6 +1756,7 @@ def hw(save_path=None): jittery_motor1=jittery_motor1, jittery_motor2=jittery_motor2, noisy_det=noisy_det, + noisy_func_det=noisy_func_det, det=det, identical_det=identical_det, det1=det1,