Skip to content

Commit

Permalink
Merge pull request #35 from tangkong/enh_control_layer
Browse files Browse the repository at this point in the history
ENH: Control layer and shim layers
  • Loading branch information
tangkong authored Jun 21, 2024
2 parents 6073b4d + 879499b commit 1c28e47
Show file tree
Hide file tree
Showing 15 changed files with 570 additions and 1 deletion.
2 changes: 2 additions & 0 deletions conda-recipe/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ requirements:
- setuptools_scm
run:
- python >=3.9
- aioca
- apischema
- pcdsutils
- pyqt
Expand All @@ -32,6 +33,7 @@ test:
requires:
- coverage
- pytest
- pytest-asyncio
- pytest-qt
- sphinx
- sphinx_rtd_theme
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# necessarily required for _using_ it.
coverage
pytest
pytest-asyncio
pytest-cov
pytest-qt

Expand Down
23 changes: 23 additions & 0 deletions docs/source/upcoming_release_notes/35-enh_control_layer.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
35 enh_control_layer
####################

API Breaks
----------
- N/A

Features
--------
- Adds shim layer for aioca (async Channel Access)
- Adds ControlLayer class for communicating with shims

Bugfixes
--------
- N/A

Maintenance
-----------
- N/A

Contributors
------------
- tangkong
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ file = "dev-requirements.txt"
file = "docs-requirements.txt"

[tool.pytest.ini_options]
addopts = "--cov=."
addopts = "--cov=. --asyncio-mode=auto"
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# List requirements here.
aioca
apischema
pcdsutils
PyQt5
Expand Down
1 change: 1 addition & 0 deletions superscore/control_layers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .core import ControlLayer # noqa
75 changes: 75 additions & 0 deletions superscore/control_layers/_aioca.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Control layer shim for communicating asynchronously through channel access
"""
import logging
from typing import Any, Callable

from aioca import CANothing, caget, camonitor, caput

from superscore.control_layers._base_shim import _BaseShim
from superscore.errors import CommunicationError

logger = logging.getLogger(__name__)


class AiocaShim(_BaseShim):
"""async compatible EPICS channel access shim layer"""
async def get(self, address: str) -> Any:
"""
Get the value at the PV: ``address``.
Parameters
----------
address : str
The PV to caget.
Returns
-------
Any
The data at ``address``.
Raises
------
CommunicationError
If the caget operation fails for any reason.
"""
try:
return await caget(address)
except CANothing as ex:
logger.debug(f"CA get failed {ex.__repr__()}")
raise CommunicationError(f'CA get failed for {ex}')

async def put(self, address: str, value: Any) -> None:
"""
Put ``value`` to the PV ``address``.
Parameters
----------
address : str
The PV to put ``value`` to.
value : Any
Value to put to ``address``.
Raises
------
CommunicationError
If the caput operation fails for any reason.
"""
try:
await caput(address, value)
except CANothing as ex:
logger.debug(f"CA put failed {ex.__repr__()}")
raise CommunicationError(f'CA put failed for {ex}')

def monitor(self, address: str, callback: Callable) -> None:
"""
Subscribe ``callback`` to updates on the PV ``address``.
Parameters
----------
address : str
The PV to monitor.
callback : Callable
The callback to run on updates to ``address``
"""
camonitor(address, callback)
17 changes: 17 additions & 0 deletions superscore/control_layers/_base_shim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""
Base shim abstract base class
"""
from typing import Any, Callable

from superscore.type_hints import AnyEpicsType


class _BaseShim:
async def get(self, address: str) -> AnyEpicsType:
raise NotImplementedError

async def put(self, address: str, value: Any):
raise NotImplementedError

def monitor(self, address: str, callback: Callable):
raise NotImplementedError
204 changes: 204 additions & 0 deletions superscore/control_layers/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""
Main control layer objects. Exposes basic communication operations,
and dispatches to various shims depending on the context.
"""
import asyncio
from functools import singledispatchmethod
from typing import Any, Callable, Dict, Optional, Union

from superscore.control_layers.status import TaskStatus

from ._aioca import AiocaShim
from ._base_shim import _BaseShim


class ControlLayer:
"""
Control Layer used to communicate with the control system, dispatching to
whichever shim is relevant.
"""
shims: Dict[str, _BaseShim]

def __init__(self, *args, **kwargs):
self.shims = {
'ca': AiocaShim(),
}

def shim_from_pv(self, address: str) -> _BaseShim:
"""
Determine the correct shim to use for the provided ``address``.
``address`` can optionally hold a protocol defining prefix such as "ca://" or
"pva://". If no prefix is provided, will select the first available shim.
Parameters
----------
address : str
a PV address such as "MY:PREFIX:mtr1" or "pva://MY:PREFIX:dt"
Returns
-------
_BaseShim
The shim held by this ControlLayer for ``address``'s protocol
Raises
------
ValueError
If address cannot be recognized or a matching shim cannot be found
"""
split = address.split("://", 1)
if len(split) > 1:
# We got something like pva://mydevice, so use specified comms mode
shim = self.shims.get(split[0], None)
else:
# No comms mode specified, use the default
shim = list(self.shims.values())[0]

if shim is None:
raise ValueError(f"PV is of an unsupported protocol: {address}")

return shim

@singledispatchmethod
def get(self, address: Union[str, list[str]]) -> Any:
"""
Get the value(s) in ``address``.
If a single pv is provided, will return a single value.
If a list of pvs is provided, will get the values for each asynchronously.
Parameters
----------
address : Union[str, list[str]]
The PV(s) to get values for.
Returns
-------
Any
The requested data
"""
# Dispatches to _get_single and _get_list depending on type
print(f"PV is of an unsupported type: {type(address)}. Provide either "
"a string or list of strings")

@get.register
def _get_single(self, address: str) -> Any:
"""Synchronously get a single ``address``"""
return asyncio.run(self._get_one(address))

@get.register
def _get_list(self, address: list) -> Any:
"""Synchronously get a list of ``address``"""
async def gathered_coros():
coros = []
for p in address:
coros.append(self._get_one(p))
return await asyncio.gather(*coros)

return asyncio.run(gathered_coros())

async def _get_one(self, address: str):
"""
Base async get function. Use this to construct higher-level get methods
"""
shim = self.shim_from_pv(address)
return await shim.get(address)

@singledispatchmethod
def put(
self,
address: Union[str, list[str]],
value: Union[Any, list[Any]],
cb: Optional[Union[Callable, list[Callable]]] = None
) -> Union[TaskStatus, list[TaskStatus]]:
"""
Put ``value`` to ``address``
If ``address`` is a list, ``value`` and ``cb`` must be lists of equal length
Parameters
----------
address : Union[str, list[str]]
The PV(s) to put ``values`` to
value : Union[Any, list[Any]]
The value(s) to put to the ``address``
cb : Optional[Callable], by default None
Callbacks to run on completion of the put task.
Callbacks will be called with the associated TaskStatus as its
sole argument
Returns
-------
Union[TaskStatus, list[TaskStatus]]
The TaskStatus object(s) for the put operation
"""
# Dispatches to _put_single and _put_list depending on type
print(f"PV is of an unsupported type: {type(address)}. Provide either "
"a string or list of strings")

@put.register
def _put_single(
self,
address: str,
value: Any,
cb: Optional[Callable] = None
) -> TaskStatus:
"""Synchronously put ``value`` to ``address``, running ``cb`` on completion"""
async def status_coro():
status = self._put_one(address, value)
if cb is not None:
status.add_callback(cb)
await status.task
return status

return asyncio.run(status_coro())

@put.register
def _put_list(
self,
address: list,
value: list,
cb: Optional[list[Callable]] = None
) -> list[TaskStatus]:
"""
Synchronously put ``value`` to ``address``, running ``cb`` on completion.
All arguments must be of equal length.
"""
if cb is None:
cb_length = len(address)
else:
cb_length = len(cb)

if not (len(address) == len(value) == cb_length):
raise ValueError(
'Arguments are of different length: '
f'addresses({len(address)}), values({len(value)}), cbs({len(cb)})'
)

async def status_coros():
statuses = []
if cb is None:
callbacks = [None for _ in range(len(address))]
else:
callbacks = cb

for p, val, c in zip(address, value, callbacks):
status = self._put_one(p, val)
if c is not None:
status.add_callback(c)

statuses.append(status)
await asyncio.gather(*[s.task for s in statuses])
return statuses

return asyncio.run(status_coros())

@TaskStatus.wrap
async def _put_one(self, address: str, value: Any):
"""
Base async get function. Use this to construct higher-level get methods
"""
shim = self.shim_from_pv(address)
await shim.put(address, value)

def subscribe(self, address: str, cb: Callable):
"""Subscribes a callback (``cb``) to the provide address (``address``)"""
shim = self.shim_from_pv(address)
shim.monitor(address, cb)
30 changes: 30 additions & 0 deletions superscore/control_layers/core.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Any, Callable, Optional, overload

from superscore.control_layers.status import TaskStatus

class ControlLayer:
@overload
def get(self, address: str) -> Any:
...

@overload
def get(self, address: list[str]) -> list[Any]:
...

@overload
def put(
self,
address: str,
value: Any,
cb: Optional[Callable] = None
) -> TaskStatus:
...

@overload
def put(
self,
address: list,
value: list,
cb: Optional[list[Callable]] = None
) -> list[TaskStatus]:
...
Loading

0 comments on commit 1c28e47

Please sign in to comment.