From cd7f90e4a0da2cb1f1f62ba2c1507de9308cc029 Mon Sep 17 00:00:00 2001 From: Yingfeng Date: Wed, 24 Jan 2024 12:06:40 +0800 Subject: [PATCH] Add lock free vector with multiple producer for real time indexing (#470) Rename previous LockFreeVector to SPLockFreeVector which means lfvector for single producer(lock is required if multiple writer is used) Add MPLockFreeVector which allows for multiple producers SPLockFreeVector has higher performance which will be used for near real time indexing. --- src/common/stl.cppm | 14 +- .../common/lock_free_vector.cppm | 3 +- .../invertedindex/common/lock_free_vector.h | 222 ++++++++++++------ .../invertedindex/common/lock_free_vector.cpp | 76 ++++-- 4 files changed, 223 insertions(+), 92 deletions(-) diff --git a/src/common/stl.cppm b/src/common/stl.cppm index 51cb2dca75..8d1e8ac0e0 100644 --- a/src/common/stl.cppm +++ b/src/common/stl.cppm @@ -81,13 +81,19 @@ using std::is_same; using std::fill; using std::lower_bound; -using std::shared_mutex; +using std::condition_variable; +using std::lock_guard; +using std::memory_order; +using std::memory_order_acq_rel; +using std::memory_order_acquire; +using std::memory_order_consume; +using std::memory_order_relaxed; +using std::memory_order_release; +using std::memory_order_seq_cst; using std::mutex; using std::shared_lock; +using std::shared_mutex; using std::unique_lock; -using std::lock_guard; -using std::condition_variable; -using std::memory_order; using std::forward_list; using std::isalpha; diff --git a/src/storage/invertedindex/common/lock_free_vector.cppm b/src/storage/invertedindex/common/lock_free_vector.cppm index 9ce71960d9..89b19fabd2 100644 --- a/src/storage/invertedindex/common/lock_free_vector.cppm +++ b/src/storage/invertedindex/common/lock_free_vector.cppm @@ -5,5 +5,6 @@ module; export module lock_free_vector; namespace infinity { -export using infinity::LockFreeVector; +export using infinity::SPLockFreeVector; +export using infinity::MPLockFreeVector; } diff --git a/src/storage/invertedindex/common/lock_free_vector.h b/src/storage/invertedindex/common/lock_free_vector.h index fbfba06afb..7c9f70d2b4 100644 --- a/src/storage/invertedindex/common/lock_free_vector.h +++ b/src/storage/invertedindex/common/lock_free_vector.h @@ -13,7 +13,7 @@ // limitations under the License. #pragma once - +#include import stl; import spinlock; import memory_pool; @@ -23,24 +23,24 @@ import infinity_exception; namespace infinity { // According to "Lock-free dynamically resizable arrays,D Dechev, 2006" -template -class LockFreeVector { +template +class SPLockFreeVector { using ScopedWrite = std::unique_lock; public: - static u32 constexpr INITIAL_CAPACITY = 32; - static u32 constexpr INITIAL_SIZE = 8; + static u32 constexpr BUCKET_COUNT = 32; + static u32 constexpr INITIAL_BUCKET_SIZE = 8; static u32 constexpr INIT_BIT = 3; - LockFreeVector(MemoryPool *pool); + SPLockFreeVector(MemoryPool *pool); - LockFreeVector(const u32 n, MemoryPool *pool); + SPLockFreeVector(const u32 n, MemoryPool *pool); - LockFreeVector(const LockFreeVector &other); + SPLockFreeVector(const SPLockFreeVector &other); - ~LockFreeVector(); + ~SPLockFreeVector(); - LockFreeVector &operator=(const LockFreeVector &); + SPLockFreeVector &operator=(const SPLockFreeVector &); u32 Size() const { return size_; } @@ -84,25 +84,25 @@ class LockFreeVector { PoolAllocator alloc_; - T *slots_[INITIAL_CAPACITY]; + T *buckets_[BUCKET_COUNT]; u32 size_; SpinLock lock_; }; -template -LockFreeVector::LockFreeVector(MemoryPool *pool) : alloc_(pool), size_(0) { - std::memset(slots_, 0, INITIAL_CAPACITY * sizeof(T *)); +template +SPLockFreeVector::SPLockFreeVector(MemoryPool *pool) : alloc_(pool), size_(0) { + std::memset(buckets_, 0, BUCKET_COUNT * sizeof(T *)); } -template -LockFreeVector::LockFreeVector(const u32 n, MemoryPool *pool) : alloc_(pool), size_(n) { +template +SPLockFreeVector::SPLockFreeVector(const u32 n, MemoryPool *pool) : alloc_(pool), size_(n) { InitMemory(n); } template -LockFreeVector::LockFreeVector(const LockFreeVector &other) : alloc_(other.alloc_), size_(other.size_) { +SPLockFreeVector::SPLockFreeVector(const SPLockFreeVector &other) : alloc_(other.alloc_), size_(other.size_) { InitMemory(size_); for (u32 idx = 0; idx < size_; ++idx) { InternalAt(idx) = other.InternalAt(idx); @@ -110,14 +110,14 @@ LockFreeVector::LockFreeVector(const LockFreeVector &other) : alloc_(other } template -LockFreeVector::~LockFreeVector() { - for (u32 i = 0; i < INITIAL_CAPACITY; ++i) - if (slots_[i] != nullptr) - alloc_.deallocate(slots_[i], INITIAL_SIZE * (1 << i)); +SPLockFreeVector::~SPLockFreeVector() { + for (u32 i = 0; i < BUCKET_COUNT; ++i) + if (buckets_[i] != nullptr) + alloc_.deallocate(buckets_[i], INITIAL_BUCKET_SIZE * (1 << i)); } template -LockFreeVector &LockFreeVector::operator=(const LockFreeVector &rhs) { +SPLockFreeVector &SPLockFreeVector::operator=(const SPLockFreeVector &rhs) { if (*this == rhs) return *this; Clear(); @@ -129,72 +129,72 @@ LockFreeVector &LockFreeVector::operator=(const LockFreeVector &rhs) { return *this; } -template -void LockFreeVector::InitMemory(u32 n) { - std::memset(slots_, 0, INITIAL_CAPACITY * sizeof(T *)); - i32 slot = HighestBit(n + INITIAL_SIZE - 1) - INIT_BIT; - for (u32 i = 0; i <= slot; ++i) { - u32 slot_size = INITIAL_SIZE * (1 << slot); - slots_[i] = alloc_.allocate(slot_size); +template +void SPLockFreeVector::InitMemory(u32 n) { + std::memset(buckets_, 0, BUCKET_COUNT * sizeof(T *)); + i32 bucket = HighestBit(n + INITIAL_BUCKET_SIZE - 1) - INIT_BIT; + for (u32 i = 0; i <= bucket; ++i) { + u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket); + buckets_[i] = alloc_.allocate(bucket_size); } } template -T &LockFreeVector::InternalAt(u32 n) { - u32 pos = n + INITIAL_SIZE; +T &SPLockFreeVector::InternalAt(u32 n) { + u32 pos = n + INITIAL_BUCKET_SIZE; u32 hi_bit = HighestBit(pos); u32 idx = pos ^ (1 << hi_bit); - return slots_[hi_bit - INIT_BIT][idx]; + return buckets_[hi_bit - INIT_BIT][idx]; } template -const T &LockFreeVector::InternalAt(u32 n) const { - u32 pos = n + INITIAL_SIZE; +const T &SPLockFreeVector::InternalAt(u32 n) const { + u32 pos = n + INITIAL_BUCKET_SIZE; u32 hi_bit = HighestBit(pos); u32 idx = pos ^ (1 << hi_bit); - return slots_[hi_bit - INIT_BIT][idx]; + return buckets_[hi_bit - INIT_BIT][idx]; } template -const T &LockFreeVector::At(u32 n) const { +const T &SPLockFreeVector::At(u32 n) const { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } template -T &LockFreeVector::At(u32 n) { +T &SPLockFreeVector::At(u32 n) { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } -template -T &LockFreeVector::operator[](u32 n) { +template +T &SPLockFreeVector::operator[](u32 n) { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } -template -const T &LockFreeVector::operator[](u32 n) const { +template +const T &SPLockFreeVector::operator[](u32 n) const { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } -template -T &LockFreeVector::Front() { - return slots_[0][0]; +template +T &SPLockFreeVector::Front() { + return buckets_[0][0]; } -template -T &LockFreeVector::Back() { +template +T &SPLockFreeVector::Back() { return (*this)[size_ - 1]; } -template -void LockFreeVector::PushBack(const T &elem, bool lock) { +template +void SPLockFreeVector::PushBack(const T &elem, bool lock) { if (lock) { ScopedWrite mutex(lock_); PushBackUnlocked(elem); @@ -202,48 +202,136 @@ void LockFreeVector::PushBack(const T &elem, bool lock) { PushBackUnlocked(elem); } -template -void LockFreeVector::PushBackUnlocked(const T &elem) { - u32 bucket = HighestBit(size_ + INITIAL_SIZE) - INIT_BIT; - if (slots_[bucket] == nullptr) { - u32 bucket_size = INITIAL_SIZE * (1 << bucket); - slots_[bucket] = alloc_.allocate(bucket_size); +template +void SPLockFreeVector::PushBackUnlocked(const T &elem) { + u32 bucket = HighestBit(size_ + INITIAL_BUCKET_SIZE) - INIT_BIT; + if (buckets_[bucket] == nullptr) { + u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket); + buckets_[bucket] = alloc_.allocate(bucket_size); } InternalAt(size_) = elem; ++size_; } -template -void LockFreeVector::Clear() { +template +void SPLockFreeVector::Clear() { ScopedWrite mutex(lock_); size_ = 0; - for (u32 i = 0; i < INITIAL_CAPACITY; ++i) { - alloc_.deallocate(slots_[i], INITIAL_SIZE * (1 << i)); - slots_[i] = 0; + for (u32 i = 0; i < BUCKET_COUNT; ++i) { + alloc_.deallocate(buckets_[i], INITIAL_BUCKET_SIZE * (1 << i)); + buckets_[i] = 0; } } -template -void LockFreeVector::Resize(u32 s) { +template +void SPLockFreeVector::Resize(u32 s) { if (s < 0 || s == size_) return; ScopedWrite mutex(lock_); - const i32 expect_bucket = HighestBit(s + INITIAL_SIZE - 1) - INIT_BIT; - const i32 current_bucket = HighestBit(size_ + INITIAL_SIZE - 1) - INIT_BIT; + const i32 expect_bucket = HighestBit(s + INITIAL_BUCKET_SIZE - 1) - INIT_BIT; + const i32 current_bucket = HighestBit(size_ + INITIAL_BUCKET_SIZE - 1) - INIT_BIT; if (current_bucket > expect_bucket) { size_ = s; for (i32 bucket = expect_bucket + 1; bucket <= current_bucket; ++bucket) { - alloc_.deallocate(slots_[bucket], INITIAL_SIZE * (1 << bucket)); - slots_[bucket] = 0; + alloc_.deallocate(buckets_[bucket], INITIAL_BUCKET_SIZE * (1 << bucket)); + buckets_[bucket] = 0; } } else { for (i32 bucket = current_bucket + 1; bucket <= expect_bucket; ++bucket) { - u32 bucket_size = INITIAL_SIZE * (1 << bucket); - slots_[bucket] = alloc_.allocate(bucket_size); + u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket); + buckets_[bucket] = alloc_.allocate(bucket_size); } size_ = s; } } +// Lock free vector with multiple writer +template +class MPLockFreeVector { +public: + using Bucket = std::pair>; + + MPLockFreeVector(MemoryPool *pool) noexcept : alloc_(pool), size_{}, capacity_{}, used_bucket_count_{} { + for (auto &i : buckets_) { + i.first = 0; + i.second.store(nullptr); + } + AllocateBucket(0); + } + + ~MPLockFreeVector() noexcept { + for (auto &i : buckets_) { + if (i.second) + alloc_.deallocate(i.second, i.first); + } + } + + T &operator[](const u32 idx_) { return At(idx_); } + + const T &operator[](const u32 idx_) const { return At(idx_); } + + void PushBack(const T &value) { + const u32 index = size_.fetch_add(1, std::memory_order_acq_rel); + const auto [bucket, b_idx] = GetBucket(index); + if (buckets_[bucket].first == 0) + AllocateBucket(bucket); + + buckets_[bucket].second.load(std::memory_order_acquire)[b_idx] = value; + } + + u32 Size() const { return size_.load(std::memory_order_acquire); } + + u32 Capacity() const { return capacity_; } + + u32 BucketCount() const { return used_bucket_count_.load(std::memory_order_acquire) + 1; } + +private: + constexpr std::pair GetBucket(const u32 i) const { + const u32 pos = i + INITIAL_BUCKET_SIZE; + const u32 high_bit = HighestBit(pos); + const u32 bucket = high_bit - HighestBit(INITIAL_BUCKET_SIZE); + const u32 idx = pos ^ static_cast(pow(2, high_bit)); + return {bucket, idx}; + } + + T &At(const u32 i) { + auto [bucket, idx] = GetBucket(i); + T *arr = buckets_[bucket].second.load(std::memory_order_acquire); + return arr[idx]; + } + + u32 HighestBit(u32 value) const { +#ifdef __x86_64__ + u32 b = 0; + __asm__ __volatile__("bsrl %1, %0" : "=r"(b) : "r"(value)); + return b; +#else + return 31 - __builtin_clz(value); +#endif + } + + void AllocateBucket(const u32 bucket) { + T *L_VALUE_NULLPTR = nullptr; + if (bucket >= BUCKET_COUNT) + UnrecoverableError("Out of max bucket range."); + const u32 bucket_size = round(pow(INITIAL_BUCKET_SIZE, bucket + 1)); + T *new_mem_block = alloc_.allocate(bucket_size); + if (!buckets_[bucket].second.compare_exchange_strong(L_VALUE_NULLPTR, new_mem_block)) { + alloc_.deallocate(new_mem_block, bucket_size); + } else { + buckets_[bucket].first = bucket_size; + used_bucket_count_.store(std::max(bucket, used_bucket_count_.load(std::memory_order_acquire)), std::memory_order_release); + capacity_ += bucket_size; + } + } + +private: + PoolAllocator alloc_; + Vector buckets_{BUCKET_COUNT}; + Atomic size_; + u32 capacity_; + Atomic used_bucket_count_; +}; + } // namespace infinity \ No newline at end of file diff --git a/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp b/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp index e875b2e7b1..babd9525e2 100644 --- a/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp +++ b/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp @@ -26,56 +26,56 @@ class LockFreeVectorTest : public BaseTest {}; TEST_F(LockFreeVectorTest, test1) { using namespace infinity; MemoryPool memory_pool; - LockFreeVector vec(&memory_pool); + SPLockFreeVector vec(&memory_pool); vec.PushBack(1); vec.PushBack(2); vec.PushBack(3); vec.PushBack(4); vec.PushBack(5); - EXPECT_EQ(vec.Size(), 5); + EXPECT_EQ(vec.Size(), (unsigned)5); for (u32 i = 0; i < vec.Size(); ++i) { EXPECT_EQ(vec[i], i + 1); } } -void PushTest(infinity::LockFreeVector *vec, int number_of_pushes, bool lock) { - for (int i = 0; i < number_of_pushes; i++) { +void PushTest(infinity::SPLockFreeVector *vec, unsigned number_of_pushes, bool lock) { + for (unsigned i = 0; i < number_of_pushes; i++) { vec->PushBack(i, lock); } } -void ReadTest(infinity::LockFreeVector *vec) { - for (int i = 0; i < vec->Size(); i++) { - auto v = vec->At(i); +void ReadTest(infinity::SPLockFreeVector *vec) { + for (unsigned i = 0; i < vec->Size(); i++) { + vec->At(i); } } TEST_F(LockFreeVectorTest, test2) { using namespace infinity; MemoryPool memory_pool; - LockFreeVector vec(&memory_pool); - int number_of_threads = 4; - int number_of_pushes = 100; + SPLockFreeVector vec(&memory_pool); + unsigned number_of_threads = 4; + unsigned number_of_pushes = 100; Thread push_thread_list[number_of_threads]; - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { push_thread_list[i] = Thread(PushTest, &vec, number_of_pushes, true); } - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { push_thread_list[i].join(); } EXPECT_EQ(vec.Size(), (number_of_pushes * number_of_threads)); Vector stl_vec; - for (int i = 0; i < vec.Size(); i++) { + for (unsigned i = 0; i < vec.Size(); i++) { stl_vec.push_back(vec[i]); } std::sort(stl_vec.begin(), stl_vec.end()); // Test that there are number_of_threads copies of each value in the vector - for (int i = 0; i < stl_vec.size() - number_of_threads; i += number_of_threads) { - for (int j = i + 1; j < i + number_of_threads; j++) { + for (unsigned i = 0; i < stl_vec.size() - number_of_threads; i += number_of_threads) { + for (unsigned j = i + 1; j < i + number_of_threads; j++) { EXPECT_EQ(stl_vec[j], stl_vec[i]); } } @@ -84,17 +84,53 @@ TEST_F(LockFreeVectorTest, test2) { TEST_F(LockFreeVectorTest, test3) { using namespace infinity; MemoryPool memory_pool; - LockFreeVector vec(&memory_pool); - int number_of_threads = 16; - int number_of_pushes = 100; + SPLockFreeVector vec(&memory_pool); + unsigned number_of_threads = 16; + unsigned number_of_pushes = 100; Thread push_thread = Thread(PushTest, &vec, number_of_pushes, false); Thread read_thread_list[number_of_threads]; - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { read_thread_list[i] = Thread(ReadTest, &vec); } push_thread.join(); - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { read_thread_list[i].join(); } +} + +void PushTest2(infinity::MPLockFreeVector *vec, unsigned number_of_pushes) { + for (unsigned i = 0; i < number_of_pushes; i++) { + vec->PushBack(i); + } +} +TEST_F(LockFreeVectorTest, test4) { + using namespace infinity; + MemoryPool memory_pool; + MPLockFreeVector vec(&memory_pool); + unsigned number_of_threads = 4; + unsigned number_of_pushes = 100; + + Thread push_thread_list[number_of_threads]; + for (unsigned i = 0; i < number_of_threads; i++) { + push_thread_list[i] = Thread(PushTest2, &vec, number_of_pushes); + } + for (unsigned i = 0; i < number_of_threads; i++) { + push_thread_list[i].join(); + } + EXPECT_EQ(vec.Size(), (number_of_pushes * number_of_threads)); + + Vector stl_vec; + for (unsigned i = 0; i < vec.Size(); i++) { + stl_vec.push_back(vec[i]); + } + + std::sort(stl_vec.begin(), stl_vec.end()); + + // Test that there are number_of_threads copies of each value in the vector + for (unsigned i = 0; i < stl_vec.size() - number_of_threads; i += number_of_threads) { + for (unsigned j = i + 1; j < i + number_of_threads; j++) { + EXPECT_EQ(stl_vec[j], stl_vec[i]); + } + } } \ No newline at end of file