diff --git a/examples/server-persistent.py b/examples/server-persistent.py new file mode 100644 index 000000000..57642eedd --- /dev/null +++ b/examples/server-persistent.py @@ -0,0 +1,49 @@ +import sys +sys.path.insert(0, "..") +import time +import random + +from opcua import Server +from opcua.common.sqlite3_backend import SQLite3Backend +from opcua.server.address_space_sqlite import StandardAddressSpaceSQLite, AddressSpaceSQLite + +ITEMS = ('Pump', 'Motor', 'Fan', 'Gearbox', 'Filter', 'Building', 'Ventilation') + +if __name__ == "__main__": + +    print('\nStart and stop this server multiple times and\n' 'verify that address space is persisted in SQL.\n' 'Values written from any opc-ua client into the\n' 'server are also stored.\n') + + with SQLite3Backend(sqlFile='my_address_space.sql', readonly=False) as backend, \ + StandardAddressSpaceSQLite() as stdAspace, \ + AddressSpaceSQLite(backend=backend, cache=stdAspace) as myAspace: + + # setup our server + server = Server(aspace=myAspace) + server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/") + + # setup our own namespace, not really necessary but should as spec + uri = "http://examples.freeopcua.github.io" + idx = server.register_namespace(uri) + + # get Objects node, this is where we should put our nodes + objects = server.get_objects_node() + + # populating our address space + myobj = objects.add_object(idx, "{:s}-{:d}".format(random.choice(ITEMS), random.randint(1,100))) + myvar = myobj.add_variable(idx, "MyVariable", 42) + myvar.set_writable() # Set MyVariable to be writable by clients + + # starting! 
+ server.start() + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + # close connection, remove subscriptions, etc + server.stop() diff --git a/opcua/common/sqlite3_backend.py b/opcua/common/sqlite3_backend.py new file mode 100644 index 000000000..3c0f41fe1 --- /dev/null +++ b/opcua/common/sqlite3_backend.py @@ -0,0 +1,136 @@ + +import os +import sys +import time +import sqlite3 +import threading +from multiprocessing import Lock + +class SQLite3Backend(object): + PY2 = sys.version_info < (3, 0) + CHECKP_INTERVAL = 90 # [sec] WAL checkpoint + + def __init__(self, sqlFile = None, readonly=True): + assert(isinstance(sqlFile, str)) + assert(isinstance(readonly, bool)) + self._sqlFile = sqlFile # Path to database file. + self._readonly = bool(readonly) + self._lock = Lock() # Database lock. + self._conn = {} # Database connection. + self._lastCheckP = int(0) # Epoch of last checkpoint. + + def __enter__(self): + self._lastCheckP = time.time() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._db_disconnect() + + def __str__(self): + return self._sqlFile + + @property + def readonly(self): + return self._readonly + + # PUBLIC METHODS + def execute_read(self, dbCmd = None, params = (), CB = None): + with self._lock: + c = self._get_conn().cursor() + for row in c.execute(dbCmd, params): + CB(row) + c.close() + + def execute_write(self, dbCmd = None, params = (), commit=True): + with self._lock: + conn = self._get_conn() + if dbCmd is not None: + c = conn.cursor() + c.execute(dbCmd, params) + c.close() + if bool(commit) is True: + conn.commit() + self._wal_throttled() + + def wal_throttled_threadsafe(self): + with self._lock: + self._wal_throttled() + + def wal_checkpoint_threadsafe(self): + """ + Store checkpoint: forces database modifications to be persistent. + Automatically done when sqlite cache runs over the 1000 pages threshold. 
+ IMPORTANT: slow operation, manual syncs are only useful for sporadic + transactions that you really want to survive a power loss. + """ + with self._lock: + self._wal_checkpoint() + + # PRIVATE METHODS + def _wal_throttled(self): + # commits still require a wal_checkpoint to become persistent. + if abs(time.time() - self._lastCheckP) < self.CHECKP_INTERVAL: + return + self._wal_checkpoint() + + def _wal_checkpoint(self): + self._lastCheckP = time.time() + c = self._get_conn().cursor() + c.execute('PRAGMA wal_checkpoint') + c.close() + + def _db_connect(self): + CID = SQLite3Backend._getCID() + assert CID not in self._conn + if SQLite3Backend.PY2: + self._db_connect_py2(CID) + else: + self._conn[CID] = sqlite3.connect( + 'file:{:s}?immutable={:s}'.format(self._sqlFile, '1' if self.readonly else '0'), + detect_types = sqlite3.PARSE_DECLTYPES, # so datetimes won't be BLOBs + check_same_thread = False, + uri = True, + ) + c = self._get_conn().cursor() + if self.readonly is True: + c.execute('PRAGMA query_only=1') + else: + c.execute('PRAGMA journal_mode=WAL') + c.execute('PRAGMA synchronous=NORMAL') + + # Legacy support for Python<3.x. + def _db_connect_py2(self, CID): + if os.access(self._sqlFile, os.W_OK) is False: + raise Exception('Python>=3.x is required for immutable sqlite3 database.') + self._conn[CID] = sqlite3.connect( + self._sqlFile, + detect_types = sqlite3.PARSE_DECLTYPES, + check_same_thread = False + ) + self._conn[CID].text_factory = bytes + + def _db_disconnect(self): + # Commit, checkpoint. + if self.readonly is False: + self.wal_checkpoint_threadsafe() + # Close all connections to database. + for CID in self._conn: + self._conn[CID].close() + # Remove all items from dict. 
+ self._conn.clear() + + def _get_conn(self): + if self._lock.acquire(False) is True: + self._lock.release() + raise Exception('Forgot to lock?') + # sqlite3 multithreading: http://beets.io/blog/sqlite-nightmare.html + CID = SQLite3Backend._getCID() + try: + return self._conn[CID] + except KeyError: + self._db_connect() + return self._conn[CID] + + @staticmethod + def _getCID(): + return threading.current_thread().ident diff --git a/opcua/common/utils.py b/opcua/common/utils.py index 0398a62ed..9dc2fe45a 100644 --- a/opcua/common/utils.py +++ b/opcua/common/utils.py @@ -9,6 +9,7 @@ import functools import threading from socket import error as SocketError +from collections import MutableMapping try: import asyncio @@ -210,4 +211,53 @@ def run_until_complete(self, coro): return future.result() +class ThreadSafeDict(MutableMapping): + def __init__(self, cache=None): + self._lock = cache._lock if hasattr(cache, '_lock') else threading.RLock() # FIXME: should use multiple reader, one writer pattern + if cache is None: + self._cache = {} + else: + assert(isinstance(cache, (dict, ThreadSafeDict))) + self._cache = cache + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._cache = None + + def __getitem__(self, key): + with self._lock: + return self._cache.__getitem__(key) + + def get(self, key, value=None): + with self._lock: + return self._cache.get(key, value) + + def __setitem__(self, key, value): + with self._lock: + return self._cache.__setitem__(key, value) + + def __contains__(self, key): + with self._lock: + return self._cache.__contains__(key) + + def __delitem__(self, key): + with self._lock: + del self._cache[key] + + def __iter__(self): + with self._lock: + return self._cache.__iter__() + + def __len__(self): + return len(self._cache) + + def keys(self): + with self._lock: + return self._cache.keys() + + def empty(self): + with self._lock: + self._cache = {} diff --git a/opcua/server/address_space.py 
b/opcua/server/address_space.py index 20e5dcdaa..7b4ca9a03 100644 --- a/opcua/server/address_space.py +++ b/opcua/server/address_space.py @@ -1,15 +1,10 @@ -from threading import RLock import logging from datetime import datetime -import collections -import shelve -try: - import cPickle as pickle -except: - import pickle +import random from opcua import ua from opcua.server.user_manager import UserManager +from opcua.common.utils import ThreadSafeDict class AttributeValue(object): @@ -317,10 +312,10 @@ def _delete_node(self, item, user): return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown) if item.DeleteTargetReferences: - for elem in self._aspace.keys(): - for rdesc in self._aspace[elem].references: + for ndata in self._aspace.values(): + for rdesc in ndata.references: if rdesc.NodeId == item.NodeId: - self._aspace[elem].references.remove(rdesc) + ndata.references.remove(rdesc) self._delete_node_callbacks(self._aspace[item.NodeId]) @@ -478,165 +473,44 @@ def _call(self, method): return res -class AddressSpace(object): - +class AddressSpace(ThreadSafeDict): """ The address space object stores all the nodes of the OPC-UA server and helper methods. The methods are thread safe """ + # https://opcfoundation.org/UA/schemas/1.03/Opc.Ua.Types.bsd + # numeric nodeid is UInt32, but Siemens MindSphere uses Int32. 
+ MAX_NUMERIC_IDENTIFIER = 0x7fffffff + DEFAULT_USER_NAMESPACE_INDEX = 2 - def __init__(self): + def __init__(self, cache=None): + super(AddressSpace, self).__init__(cache) self.logger = logging.getLogger(__name__) - self._nodes = {} - self._lock = RLock() # FIXME: should use multiple reader, one writter pattern self._datachange_callback_counter = 200 self._handle_to_attribute_map = {} - self._default_idx = 2 - self._nodeid_counter = {0: 20000, 1: 2000} - - def __getitem__(self, nodeid): - with self._lock: - return self._nodes.__getitem__(nodeid) - - def get(self, nodeid): - with self._lock: - return self._nodes.get(nodeid, None) - - def __setitem__(self, nodeid, value): - with self._lock: - return self._nodes.__setitem__(nodeid, value) - - def __contains__(self, nodeid): - with self._lock: - return self._nodes.__contains__(nodeid) - def __delitem__(self, nodeid): - with self._lock: - self._nodes.__delitem__(nodeid) - - def generate_nodeid(self, idx=None): - if idx is None: - idx = self._default_idx - if idx in self._nodeid_counter: - self._nodeid_counter[idx] += 1 - else: - # get the biggest identifier number from the existed nodes in address space - identifier_list = sorted([nodeid.Identifier for nodeid in self._nodes.keys() - if nodeid.NamespaceIndex == idx and nodeid.NodeIdType - in (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)]) - if identifier_list: - self._nodeid_counter[idx] = identifier_list[-1] - else: - self._nodeid_counter[idx] = 1 - nodeid = ua.NodeId(self._nodeid_counter[idx], idx) + def generate_nodeid(self, idx=DEFAULT_USER_NAMESPACE_INDEX): + nodeid = ua.NodeId( + identifier=random.randrange(AddressSpace.MAX_NUMERIC_IDENTIFIER), + namespaceidx=idx + ) with self._lock: # OK since reentrant lock - while True: - if nodeid in self._nodes: + for retry in range(0, 10): + if nodeid in self: nodeid = self.generate_nodeid(idx) else: return nodeid - - def keys(self): - with self._lock: - return self._nodes.keys() - - def empty(self): 
- """ - Delete all nodes in address space - """ - with self._lock: - self._nodes = {} - - def dump(self, path): - """ - Dump address space as binary to file; note that server must be stopped for this method to work - DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED! - """ - # prepare nodes in address space for being serialized - for nodeid, ndata in self._nodes.items(): - # if the node has a reference to a method call, remove it so the object can be serialized - if ndata.call is not None: - self._nodes[nodeid].call = None - - with open(path, 'wb') as f: - pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL) - - def load(self, path): - """ - Load address space from a binary file, overwriting everything in the current address space - """ - with open(path, 'rb') as f: - self._nodes = pickle.load(f) - - def make_aspace_shelf(self, path): - """ - Make a shelf for containing the nodes from the standard address space; this is typically only done on first - start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache - by the LazyLoadingDict class when they are accessed. Saving data back to the shelf - is currently NOT supported, it is only used for the default OPC UA standard address space - - Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time - """ - s = shelve.open(path, "n", protocol=pickle.HIGHEST_PROTOCOL) - for nodeid, ndata in self._nodes.items(): - s[nodeid.to_string()] = ndata - s.close() - - def load_aspace_shelf(self, path): - """ - Load the standard address space nodes from a python shelve via LazyLoadingDict as needed. - The dump() method can no longer be used if the address space is being loaded from a shelf - - Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time - """ - class LazyLoadingDict(collections.MutableMapping): - """ - Special dict that only loads nodes as they are accessed. 
If a node is accessed it gets copied from the - shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf - is currently NOT supported - """ - def __init__(self, source): - self.source = source # python shelf - self.cache = {} # internal dict - - def __getitem__(self, key): - # try to get the item (node) from the cache, if it isn't there get it from the shelf - try: - return self.cache[key] - except KeyError: - node = self.cache[key] = self.source[key.to_string()] - return node - - def __setitem__(self, key, value): - # add a new item to the cache; if this item is in the shelf it is not updated - self.cache[key] = value - - def __contains__(self, key): - return key in self.cache or key.to_string() in self.source - - def __delitem__(self, key): - # only deleting items from the cache is allowed - del self.cache[key] - - def __iter__(self): - # only the cache can be iterated over - return iter(self.cache.keys()) - - def __len__(self): - # only returns the length of items in the cache, not unaccessed items in the shelf - return len(self.cache) - - self._nodes = LazyLoadingDict(shelve.open(path, "r")) + assert(False) # What are the odds? 
def get_attribute_value(self, nodeid, attr): with self._lock: self.logger.debug("get attr val: %s %s", nodeid, attr) - if nodeid not in self._nodes: + if nodeid not in self: dv = ua.DataValue() dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown) return dv - node = self._nodes[nodeid] + node = self[nodeid] if attr not in node.attributes: dv = ua.DataValue() dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid) @@ -649,10 +523,11 @@ def get_attribute_value(self, nodeid, attr): def set_attribute_value(self, nodeid, attr, value): with self._lock: self.logger.debug("set attr val: %s %s %s", nodeid, attr, value) - node = self._nodes.get(nodeid, None) - if node is None: + + try: + attval = self[nodeid].attributes.get(attr, None) + except KeyError: return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown) - attval = node.attributes.get(attr, None) if attval is None: return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid) @@ -673,12 +548,13 @@ def set_attribute_value(self, nodeid, attr, value): def add_datachange_callback(self, nodeid, attr, callback): with self._lock: self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback) - if nodeid not in self._nodes: + + try: + attval = self[nodeid].attributes.get(attr, None) + except KeyError: return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0 - node = self._nodes[nodeid] - if attr not in node.attributes: + if attval is None: return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0 - attval = node.attributes[attr] self._datachange_callback_counter += 1 handle = self._datachange_callback_counter attval.datachange_callbacks[handle] = callback @@ -689,9 +565,9 @@ def delete_datachange_callback(self, handle): with self._lock: if handle in self._handle_to_attribute_map: nodeid, attr = self._handle_to_attribute_map.pop(handle) - self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle) + self[nodeid].attributes[attr].datachange_callbacks.pop(handle) def 
add_method_callback(self, methodid, callback): with self._lock: - node = self._nodes[methodid] + node = self[methodid] node.call = callback diff --git a/opcua/server/address_space_sqlite.py b/opcua/server/address_space_sqlite.py new file mode 100644 index 000000000..917b90608 --- /dev/null +++ b/opcua/server/address_space_sqlite.py @@ -0,0 +1,581 @@ + +import sys +import os.path +import datetime +import sqlite3 +from struct import pack + +from opcua import ua +from opcua.ua.uatypes import NumericNodeId, NodeIdType +from opcua.common.utils import Buffer +from opcua.common.sqlite3_backend import SQLite3Backend +from opcua.server.address_space import NodeData, AddressSpace, AttributeValue + + +class ReadOnlyException(Exception): + pass + + +class MonitoredAttribute(AttributeValue): + + def __init__(self, attr, onchange_cb): + self._value = attr.value + self.value_callback = attr.value_callback + self.datachange_callbacks = attr.datachange_callbacks + self.onchange_cb = onchange_cb + + @property + def value(self): + return self._value + + @value.setter + def value(self, newVal): + self._value = newVal + self.onchange_cb() + + +class MonitoredNode(object): + + def __init__(self, aspace, ndata): + self._aspace = aspace + self._nodeid = AddressSpaceSQLite._nodeid_surjection(ndata.nodeid) + + @property + def aspace(self): + return self._aspace + + @property + def nodeid(self): + return self._nodeid + + +class MonitoredAttributeDict(MonitoredNode, dict): + + def __init__(self, aspace, ndata): + MonitoredNode.__init__(self, aspace, ndata) + for attrId, attr in ndata.attributes.items(): + self[attrId] = attr + + def __setitem__(self, attrId, attr): + def onchange_cb(): + self.aspace._insert_attribute_threadsafe(self.nodeid, attrId, self[attrId], commit=True) + mAttr = MonitoredAttribute(attr, onchange_cb) + dict.__setitem__(self, attrId, mAttr) + mAttr.onchange_cb() + + def __delitem__(self, attrId): + raise NotImplementedError + + +class MonitoredReferenceList(MonitoredNode, 
list): + + def __init__(self, aspace, ndata): + MonitoredNode.__init__(self, aspace, ndata) + list.__init__(self, ndata.references) + + def append(self, ref): + list.append(self, ref) + self._aspace._insert_reference_threadsafe(self.nodeid, ref, commit=True) + + def remove(self, ref): + self._aspace._remove_reference_threadsafe(self.nodeid, ref, commit=True) + list.remove(self, ref) + + +class AddressSpaceSQLite(AddressSpace): + """ + Load the standard address space nodes from a SQLite database. + Intended for slow devices, such as Raspberry Pi, to greatly improve start up time + """ + PY2 = sys.version_info < (3, 0) + NODEID_COL_NAME = 'NodeId' + ATTR_TABLE_NAME = 'Attributes' + REFS_TABLE_NAME = 'References' + CUR_TIME_NODEID = NumericNodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime, 0) + + def __init__(self, backend, cache=None): + super(AddressSpaceSQLite, self).__init__(cache) + self._backend = backend + + def __enter__(self): + super(AddressSpaceSQLite, self).__enter__() + if self.readonly is False: + AddressSpaceSQLite._create_attr_table(self.backend) + AddressSpaceSQLite._create_refs_table(self.backend) + return self + + def __str__(self): + return str(self.backend) + + @property + def backend(self): + return self._backend + + @property + def readonly(self): + return self.backend.readonly + + def __getitem__(self, nodeid): + with self._lock: + (nodeData, fromDisk) = self._getitem_backend(nodeid) + return nodeData + + def _getitem_backend(self, nodeid): + try: + if not hasattr(self._cache, '_getitem_backend'): + (nodeData, fromDisk) = (self._cache.__getitem__(nodeid), False) + else: + (nodeData, fromDisk) = self._cache._getitem_backend(nodeid) + if fromDisk: + AddressSpaceSQLite._read_nodedata(self.backend, nodeid, nodeData) + if not (self.readonly is True or hasattr(nodeData.attributes, 'aspace')): + self._monitor_nodedata(nodeData) + except KeyError: + (nodeData, fromDisk) = (NodeData(nodeid), True) + AddressSpaceSQLite._read_nodedata(self.backend, 
nodeid, nodeData) + if len(nodeData.attributes) == 0: + raise + elif self.readonly is False: + self._monitor_nodedata(nodeData) + self._cache[nodeid] = nodeData + return (nodeData, fromDisk) + + def _monitor_nodedata(self, ndata): + if self.readonly is True: + raise ReadOnlyException(ndata.nodeid) + elif hasattr(ndata.attributes, 'aspace') and ndata.attributes.aspace is not self: + other = str(ndata.attributes.aspace) + raise Exception('Node {:s} is monitored by {:s}'.format(str(ndata.nodeid), other)) + elif hasattr(ndata.references, 'aspace') and ndata.references.aspace is not self: + other = str(ndata.attributes.aspace) + raise Exception('Node {:s} is monitored by {:s}'.format(str(ndata.nodeid), other)) + else: + ndata.attributes = MonitoredAttributeDict(self, ndata) + ndata.references = MonitoredReferenceList(self, ndata) + + def get(self, nodeid, value=None): + try: + return self[nodeid] + except KeyError: + return value + + def __contains__(self, nodeid): + return self.get(nodeid) is not None + + def __setitem__(self, nodeid, ndata): + self._cache.__setitem__(nodeid, ndata) + if self.readonly is True: + return + with self._lock: + self._setitem_backend(nodeid, ndata) + + def _setitem_backend(self, nodeid, ndata): + if not hasattr(ndata.attributes, 'aspace'): + self._monitor_nodedata(ndata) + + if ndata.attributes.aspace is self: + self._write_nodedata(ndata) + + @staticmethod + def _nodeid_surjection(nodeid): + assert(isinstance(nodeid, ua.uatypes.NodeId)) + # For database lookups, map TwoByte and FourByte onto NumericNodeId. 
+ if nodeid.NodeIdType == NodeIdType.Numeric: + return nodeid + elif nodeid.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte): + return NumericNodeId(nodeid.Identifier, nodeid.NamespaceIndex) + elif nodeid.NodeIdType in (NodeIdType.String, NodeIdType.Guid, NodeIdType.ByteString): + return nodeid + else: + raise Exception('NodeIdType {:d} is not supported.'.format(nodeid.NodeIdType)) + + def keys(self): + raise Exception("dict.keys() is not supported for performance. Use iterator.") + + def __delitem__(self, key): + self._drop_nodedata(nodeid=key) + super(AddressSpaceSQLite, self).__delitem__(key) + + def __iter__(self): + # TODO only the cache can be iterated over. + return super(AddressSpaceSQLite, self).__iter__() + + def __len__(self): + # TODO only returns the length of items in the cache. + return super(AddressSpaceSQLite, self).__len__() + + def dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX): + """ + Dump address space into a database; note that server must be stopped for this method to work + Note 1: DO NOT DUMP AN ADDRESS SPACE RESTORED FROM DATABASE, ONLY CACHED NODES WILL GET DUMPED! + Note 2: If a NodeData instance holds a reference to a method call, it is not preserved. + Note 3: numeric nodeid's are required for database searches. + """ + with self._lock: + self._dump(namespaceidx) + print("Export to {:s} completed".format(str(self.backend))) + + def _dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX): + # 1. Create tables. + AddressSpaceSQLite._create_attr_table(self.backend, drop=True) + AddressSpaceSQLite._create_refs_table(self.backend, drop=True) + + # 2. Populate. + for nodeid, ndata in self._cache.items(): + assert(nodeid == ndata.nodeid) + assert(isinstance(ndata, NodeData)) + if nodeid.NamespaceIndex == namespaceidx: + self._write_nodedata(ndata, commit=False) + continue + # inter-namespace references. 
+ for ref in ndata.references: + if ref.NodeId.NamespaceIndex != namespaceidx: + continue + mapNodeId = AddressSpaceSQLite._nodeid_surjection(ndata.nodeid) + self._insert_reference(mapNodeId, ref, commit=False) + + # 3. commit + self.backend.execute_write(dbCmd=None, commit=True) + + # 4. Integrity checks. + for nodeid, ndata in self._cache.items(): + if nodeid.NamespaceIndex != namespaceidx: + continue + ndata2 = NodeData(nodeid) + AddressSpaceSQLite._read_nodedata(self.backend, nodeid, ndata2) + AddressSpaceSQLite._cmp_nodedata(ndata, ndata2) + + # Write NodeData to database + def _write_nodedata(self, ndata, commit=True): + mapNodeId = AddressSpaceSQLite._nodeid_surjection(ndata.nodeid) + self._write_attributes(mapNodeId, ndata, commit=commit) + self._write_references(mapNodeId, ndata, commit=commit) + + def _write_attributes(self, nodeid, ndata, commit=True): + assert(nodeid.NodeIdType == NodeIdType.Numeric) + assert(isinstance(ndata.attributes, dict)) + for attrId, attr in ndata.attributes.items(): + AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr, commit=commit) + + def _write_references(self, nodeid, ndata, commit=True): + assert(nodeid.NodeIdType == NodeIdType.Numeric) + assert(isinstance(ndata.references, list)) + for ref in ndata.references: + AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref, commit=commit) + + # Remove NodeData from database + def _drop_nodedata(self, nodeid, commit=True): + assert(nodeid.NodeIdType == NodeIdType.Numeric) + if self.readonly is True: + return + AddressSpaceSQLite._drop_attributes(self.backend, nodeid, commit=commit) + AddressSpaceSQLite._drop_references(self.backend, nodeid, commit=commit) + + # Read NodeData from database + @staticmethod + def _read_nodedata(backend, nodeid, ndata): + # Search key = nodeid in opc-ua binary format + mapNodeId = AddressSpaceSQLite._nodeid_surjection(nodeid) + hexNodeId = AddressSpaceSQLite._to_hex(ua.ua_binary.nodeid_to_binary(mapNodeId)) + + 
AddressSpaceSQLite._read_attributes(backend, hexNodeId, ndata) + AddressSpaceSQLite._read_references(backend, hexNodeId, ndata) + + @staticmethod + def _to_hex(b): + if AddressSpaceSQLite.PY2: + return "".join("{:02x}".format(ord(c)) for c in b) + else: + return b.hex() + + @staticmethod + def _read_attributes(backend, hexNodeId, ndata, attrTable=ATTR_TABLE_NAME): + cmd = 'SELECT * FROM "{tn}" WHERE NodeId = x\'{h}\''.format(tn=attrTable, h=hexNodeId) + + def CB(row): + (attrId, attr) = AddressSpaceSQLite._read_attribute_row(row) + ndata.attributes[attrId] = attr + backend.execute_read(cmd, CB=CB) + + @staticmethod + def _read_references(backend, hexNodeId, ndata, refsTable=REFS_TABLE_NAME): + cmd = 'SELECT * FROM "{tn}" WHERE NodeId = x\'{h}\''.format(tn=refsTable, h=hexNodeId) + + def CB(row): + ref = AddressSpaceSQLite._read_reference_row(row) + ndata.references.append(ref) + backend.execute_read(cmd, CB=CB) + + # Read and write from attribute table + @staticmethod + def _create_attr_table(backend, table=ATTR_TABLE_NAME, drop=False): + nid = AddressSpaceSQLite.NODEID_COL_NAME + ATTR_COLS = [ + '_Id BLOB PRIMARY KEY NOT NULL', # 0 + '{:s} BLOB'.format(nid), # 1 + 'AttributeId INTEGER', # 2 + 'ServerTimestamp TIMESTAMP', # 3 + 'ServerPicoseconds INTEGER', # 4 + 'SourceTimestamp TIMESTAMP', # 5 + 'SourcePicoseconds INTEGER', # 6 + 'StatusCode INTEGER', # 7 + 'Variant BLOB', # 8 + 'Description STRING', # 9 + ] + AddressSpaceSQLite._create_indexed_table(backend, table, ATTR_COLS, drop) + + @staticmethod + def _create_indexed_table(backend, table, cols, drop): + nid = AddressSpaceSQLite.NODEID_COL_NAME + cmds = [] + if drop is True: + cmds.append('DROP INDEX IF EXISTS "idx_{tn}"'.format(tn=table)) + cmds.append('DROP TABLE IF EXISTS "{tn}"'.format(tn=table)) + cmds.append('CREATE TABLE IF NOT EXISTS "{tn}" ({c})'.format(tn=table, c=', '.join(cols))) + cmds.append('CREATE INDEX IF NOT EXISTS "idx_{tn}" ON "{tn}" ({nid})'.format(tn=table, nid=nid)) + for cmd in cmds: + 
backend.execute_write(cmd, commit=True) + + def _insert_attribute_threadsafe(self, nodeid, attrId, attr, table=ATTR_TABLE_NAME, commit=True): + with self._lock: + if nodeid == AddressSpaceSQLite.CUR_TIME_NODEID: + # Prevent sd-card wear: don't write the time. Use as trigger for WAL checkpoints. + self.backend.wal_throttled_threadsafe() + else: + AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr, table, commit=commit) + + @staticmethod + def _insert_attribute(backend, nodeid, attrId, attr, table=ATTR_TABLE_NAME, commit=True): + assert(nodeid.NodeIdType == NodeIdType.Numeric) + assert(isinstance(attrId, ua.AttributeIds)) + assert(isinstance(attr, AttributeValue)) + # Callback methods are not supported. + assert(attr.value_callback is None) + # Datachange callbacks not supported and are ignored. + assert(isinstance(attr.datachange_callbacks, dict)) + # DataValue has no opc-ua to_binary: flatten object. + assert(isinstance(attr.value, ua.uatypes.DataValue)) + # Server timestamp + assert(attr.value.ServerTimestamp is None or + isinstance(attr.value.ServerTimestamp, datetime.datetime)) + assert(attr.value.ServerPicoseconds is None or + isinstance(attr.value.ServerTimestamp, int)) + # Source timestamp + assert(attr.value.SourceTimestamp is None or + isinstance(attr.value.SourceTimestamp, datetime.datetime)) + assert(attr.value.SourcePicoseconds is None or + isinstance(attr.value.ServerTimestamp, int)) + assert(isinstance(attr.value.StatusCode, ua.uatypes.StatusCode)) + assert(isinstance(attr.value.Value, ua.uatypes.Variant)) + + binNodeId = ua.ua_binary.nodeid_to_binary(nodeid) + primaryKey = binNodeId + pack(">B", int(attrId)) + + cmd = 'INSERT OR REPLACE INTO "{tn}" VALUES ({q})'.format(tn=table, q=', '.join('?'*10)) + params = ( + sqlite3.Binary(primaryKey), + sqlite3.Binary(binNodeId), + int(attrId), + attr.value.ServerTimestamp, + None if attr.value.ServerPicoseconds is None else int(attr.value.ServerPicoseconds), + attr.value.SourceTimestamp, + 
None if attr.value.SourcePicoseconds is None else int(attr.value.SourcePicoseconds), + int(attr.value.StatusCode.value), + sqlite3.Binary(ua.ua_binary.variant_to_binary(attr.value.Value)), + str(nodeid) + ) + backend.execute_write(cmd, params=params, commit=commit) + + @staticmethod + def _drop_attributes(backend, nodeid, table=ATTR_TABLE_NAME, commit=True): + assert(nodeid.NodeIdType == NodeIdType.Numeric) + binNodeId = ua.ua_binary.nodeid_to_binary(nodeid) + cmd = 'DELETE FROM "{tn}" WHERE {nid}=?'.format( + tn=table, + nid=AddressSpaceSQLite.NODEID_COL_NAME + ) + params = ( + sqlite3.Binary(binNodeId), + ) + backend.execute_write(cmd, params=params, commit=commit) + + @staticmethod + def _read_attribute_row(row): + attrId = ua.AttributeIds(row[2]) + # Rebuild DataValue instance from flattened. + assert(row[3] is None or isinstance(row[3], datetime.datetime)) + assert(row[4] is None or isinstance(row[4], int)) + assert(row[5] is None or isinstance(row[5], datetime.datetime)) + assert(row[6] is None or isinstance(row[6], int)) + dv = ua.DataValue(ua.ua_binary.variant_from_binary(Buffer(row[8]))) + dv.ServerTimestamp = row[3] + dv.ServerPicoseconds = row[4] + dv.SourceTimestamp = row[5] + dv.SourcePicoseconds = row[6] + dv.StatusCode = ua.StatusCode(row[7]) + attr = AttributeValue(dv) + return (attrId, attr) + + # Read and write from references table + @staticmethod + def _create_refs_table(backend, table=REFS_TABLE_NAME, drop=False): + nid = AddressSpaceSQLite.NODEID_COL_NAME + REFS_COLS = [ + '_Id BLOB PRIMARY KEY NOT NULL', # 0 + '{:s} BLOB'.format(nid), # 1 = the nodeid of this ReferenceDescription + 'ReferenceTypeId BLOB', # 2 + 'IsForward INTEGER', # 3 + 'ReferredNodeId BLOB', # 4 = referred nodeid of ReferenceDescription + 'BrowseName_NamespaceIndex INTEGER', # 5 + 'BrowseName_Name TEXT', # 6 + 'DisplayName_Text TEXT', # 7 + 'DisplayName_Locale TEXT', # 8 + 'DisplayName_Encoding INTEGER', # 9 + 'NodeClass INTEGER', # 10 + 'TypeDefinition BLOB', # 11 + 
'Description STRING' # 12 + ] + AddressSpaceSQLite._create_indexed_table(backend, table, REFS_COLS, drop) + + def _insert_reference_threadsafe(self, nodeid, ref, table=REFS_TABLE_NAME, commit=True): + with self._lock: + AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref, table, commit=commit) + + @staticmethod + def _insert_reference(backend, nodeid, ref, table=REFS_TABLE_NAME, commit=True): + # NumericNodeId is required for searching. + assert(nodeid.NodeIdType == NodeIdType.Numeric) + assert(isinstance(ref, ua.uaprotocol_auto.ReferenceDescription)) + assert(isinstance(ref.ReferenceTypeId, ua.uatypes.NodeId)) + assert(isinstance(ref.IsForward, bool)) + assert(isinstance(ref.NodeId, ua.uatypes.NodeId)) + # BrowseName + assert(isinstance(ref.BrowseName, ua.uatypes.QualifiedName)) + assert(isinstance(ref.BrowseName.NamespaceIndex, int)) + assert(ref.BrowseName.Name is None or isinstance(ref.BrowseName.Name, str)) + # DisplayName + assert(isinstance(ref.DisplayName, ua.uatypes.LocalizedText)) + assert(ref.DisplayName.Text is None or isinstance(ref.DisplayName.Text, str)) + assert(ref.DisplayName.Locale is None) + assert(isinstance(ref.DisplayName.Encoding, int)) + # NodeClass is enum, stored as INTEGER + assert(isinstance(ref.NodeClass, (int, ua.uaprotocol_auto.NodeClass))) + assert(isinstance(ref.TypeDefinition, ua.uatypes.NodeId)) + + binNodeId, refNodeId, primaryKey = \ + AddressSpaceSQLite._calcRefPrimaryKey(nodeid, ref) + + cmd = 'INSERT OR REPLACE INTO "{tn}" VALUES ({q})'.format(tn=table, q=', '.join('?'*13)) + params = ( + sqlite3.Binary(primaryKey), + sqlite3.Binary(binNodeId), + sqlite3.Binary(ua.ua_binary.nodeid_to_binary(ref.ReferenceTypeId)), + int(bool(ref.IsForward)), + sqlite3.Binary(refNodeId), + int(ref.BrowseName.NamespaceIndex), + None if ref.BrowseName.Name is None else str(ref.BrowseName.Name), + None if ref.DisplayName.Text is None else str(ref.DisplayName.Text), + None if ref.DisplayName.Locale is None else 
str(ref.DisplayName.Locale), + int(ref.DisplayName.Encoding), + int(ref.NodeClass), + sqlite3.Binary(ua.ua_binary.nodeid_to_binary(ref.TypeDefinition)), + str(nodeid) + ) + backend.execute_write(cmd, params=params, commit=commit) + + @staticmethod + def _drop_references(backend, nodeid, table=REFS_TABLE_NAME, commit=True): + assert(nodeid.NodeIdType == NodeIdType.Numeric) + binNodeId = ua.ua_binary.nodeid_to_binary(nodeid) + cmd = 'DELETE FROM "{tn}" WHERE {nid}=?'.format( + tn=table, + nid=AddressSpaceSQLite.NODEID_COL_NAME + ) + params = ( + sqlite3.Binary(binNodeId), + ) + backend.execute_write(cmd, params=params, commit=commit) + + def _calcRefPrimaryKey(nodeid, ref): + binNodeId = ua.ua_binary.nodeid_to_binary(nodeid) # Our own nodeid + refNodeId = ua.ua_binary.nodeid_to_binary(ref.NodeId) # Referred nodeid + primaryKey = binNodeId + refNodeId + pack(">B", int(ref.IsForward)) + return binNodeId, refNodeId, primaryKey + + def _remove_reference_threadsafe(self, nodeid, ref, table=REFS_TABLE_NAME, commit=True): + with self._lock: + AddressSpaceSQLite._remove_reference(self.backend, nodeid, ref, table, commit=commit) + + @staticmethod + def _remove_reference(backend, nodeid, ref, table=REFS_TABLE_NAME, commit=True): + cmd = 'DELETE FROM "{tn}" WHERE _Id = ?'.format(tn=table) + binNodeId, refNodeId, primaryKey = \ + AddressSpaceSQLite._calcRefPrimaryKey(nodeid, ref) + params = ( + sqlite3.Binary(primaryKey), + ) + backend.execute_write(cmd, params=params, commit=commit) + + @staticmethod + def _read_reference_row(row): + ref = ua.uaprotocol_auto.ReferenceDescription() + ref.ReferenceTypeId = ua.ua_binary.nodeid_from_binary(Buffer(row[2])) + ref.IsForward = bool(int(row[3])) + ref.NodeId = ua.ua_binary.nodeid_from_binary(Buffer(row[4])) + ref.BrowseName = ua.QualifiedName(str(row[6]) if row[6] else None, int(row[5])) + ref.DisplayName = ua.LocalizedText(str(row[7]) if row[7] else None) + # row[8] ignored: DisplayName.Locale is automatically set. 
+ # row[9] ignored: DisplayName.Encoding is automatically set. + ref.NodeClass = ua.NodeClass(int(row[10])) + ref.TypeDefinition = ua.ua_binary.nodeid_from_binary(Buffer(row[11])) + return ref + + # Compare NodeData instances. + @staticmethod + def _cmp_nodedata(ndata, ndata2): + assert(isinstance(ndata2, NodeData)) + assert(ndata.nodeid == ndata2.nodeid) + for attrId, attr in ndata.attributes.items(): + attr2 = ndata2.attributes[attrId] + AddressSpaceSQLite._cmp_attr(attr, attr2) + for idx, ref in enumerate(ndata.references): + ref2 = ndata2.references[idx] + AddressSpaceSQLite._cmp_refs(ref, ref2) + + @staticmethod + def _cmp_attr(attr, attr2): + assert(attr.value.ServerTimestamp == attr2.value.ServerTimestamp) + assert(attr.value.SourceTimestamp == attr2.value.SourceTimestamp) + assert(attr.value.StatusCode.value == attr2.value.StatusCode.value) + try: + assert(str(attr.value.Value.Value) == str(attr2.value.Value.Value)) + except Exception: + assert(int(attr.value.Value.Value) == int(attr2.value.Value.Value)) + assert(attr.value.Value.VariantType == attr2.value.Value.VariantType) + + @staticmethod + def _cmp_refs(ref, ref2): + assert(isinstance(ref2, ua.uaprotocol_auto.ReferenceDescription)) + assert(ref.ReferenceTypeId == ref2.ReferenceTypeId) + assert(ref.IsForward == ref2.IsForward) + assert(ref.NodeId == ref2.NodeId) + assert(ref.BrowseName == ref2.BrowseName) + assert(ref.DisplayName == ref2.DisplayName) + assert(int(ref.NodeClass) == int(ref2.NodeClass)) + assert(ref.TypeDefinition == ref2.TypeDefinition) + + +class StandardAddressSpaceSQLite(AddressSpaceSQLite): + + def __init__(self, cache=None): + path = os.path.join(os.path.dirname(__file__), "standard_address_space", "standard_address_space.sql") + backend = SQLite3Backend(sqlFile=path, readonly=True) + super(StandardAddressSpaceSQLite, self).__init__(backend, cache) + + def __enter__(self): + self.backend.__enter__() + return super(StandardAddressSpaceSQLite, self).__enter__() + + def __exit__(self, 
exc_type, exc_value, traceback): + super(StandardAddressSpaceSQLite, self).__exit__(exc_type, exc_value, traceback) + self.backend.__exit__(exc_type, exc_value, traceback) diff --git a/opcua/server/internal_server.py b/opcua/server/internal_server.py index ec5a823f2..bb9cc00b9 100644 --- a/opcua/server/internal_server.py +++ b/opcua/server/internal_server.py @@ -39,7 +39,7 @@ class SessionState(Enum): class InternalServer(object): - def __init__(self, shelffile=None, parent=None, session_cls=None): + def __init__(self, aspace=None, parent=None, session_cls=None): self.logger = logging.getLogger(__name__) self._parent = parent @@ -50,13 +50,14 @@ def __init__(self, shelffile=None, parent=None, session_cls=None): self.disabled_clock = False # for debugging we may want to disable clock that writes too much in log self._local_discovery_service = None # lazy-loading - self.aspace = AddressSpace() + self.aspace = AddressSpace() if aspace is None else aspace self.attribute_service = AttributeService(self.aspace) self.view_service = ViewService(self.aspace) self.method_service = MethodService(self.aspace) self.node_mgt_service = NodeManagementService(self.aspace) - self.load_standard_address_space(shelffile) + if aspace is None: + standard_address_space.fill_address_space(self.node_mgt_service) self.loop = None self.asyncio_transports = [] @@ -66,11 +67,10 @@ def __init__(self, shelffile=None, parent=None, session_cls=None): # create a session to use on server side self.session_cls = session_cls or InternalSession - self.isession = self.session_cls(self, self.aspace, \ + self.isession = self.session_cls(self, \ self.subscription_service, "Internal", user=UserManager.User.Admin) self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) - self._address_space_fixes() self.setup_nodes() @property @@ -100,77 +100,6 @@ def setup_nodes(self): ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray)) 
ns_node.set_value(uries) - def load_standard_address_space(self, shelffile=None): - if (shelffile is not None) and (os.path.isfile(shelffile) or os.path.isfile(shelffile+".db")): - # import address space from shelf - self.aspace.load_aspace_shelf(shelffile) - else: - # import address space from code generated from xml - standard_address_space.fill_address_space(self.node_mgt_service) - # import address space directly from xml, this has performance impact so disabled - # importer = xmlimporter.XmlImporter(self.node_mgt_service) - # importer.import_xml("/path/to/python-opcua/schemas/Opc.Ua.NodeSet2.xml", self) - - # if a cache file was supplied a shelve of the standard address space can now be built for next start up - if shelffile: - self.aspace.make_aspace_shelf(shelffile) - - def _address_space_fixes(self): - """ - Looks like the xml definition of address space has some error. This is a good place to fix them - """ - - it = ua.AddReferencesItem() - it.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseObjectType) - it.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes) - it.IsForward = False - it.TargetNodeId = ua.NodeId(ua.ObjectIds.ObjectTypesFolder) - it.TargetNodeClass = ua.NodeClass.Object - - it2 = ua.AddReferencesItem() - it2.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseDataType) - it2.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes) - it2.IsForward = False - it2.TargetNodeId = ua.NodeId(ua.ObjectIds.DataTypesFolder) - it2.TargetNodeClass = ua.NodeClass.Object - - results = self.isession.add_references([it, it2]) - - params = ua.WriteParameters() - for nodeid in (ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData, - 
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement, - ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall): - attr = ua.WriteValue() - attr.NodeId = ua.NodeId(nodeid) - attr.AttributeId = ua.AttributeIds.Value - attr.Value = ua.DataValue(ua.Variant(10000, ua.VariantType.UInt32), ua.StatusCode(ua.StatusCodes.Good)) - attr.Value.ServerTimestamp = datetime.utcnow() - params.NodesToWrite.append(attr) - result = self.isession.write(params) - result[0].check() - - - def load_address_space(self, path): - """ - Load address space from path - """ - self.aspace.load(path) - - def dump_address_space(self, path): - """ - Dump current address space to path - """ - self.aspace.dump(path) - def start(self): self.logger.info("starting internal server") self.loop = utils.ThreadLoop() @@ -222,7 +151,7 @@ def get_endpoints(self, params=None, sockname=None): return self.endpoints[:] def create_session(self, name, user=UserManager.User.Anonymous, external=False): - return self.session_cls(self, self.aspace, self.subscription_service, name, user=user, external=external) + return self.session_cls(self, self.subscription_service, name, user=user, external=external) def enable_history_data_change(self, node, period=timedelta(days=7), count=0): """ @@ -287,11 +216,10 @@ class InternalSession(object): _counter = 10 _auth_counter = 1000 - def __init__(self, internal_server, aspace, submgr, name, user=UserManager.User.Anonymous, external=False): + def __init__(self, internal_server, submgr, name, 
user=UserManager.User.Anonymous, external=False): self.logger = logging.getLogger(__name__) self.iserver = internal_server self.external = external # define if session is external, we need to copy some objects if it is internal - self.aspace = aspace self.subscription_service = submgr self.name = name self.user = user @@ -309,6 +237,10 @@ def __init__(self, internal_server, aspace, submgr, name, user=UserManager.User. def user_manager(self): return self.iserver.user_manager + @property + def aspace(self): + return self.iserver.aspace + def __str__(self): return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format( self.name, self.user, self.session_id, self.authentication_token) diff --git a/opcua/server/server.py b/opcua/server/server.py index 33a8d8694..5bc113ad1 100644 --- a/opcua/server/server.py +++ b/opcua/server/server.py @@ -75,7 +75,7 @@ class Server(object): """ - def __init__(self, shelffile=None, iserver=None): + def __init__(self, aspace=None, iserver=None): self.logger = logging.getLogger(__name__) self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/") self._application_uri = "urn:freeopcua:python:server" @@ -87,7 +87,7 @@ def __init__(self, shelffile=None, iserver=None): if iserver is not None: self.iserver = iserver else: - self.iserver = InternalServer(shelffile = shelffile, parent = self) + self.iserver = InternalServer(aspace = aspace, parent = self) self.bserver = None self._policies = [] self.nodes = Shortcuts(self.iserver.isession) diff --git a/opcua/server/standard_address_space/standard_address_space.py b/opcua/server/standard_address_space/standard_address_space.py index 4187c1f83..375b96a2d 100644 --- a/opcua/server/standard_address_space/standard_address_space.py +++ b/opcua/server/standard_address_space/standard_address_space.py @@ -1,7 +1,7 @@ import os.path -import opcua +from opcua import ua from opcua.server.standard_address_space.standard_address_space_part3 import create_standard_address_space_Part3 
from opcua.server.standard_address_space.standard_address_space_part4 import create_standard_address_space_Part4 @@ -13,17 +13,17 @@ from opcua.server.standard_address_space.standard_address_space_part13 import create_standard_address_space_Part13 class PostponeReferences(object): - def __init__(self, server): - self.server = server + def __init__(self, nodeservice): + self.nodeservice = nodeservice self.postponed_refs = None self.postponed_nodes = None #self.add_nodes = self.server.add_nodes def add_nodes(self,nodes): - self.postponed_nodes.extend(self.server.try_add_nodes(nodes, check=False)) + self.postponed_nodes.extend(self.nodeservice.try_add_nodes(nodes, check=False)) def add_references(self, refs): - self.postponed_refs.extend(self.server.try_add_references(refs)) + self.postponed_refs.extend(self.nodeservice.try_add_references(refs)) # no return def __enter__(self): @@ -33,11 +33,32 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None and exc_val is None: - remaining_nodes = list(self.server.try_add_nodes(self.postponed_nodes, check=False)) + remaining_nodes = list(self.nodeservice.try_add_nodes(self.postponed_nodes, check=False)) assert len(remaining_nodes) == 0, remaining_nodes - remaining_refs = list(self.server.try_add_references(self.postponed_refs)) + remaining_refs = list(self.nodeservice.try_add_references(self.postponed_refs)) assert len(remaining_refs) == 0, remaining_refs +def _address_space_fixes(nodeservice): + """ + Looks like the xml definition of address space has some error. 
This is a good place to fix them + """ + + it = ua.AddReferencesItem() + it.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseObjectType) + it.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes) + it.IsForward = False + it.TargetNodeId = ua.NodeId(ua.ObjectIds.ObjectTypesFolder) + it.TargetNodeClass = ua.NodeClass.Object + + it2 = ua.AddReferencesItem() + it2.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseDataType) + it2.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes) + it2.IsForward = False + it2.TargetNodeId = ua.NodeId(ua.ObjectIds.DataTypesFolder) + it2.TargetNodeClass = ua.NodeClass.Object + + results = nodeservice.add_references([it, it2]) + def fill_address_space(nodeservice): with PostponeReferences(nodeservice) as server: create_standard_address_space_Part3(server) @@ -48,3 +69,5 @@ def fill_address_space(nodeservice): create_standard_address_space_Part10(server) create_standard_address_space_Part11(server) create_standard_address_space_Part13(server) + + _address_space_fixes(nodeservice) diff --git a/opcua/ua/ua_binary.py b/opcua/ua/ua_binary.py index 019eedb71..64eb7df60 100644 --- a/opcua/ua/ua_binary.py +++ b/opcua/ua/ua_binary.py @@ -125,6 +125,8 @@ def unpack(data): class _Primitive1(object): + PY2 = sys.version_info < (3, 0) + def __init__(self, fmt): self._fmt = fmt st = struct.Struct(fmt.format(1)) @@ -141,15 +143,20 @@ def pack_array(self, data): if data is None: return Primitives.Int32.pack(-1) sizedata = Primitives.Int32.pack(len(data)) - return sizedata + struct.pack(self._fmt.format(len(data)), *data) + fmt = self._str_to_bytes(self._fmt.format(len(data))) + return sizedata + struct.pack(fmt, *data) def unpack_array(self, data, length): if length == -1: return None if length == 0: return () - return struct.unpack(self._fmt.format(length), data.read(self.size * length)) + fmt = self._str_to_bytes(self._fmt.format(length)) + return struct.unpack(fmt, data.read(self.size * length)) + @staticmethod + def _str_to_bytes(_str, PY2=PY2): + return bytes(_str) 
if PY2 else bytes(_str, 'utf-8') class Primitives1(object): SByte = _Primitive1("<{:d}b") @@ -273,6 +280,8 @@ def to_binary(uatype, val): return getattr(Primitives, uatype).pack(val) elif isinstance(val, (IntEnum, Enum)): return Primitives.UInt32.pack(val.value) + elif isinstance(val, int): + return Primitives.UInt32.pack(val) elif isinstance(val, ua.NodeId): return nodeid_to_binary(val) elif isinstance(val, ua.Variant): diff --git a/schemas/generate_address_space.py b/schemas/generate_address_space.py index e41b802cc..4dc35788e 100644 --- a/schemas/generate_address_space.py +++ b/schemas/generate_address_space.py @@ -318,15 +318,19 @@ def make_refs_code(self, obj, indent): def save_aspace_to_disk(): - import os.path - path = os.path.join("..", "opcua", "binary_address_space.pickle") - print("Savind standard address space to:", path) sys.path.append("..") from opcua.server.standard_address_space import standard_address_space from opcua.server.address_space import NodeManagementService, AddressSpace aspace = AddressSpace() standard_address_space.fill_address_space(NodeManagementService(aspace)) - aspace.dump(path) + # Write in-memory address space to sqlite3 file. + import os.path + path = os.path.join("..", "opcua", "server", "standard_address_space", "standard_address_space.sql") + print("Saving standard address space to:", path) + from opcua.server.address_space_sqlite import AddressSpaceSQLite + from opcua.common.sqlite3_backend import SQLite3Backend + with SQLite3Backend(sqlFile=path, readonly=False) as backend, AddressSpaceSQLite(backend, cache=aspace) as aspace_sql: + aspace_sql.dump(namespaceidx=0) if __name__ == "__main__": logging.basicConfig(level=logging.WARN)