Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add default hash function and dump query resource usage (backport #52080) #52112

Merged
merged 1 commit into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 2 additions & 204 deletions be/src/column/column_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <cstddef>
#include <cstdint>

#ifdef __SSE4_2__
Expand All @@ -27,206 +28,15 @@

#include "column/type_traits.h"
#include "types/logical_type.h"
#include "util/hash_util.hpp"
#include "util/hash.h"
#include "util/slice.h"
#include "util/unaligned_access.h"

#if defined(__aarch64__)
#include "arm_acle.h"
#endif

namespace starrocks {

typedef unsigned __int128 uint128_t;
inline uint64_t umul128(uint64_t a, uint64_t b, uint64_t* high) {
auto result = static_cast<uint128_t>(a) * static_cast<uint128_t>(b);
*high = static_cast<uint64_t>(result >> 64u);
return static_cast<uint64_t>(result);
}

template <int n>
struct phmap_mix {
inline size_t operator()(size_t) const;
};

template <>
class phmap_mix<4> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t kmul = 0xcc9e2d51UL;
uint64_t l = a * kmul;
return static_cast<size_t>(l ^ (l >> 32u));
}
};

template <>
class phmap_mix<8> {
public:
// Very fast mixing (similar to Abseil)
inline size_t operator()(size_t a) const {
static constexpr uint64_t k = 0xde5fb9d2630458e9ULL;
uint64_t h;
uint64_t l = umul128(a, k, &h);
return static_cast<size_t>(h + l);
}
};

enum PhmapSeed { PhmapSeed1, PhmapSeed2 };

template <int n, PhmapSeed seed>
class phmap_mix_with_seed {
public:
inline size_t operator()(size_t) const;
};

template <>
class phmap_mix_with_seed<4, PhmapSeed1> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t kmul = 0xcc9e2d51UL;
uint64_t l = a * kmul;
return static_cast<size_t>(l ^ (l >> 32u));
}
};

template <>
class phmap_mix_with_seed<8, PhmapSeed1> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t k = 0xde5fb9d2630458e9ULL;
uint64_t h;
uint64_t l = umul128(a, k, &h);
return static_cast<size_t>(h + l);
}
};

template <>
class phmap_mix_with_seed<4, PhmapSeed2> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t kmul = 0xcc9e2d511d;
uint64_t l = a * kmul;
return static_cast<size_t>(l ^ (l >> 32u));
}
};

template <>
class phmap_mix_with_seed<8, PhmapSeed2> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t k = 0xde5fb9d263046000ULL;
uint64_t h;
uint64_t l = umul128(a, k, &h);
return static_cast<size_t>(h + l);
}
};

inline uint32_t crc_hash_32(const void* data, int32_t bytes, uint32_t hash) {
#if defined(__x86_64__) && !defined(__SSE4_2__)
return static_cast<uint32_t>(crc32(hash, (const unsigned char*)data, bytes));
#else
uint32_t words = bytes / sizeof(uint32_t);
bytes = bytes % 4 /*sizeof(uint32_t)*/;

auto* p = reinterpret_cast<const uint8_t*>(data);

while (words--) {
#if defined(__x86_64__)
hash = _mm_crc32_u32(hash, unaligned_load<uint32_t>(p));
#elif defined(__aarch64__)
hash = __crc32cw(hash, unaligned_load<uint32_t>(p));
#else
#error "Not supported architecture"
#endif
p += sizeof(uint32_t);
}

while (bytes--) {
#if defined(__x86_64__)
hash = _mm_crc32_u8(hash, *p);
#elif defined(__aarch64__)
hash = __crc32cb(hash, *p);
#else
#error "Not supported architecture"
#endif
++p;
}

// The lower half of the CRC hash has has poor uniformity, so swap the halves
// for anyone who only uses the first several bits of the hash.
hash = (hash << 16u) | (hash >> 16u);
return hash;
#endif
}

inline uint64_t crc_hash_64(const void* data, int32_t length, uint64_t hash) {
#if defined(__x86_64__) && !defined(__SSE4_2__)
return crc32(hash, (const unsigned char*)data, length);
#else
if (UNLIKELY(length < 8)) {
return crc_hash_32(data, length, static_cast<uint32_t>(hash));
}

uint64_t words = length / sizeof(uint64_t);
auto* p = reinterpret_cast<const uint8_t*>(data);
auto* end = reinterpret_cast<const uint8_t*>(data) + length;
while (words--) {
#if defined(__x86_64__) && defined(__SSE4_2__)
hash = _mm_crc32_u64(hash, unaligned_load<uint64_t>(p));
#elif defined(__aarch64__)
hash = __crc32cd(hash, unaligned_load<uint64_t>(p));
#else
#error "Not supported architecture"
#endif
p += sizeof(uint64_t);
}
// Reduce the branch condition
p = end - 8;
#if defined(__x86_64__)
hash = _mm_crc32_u64(hash, unaligned_load<uint64_t>(p));
#elif defined(__aarch64__)
hash = __crc32cd(hash, unaligned_load<uint64_t>(p));
#else
#error "Not supported architecture"
#endif
p += sizeof(uint64_t);
return hash;
#endif
}

// TODO: 0x811C9DC5 is not prime number
static const uint32_t CRC_HASH_SEED1 = 0x811C9DC5;
static const uint32_t CRC_HASH_SEED2 = 0x811C9DD7;

class SliceHash {
public:
std::size_t operator()(const Slice& slice) const {
return crc_hash_64(slice.data, static_cast<int32_t>(slice.size), CRC_HASH_SEED1);
}
};

template <PhmapSeed>
class SliceHashWithSeed {
public:
std::size_t operator()(const Slice& slice) const;
};

template <>
class SliceHashWithSeed<PhmapSeed1> {
public:
std::size_t operator()(const Slice& slice) const {
return crc_hash_64(slice.data, static_cast<int32_t>(slice.size), CRC_HASH_SEED1);
}
};

template <>
class SliceHashWithSeed<PhmapSeed2> {
public:
std::size_t operator()(const Slice& slice) const {
return crc_hash_64(slice.data, static_cast<int32_t>(slice.size), CRC_HASH_SEED2);
}
};

#if defined(__SSE2__) && !defined(ADDRESS_SANITIZER)

// NOTE: This function will access 15 excessive bytes after p1 and p2, which should has padding bytes when allocating
Expand Down Expand Up @@ -272,18 +82,6 @@ class SliceNormalEqual {
}
};

template <class T>
class StdHash {
public:
std::size_t operator()(T value) const { return phmap_mix<sizeof(size_t)>()(std::hash<T>()(value)); }
};

template <class T, PhmapSeed seed>
class StdHashWithSeed {
public:
std::size_t operator()(T value) const { return phmap_mix_with_seed<sizeof(size_t), seed>()(std::hash<T>()(value)); }
};

inline uint64_t crc_hash_uint64(uint64_t value, uint64_t seed) {
#if defined(__x86_64__) && defined(__SSE4_2__)
return _mm_crc32_u64(seed, value);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/join_hash_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void HashTableProbeState::consider_probe_time_locality() {
if ((probe_chunks & (detect_step - 1)) == 0) {
int window_size = std::min(active_coroutines * 4, 50);
if (probe_row_count > window_size) {
phmap::flat_hash_map<uint32_t, uint32_t> occurrence;
phmap::flat_hash_map<uint32_t, uint32_t, StdHash<uint32_t>> occurrence;
occurrence.reserve(probe_row_count);
uint32_t unique_size = 0;
bool enable_interleaving = true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/exchange/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class ExchangeSinkOperator final : public Operator {
int32_t _encode_level = 0;
// Will set in prepare
int32_t _be_number = 0;
phmap::flat_hash_map<int64_t, std::unique_ptr<Channel>> _instance_id2channel;
phmap::flat_hash_map<int64_t, std::unique_ptr<Channel>, StdHash<int64_t>> _instance_id2channel;
std::vector<Channel*> _channels;
// index list for channels
// We need a random order of sending channels to avoid rpc blocking at the same time.
Expand Down
23 changes: 12 additions & 11 deletions be/src/exec/pipeline/exchange/sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,29 +134,30 @@ class SinkBuffer {
/// because TUniqueId::hi is exactly the same in one query

// num eos per instance
phmap::flat_hash_map<int64_t, int64_t> _num_sinkers;
phmap::flat_hash_map<int64_t, int64_t> _request_seqs;
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _num_sinkers;
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _request_seqs;
// Considering the following situation
// Sending request 1, 2, 3 in order with one possible order of response 1, 3, 2,
// and field transformation are as following
// a. receive response-1, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->()
// b. receive response-3, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->(3)
// c. receive response-2, _max_continuous_acked_seqs[x]->3, _discontinuous_acked_seqs[x]->()
phmap::flat_hash_map<int64_t, int64_t> _max_continuous_acked_seqs;
phmap::flat_hash_map<int64_t, std::unordered_set<int64_t>> _discontinuous_acked_seqs;
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _max_continuous_acked_seqs;
phmap::flat_hash_map<int64_t, std::unordered_set<int64_t>, StdHash<int64_t>> _discontinuous_acked_seqs;
std::atomic<int32_t> _total_in_flight_rpc = 0;
std::atomic<int32_t> _num_uncancelled_sinkers = 0;
std::atomic<int32_t> _num_remaining_eos = 0;

// The request needs the reference to the allocated finst id,
// so cache finst id for each dest fragment instance.
phmap::flat_hash_map<int64_t, PUniqueId> _instance_id2finst_id;
phmap::flat_hash_map<int64_t, std::queue<TransmitChunkInfo, std::list<TransmitChunkInfo>>> _buffers;
phmap::flat_hash_map<int64_t, int32_t> _num_finished_rpcs;
phmap::flat_hash_map<int64_t, int32_t> _num_in_flight_rpcs;
phmap::flat_hash_map<int64_t, TimeTrace> _network_times;
phmap::flat_hash_map<int64_t, std::unique_ptr<Mutex>> _mutexes;
phmap::flat_hash_map<int64_t, TNetworkAddress> _dest_addrs;
phmap::flat_hash_map<int64_t, PUniqueId, StdHash<int64_t>> _instance_id2finst_id;
phmap::flat_hash_map<int64_t, std::queue<TransmitChunkInfo, std::list<TransmitChunkInfo>>, StdHash<int64_t>>
_buffers;
phmap::flat_hash_map<int64_t, int32_t, StdHash<int64_t>> _num_finished_rpcs;
phmap::flat_hash_map<int64_t, int32_t, StdHash<int64_t>> _num_in_flight_rpcs;
phmap::flat_hash_map<int64_t, TimeTrace, StdHash<int64_t>> _network_times;
phmap::flat_hash_map<int64_t, std::unique_ptr<Mutex>, StdHash<int64_t>> _mutexes;
phmap::flat_hash_map<int64_t, TNetworkAddress, StdHash<int64_t>> _dest_addrs;

// True means that SinkBuffer needn't input chunk and send chunk anymore,
// but there may be still in-flight RPC running.
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ class PipelineDriver {
DriverState _state{DriverState::NOT_READY};
std::shared_ptr<RuntimeProfile> _runtime_profile = nullptr;

phmap::flat_hash_map<int32_t, OperatorStage> _operator_stages;
phmap::flat_hash_map<int32_t, OperatorStage, StdHash<int32_t>> _operator_stages;

workgroup::WorkGroupPtr _workgroup = nullptr;
DriverQueue* _in_queue = nullptr;
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ QueryContext::~QueryContext() noexcept {
// remaining other RuntimeStates after the current RuntimeState is freed, MemChunkAllocator uses the MemTracker of the
// current RuntimeState to release Operators, OperatorFactories in the remaining RuntimeStates will trigger
// segmentation fault.
if (_mem_tracker != nullptr) {
LOG(INFO) << fmt::format(
"finished query_id:{} context life time:{} cpu costs:{} peak memusage:{} scan_bytes:{} spilled "
"bytes:{}",
print_id(query_id()), lifetime(), cpu_cost(), mem_cost_bytes(), get_scan_bytes(), get_spill_bytes());
}

{
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get());
_fragment_mgr.reset();
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "util/debug/query_trace.h"
#include "util/hash.h"
#include "util/hash_util.hpp"
#include "util/spinlock.h"
#include "util/time.h"
Expand Down Expand Up @@ -292,7 +293,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
// we use spinlock + flat_hash_map here, after upgrading, we can change it to parallel_flat_hash_map
SpinLock _scan_stats_lock;
// table level scan stats
phmap::flat_hash_map<int64_t, std::shared_ptr<ScanStats>> _scan_stats;
phmap::flat_hash_map<int64_t, std::shared_ptr<ScanStats>, StdHash<int64_t>> _scan_stats;

bool _is_final_sink = false;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr; // For receive
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/ngram.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class NgramFunctionImpl {
}

static NgramHash getAsciiHash(const Gram* ch, size_t gram_num) {
return crc_hash_32(ch, gram_num, CRC_HASH_SEED1) & (0xffffu);
return crc_hash_32(ch, gram_num, CRC_HASH_SEEDS::CRC_HASH_SEED1) & (0xffffu);
}
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/data_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class DataStreamMgr {
// map from hash value of fragment instance id/node id pair to stream receivers;
// Ownership of the stream revcr is shared between this instance and the caller of
// create_recvr().
typedef phmap::flat_hash_map<PlanNodeId, std::shared_ptr<DataStreamRecvr>> RecvrMap;
typedef phmap::flat_hash_map<PlanNodeId, std::shared_ptr<DataStreamRecvr>, StdHash<PlanNodeId>> RecvrMap;
typedef phmap::flat_hash_map<TUniqueId, std::shared_ptr<RecvrMap>> StreamMap;
StreamMap _receiver_map[BUCKET_NUM];
std::atomic<uint32_t> _fragment_count{0};
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/local_pass_through_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "column/vectorized_fwd.h"
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "runtime/descriptors.h" // for PlanNodeId
#include "util/hash.h"

namespace starrocks {

Expand All @@ -32,7 +33,7 @@ class PassThroughChunkBuffer {

struct KeyHash {
size_t operator()(const Key& key) const {
uint64_t hash = CRC_HASH_SEED1;
uint64_t hash = CRC_HASH_SEEDS::CRC_HASH_SEED1;
hash = crc_hash_uint64(std::get<0>(key).hi, hash);
hash = crc_hash_uint64(std::get<0>(key).lo, hash);
hash = crc_hash_uint64(std::get<1>(key), hash);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/sender_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ class DataStreamRecvr::PipelineSenderQueue final : public DataStreamRecvr::Sende
// distribution of received sequence numbers:
// part1: { sequence | 1 <= sequence <= _max_processed_sequence }
// part2: { sequence | seq = _max_processed_sequence + i, i > 1 }
phmap::flat_hash_map<int, int64_t> _max_processed_sequences;
phmap::flat_hash_map<int, int64_t, StdHash<int>> _max_processed_sequences;
// chunk request may be out-of-order, but we have to deal with it in order
// key of first level is be_number
// key of second level is request sequence
phmap::flat_hash_map<int, phmap::flat_hash_map<int64_t, ChunkList>> _buffered_chunk_queues;
phmap::flat_hash_map<int, phmap::flat_hash_map<int64_t, ChunkList>, StdHash<int>> _buffered_chunk_queues;

std::atomic<bool> _is_chunk_meta_built{false};

Expand Down
Loading
Loading