diff --git a/plugins/kafka/kafka.cpp b/plugins/kafka/kafka.cpp index 5903ebbd11f..e59b21d0f48 100644 --- a/plugins/kafka/kafka.cpp +++ b/plugins/kafka/kafka.cpp @@ -255,7 +255,7 @@ namespace KafkaPlugin if (!isAlive() && parentPtr) { shouldRun = true; - Thread::start(false); + Thread::start(); } } @@ -738,43 +738,40 @@ namespace KafkaPlugin /** * Remove previously-created objects that have been inactive - * for awhile + * for awhile; assumes a lock is held while modifying cachedPublishers */ void expire() { - if (!cachedPublishers.empty()) - { - time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS; - __int32 expireCount = 0; + time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS; + __int32 expireCount = 0; - for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */) + for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */) + { + // Expire only if the publisher has been inactive and if + // there are no messages in the outbound queue + if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0) { - // Expire only if the publisher has been inactive and if - // there are no messages in the outbound queue - if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0) - { - // Shutdown the attached poller before deleting - x->second->shutdownPoller(); + // Shutdown the attached poller before deleting + x->second->shutdownPoller(); - // Delete the object - delete(x->second); + // Delete the object + delete(x->second); - // Erase from map - cachedPublishers.erase(x++); + // Erase from map + cachedPublishers.erase(x++); - ++expireCount; - } - else - { - x++; - } + ++expireCount; } - - if (doTrace(traceKafka) && expireCount > 0) + else { - DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s")); + x++; } } + + if (doTrace(traceKafka) && expireCount > 0) + { + DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s")); + } } /**