Skip to content

Commit

Permalink
Merge pull request #49 from HSF/dev
Browse files Browse the repository at this point in the history
merge dev to master
  • Loading branch information
tmaeno authored Jan 25, 2021
2 parents 062acb2 + e621afb commit 0a217e4
Show file tree
Hide file tree
Showing 99 changed files with 6,612 additions and 1,096 deletions.
63 changes: 63 additions & 0 deletions atlas/lib/idds/atlas/notifier/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def on_error(self, headers, body):
'''
self.logger.error('[broker] [%s]: %s', self.__broker, body)

def on_message(self, headers, body):
# self.logger.info('[broker] [%s]: %s', self.__broker, body)
pass


class MessagingSender(PluginBase, threading.Thread):
def __init__(self, **kwargs):
Expand Down Expand Up @@ -126,3 +130,62 @@ def run(self):

def __call__(self):
self.run()


class MessagingReceiver(MessagingSender):
def __init__(self, **kwargs):
super(MessagingReceiver, self).__init__(**kwargs)

def subscribe(self, listener=MessagingListener):
self.conns = []

broker_addresses = []
for b in self.brokers:
try:
addrinfos = socket.getaddrinfo(b, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
for addrinfo in addrinfos:
b_addr = addrinfo[4][0]
broker_addresses.append(b_addr)
except socket.gaierror as error:
self.logger.error('Cannot resolve hostname %s: %s' % (b, str(error)))

self.logger.info("Resolved broker addresses: %s" % broker_addresses)

for broker in broker_addresses:
conn = stomp.Connection12(host_and_ports=[(broker, self.port)],
vhost=self.vhost,
keepalive=True)
conn.set_listener('message-receiver', listener(conn.transport._Transport__host_and_ports[0]))
conn.connect(self.username, self.password, wait=True)
conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto')
self.conns.append(conn)

while not self.graceful_stop.is_set():
try:
for conn in self.conns:
if not conn.is_connected():
self.logger.info('connecting to %s' % conn.transport._Transport__host_and_ports[0][0])
conn.set_listener('message-receiver', listener(conn.transport._Transport__host_and_ports[0]))
# conn.start()
conn.connect(self.username, self.password, wait=True)
conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto')
time.sleep(1)
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

self.logger.info('receiver graceful stop requested')

for conn in self.conns:
try:
conn.disconnect()
except Exception:
pass

def run(self):
try:
self.subscribe()
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

def __call__(self):
self.run()
4 changes: 2 additions & 2 deletions atlas/lib/idds/atlas/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.0.5"
release_version = "0.1.0"
Loading

0 comments on commit 0a217e4

Please sign in to comment.