Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-31498 Kafka shared library not constructing/destructing properly #18438

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 43 additions & 125 deletions plugins/kafka/kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ namespace KafkaPlugin
// background activity
const __int32 POLL_TIMEOUT = 1000;

//--------------------------------------------------------------------------
// Static Variables
//--------------------------------------------------------------------------

static std::once_flag pubCacheInitFlag;

//--------------------------------------------------------------------------
// Static Methods (internal)
//--------------------------------------------------------------------------
Expand Down Expand Up @@ -697,7 +691,7 @@ namespace KafkaPlugin
*
* Class used to create and cache publisher objects and connections
*/
static class PublisherCacheObj
class PublisherCacheObj
{
private:

Expand All @@ -714,6 +708,15 @@ namespace KafkaPlugin

}

/**
* Destructor
*
*/
~PublisherCacheObj()
{
deleteAll();
}

void deleteAll()
{
CriticalBlock block(lock);
Expand All @@ -735,45 +738,40 @@ namespace KafkaPlugin

/**
* Remove previously-created objects that have been inactive
* for awhile
* for awhile; assumes a lock is held while modifying cachedPublishers
*/
void expire()
{
if (!cachedPublishers.empty())
{
CriticalBlock block(lock);
time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
__int32 expireCount = 0;

time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
__int32 expireCount = 0;

for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */)
for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */)
{
// Expire only if the publisher has been inactive and if
// there are no messages in the outbound queue
if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0)
{
// Expire only if the publisher has been inactive and if
// there are no messages in the outbound queue
if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0)
{
// Shutdown the attached poller before deleting
x->second->shutdownPoller();
// Shutdown the attached poller before deleting
x->second->shutdownPoller();

// Delete the object
delete(x->second);
// Delete the object
delete(x->second);

// Erase from map
cachedPublishers.erase(x++);
// Erase from map
cachedPublishers.erase(x++);

++expireCount;
}
else
{
x++;
}
++expireCount;
}

if (doTrace(traceKafka) && expireCount > 0)
else
{
DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s"));
x++;
}
}

if (doTrace(traceKafka) && expireCount > 0)
{
DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s"));
}
}

/**
Expand Down Expand Up @@ -822,6 +820,9 @@ namespace KafkaPlugin
{
DBGLOG("Kafka: Created and cached new publisher object: %s @ %s", topic.c_str(), brokers.c_str());
}

// Expire any old publishers before returning the new one
expire();
}
}

Expand All @@ -837,77 +838,16 @@ namespace KafkaPlugin

ObjMap cachedPublishers; //!< std::map of created Publisher object pointers
CriticalSection lock; //!< Mutex guarding modifications to cachedPublishers
} *publisherCache;

//--------------------------------------------------------------------------

/** @class PublisherCacheExpirerObj
* Class used to expire old publisher objects held within publisherCache
*/
static class PublisherCacheExpirerObj : public Thread
{
public:

PublisherCacheExpirerObj()
: Thread("Kafka::PublisherExpirer"),
shouldRun(false)
{

}

virtual void start()
{
if (!isAlive())
{
shouldRun = true;
Thread::start();
}
}

virtual void stop()
{
if (isAlive())
{
shouldRun = false;
join();
}
}

virtual int run()
{
while (shouldRun)
{
if (publisherCache)
{
publisherCache->expire();
}

usleep(1000);
}

return 0;
}

private:

std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop
} *publisherCacheExpirer;
};

//--------------------------------------------------------------------------
// Lazy Initialization
// Singleton Initialization
//--------------------------------------------------------------------------

/**
* Make sure the publisher object cache is initialized as well as the
* associated background thread for expiring idle publishers. This is
* called only once.
*/
static void setupPublisherCache()
static Singleton<PublisherCacheObj> publisherCache;
static PublisherCacheObj & queryPublisherCache()
{
KafkaPlugin::publisherCache = new KafkaPlugin::PublisherCacheObj();

KafkaPlugin::publisherCacheExpirer = new KafkaPlugin::PublisherCacheExpirerObj;
KafkaPlugin::publisherCacheExpirer->start();
return *publisherCache.query([] () { return new PublisherCacheObj; });
}

//--------------------------------------------------------------------------
Expand All @@ -916,9 +856,7 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key)
{
std::call_once(pubCacheInitFlag, setupPublisherCache);

Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);

pubObjPtr->sendMessage(message, key);

Expand All @@ -927,9 +865,7 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key)
{
std::call_once(pubCacheInitFlag, setupPublisherCache);

Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);
std::string messageStr(message, rtlUtf8Size(lenMessage, message));
std::string keyStr(key, rtlUtf8Size(lenKey, key));

Expand Down Expand Up @@ -1088,29 +1024,11 @@ ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock* pb)

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
KafkaPlugin::publisherCache = NULL;
KafkaPlugin::publisherCacheExpirer = NULL;

return true;
}

MODULE_EXIT()
{
// Delete the background thread expiring items from the publisher cache
// before deleting the publisher cache
if (KafkaPlugin::publisherCacheExpirer)
{
KafkaPlugin::publisherCacheExpirer->stop();
delete(KafkaPlugin::publisherCacheExpirer);
KafkaPlugin::publisherCacheExpirer = NULL;
}

if (KafkaPlugin::publisherCache)
{
KafkaPlugin::publisherCache->deleteAll();
delete(KafkaPlugin::publisherCache);
KafkaPlugin::publisherCache = NULL;
}

KafkaPlugin::publisherCache.destroy();
RdKafka::wait_destroyed(3000);
}
Loading