[BugFix] fix compression context pool slow down after long running #53172
Conversation
Signed-off-by: luohaha <[email protected]>
Signed-off-by: luohaha <[email protected]>
@@ -114,14 +114,19 @@ class CompressionContextPool {

private:
    void add(InternalRef ptr) {
        // Use explicit producer token to avoid the overhead of too many sub-queues
        static thread_local std::unique_ptr<::moodycamel::ProducerToken> producer_token;
This doesn't look right: the number of producer_tokens equals the number of threads used for compression. That depends on how the compression context is used by the caller. If compression runs in a fixed-size thread pool, that can still be OK, but if it is a dynamic thread pool where threads are created and destroyed, there can still be a lot of producer tokens.
Not sure if I understand this correctly.
You can check the implementation of ExplicitProducer; a ProducerToken can reuse an existing producer queue:
ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
: producer(queue.recycle_or_create_producer(true))
{
if (producer != nullptr) {
producer->token = this;
}
}
ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled)
{
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
debug::DebugLock lock(implicitProdMutex);
#endif
// Try to re-use one first
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
bool expected = true;
if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
// We caught one! It's been marked as activated, the caller can have it
recycled = true;
return ptr;
}
}
}
recycled = false;
return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
}
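For reference (not from this PR or the quoted library code), a minimal sketch of how an explicit producer token is created against a moodycamel::ConcurrentQueue, used for enqueueing, and then left behind as an inactive producer that a later token can recycle:

#include <cstdio>
#include "concurrentqueue.h"

int main() {
    moodycamel::ConcurrentQueue<int> q;
    {
        moodycamel::ProducerToken tok(q);  // creates (or recycles) an explicit producer
        q.enqueue(tok, 1);                 // enqueue through that producer's sub-queue
    }                                      // tok destroyed: its producer is only marked inactive
    moodycamel::ProducerToken tok2(q);     // recycles the now-inactive explicit producer
    q.enqueue(tok2, 2);

    int v;
    while (q.try_dequeue(v)) {
        std::printf("dequeued %d\n", v);
    }
    return 0;
}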
I mean, if threads are created and destroyed frequently, this thread-local producer token will be created and destroyed as frequently as the thread. How can the inner queue be reused by the producer token in such a case?
So the upper bound on the number of producer queues is max(flush threads) + max(compaction threads).
Instead of depending on the caller's thread count, how about the context pool itself controls the total number of producer tokens, and uses a thread-id hash or round robin to assign callers to a certain producer token (slot)? A rough sketch of this idea is below.
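A rough sketch of what that alternative could look like; the class, slot count, and locking scheme are hypothetical illustrations of the suggestion, not code from this PR:

#include <array>
#include <cstddef>
#include <functional>
#include <mutex>
#include <optional>
#include <thread>
#include "concurrentqueue.h"

// Hypothetical: the pool owns a fixed number of ProducerToken slots and assigns
// callers to a slot by thread-id hash. Each slot needs a lock because a
// ProducerToken must not be used by two threads at the same time.
template <typename T, size_t kSlots = 8>
class TokenSlots {
public:
    explicit TokenSlots(moodycamel::ConcurrentQueue<T>& q) : _q(&q) {
        for (auto& slot : _slots) slot.token.emplace(q);
    }
    void enqueue(T item) {
        size_t idx = std::hash<std::thread::id>{}(std::this_thread::get_id()) % kSlots;
        std::lock_guard<std::mutex> guard(_slots[idx].lock);
        _q->enqueue(*_slots[idx].token, std::move(item));
    }
private:
    struct Slot {
        std::mutex lock;
        std::optional<::moodycamel::ProducerToken> token;
    };
    moodycamel::ConcurrentQueue<T>* _q;
    std::array<Slot, kSlots> _slots;
};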
Or is it overkill to use a multi-producer concurrent queue just for a memory pool? Is a single producer good enough for this usage?
A producer token can't be shared by two threads at the same time, so we will still need max(flush threads) + max(compaction threads) tokens.
The next thing is: what if a thread lives longer than this _ctx_resources? The destruction of a thread-local producer_token would then cause an invalid access to _ctx_resources.
_ctx_resources is part of CompressionContextPool, and all CompressionContextPool instances are static and global. CompressionContextPool will always live longer than Ref.
Signed-off-by: luohaha <[email protected]>
        if (producer_token == nullptr) {
            producer_token = std::make_unique<::moodycamel::ProducerToken>(_ctx_resources);
        }
It's not thread-safe? Could change it to the following:
static thread_local ::moodycamel::ProducerToken producer_token(_ctx_resources);
Why is it not thread-safe? producer_token is thread-local.
Oh, it's thread-safe; this is just a refactor.
done
Signed-off-by: luohaha <[email protected]>
[Java-Extensions Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[FE Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[BE Incremental Coverage Report] ✅ pass : 2 / 2 (100.00%) file detail
@Mergifyio backport branch-3.4
@Mergifyio backport branch-3.3
@Mergifyio backport branch-3.2
✅ Backports have been created
@Mergifyio backport branch-3.1
✅ Backports have been created
✅ Backports have been created
✅ Backports have been created
…53172) Signed-off-by: luohaha <[email protected]> (cherry picked from commit b141be8)
…53172) Signed-off-by: luohaha <[email protected]> (cherry picked from commit b141be8)
…53172) Signed-off-by: luohaha <[email protected]> (cherry picked from commit b141be8)
…53172) Signed-off-by: luohaha <[email protected]> (cherry picked from commit b141be8)
…ackport #53172) (#53233) Co-authored-by: Yixin Luo <[email protected]>
…ackport #53172) (#53232) Co-authored-by: Yixin Luo <[email protected]>
…ackport #53172) (#53230) Co-authored-by: Yixin Luo <[email protected]>
…ackport #53172) (#53231) Co-authored-by: Yixin Luo <[email protected]>
Why I'm doing:
In the current implementation, we use moodycamel::concurrentqueue to reuse compression contexts: each time we start compressing a block, we try to dequeue a ctx from the pool, and we return the ctx to the pool after compression finishes. Right now we use the implicit enqueue method, which causes an automatically-allocated thread-local producer sub-queue to be allocated, and that sub-queue is not destroyed after the thread finishes.
So after long running, the sub-queues keep growing without bound and slow down the consumer.
And in the docs (https://github.com/cameron314/concurrentqueue?tab=readme-ov-file#basic-use), the author recommends using explicit producer tokens instead.
What I'm doing:
Use explicit producer tokens to avoid the overhead of too many sub-queues. A minimal sketch of the enqueue-path change is below.
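The sketch below contrasts the implicit enqueue path with the explicit, thread-local token used by this fix; Ctx and CtxQueue are placeholders for the actual pool internals:

#include <memory>
#include "concurrentqueue.h"

struct Ctx {};  // stand-in for a compression context
using CtxQueue = moodycamel::ConcurrentQueue<std::unique_ptr<Ctx>>;

// Before: implicit enqueue allocates a thread-local producer sub-queue inside
// the queue itself, and that sub-queue is not destroyed when the thread exits.
void add_implicit(CtxQueue& q, std::unique_ptr<Ctx> ctx) {
    q.enqueue(std::move(ctx));
}

// After: an explicit, thread-local ProducerToken. A destroyed token only marks
// its producer slot inactive, so a token created later can recycle it, which
// bounds the number of sub-queues.
void add_explicit(CtxQueue& q, std::unique_ptr<Ctx> ctx) {
    static thread_local std::unique_ptr<::moodycamel::ProducerToken> token;
    if (token == nullptr) {
        token = std::make_unique<::moodycamel::ProducerToken>(q);
    }
    q.enqueue(*token, std::move(ctx));
}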
This pull request introduces improvements to the compression context pool and adds a new test to verify the robustness of multi-threaded context retrieval. The most important changes are the addition of a producer token to optimize the context pool and the introduction of a new multi-threaded test.
Improvements to the compression context pool:
be/src/util/compression/compression_context_pool.h: Added a thread-local ProducerToken to the add method to reduce the overhead of multiple sub-queues when enqueuing contexts.
Enhancements to testing:
be/test/util/block_compression_test.cpp: Included the compression_context_pool_singletons.h header to support the new test.
be/test/util/block_compression_test.cpp: Added a new test, test_multi_thread_get_ctx, to verify the behavior of multi-threaded context retrieval from the LZ4F context pool; a rough sketch of such an exercise follows.
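For illustration, a hedged sketch of what a multi-threaded get/return exercise could look like; Pool, get_ctx(), and add() are placeholder names, not the actual pool API or the test code added in this PR:

#include <memory>
#include <thread>
#include <vector>
#include "concurrentqueue.h"

struct Ctx {};  // stand-in for an LZ4F compression context

struct Pool {
    moodycamel::ConcurrentQueue<Ctx*> queue;

    Ctx* get_ctx() {
        Ctx* ctx = nullptr;
        if (!queue.try_dequeue(ctx)) ctx = new Ctx();  // allocate on a pool miss
        return ctx;
    }

    void add(Ctx* ctx) {
        // Same pattern as the fix: one explicit producer token per thread.
        static thread_local std::unique_ptr<::moodycamel::ProducerToken> token;
        if (token == nullptr) token = std::make_unique<::moodycamel::ProducerToken>(queue);
        queue.enqueue(*token, ctx);
    }
};

int main() {
    Pool pool;
    std::vector<std::thread> threads;
    for (int t = 0; t < 8; ++t) {
        threads.emplace_back([&pool] {
            for (int i = 0; i < 1000; ++i) {
                Ctx* ctx = pool.get_ctx();  // take a context from the pool
                pool.add(ctx);              // give it back after "use"
            }
        });
    }
    for (auto& th : threads) th.join();

    // Drain whatever is left in the pool.
    Ctx* leftover = nullptr;
    while (pool.queue.try_dequeue(leftover)) delete leftover;
    return 0;
}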
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: