From cf3deccfd34c398b9aedfb71a7d5adbf3373d43c Mon Sep 17 00:00:00 2001 From: RobertoRoos Date: Thu, 25 Jul 2024 10:57:08 +0200 Subject: [PATCH] Added fallback to type detection to allow automatic ENUM usage + Added exception raising for unknown types --- pyads/connection.py | 21 ++++++++++----- pyads/symbol.py | 61 +++++++++++++++++++++++++++++++++----------- tests/test_symbol.py | 35 ++++++++++++++++++++----- 3 files changed, 90 insertions(+), 27 deletions(-) diff --git a/pyads/connection.py b/pyads/connection.py index bfe4a55..2873987 100644 --- a/pyads/connection.py +++ b/pyads/connection.py @@ -92,6 +92,7 @@ _dict_slice_generator, bytes_from_dict, size_of_structure, + ADSError, ) from .symbol import AdsSymbol from .utils import decode_ads @@ -174,16 +175,16 @@ def _query_plc_datatype_from_name(self, data_name: str, If cache_symbol_info is True then the SymbolInfo will be cached and adsGetSymbolInfo will only used once. - """ + info = None if cache_symbol_info: info = self._symbol_info_cache.get(data_name) - if info is None: - info = adsGetSymbolInfo(self._port, self._adr, data_name) - self._symbol_info_cache[data_name] = info - else: + if info is None: info = adsGetSymbolInfo(self._port, self._adr, data_name) - return AdsSymbol.get_type_from_str(info.symbol_type) + if cache_symbol_info: + self._symbol_info_cache[data_name] = info + + return AdsSymbol.get_type_from_info(info) def open(self) -> None: """Connect to the TwinCAT message router.""" @@ -543,6 +544,10 @@ def read_by_name( plc_datatype = self._query_plc_datatype_from_name(data_name, cache_symbol_info) + if plc_datatype is None: + # `adsSyncReadReqEx2()` will fail for a None type + raise ADSError(None, f"Failed to detect datatype for `{data_name}`") + return adsSyncReadByNameEx( self._port, self._adr, @@ -688,6 +693,10 @@ def write_by_name( plc_datatype = self._query_plc_datatype_from_name(data_name, cache_symbol_info) + if plc_datatype is None: + # `adsSyncWriteReqEx()` does not support `None` for a type + raise ADSError(None, f"Failed to detect datatype for `{data_name}`") + return adsSyncWriteByNameEx( self._port, self._adr, data_name, value, plc_datatype, handle=handle ) diff --git a/pyads/symbol.py b/pyads/symbol.py index 762cc99..7bf776a 100644 --- a/pyads/symbol.py +++ b/pyads/symbol.py @@ -16,9 +16,9 @@ from typing import TYPE_CHECKING, Any, Optional, List, Tuple, Callable, Union, Type from . import constants # To access all constants, use package notation -from .constants import PLCDataType +from .constants import PLCDataType, ads_type_to_ctype from .pyads_ex import adsGetSymbolInfo, ADSError -from .structs import NotificationAttrib +from .structs import NotificationAttrib, SAdsSymbolEntry # ads.Connection relies on structs.AdsSymbol (but in type hints only), so use # this 'if' to only include it when type hinting (False during execution) @@ -125,7 +125,7 @@ def __init__( self.name = name self.index_offset = index_offset self.index_group = index_group - self.symbol_type = symbol_type + self.symbol_type = None # Apply `symbol_type` later self.comment = comment self._value: Any = None @@ -137,15 +137,14 @@ def __init__( from .ads import size_of_structure self._structure_size = size_of_structure(self.structure_def * self.array_size) + self.plc_type: Optional[Type[PLCDataType]] = None + if missing_info: self._create_symbol_from_info() # Perform remote lookup - # Now `self.symbol_type` should have a value, find the actual PLCTYPE - # from it. - # This is relevant for both lookup and full user definition. - - self.plc_type: Optional[Type[PLCDataType]] = None - if self.symbol_type is not None: + # Apply user-provided type (overriding auto detect if any): + if symbol_type is not None: + self.symbol_type = symbol_type if isinstance(self.symbol_type, str): # Perform lookup if string self.plc_type = AdsSymbol.get_type_from_str(self.symbol_type) else: # Otherwise `symbol_type` is probably a pyads.PLCTYPE_* constant @@ -166,12 +165,8 @@ def _create_symbol_from_info(self) -> None: if info.comment: self.comment = info.comment - # info.dataType is an integer mapping to a type in - # constants.ads_type_to_ctype. - # However, this type ignores whether the variable is really an array! - # So are not going to be using this and instead rely on the textual - # type - self.symbol_type = info.symbol_type # Save the type as string + self.plc_type = self.get_type_from_info(info) + self.symbol_type = info.symbol_type # Also save the type as string def _check_for_open_connection(self) -> None: """Assert the current object is ready to read from/write to. @@ -195,6 +190,12 @@ def read(self) -> Any: structure_size=self._structure_size, array_size=self.array_size) else: + if self.plc_type is None: + raise ADSError( + None, + f"Cannot read data with unknown datatype for symbol " + f"{self.name} ({self.symbol_type})" + ) self._value = self._plc.read(self.index_group, self.index_offset, self.plc_type) return self._value @@ -218,6 +219,12 @@ def write(self, new_value: Optional[Any] = None) -> None: self._plc.write_structure_by_name(self.name, new_value, self.structure_def, structure_size=self._structure_size, array_size=self.array_size) else: + if self.plc_type is None: + raise ADSError( + None, + f"Cannot write data with unknown datatype for symbol " + f"{self.name} ({self.symbol_type})" + ) self._plc.write(self.index_group, self.index_offset, new_value, self.plc_type) def __repr__(self) -> str: @@ -286,6 +293,30 @@ def _value_callback(self, notification: Any, data_name: Any) -> None: ) self._value = value + @classmethod + def get_type_from_info(cls, info: SAdsSymbolEntry) -> Optional[Type[PLCDataType]]: + """Get PLCTYPE_* from symbol info struct + + Also see :meth:`~get_type_from_str`. + """ + plc_type = cls.get_type_from_str(info.symbol_type) + if plc_type is not None: + return plc_type + + # Failed to detect by name (e.g. type is enum) + + # Use `ADST_*` integer to detect type instead + plc_type = ads_type_to_ctype.get(info.dataType, None) + if plc_type is not None: + array_size = int(info.size / sizeof(plc_type)) + # However, `dataType` is always a scalar, even if the object is an array: + if array_size > 1: + plc_type = plc_type * array_size + + return plc_type + + return None + @staticmethod def get_type_from_str(type_str: str) -> Optional[Type[PLCDataType]]: """Get PLCTYPE_* from PLC name string diff --git a/tests/test_symbol.py b/tests/test_symbol.py index d6200a8..65504d8 100644 --- a/tests/test_symbol.py +++ b/tests/test_symbol.py @@ -15,7 +15,7 @@ import pyads from pyads.testserver import AdsTestServer, AdvancedHandler, PLCVariable -from pyads import constants, AdsSymbol, bytes_from_dict +from pyads import constants, AdsSymbol, ADSError, bytes_from_dict from tests.test_connection_class import create_notification_struct @@ -143,8 +143,8 @@ def test_init_by_name_matrix_style(self): struct.pack("<21b", *range(21)), ads_type=constants.ADST_VOID, symbol_type="matrix_21_int8_T", # Simulink array looks like this - index_group = 123, - index_offset = 100, + index_group=123, + index_offset=100, ) self.handler.add_variable(var) self.plc.open() @@ -158,7 +158,7 @@ def test_init_by_name_matrix_style(self): # Verify looked up info self.assertEqual(constants.PLCTYPE_ARR_SINT(21), symbol.plc_type) - self.assertEqual(var.symbol_type, symbol.symbol_type) + self.assertEqual("matrix_21_int8_T", symbol.symbol_type) self.assertAdsRequestsCount(0) # No requests @@ -265,10 +265,10 @@ def test_init_invalid_type(self): # Create symbol while providing everything: symbol = AdsSymbol(self.plc, name=var.name) self.assertEqual(var.symbol_type, symbol.symbol_type) - with self.assertRaises(TypeError) as cm: + with self.assertRaises(ADSError) as cm: # Error is thrown inside pyads_ex symbol.read() - self.assertIn("NoneType", str(cm.exception)) + self.assertIn("unknown datatype", str(cm.exception)) self.assertAdsRequestsCount(1) # Only a WRITE followed by a READ def test_read_write_errors(self): @@ -370,6 +370,29 @@ def test_read_structure_array(self): self.assertEqual(values, read_values) + def test_read_enum(self): + """Test reading from a symbol when it's an ENUM type""" + self.handler.add_variable( + PLCVariable("TestEnum", 7, constants.ADST_UINT16, symbol_type="E_MyEnum")) + + with self.plc: + symbol = self.plc.get_symbol("TestEnum") + value = symbol.read() + + self.assertEqual(7, value) + + def test_read_enum_array(self): + """Test reading from a symbol when it's an array of an ENUM type""" + value_bytes = struct.pack("<3h", 1, 2, 3) + self.handler.add_variable( + PLCVariable("TestEnumList", value_bytes, constants.ADST_UINT16, symbol_type="ARRAY [1..3] OF E_MyEnum")) + + with self.plc: + symbol = self.plc.get_symbol("TestEnumList") + value = symbol.read() + + self.assertEqual([1, 2, 3], value) + def test_write(self): """Test symbol value writing""" with self.plc: