From 919cc14882d6d1968312b44ead6478530b48212b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sat, 19 Oct 2024 09:44:31 +0800 Subject: [PATCH] [Enhancement] add default hash function and dump query resource usage (backport #52080) (#52112) Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com> --- be/src/column/column_hash.h | 206 +---------------- be/src/exec/join_hash_map.cpp | 2 +- .../exchange/exchange_sink_operator.h | 2 +- be/src/exec/pipeline/exchange/sink_buffer.h | 23 +- be/src/exec/pipeline/pipeline_driver.h | 2 +- be/src/exec/pipeline/query_context.cpp | 7 + be/src/exec/pipeline/query_context.h | 3 +- be/src/exprs/ngram.cpp | 2 +- be/src/runtime/data_stream_mgr.h | 2 +- be/src/runtime/local_pass_through_buffer.h | 3 +- be/src/runtime/sender_queue.h | 4 +- be/src/util/hash.h | 214 ++++++++++++++++++ 12 files changed, 246 insertions(+), 224 deletions(-) create mode 100644 be/src/util/hash.h diff --git a/be/src/column/column_hash.h b/be/src/column/column_hash.h index 21a8775fa591a..d1af2c50c6391 100644 --- a/be/src/column/column_hash.h +++ b/be/src/column/column_hash.h @@ -14,6 +14,7 @@ #pragma once +#include #include #ifdef __SSE4_2__ @@ -27,9 +28,8 @@ #include "column/type_traits.h" #include "types/logical_type.h" -#include "util/hash_util.hpp" +#include "util/hash.h" #include "util/slice.h" -#include "util/unaligned_access.h" #if defined(__aarch64__) #include "arm_acle.h" @@ -37,196 +37,6 @@ namespace starrocks { -typedef unsigned __int128 uint128_t; -inline uint64_t umul128(uint64_t a, uint64_t b, uint64_t* high) { - auto result = static_cast(a) * static_cast(b); - *high = static_cast(result >> 64u); - return static_cast(result); -} - -template -struct phmap_mix { - inline size_t operator()(size_t) const; -}; - -template <> -class phmap_mix<4> { -public: - inline size_t operator()(size_t a) const { - static constexpr uint64_t kmul = 0xcc9e2d51UL; - uint64_t l = a * kmul; - return static_cast(l ^ (l >> 32u)); - } -}; - -template <> -class phmap_mix<8> { -public: - // Very fast mixing (similar to Abseil) - inline size_t operator()(size_t a) const { - static constexpr uint64_t k = 0xde5fb9d2630458e9ULL; - uint64_t h; - uint64_t l = umul128(a, k, &h); - return static_cast(h + l); - } -}; - -enum PhmapSeed { PhmapSeed1, PhmapSeed2 }; - -template -class phmap_mix_with_seed { -public: - inline size_t operator()(size_t) const; -}; - -template <> -class phmap_mix_with_seed<4, PhmapSeed1> { -public: - inline size_t operator()(size_t a) const { - static constexpr uint64_t kmul = 0xcc9e2d51UL; - uint64_t l = a * kmul; - return static_cast(l ^ (l >> 32u)); - } -}; - -template <> -class phmap_mix_with_seed<8, PhmapSeed1> { -public: - inline size_t operator()(size_t a) const { - static constexpr uint64_t k = 0xde5fb9d2630458e9ULL; - uint64_t h; - uint64_t l = umul128(a, k, &h); - return static_cast(h + l); - } -}; - -template <> -class phmap_mix_with_seed<4, PhmapSeed2> { -public: - inline size_t operator()(size_t a) const { - static constexpr uint64_t kmul = 0xcc9e2d511d; - uint64_t l = a * kmul; - return static_cast(l ^ (l >> 32u)); - } -}; - -template <> -class phmap_mix_with_seed<8, PhmapSeed2> { -public: - inline size_t operator()(size_t a) const { - static constexpr uint64_t k = 0xde5fb9d263046000ULL; - uint64_t h; - uint64_t l = umul128(a, k, &h); - return static_cast(h + l); - } -}; - -inline uint32_t crc_hash_32(const void* data, int32_t bytes, uint32_t hash) { -#if defined(__x86_64__) && !defined(__SSE4_2__) - return static_cast(crc32(hash, (const unsigned char*)data, bytes)); -#else - uint32_t words = bytes / sizeof(uint32_t); - bytes = bytes % 4 /*sizeof(uint32_t)*/; - - auto* p = reinterpret_cast(data); - - while (words--) { -#if defined(__x86_64__) - hash = _mm_crc32_u32(hash, unaligned_load(p)); -#elif defined(__aarch64__) - hash = __crc32cw(hash, unaligned_load(p)); -#else -#error "Not supported architecture" -#endif - p += sizeof(uint32_t); - } - - while (bytes--) { -#if defined(__x86_64__) - hash = _mm_crc32_u8(hash, *p); -#elif defined(__aarch64__) - hash = __crc32cb(hash, *p); -#else -#error "Not supported architecture" -#endif - ++p; - } - - // The lower half of the CRC hash has has poor uniformity, so swap the halves - // for anyone who only uses the first several bits of the hash. - hash = (hash << 16u) | (hash >> 16u); - return hash; -#endif -} - -inline uint64_t crc_hash_64(const void* data, int32_t length, uint64_t hash) { -#if defined(__x86_64__) && !defined(__SSE4_2__) - return crc32(hash, (const unsigned char*)data, length); -#else - if (UNLIKELY(length < 8)) { - return crc_hash_32(data, length, static_cast(hash)); - } - - uint64_t words = length / sizeof(uint64_t); - auto* p = reinterpret_cast(data); - auto* end = reinterpret_cast(data) + length; - while (words--) { -#if defined(__x86_64__) && defined(__SSE4_2__) - hash = _mm_crc32_u64(hash, unaligned_load(p)); -#elif defined(__aarch64__) - hash = __crc32cd(hash, unaligned_load(p)); -#else -#error "Not supported architecture" -#endif - p += sizeof(uint64_t); - } - // Reduce the branch condition - p = end - 8; -#if defined(__x86_64__) - hash = _mm_crc32_u64(hash, unaligned_load(p)); -#elif defined(__aarch64__) - hash = __crc32cd(hash, unaligned_load(p)); -#else -#error "Not supported architecture" -#endif - p += sizeof(uint64_t); - return hash; -#endif -} - -// TODO: 0x811C9DC5 is not prime number -static const uint32_t CRC_HASH_SEED1 = 0x811C9DC5; -static const uint32_t CRC_HASH_SEED2 = 0x811C9DD7; - -class SliceHash { -public: - std::size_t operator()(const Slice& slice) const { - return crc_hash_64(slice.data, static_cast(slice.size), CRC_HASH_SEED1); - } -}; - -template -class SliceHashWithSeed { -public: - std::size_t operator()(const Slice& slice) const; -}; - -template <> -class SliceHashWithSeed { -public: - std::size_t operator()(const Slice& slice) const { - return crc_hash_64(slice.data, static_cast(slice.size), CRC_HASH_SEED1); - } -}; - -template <> -class SliceHashWithSeed { -public: - std::size_t operator()(const Slice& slice) const { - return crc_hash_64(slice.data, static_cast(slice.size), CRC_HASH_SEED2); - } -}; - #if defined(__SSE2__) && !defined(ADDRESS_SANITIZER) // NOTE: This function will access 15 excessive bytes after p1 and p2, which should has padding bytes when allocating @@ -272,18 +82,6 @@ class SliceNormalEqual { } }; -template -class StdHash { -public: - std::size_t operator()(T value) const { return phmap_mix()(std::hash()(value)); } -}; - -template -class StdHashWithSeed { -public: - std::size_t operator()(T value) const { return phmap_mix_with_seed()(std::hash()(value)); } -}; - inline uint64_t crc_hash_uint64(uint64_t value, uint64_t seed) { #if defined(__x86_64__) && defined(__SSE4_2__) return _mm_crc32_u64(seed, value); diff --git a/be/src/exec/join_hash_map.cpp b/be/src/exec/join_hash_map.cpp index db1752b31a1a7..4a55e6345e8ab 100644 --- a/be/src/exec/join_hash_map.cpp +++ b/be/src/exec/join_hash_map.cpp @@ -35,7 +35,7 @@ void HashTableProbeState::consider_probe_time_locality() { if ((probe_chunks & (detect_step - 1)) == 0) { int window_size = std::min(active_coroutines * 4, 50); if (probe_row_count > window_size) { - phmap::flat_hash_map occurrence; + phmap::flat_hash_map> occurrence; occurrence.reserve(probe_row_count); uint32_t unique_size = 0; bool enable_interleaving = true; diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.h b/be/src/exec/pipeline/exchange/exchange_sink_operator.h index 05dceb8c2fffd..59885e4e0e2f9 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.h +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.h @@ -146,7 +146,7 @@ class ExchangeSinkOperator final : public Operator { int32_t _encode_level = 0; // Will set in prepare int32_t _be_number = 0; - phmap::flat_hash_map> _instance_id2channel; + phmap::flat_hash_map, StdHash> _instance_id2channel; std::vector _channels; // index list for channels // We need a random order of sending channels to avoid rpc blocking at the same time. diff --git a/be/src/exec/pipeline/exchange/sink_buffer.h b/be/src/exec/pipeline/exchange/sink_buffer.h index a8745304a6c97..a6925934f4774 100644 --- a/be/src/exec/pipeline/exchange/sink_buffer.h +++ b/be/src/exec/pipeline/exchange/sink_buffer.h @@ -134,29 +134,30 @@ class SinkBuffer { /// because TUniqueId::hi is exactly the same in one query // num eos per instance - phmap::flat_hash_map _num_sinkers; - phmap::flat_hash_map _request_seqs; + phmap::flat_hash_map> _num_sinkers; + phmap::flat_hash_map> _request_seqs; // Considering the following situation // Sending request 1, 2, 3 in order with one possible order of response 1, 3, 2, // and field transformation are as following // a. receive response-1, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->() // b. receive response-3, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->(3) // c. receive response-2, _max_continuous_acked_seqs[x]->3, _discontinuous_acked_seqs[x]->() - phmap::flat_hash_map _max_continuous_acked_seqs; - phmap::flat_hash_map> _discontinuous_acked_seqs; + phmap::flat_hash_map> _max_continuous_acked_seqs; + phmap::flat_hash_map, StdHash> _discontinuous_acked_seqs; std::atomic _total_in_flight_rpc = 0; std::atomic _num_uncancelled_sinkers = 0; std::atomic _num_remaining_eos = 0; // The request needs the reference to the allocated finst id, // so cache finst id for each dest fragment instance. - phmap::flat_hash_map _instance_id2finst_id; - phmap::flat_hash_map>> _buffers; - phmap::flat_hash_map _num_finished_rpcs; - phmap::flat_hash_map _num_in_flight_rpcs; - phmap::flat_hash_map _network_times; - phmap::flat_hash_map> _mutexes; - phmap::flat_hash_map _dest_addrs; + phmap::flat_hash_map> _instance_id2finst_id; + phmap::flat_hash_map>, StdHash> + _buffers; + phmap::flat_hash_map> _num_finished_rpcs; + phmap::flat_hash_map> _num_in_flight_rpcs; + phmap::flat_hash_map> _network_times; + phmap::flat_hash_map, StdHash> _mutexes; + phmap::flat_hash_map> _dest_addrs; // True means that SinkBuffer needn't input chunk and send chunk anymore, // but there may be still in-flight RPC running. diff --git a/be/src/exec/pipeline/pipeline_driver.h b/be/src/exec/pipeline/pipeline_driver.h index e2cf6474edf15..5b112b23a68b3 100644 --- a/be/src/exec/pipeline/pipeline_driver.h +++ b/be/src/exec/pipeline/pipeline_driver.h @@ -518,7 +518,7 @@ class PipelineDriver { DriverState _state{DriverState::NOT_READY}; std::shared_ptr _runtime_profile = nullptr; - phmap::flat_hash_map _operator_stages; + phmap::flat_hash_map> _operator_stages; workgroup::WorkGroupPtr _workgroup = nullptr; DriverQueue* _in_queue = nullptr; diff --git a/be/src/exec/pipeline/query_context.cpp b/be/src/exec/pipeline/query_context.cpp index 4222e0230ddee..c4ecc6fc806fb 100644 --- a/be/src/exec/pipeline/query_context.cpp +++ b/be/src/exec/pipeline/query_context.cpp @@ -56,6 +56,13 @@ QueryContext::~QueryContext() noexcept { // remaining other RuntimeStates after the current RuntimeState is freed, MemChunkAllocator uses the MemTracker of the // current RuntimeState to release Operators, OperatorFactories in the remaining RuntimeStates will trigger // segmentation fault. + if (_mem_tracker != nullptr) { + LOG(INFO) << fmt::format( + "finished query_id:{} context life time:{} cpu costs:{} peak memusage:{} scan_bytes:{} spilled " + "bytes:{}", + print_id(query_id()), lifetime(), cpu_cost(), mem_cost_bytes(), get_scan_bytes(), get_spill_bytes()); + } + { SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get()); _fragment_mgr.reset(); diff --git a/be/src/exec/pipeline/query_context.h b/be/src/exec/pipeline/query_context.h index db2092580c7cf..e7a444ff6e8da 100644 --- a/be/src/exec/pipeline/query_context.h +++ b/be/src/exec/pipeline/query_context.h @@ -31,6 +31,7 @@ #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "util/debug/query_trace.h" +#include "util/hash.h" #include "util/hash_util.hpp" #include "util/spinlock.h" #include "util/time.h" @@ -292,7 +293,7 @@ class QueryContext : public std::enable_shared_from_this { // we use spinlock + flat_hash_map here, after upgrading, we can change it to parallel_flat_hash_map SpinLock _scan_stats_lock; // table level scan stats - phmap::flat_hash_map> _scan_stats; + phmap::flat_hash_map, StdHash> _scan_stats; bool _is_final_sink = false; std::shared_ptr _sub_plan_query_statistics_recvr; // For receive diff --git a/be/src/exprs/ngram.cpp b/be/src/exprs/ngram.cpp index a81e2f237cfac..843889cc8030b 100644 --- a/be/src/exprs/ngram.cpp +++ b/be/src/exprs/ngram.cpp @@ -294,7 +294,7 @@ class NgramFunctionImpl { } static NgramHash getAsciiHash(const Gram* ch, size_t gram_num) { - return crc_hash_32(ch, gram_num, CRC_HASH_SEED1) & (0xffffu); + return crc_hash_32(ch, gram_num, CRC_HASH_SEEDS::CRC_HASH_SEED1) & (0xffffu); } }; diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h index ae6ec24eda427..2971b74991125 100644 --- a/be/src/runtime/data_stream_mgr.h +++ b/be/src/runtime/data_stream_mgr.h @@ -118,7 +118,7 @@ class DataStreamMgr { // map from hash value of fragment instance id/node id pair to stream receivers; // Ownership of the stream revcr is shared between this instance and the caller of // create_recvr(). - typedef phmap::flat_hash_map> RecvrMap; + typedef phmap::flat_hash_map, StdHash> RecvrMap; typedef phmap::flat_hash_map> StreamMap; StreamMap _receiver_map[BUCKET_NUM]; std::atomic _fragment_count{0}; diff --git a/be/src/runtime/local_pass_through_buffer.h b/be/src/runtime/local_pass_through_buffer.h index 55faba2d67c41..5eeeb353e5b4f 100644 --- a/be/src/runtime/local_pass_through_buffer.h +++ b/be/src/runtime/local_pass_through_buffer.h @@ -19,6 +19,7 @@ #include "column/vectorized_fwd.h" #include "gen_cpp/Types_types.h" // for TUniqueId #include "runtime/descriptors.h" // for PlanNodeId +#include "util/hash.h" namespace starrocks { @@ -32,7 +33,7 @@ class PassThroughChunkBuffer { struct KeyHash { size_t operator()(const Key& key) const { - uint64_t hash = CRC_HASH_SEED1; + uint64_t hash = CRC_HASH_SEEDS::CRC_HASH_SEED1; hash = crc_hash_uint64(std::get<0>(key).hi, hash); hash = crc_hash_uint64(std::get<0>(key).lo, hash); hash = crc_hash_uint64(std::get<1>(key), hash); diff --git a/be/src/runtime/sender_queue.h b/be/src/runtime/sender_queue.h index d55fbb947b24b..584459605fa54 100644 --- a/be/src/runtime/sender_queue.h +++ b/be/src/runtime/sender_queue.h @@ -260,11 +260,11 @@ class DataStreamRecvr::PipelineSenderQueue final : public DataStreamRecvr::Sende // distribution of received sequence numbers: // part1: { sequence | 1 <= sequence <= _max_processed_sequence } // part2: { sequence | seq = _max_processed_sequence + i, i > 1 } - phmap::flat_hash_map _max_processed_sequences; + phmap::flat_hash_map> _max_processed_sequences; // chunk request may be out-of-order, but we have to deal with it in order // key of first level is be_number // key of second level is request sequence - phmap::flat_hash_map> _buffered_chunk_queues; + phmap::flat_hash_map, StdHash> _buffered_chunk_queues; std::atomic _is_chunk_meta_built{false}; diff --git a/be/src/util/hash.h b/be/src/util/hash.h new file mode 100644 index 0000000000000..da0ae46e14241 --- /dev/null +++ b/be/src/util/hash.h @@ -0,0 +1,214 @@ +#pragma once +#include +#include + +#include "util/hash_util.hpp" +#include "util/slice.h" +#include "util/unaligned_access.h" + +namespace starrocks { + +typedef unsigned __int128 uint128_t; +inline uint64_t umul128(uint64_t a, uint64_t b, uint64_t* high) { + auto result = static_cast(a) * static_cast(b); + *high = static_cast(result >> 64u); + return static_cast(result); +} + +template +struct phmap_mix { + inline size_t operator()(size_t) const; +}; + +template <> +class phmap_mix<4> { +public: + inline size_t operator()(size_t a) const { + static constexpr uint64_t kmul = 0xcc9e2d51UL; + uint64_t l = a * kmul; + return static_cast(l ^ (l >> 32u)); + } +}; + +template <> +class phmap_mix<8> { +public: + // Very fast mixing (similar to Abseil) + inline size_t operator()(size_t a) const { + static constexpr uint64_t k = 0xde5fb9d2630458e9ULL; + uint64_t h; + uint64_t l = umul128(a, k, &h); + return static_cast(h + l); + } +}; + +enum PhmapSeed { PhmapSeed1, PhmapSeed2 }; + +template +class phmap_mix_with_seed { +public: + inline size_t operator()(size_t) const; +}; + +template <> +class phmap_mix_with_seed<4, PhmapSeed1> { +public: + inline size_t operator()(size_t a) const { + static constexpr uint64_t kmul = 0xcc9e2d51UL; + uint64_t l = a * kmul; + return static_cast(l ^ (l >> 32u)); + } +}; + +template <> +class phmap_mix_with_seed<8, PhmapSeed1> { +public: + inline size_t operator()(size_t a) const { + static constexpr uint64_t k = 0xde5fb9d2630458e9ULL; + uint64_t h; + uint64_t l = umul128(a, k, &h); + return static_cast(h + l); + } +}; + +template <> +class phmap_mix_with_seed<4, PhmapSeed2> { +public: + inline size_t operator()(size_t a) const { + static constexpr uint64_t kmul = 0xcc9e2d511d; + uint64_t l = a * kmul; + return static_cast(l ^ (l >> 32u)); + } +}; + +template <> +class phmap_mix_with_seed<8, PhmapSeed2> { +public: + inline size_t operator()(size_t a) const { + static constexpr uint64_t k = 0xde5fb9d263046000ULL; + uint64_t h; + uint64_t l = umul128(a, k, &h); + return static_cast(h + l); + } +}; + +inline uint32_t crc_hash_32(const void* data, int32_t bytes, uint32_t hash) { +#if defined(__x86_64__) && !defined(__SSE4_2__) + return static_cast(crc32(hash, (const unsigned char*)data, bytes)); +#else + uint32_t words = bytes / sizeof(uint32_t); + bytes = bytes % 4 /*sizeof(uint32_t)*/; + + auto* p = reinterpret_cast(data); + + while (words--) { +#if defined(__x86_64__) + hash = _mm_crc32_u32(hash, unaligned_load(p)); +#elif defined(__aarch64__) + hash = __crc32cw(hash, unaligned_load(p)); +#else +#error "Not supported architecture" +#endif + p += sizeof(uint32_t); + } + + while (bytes--) { +#if defined(__x86_64__) + hash = _mm_crc32_u8(hash, *p); +#elif defined(__aarch64__) + hash = __crc32cb(hash, *p); +#else +#error "Not supported architecture" +#endif + ++p; + } + + // The lower half of the CRC hash has has poor uniformity, so swap the halves + // for anyone who only uses the first several bits of the hash. + hash = (hash << 16u) | (hash >> 16u); + return hash; +#endif +} + +inline uint64_t crc_hash_64(const void* data, int32_t length, uint64_t hash) { +#if defined(__x86_64__) && !defined(__SSE4_2__) + return crc32(hash, (const unsigned char*)data, length); +#else + if (UNLIKELY(length < 8)) { + return crc_hash_32(data, length, static_cast(hash)); + } + + uint64_t words = length / sizeof(uint64_t); + auto* p = reinterpret_cast(data); + auto* end = reinterpret_cast(data) + length; + while (words--) { +#if defined(__x86_64__) && defined(__SSE4_2__) + hash = _mm_crc32_u64(hash, unaligned_load(p)); +#elif defined(__aarch64__) + hash = __crc32cd(hash, unaligned_load(p)); +#else +#error "Not supported architecture" +#endif + p += sizeof(uint64_t); + } + // Reduce the branch condition + p = end - 8; +#if defined(__x86_64__) + hash = _mm_crc32_u64(hash, unaligned_load(p)); +#elif defined(__aarch64__) + hash = __crc32cd(hash, unaligned_load(p)); +#else +#error "Not supported architecture" +#endif + p += sizeof(uint64_t); + return hash; +#endif +} + +struct CRC_HASH_SEEDS { + // TODO: 0x811C9DC5 is not prime number + static const uint32_t CRC_HASH_SEED1 = 0x811C9DC5; + static const uint32_t CRC_HASH_SEED2 = 0x811C9DD7; +}; + +class SliceHash { +public: + std::size_t operator()(const Slice& slice) const { + return crc_hash_64(slice.data, static_cast(slice.size), CRC_HASH_SEEDS::CRC_HASH_SEED1); + } +}; + +template +class SliceHashWithSeed { +public: + std::size_t operator()(const Slice& slice) const; +}; + +template <> +class SliceHashWithSeed { +public: + std::size_t operator()(const Slice& slice) const { + return crc_hash_64(slice.data, static_cast(slice.size), CRC_HASH_SEEDS::CRC_HASH_SEED1); + } +}; + +template <> +class SliceHashWithSeed { +public: + std::size_t operator()(const Slice& slice) const { + return crc_hash_64(slice.data, static_cast(slice.size), CRC_HASH_SEEDS::CRC_HASH_SEED2); + } +}; + +template +class StdHash { +public: + std::size_t operator()(T value) const { return phmap_mix()(std::hash()(value)); } +}; + +template +class StdHashWithSeed { +public: + std::size_t operator()(T value) const { return phmap_mix_with_seed()(std::hash()(value)); } +}; +} // namespace starrocks \ No newline at end of file