From 303f82e9aaa8221eb2fccd07911b681fd4cabe24 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 13 Oct 2023 20:37:41 -0400 Subject: [PATCH] trying to fix #770. have not reproduced problem, best guess. --- sarracenia/moth/amqp.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index 0a46d9866..e1775560e 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -136,7 +136,8 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message: msg['local_offset'] = 0 msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic']) if not msg.validate(): - self.channel.basic_ack(msg['ack_id']) + if hasattr(self,channel): + self.channel.basic_ack(msg['ack_id']) logger.error('message acknowledged and discarded: %s' % msg) msg = None else: @@ -411,7 +412,8 @@ def putCleanUp(self) -> None: if self.o['dry_run']: logger.info("deleted exchange (dry run): %s (if unused)" % x) else: - self.channel.exchange_delete(x, if_unused=True) + if hasattr(self,'channel'): + self.channel.exchange_delete(x, if_unused=True) logger.info("deleted exchange: %s" % x) except amqp.exceptions.PreconditionFailed as err: err_msg = str(err).replace("Exchange.delete: (406) PRECONDITION_FAILED - exchange ", "") @@ -425,10 +427,11 @@ def getCleanUp(self) -> None: try: if self.o['dry_run']: - logger.info("deleteing queue (dry run) %s" % self.o['queueName'] ) + logger.info("deleting queue (dry run) %s" % self.o['queueName'] ) else: - logger.info("deleteing queue %s" % self.o['queueName'] ) - self.channel.queue_delete(self.o['queueName']) + logger.info("deleting queue %s" % self.o['queueName'] ) + if hasattr(self,'channel'): + self.channel.queue_delete(self.o['queueName']) except Exception as err: logger.error("failed to {} with {}".format( self.o['broker'].url.hostname, err)) @@ -517,7 +520,8 @@ def ack(self, m: sarracenia.Message) -> None: ebo = 1 while True: try: - self.channel.basic_ack(m['ack_id']) + if hasattr(self, 'channel'): + self.channel.basic_ack(m['ack_id']) del m['ack_id'] m['_deleteOnPost'].remove('ack_id') # Break loop if no exceptions encountered