diff --git a/doc/blocks.md b/doc/blocks.md new file mode 100644 index 0000000..fa62109 --- /dev/null +++ b/doc/blocks.md @@ -0,0 +1,145 @@ +# Blocks + +Blocks are one of IBEX's central abstractions, which present a uniform interface to any +scientifically interesting PV. + +`ibex_bluesky_core` has support for four types of blocks: +- Read-only +- Read/write +- Read/write with setpoint readback +- Motors + +> **_ℹ️_** +> All signals, including blocks, in bluesky have a strong type. This must match +> the underlying EPICS type of the PV, which helps to catch problems up-front rather than +> the middle of a plan. Example error at the start of a plan, from trying to connect a `str` block to a `float` PV: +> ``` +> ophyd_async.core._utils.NotConnected: +> mot: NotConnected: +> setpoint_readback: TypeError: TE:NDW2922:CS:SB:mot:SP:RBV has type float not str +> setpoint: TypeError: TE:NDW2922:CS:SB:mot:SP has type float not str +> readback: TypeError: TE:NDW2922:CS:SB:mot has type float not str +> ``` + +## Block types + +### `block_r` (read-only) + +This is a read-only block. It supports `bluesky`'s `Readable` protocol, as well as +basic metadata protocols such as `HasName`. + +This type of block is usable by: +- Plans like `bluesky.plans.count()` or `bluesky.plans.scan()` as a detector object. +- Plan stubs like `bluesky.plan_stubs.rd()`, which plans may use to get the current value +of a block easily for use in the plan. + +A `BlockR` object does not implement any logic on read - it simply returns the most recent +value of the block. + +A simple constructor, `block_r`, is available, which assumes the current instrument's PV +prefix: + +```python +from ibex_bluesky_core.devices.block import block_r +readable_block = block_r(float, "my_block_name") +``` + +### `block_rw` (read, write) + +This is a read-write block. It supports all of the same protocols as `BlockR`, with the +addition of the `Movable` protocol. + +The addition of the movable protocol means that this type of block can be moved by plan +stubs such as `bluesky.plan_stubs.mv()` or `bluesky.plan_stubs.abs_set()`. + +It can also be used as the `Movable` in full plans like `bluesky.plans.scan()`. + +> **_ℹ️_** +> In bluesky terminology, any object with a `set()` method is `Movable`. Therefore, a +> temperature controller is "moved" from one temperature to another, and a run title +> may equally be "moved" from one title to another. +> +> This is simply a matter of terminology - bluesky fully supports moving things which +> are not motors, even if the documentation tends to use motors as the examples. + +Like `block_r`, a simple constructor is available: + +```python +from ibex_bluesky_core.devices.block import block_rw, BlockWriteConfig +writable_block = block_rw( + float, + "my_block_name", + # Example: configure to always wait 5 seconds after being set. + # For further options, see docstring of BlockWriteConfig. + write_config=BlockWriteConfig(settle_time_s=5.0) +) +``` + +### `block_rw_rbv` (read, write, setpoint readback) + +This is a block with full support for reading and writing as per `BlockRw`, but with +the addition of `bluesky`'s `Locatable` protocol, which allows you to read back the +current setpoint. Where possible, the setpoint will be read back from hardware. + +This object is suitable for use in plan stubs such as `bluesky.plan_stubs.locate()`. + +This object is also more suitable for use in plans which use relative moves - the +relative move will be calculated with respect to the setpoint readback from hardware +(if available). + +Just like `block_rw`, a simple constructor is available: + +```python +from ibex_bluesky_core.devices.block import block_rw_rbv, BlockWriteConfig +rw_rbv_block = block_rw_rbv( + float, + "my_block_name", + # Example: configure to always wait 5 seconds after being set. + # For further options, see docstring of BlockWriteConfig. + write_config=BlockWriteConfig(settle_time_s=5.0) +) +``` + +### `block_mot` (motor-specific) + +This represents a block pointing at a motor record. This has support for: +- Reading (`Readable`) +- Writing (`Movable`) +- Limit-checking (`Checkable`) +- Stopping (e.g. on scan abort) (`Stoppable`) +- And advanced use-cases like fly-scanning + +This type is recommended to be used if the underlying block is a motor record. It always has +type `float`, and as such does not take a type argument (unlike the other block types). + +`Checkable` means that moves which would eventually violate limits can be detected by +bluesky simulators, before the plan ever runs. This can help to catch errors before +the plan is executed against hardware. + +`Stoppable` means that the motor can be asked to stop by bluesky. Plans may choose to execute +a `stop()` on failure, or explicitly during a plan. + +A `block_mot` can be made in a similar way to the other block types; however, it does not +require an explicit type as motors are always of `float` data type: + +```python +from ibex_bluesky_core.devices.block import block_mot +mot_block = block_mot("motor_block") +``` + +## Configuring block write behaviour + +`BlockRw` and `BlockRwRbv` both take a `write_config` argument, which can be used to configure +the behaviour on writing to a block, for example tolerances and settle times. + +See the docstring on `ibex_bluesky_core.devices.block.BlockWriteConfig` for a detailed +description of all the options which are available. + +## Run control + +Run control information is available via the `block.run_control` sub-device. + +Both configuring and reading the current status of run control are permitted. + +> **_ℹ️_** +> Run control limits are always `float`, regardless of the datatype of the block. diff --git a/pyproject.toml b/pyproject.toml index bd0e1bb..402994c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ dependencies = [ [project.optional-dependencies] dev = [ - "ruff", + "ruff>=0.6", "pyright", "pytest", "pytest-asyncio", @@ -79,7 +79,17 @@ directory = "coverage_html_report" [tool.pyright] include = ["src", "tests"] -reportUntypedFunctionDecorator = true +reportConstantRedefinition = true +reportDeprecated = true +reportInconsistentConstructor = true +reportMissingParameterType = true +reportMissingTypeArgument = true +reportUnnecessaryCast = true +reportUnnecessaryComparison = true +reportUnnecessaryContains = true +reportUnnecessaryIsInstance = true +reportUntypedBaseClass = true reportUntypedClassDecorator = true +reportUntypedFunctionDecorator = true [tool.setuptools_scm] diff --git a/src/ibex_bluesky_core/callbacks/document_logger.py b/src/ibex_bluesky_core/callbacks/document_logger.py index 5651bdb..a11b0d5 100644 --- a/src/ibex_bluesky_core/callbacks/document_logger.py +++ b/src/ibex_bluesky_core/callbacks/document_logger.py @@ -2,6 +2,7 @@ import json from pathlib import Path +from typing import Any log_location = Path("C:\\") / "instrument" / "var" / "logs" / "bluesky" / "raw_documents" @@ -14,7 +15,7 @@ def __init__(self) -> None: self.current_start_document = None self.filename = None - def __call__(self, name: str, document: dict) -> None: + def __call__(self, name: str, document: dict[str, Any]) -> None: """Is called when a new document needs to be processed. Writes document to a file. Args: @@ -31,7 +32,7 @@ def __call__(self, name: str, document: dict) -> None: assert self.filename is not None, "Could not create filename." assert self.current_start_document is not None, "Saw a non-start document before a start." - to_write = {"type": name, "document": document} + to_write: dict[str, Any] = {"type": name, "document": document} with open(self.filename, "a") as outfile: outfile.write(f"{json.dumps(to_write)}\n") diff --git a/src/ibex_bluesky_core/demo_plan.py b/src/ibex_bluesky_core/demo_plan.py index 3bd9e96..41b1c2f 100644 --- a/src/ibex_bluesky_core/demo_plan.py +++ b/src/ibex_bluesky_core/demo_plan.py @@ -9,7 +9,7 @@ from ophyd_async.plan_stubs import ensure_connected from ibex_bluesky_core.devices import get_pv_prefix -from ibex_bluesky_core.devices.block import Block +from ibex_bluesky_core.devices.block import BlockRwRbv, block_rw_rbv from ibex_bluesky_core.devices.dae import Dae from ibex_bluesky_core.run_engine import get_run_engine @@ -28,12 +28,12 @@ def run_demo_plan() -> None: """ RE = get_run_engine() prefix = get_pv_prefix() - block = Block(prefix, "mot", float) + block = block_rw_rbv(float, "mot") dae = Dae(prefix) RE(demo_plan(block, dae), LiveTable(["mot", "DAE"])) -def demo_plan(block: Block, dae: Dae) -> Generator[Msg, None, None]: +def demo_plan(block: BlockRwRbv[float], dae: Dae) -> Generator[Msg, None, None]: """Demonstration plan which moves a block and reads the DAE.""" yield from ensure_connected(block, dae, force_reconnect=True) diff --git a/src/ibex_bluesky_core/devices/block.py b/src/ibex_bluesky_core/devices/block.py index 715a706..bf8ae10 100644 --- a/src/ibex_bluesky_core/devices/block.py +++ b/src/ibex_bluesky_core/devices/block.py @@ -1,43 +1,336 @@ """ophyd-async devices and utilities for communicating with IBEX blocks.""" -from typing import Generic, Type, TypeVar +import asyncio +from dataclasses import dataclass +from typing import Callable, Generic, Type, TypeVar -from bluesky.protocols import Locatable, Location -from ophyd_async.core import AsyncStatus, HintedSignal, SignalR, SignalRW, StandardReadable +from bluesky.protocols import Locatable, Location, Movable, Triggerable +from ophyd_async.core import ( + AsyncStatus, + HintedSignal, + SignalR, + SignalRW, + StandardReadable, + observe_value, +) +from ophyd_async.epics.motor import Motor from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw +from ibex_bluesky_core.devices import get_pv_prefix + """Block data type""" T = TypeVar("T") -class Block(StandardReadable, Locatable, Generic[T]): - """Device representing an IBEX read/write block of arbitrary data type.""" +__all__ = [ + "BlockMot", + "BlockR", + "BlockRw", + "BlockRwRbv", + "BlockWriteConfig", + "RunControl", + "block_mot", + "block_r", + "block_rw", + "block_rw_rbv", +] + + +@dataclass(kw_only=True) +class BlockWriteConfig(Generic[T]): + """Configuration settings for writing to blocks. + + use_completion_callback: Whether to wait for an EPICS completion callback while setting + this block. Defaults to true, which is appropriate for most blocks. + + set_success_func: An arbitrary function which is called to decide whether the block has + set successfully yet or not. The function takes (setpoint, actual) as arguments and + should return true if the value has successfully set and is "ready", or False otherwise. + + This can be used to implement arbitrary tolerance behaviour. For example: + >>> def check(setpoint: T, actual: T) -> bool: + >>> return setpoint - 0.1 <= actual <= setpoint + 0.1 + + If use_completion_callback is True, the completion callback must complete before + set_success_func is ever called. + + Executing this function should be "fast" (i.e. the function should not sleep), and it should + not do any external I/O. + + Defaults to None, which means no check is applied. + + set_timeout_s: A timeout, in seconds, on the value being set successfully. The timeout + applies to the EPICS completion callback (if enabled) and the set success function + (if provided), and excludes any configured settle time. - def __init__(self, prefix: str, block_name: str, datatype: Type[T]) -> None: - """Create a new Block device.""" + Defaults to None, which means no timeout. + + settle_time_s: A wait time, in seconds, which is unconditionally applied just before the set + status is marked as complete. Defaults to zero. + + """ + + use_completion_callback: bool = True + set_success_func: Callable[[T, T], bool] | None = None + set_timeout_s: float | None = None + settle_time_s: float = 0.0 + + +class RunControl(StandardReadable): + """Subdevice for common run-control signals.""" + + def __init__(self, prefix: str, name: str = "") -> None: + """Create a run control wrapper for a block. + + Usually run control should be accessed via the run_control property on a block, rather + than by constructing an instance of this class directly. + + Args: + prefix: the run-control prefix, e.g. "IN:INSTRUMENT:CS:SB:blockname:RC:" + name: ophyd device name + + """ + with self.add_children_as_readables(HintedSignal): + # When explicitly reading run control, the most obvious signal that people will be + # interested in is whether the block is in range or not. + self.in_range = epics_signal_r(bool, f"{prefix}INRANGE") + + self.low_limit = epics_signal_rw(float, f"{prefix}LOW") + self.high_limit = epics_signal_rw(float, f"{prefix}HIGH") + + self.suspend_if_invalid = epics_signal_rw(bool, f"{prefix}SOI") + self.enabled = epics_signal_rw(bool, f"{prefix}ENABLE") + + self.out_time = epics_signal_r(float, f"{prefix}OUT:TIME") + self.in_time = epics_signal_r(float, f"{prefix}IN:TIME") + + super().__init__(name=name) + + +class BlockR(StandardReadable, Triggerable, Generic[T]): + """Device representing an IBEX readable block of arbitrary data type.""" + + def __init__(self, datatype: Type[T], prefix: str, block_name: str) -> None: + """Create a new read-only block. + + Args: + datatype: the type of data in this block (e.g. str, int, float) + prefix: the current instrument's PV prefix + block_name: the name of the block + + """ with self.add_children_as_readables(HintedSignal): self.readback: SignalR[T] = epics_signal_r(datatype, f"{prefix}CS:SB:{block_name}") - with self.add_children_as_readables(): - self.setpoint: SignalRW[T] = epics_signal_rw(datatype, f"{prefix}CS:SB:{block_name}:SP") + # Run control doesn't need to be read by default + self.run_control = RunControl(f"{prefix}CS:SB:{block_name}:RC:") super().__init__(name=block_name) self.readback.set_name(block_name) - def set(self, value: T) -> AsyncStatus: - """Set the setpoint of this block. + @AsyncStatus.wrap + async def trigger(self) -> None: + """Blocks need to be triggerable to be used in adaptive scans. - The status returned by this object will be marked done when: - - An EPICS completion callback has been received - * (To do: make completion callback optional) - - (To do: an optionally-configured time period) - - (To do: an optionally-configured readback tolerance) + They do not do anything when triggered. """ - return self.setpoint.set(value, wait=True) + + +class BlockRw(BlockR[T], Movable): + """Device representing an IBEX read/write block of arbitrary data type.""" + + def __init__( + self, + datatype: Type[T], + prefix: str, + block_name: str, + *, + write_config: BlockWriteConfig[T] | None = None, + ) -> None: + """Create a new read-write block. + + The setpoint is not added to read() by default. For most cases where setpoint readback + functionality is desired, BlockRwRbv is a more suitable type. + + If you *explicitly* need to read the setpoint from a BlockRw, you can do so in a plan with: + >>> import bluesky.plan_stubs as bps + >>> block: BlockRw = ... + >>> bps.read(block.setpoint) + + But note that this does not read back the setpoint from hardware, but rather the setpoint + which was last sent by EPICS. + + Args: + datatype: the type of data in this block (e.g. str, int, float) + prefix: the current instrument's PV prefix + block_name: the name of the block + write_config: Settings which control how this device will set the underlying PVs + + """ + self.setpoint: SignalRW[T] = epics_signal_rw(datatype, f"{prefix}CS:SB:{block_name}:SP") + + self._write_config: BlockWriteConfig[T] = write_config or BlockWriteConfig() + + super().__init__(datatype=datatype, prefix=prefix, block_name=block_name) + + @AsyncStatus.wrap + async def set(self, value: T) -> None: + """Set the setpoint of this block.""" + + async def do_set(setpoint: T) -> None: + await self.setpoint.set( + setpoint, wait=self._write_config.use_completion_callback, timeout=None + ) + + # Wait for the _set_success_func to return true. + # This uses an "async for" to loop over items from observe_value, which is an async + # generator. See documentation on "observe_value" or python "async for" for more details + if self._write_config.set_success_func is not None: + async for actual_value in observe_value(self.readback): + if self._write_config.set_success_func(setpoint, actual_value): + break + + async def set_and_settle(setpoint: T) -> None: + if self._write_config.set_timeout_s is not None: + await asyncio.wait_for(do_set(setpoint), timeout=self._write_config.set_timeout_s) + else: + await do_set(setpoint) + + await asyncio.sleep(self._write_config.settle_time_s) + + await set_and_settle(value) + + +class BlockRwRbv(BlockRw[T], Locatable): + """Device representing an IBEX read/write/setpoint readback block of arbitrary data type.""" + + def __init__( + self, + datatype: Type[T], + prefix: str, + block_name: str, + *, + write_config: BlockWriteConfig[T] | None = None, + ) -> None: + """Create a new read/write/setpoint readback block. + + The setpoint readback is added to read(), but not hints(), by default. If you do not need + a setpoint readback, choose BlockRw instead of BlockRwRbv. + + Args: + datatype: the type of data in this block (e.g. str, int, float) + prefix: the current instrument's PV prefix + block_name: the name of the block + write_config: Settings which control how this device will set the underlying PVs + + """ + with self.add_children_as_readables(): + self.setpoint_readback: SignalR[T] = epics_signal_r( + datatype, f"{prefix}CS:SB:{block_name}:SP:RBV" + ) + + super().__init__( + datatype=datatype, prefix=prefix, block_name=block_name, write_config=write_config + ) async def locate(self) -> Location[T]: - """Get the current 'location' (primary value) of this block.""" + """Get the current 'location' of this block.""" + actual, sp_rbv = await asyncio.gather( + self.readback.get_value(), + self.setpoint_readback.get_value(), + ) return { - "readback": await self.readback.get_value(), - "setpoint": await self.setpoint.get_value(), + "readback": actual, + "setpoint": sp_rbv, } + + +class BlockMot(Motor): + """Device representing an IBEX block pointing at a motor.""" + + def __init__( + self, + prefix: str, + block_name: str, + ) -> None: + """Create a new motor-record block. + + The 'BlockMot' object supports motion-specific functionality such as: + - Stopping if a scan is aborted (supports the bluesky 'Stoppable' protocol) + - Limit checking (before a move starts - supports the bluesky 'Checkable' protocol) + - Automatic calculation of move timeouts based on motor velocity + - Fly scanning + + However, it generally relies on the underlying motor being "well-behaved". For example, a + motor which does many retries may exceed the simple default timeout based on velocity (it + is possible to explicitly specify a timeout on set() to override this). + + Blocks pointing at motors do not take a BlockWriteConfiguration parameter, as these + parameters duplicate functionality which already exists in the motor record. The mapping is: + + use_completion_callback: + Motors always use completion callbacks to check whether motion has completed. Whether to + wait on that completion callback can be configured by the 'wait' keyword argument on + set(). + set_success_func: + Use .RDBD and .RTRY to control motor retries if the position has not been reached to + within a specified tolerance. Note that motors which retry a lot may exceed the default + motion timeout which is calculated based on velocity, distance and acceleration. + set_timeout_s: + A suitable timeout is calculated automatically based on velocity, distance and + acceleration as defined on the motor record. This may be overridden by the 'timeout' + keyword-argument on set(). + settle_time_s: + Use .DLY on the motor record to configure this. + """ + self.run_control = RunControl(f"{prefix}CS:SB:{block_name}:RC:") + + # GWBLOCK aliases .VAL to .RBV on a motor record for a block pointing at MOT:MTRxxxx.RBV, + # which is what we have recommended to our users for motor blocks... That means that you + # can't write to .VAL on a motor block. ophyd_async (reasonably) assumes you can write to + # .VAL for a motor which you want to move. + # + # However, we also have motor record aliases for :SP and :SP:RBV, which *don't* get mangled + # by GWBLOCK in that way. So by pointing at CS:SB:blockname:SP:RBV rather than + # CS:SB:blockname here, we avoid a write access exception when moving a motor block. + super().__init__(f"{prefix}CS:SB:{block_name}:SP:RBV", name=block_name) + + +def block_r(datatype: Type[T], block_name: str) -> BlockR[T]: + """Get a local read-only block for the current instrument. + + See documentation of BlockR for more information. + """ + return BlockR(datatype=datatype, prefix=get_pv_prefix(), block_name=block_name) + + +def block_rw( + datatype: Type[T], block_name: str, *, write_config: BlockWriteConfig[T] | None = None +) -> BlockRw[T]: + """Get a local read-write block for the current instrument. + + See documentation of BlockRw for more information. + """ + return BlockRw( + datatype=datatype, prefix=get_pv_prefix(), block_name=block_name, write_config=write_config + ) + + +def block_rw_rbv( + datatype: Type[T], block_name: str, *, write_config: BlockWriteConfig[T] | None = None +) -> BlockRwRbv[T]: + """Get a local read/write/setpoint readback block for the current instrument. + + See documentation of BlockRwRbv for more information. + """ + return BlockRwRbv( + datatype=datatype, prefix=get_pv_prefix(), block_name=block_name, write_config=write_config + ) + + +def block_mot(block_name: str) -> BlockMot: + """Get a local block pointing at a motor record for the local instrument. + + See documentation of BlockMot for more information. + """ + return BlockMot(prefix=get_pv_prefix(), block_name=block_name) diff --git a/tests/conftest.py b/tests/conftest.py index 2ecc7b5..3b2fbe1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import pytest from bluesky.run_engine import RunEngine + from ibex_bluesky_core.run_engine import get_run_engine diff --git a/tests/devices/test_block.py b/tests/devices/test_block.py index 1c28bb4..323bcc4 100644 --- a/tests/devices/test_block.py +++ b/tests/devices/test_block.py @@ -1,38 +1,291 @@ +# pyright: reportMissingParameterType=false + +import asyncio +import sys +from unittest.mock import ANY, MagicMock, patch + +import bluesky.plan_stubs as bps +import bluesky.plans as bp import pytest -from ibex_bluesky_core.devices.block import Block from ophyd_async.core import get_mock_put, set_mock_value +from ibex_bluesky_core.devices.block import ( + BlockMot, + BlockR, + BlockRw, + BlockRwRbv, + BlockWriteConfig, + block_mot, + block_r, + block_rw, + block_rw_rbv, +) + +MOCK_PREFIX = "UNITTEST:MOCK:" + + +if sys.version_info < (3, 11): + aio_timeout_error = asyncio.exceptions.TimeoutError +else: + aio_timeout_error = TimeoutError + + +async def _make_block(clazz): + block = clazz(float, MOCK_PREFIX, "float_block") + await block.connect(mock=True) + return block + @pytest.fixture -async def float_block() -> Block[float]: - block = Block("UNITTEST:MOCK:", "float_block", float) +async def rw_rbv_block() -> BlockRwRbv[float]: + return await _make_block(BlockRwRbv) + + +@pytest.fixture(params=[BlockRw, BlockRwRbv]) +async def writable_block(request) -> BlockRw[float]: + return await _make_block(request.param) + + +@pytest.fixture(params=[BlockR, BlockRw, BlockRwRbv]) +async def readable_block(request) -> BlockR[float]: + return await _make_block(request.param) + + +@pytest.fixture +async def mot_block(): + block = BlockMot(MOCK_PREFIX, "mot_block") + await block.connect(mock=True) + return block + + +async def _block_with_write_config(write_config: BlockWriteConfig[float]) -> BlockRwRbv[float]: + block = BlockRwRbv(float, MOCK_PREFIX, "block", write_config=write_config) await block.connect(mock=True) return block -def test_block_naming(float_block): - assert float_block.name == "float_block" - assert float_block.setpoint.name == "float_block-setpoint" - assert float_block.readback.name == "float_block" +def test_block_naming(rw_rbv_block): + assert rw_rbv_block.name == "float_block" + assert rw_rbv_block.setpoint.name == "float_block-setpoint" + assert rw_rbv_block.setpoint_readback.name == "float_block-setpoint_readback" + assert rw_rbv_block.readback.name == "float_block" -def test_block_signal_monitors_correct_pv(float_block): - assert float_block.readback.source.endswith("UNITTEST:MOCK:CS:SB:float_block") - assert float_block.setpoint.source.endswith("UNITTEST:MOCK:CS:SB:float_block:SP") +def test_mot_block_naming(mot_block): + assert mot_block.name == "mot_block" + assert mot_block.user_readback.name == "mot_block" + assert mot_block.user_setpoint.name == ("mot_block-user_setpoint" "") -async def test_locate(float_block): - set_mock_value(float_block.readback, 10) - set_mock_value(float_block.setpoint, 20) - location = await float_block.locate() +def test_block_signal_monitors_correct_pv(rw_rbv_block): + assert rw_rbv_block.readback.source.endswith("UNITTEST:MOCK:CS:SB:float_block") + assert rw_rbv_block.setpoint.source.endswith("UNITTEST:MOCK:CS:SB:float_block:SP") + assert rw_rbv_block.setpoint_readback.source.endswith("UNITTEST:MOCK:CS:SB:float_block:SP:RBV") + + +def test_mot_block_monitors_correct_pv(mot_block): + # The SP:RBV here is intentional - GWBLOCK mangles mot_block by "swapping" .RBV and .VAL, + # but doesn't mangle the :SP:RBV motor record alias, so we use that instead. + assert mot_block.user_setpoint.source.endswith("UNITTEST:MOCK:CS:SB:mot_block:SP:RBV.VAL") + assert mot_block.user_readback.source.endswith("UNITTEST:MOCK:CS:SB:mot_block:SP:RBV.RBV") + + +async def test_locate(rw_rbv_block): + set_mock_value(rw_rbv_block.readback, 10) + set_mock_value(rw_rbv_block.setpoint, 20) + set_mock_value(rw_rbv_block.setpoint_readback, 30) + location = await rw_rbv_block.locate() assert location == { "readback": 10, - "setpoint": 20, + "setpoint": 30, # Should use SP:RBV not SP + } + + +async def test_hints(readable_block): + # The primary readback should be the only "hinted" signal on a block + assert readable_block.hints == {"fields": ["float_block"]} + + +async def test_mot_hints(mot_block): + assert mot_block.hints == {"fields": ["mot_block"]} + + +async def test_read(rw_rbv_block): + set_mock_value(rw_rbv_block.readback, 10.0) + set_mock_value(rw_rbv_block.setpoint, 20.0) + set_mock_value(rw_rbv_block.setpoint_readback, 30.0) + reading = await rw_rbv_block.read() + + assert reading == { + "float_block": { + "alarm_severity": 0, + "timestamp": ANY, + "value": 10.0, + }, + "float_block-setpoint_readback": { + "alarm_severity": 0, + "timestamp": ANY, + "value": 30.0, + }, + } + + +async def test_describe(rw_rbv_block): + set_mock_value(rw_rbv_block.readback, 10.0) + set_mock_value(rw_rbv_block.setpoint, 20.0) + set_mock_value(rw_rbv_block.setpoint_readback, 30.0) + reading = await rw_rbv_block.read() + descriptor = await rw_rbv_block.describe() + + assert reading.keys() == descriptor.keys() + + assert descriptor["float_block"]["dtype"] == "number" + assert descriptor["float_block-setpoint_readback"]["dtype"] == "number" + + +async def test_read_and_describe_configuration(readable_block): + # Blocks don't have any configuration signals at the moment so these should be empty + configuration_reading = await readable_block.read_configuration() + configuration_descriptor = await readable_block.describe_configuration() + assert configuration_reading == {} + assert configuration_descriptor == {} + + +async def test_block_set(writable_block): + set_mock_value(writable_block.setpoint, 10) + await writable_block.set(20) + get_mock_put(writable_block.setpoint).assert_called_once_with(20, wait=True, timeout=None) + + +async def test_block_set_without_epics_completion_callback(): + block = await _block_with_write_config(BlockWriteConfig(use_completion_callback=False)) + await block.set(20) + get_mock_put(block.setpoint).assert_called_once_with(20, wait=False, timeout=None) + + +async def test_block_set_with_arbitrary_completion_function(): + func = MagicMock(return_value=True) + block = await _block_with_write_config(BlockWriteConfig(set_success_func=func)) + + set_mock_value(block.readback, 10) + set_mock_value(block.setpoint_readback, 30) + + await block.set(20) + + func.assert_called_once_with(20, 10) + + +async def test_block_set_with_timeout(): + func = MagicMock(return_value=False) # Never completes + block = await _block_with_write_config( + BlockWriteConfig(set_success_func=func, set_timeout_s=0.1) + ) + + set_mock_value(block.readback, 10) + + with pytest.raises(aio_timeout_error): + await block.set(20) + + func.assert_called_once_with(20, 10) + + +async def test_block_set_which_completes_before_timeout(): + block = await _block_with_write_config( + BlockWriteConfig(use_completion_callback=False, set_timeout_s=1) + ) + await block.set(20) + + +async def test_block_set_with_settle_time_longer_than_timeout(): + block = await _block_with_write_config( + BlockWriteConfig(use_completion_callback=False, set_timeout_s=1, settle_time_s=30) + ) + + with patch("ibex_bluesky_core.devices.block.asyncio.sleep") as mock_aio_sleep: + await block.set(20) + mock_aio_sleep.assert_called_once_with(30) + + +@pytest.mark.parametrize( + "func,args", + [ + (block_r, (float, "some_block")), + (block_rw, (float, "some_block")), + (block_rw_rbv, (float, "some_block")), + (block_mot, ("some_block",)), + ], +) +def test_block_utility_function(func, args): + with patch("ibex_bluesky_core.devices.block.get_pv_prefix") as mock_get_prefix: + mock_get_prefix.return_value = MOCK_PREFIX + block = func(*args) + assert block.name == "some_block" + + +async def test_runcontrol_read_and_describe(readable_block): + reading = await readable_block.run_control.read() + descriptor = await readable_block.run_control.describe() + + assert reading.keys() == descriptor.keys() + + assert reading.keys() == { + "float_block-run_control-in_range", + } + + assert reading["float_block-run_control-in_range"] == { + "alarm_severity": 0, + "timestamp": ANY, + "value": False, } + assert descriptor["float_block-run_control-in_range"]["dtype"] == "boolean" + + +async def test_runcontrol_hints(readable_block): + # Hinted field for explicitly reading run-control: is the reading in range? + hints = readable_block.run_control.hints + assert hints == {"fields": ["float_block-run_control-in_range"]} + + +async def test_runcontrol_monitors_correct_pv(readable_block): + source = readable_block.run_control.in_range.source + assert source.endswith("UNITTEST:MOCK:CS:SB:float_block:RC:INRANGE") + + +async def test_mot_block_runcontrol_monitors_correct_pv(mot_block): + source = mot_block.run_control.in_range.source + # The main "motor" uses mot_block:SP:RBV, but run control should not. + assert source.endswith("UNITTEST:MOCK:CS:SB:mot_block:RC:INRANGE") + + +def test_plan_count_block(RE, readable_block): + set_mock_value(readable_block.readback, 123.0) + + docs = [] + result = RE(bp.count([readable_block]), lambda typ, doc: docs.append((typ, doc))) + assert result.exit_status == "success" + + # Should have one event document + assert len([doc for (typ, doc) in docs if typ == "event"]) == 1 + + for typ, doc in docs: + if typ == "event": + assert doc["data"]["float_block"] == 123.0 + + +def test_plan_rd_block(RE, readable_block): + set_mock_value(readable_block.readback, 123.0) + result = RE(bps.rd(readable_block)) + assert result.plan_result == 123.0 + + +def test_plan_trigger_block(RE, readable_block): + # A block must be able to be triggered for use in adaptive scans. + result = RE(bps.trigger(readable_block)) + assert result.exit_status == "success" -async def test_block_set(float_block): - set_mock_value(float_block.setpoint, 10) - await float_block.set(20) - get_mock_put(float_block.setpoint).assert_called_once_with(20, wait=True, timeout=10) +def test_plan_mv_block(RE, writable_block): + set_mock_value(writable_block.setpoint, 123.0) + RE(bps.mv(writable_block, 456.0)) + get_mock_put(writable_block.setpoint).assert_called_once_with(456.0, wait=True, timeout=None) diff --git a/tests/devices/test_dae.py b/tests/devices/test_dae.py index 186b841..d30bd7b 100644 --- a/tests/devices/test_dae.py +++ b/tests/devices/test_dae.py @@ -1,7 +1,10 @@ +# pyright: reportMissingParameterType=false + import pytest -from ibex_bluesky_core.devices.dae import Dae from ophyd_async.core import get_mock_put +from ibex_bluesky_core.devices.dae import Dae + @pytest.fixture async def dae() -> Dae: diff --git a/tests/devices/test_init.py b/tests/devices/test_init.py index 5468119..1908483 100644 --- a/tests/devices/test_init.py +++ b/tests/devices/test_init.py @@ -1,6 +1,7 @@ from unittest.mock import patch import pytest + from ibex_bluesky_core.devices import get_pv_prefix diff --git a/tests/test_document_logging_callback.py b/tests/test_document_logging_callback.py index cd697d8..8b4602c 100644 --- a/tests/test_document_logging_callback.py +++ b/tests/test_document_logging_callback.py @@ -1,3 +1,5 @@ +# pyright: reportMissingParameterType=false + import json from pathlib import Path from typing import Generator diff --git a/tests/test_run_engine.py b/tests/test_run_engine.py index 91dd973..d1c5048 100644 --- a/tests/test_run_engine.py +++ b/tests/test_run_engine.py @@ -1,3 +1,5 @@ +# pyright: reportMissingParameterType=false + import threading from typing import Any, Generator from unittest.mock import MagicMock @@ -6,6 +8,7 @@ import pytest from bluesky.run_engine import RunEngineResult from bluesky.utils import Msg, RequestAbort, RequestStop, RunEngineInterrupted + from ibex_bluesky_core.run_engine import _DuringTask, get_run_engine