diff --git a/minemeld/ft/dag.py b/minemeld/ft/dag.py index 16727056..f23bf25d 100644 --- a/minemeld/ft/dag.py +++ b/minemeld/ft/dag.py @@ -23,6 +23,8 @@ import re import collections +import gevent.event + import pan.xapi from . import base @@ -31,11 +33,11 @@ LOG = logging.getLogger(__name__) -SUBRE = re.compile("^[A-Za-z0-9_]") +SUBRE = re.compile("[^A-Za-z0-9_]") class DevicePusher(gevent.Greenlet): - def __init__(self, device, prefix, watermark, attributes): + def __init__(self, device, prefix, watermark, attributes, persistent): super(DevicePusher, self).__init__() self.device = device @@ -51,6 +53,7 @@ def __init__(self, device, prefix, watermark, attributes): self.prefix = prefix self.attributes = attributes self.watermark = watermark + self.persistent = persistent self.q = gevent.queue.Queue() @@ -60,7 +63,7 @@ def put(self, op, address, value): def _get_all_registered_ips(self): self.xapi.op( - cmd='show object registered-ip all', + cmd='show object registered-ip tag %s%s' % (self.prefix, self.watermark), vsys=self.device.get('vsys', None), cmd_xml=True ) @@ -95,7 +98,9 @@ def _dag_message(self, type_, addresses): if addresses is not None and len(addresses) != 0: akeys = sorted(addresses.keys()) for a in akeys: - message.append('' % a) + message.append( + '' % (a, 1 if self.persistent else 0) + ) tags = sorted(addresses[a]) if tags is not None: @@ -132,7 +137,7 @@ def _tags_from_value(self, value): v = str(value[t]) v = SUBRE.sub('_', v) - tag = '%s%s_%s' % (self.prefix, t, str(value[t])) + tag = '%s%s_%s' % (self.prefix, t, v) result.append(tag) @@ -233,6 +238,8 @@ def __init__(self, name, chassis, config): self.ageout_glet = None self.last_ageout_run = None + self.hup_event = gevent.event.Event() + super(DagPusher, self).__init__(name, chassis, config) def configure(self): @@ -252,6 +259,10 @@ def configure(self): 'tag_attributes', ['confidence', 'direction'] ) + self.persistent_registered_ips = self.config.get( + 'persistent_registered_ips', + False + ) def _initialize_table(self, truncate=False): self.table = table.Table(self.name, truncate=truncate) @@ -401,7 +412,8 @@ def _spawn_device_pusher(self, device): device, self.tag_prefix, self.tag_watermark, - self.tag_attributes + self.tag_attributes, + self.persistent_registered_ips ) dp.link_exception(self._device_pusher_died) @@ -424,9 +436,10 @@ def _device_pusher_died(self, g): 'respawning in 60 seconds', self.name, g.device['hostname']) - try: - idx = self.device_pushers.index(g.device) - except ValueError: + for idx in range(len(self.device_pushers)): + if self.device_pushers[idx].device == g.device: + break + else: LOG.info('%s - device pusher for %s removed,' + ' respawning aborted', self.name, g.device['hostname']) @@ -463,6 +476,12 @@ def _load_device_list(self): if g.value is None and not g.started: g.start() + def _huppable_wait(self, wait_time): + hup_called = self.hup_event.wait(timeout=wait_time) + if hup_called: + LOG.debug('%s - clearing poll event', self.name) + self.hup_event.clear() + def _device_list_monitor(self): if self.device_list_path is None: LOG.warning('%s - no device_list path configured', self.name) @@ -474,7 +493,7 @@ def _device_list_monitor(self): except: LOG.debug('%s - error checking mtime of %s', self.name, self.device_list_path) - gevent.sleep(10) + self._huppable_wait(5) continue if mtime != self.device_list_mtime: @@ -486,7 +505,7 @@ def _device_list_monitor(self): except: LOG.exception('%s - exception loading device list') - gevent.sleep(5) + self._huppable_wait(5) def mgmtbus_status(self): result = super(DagPusher, self).mgmtbus_status() @@ -525,3 +544,7 @@ def stop(self): if self.ageout_glet is not None: self.ageout_glet.kill() + + def hup(self, source=None): + LOG.info('%s - hup received, reload device list', self.name) + self.hup_event.set() diff --git a/tests/test_ft_dag.py b/tests/test_ft_dag.py index 783c42a9..09ed28a0 100644 --- a/tests/test_ft_dag.py +++ b/tests/test_ft_dag.py @@ -58,7 +58,7 @@ def gevent_event_mock_factory(): return result -def device_pusher_mock_factory(device, prefix, watermark, attributes): +def device_pusher_mock_factory(device, prefix, watermark, attributes, persistence): def _start_se(x): x.started = True @@ -94,11 +94,12 @@ def tearDown(self): @mock.patch.object(gevent, 'spawn') @mock.patch.object(gevent, 'spawn_later') @mock.patch.object(gevent, 'sleep', side_effect=gevent.GreenletExit()) + @mock.patch.object(minemeld.ft.dag.DagPusher, '_huppable_wait', side_effect=gevent.GreenletExit()) @mock.patch('gevent.event.Event', side_effect=gevent_event_mock_factory) @mock.patch.object(calendar, 'timegm', side_effect=logical_millisec) @mock.patch('minemeld.ft.dag.DevicePusher', side_effect=device_pusher_mock_factory) - def test_device_list_load(self, dp_mock, timegm_mock, event_mock, + def test_device_list_load(self, dp_mock, timegm_mock, event_mock, hw_mock, sleep_mock, spawnl_mock, spawn_mock): device_list_path = os.path.join(MYDIR, 'test_device_list.yml') device_list_path2 = os.path.join(MYDIR, 'test_device_list2.yml') @@ -140,7 +141,7 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock, a._device_list_monitor() except gevent.GreenletExit: pass - sleep_mock.assert_called_with(5) + hw_mock.assert_called_with(5) self.assertEqual(len(a.devices), len(dlist)) self.assertEqual(len(a.device_pushers), len(dlist)) self.assertEqual(dp_mock.call_count, len(dlist)) @@ -154,14 +155,14 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock, GEVENT_SLEEP(1) shutil.copyfile(device_list_path, DLIST_NAME) - sleep_mock.reset_mock() + hw_mock.reset_mock() dp_mock.reset_mock() try: a._device_list_monitor() except gevent.GreenletExit: pass - sleep_mock.assert_called_with(5) + hw_mock.assert_called_with(5) self.assertEqual(len(a.devices), len(dlist)) self.assertEqual(len(a.device_pushers), len(dlist)) self.assertEqual(dp_mock.call_count, 0) @@ -175,14 +176,14 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock, GEVENT_SLEEP(1) shutil.copyfile(device_list_path2, DLIST_NAME) - sleep_mock.reset_mock() + hw_mock.reset_mock() dp_mock.reset_mock() try: a._device_list_monitor() except gevent.GreenletExit: pass - sleep_mock.assert_called_with(5) + hw_mock.assert_called_with(5) self.assertEqual(len(a.devices), len(dlist2)) self.assertEqual(len(a.device_pushers), len(dlist2)) self.assertEqual(dp_mock.call_count, 1) @@ -534,14 +535,15 @@ def test_unicast2(self, dp_mock, timegm_mock, event_mock, @mock.patch.object(pan.xapi, 'PanXapi', side_effect=panos_mock.factory) def test_devicepusher_dag_message(self, panxapi_mock): - RESULT_REG = '1.0updateab' - RESULT_UNREG = '1.0updateab' + RESULT_REG = '1.0updateab' + RESULT_UNREG = '1.0updateab' dp = minemeld.ft.dag.DevicePusher( {'tag': 'test'}, 'mmeld_', 'test', - [] + [], + False ) reg = dp._dag_message('register', {'192.168.1.1': ['a', 'b']}) @@ -556,7 +558,8 @@ def test_devicepusher_tags_from_value(self, panxapi_mock): {'tag': 'test'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) tags = dp._tags_from_value({'confidence': 49, 'direction': 'inbound'}) @@ -574,7 +577,8 @@ def test_devicepusher_get_all_registered_ips(self, panxapi_mock): {'hostname': 'test_ft_dag_devicepusher'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) result = dp._get_all_registered_ips() @@ -589,13 +593,14 @@ def test_devicepusher_push(self, panxapi_mock): {'hostname': 'test_ft_dag_devicepusher'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) dp._push('register', '192.168.1.10', {'confidence': 40, 'direction': 'inbound'}) self.assertEqual( dp.xapi.user_id_calls[0], - '1.0updatemmeld_confidence_lowmmeld_direction_inboundmmeld_test' + '1.0updatemmeld_confidence_lowmmeld_direction_inboundmmeld_test' ) @mock.patch.object(pan.xapi, 'PanXapi', side_effect=panos_mock.factory) @@ -604,7 +609,8 @@ def test_devicepusher_init_resync(self, panxapi_mock): {'hostname': 'test_ft_dag_devicepusher'}, 'mmeld_', 'test', - ['confidence', 'direction'] + ['confidence', 'direction'], + False ) dp.put('init', '192.168.1.1', {'confidence': 75, 'direction': 'inbound'}) @@ -614,9 +620,9 @@ def test_devicepusher_init_resync(self, panxapi_mock): self.assertEqual( dp.xapi.user_id_calls[0], - '1.0updatemmeld_confidence_highmmeld_direction_inboundmmeld_testmmeld_confidence_highmmeld_direction_unknownmmeld_test' + '1.0updatemmeld_confidence_highmmeld_direction_inboundmmeld_testmmeld_confidence_highmmeld_direction_unknownmmeld_test' ) self.assertEqual( dp.xapi.user_id_calls[1], - '1.0updatemmeld_confidence_100mmeld_pushedmmeld_confidence_100' + '1.0updatemmeld_confidence_100mmeld_pushedmmeld_confidence_100' ) diff --git a/tests/test_ft_dag_devicepusher_op_show_object_registered_ip_all.xml b/tests/test_ft_dag_devicepusher_op_show_object_registered_ip_tag_mmeld_test.xml similarity index 100% rename from tests/test_ft_dag_devicepusher_op_show_object_registered_ip_all.xml rename to tests/test_ft_dag_devicepusher_op_show_object_registered_ip_tag_mmeld_test.xml