From a29d814c583ee0178e5710a9b8cdf76bbd138e60 Mon Sep 17 00:00:00 2001 From: dubeejw Date: Thu, 5 Mar 2020 20:53:27 -0500 Subject: [PATCH] Do not start a consumer for a disabled trigger --- provider/consumer.py | 17 ++++++++++++++--- provider/database.py | 2 +- provider/service.py | 5 ++++- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/provider/consumer.py b/provider/consumer.py index 2a3bd95f..384968ac 100644 --- a/provider/consumer.py +++ b/provider/consumer.py @@ -156,6 +156,17 @@ def __init__(self, trigger, params, sharedDictionary): if self.isMessageHub: self.username = params["username"] self.password = params["password"] + self.kafkaAdminUrl = params['kafka_admin_url'] + + try: + auth = self.password if self.username.lower() == 'token' else self.username + self.password + response = requests.get(self.kafkaAdminUrl, headers={'X-Auth-Token': auth}, timeout=60.0, verify=check_ssl) + + if response.status_code == 403: + logging.info("[{}] Invalid Kafka auth, disabling trigger... {}".format(self.trigger, response.status_code)) + self.__disableTrigger(response.status_code, 'Automatically disabled due to invalid Kafka credentials.') + except requests.exceptions.RequestException as e: + logging.info("[{}] Exception during Kafka auth, continuing... {}".format(self.trigger, e)) if 'isIamKey' in params and params['isIamKey'] == True: self.authHandler = IAMAuth(params['authKey'], params['iamUrl']) @@ -439,13 +450,13 @@ def __fireTrigger(self, messages): self.consumer.commit(offsets=self.__getOffsetList(messages), async=False) retry = False - def __disableTrigger(self, status_code): + def __disableTrigger(self, status_code, message='Automatically disabled after receiving a {} status code when firing the trigger.'): self.setDesiredState(Consumer.State.Disabled) # when failing to establish a database connection, mark the consumer as dead to restart the consumer try: self.database = Database() - self.database.disableTrigger(self.trigger, status_code) + self.database.disableTrigger(self.trigger, status_code, message) except Exception as e: logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e)) self.__recordState(Consumer.State.Dead) @@ -566,7 +577,7 @@ def __on_assign(self, consumer, partitions): logging.info('[{}] Completed partition assignment. Connected to broker(s)'.format(self.trigger)) if self.currentState() == Consumer.State.Initializing and self.__shouldRun(): - logging.info('[{}] Setting consumer state to runnning.'.format(self.trigger)) + logging.info('[{}] Setting consumer state to running.'.format(self.trigger)) self.__recordState(Consumer.State.Running) def __on_revoke(self, consumer, partitions): diff --git a/provider/database.py b/provider/database.py index bed62962..e865cd87 100644 --- a/provider/database.py +++ b/provider/database.py @@ -60,7 +60,7 @@ def destroy(self): self.client.disconnect() self.client = None - def disableTrigger(self, triggerFQN, status_code, message='Automatically disabled after receiving a {} status code when firing the trigger.'): + def disableTrigger(self, triggerFQN, status_code, message): try: document = self.database[triggerFQN] diff --git a/provider/service.py b/provider/service.py index 26e79096..1680f413 100644 --- a/provider/service.py +++ b/provider/service.py @@ -113,6 +113,7 @@ def __handleDocChange(self, change): self.createAndRunConsumer(document) else: logging.info("[{}] Found a new trigger, but is assigned to another worker: {}".format(change["id"], document["worker"])) + else: existingConsumer = self.consumers.getConsumerForTrigger(change["id"]) @@ -181,10 +182,12 @@ def createAndRunConsumer(self, doc): # Create a representation for this trigger, even if it is disabled # This allows it to appear in /health as well as allow it to be deleted # Creating this object is lightweight and does not initialize any connections + + # TODO: don't want to run the process... consumer = Consumer(triggerFQN, doc) self.consumers.addConsumerForTrigger(triggerFQN, consumer) - if self.__isTriggerDocActive(doc): + if self.__isTriggerDocActive(doc) and consumer.desiredState() != Consumer.State.Disabled: logging.info('[{}] Trigger was determined to be active, starting...'.format(triggerFQN)) consumer.start() else: