diff --git a/be/src/util/compression/compression_context_pool.h b/be/src/util/compression/compression_context_pool.h index b3c9f66546298..aca1b25bf0668 100644 --- a/be/src/util/compression/compression_context_pool.h +++ b/be/src/util/compression/compression_context_pool.h @@ -114,6 +114,8 @@ class CompressionContextPool { private: void add(InternalRef ptr) { + // Use explicit producer token to avoid the overhead of too many sub-queues + static thread_local ::moodycamel::ProducerToken producer_token(_ctx_resources); DCHECK(ptr); Status status = _resetter(ptr.get()); // if reset fail, then delete this context @@ -121,7 +123,7 @@ class CompressionContextPool { return; } - _ctx_resources.enqueue(std::move(ptr)); + _ctx_resources.enqueue(producer_token, std::move(ptr)); } Creator _creator; diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp index 46a5b4d51cebf..7cb41fbc966b1 100644 --- a/be/test/util/block_compression_test.cpp +++ b/be/test/util/block_compression_test.cpp @@ -40,6 +40,7 @@ #include #include "gen_cpp/segment.pb.h" +#include "util/compression/compression_context_pool_singletons.h" #include "util/faststring.h" #include "util/random.h" #include "util/raw_container.h" @@ -386,6 +387,22 @@ TEST_F(BlockCompressionTest, LZ4F_compression_LARGE_PAGE_TEST) { ASSERT_TRUE(st.ok()); } +TEST_F(BlockCompressionTest, test_multi_thread_get_ctx) { + for (int j = 0; j < 10; j++) { + std::vector workers; + for (int cnt = 0; cnt < 30; cnt++) { + workers.emplace_back([]() { + for (uint64_t i = 1; i < 1000; i++) { + StatusOr ref = compression::getLZ4F_CCtx(); + } + }); + } + for (auto& worker : workers) { + worker.join(); + } + } +} + //#define LZ4_BENCHMARK //#define LZ4F_BENCHMARK //#define ZSTD_BENCHMARK