Skip to content

Commit

Permalink
trying to fix #770. have not reproduced problem, best guess.
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva authored and petersilva committed Oct 24, 2023
1 parent 63cf2e8 commit 303f82e
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
msg['local_offset'] = 0
msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic'])
if not msg.validate():
self.channel.basic_ack(msg['ack_id'])
if hasattr(self,channel):
self.channel.basic_ack(msg['ack_id'])
logger.error('message acknowledged and discarded: %s' % msg)
msg = None
else:
Expand Down Expand Up @@ -411,7 +412,8 @@ def putCleanUp(self) -> None:
if self.o['dry_run']:
logger.info("deleted exchange (dry run): %s (if unused)" % x)
else:
self.channel.exchange_delete(x, if_unused=True)
if hasattr(self,'channel'):
self.channel.exchange_delete(x, if_unused=True)
logger.info("deleted exchange: %s" % x)
except amqp.exceptions.PreconditionFailed as err:
err_msg = str(err).replace("Exchange.delete: (406) PRECONDITION_FAILED - exchange ", "")
Expand All @@ -425,10 +427,11 @@ def getCleanUp(self) -> None:

try:
if self.o['dry_run']:
logger.info("deleteing queue (dry run) %s" % self.o['queueName'] )
logger.info("deleting queue (dry run) %s" % self.o['queueName'] )
else:
logger.info("deleteing queue %s" % self.o['queueName'] )
self.channel.queue_delete(self.o['queueName'])
logger.info("deleting queue %s" % self.o['queueName'] )
if hasattr(self,'channel'):
self.channel.queue_delete(self.o['queueName'])
except Exception as err:
logger.error("failed to {} with {}".format(
self.o['broker'].url.hostname, err))
Expand Down Expand Up @@ -517,7 +520,8 @@ def ack(self, m: sarracenia.Message) -> None:
ebo = 1
while True:
try:
self.channel.basic_ack(m['ack_id'])
if hasattr(self, 'channel'):
self.channel.basic_ack(m['ack_id'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
# Break loop if no exceptions encountered
Expand Down

0 comments on commit 303f82e

Please sign in to comment.