From b446034bd7d47434535d2bc87097a089862a047c Mon Sep 17 00:00:00 2001 From: Luigi Mori Date: Wed, 2 Nov 2016 21:14:48 +0000 Subject: [PATCH] Started implementing #80 Signed-off-by: Luigi Mori --- minemeld/ft/dag.py | 177 ++++++++++++------ tests/test_ft_dag.py | 46 ++--- ..._ip___registered_ip___object___show__0.xml | 12 ++ ..._ip___registered_ip___object___show__0.xml | 11 ++ ...tag___registered_ip___object___show__0.xml | 15 ++ ...usher_op_show_object_registered_ip_all.xml | 23 --- 6 files changed, 184 insertions(+), 100 deletions(-) create mode 100644 tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_1__ip___registered_ip___object___show__0.xml create mode 100644 tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_2__ip___registered_ip___object___show__0.xml create mode 100644 tests/test_ft_dag_devicepusher_op__show__object__registered_ip__tag__entry_name__mmeld_test_____tag___registered_ip___object___show__0.xml delete mode 100644 tests/test_ft_dag_devicepusher_op_show_object_registered_ip_all.xml diff --git a/minemeld/ft/dag.py b/minemeld/ft/dag.py index 16727056..99aff138 100644 --- a/minemeld/ft/dag.py +++ b/minemeld/ft/dag.py @@ -17,11 +17,14 @@ import logging import yaml import netaddr -import gevent -import gevent.queue import os import re import collections +import itertools + +import gevent +import gevent.queue +import gevent.event import pan.xapi @@ -31,11 +34,11 @@ LOG = logging.getLogger(__name__) -SUBRE = re.compile("^[A-Za-z0-9_]") +SUBRE = re.compile("[^A-Za-z0-9_]") class DevicePusher(gevent.Greenlet): - def __init__(self, device, prefix, watermark, attributes): + def __init__(self, device, prefix, watermark, attributes, persistent): super(DevicePusher, self).__init__() self.device = device @@ -51,6 +54,7 @@ def __init__(self, device, prefix, watermark, attributes): self.prefix = prefix self.attributes = attributes self.watermark = watermark + self.persistent = persistent self.q = gevent.queue.Queue() @@ -58,30 +62,41 @@ def put(self, op, address, value): LOG.debug('adding %s:%s to device queue', op, address) self.q.put([op, address, value]) + def _get_registered_ip_tags(self, ip): + self.xapi.op( + cmd='%s' % ip, + vsys=self.device.get('vsys', None), + cmd_xml=False + ) + + entries = self.xapi.element_root.findall('./result/entry') + if entries is None or len(entries) == 0: + return None + + tags = [member.text for member in entries[0].findall('./tag/member') + if member.text and member.text.startswith(self.prefix)] + + return tags + def _get_all_registered_ips(self): + cmd = ( + '' % + (self.prefix, self.watermark) + ) self.xapi.op( - cmd='show object registered-ip all', + cmd=cmd, vsys=self.device.get('vsys', None), - cmd_xml=True + cmd_xml=False ) entries = self.xapi.element_root.findall('./result/entry') if not entries: - return {} + return - addresses = {} for entry in entries: ip = entry.get("ip") - members = entry.findall("./tag/member") - - tags = [member.text for member in members - if member.text and member.text.startswith(self.prefix)] - - if len(tags) > 0: - addresses[ip] = (tags if len(tags) != members else None) - - return addresses + yield ip, self._get_registered_ip_tags(ip) def _dag_message(self, type_, addresses): message = [ @@ -95,7 +110,9 @@ def _dag_message(self, type_, addresses): if addresses is not None and len(addresses) != 0: akeys = sorted(addresses.keys()) for a in akeys: - message.append('' % a) + message.append( + '' % (a, 1 if self.persistent else 0) + ) tags = sorted(addresses[a]) if tags is not None: @@ -111,6 +128,23 @@ def _dag_message(self, type_, addresses): return ''.join(message) + def _user_id(self, cmd=None): + try: + self.xapi.user_id(cmd=cmd) + + except gevent.GreenletExit: + raise + + except pan.xapi.PanXapiError as e: + if 'already exists, ignore' in str(e): + pass + elif 'Failed to register' in str(e): + pass + else: + LOG.exception('XAPI exception in pusher for device %s: %s', + self.device.get('hostname', None), str(e)) + raise + def _tags_from_value(self, value): result = [] @@ -132,7 +166,7 @@ def _tags_from_value(self, value): v = str(value[t]) v = SUBRE.sub('_', v) - tag = '%s%s_%s' % (self.prefix, t, str(value[t])) + tag = '%s%s_%s' % (self.prefix, t, v) result.append(tag) @@ -153,10 +187,10 @@ def _push(self, op, address, value): msg = self._dag_message(op, {address: tags}) - self.xapi.user_id(cmd=msg) + self._user_id(cmd=msg) def _init_resync(self): - ctags = set() + ctags = collections.defaultdict(set) while True: op, address, value = self.q.get() if op == 'EOI': @@ -168,38 +202,54 @@ def _init_resync(self): (self.device.get('hostname', None), op) ) - ctags.add('%s@%s%s' % (address, self.prefix, self.watermark)) + ctags[address].add('%s%s' % (self.prefix, self.watermark)) for t in self._tags_from_value(value): - ctags.add('%s@%s' % (address, t)) + ctags[address].add(t) - regtags = set() - regaddresses = self._get_all_registered_ips() - for a, atags in regaddresses.iteritems(): - if atags is None: - continue - for t in atags: - regtags.add('%s@%s' % (a, t)) - - added = ctags - regtags - removed = regtags - ctags + LOG.debug(ctags) register = collections.defaultdict(list) - for t in added: - a, tag = t.split('@', 1) - register[a].append(tag) - unregister = collections.defaultdict(list) - for t in removed: - a, tag = t.split('@', 1) - unregister[a].append(tag) + for a, atags in self._get_all_registered_ips(): + regtags = set() + if atags is not None: + for t in atags: + regtags.add(t) + + added = ctags[a] - regtags + removed = regtags - ctags[a] + + for t in added: + register[a].append(t) + + for t in removed: + unregister[a].append(t) + + ctags.pop(a) + + for a, atags in ctags.iteritems(): + register[a] = atags + + LOG.debug(register) + LOG.debug(unregister) if len(register) != 0: - rmsg = self._dag_message('register', register) - self.xapi.user_id(cmd=rmsg) + addrs = iter(register) + for i in xrange(0, len(register), 1000): + rmsg = self._dag_message( + 'register', + {k: register[k] for k in itertools.islice(addrs, 1000)} + ) + self._user_id(cmd=rmsg) if len(unregister) != 0: - urmsg = self._dag_message('unregister', unregister) - self.xapi.user_id(cmd=urmsg) + addrs = iter(unregister) + for i in xrange(0, len(unregister), 1000): + urmsg = self._dag_message( + 'unregister', + {k: unregister[k] for k in itertools.islice(addrs, 1000)} + ) + self._user_id(cmd=urmsg) def _run(self): self._init_resync() @@ -214,12 +264,9 @@ def _run(self): break except pan.xapi.PanXapiError as e: - if 'already exists, ignore' not in str(e): - LOG.exception('XAPI exception in pusher for device %s: %s', - self.device.get('hostname', None), str(e)) - raise - else: - self.q.get() + LOG.exception('XAPI exception in pusher for device %s: %s', + self.device.get('hostname', None), str(e)) + raise class DagPusher(base.BaseFT): @@ -233,6 +280,8 @@ def __init__(self, name, chassis, config): self.ageout_glet = None self.last_ageout_run = None + self.hup_event = gevent.event.Event() + super(DagPusher, self).__init__(name, chassis, config) def configure(self): @@ -252,6 +301,10 @@ def configure(self): 'tag_attributes', ['confidence', 'direction'] ) + self.persistent_registered_ips = self.config.get( + 'persistent_registered_ips', + True + ) def _initialize_table(self, truncate=False): self.table = table.Table(self.name, truncate=truncate) @@ -401,7 +454,8 @@ def _spawn_device_pusher(self, device): device, self.tag_prefix, self.tag_watermark, - self.tag_attributes + self.tag_attributes, + self.persistent_registered_ips ) dp.link_exception(self._device_pusher_died) @@ -424,9 +478,10 @@ def _device_pusher_died(self, g): 'respawning in 60 seconds', self.name, g.device['hostname']) - try: - idx = self.device_pushers.index(g.device) - except ValueError: + for idx in range(len(self.device_pushers)): + if self.device_pushers[idx].device == g.device: + break + else: LOG.info('%s - device pusher for %s removed,' + ' respawning aborted', self.name, g.device['hostname']) @@ -463,6 +518,12 @@ def _load_device_list(self): if g.value is None and not g.started: g.start() + def _huppable_wait(self, wait_time): + hup_called = self.hup_event.wait(timeout=wait_time) + if hup_called: + LOG.debug('%s - clearing poll event', self.name) + self.hup_event.clear() + def _device_list_monitor(self): if self.device_list_path is None: LOG.warning('%s - no device_list path configured', self.name) @@ -474,7 +535,7 @@ def _device_list_monitor(self): except: LOG.debug('%s - error checking mtime of %s', self.name, self.device_list_path) - gevent.sleep(10) + self._huppable_wait(5) continue if mtime != self.device_list_mtime: @@ -486,7 +547,7 @@ def _device_list_monitor(self): except: LOG.exception('%s - exception loading device list') - gevent.sleep(5) + self._huppable_wait(5) def mgmtbus_status(self): result = super(DagPusher, self).mgmtbus_status() @@ -525,3 +586,7 @@ def stop(self): if self.ageout_glet is not None: self.ageout_glet.kill() + + def hup(self, source=None): + LOG.info('%s - hup received, reload device list', self.name) + self.hup_event.set() diff --git a/tests/test_ft_dag.py b/tests/test_ft_dag.py index 783c42a9..11c6b5e0 100644 --- a/tests/test_ft_dag.py +++ b/tests/test_ft_dag.py @@ -58,7 +58,7 @@ def gevent_event_mock_factory(): return result -def device_pusher_mock_factory(device, prefix, watermark, attributes): +def device_pusher_mock_factory(device, prefix, watermark, attributes, persistence): def _start_se(x): x.started = True @@ -94,11 +94,12 @@ def tearDown(self): @mock.patch.object(gevent, 'spawn') @mock.patch.object(gevent, 'spawn_later') @mock.patch.object(gevent, 'sleep', side_effect=gevent.GreenletExit()) + @mock.patch.object(minemeld.ft.dag.DagPusher, '_huppable_wait', side_effect=gevent.GreenletExit()) @mock.patch('gevent.event.Event', side_effect=gevent_event_mock_factory) @mock.patch.object(calendar, 'timegm', side_effect=logical_millisec) @mock.patch('minemeld.ft.dag.DevicePusher', side_effect=device_pusher_mock_factory) - def test_device_list_load(self, dp_mock, timegm_mock, event_mock, + def test_device_list_load(self, dp_mock, timegm_mock, event_mock, hw_mock, sleep_mock, spawnl_mock, spawn_mock): device_list_path = os.path.join(MYDIR, 'test_device_list.yml') device_list_path2 = os.path.join(MYDIR, 'test_device_list2.yml') @@ -140,7 +141,7 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock, a._device_list_monitor() except gevent.GreenletExit: pass - sleep_mock.assert_called_with(5) + hw_mock.assert_called_with(5) self.assertEqual(len(a.devices), len(dlist)) self.assertEqual(len(a.device_pushers), len(dlist)) self.assertEqual(dp_mock.call_count, len(dlist)) @@ -154,14 +155,14 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock, GEVENT_SLEEP(1) shutil.copyfile(device_list_path, DLIST_NAME) - sleep_mock.reset_mock() + hw_mock.reset_mock() dp_mock.reset_mock() try: a._device_list_monitor() except gevent.GreenletExit: pass - sleep_mock.assert_called_with(5) + hw_mock.assert_called_with(5) self.assertEqual(len(a.devices), len(dlist)) self.assertEqual(len(a.device_pushers), len(dlist)) self.assertEqual(dp_mock.call_count, 0) @@ -175,14 +176,14 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock, GEVENT_SLEEP(1) shutil.copyfile(device_list_path2, DLIST_NAME) - sleep_mock.reset_mock() + hw_mock.reset_mock() dp_mock.reset_mock() try: a._device_list_monitor() except gevent.GreenletExit: pass - sleep_mock.assert_called_with(5) + hw_mock.assert_called_with(5) self.assertEqual(len(a.devices), len(dlist2)) self.assertEqual(len(a.device_pushers), len(dlist2)) self.assertEqual(dp_mock.call_count, 1) @@ -534,14 +535,15 @@ def test_unicast2(self, dp_mock, timegm_mock, event_mock, @mock.patch.object(pan.xapi, 'PanXapi', side_effect=panos_mock.factory) def test_devicepusher_dag_message(self, panxapi_mock): - RESULT_REG = '1.0updateab' - RESULT_UNREG = '1.0updateab' + RESULT_REG = '1.0updateab' + RESULT_UNREG = '1.0updateab' dp = minemeld.ft.dag.DevicePusher( {'tag': 'test'}, 'mmeld_', 'test', - [] + [], + False ) reg = dp._dag_message('register', {'192.168.1.1': ['a', 'b']}) @@ -556,7 +558,8 @@ def test_devicepusher_tags_from_value(self, panxapi_mock): {'tag': 'test'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) tags = dp._tags_from_value({'confidence': 49, 'direction': 'inbound'}) @@ -574,14 +577,13 @@ def test_devicepusher_get_all_registered_ips(self, panxapi_mock): {'hostname': 'test_ft_dag_devicepusher'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) result = dp._get_all_registered_ips() - self.assertEqual(result, { - '192.168.1.1': ['mmeld_pushed', 'mmeld_confidence_100'], - '192.168.1.2': ['mmeld_confidence_100'] - }) + self.assertEqual(next(result), ('192.168.1.1', ['mmeld_test', 'mmeld_confidence_100', 'mmeld_pushed'])) + self.assertEqual(next(result), ('192.168.1.2', ['mmeld_test', 'mmeld_confidence_100'])) @mock.patch.object(pan.xapi, 'PanXapi', side_effect=panos_mock.factory) def test_devicepusher_push(self, panxapi_mock): @@ -589,13 +591,14 @@ def test_devicepusher_push(self, panxapi_mock): {'hostname': 'test_ft_dag_devicepusher'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) dp._push('register', '192.168.1.10', {'confidence': 40, 'direction': 'inbound'}) self.assertEqual( dp.xapi.user_id_calls[0], - '1.0updatemmeld_confidence_lowmmeld_direction_inboundmmeld_test' + '1.0updatemmeld_confidence_lowmmeld_direction_inboundmmeld_test' ) @mock.patch.object(pan.xapi, 'PanXapi', side_effect=panos_mock.factory) @@ -604,7 +607,8 @@ def test_devicepusher_init_resync(self, panxapi_mock): {'hostname': 'test_ft_dag_devicepusher'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) dp.put('init', '192.168.1.1', {'confidence': 75, 'direction': 'inbound'}) @@ -614,9 +618,9 @@ def test_devicepusher_init_resync(self, panxapi_mock): self.assertEqual( dp.xapi.user_id_calls[0], - '1.0updatemmeld_confidence_highmmeld_direction_inboundmmeld_testmmeld_confidence_highmmeld_direction_unknownmmeld_test' + '1.0updatemmeld_confidence_highmmeld_direction_inboundmmeld_confidence_highmmeld_direction_unknownmmeld_test' ) self.assertEqual( dp.xapi.user_id_calls[1], - '1.0updatemmeld_confidence_100mmeld_pushedmmeld_confidence_100' + '1.0updatemmeld_confidence_100mmeld_pushedmmeld_confidence_100mmeld_test' ) diff --git a/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_1__ip___registered_ip___object___show__0.xml b/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_1__ip___registered_ip___object___show__0.xml new file mode 100644 index 00000000..56dab968 --- /dev/null +++ b/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_1__ip___registered_ip___object___show__0.xml @@ -0,0 +1,12 @@ + + + + + mmeld_test + mmeld_confidence_100 + mmeld_pushed + + + 1 + + diff --git a/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_2__ip___registered_ip___object___show__0.xml b/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_2__ip___registered_ip___object___show__0.xml new file mode 100644 index 00000000..958d6fff --- /dev/null +++ b/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__ip_192_168_1_2__ip___registered_ip___object___show__0.xml @@ -0,0 +1,11 @@ + + + + + mmeld_test + mmeld_confidence_100 + + + 1 + + diff --git a/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__tag__entry_name__mmeld_test_____tag___registered_ip___object___show__0.xml b/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__tag__entry_name__mmeld_test_____tag___registered_ip___object___show__0.xml new file mode 100644 index 00000000..2c4ab53e --- /dev/null +++ b/tests/test_ft_dag_devicepusher_op__show__object__registered_ip__tag__entry_name__mmeld_test_____tag___registered_ip___object___show__0.xml @@ -0,0 +1,15 @@ + + + + + mmeld_test + + + + + mmeld_test + + + 2 + + diff --git a/tests/test_ft_dag_devicepusher_op_show_object_registered_ip_all.xml b/tests/test_ft_dag_devicepusher_op_show_object_registered_ip_all.xml deleted file mode 100644 index e47845c3..00000000 --- a/tests/test_ft_dag_devicepusher_op_show_object_registered_ip_all.xml +++ /dev/null @@ -1,23 +0,0 @@ - - - - - mmeld_pushed - mmeld_confidence_100 - - - - - non_mmeld - mmeld_confidence_100 - - - - - non_mmeld - non_mmeld2 - - - 2 - -