-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Restructure epics/test_signals.py (#630)
* Use single IOC for both CA and PVA signals in epics/test_signals.py move pva only records to test_records_pva.db use ioc args to load multiple templates in epics/test_signals.py create_ioc_fixtures in epics.testing.utils Move example IOC and device for epics signal test to epics.testing module * Update test_device_save_loader to use example Device from epics.testing * Add TestingIOC class to epics.testing and example ioc helper functions
- Loading branch information
Showing
9 changed files
with
870 additions
and
705 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from ._example_ioc import ( | ||
CA_PVA_RECORDS, | ||
PVA_RECORDS, | ||
ExampleCaDevice, | ||
ExampleEnum, | ||
ExamplePvaDevice, | ||
ExampleTable, | ||
connect_example_device, | ||
get_example_ioc, | ||
) | ||
from ._utils import TestingIOC, generate_random_PV_prefix | ||
|
||
__all__ = [ | ||
"CA_PVA_RECORDS", | ||
"PVA_RECORDS", | ||
"ExampleCaDevice", | ||
"ExampleEnum", | ||
"ExamplePvaDevice", | ||
"ExampleTable", | ||
"connect_example_device", | ||
"get_example_ioc", | ||
"TestingIOC", | ||
"generate_random_PV_prefix", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
from collections.abc import Sequence | ||
from pathlib import Path | ||
from typing import Annotated as A | ||
from typing import Literal | ||
|
||
import numpy as np | ||
|
||
from ophyd_async.core import ( | ||
Array1D, | ||
SignalRW, | ||
StrictEnum, | ||
Table, | ||
) | ||
from ophyd_async.epics.core import ( | ||
EpicsDevice, | ||
PvSuffix, | ||
) | ||
|
||
from ._utils import TestingIOC | ||
|
||
CA_PVA_RECORDS = str(Path(__file__).parent / "test_records.db") | ||
PVA_RECORDS = str(Path(__file__).parent / "test_records_pva.db") | ||
|
||
|
||
class ExampleEnum(StrictEnum): | ||
a = "Aaa" | ||
b = "Bbb" | ||
c = "Ccc" | ||
|
||
|
||
class ExampleTable(Table): | ||
bool: Array1D[np.bool_] | ||
int: Array1D[np.int32] | ||
float: Array1D[np.float64] | ||
str: Sequence[str] | ||
enum: Sequence[ExampleEnum] | ||
|
||
|
||
class ExampleCaDevice(EpicsDevice): | ||
my_int: A[SignalRW[int], PvSuffix("int")] | ||
my_float: A[SignalRW[float], PvSuffix("float")] | ||
my_str: A[SignalRW[str], PvSuffix("str")] | ||
my_bool: A[SignalRW[bool], PvSuffix("bool")] | ||
enum: A[SignalRW[ExampleEnum], PvSuffix("enum")] | ||
enum2: A[SignalRW[ExampleEnum], PvSuffix("enum2")] | ||
bool_unnamed: A[SignalRW[bool], PvSuffix("bool_unnamed")] | ||
partialint: A[SignalRW[int], PvSuffix("partialint")] | ||
lessint: A[SignalRW[int], PvSuffix("lessint")] | ||
uint8a: A[SignalRW[Array1D[np.uint8]], PvSuffix("uint8a")] | ||
int16a: A[SignalRW[Array1D[np.int16]], PvSuffix("int16a")] | ||
int32a: A[SignalRW[Array1D[np.int32]], PvSuffix("int32a")] | ||
float32a: A[SignalRW[Array1D[np.float32]], PvSuffix("float32a")] | ||
float64a: A[SignalRW[Array1D[np.float64]], PvSuffix("float64a")] | ||
stra: A[SignalRW[Sequence[str]], PvSuffix("stra")] | ||
|
||
|
||
class ExamplePvaDevice(ExampleCaDevice): # pva can support all signal types that ca can | ||
int8a: A[SignalRW[Array1D[np.int8]], PvSuffix("int8a")] | ||
uint16a: A[SignalRW[Array1D[np.uint16]], PvSuffix("uint16a")] | ||
uint32a: A[SignalRW[Array1D[np.uint32]], PvSuffix("uint32a")] | ||
int64a: A[SignalRW[Array1D[np.int64]], PvSuffix("int64a")] | ||
uint64a: A[SignalRW[Array1D[np.uint64]], PvSuffix("uint64a")] | ||
table: A[SignalRW[ExampleTable], PvSuffix("table")] | ||
ntndarray_data: A[SignalRW[Array1D[np.int64]], PvSuffix("ntndarray:data")] | ||
|
||
|
||
async def connect_example_device( | ||
ioc: TestingIOC, protocol: Literal["ca", "pva"] | ||
) -> ExamplePvaDevice | ExampleCaDevice: | ||
"""Helper function to return a connected example device. | ||
Parameters | ||
---------- | ||
ioc: TestingIOC | ||
TestingIOC configured to provide the records needed for the device | ||
protocol: Literal["ca", "pva"] | ||
The transport protocol of the device | ||
Returns | ||
------- | ||
ExamplePvaDevice | ExampleCaDevice | ||
a connected EpicsDevice with signals of many EPICS record types | ||
""" | ||
device_cls = ExamplePvaDevice if protocol == "pva" else ExampleCaDevice | ||
device = device_cls(f"{protocol}://{ioc.prefix_for(device_cls)}") | ||
await device.connect() | ||
return device | ||
|
||
|
||
def get_example_ioc() -> TestingIOC: | ||
"""Get TestingIOC instance with the example databases loaded. | ||
Returns | ||
------- | ||
TestingIOC | ||
instance with test_records.db loaded for ExampleCaDevice and | ||
test_records.db and test_records_pva.db loaded for ExamplePvaDevice. | ||
""" | ||
ioc = TestingIOC() | ||
ioc.database_for(PVA_RECORDS, ExamplePvaDevice) | ||
ioc.database_for(CA_PVA_RECORDS, ExamplePvaDevice) | ||
ioc.database_for(CA_PVA_RECORDS, ExampleCaDevice) | ||
return ioc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import random | ||
import string | ||
import subprocess | ||
import sys | ||
import time | ||
from pathlib import Path | ||
|
||
from aioca import purge_channel_caches | ||
|
||
from ophyd_async.core import Device | ||
|
||
|
||
def generate_random_PV_prefix() -> str: | ||
return "".join(random.choice(string.ascii_lowercase) for _ in range(12)) + ":" | ||
|
||
|
||
class TestingIOC: | ||
_dbs: dict[type[Device], list[Path]] = {} | ||
_prefixes: dict[type[Device], str] = {} | ||
|
||
@classmethod | ||
def with_database(cls, db: Path | str): # use as a decorator | ||
def inner(device_cls: type[Device]): | ||
cls.database_for(db, device_cls) | ||
return device_cls | ||
|
||
return inner | ||
|
||
@classmethod | ||
def database_for(cls, db, device_cls): | ||
path = Path(db) | ||
if not path.is_file(): | ||
raise OSError(f"{path} is not a file.") | ||
if device_cls not in cls._dbs: | ||
cls._dbs[device_cls] = [] | ||
cls._dbs[device_cls].append(path) | ||
|
||
def prefix_for(self, device_cls): | ||
# generate random prefix, return existing if already generated | ||
return self._prefixes.setdefault(device_cls, generate_random_PV_prefix()) | ||
|
||
def start_ioc(self): | ||
ioc_args = [ | ||
sys.executable, | ||
"-m", | ||
"epicscorelibs.ioc", | ||
] | ||
for device_cls, dbs in self._dbs.items(): | ||
prefix = self.prefix_for(device_cls) | ||
for db in dbs: | ||
ioc_args += ["-m", f"device={prefix}", "-d", str(db)] | ||
self._process = subprocess.Popen( | ||
ioc_args, | ||
stdin=subprocess.PIPE, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.STDOUT, | ||
universal_newlines=True, | ||
) | ||
start_time = time.monotonic() | ||
while "iocRun: All initialization complete" not in ( | ||
self._process.stdout.readline().strip() # type: ignore | ||
): | ||
if time.monotonic() - start_time > 10: | ||
try: | ||
print(self._process.communicate("exit()")[0]) | ||
except ValueError: | ||
# Someone else already called communicate | ||
pass | ||
raise TimeoutError("IOC did not start in time") | ||
|
||
def stop_ioc(self): | ||
# close backend caches before the event loop | ||
purge_channel_caches() | ||
try: | ||
print(self._process.communicate("exit()")[0]) | ||
except ValueError: | ||
# Someone else already called communicate | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
record(bo, "$(device)bool") { | ||
field(ZNAM, "No") | ||
field(ONAM, "Yes") | ||
field(VAL, "1") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(bo, "$(device)bool_unnamed") { | ||
field(VAL, "1") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(longout, "$(device)int") { | ||
field(LLSV, "MAJOR") # LOLO is alarm | ||
field(LSV, "MINOR") # LOW is warning | ||
field(HSV, "MINOR") # HIGH is warning | ||
field(HHSV, "MAJOR") # HIHI is alarm | ||
field(HOPR, "100") | ||
field(HIHI, "98") | ||
field(HIGH, "96") | ||
field(DRVH, "90") | ||
field(DRVL, "10") | ||
field(LOW, "5") | ||
field(LOLO, "2") | ||
field(LOPR, "0") | ||
field(VAL, "42") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(longout, "$(device)partialint") { | ||
field(LLSV, "MAJOR") # LOLO is alarm | ||
field(HHSV, "MAJOR") # HIHI is alarm | ||
field(HOPR, "100") | ||
field(HIHI, "98") | ||
field(DRVH, "90") | ||
field(DRVL, "10") | ||
field(LOLO, "2") | ||
field(LOPR, "0") | ||
field(VAL, "42") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(longout, "$(device)lessint") { | ||
field(HSV, "MINOR") # LOW is warning | ||
field(LSV, "MINOR") # HIGH is warning | ||
field(HOPR, "100") | ||
field(HIGH, "98") | ||
field(LOW, "2") | ||
field(LOPR, "0") | ||
field(VAL, "42") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(ao, "$(device)float") { | ||
field(PREC, "1") | ||
field(EGU, "mm") | ||
field(VAL, "3.141") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(ao, "$(device)float_prec_0") { | ||
field(PREC, "0") | ||
field(EGU, "mm") | ||
field(VAL, "3") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(ao, "$(device)float_prec_1") { | ||
field(PREC, "1") | ||
field(EGU, "mm") | ||
field(VAL, "3") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(stringout, "$(device)str") { | ||
field(VAL, "hello") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(mbbo, "$(device)enum") { | ||
field(ZRST, "Aaa") | ||
field(ZRVL, "5") | ||
field(ONST, "Bbb") | ||
field(ONVL, "6") | ||
field(TWST, "Ccc") | ||
field(TWVL, "7") | ||
field(VAL, "1") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(mbbo, "$(device)enum2") { | ||
field(ZRST, "Aaa") | ||
field(ONST, "Bbb") | ||
field(TWST, "Ccc") | ||
field(VAL, "1") | ||
field(PINI, "YES") | ||
} | ||
|
||
record(waveform, "$(device)uint8a") { | ||
field(NELM, "3") | ||
field(FTVL, "UCHAR") | ||
field(INP, {const:[0, 255]}) | ||
field(PINI, "YES") | ||
} | ||
|
||
record(waveform, "$(device)int16a") { | ||
field(NELM, "3") | ||
field(FTVL, "SHORT") | ||
field(INP, {const:[-32768, 32767]}) | ||
field(PINI, "YES") | ||
} | ||
|
||
record(waveform, "$(device)int32a") { | ||
field(NELM, "3") | ||
field(FTVL, "LONG") | ||
field(INP, {const:[-2147483648, 2147483647]}) | ||
field(PINI, "YES") | ||
} | ||
|
||
record(waveform, "$(device)float32a") { | ||
field(NELM, "3") | ||
field(FTVL, "FLOAT") | ||
field(INP, {const:[0.000002, -123.123]}) | ||
field(PINI, "YES") | ||
} | ||
|
||
record(waveform, "$(device)float64a") { | ||
field(NELM, "3") | ||
field(FTVL, "DOUBLE") | ||
field(INP, {const:[0.1, -12345678.123]}) | ||
field(PINI, "YES") | ||
} | ||
|
||
record(waveform, "$(device)stra") { | ||
field(NELM, "3") | ||
field(FTVL, "STRING") | ||
field(INP, {const:["five", "six", "seven"]}) | ||
field(PINI, "YES") | ||
} | ||
|
||
record(waveform, "$(device)longstr") { | ||
field(NELM, "80") | ||
field(FTVL, "CHAR") | ||
field(INP, {const:"a string that is just longer than forty characters"}) | ||
field(PINI, "YES") | ||
} | ||
|
||
record(lsi, "$(device)longstr2") { | ||
field(SIZV, "80") | ||
field(INP, {const:"a string that is just longer than forty characters"}) | ||
field(PINI, "YES") | ||
} |
Oops, something went wrong.