From 11750e0d28d38f49d463e1d51cbb58e2ac12c5b1 Mon Sep 17 00:00:00 2001 From: "Dan S. Camper" Date: Thu, 21 Mar 2024 09:11:41 -0500 Subject: [PATCH] HPCC-31498 Kafka shared library not constructing/destructing properly Use Singleton pattern instead of std::once. --- plugins/kafka/kafka.cpp | 168 ++++++++++------------------------------ 1 file changed, 43 insertions(+), 125 deletions(-) diff --git a/plugins/kafka/kafka.cpp b/plugins/kafka/kafka.cpp index 1da664ebc10..e59b21d0f48 100644 --- a/plugins/kafka/kafka.cpp +++ b/plugins/kafka/kafka.cpp @@ -50,12 +50,6 @@ namespace KafkaPlugin // background activity const __int32 POLL_TIMEOUT = 1000; - //-------------------------------------------------------------------------- - // Static Variables - //-------------------------------------------------------------------------- - - static std::once_flag pubCacheInitFlag; - //-------------------------------------------------------------------------- // Static Methods (internal) //-------------------------------------------------------------------------- @@ -697,7 +691,7 @@ namespace KafkaPlugin * * Class used to create and cache publisher objects and connections */ - static class PublisherCacheObj + class PublisherCacheObj { private: @@ -714,6 +708,15 @@ namespace KafkaPlugin } + /** + * Destructor + * + */ + ~PublisherCacheObj() + { + deleteAll(); + } + void deleteAll() { CriticalBlock block(lock); @@ -735,45 +738,40 @@ namespace KafkaPlugin /** * Remove previously-created objects that have been inactive - * for awhile + * for awhile; assumes a lock is held while modifying cachedPublishers */ void expire() { - if (!cachedPublishers.empty()) - { - CriticalBlock block(lock); + time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS; + __int32 expireCount = 0; - time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS; - __int32 expireCount = 0; - - for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */) + for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */) + { + // Expire only if the publisher has been inactive and if + // there are no messages in the outbound queue + if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0) { - // Expire only if the publisher has been inactive and if - // there are no messages in the outbound queue - if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0) - { - // Shutdown the attached poller before deleting - x->second->shutdownPoller(); + // Shutdown the attached poller before deleting + x->second->shutdownPoller(); - // Delete the object - delete(x->second); + // Delete the object + delete(x->second); - // Erase from map - cachedPublishers.erase(x++); + // Erase from map + cachedPublishers.erase(x++); - ++expireCount; - } - else - { - x++; - } + ++expireCount; } - - if (doTrace(traceKafka) && expireCount > 0) + else { - DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s")); + x++; } } + + if (doTrace(traceKafka) && expireCount > 0) + { + DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s")); + } } /** @@ -822,6 +820,9 @@ namespace KafkaPlugin { DBGLOG("Kafka: Created and cached new publisher object: %s @ %s", topic.c_str(), brokers.c_str()); } + + // Expire any old publishers before returning the new one + expire(); } } @@ -837,77 +838,16 @@ namespace KafkaPlugin ObjMap cachedPublishers; //!< std::map of created Publisher object pointers CriticalSection lock; //!< Mutex guarding modifications to cachedPublishers - } *publisherCache; - - //-------------------------------------------------------------------------- - - /** @class PublisherCacheExpirerObj - * Class used to expire old publisher objects held within publisherCache - */ - static class PublisherCacheExpirerObj : public Thread - { - public: - - PublisherCacheExpirerObj() - : Thread("Kafka::PublisherExpirer"), - shouldRun(false) - { - - } - - virtual void start() - { - if (!isAlive()) - { - shouldRun = true; - Thread::start(); - } - } - - virtual void stop() - { - if (isAlive()) - { - shouldRun = false; - join(); - } - } - - virtual int run() - { - while (shouldRun) - { - if (publisherCache) - { - publisherCache->expire(); - } - - usleep(1000); - } - - return 0; - } - - private: - - std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop - } *publisherCacheExpirer; + }; //-------------------------------------------------------------------------- - // Lazy Initialization + // Singleton Initialization //-------------------------------------------------------------------------- - /** - * Make sure the publisher object cache is initialized as well as the - * associated background thread for expiring idle publishers. This is - * called only once. - */ - static void setupPublisherCache() + static Singleton publisherCache; + static PublisherCacheObj & queryPublisherCache() { - KafkaPlugin::publisherCache = new KafkaPlugin::PublisherCacheObj(); - - KafkaPlugin::publisherCacheExpirer = new KafkaPlugin::PublisherCacheExpirerObj; - KafkaPlugin::publisherCacheExpirer->start(); + return *publisherCache.query([] () { return new PublisherCacheObj; }); } //-------------------------------------------------------------------------- @@ -916,9 +856,7 @@ namespace KafkaPlugin ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key) { - std::call_once(pubCacheInitFlag, setupPublisherCache); - - Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT); + Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT); pubObjPtr->sendMessage(message, key); @@ -927,9 +865,7 @@ namespace KafkaPlugin ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key) { - std::call_once(pubCacheInitFlag, setupPublisherCache); - - Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT); + Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT); std::string messageStr(message, rtlUtf8Size(lenMessage, message)); std::string keyStr(key, rtlUtf8Size(lenKey, key)); @@ -1088,29 +1024,11 @@ ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock* pb) MODULE_INIT(INIT_PRIORITY_STANDARD) { - KafkaPlugin::publisherCache = NULL; - KafkaPlugin::publisherCacheExpirer = NULL; - return true; } MODULE_EXIT() { - // Delete the background thread expiring items from the publisher cache - // before deleting the publisher cache - if (KafkaPlugin::publisherCacheExpirer) - { - KafkaPlugin::publisherCacheExpirer->stop(); - delete(KafkaPlugin::publisherCacheExpirer); - KafkaPlugin::publisherCacheExpirer = NULL; - } - - if (KafkaPlugin::publisherCache) - { - KafkaPlugin::publisherCache->deleteAll(); - delete(KafkaPlugin::publisherCache); - KafkaPlugin::publisherCache = NULL; - } - + KafkaPlugin::publisherCache.destroy(); RdKafka::wait_destroyed(3000); }