Skip to content

Commit

Permalink
Remove checks from expire()
Browse files Browse the repository at this point in the history
  • Loading branch information
dcamper committed Mar 22, 2024
1 parent ff743c8 commit 90c27c0
Showing 1 changed file with 23 additions and 26 deletions.
49 changes: 23 additions & 26 deletions plugins/kafka/kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ namespace KafkaPlugin
if (!isAlive() && parentPtr)
{
shouldRun = true;
Thread::start(false);
Thread::start();
}
}

Expand Down Expand Up @@ -738,43 +738,40 @@ namespace KafkaPlugin

/**
* Remove previously-created objects that have been inactive
* for awhile
* for awhile; assumes a lock is held while modifying cachedPublishers
*/
void expire()
{
if (!cachedPublishers.empty())
{
time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
__int32 expireCount = 0;
time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
__int32 expireCount = 0;

for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */)
for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */)
{
// Expire only if the publisher has been inactive and if
// there are no messages in the outbound queue
if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0)
{
// Expire only if the publisher has been inactive and if
// there are no messages in the outbound queue
if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0)
{
// Shutdown the attached poller before deleting
x->second->shutdownPoller();
// Shutdown the attached poller before deleting
x->second->shutdownPoller();

// Delete the object
delete(x->second);
// Delete the object
delete(x->second);

// Erase from map
cachedPublishers.erase(x++);
// Erase from map
cachedPublishers.erase(x++);

++expireCount;
}
else
{
x++;
}
++expireCount;
}

if (doTrace(traceKafka) && expireCount > 0)
else
{
DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s"));
x++;
}
}

if (doTrace(traceKafka) && expireCount > 0)
{
DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s"));
}
}

/**
Expand Down

0 comments on commit 90c27c0

Please sign in to comment.