From 8cd3061ed48d2f83afa789b291c2f65674442265 Mon Sep 17 00:00:00 2001 From: tangkong Date: Wed, 12 Jun 2024 12:30:46 -0700 Subject: [PATCH 01/14] ENH: add _BaseShim and aioca async CA shim layer --- superscore/control_layers/__init__.py | 0 superscore/control_layers/_aioca.py | 21 +++++++++++++++++++++ superscore/control_layers/_base_shim.py | 12 ++++++++++++ 3 files changed, 33 insertions(+) create mode 100644 superscore/control_layers/__init__.py create mode 100644 superscore/control_layers/_aioca.py create mode 100644 superscore/control_layers/_base_shim.py diff --git a/superscore/control_layers/__init__.py b/superscore/control_layers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/superscore/control_layers/_aioca.py b/superscore/control_layers/_aioca.py new file mode 100644 index 0000000..e813a2b --- /dev/null +++ b/superscore/control_layers/_aioca.py @@ -0,0 +1,21 @@ +""" +Control layer shim for communicating asynchronously through channel access +""" +from typing import Any, Callable + +from aioca import caget, camonitor, caput + +from ._base_shim import _ShimBase + + +class AiocaShim(_ShimBase): + # TODO: consider handling datatype arguments in caput/get + # TODO: wrap CANothing results into unified status object + async def get(self, address: str): + return await caget(address) + + async def put(self, address: str, value: Any): + return await caput(address, value) + + def monitor(self, address: str, callback: Callable): + camonitor(address, callback) diff --git a/superscore/control_layers/_base_shim.py b/superscore/control_layers/_base_shim.py new file mode 100644 index 0000000..4bae3b8 --- /dev/null +++ b/superscore/control_layers/_base_shim.py @@ -0,0 +1,12 @@ +from typing import Any, Callable + + +class _ShimBase: + async def get(self, address: str): + raise NotImplementedError + + async def put(self, address: str, value: Any): + raise NotImplementedError + + def monitor(self, address: str, callback: Callable): + raise NotImplementedError From d60a908c7661ec682dd88497cab56811bdc29482 Mon Sep 17 00:00:00 2001 From: tangkong Date: Wed, 12 Jun 2024 12:38:17 -0700 Subject: [PATCH 02/14] ENH: first pass at control layer, holding shims --- superscore/control_layers/core.py | 98 +++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 superscore/control_layers/core.py diff --git a/superscore/control_layers/core.py b/superscore/control_layers/core.py new file mode 100644 index 0000000..09e611b --- /dev/null +++ b/superscore/control_layers/core.py @@ -0,0 +1,98 @@ +""" +Main control layer objects. Exposes basic communication operations, +and dispatches to various shims depending on the context. +""" +import asyncio +from contextlib import suppress +from functools import singledispatchmethod +from typing import Any, Dict + +from ._aioca import AiocaShim +from ._base_shim import _ShimBase + + +class ControlLayer: + """ + Control Layer used to communicate with the control system, dispatching to + whichever shim is relevant. + """ + shims: Dict[str, _ShimBase] + + def __init__(self, *args, **kwargs): + self.shims = { + 'ca': AiocaShim(), + } + + def shim_from_pv(self, pv: str) -> _ShimBase: + split = pv.split("://", 1) + if len(split) > 1: + # We got something like pva://mydevice, so use specified comms mode + shim = self.shims.get(split[0], None) + else: + # No comms mode specified, use the default + shim = list(self.shims.values())[0] + + if shim is None: + raise ValueError(f"PV is of an unsupported protocol: {pv}") + + return shim + + @singledispatchmethod + def get(self, pv): + print(f"PV is of an unsupported type: {type(pv)}. Provide either " + "a string or list of strings") + + @get.register + def _(self, pv: str): + return asyncio.run(self._get_one(pv)) + + @get.register + def _(self, pv: list): + coros = [] + for p in pv: + coros.append(self._get_one(p)) + + loop = asyncio.get_event_loop() + return loop.run_until_complete(asyncio.gather(*coros)) + + async def _get_one(self, pv: str): + shim = self.shim_from_pv(pv) + return await shim.get(pv) + + @singledispatchmethod + def put(self, pv, value: Any): + print(f"PV is of an unsupported type: {type(pv)}. Provide either " + "a string or list of strings") + + @put.register + def _(self, pv: str, value: Any): + return asyncio.run(self._put_one(pv, value)) + + @put.register + def _(self, pv: list, value: list, sequential: bool = False): + coros = [] + for p, val in zip(pv, value): + coros.append(self._put_one(p, val)) + + loop = asyncio.get_event_loop() + return loop.run_until_complete(asyncio.gather(*coros)) + + async def _put_one(self, pv: str, value: Any): + shim = self.shim_from_pv(pv) + return await shim.put(pv, value) + + def subscribe(self, pv, cb): + # Subscribes a callback to the PV address + shim = self.shim_from_pv(pv) + shim.monitor(pv, cb) + + def stop(self): + # stop all currently running tasks. + # TODO: make all tasks generated in superscore actually handle + # CancelledError properly and clean up... + loop = asyncio.get_event_loop() + pending_tasks = asyncio.all_tasks(loop) + for task in pending_tasks: + task.cancel() + with suppress(asyncio.CancelledError): + loop.run_until_complete(task) From d31549fd8ce2e554442b29b582dbbea5336167de Mon Sep 17 00:00:00 2001 From: tangkong Date: Thu, 13 Jun 2024 09:22:07 -0700 Subject: [PATCH 03/14] MNT: remove stop method, it will be useless given the current sync interface --- superscore/control_layers/core.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/superscore/control_layers/core.py b/superscore/control_layers/core.py index 09e611b..d4433ec 100644 --- a/superscore/control_layers/core.py +++ b/superscore/control_layers/core.py @@ -3,7 +3,6 @@ and dispatches to various shims depending on the context. """ import asyncio -from contextlib import suppress from functools import singledispatchmethod from typing import Any, Dict @@ -85,14 +84,3 @@ def subscribe(self, pv, cb): # Subscribes a callback to the PV address shim = self.shim_from_pv(pv) shim.monitor(pv, cb) - - def stop(self): - # stop all currently running tasks. - # TODO: make all tasks generated in superscore actually handle - # CancelledError properly and clean up... - loop = asyncio.get_event_loop() - pending_tasks = asyncio.all_tasks(loop) - for task in pending_tasks: - task.cancel() - with suppress(asyncio.CancelledError): - loop.run_until_complete(task) From 3688d791c9a6987955cf48a0e611984a24c2f35d Mon Sep 17 00:00:00 2001 From: tangkong Date: Thu, 13 Jun 2024 14:22:52 -0700 Subject: [PATCH 04/14] ENH: add TaskStatus wrapper, add callback arg to ControlLayer.put methods --- superscore/control_layers/_aioca.py | 4 +- superscore/control_layers/core.py | 44 ++++++++++++---- superscore/control_layers/status.py | 80 +++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 13 deletions(-) create mode 100644 superscore/control_layers/status.py diff --git a/superscore/control_layers/_aioca.py b/superscore/control_layers/_aioca.py index e813a2b..0362d0a 100644 --- a/superscore/control_layers/_aioca.py +++ b/superscore/control_layers/_aioca.py @@ -5,7 +5,7 @@ from aioca import caget, camonitor, caput -from ._base_shim import _ShimBase +from superscore.control_layers._base_shim import _ShimBase class AiocaShim(_ShimBase): @@ -15,7 +15,7 @@ async def get(self, address: str): return await caget(address) async def put(self, address: str, value: Any): - return await caput(address, value) + await caput(address, value) def monitor(self, address: str, callback: Callable): camonitor(address, callback) diff --git a/superscore/control_layers/core.py b/superscore/control_layers/core.py index d4433ec..1fee769 100644 --- a/superscore/control_layers/core.py +++ b/superscore/control_layers/core.py @@ -4,7 +4,9 @@ """ import asyncio from functools import singledispatchmethod -from typing import Any, Dict +from typing import Any, Callable, Dict, Optional + +from superscore.control_layers.status import TaskStatus from ._aioca import AiocaShim from ._base_shim import _ShimBase @@ -59,26 +61,46 @@ async def _get_one(self, pv: str): return await shim.get(pv) @singledispatchmethod - def put(self, pv, value: Any): + def put(self, pv, value: Any, cb: Optional[Callable]): print(f"PV is of an unsupported type: {type(pv)}. Provide either " "a string or list of strings") @put.register - def _(self, pv: str, value: Any): - return asyncio.run(self._put_one(pv, value)) + def _(self, pv: str, value: Any, cb: Optional[Callable] = None): + async def status_coro(): + status = self._put_one(pv, value) + if cb is not None: + status.add_callback(cb) + await status.task + return status + + return asyncio.run(status_coro()) @put.register - def _(self, pv: list, value: list, sequential: bool = False): - coros = [] - for p, val in zip(pv, value): - coros.append(self._put_one(p, val)) + def _(self, pv: list, value: list, cb: Optional[list[Callable]] = None): - loop = asyncio.get_event_loop() - return loop.run_until_complete(asyncio.gather(*coros)) + async def status_coros(): + statuses = [] + if cb is None: + callbacks = [None for _ in range(len(pv))] + else: + callbacks = cb + + for p, val, c in zip(pv, value, callbacks): + status = self._put_one(p, val) + if c is not None: + status.add_callback(c) + + statuses.append(status) + await asyncio.gather(*[s.task for s in statuses]) + return statuses + + return asyncio.run(status_coros()) + @TaskStatus.wrap async def _put_one(self, pv: str, value: Any): shim = self.shim_from_pv(pv) - return await shim.put(pv, value) + await shim.put(pv, value) def subscribe(self, pv, cb): # Subscribes a callback to the PV address diff --git a/superscore/control_layers/status.py b/superscore/control_layers/status.py new file mode 100644 index 0000000..c463a1f --- /dev/null +++ b/superscore/control_layers/status.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import asyncio +import functools +from typing import Awaitable, Callable, Optional, Type, TypeVar, cast + +TS = TypeVar("TS", bound="TaskStatus") + + +class TaskStatus: + """ + Unified Status object for wrapping task completion information and attaching + callbacks This must be created inside of a coroutine, but can be returned to + synchronous scope for examining the task + + Largely vendored from bluesky/ophyd-async + """ + + def __init__(self, awaitable: Awaitable): + if isinstance(awaitable, asyncio.Task): + self.task = awaitable + else: + self.task = asyncio.create_task(awaitable) + self.task.add_done_callback(self._run_callbacks) + self._callbacks: list[Callable] = [] + + def __await__(self): + return self.task.__await__() + + def add_callback(self, callback: Callable): + if self.done: + callback(self) + else: + self._callbacks.append(callback) + + def _run_callbacks(self, task: asyncio.Task): + for callback in self._callbacks: + callback(self) + + def exception(self) -> Optional[BaseException]: + if self.task.done(): + try: + return self.task.exception() + except asyncio.CancelledError as e: + return e + return None + + @property + def done(self) -> bool: + return self.task.done() + + @property + def success(self) -> bool: + return ( + self.task.done() + and not self.task.cancelled() + and self.task.exception() is None + ) + + def __repr__(self) -> str: + if self.done: + if e := self.exception(): + status = f"errored: {repr(e)}" + else: + status = "done" + else: + status = "pending" + return f"<{type(self).__name__}, task: {self.task.get_coro()}, {status}>" + + __str__ = __repr__ + + @classmethod + def wrap(cls: Type[TS], f: Callable[..., Awaitable]) -> Callable[..., TS]: + """Wrap an async function in a TaskStatus.""" + + @functools.wraps(f) + def wrap_f(*args, **kwargs) -> TS: + return cls(f(*args, **kwargs)) + + return cast(Callable[..., TS], wrap_f) From 7dfa500f8c91197a932687a2aa2b5547d8d628f7 Mon Sep 17 00:00:00 2001 From: tangkong Date: Thu, 13 Jun 2024 15:32:39 -0700 Subject: [PATCH 05/14] MNT: _ShimBase -> _BaseShim for consistency --- superscore/control_layers/_aioca.py | 4 ++-- superscore/control_layers/_base_shim.py | 2 +- superscore/control_layers/core.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/superscore/control_layers/_aioca.py b/superscore/control_layers/_aioca.py index 0362d0a..8cbf04e 100644 --- a/superscore/control_layers/_aioca.py +++ b/superscore/control_layers/_aioca.py @@ -5,10 +5,10 @@ from aioca import caget, camonitor, caput -from superscore.control_layers._base_shim import _ShimBase +from superscore.control_layers._base_shim import _BaseShim -class AiocaShim(_ShimBase): +class AiocaShim(_BaseShim): # TODO: consider handling datatype arguments in caput/get # TODO: wrap CANothing results into unified status object async def get(self, address: str): diff --git a/superscore/control_layers/_base_shim.py b/superscore/control_layers/_base_shim.py index 4bae3b8..38fc55a 100644 --- a/superscore/control_layers/_base_shim.py +++ b/superscore/control_layers/_base_shim.py @@ -1,7 +1,7 @@ from typing import Any, Callable -class _ShimBase: +class _BaseShim: async def get(self, address: str): raise NotImplementedError diff --git a/superscore/control_layers/core.py b/superscore/control_layers/core.py index 1fee769..02fa5bc 100644 --- a/superscore/control_layers/core.py +++ b/superscore/control_layers/core.py @@ -9,7 +9,7 @@ from superscore.control_layers.status import TaskStatus from ._aioca import AiocaShim -from ._base_shim import _ShimBase +from ._base_shim import _BaseShim class ControlLayer: @@ -17,14 +17,14 @@ class ControlLayer: Control Layer used to communicate with the control system, dispatching to whichever shim is relevant. """ - shims: Dict[str, _ShimBase] + shims: Dict[str, _BaseShim] def __init__(self, *args, **kwargs): self.shims = { 'ca': AiocaShim(), } - def shim_from_pv(self, pv: str) -> _ShimBase: + def shim_from_pv(self, pv: str) -> _BaseShim: split = pv.split("://", 1) if len(split) > 1: # We got something like pva://mydevice, so use specified comms mode From 6e74fbf32482d057d1290d95b6decd5b94cdd2a4 Mon Sep 17 00:00:00 2001 From: tangkong Date: Thu, 13 Jun 2024 16:55:44 -0700 Subject: [PATCH 06/14] BLD: add aioca and pytest-asyncio to dependencies --- dev-requirements.txt | 1 + requirements.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index ca01160..6d998b6 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -2,6 +2,7 @@ # necessarily required for _using_ it. coverage pytest +pytest-asyncio pytest-cov pytest-qt diff --git a/requirements.txt b/requirements.txt index 8754683..daf67f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ # List requirements here. +aioca apischema pcdsutils PyQt5 From a1b94700202c391b290dadb269367e3ab6e5dd48 Mon Sep 17 00:00:00 2001 From: tangkong Date: Thu, 13 Jun 2024 16:56:45 -0700 Subject: [PATCH 07/14] TST: first pass status and cl tests --- pyproject.toml | 2 +- superscore/tests/conftest.py | 22 ++++++++++++++ superscore/tests/test_cl.py | 34 +++++++++++++++++++++ superscore/tests/test_status.py | 54 +++++++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 superscore/tests/test_cl.py create mode 100644 superscore/tests/test_status.py diff --git a/pyproject.toml b/pyproject.toml index c021d5b..08dcf88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,4 +52,4 @@ file = "dev-requirements.txt" file = "docs-requirements.txt" [tool.pytest.ini_options] -addopts = "--cov=." +addopts = "--cov=. --asyncio-mode=auto" diff --git a/superscore/tests/conftest.py b/superscore/tests/conftest.py index 14cf4a7..372d6ca 100644 --- a/superscore/tests/conftest.py +++ b/superscore/tests/conftest.py @@ -7,6 +7,8 @@ from superscore.backends.core import _Backend from superscore.backends.filestore import FilestoreBackend from superscore.backends.test import TestBackend +from superscore.control_layers._base_shim import _BaseShim +from superscore.control_layers.core import ControlLayer from superscore.model import (Collection, Parameter, Readback, Root, Setpoint, Snapshot) @@ -675,3 +677,23 @@ def test_backends(filestore_backend: FilestoreBackend) -> List[_Backend]: def backends(request, test_backends: List[_Backend]): i = request.param return test_backends[i] + + +class DummyShim(_BaseShim): + """Shim that does nothing""" + async def get(self, *args, **kwargs): + return + + async def put(self, *args, **kwargs): + return + + def monitor(self, *args, **kwargs): + return + + +@pytest.fixture(scope='function') +def dummy_cl() -> ControlLayer: + cl = ControlLayer() + cl.shims['ca'] = DummyShim() + cl.shims['pva'] = DummyShim() + return cl diff --git a/superscore/tests/test_cl.py b/superscore/tests/test_cl.py new file mode 100644 index 0000000..ebb5ddb --- /dev/null +++ b/superscore/tests/test_cl.py @@ -0,0 +1,34 @@ +from unittest.mock import AsyncMock + +from superscore.control_layers.status import TaskStatus + + +def test_get(dummy_cl): + mock_ca_get = AsyncMock(return_value='ca_value') + dummy_cl.shims['ca'].get = mock_ca_get + mock_pva_get = AsyncMock(return_value='pva_value') + dummy_cl.shims['pva'].get = mock_pva_get + assert dummy_cl.get("SOME_PREFIX") == "ca_value" + assert dummy_cl.get("ca://SOME_PREFIX") == "ca_value" + assert dummy_cl.get("pva://SOME_PREFIX") == "pva_value" + + +def test_put(dummy_cl): + result = dummy_cl.put("OTHER:PREFIX", 4) + assert isinstance(result, TaskStatus) + assert result.done + + results = dummy_cl.put(["OTHER:PREFIX", "GE", "LT"], [4, 5, 6]) + assert all(isinstance(res, TaskStatus) for res in results) + assert result.done + + +def test_put_callback(dummy_cl): + cbs = [] + + # callback gets called with the task as a single argument + result = dummy_cl.put("SOME:PREFIX", 2, cbs.append) + + assert result.exception() is None + assert result.success is True + assert len(cbs) == 1 diff --git a/superscore/tests/test_status.py b/superscore/tests/test_status.py new file mode 100644 index 0000000..e2678bc --- /dev/null +++ b/superscore/tests/test_status.py @@ -0,0 +1,54 @@ +import asyncio +from typing import Any, Callable + +import pytest + +from superscore.control_layers.status import TaskStatus + + +@pytest.fixture +async def normal_coroutine() -> Callable[[], Any]: + async def inner_coroutine(): + await asyncio.sleep(0.01) + + return inner_coroutine + + +@pytest.fixture +async def failing_coroutine() -> Callable[[], Any]: + async def inner_coroutine(): + await asyncio.sleep(0.01) + raise ValueError() + + return inner_coroutine + + +async def test_status_success(normal_coroutine): + st = TaskStatus(normal_coroutine()) + assert isinstance(st, TaskStatus) + assert not st.done + assert not st.success + await st + assert st.done + assert st.success + + +async def test_status_fail(failing_coroutine): + status = TaskStatus(failing_coroutine()) + assert status.exception() is None + + with pytest.raises(ValueError): + await status + + assert type(status.exception()) == ValueError + + +async def test_status_wrap(): + @TaskStatus.wrap + async def coro_status(): + await asyncio.sleep(0.01) + + st = coro_status() + assert isinstance(st, TaskStatus) + await st.task + assert st.done From 2b6822d478b781962c1280c6e0eefd36679c3291 Mon Sep 17 00:00:00 2001 From: tangkong Date: Thu, 13 Jun 2024 17:00:09 -0700 Subject: [PATCH 08/14] BLD: add pytest-asyncio to conda-recipe --- conda-recipe/meta.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/conda-recipe/meta.yaml b/conda-recipe/meta.yaml index 6408d19..29375ad 100644 --- a/conda-recipe/meta.yaml +++ b/conda-recipe/meta.yaml @@ -32,6 +32,7 @@ test: requires: - coverage - pytest + - pytest-asyncio - pytest-qt - sphinx - sphinx_rtd_theme From a92070686e72aad5f1955848db53dec13f077a1f Mon Sep 17 00:00:00 2001 From: tangkong Date: Fri, 14 Jun 2024 07:53:44 -0700 Subject: [PATCH 09/14] BLD: sometimes I forget conda-recipes exist --- conda-recipe/meta.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/conda-recipe/meta.yaml b/conda-recipe/meta.yaml index 29375ad..0d8144c 100644 --- a/conda-recipe/meta.yaml +++ b/conda-recipe/meta.yaml @@ -21,6 +21,7 @@ requirements: - setuptools_scm run: - python >=3.9 + - aioca - apischema - pcdsutils - pyqt From 94e6b7a20e429b7717e737979cc2f67f87139e7e Mon Sep 17 00:00:00 2001 From: tangkong Date: Fri, 14 Jun 2024 09:48:15 -0700 Subject: [PATCH 10/14] MNT: wrap communication errors to trim traceback --- superscore/control_layers/_aioca.py | 20 +++++++++++++++----- superscore/errors.py | 5 +++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/superscore/control_layers/_aioca.py b/superscore/control_layers/_aioca.py index 8cbf04e..2d71d1c 100644 --- a/superscore/control_layers/_aioca.py +++ b/superscore/control_layers/_aioca.py @@ -1,21 +1,31 @@ """ Control layer shim for communicating asynchronously through channel access """ +import logging from typing import Any, Callable -from aioca import caget, camonitor, caput +from aioca import CANothing, caget, camonitor, caput from superscore.control_layers._base_shim import _BaseShim +from superscore.errors import CommunicationError + +logger = logging.getLogger(__name__) class AiocaShim(_BaseShim): - # TODO: consider handling datatype arguments in caput/get - # TODO: wrap CANothing results into unified status object async def get(self, address: str): - return await caget(address) + try: + return await caget(address) + except CANothing as ex: + logger.debug(f"CA get failed {ex.__repr__()}") + raise CommunicationError(f'CA get failed for {ex}') async def put(self, address: str, value: Any): - await caput(address, value) + try: + await caput(address, value) + except CANothing as ex: + logger.debug(f"CA put failed {ex.__repr__()}") + raise CommunicationError(f'CA put failed for {ex}') def monitor(self, address: str, callback: Callable): camonitor(address, callback) diff --git a/superscore/errors.py b/superscore/errors.py index 1c4c3e8..ca14609 100644 --- a/superscore/errors.py +++ b/superscore/errors.py @@ -14,3 +14,8 @@ class EntryExistsError(BackendError): class EntryNotFoundError(BackendError): """Raised when an Entry is fetched from the backend but can't be found""" pass + + +class CommunicationError(Exception): + """Raised when communication with the control system fails""" + pass From f17cf0124434fe5a603300fd7f95ae9df5de83fe Mon Sep 17 00:00:00 2001 From: tangkong Date: Fri, 14 Jun 2024 10:14:39 -0700 Subject: [PATCH 11/14] TST: test failed cl actions --- superscore/tests/test_cl.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/superscore/tests/test_cl.py b/superscore/tests/test_cl.py index ebb5ddb..5773ad0 100644 --- a/superscore/tests/test_cl.py +++ b/superscore/tests/test_cl.py @@ -1,5 +1,7 @@ from unittest.mock import AsyncMock +import pytest + from superscore.control_layers.status import TaskStatus @@ -23,6 +25,22 @@ def test_put(dummy_cl): assert result.done +def test_fail(dummy_cl): + mock_ca_get = AsyncMock(side_effect=ValueError) + dummy_cl.shims['ca'].get = mock_ca_get + + # exceptions get passed through the control layer + with pytest.raises(ValueError): + dummy_cl.get("THIS:PV") + + mock_ca_put = AsyncMock(side_effect=ValueError) + dummy_cl.shims['ca'].put = mock_ca_put + + # exceptions get passed through the control layer + with pytest.raises(ValueError): + dummy_cl.put("THAT:PV", 4) + + def test_put_callback(dummy_cl): cbs = [] From b6fb51d1965855e9e9ed71ce3ef2240ae1874ca1 Mon Sep 17 00:00:00 2001 From: tangkong Date: Fri, 14 Jun 2024 12:20:49 -0700 Subject: [PATCH 12/14] DOC/MNT: docstring and stubs pass, fix up ControlLayer.get list length checks --- superscore/control_layers/__init__.py | 1 + superscore/control_layers/_aioca.py | 50 +++++++++- superscore/control_layers/_base_shim.py | 3 + superscore/control_layers/core.py | 124 +++++++++++++++++++++--- superscore/control_layers/core.pyi | 30 ++++++ superscore/tests/test_cl.py | 2 + 6 files changed, 193 insertions(+), 17 deletions(-) create mode 100644 superscore/control_layers/core.pyi diff --git a/superscore/control_layers/__init__.py b/superscore/control_layers/__init__.py index e69de29..af7fe27 100644 --- a/superscore/control_layers/__init__.py +++ b/superscore/control_layers/__init__.py @@ -0,0 +1 @@ +from .core import ControlLayer # noqa diff --git a/superscore/control_layers/_aioca.py b/superscore/control_layers/_aioca.py index 2d71d1c..45c72b9 100644 --- a/superscore/control_layers/_aioca.py +++ b/superscore/control_layers/_aioca.py @@ -13,19 +13,63 @@ class AiocaShim(_BaseShim): - async def get(self, address: str): + """async compatible EPICS channel access shim layer""" + async def get(self, address: str) -> Any: + """ + Get the value at the PV: ``address``. + + Parameters + ---------- + address : str + The PV to caget. + + Returns + ------- + Any + The data at ``address``. + + Raises + ------ + CommunicationError + If the caget operation fails for any reason. + """ try: return await caget(address) except CANothing as ex: logger.debug(f"CA get failed {ex.__repr__()}") raise CommunicationError(f'CA get failed for {ex}') - async def put(self, address: str, value: Any): + async def put(self, address: str, value: Any) -> None: + """ + Put ``value`` to the PV ``address``. + + Parameters + ---------- + address : str + The PV to put ``value`` to. + value : Any + Value to put to ``address``. + + Raises + ------ + CommunicationError + If the caput operation fails for any reason. + """ try: await caput(address, value) except CANothing as ex: logger.debug(f"CA put failed {ex.__repr__()}") raise CommunicationError(f'CA put failed for {ex}') - def monitor(self, address: str, callback: Callable): + def monitor(self, address: str, callback: Callable) -> None: + """ + Subscribe ``callback`` to updates on the PV ``address``. + + Parameters + ---------- + address : str + The PV to monitor. + callback : Callable + The callback to run on updates to ``address`` + """ camonitor(address, callback) diff --git a/superscore/control_layers/_base_shim.py b/superscore/control_layers/_base_shim.py index 38fc55a..43b0cc4 100644 --- a/superscore/control_layers/_base_shim.py +++ b/superscore/control_layers/_base_shim.py @@ -1,3 +1,6 @@ +""" +Base shim abstract base class +""" from typing import Any, Callable diff --git a/superscore/control_layers/core.py b/superscore/control_layers/core.py index 02fa5bc..03c9d5e 100644 --- a/superscore/control_layers/core.py +++ b/superscore/control_layers/core.py @@ -4,7 +4,7 @@ """ import asyncio from functools import singledispatchmethod -from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Dict, Optional, Union from superscore.control_layers.status import TaskStatus @@ -25,6 +25,26 @@ def __init__(self, *args, **kwargs): } def shim_from_pv(self, pv: str) -> _BaseShim: + """ + Determine the correct shim to use for the provided ``pv``. + ``pv`` can optionally hold a protocol defining prefix such as "ca://" or + "pva://". If no prefix is provided, will select the first available shim. + + Parameters + ---------- + pv : str + a PV address such as "MY:PREFIX:mtr1" or "pva://MY:PREFIX:dt" + + Returns + ------- + _BaseShim + The shim held by this ControlLayer for ``pv``'s protocol + + Raises + ------ + ValueError + If pv cannot be recognized or a matching shim cannot be found + """ split = pv.split("://", 1) if len(split) > 1: # We got something like pva://mydevice, so use specified comms mode @@ -39,34 +59,88 @@ def shim_from_pv(self, pv: str) -> _BaseShim: return shim @singledispatchmethod - def get(self, pv): + def get(self, pv: Union[str, list[str]]) -> Any: + """ + Get the value(s) in ``pv``. + If a single pv is provided, will return a single value. + If a list of pvs is provided, will get the values for each asynchronously. + + Parameters + ---------- + pv : Union[str, list[str]] + The PV(s) to get values for. + + Returns + ------- + Any + The requested data + """ + # Dispatches to _get_single and _get_list depending on type print(f"PV is of an unsupported type: {type(pv)}. Provide either " "a string or list of strings") @get.register - def _(self, pv: str): + def _get_single(self, pv: str) -> Any: + """Synchronously get a single ``pv``""" return asyncio.run(self._get_one(pv)) @get.register - def _(self, pv: list): - coros = [] - for p in pv: - coros.append(self._get_one(p)) + def _get_list(self, pv: list) -> Any: + """Synchronously get a list of ``pv``""" + async def gathered_coros(): + coros = [] + for p in pv: + coros.append(self._get_one(p)) + return await asyncio.gather(*coros) - loop = asyncio.get_event_loop() - return loop.run_until_complete(asyncio.gather(*coros)) + return asyncio.run(gathered_coros()) async def _get_one(self, pv: str): + """ + Base async get function. Use this to construct higher-level get methods + """ shim = self.shim_from_pv(pv) return await shim.get(pv) @singledispatchmethod - def put(self, pv, value: Any, cb: Optional[Callable]): + def put( + self, + pv: Union[str, list[str]], + value: Union[Any, list[Any]], + cb: Optional[Callable] = None + ) -> Union[TaskStatus, list[TaskStatus]]: + """ + Put ``value`` to ``pv`` + If ``pv`` is a list, ``value`` and ``cb`` must be lists of equal length + + Parameters + ---------- + pv : Union[str, list[str]] + The PV(s) to put ``values`` to + value : Union[Any, list[Any]] + The value(s) to put to the ``pv`` + cb : Optional[Callable], by default None + Callbacks to run on completion of the put task. + Callbacks will be called with the associated TaskStatus as its + sole argument + + Returns + ------- + Union[TaskStatus, list[TaskStatus]] + The TaskStatus object(s) for the put operation + """ + # Dispatches to _put_single and _put_list depending on type print(f"PV is of an unsupported type: {type(pv)}. Provide either " "a string or list of strings") @put.register - def _(self, pv: str, value: Any, cb: Optional[Callable] = None): + def _put_single( + self, + pv: str, + value: Any, + cb: Optional[Callable] = None + ) -> TaskStatus: + """Synchronously put ``value`` to ``pv``, running ``cb`` on completion""" async def status_coro(): status = self._put_one(pv, value) if cb is not None: @@ -77,7 +151,26 @@ async def status_coro(): return asyncio.run(status_coro()) @put.register - def _(self, pv: list, value: list, cb: Optional[list[Callable]] = None): + def _put_list( + self, + pv: list, + value: list, + cb: Optional[list[Callable]] = None + ) -> list[TaskStatus]: + """ + Synchronously put ``value`` to ``pv``, running ``cb`` on completion. + All arguments must be of equal length. + """ + if cb is None: + cb_length = len(pv) + else: + cb_length = len(cb) + + if not (len(pv) == len(value) == cb_length): + raise ValueError( + 'Arguments are of different length: ' + f'pvs({len(pv)}), values({len(value)}), cbs({len(cb)})' + ) async def status_coros(): statuses = [] @@ -99,10 +192,13 @@ async def status_coros(): @TaskStatus.wrap async def _put_one(self, pv: str, value: Any): + """ + Base async get function. Use this to construct higher-level get methods + """ shim = self.shim_from_pv(pv) await shim.put(pv, value) - def subscribe(self, pv, cb): - # Subscribes a callback to the PV address + def subscribe(self, pv: str, cb: Callable): + """Subscribes a callback (``cb``) to the provide address (``pv``)""" shim = self.shim_from_pv(pv) shim.monitor(pv, cb) diff --git a/superscore/control_layers/core.pyi b/superscore/control_layers/core.pyi new file mode 100644 index 0000000..0ee0f3e --- /dev/null +++ b/superscore/control_layers/core.pyi @@ -0,0 +1,30 @@ +from typing import Any, Callable, Optional, overload + +from superscore.control_layers.status import TaskStatus + +class ControlLayer: + @overload + def get(self, pv: str) -> Any: + ... + + @overload + def get(self, pv: list[str]) -> list[Any]: + ... + + @overload + def put( + self, + pv: str, + value: Any, + cb: Optional[Callable] = None + ) -> TaskStatus: + ... + + @overload + def put( + self, + pv: list, + value: list, + cb: Optional[list[Callable]] = None + ) -> list[TaskStatus]: + ... diff --git a/superscore/tests/test_cl.py b/superscore/tests/test_cl.py index 5773ad0..73e190a 100644 --- a/superscore/tests/test_cl.py +++ b/superscore/tests/test_cl.py @@ -14,6 +14,8 @@ def test_get(dummy_cl): assert dummy_cl.get("ca://SOME_PREFIX") == "ca_value" assert dummy_cl.get("pva://SOME_PREFIX") == "pva_value" + assert dummy_cl.get(['a', 'b', 'c']) == ["ca_value" for i in range(3)] + def test_put(dummy_cl): result = dummy_cl.put("OTHER:PREFIX", 4) From 29d85bec0a4ecb80ea8ce5c45c0e3fc0118a4fdd Mon Sep 17 00:00:00 2001 From: tangkong Date: Fri, 14 Jun 2024 13:19:49 -0700 Subject: [PATCH 13/14] DOC: pre-release notes --- .../35-enh_control_layer.rst | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 docs/source/upcoming_release_notes/35-enh_control_layer.rst diff --git a/docs/source/upcoming_release_notes/35-enh_control_layer.rst b/docs/source/upcoming_release_notes/35-enh_control_layer.rst new file mode 100644 index 0000000..dfa0feb --- /dev/null +++ b/docs/source/upcoming_release_notes/35-enh_control_layer.rst @@ -0,0 +1,23 @@ +35 enh_control_layer +#################### + +API Breaks +---------- +- N/A + +Features +-------- +- Adds shim layer for aioca (async Channel Access) +- Adds ControlLayer class for communicating with shims + +Bugfixes +-------- +- N/A + +Maintenance +----------- +- N/A + +Contributors +------------ +- tangkong From 879499b75a8628e60ca3a2031873fa4046e0b3a5 Mon Sep 17 00:00:00 2001 From: tangkong Date: Fri, 21 Jun 2024 10:09:10 -0700 Subject: [PATCH 14/14] MNT: argument consistency (address instead of pv), touch up type hints and tests --- superscore/control_layers/_base_shim.py | 4 +- superscore/control_layers/core.py | 90 ++++++++++++------------- superscore/control_layers/core.pyi | 8 +-- superscore/tests/test_cl.py | 2 +- 4 files changed, 53 insertions(+), 51 deletions(-) diff --git a/superscore/control_layers/_base_shim.py b/superscore/control_layers/_base_shim.py index 43b0cc4..eb0a8b0 100644 --- a/superscore/control_layers/_base_shim.py +++ b/superscore/control_layers/_base_shim.py @@ -3,9 +3,11 @@ """ from typing import Any, Callable +from superscore.type_hints import AnyEpicsType + class _BaseShim: - async def get(self, address: str): + async def get(self, address: str) -> AnyEpicsType: raise NotImplementedError async def put(self, address: str, value: Any): diff --git a/superscore/control_layers/core.py b/superscore/control_layers/core.py index 03c9d5e..66391e0 100644 --- a/superscore/control_layers/core.py +++ b/superscore/control_layers/core.py @@ -24,28 +24,28 @@ def __init__(self, *args, **kwargs): 'ca': AiocaShim(), } - def shim_from_pv(self, pv: str) -> _BaseShim: + def shim_from_pv(self, address: str) -> _BaseShim: """ - Determine the correct shim to use for the provided ``pv``. - ``pv`` can optionally hold a protocol defining prefix such as "ca://" or + Determine the correct shim to use for the provided ``address``. + ``address`` can optionally hold a protocol defining prefix such as "ca://" or "pva://". If no prefix is provided, will select the first available shim. Parameters ---------- - pv : str + address : str a PV address such as "MY:PREFIX:mtr1" or "pva://MY:PREFIX:dt" Returns ------- _BaseShim - The shim held by this ControlLayer for ``pv``'s protocol + The shim held by this ControlLayer for ``address``'s protocol Raises ------ ValueError - If pv cannot be recognized or a matching shim cannot be found + If address cannot be recognized or a matching shim cannot be found """ - split = pv.split("://", 1) + split = address.split("://", 1) if len(split) > 1: # We got something like pva://mydevice, so use specified comms mode shim = self.shims.get(split[0], None) @@ -54,20 +54,20 @@ def shim_from_pv(self, pv: str) -> _BaseShim: shim = list(self.shims.values())[0] if shim is None: - raise ValueError(f"PV is of an unsupported protocol: {pv}") + raise ValueError(f"PV is of an unsupported protocol: {address}") return shim @singledispatchmethod - def get(self, pv: Union[str, list[str]]) -> Any: + def get(self, address: Union[str, list[str]]) -> Any: """ - Get the value(s) in ``pv``. + Get the value(s) in ``address``. If a single pv is provided, will return a single value. If a list of pvs is provided, will get the values for each asynchronously. Parameters ---------- - pv : Union[str, list[str]] + address : Union[str, list[str]] The PV(s) to get values for. Returns @@ -76,49 +76,49 @@ def get(self, pv: Union[str, list[str]]) -> Any: The requested data """ # Dispatches to _get_single and _get_list depending on type - print(f"PV is of an unsupported type: {type(pv)}. Provide either " + print(f"PV is of an unsupported type: {type(address)}. Provide either " "a string or list of strings") @get.register - def _get_single(self, pv: str) -> Any: - """Synchronously get a single ``pv``""" - return asyncio.run(self._get_one(pv)) + def _get_single(self, address: str) -> Any: + """Synchronously get a single ``address``""" + return asyncio.run(self._get_one(address)) @get.register - def _get_list(self, pv: list) -> Any: - """Synchronously get a list of ``pv``""" + def _get_list(self, address: list) -> Any: + """Synchronously get a list of ``address``""" async def gathered_coros(): coros = [] - for p in pv: + for p in address: coros.append(self._get_one(p)) return await asyncio.gather(*coros) return asyncio.run(gathered_coros()) - async def _get_one(self, pv: str): + async def _get_one(self, address: str): """ Base async get function. Use this to construct higher-level get methods """ - shim = self.shim_from_pv(pv) - return await shim.get(pv) + shim = self.shim_from_pv(address) + return await shim.get(address) @singledispatchmethod def put( self, - pv: Union[str, list[str]], + address: Union[str, list[str]], value: Union[Any, list[Any]], - cb: Optional[Callable] = None + cb: Optional[Union[Callable, list[Callable]]] = None ) -> Union[TaskStatus, list[TaskStatus]]: """ - Put ``value`` to ``pv`` - If ``pv`` is a list, ``value`` and ``cb`` must be lists of equal length + Put ``value`` to ``address`` + If ``address`` is a list, ``value`` and ``cb`` must be lists of equal length Parameters ---------- - pv : Union[str, list[str]] + address : Union[str, list[str]] The PV(s) to put ``values`` to value : Union[Any, list[Any]] - The value(s) to put to the ``pv`` + The value(s) to put to the ``address`` cb : Optional[Callable], by default None Callbacks to run on completion of the put task. Callbacks will be called with the associated TaskStatus as its @@ -130,19 +130,19 @@ def put( The TaskStatus object(s) for the put operation """ # Dispatches to _put_single and _put_list depending on type - print(f"PV is of an unsupported type: {type(pv)}. Provide either " + print(f"PV is of an unsupported type: {type(address)}. Provide either " "a string or list of strings") @put.register def _put_single( self, - pv: str, + address: str, value: Any, cb: Optional[Callable] = None ) -> TaskStatus: - """Synchronously put ``value`` to ``pv``, running ``cb`` on completion""" + """Synchronously put ``value`` to ``address``, running ``cb`` on completion""" async def status_coro(): - status = self._put_one(pv, value) + status = self._put_one(address, value) if cb is not None: status.add_callback(cb) await status.task @@ -153,33 +153,33 @@ async def status_coro(): @put.register def _put_list( self, - pv: list, + address: list, value: list, cb: Optional[list[Callable]] = None ) -> list[TaskStatus]: """ - Synchronously put ``value`` to ``pv``, running ``cb`` on completion. + Synchronously put ``value`` to ``address``, running ``cb`` on completion. All arguments must be of equal length. """ if cb is None: - cb_length = len(pv) + cb_length = len(address) else: cb_length = len(cb) - if not (len(pv) == len(value) == cb_length): + if not (len(address) == len(value) == cb_length): raise ValueError( 'Arguments are of different length: ' - f'pvs({len(pv)}), values({len(value)}), cbs({len(cb)})' + f'addresses({len(address)}), values({len(value)}), cbs({len(cb)})' ) async def status_coros(): statuses = [] if cb is None: - callbacks = [None for _ in range(len(pv))] + callbacks = [None for _ in range(len(address))] else: callbacks = cb - for p, val, c in zip(pv, value, callbacks): + for p, val, c in zip(address, value, callbacks): status = self._put_one(p, val) if c is not None: status.add_callback(c) @@ -191,14 +191,14 @@ async def status_coros(): return asyncio.run(status_coros()) @TaskStatus.wrap - async def _put_one(self, pv: str, value: Any): + async def _put_one(self, address: str, value: Any): """ Base async get function. Use this to construct higher-level get methods """ - shim = self.shim_from_pv(pv) - await shim.put(pv, value) + shim = self.shim_from_pv(address) + await shim.put(address, value) - def subscribe(self, pv: str, cb: Callable): - """Subscribes a callback (``cb``) to the provide address (``pv``)""" - shim = self.shim_from_pv(pv) - shim.monitor(pv, cb) + def subscribe(self, address: str, cb: Callable): + """Subscribes a callback (``cb``) to the provide address (``address``)""" + shim = self.shim_from_pv(address) + shim.monitor(address, cb) diff --git a/superscore/control_layers/core.pyi b/superscore/control_layers/core.pyi index 0ee0f3e..6527a7a 100644 --- a/superscore/control_layers/core.pyi +++ b/superscore/control_layers/core.pyi @@ -4,17 +4,17 @@ from superscore.control_layers.status import TaskStatus class ControlLayer: @overload - def get(self, pv: str) -> Any: + def get(self, address: str) -> Any: ... @overload - def get(self, pv: list[str]) -> list[Any]: + def get(self, address: list[str]) -> list[Any]: ... @overload def put( self, - pv: str, + address: str, value: Any, cb: Optional[Callable] = None ) -> TaskStatus: @@ -23,7 +23,7 @@ class ControlLayer: @overload def put( self, - pv: list, + address: list, value: list, cb: Optional[list[Callable]] = None ) -> list[TaskStatus]: diff --git a/superscore/tests/test_cl.py b/superscore/tests/test_cl.py index 73e190a..a135112 100644 --- a/superscore/tests/test_cl.py +++ b/superscore/tests/test_cl.py @@ -24,7 +24,7 @@ def test_put(dummy_cl): results = dummy_cl.put(["OTHER:PREFIX", "GE", "LT"], [4, 5, 6]) assert all(isinstance(res, TaskStatus) for res in results) - assert result.done + assert all(res.done for res in results) def test_fail(dummy_cl):