HPCC-31498 Kafka shared library not constructing/destructing properly #18438
@dcamper I don't think this is quite right.
plugins/kafka/kafka.cpp
Outdated
KafkaPlugin::publisherCacheExpirer = new KafkaPlugin::PublisherCacheExpirerObj;
KafkaPlugin::publisherCacheExpirer->start();
queryPublisherCache();
queryPublisherCacheExpirer().start();
Potential race condition - two threads could get here at the same time, and both pass the !isAlive() test.
I think the correct solution (using an auto pattern Jake uses) is:
static PublisherCacheExpirerObj & queryPublisherCache()
{
    auto initFunction = [] ()
    {
        // Create and start the object inside the init function, so that only
        // the thread that wins the initialization constructs and starts it.
        PublisherCacheExpirerObj * publisher = new PublisherCacheExpirerObj;
        publisher->start();
        return publisher;
    };
    return *publisherCacheExpirer.query(initFunction);
}
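To illustrate why routing construction through a single query(initFunction) call closes the race described above, here is a minimal sketch. LazySingleton, Expirer and countConstructionsUnderContention are hypothetical names invented for this example; this is not the HPCC Platform's Singleton class, only a stand-in with the same query()/destroy() shape, assuming a double-checked lock around the init function.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a lazily initialised singleton holder.
// NOT the HPCC Platform's Singleton class; it only mirrors the
// query(initFunction) / destroy() shape used in the review comment.
template <typename T>
class LazySingleton
{
public:
    template <typename InitFn>
    T * query(InitFn init)
    {
        T * current = ptr.load(std::memory_order_acquire);
        if (!current)
        {
            std::lock_guard<std::mutex> guard(lock);
            current = ptr.load(std::memory_order_relaxed);
            if (!current)   // only the first thread to get here constructs
            {
                current = init();
                ptr.store(current, std::memory_order_release);
            }
        }
        return current;
    }

    void destroy()
    {
        std::lock_guard<std::mutex> guard(lock);
        delete ptr.exchange(nullptr, std::memory_order_acq_rel);
    }

private:
    std::atomic<T *> ptr{nullptr};
    std::mutex lock;
};

// Hypothetical payload that counts how many times it was constructed.
struct Expirer
{
    static std::atomic<int> constructed;
    Expirer() { ++constructed; }
};
std::atomic<int> Expirer::constructed{0};

// Races several threads through query() and reports how many objects were built.
int countConstructionsUnderContention(int numThreads)
{
    Expirer::constructed = 0;
    LazySingleton<Expirer> holder;
    std::vector<std::thread> threads;
    for (int i = 0; i < numThreads; i++)
        threads.emplace_back([&holder] { holder.query([] { return new Expirer; }); });
    for (auto & t : threads)
        t.join();
    holder.destroy();
    return Expirer::constructed.load();
}
```

With a separate "is it alive?" test followed by a start call, two threads can both pass the test; here the check and the construction happen under one lock, so the init function runs once however many threads call query() concurrently.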
Then you can delete the calls to setupPublisherCache() since queryPublisherCache() will handle it for you.
Due to the circular dependencies between PublisherCacheObj and PublisherCacheExpirerObj, I opted to refactor. The expirer class (and its background thread) is now gone; expiration is done after a new publisher object is created. This simplifies things a great deal and probably improves system performance slightly (one fewer thread to manage).
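A rough sketch of what "expire after creating a new publisher" could look like, for readers following along. Everything here (PublisherCache, Publisher, getPublisher, the string key and the maxIdle parameter) is hypothetical and chosen for illustration; it is not the code in plugins/kafka/kafka.cpp.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>

using Clock = std::chrono::steady_clock;

// Hypothetical publisher; a real one would wrap a Kafka producer handle.
struct Publisher
{
    Clock::time_point lastUsed;
};

// Hypothetical cache: stale entries are expired by the calling thread
// right after a new publisher is created, so no background thread is needed.
class PublisherCache
{
public:
    explicit PublisherCache(std::chrono::milliseconds _maxIdle) : maxIdle(_maxIdle) {}

    Publisher * getPublisher(const std::string & key, Clock::time_point now)
    {
        std::lock_guard<std::mutex> guard(lock);
        auto found = cache.find(key);
        if (found != cache.end())
        {
            found->second->lastUsed = now;
            return found->second.get();
        }
        auto created = std::make_unique<Publisher>();
        created->lastUsed = now;
        Publisher * result = created.get();
        cache[key] = std::move(created);
        expire(now);    // piggyback expiry on creation
        return result;
    }

    std::size_t size()
    {
        std::lock_guard<std::mutex> guard(lock);
        return cache.size();
    }

private:
    // Removes entries idle for longer than maxIdle. Caller must hold the lock.
    void expire(Clock::time_point now)
    {
        for (auto it = cache.begin(); it != cache.end(); )
        {
            if (now - it->second->lastUsed > maxIdle)
                it = cache.erase(it);
            else
                ++it;
        }
    }

    std::chrono::milliseconds maxIdle;
    std::mutex lock;
    std::map<std::string, std::unique_ptr<Publisher>> cache;
};
```

The trade-off of this design is that expiry only happens when a new publisher is requested, and the cost of tearing down an old publisher is paid by the caller rather than by a dedicated thread.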
plugins/kafka/kafka.cpp
Outdated
delete(KafkaPlugin::publisherCache);
KafkaPlugin::publisherCache = NULL;
}
KafkaPlugin::publisherCacheExpirer.destroy();
This still needs to call stop - simplest is in the destructor since nothing is derived from it.
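As a sketch of the "call stop in the destructor" suggestion: the class below is a hypothetical polling worker (Poller, pollTimeout and pollCount are names made up for this example), with a sleep standing in for a blocking poll call. It assumes the class is not derived from, as the comment notes.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical polling worker. The sleep stands in for a blocking
// poll(timeout) call on a Kafka handle.
class Poller
{
public:
    explicit Poller(std::chrono::milliseconds _pollTimeout)
        : pollTimeout(_pollTimeout), worker([this] { run(); })
    {
    }

    // Stopping in the destructor means destroy() on the owning singleton
    // cannot leave the thread running against a deleted object.
    ~Poller() { stop(); }

    void stop()
    {
        stopping = true;
        if (worker.joinable())
            worker.join();  // may block for up to one pollTimeout
    }

    int pollCount() const { return polls.load(); }

private:
    void run()
    {
        do
        {
            std::this_thread::sleep_for(pollTimeout);
            ++polls;
        } while (!stopping);
    }

    std::chrono::milliseconds pollTimeout;
    std::atomic<bool> stopping{false};
    std::atomic<int> polls{0};
    std::thread worker;     // declared last so the other members exist before it starts
};
```

Note that stop() can block for up to one poll timeout while it joins the thread, since the worker only checks the flag between polls.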
The change looks reasonable. One question.
Since the expiry is now handled by the main calling thread, if a publisher is being expired the call to get a publisher could block for pollTimeout before it returns (while it waits to join the thread). Is that going to cause any issues?
@ghalliday With the current value of poll timeout (1s) it should not be a problem. In practice, a given job will publish to only one Kafka topic on a single broker (and therefore there will be only one publisher object).
@ghalliday I am retargeting this PR to 9.4.x, as soon as my internal testing succeeds. I will need a final once-over review after I push those changes.
0ee7612 to ff743c8
Retargeted to 9.4.x. Also removed an unnecessary check from within expire() method.
Approving, assuming testing goes as you expect.
90c27c0 to a601526
Squashed in preparation for merge, but don't merge yet. I am awaiting customer verification.
@dcamper can you also change the commit message so that it matches the normal convention: HPCC-XXXXX ...
Use Singleton pattern instead of std::once.
a601526 to 11750e0
@ghalliday My apologies! Commit message amended.
@ghalliday Please merge. Thanks!
Jira Issue: https://hpccsystems.atlassian.net//browse/HPCC-31498
Use Singleton pattern instead of std::once.
Obscure problems arose with the destruction of a static std::once, when the plugin was loaded and unloaded multiple times in a single job.
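The description does not spell out the exact failure, so the following is not a reconstruction of it. It is only a sketch of one general difference between the two approaches, assuming "std::once" refers to std::call_once with a static std::once_flag: the flag cannot be reset, so an object released at unload is not rebuilt on the next use, whereas a singleton with an explicit destroy can be. All names here are hypothetical.

```cpp
#include <cassert>
#include <memory>
#include <mutex>

// Hypothetical cached object; generation records which init created it.
struct Cache
{
    int generation;
};

// Variant A: std::call_once guarded by a static std::once_flag.
// The flag cannot be reset, so the init lambda runs at most once.
static std::once_flag cacheOnce;
static std::unique_ptr<Cache> onceCache;
static int onceInitCount = 0;

Cache * queryWithCallOnce()
{
    std::call_once(cacheOnce, [] { onceCache.reset(new Cache{++onceInitCount}); });
    return onceCache.get();
}

void releaseWithCallOnce()
{
    onceCache.reset();
}

// Variant B: a lock-protected singleton with an explicit destroy,
// which can be created again after it has been torn down.
static std::mutex singletonLock;
static std::unique_ptr<Cache> singletonCache;
static int singletonInitCount = 0;

Cache * queryWithSingleton()
{
    std::lock_guard<std::mutex> guard(singletonLock);
    if (!singletonCache)
        singletonCache.reset(new Cache{++singletonInitCount});
    return singletonCache.get();
}

void destroySingleton()
{
    std::lock_guard<std::mutex> guard(singletonLock);
    singletonCache.reset();
}
```

In variant A, a query after a release returns a null pointer because the once flag has already fired; in variant B, the second query builds a fresh object.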
Testing:
Manual testing with a local Kafka instance, publishing and consuming messages.