Skip to content
This repository has been archived by the owner on Mar 16, 2023. It is now read-only.

Commit

Permalink
Merge pull request #84 from jtschichold/er-80
Browse files Browse the repository at this point in the history
Started implementing #80
  • Loading branch information
jtschichold authored Nov 4, 2016
2 parents e803ee0 + b446034 commit e57d077
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 100 deletions.
177 changes: 121 additions & 56 deletions minemeld/ft/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import logging
import yaml
import netaddr
import gevent
import gevent.queue
import os
import re
import collections
import itertools

import gevent
import gevent.queue
import gevent.event

import pan.xapi

Expand All @@ -31,11 +34,11 @@

LOG = logging.getLogger(__name__)

SUBRE = re.compile("^[A-Za-z0-9_]")
SUBRE = re.compile("[^A-Za-z0-9_]")


class DevicePusher(gevent.Greenlet):
def __init__(self, device, prefix, watermark, attributes):
def __init__(self, device, prefix, watermark, attributes, persistent):
super(DevicePusher, self).__init__()

self.device = device
Expand All @@ -51,37 +54,49 @@ def __init__(self, device, prefix, watermark, attributes):
self.prefix = prefix
self.attributes = attributes
self.watermark = watermark
self.persistent = persistent

self.q = gevent.queue.Queue()

def put(self, op, address, value):
LOG.debug('adding %s:%s to device queue', op, address)
self.q.put([op, address, value])

def _get_registered_ip_tags(self, ip):
self.xapi.op(
cmd='<show><object><registered-ip><ip>%s</ip></registered-ip></object></show>' % ip,
vsys=self.device.get('vsys', None),
cmd_xml=False
)

entries = self.xapi.element_root.findall('./result/entry')
if entries is None or len(entries) == 0:
return None

tags = [member.text for member in entries[0].findall('./tag/member')
if member.text and member.text.startswith(self.prefix)]

return tags

def _get_all_registered_ips(self):
cmd = (
'<show><object><registered-ip><tag><entry name="%s%s"/></tag></registered-ip></object></show>' %
(self.prefix, self.watermark)
)
self.xapi.op(
cmd='show object registered-ip all',
cmd=cmd,
vsys=self.device.get('vsys', None),
cmd_xml=True
cmd_xml=False
)

entries = self.xapi.element_root.findall('./result/entry')
if not entries:
return {}
return

addresses = {}
for entry in entries:
ip = entry.get("ip")

members = entry.findall("./tag/member")

tags = [member.text for member in members
if member.text and member.text.startswith(self.prefix)]

if len(tags) > 0:
addresses[ip] = (tags if len(tags) != members else None)

return addresses
yield ip, self._get_registered_ip_tags(ip)

def _dag_message(self, type_, addresses):
message = [
Expand All @@ -95,7 +110,9 @@ def _dag_message(self, type_, addresses):
if addresses is not None and len(addresses) != 0:
akeys = sorted(addresses.keys())
for a in akeys:
message.append('<entry ip="%s">' % a)
message.append(
'<entry ip="%s" persistent="%d">' % (a, 1 if self.persistent else 0)
)

tags = sorted(addresses[a])
if tags is not None:
Expand All @@ -111,6 +128,23 @@ def _dag_message(self, type_, addresses):

return ''.join(message)

def _user_id(self, cmd=None):
try:
self.xapi.user_id(cmd=cmd)

except gevent.GreenletExit:
raise

except pan.xapi.PanXapiError as e:
if 'already exists, ignore' in str(e):
pass
elif 'Failed to register' in str(e):
pass
else:
LOG.exception('XAPI exception in pusher for device %s: %s',
self.device.get('hostname', None), str(e))
raise

def _tags_from_value(self, value):
result = []

Expand All @@ -132,7 +166,7 @@ def _tags_from_value(self, value):
v = str(value[t])
v = SUBRE.sub('_', v)

tag = '%s%s_%s' % (self.prefix, t, str(value[t]))
tag = '%s%s_%s' % (self.prefix, t, v)

result.append(tag)

Expand All @@ -153,10 +187,10 @@ def _push(self, op, address, value):

msg = self._dag_message(op, {address: tags})

self.xapi.user_id(cmd=msg)
self._user_id(cmd=msg)

def _init_resync(self):
ctags = set()
ctags = collections.defaultdict(set)
while True:
op, address, value = self.q.get()
if op == 'EOI':
Expand All @@ -168,38 +202,54 @@ def _init_resync(self):
(self.device.get('hostname', None), op)
)

ctags.add('%s@%s%s' % (address, self.prefix, self.watermark))
ctags[address].add('%s%s' % (self.prefix, self.watermark))
for t in self._tags_from_value(value):
ctags.add('%s@%s' % (address, t))
ctags[address].add(t)

regtags = set()
regaddresses = self._get_all_registered_ips()
for a, atags in regaddresses.iteritems():
if atags is None:
continue
for t in atags:
regtags.add('%s@%s' % (a, t))

added = ctags - regtags
removed = regtags - ctags
LOG.debug(ctags)

register = collections.defaultdict(list)
for t in added:
a, tag = t.split('@', 1)
register[a].append(tag)

unregister = collections.defaultdict(list)
for t in removed:
a, tag = t.split('@', 1)
unregister[a].append(tag)
for a, atags in self._get_all_registered_ips():
regtags = set()
if atags is not None:
for t in atags:
regtags.add(t)

added = ctags[a] - regtags
removed = regtags - ctags[a]

for t in added:
register[a].append(t)

for t in removed:
unregister[a].append(t)

ctags.pop(a)

for a, atags in ctags.iteritems():
register[a] = atags

LOG.debug(register)
LOG.debug(unregister)

if len(register) != 0:
rmsg = self._dag_message('register', register)
self.xapi.user_id(cmd=rmsg)
addrs = iter(register)
for i in xrange(0, len(register), 1000):
rmsg = self._dag_message(
'register',
{k: register[k] for k in itertools.islice(addrs, 1000)}
)
self._user_id(cmd=rmsg)

if len(unregister) != 0:
urmsg = self._dag_message('unregister', unregister)
self.xapi.user_id(cmd=urmsg)
addrs = iter(unregister)
for i in xrange(0, len(unregister), 1000):
urmsg = self._dag_message(
'unregister',
{k: unregister[k] for k in itertools.islice(addrs, 1000)}
)
self._user_id(cmd=urmsg)

def _run(self):
self._init_resync()
Expand All @@ -214,12 +264,9 @@ def _run(self):
break

except pan.xapi.PanXapiError as e:
if 'already exists, ignore' not in str(e):
LOG.exception('XAPI exception in pusher for device %s: %s',
self.device.get('hostname', None), str(e))
raise
else:
self.q.get()
LOG.exception('XAPI exception in pusher for device %s: %s',
self.device.get('hostname', None), str(e))
raise


class DagPusher(base.BaseFT):
Expand All @@ -233,6 +280,8 @@ def __init__(self, name, chassis, config):
self.ageout_glet = None
self.last_ageout_run = None

self.hup_event = gevent.event.Event()

super(DagPusher, self).__init__(name, chassis, config)

def configure(self):
Expand All @@ -252,6 +301,10 @@ def configure(self):
'tag_attributes',
['confidence', 'direction']
)
self.persistent_registered_ips = self.config.get(
'persistent_registered_ips',
True
)

def _initialize_table(self, truncate=False):
self.table = table.Table(self.name, truncate=truncate)
Expand Down Expand Up @@ -401,7 +454,8 @@ def _spawn_device_pusher(self, device):
device,
self.tag_prefix,
self.tag_watermark,
self.tag_attributes
self.tag_attributes,
self.persistent_registered_ips
)
dp.link_exception(self._device_pusher_died)

Expand All @@ -424,9 +478,10 @@ def _device_pusher_died(self, g):
'respawning in 60 seconds',
self.name, g.device['hostname'])

try:
idx = self.device_pushers.index(g.device)
except ValueError:
for idx in range(len(self.device_pushers)):
if self.device_pushers[idx].device == g.device:
break
else:
LOG.info('%s - device pusher for %s removed,' +
' respawning aborted',
self.name, g.device['hostname'])
Expand Down Expand Up @@ -463,6 +518,12 @@ def _load_device_list(self):
if g.value is None and not g.started:
g.start()

def _huppable_wait(self, wait_time):
hup_called = self.hup_event.wait(timeout=wait_time)
if hup_called:
LOG.debug('%s - clearing poll event', self.name)
self.hup_event.clear()

def _device_list_monitor(self):
if self.device_list_path is None:
LOG.warning('%s - no device_list path configured', self.name)
Expand All @@ -474,7 +535,7 @@ def _device_list_monitor(self):
except:
LOG.debug('%s - error checking mtime of %s',
self.name, self.device_list_path)
gevent.sleep(10)
self._huppable_wait(5)
continue

if mtime != self.device_list_mtime:
Expand All @@ -486,7 +547,7 @@ def _device_list_monitor(self):
except:
LOG.exception('%s - exception loading device list')

gevent.sleep(5)
self._huppable_wait(5)

def mgmtbus_status(self):
result = super(DagPusher, self).mgmtbus_status()
Expand Down Expand Up @@ -525,3 +586,7 @@ def stop(self):

if self.ageout_glet is not None:
self.ageout_glet.kill()

def hup(self, source=None):
LOG.info('%s - hup received, reload device list', self.name)
self.hup_event.set()
Loading

0 comments on commit e57d077

Please sign in to comment.