Adding gstream parsing and a few extra bits of monitoring from XRootD #20

Open · wants to merge 15 commits into master
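
This PR reads two new options from the [AMQP] config section: wlcg_exchange (the exchange that file-close and cache-event records are published to) and process_metrics (when set, the collector derives dirname1/dirname2/logical_dirname; when unset, raw records are reported). Below is a minimal sketch of how these options are consumed, assuming a standard configparser setup; the values shown are placeholders, not defaults from this PR:

import configparser

config = configparser.ConfigParser()
config.read_string("""
[AMQP]
exchange = xrootd.detailed
wlcg_exchange = xrootd.wlcg
process_metrics = true
""")

wlcg_exchange = config.get('AMQP', 'wlcg_exchange')
# A true process_metrics means the derived directory fields are computed.
report_raw_data = not config.getboolean('AMQP', 'process_metrics')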
127 changes: 94 additions & 33 deletions Collectors/DetailedCollector.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3

"""
A simple daemon for collecting monitoring packets from a remote XRootD
@@ -22,6 +22,7 @@
class DetailedCollector(UdpCollector.UdpCollector):

DEFAULT_PORT = 9930
UDP_MON_PORT = 8000


def __init__(self, *args, **kw):
@@ -30,12 +31,38 @@ def __init__(self, *args, **kw):
self._servers = {}
self._users = {}
self._dictid_map = {}
self._exchange = self.config.get('AMQP', 'exchange')
self._wlcg_exchange = self.config.get('AMQP', 'wlcg_exchange')
# NOTE: config.get() returns a string, and bool() of any non-empty string is
# True; use getboolean() so values like 'false' or '0' parse correctly.
self.report_raw_data = not self.config.getboolean('AMQP', 'process_metrics')
self.last_flush = time.time()
self.seq_data = {}
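# seq_data holds the most recent packet sequence number (pseq) seen for
# each server and stream code; it is updated as packets arrive in process().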


def addCacheRecord(self, event, hostname, addr, port):
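"""
Build a record from a single g-stream cache event and publish it as a
'cache-event' message on the WLCG exchange; the timestamp is the event's
detach time in milliseconds.
"""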

rec = {}
rec['timestamp'] = event.detach_t * 1000  # detach time, converted to ms

rec['lfn'] = event.lfn
rec['access_cnt'] = event.access_cnt
rec['attach_t'] = event.attach_t
rec['detach_t'] = event.detach_t
rec['size'] = event.size
rec['blk_size'] = event.blk_size
rec['n_blks'] = event.n_blks
rec['n_blks_done'] = event.n_blks_done
rec['b_hit'] = event.b_hit
rec['b_miss'] = event.b_miss
rec['b_bypass'] = event.b_bypass
rec['hostname'] = hostname
rec['addr'] = addr
rec['port'] = port
rec['remotes'] = event.remotes

self.publish("cache-event", rec, exchange=self._wlcg_exchange)

self.logger.debug('Publishing Cache Event: {}'.format(str(rec)))


def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):
"""
Given information to create a record, send it up to the message queue.
@@ -57,7 +84,10 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):
s = self._servers[sid]
rec['serverID'] = sid
rec['server'] = s.addr
rec['site'] = s.site.decode('utf-8')
if s.site:
rec['site'] = s.site.decode('utf-8')
else:
rec['site'] = "unknown"
else:
rec['server'] = addr
# logger.warning('server still not identified: %s',sid)
@@ -102,35 +132,38 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):
if transfer_key in self._transfers:
f = self._transfers[transfer_key][1]
fname = f.fileName.decode('utf-8')
#self.logger.debug('{}'.format(self._transfers[transfer_key]))
rec['filename'] = fname
rec['filesize'] = f.fileSize
rec['dirname1'] = "/".join(fname.split('/', 2)[:2])
rec['dirname2'] = "/".join(fname.split('/', 3)[:3])
if fname.startswith('/user'):
rec['logical_dirname'] = rec['dirname2']
elif fname.startswith('/osgconnect/public'):
rec['logical_dirname'] = "/".join(fname.split('/', 4)[:4])
elif fname.startswith('/hcc'):
rec['logical_dirname'] = "/".join(fname.split('/', 6)[:6])
elif fname.startswith('/pnfs/fnal.gov/usr'):
rec['logical_dirname'] = "/".join(f.fileName.decode('utf-8').split('/')[:5])
elif fname.startswith('/gwdata'):
rec['logical_dirname'] = rec['dirname2']
elif fname.startswith('/chtc/'):
rec['logical_dirname'] = '/chtc'
elif fname.startswith('/icecube/'):
rec['logical_dirname'] = '/icecube'

# Check for CMS files
elif fname.startswith('/store') or fname.startswith('/user/dteam'):
rec['logical_dirname'] = rec['dirname2']
lcg_record = True
else:
rec['logical_dirname'] = 'unknown directory'
if not self.report_raw_data:
rec['dirname1'] = "/".join(fname.split('/', 2)[:2])
rec['dirname2'] = "/".join(fname.split('/', 3)[:3])
if fname.startswith('/user'):
rec['logical_dirname'] = rec['dirname2']
elif fname.startswith('/osgconnect/public'):
rec['logical_dirname'] = "/".join(fname.split('/', 4)[:4])
elif fname.startswith('/hcc'):
rec['logical_dirname'] = "/".join(fname.split('/', 6)[:6])
elif fname.startswith('/pnfs/fnal.gov/usr'):
rec['logical_dirname'] = "/".join(f.fileName.decode('utf-8').split('/')[:5])
elif fname.startswith('/gwdata'):
rec['logical_dirname'] = rec['dirname2']
elif fname.startswith('/chtc/'):
rec['logical_dirname'] = '/chtc'
elif fname.startswith('/icecube/'):
rec['logical_dirname'] = '/icecube'

# Check for CMS files
elif fname.startswith('/store') or fname.startswith('/user/dteam'):
rec['logical_dirname'] = rec['dirname2']
lcg_record = True
else:
rec['logical_dirname'] = 'unknown directory'
else:
rec['filename'] = "missing directory"
rec['filesize'] = "-1"
rec['logical_dirname'] = "missing directory"
if not self.report_raw_data:
rec['logical_dirname'] = "missing directory"
rec['read'] = fileClose.read
rec['readv'] = fileClose.readv
rec['write'] = fileClose.write
@@ -217,7 +250,7 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):

if not lcg_record:
self.logger.debug("OSG record to send: %s", str(rec))
self.publish("file-close", rec, exchange=self._exchange)
self.publish("file-close", rec, exchange=self._wlcg_exchange)
self.metrics_q.put({'type': 'message sent', 'count': 1, 'message_type': 'stashcache'})
else:
wlcg_packet = wlcg_converter.Convert(rec)
@@ -295,7 +328,8 @@ def process(self, data, addr, port):
self.seq_data[sid][str_header_code] = header.pseq

if header.code == b'f':
# self.logger.debug("Got fstream object")
# TODO: break this handler out into a support function
self.logger.debug("Got fstream object")
time_record = decoding.MonFile(data) # first one is always TOD
self.logger.debug(time_record)
data = data[time_record.recSize:]
@@ -355,11 +389,33 @@ def process(self, data, addr, port):
self.logger.debug("r - redirect stream message.")

elif header.code == b't':
#self.logger.warning("t - stream message. Server at %s should remove 'files', 'io', and "
# "'iov' directives from the monitoring configuration.", addr)
self.logger.warning("t - stream message. Server at %s should remove 'files', 'io', and "
"'iov' directives from the monitoring configuration.", addr)
self.logger.warning("{}".format(data))
pass

elif header.code == b'g':
self.logger.debug('Got g-stream (cache) object')

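# The packet body is a 4-byte word followed by a variable-length info
# blob; decoding.cacheInfo() parses the blob into per-file cache events
# (attach/detach times, block counts, and byte-level hit/miss/bypass
# counters).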
infolen = len(data) - 4
mm = decoding.mapheader._make(struct.unpack("!I" + str(infolen) + "s", data))

self.logger.debug(mm)

cacheInfo = decoding.cacheInfo(mm.info)

self.logger.debug('Debug Data: {}'.format(str(cacheInfo)))

# Reverse-resolve the reporting server's address, falling back to a
# placeholder when reverse DNS fails.
try:
hostname = socket.gethostbyaddr(addr)[0]
except (socket.herror, socket.gaierror):
hostname = 'unresolvable'

for event in cacheInfo:
self.addCacheRecord(event, hostname, addr, port)

else:
self.logger.debug('Header Code is: {}'.format(header.code.decode('utf-8')))
infolen = len(data) - 4
mm = decoding.mapheader._make(struct.unpack("!I" + str(infolen) + "s", data))
try:
@@ -383,8 +439,9 @@ def process(self, data, addr, port):

elif header.code == b'd':
path = rest
#self.logger.warning('Path information sent (%s). Server at %s should remove "files" '
# 'directive from the monitoring configuration.', path, addr)
self.logger.warning('Path information sent (%s). Server at %s should remove "files" '
'directive from the monitoring configuration.', path, addr)
self.logger.debug('{}'.format(data))

elif header.code == b'i':
appinfo = rest
@@ -446,13 +503,17 @@ def process(self, data, addr, port):

elif header.code == b'x':
decoding.xfrInfo(rest)
self.logger.debug('Header is x')
# transfer_key = str(sid) + "." + str(xfrInfo.fileID)
# if transfer_key in AllTransfers:
# cur_value = AllTransfers[transfer_key]
# AllTransfers[transfer_key] = (time.time(), cur_value[1], xfrInfo)
# print "Adding xfrInfo"

# print xfrInfo
else:
self.logger.debug('Unhandled header code: {}'.format(header.code.decode('utf-8')))

# Check whether we need to flush AllTransfers
now_time = time.time()