Skip to content

Commit

Permalink
support optional fields (which seems to be a combination of bit + swi…
Browse files Browse the repository at this point in the history
…tch)

update tests so that they both register + verify nested objects are working

clean up tests. is redundant and messing up a more natural path pattern for running tests

simplify logic with BitFieldState
  • Loading branch information
David Chen authored and oroulet committed May 20, 2022
1 parent 1b85c61 commit 96a8ea6
Show file tree
Hide file tree
Showing 12 changed files with 822 additions and 48 deletions.
98 changes: 80 additions & 18 deletions opcua/common/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import re
import logging
# The next two imports are for generated code
from collections import namedtuple
from datetime import datetime
import uuid
from enum import Enum, IntEnum, EnumMeta
from typing import Any, Union

from lxml import objectify

Expand All @@ -21,13 +23,13 @@

logger = logging.getLogger(__name__)


# TODO: use non-empty fields for testing.
def get_default_value(uatype, enums):
if uatype == "String":
return "None"
if uatype in ("String", "CharArray"):
return "''"
elif uatype == "Guid":
return "uuid.uuid4()"
elif uatype in ("ByteString", "CharArray", "Char"):
elif uatype in ("ByteString", "Char"):
return "b''"
elif uatype == "Boolean":
return "True"
Expand Down Expand Up @@ -82,9 +84,17 @@ def __init__(self, name, value):
class Struct(object):
def __init__(self, name):
self.name = _clean_name(name)
self.bit_mapping = {}
self.fields = []
self.typeid = None

def get_ua_switches(self):
ua_switches = {}
for field in self.fields:
if field.is_switch():
ua_switches[field.name] = self.bit_mapping.get(field.switch_name)
return ua_switches

def __str__(self):
return "Struct(name={}, fields={}".format(self.name, self.fields)
__repr__ = __str__
Expand All @@ -99,15 +109,19 @@ class {0}(object):
'''
""".format(self.name)

ua_switches = self.get_ua_switches()
if ua_switches:
code += " ua_switches = {\n"
for field_name, encode_tuple in self.get_ua_switches().items():
code += " '{}': {}, \n".format(field_name, encode_tuple)
code += " }\n"
code += " ua_types = [\n"
for field in self.fields:
prefix = "ListOf" if field.array else ""
uatype = prefix + field.uatype
if uatype == "ListOfChar":
uatype = "String"
code += " ('{}', '{}'),\n".format(field.name, uatype)

code += " ]"
code += """
def __str__(self):
Expand All @@ -128,15 +142,51 @@ def __init__(self):
class Field(object):
def __init__(self, name):
self.name = name
self.switch_name = ""
self.uatype = None
self.value = None
self.value = None # e.g: ua.Int32(0)
self.array = False

def is_switch(self):
return len(self.switch_name) > 0

def __str__(self):
return "Field(name={}, uatype={}".format(self.name, self.uatype)
__repr__ = __str__


class BitFieldState:
def __init__(self):
self.encoding_field: Union[Field, None] = None
self.bit_size = 0
self.bit_offset = 0
self.encoding_field_counter = 0

def add_bit(self, length: int) -> Union[Field, None]:
""" Returns field if a new one was added. Else None """
if not self.encoding_field:
return self.reset_encoding_field()
else:
if self.bit_size + length > 32:
return self.reset_encoding_field()
else:
self.bit_size += length
self.bit_offset += 1
return None

def reset_encoding_field(self) -> Field:
field = Field(f"BitEncoding{self.encoding_field_counter}")
field.uatype = "UInt32"
self.encoding_field = field
self.bit_offset = 0
self.encoding_field_counter += 1
return field

def get_bit_info(self) -> (str, int):
""" With the field name and bit offset, we can extract the bit later."""
return self.encoding_field.name, self.bit_offset


class StructGenerator(object):
def __init__(self):
self.model = []
Expand Down Expand Up @@ -165,22 +215,33 @@ def _make_model(self, root):
for child in root.iter("{*}StructuredType"):
struct = Struct(child.get("Name"))
array = False
bit_state = BitFieldState()
for xmlfield in child.iter("{*}Field"):
name = xmlfield.get("Name")
clean_name = _clean_name(name)
if name.startswith("NoOf"):
array = True
continue
field = Field(_clean_name(name))
field.uatype = xmlfield.get("TypeName")
if ":" in field.uatype:
field.uatype = field.uatype.split(":")[1]
field.uatype = _clean_name(field.uatype)
field.value = get_default_value(field.uatype, enums)
if array:
field.array = True
field.value = []
array = False
struct.fields.append(field)
_type = xmlfield.get("TypeName")
if ":" in _type:
_type = _type.split(":")[1]
_type = _clean_name(_type)
if _type == 'Bit':
bit_length = int(xmlfield.get("Length", 1)) # Will longer bits be used?
field = bit_state.add_bit(bit_length)
# Whether or not a new encoding field was added, we want to store the current one.
struct.bit_mapping[name] = (bit_state.encoding_field.name, bit_state.bit_offset)
else:
field = Field(clean_name)
field.uatype = _type
field.switch_name = xmlfield.get('SwitchField', "")
if field:
field.value = get_default_value(field.uatype, enums)
if array:
field.array = True
field.value = []
array = False
struct.fields.append(field)
self.model.append(struct)

def save_to_file(self, path, register=False):
Expand Down Expand Up @@ -226,6 +287,7 @@ def _make_header(self, _file):
'''
from datetime import datetime
from enum import IntEnum
import uuid
from opcua import ua
Expand Down
2 changes: 1 addition & 1 deletion opcua/ua/uatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ def get_default_value(vtype):
raise RuntimeError("function take a uatype as argument, got:", vtype)


# These dictionnaries are used to register extensions classes for automatic
# These dictionaries are used to register extensions classes for automatic
# decoding and encoding
extension_object_classes = {}
extension_object_ids = {}
Expand Down
2 changes: 0 additions & 2 deletions run-tests.sh

This file was deleted.

Loading

0 comments on commit 96a8ea6

Please sign in to comment.