Skip to content

Commit

Permalink
Started implementing PaloAltoNetworks#80
Browse files Browse the repository at this point in the history
Signed-off-by: Luigi Mori <[email protected]>
  • Loading branch information
jtschichold committed Nov 3, 2016
1 parent e803ee0 commit 9c7a360
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 28 deletions.
45 changes: 34 additions & 11 deletions minemeld/ft/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import re
import collections

import gevent.event

import pan.xapi

from . import base
Expand All @@ -31,11 +33,11 @@

LOG = logging.getLogger(__name__)

SUBRE = re.compile("^[A-Za-z0-9_]")
SUBRE = re.compile("[^A-Za-z0-9_]")


class DevicePusher(gevent.Greenlet):
def __init__(self, device, prefix, watermark, attributes):
def __init__(self, device, prefix, watermark, attributes, persistent):
super(DevicePusher, self).__init__()

self.device = device
Expand All @@ -51,6 +53,7 @@ def __init__(self, device, prefix, watermark, attributes):
self.prefix = prefix
self.attributes = attributes
self.watermark = watermark
self.persistent = persistent

self.q = gevent.queue.Queue()

Expand All @@ -60,7 +63,7 @@ def put(self, op, address, value):

def _get_all_registered_ips(self):
self.xapi.op(
cmd='show object registered-ip all',
cmd='show object registered-ip tag %s%s' % (self.prefix, self.watermark),
vsys=self.device.get('vsys', None),
cmd_xml=True
)
Expand Down Expand Up @@ -95,7 +98,9 @@ def _dag_message(self, type_, addresses):
if addresses is not None and len(addresses) != 0:
akeys = sorted(addresses.keys())
for a in akeys:
message.append('<entry ip="%s">' % a)
message.append(
'<entry ip="%s" persistent="%d">' % (a, 1 if self.persistent else 0)
)

tags = sorted(addresses[a])
if tags is not None:
Expand Down Expand Up @@ -132,7 +137,7 @@ def _tags_from_value(self, value):
v = str(value[t])
v = SUBRE.sub('_', v)

tag = '%s%s_%s' % (self.prefix, t, str(value[t]))
tag = '%s%s_%s' % (self.prefix, t, v)

result.append(tag)

Expand Down Expand Up @@ -233,6 +238,8 @@ def __init__(self, name, chassis, config):
self.ageout_glet = None
self.last_ageout_run = None

self.hup_event = gevent.event.Event()

super(DagPusher, self).__init__(name, chassis, config)

def configure(self):
Expand All @@ -252,6 +259,10 @@ def configure(self):
'tag_attributes',
['confidence', 'direction']
)
self.persistent_registered_ips = self.config.get(
'persistent_registered_ips',
False
)

def _initialize_table(self, truncate=False):
self.table = table.Table(self.name, truncate=truncate)
Expand Down Expand Up @@ -401,7 +412,8 @@ def _spawn_device_pusher(self, device):
device,
self.tag_prefix,
self.tag_watermark,
self.tag_attributes
self.tag_attributes,
self.persistent_registered_ips
)
dp.link_exception(self._device_pusher_died)

Expand All @@ -424,9 +436,10 @@ def _device_pusher_died(self, g):
'respawning in 60 seconds',
self.name, g.device['hostname'])

try:
idx = self.device_pushers.index(g.device)
except ValueError:
for idx in range(len(self.device_pushers)):
if self.device_pushers[idx].device == g.device:
break
else:
LOG.info('%s - device pusher for %s removed,' +
' respawning aborted',
self.name, g.device['hostname'])
Expand Down Expand Up @@ -463,6 +476,12 @@ def _load_device_list(self):
if g.value is None and not g.started:
g.start()

def _huppable_wait(self, wait_time):
hup_called = self.hup_event.wait(timeout=wait_time)
if hup_called:
LOG.debug('%s - clearing poll event', self.name)
self.hup_event.clear()

def _device_list_monitor(self):
if self.device_list_path is None:
LOG.warning('%s - no device_list path configured', self.name)
Expand All @@ -474,7 +493,7 @@ def _device_list_monitor(self):
except:
LOG.debug('%s - error checking mtime of %s',
self.name, self.device_list_path)
gevent.sleep(10)
self._huppable_wait(5)
continue

if mtime != self.device_list_mtime:
Expand All @@ -486,7 +505,7 @@ def _device_list_monitor(self):
except:
LOG.exception('%s - exception loading device list')

gevent.sleep(5)
self._huppable_wait(5)

def mgmtbus_status(self):
result = super(DagPusher, self).mgmtbus_status()
Expand Down Expand Up @@ -525,3 +544,7 @@ def stop(self):

if self.ageout_glet is not None:
self.ageout_glet.kill()

def hup(self, source=None):
LOG.info('%s - hup received, reload device list', self.name)
self.hup_event.set()
40 changes: 23 additions & 17 deletions tests/test_ft_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def gevent_event_mock_factory():
return result


def device_pusher_mock_factory(device, prefix, watermark, attributes):
def device_pusher_mock_factory(device, prefix, watermark, attributes, persistence):
def _start_se(x):
x.started = True

Expand Down Expand Up @@ -94,11 +94,12 @@ def tearDown(self):
@mock.patch.object(gevent, 'spawn')
@mock.patch.object(gevent, 'spawn_later')
@mock.patch.object(gevent, 'sleep', side_effect=gevent.GreenletExit())
@mock.patch.object(minemeld.ft.dag.DagPusher, '_huppable_wait', side_effect=gevent.GreenletExit())
@mock.patch('gevent.event.Event', side_effect=gevent_event_mock_factory)
@mock.patch.object(calendar, 'timegm', side_effect=logical_millisec)
@mock.patch('minemeld.ft.dag.DevicePusher',
side_effect=device_pusher_mock_factory)
def test_device_list_load(self, dp_mock, timegm_mock, event_mock,
def test_device_list_load(self, dp_mock, timegm_mock, event_mock, hw_mock,
sleep_mock, spawnl_mock, spawn_mock):
device_list_path = os.path.join(MYDIR, 'test_device_list.yml')
device_list_path2 = os.path.join(MYDIR, 'test_device_list2.yml')
Expand Down Expand Up @@ -140,7 +141,7 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock,
a._device_list_monitor()
except gevent.GreenletExit:
pass
sleep_mock.assert_called_with(5)
hw_mock.assert_called_with(5)
self.assertEqual(len(a.devices), len(dlist))
self.assertEqual(len(a.device_pushers), len(dlist))
self.assertEqual(dp_mock.call_count, len(dlist))
Expand All @@ -154,14 +155,14 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock,
GEVENT_SLEEP(1)
shutil.copyfile(device_list_path, DLIST_NAME)

sleep_mock.reset_mock()
hw_mock.reset_mock()
dp_mock.reset_mock()

try:
a._device_list_monitor()
except gevent.GreenletExit:
pass
sleep_mock.assert_called_with(5)
hw_mock.assert_called_with(5)
self.assertEqual(len(a.devices), len(dlist))
self.assertEqual(len(a.device_pushers), len(dlist))
self.assertEqual(dp_mock.call_count, 0)
Expand All @@ -175,14 +176,14 @@ def test_device_list_load(self, dp_mock, timegm_mock, event_mock,
GEVENT_SLEEP(1)
shutil.copyfile(device_list_path2, DLIST_NAME)

sleep_mock.reset_mock()
hw_mock.reset_mock()
dp_mock.reset_mock()

try:
a._device_list_monitor()
except gevent.GreenletExit:
pass
sleep_mock.assert_called_with(5)
hw_mock.assert_called_with(5)
self.assertEqual(len(a.devices), len(dlist2))
self.assertEqual(len(a.device_pushers), len(dlist2))
self.assertEqual(dp_mock.call_count, 1)
Expand Down Expand Up @@ -534,14 +535,15 @@ def test_unicast2(self, dp_mock, timegm_mock, event_mock,

@mock.patch.object(pan.xapi, 'PanXapi', side_effect=panos_mock.factory)
def test_devicepusher_dag_message(self, panxapi_mock):
RESULT_REG = '<uid-message><version>1.0</version><type>update</type><payload><register><entry ip="192.168.1.1"><tag><member>a</member><member>b</member></tag></entry></register></payload></uid-message>'
RESULT_UNREG = '<uid-message><version>1.0</version><type>update</type><payload><unregister><entry ip="192.168.1.1"><tag><member>a</member><member>b</member></tag></entry></unregister></payload></uid-message>'
RESULT_REG = '<uid-message><version>1.0</version><type>update</type><payload><register><entry ip="192.168.1.1" persistent="0"><tag><member>a</member><member>b</member></tag></entry></register></payload></uid-message>'
RESULT_UNREG = '<uid-message><version>1.0</version><type>update</type><payload><unregister><entry ip="192.168.1.1" persistent="0"><tag><member>a</member><member>b</member></tag></entry></unregister></payload></uid-message>'

dp = minemeld.ft.dag.DevicePusher(
{'tag': 'test'},
'mmeld_',
'test',
[]
[],
False
)

reg = dp._dag_message('register', {'192.168.1.1': ['a', 'b']})
Expand All @@ -556,7 +558,8 @@ def test_devicepusher_tags_from_value(self, panxapi_mock):
{'tag': 'test'},
'mmeld_',
'test',
['confidence', 'direction']
['confidence', 'direction'],
False
)

tags = dp._tags_from_value({'confidence': 49, 'direction': 'inbound'})
Expand All @@ -574,7 +577,8 @@ def test_devicepusher_get_all_registered_ips(self, panxapi_mock):
{'hostname': 'test_ft_dag_devicepusher'},
'mmeld_',
'test',
['confidence', 'direction']
['confidence', 'direction'],
False
)

result = dp._get_all_registered_ips()
Expand All @@ -589,13 +593,14 @@ def test_devicepusher_push(self, panxapi_mock):
{'hostname': 'test_ft_dag_devicepusher'},
'mmeld_',
'test',
['confidence', 'direction']
['confidence', 'direction'],
False
)

dp._push('register', '192.168.1.10', {'confidence': 40, 'direction': 'inbound'})
self.assertEqual(
dp.xapi.user_id_calls[0],
'<uid-message><version>1.0</version><type>update</type><payload><register><entry ip="192.168.1.10"><tag><member>mmeld_confidence_low</member><member>mmeld_direction_inbound</member><member>mmeld_test</member></tag></entry></register></payload></uid-message>'
'<uid-message><version>1.0</version><type>update</type><payload><register><entry ip="192.168.1.10" persistent="0"><tag><member>mmeld_confidence_low</member><member>mmeld_direction_inbound</member><member>mmeld_test</member></tag></entry></register></payload></uid-message>'
)

@mock.patch.object(pan.xapi, 'PanXapi', side_effect=panos_mock.factory)
Expand All @@ -604,7 +609,8 @@ def test_devicepusher_init_resync(self, panxapi_mock):
{'hostname': 'test_ft_dag_devicepusher'},
'mmeld_',
'test',
['confidence', 'direction']
['confidence', 'direction'],
False
)

dp.put('init', '192.168.1.1', {'confidence': 75, 'direction': 'inbound'})
Expand All @@ -614,9 +620,9 @@ def test_devicepusher_init_resync(self, panxapi_mock):

self.assertEqual(
dp.xapi.user_id_calls[0],
'<uid-message><version>1.0</version><type>update</type><payload><register><entry ip="192.168.1.1"><tag><member>mmeld_confidence_high</member><member>mmeld_direction_inbound</member><member>mmeld_test</member></tag></entry><entry ip="192.168.1.10"><tag><member>mmeld_confidence_high</member><member>mmeld_direction_unknown</member><member>mmeld_test</member></tag></entry></register></payload></uid-message>'
'<uid-message><version>1.0</version><type>update</type><payload><register><entry ip="192.168.1.1" persistent="0"><tag><member>mmeld_confidence_high</member><member>mmeld_direction_inbound</member><member>mmeld_test</member></tag></entry><entry ip="192.168.1.10" persistent="0"><tag><member>mmeld_confidence_high</member><member>mmeld_direction_unknown</member><member>mmeld_test</member></tag></entry></register></payload></uid-message>'
)
self.assertEqual(
dp.xapi.user_id_calls[1],
'<uid-message><version>1.0</version><type>update</type><payload><unregister><entry ip="192.168.1.1"><tag><member>mmeld_confidence_100</member><member>mmeld_pushed</member></tag></entry><entry ip="192.168.1.2"><tag><member>mmeld_confidence_100</member></tag></entry></unregister></payload></uid-message>'
'<uid-message><version>1.0</version><type>update</type><payload><unregister><entry ip="192.168.1.1" persistent="0"><tag><member>mmeld_confidence_100</member><member>mmeld_pushed</member></tag></entry><entry ip="192.168.1.2" persistent="0"><tag><member>mmeld_confidence_100</member></tag></entry></unregister></payload></uid-message>'
)

0 comments on commit 9c7a360

Please sign in to comment.