From 9be15c07873d9513f3c460afd1a67046f585973b Mon Sep 17 00:00:00 2001 From: David Randler Date: Fri, 20 Apr 2018 13:53:53 +0200 Subject: [PATCH 1/5] Always trigger datachange callbacks --- opcua/server/address_space.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/opcua/server/address_space.py b/opcua/server/address_space.py index eb94e0646..00874d539 100644 --- a/opcua/server/address_space.py +++ b/opcua/server/address_space.py @@ -640,9 +640,7 @@ def set_attribute_value(self, nodeid, attr, value): attval = node.attributes[attr] old = attval.value attval.value = value - cbs = [] - if old.Value != value.Value: # only send call callback when a value change has happend - cbs = list(attval.datachange_callbacks.items()) + cbs = list(attval.datachange_callbacks.items()) for k, v in cbs: try: From 73e712df55a80c73a99f403eacf3bc3cc408a8e6 Mon Sep 17 00:00:00 2001 From: David Randler Date: Fri, 20 Apr 2018 13:55:38 +0200 Subject: [PATCH 2/5] Filter datachange callbacks according to DataChangeTrigger --- opcua/server/internal_subscription.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/opcua/server/internal_subscription.py b/opcua/server/internal_subscription.py index 96620632b..6e97ae0e0 100644 --- a/opcua/server/internal_subscription.py +++ b/opcua/server/internal_subscription.py @@ -197,9 +197,11 @@ def datachange_callback(self, handle, value, error=None): mdata.mvalue.set_current_value(value.Value.Value) if mdata.filter: deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter) + trigger_flag_pass = self.datachange_trigger_callback(mdata.mvalue, mdata.filter.Trigger) else: deadband_flag_pass = True - if deadband_flag_pass: + trigger_flag_pass = self.datachange_trigger_callback(mdata.mvalue, ua.DataChangeTrigger.StatusValue) + if deadband_flag_pass and trigger_flag_pass: event.ClientHandle = mdata.client_handle event.Value = value self.isub.enqueue_datachange_event(mid, event, mdata.queue_size) @@ -216,6 +218,13 @@ def deadband_callback(self, values, flt): else: return False + def datachange_trigger_callback(self, values, trigger): + if trigger == ua.DataChangeTrigger.StatusValue and \ + values.get_current_value() == values.get_old_value(): + return False + else: + return True + def trigger_event(self, event): with self._lock: if event.SourceNode not in self._monitored_events: From f6fee1f8af8f1017cb2d382d94f4a40e926ccec9 Mon Sep 17 00:00:00 2001 From: David Randler Date: Mon, 23 Apr 2018 14:03:52 +0200 Subject: [PATCH 3/5] Add tests for subscriptions with timestamp trigger --- opcua/common/subscription.py | 11 +++++++++ tests/tests_subscriptions.py | 46 ++++++++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/opcua/common/subscription.py b/opcua/common/subscription.py index 0264256f5..11b5e9cdc 100644 --- a/opcua/common/subscription.py +++ b/opcua/common/subscription.py @@ -177,6 +177,17 @@ def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value): """ return self._subscribe(nodes, attr, queuesize=0) + def subscribe_data_timestamp_change(self, nodes, attr=ua.AttributeIds.Value): + """ + Subscribe for data and timestamp change events for a node or list of nodes. + default attribute is Value. + Return a handle which can be used to unsubscribe + If more control is necessary use create_monitored_items method + """ + timestamp_filter = ua.DataChangeFilter() + timestamp_filter.Trigger = ua.DataChangeTrigger(2) # send notification when status, value or timestamp change + return self._subscribe(nodes, attr, mfilter=timestamp_filter, queuesize=0) + def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None, queuesize=0): """ Subscribe to events from a node. Default node is Server node. diff --git a/tests/tests_subscriptions.py b/tests/tests_subscriptions.py index 818d935b7..e65ed462f 100644 --- a/tests/tests_subscriptions.py +++ b/tests/tests_subscriptions.py @@ -157,6 +157,48 @@ def test_subscription_count_empty(self): self.assertEqual(myhandler.datachange_count, 4) sub.delete() + def test_subscription_count_timestamp_trigger(self): + myhandler = MySubHandlerCounter() + sub = self.opc.create_subscription(1, myhandler) + o = self.opc.get_objects_node() + var = o.add_variable(3, 'SubVarCounter', 0.1) + sub.subscribe_data_timestamp_change(var) + nb = 12 + for i in range(nb): + val = var.get_value() + var.set_value(val +1) + time.sleep(0.2) # let last event arrive + self.assertEqual(myhandler.datachange_count, nb + 1) + sub.delete() + + def test_subscription_count_timestamp_trigger_no_change(self): + myhandler = MySubHandlerCounter() + sub = self.opc.create_subscription(1, myhandler) + o = self.opc.get_objects_node() + var = o.add_variable(3, 'SubVarCounter', 0.1) + sub.subscribe_data_timestamp_change(var) + nb = 12 + for i in range(nb): + val = var.get_value() + var.set_value(val) + time.sleep(0.2) # let last event arrive + self.assertEqual(myhandler.datachange_count, nb + 1) + sub.delete() + + def test_subscription_count_timestamp_trigger_same_timestamp(self): + myhandler = MySubHandlerCounter() + sub = self.opc.create_subscription(1, myhandler) + o = self.opc.get_objects_node() + var = o.add_variable(3, 'SubVarCounter', 0.1) + sub.subscribe_data_timestamp_change(var) + nb = 12 + for i in range(nb): + val = var.get_data_value() # use exactly the same value including timestamps + var.set_value(val) + time.sleep(0.2) # let last event arrive + self.assertEqual(myhandler.datachange_count, 1) + sub.delete() + def test_subscription_overload_simple(self): nb = 10 myhandler = MySubHandler() @@ -502,7 +544,7 @@ def test_several_different_events(self): propertystring2 = "This is my test 2" evgen2.event.PropertyNum = propertynum2 evgen2.event.PropertyString = propertystring2 - + for i in range(3): evgen1.trigger() evgen2.trigger() @@ -560,7 +602,7 @@ def test_several_different_events_2(self): propertystring3 = "This is my test 3" evgen3.event.PropertyNum3 = propertynum3 evgen3.event.PropertyString = propertystring2 - + for i in range(3): evgen1.trigger() evgen2.trigger() From 7b67d700b5b6d9c362b10a424e6638e95bda95f4 Mon Sep 17 00:00:00 2001 From: David Randler Date: Mon, 23 Apr 2018 14:09:05 +0200 Subject: [PATCH 4/5] Filter value updates with the exact same timestamps --- opcua/server/address_space.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/opcua/server/address_space.py b/opcua/server/address_space.py index 00874d539..d1b66b66b 100644 --- a/opcua/server/address_space.py +++ b/opcua/server/address_space.py @@ -640,7 +640,9 @@ def set_attribute_value(self, nodeid, attr, value): attval = node.attributes[attr] old = attval.value attval.value = value - cbs = list(attval.datachange_callbacks.items()) + cbs = [] + if old.SourceTimestamp != value.SourceTimestamp or old.ServerTimestamp != value.ServerTimestamp: + cbs = list(attval.datachange_callbacks.items()) for k, v in cbs: try: From 7ddc8931457db345f84eb602bc866b9171a552aa Mon Sep 17 00:00:00 2001 From: David Randler Date: Mon, 23 Apr 2018 14:11:20 +0200 Subject: [PATCH 5/5] Update uasubscribe tool with option to subscribe to timestamp changes --- opcua/tools.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/opcua/tools.py b/opcua/tools.py index f9e761fe1..08e9f069f 100644 --- a/opcua/tools.py +++ b/opcua/tools.py @@ -357,11 +357,11 @@ def uasubscribe(): "--eventtype", dest="eventtype", default="datachange", - choices=['datachange', 'event'], + choices=['datachange', 'timeordatachange', 'event'], help="Event type to subscribe to") args = parse_args(parser, requirenodeid=False) - if args.eventtype == "datachange": + if args.eventtype == "datachange" or args.eventtype == "timeordatachange": _require_nodeid(parser, args) else: # FIXME: this is broken, someone may have written i=84 on purpose @@ -377,6 +377,8 @@ def uasubscribe(): sub = client.create_subscription(500, handler) if args.eventtype == "datachange": sub.subscribe_data_change(node) + elif args.eventtype == "timeordatachange": + sub.subscribe_data_timestamp_change(node) else: sub.subscribe_events(node) print("Type Ctr-C to exit")