diff --git a/src/common/stl.cppm b/src/common/stl.cppm index 51cb2dca75..8d1e8ac0e0 100644 --- a/src/common/stl.cppm +++ b/src/common/stl.cppm @@ -81,13 +81,19 @@ using std::is_same; using std::fill; using std::lower_bound; -using std::shared_mutex; +using std::condition_variable; +using std::lock_guard; +using std::memory_order; +using std::memory_order_acq_rel; +using std::memory_order_acquire; +using std::memory_order_consume; +using std::memory_order_relaxed; +using std::memory_order_release; +using std::memory_order_seq_cst; using std::mutex; using std::shared_lock; +using std::shared_mutex; using std::unique_lock; -using std::lock_guard; -using std::condition_variable; -using std::memory_order; using std::forward_list; using std::isalpha; diff --git a/src/storage/invertedindex/common/lock_free_vector.cppm b/src/storage/invertedindex/common/lock_free_vector.cppm index 9ce71960d9..89b19fabd2 100644 --- a/src/storage/invertedindex/common/lock_free_vector.cppm +++ b/src/storage/invertedindex/common/lock_free_vector.cppm @@ -5,5 +5,6 @@ module; export module lock_free_vector; namespace infinity { -export using infinity::LockFreeVector; +export using infinity::SPLockFreeVector; +export using infinity::MPLockFreeVector; } diff --git a/src/storage/invertedindex/common/lock_free_vector.h b/src/storage/invertedindex/common/lock_free_vector.h index fbfba06afb..7c9f70d2b4 100644 --- a/src/storage/invertedindex/common/lock_free_vector.h +++ b/src/storage/invertedindex/common/lock_free_vector.h @@ -13,7 +13,7 @@ // limitations under the License. #pragma once - +#include import stl; import spinlock; import memory_pool; @@ -23,24 +23,24 @@ import infinity_exception; namespace infinity { // According to "Lock-free dynamically resizable arrays,D Dechev, 2006" -template -class LockFreeVector { +template +class SPLockFreeVector { using ScopedWrite = std::unique_lock; public: - static u32 constexpr INITIAL_CAPACITY = 32; - static u32 constexpr INITIAL_SIZE = 8; + static u32 constexpr BUCKET_COUNT = 32; + static u32 constexpr INITIAL_BUCKET_SIZE = 8; static u32 constexpr INIT_BIT = 3; - LockFreeVector(MemoryPool *pool); + SPLockFreeVector(MemoryPool *pool); - LockFreeVector(const u32 n, MemoryPool *pool); + SPLockFreeVector(const u32 n, MemoryPool *pool); - LockFreeVector(const LockFreeVector &other); + SPLockFreeVector(const SPLockFreeVector &other); - ~LockFreeVector(); + ~SPLockFreeVector(); - LockFreeVector &operator=(const LockFreeVector &); + SPLockFreeVector &operator=(const SPLockFreeVector &); u32 Size() const { return size_; } @@ -84,25 +84,25 @@ class LockFreeVector { PoolAllocator alloc_; - T *slots_[INITIAL_CAPACITY]; + T *buckets_[BUCKET_COUNT]; u32 size_; SpinLock lock_; }; -template -LockFreeVector::LockFreeVector(MemoryPool *pool) : alloc_(pool), size_(0) { - std::memset(slots_, 0, INITIAL_CAPACITY * sizeof(T *)); +template +SPLockFreeVector::SPLockFreeVector(MemoryPool *pool) : alloc_(pool), size_(0) { + std::memset(buckets_, 0, BUCKET_COUNT * sizeof(T *)); } -template -LockFreeVector::LockFreeVector(const u32 n, MemoryPool *pool) : alloc_(pool), size_(n) { +template +SPLockFreeVector::SPLockFreeVector(const u32 n, MemoryPool *pool) : alloc_(pool), size_(n) { InitMemory(n); } template -LockFreeVector::LockFreeVector(const LockFreeVector &other) : alloc_(other.alloc_), size_(other.size_) { +SPLockFreeVector::SPLockFreeVector(const SPLockFreeVector &other) : alloc_(other.alloc_), size_(other.size_) { InitMemory(size_); for (u32 idx = 0; idx < size_; ++idx) { InternalAt(idx) = other.InternalAt(idx); @@ -110,14 +110,14 @@ LockFreeVector::LockFreeVector(const LockFreeVector &other) : alloc_(other } template -LockFreeVector::~LockFreeVector() { - for (u32 i = 0; i < INITIAL_CAPACITY; ++i) - if (slots_[i] != nullptr) - alloc_.deallocate(slots_[i], INITIAL_SIZE * (1 << i)); +SPLockFreeVector::~SPLockFreeVector() { + for (u32 i = 0; i < BUCKET_COUNT; ++i) + if (buckets_[i] != nullptr) + alloc_.deallocate(buckets_[i], INITIAL_BUCKET_SIZE * (1 << i)); } template -LockFreeVector &LockFreeVector::operator=(const LockFreeVector &rhs) { +SPLockFreeVector &SPLockFreeVector::operator=(const SPLockFreeVector &rhs) { if (*this == rhs) return *this; Clear(); @@ -129,72 +129,72 @@ LockFreeVector &LockFreeVector::operator=(const LockFreeVector &rhs) { return *this; } -template -void LockFreeVector::InitMemory(u32 n) { - std::memset(slots_, 0, INITIAL_CAPACITY * sizeof(T *)); - i32 slot = HighestBit(n + INITIAL_SIZE - 1) - INIT_BIT; - for (u32 i = 0; i <= slot; ++i) { - u32 slot_size = INITIAL_SIZE * (1 << slot); - slots_[i] = alloc_.allocate(slot_size); +template +void SPLockFreeVector::InitMemory(u32 n) { + std::memset(buckets_, 0, BUCKET_COUNT * sizeof(T *)); + i32 bucket = HighestBit(n + INITIAL_BUCKET_SIZE - 1) - INIT_BIT; + for (u32 i = 0; i <= bucket; ++i) { + u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket); + buckets_[i] = alloc_.allocate(bucket_size); } } template -T &LockFreeVector::InternalAt(u32 n) { - u32 pos = n + INITIAL_SIZE; +T &SPLockFreeVector::InternalAt(u32 n) { + u32 pos = n + INITIAL_BUCKET_SIZE; u32 hi_bit = HighestBit(pos); u32 idx = pos ^ (1 << hi_bit); - return slots_[hi_bit - INIT_BIT][idx]; + return buckets_[hi_bit - INIT_BIT][idx]; } template -const T &LockFreeVector::InternalAt(u32 n) const { - u32 pos = n + INITIAL_SIZE; +const T &SPLockFreeVector::InternalAt(u32 n) const { + u32 pos = n + INITIAL_BUCKET_SIZE; u32 hi_bit = HighestBit(pos); u32 idx = pos ^ (1 << hi_bit); - return slots_[hi_bit - INIT_BIT][idx]; + return buckets_[hi_bit - INIT_BIT][idx]; } template -const T &LockFreeVector::At(u32 n) const { +const T &SPLockFreeVector::At(u32 n) const { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } template -T &LockFreeVector::At(u32 n) { +T &SPLockFreeVector::At(u32 n) { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } -template -T &LockFreeVector::operator[](u32 n) { +template +T &SPLockFreeVector::operator[](u32 n) { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } -template -const T &LockFreeVector::operator[](u32 n) const { +template +const T &SPLockFreeVector::operator[](u32 n) const { if (n < 0 || n >= size_) UnrecoverableError("Out of vector range."); return InternalAt(n); } -template -T &LockFreeVector::Front() { - return slots_[0][0]; +template +T &SPLockFreeVector::Front() { + return buckets_[0][0]; } -template -T &LockFreeVector::Back() { +template +T &SPLockFreeVector::Back() { return (*this)[size_ - 1]; } -template -void LockFreeVector::PushBack(const T &elem, bool lock) { +template +void SPLockFreeVector::PushBack(const T &elem, bool lock) { if (lock) { ScopedWrite mutex(lock_); PushBackUnlocked(elem); @@ -202,48 +202,136 @@ void LockFreeVector::PushBack(const T &elem, bool lock) { PushBackUnlocked(elem); } -template -void LockFreeVector::PushBackUnlocked(const T &elem) { - u32 bucket = HighestBit(size_ + INITIAL_SIZE) - INIT_BIT; - if (slots_[bucket] == nullptr) { - u32 bucket_size = INITIAL_SIZE * (1 << bucket); - slots_[bucket] = alloc_.allocate(bucket_size); +template +void SPLockFreeVector::PushBackUnlocked(const T &elem) { + u32 bucket = HighestBit(size_ + INITIAL_BUCKET_SIZE) - INIT_BIT; + if (buckets_[bucket] == nullptr) { + u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket); + buckets_[bucket] = alloc_.allocate(bucket_size); } InternalAt(size_) = elem; ++size_; } -template -void LockFreeVector::Clear() { +template +void SPLockFreeVector::Clear() { ScopedWrite mutex(lock_); size_ = 0; - for (u32 i = 0; i < INITIAL_CAPACITY; ++i) { - alloc_.deallocate(slots_[i], INITIAL_SIZE * (1 << i)); - slots_[i] = 0; + for (u32 i = 0; i < BUCKET_COUNT; ++i) { + alloc_.deallocate(buckets_[i], INITIAL_BUCKET_SIZE * (1 << i)); + buckets_[i] = 0; } } -template -void LockFreeVector::Resize(u32 s) { +template +void SPLockFreeVector::Resize(u32 s) { if (s < 0 || s == size_) return; ScopedWrite mutex(lock_); - const i32 expect_bucket = HighestBit(s + INITIAL_SIZE - 1) - INIT_BIT; - const i32 current_bucket = HighestBit(size_ + INITIAL_SIZE - 1) - INIT_BIT; + const i32 expect_bucket = HighestBit(s + INITIAL_BUCKET_SIZE - 1) - INIT_BIT; + const i32 current_bucket = HighestBit(size_ + INITIAL_BUCKET_SIZE - 1) - INIT_BIT; if (current_bucket > expect_bucket) { size_ = s; for (i32 bucket = expect_bucket + 1; bucket <= current_bucket; ++bucket) { - alloc_.deallocate(slots_[bucket], INITIAL_SIZE * (1 << bucket)); - slots_[bucket] = 0; + alloc_.deallocate(buckets_[bucket], INITIAL_BUCKET_SIZE * (1 << bucket)); + buckets_[bucket] = 0; } } else { for (i32 bucket = current_bucket + 1; bucket <= expect_bucket; ++bucket) { - u32 bucket_size = INITIAL_SIZE * (1 << bucket); - slots_[bucket] = alloc_.allocate(bucket_size); + u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket); + buckets_[bucket] = alloc_.allocate(bucket_size); } size_ = s; } } +// Lock free vector with multiple writer +template +class MPLockFreeVector { +public: + using Bucket = std::pair>; + + MPLockFreeVector(MemoryPool *pool) noexcept : alloc_(pool), size_{}, capacity_{}, used_bucket_count_{} { + for (auto &i : buckets_) { + i.first = 0; + i.second.store(nullptr); + } + AllocateBucket(0); + } + + ~MPLockFreeVector() noexcept { + for (auto &i : buckets_) { + if (i.second) + alloc_.deallocate(i.second, i.first); + } + } + + T &operator[](const u32 idx_) { return At(idx_); } + + const T &operator[](const u32 idx_) const { return At(idx_); } + + void PushBack(const T &value) { + const u32 index = size_.fetch_add(1, std::memory_order_acq_rel); + const auto [bucket, b_idx] = GetBucket(index); + if (buckets_[bucket].first == 0) + AllocateBucket(bucket); + + buckets_[bucket].second.load(std::memory_order_acquire)[b_idx] = value; + } + + u32 Size() const { return size_.load(std::memory_order_acquire); } + + u32 Capacity() const { return capacity_; } + + u32 BucketCount() const { return used_bucket_count_.load(std::memory_order_acquire) + 1; } + +private: + constexpr std::pair GetBucket(const u32 i) const { + const u32 pos = i + INITIAL_BUCKET_SIZE; + const u32 high_bit = HighestBit(pos); + const u32 bucket = high_bit - HighestBit(INITIAL_BUCKET_SIZE); + const u32 idx = pos ^ static_cast(pow(2, high_bit)); + return {bucket, idx}; + } + + T &At(const u32 i) { + auto [bucket, idx] = GetBucket(i); + T *arr = buckets_[bucket].second.load(std::memory_order_acquire); + return arr[idx]; + } + + u32 HighestBit(u32 value) const { +#ifdef __x86_64__ + u32 b = 0; + __asm__ __volatile__("bsrl %1, %0" : "=r"(b) : "r"(value)); + return b; +#else + return 31 - __builtin_clz(value); +#endif + } + + void AllocateBucket(const u32 bucket) { + T *L_VALUE_NULLPTR = nullptr; + if (bucket >= BUCKET_COUNT) + UnrecoverableError("Out of max bucket range."); + const u32 bucket_size = round(pow(INITIAL_BUCKET_SIZE, bucket + 1)); + T *new_mem_block = alloc_.allocate(bucket_size); + if (!buckets_[bucket].second.compare_exchange_strong(L_VALUE_NULLPTR, new_mem_block)) { + alloc_.deallocate(new_mem_block, bucket_size); + } else { + buckets_[bucket].first = bucket_size; + used_bucket_count_.store(std::max(bucket, used_bucket_count_.load(std::memory_order_acquire)), std::memory_order_release); + capacity_ += bucket_size; + } + } + +private: + PoolAllocator alloc_; + Vector buckets_{BUCKET_COUNT}; + Atomic size_; + u32 capacity_; + Atomic used_bucket_count_; +}; + } // namespace infinity \ No newline at end of file diff --git a/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp b/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp index e875b2e7b1..babd9525e2 100644 --- a/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp +++ b/src/unit_test/storage/invertedindex/common/lock_free_vector.cpp @@ -26,56 +26,56 @@ class LockFreeVectorTest : public BaseTest {}; TEST_F(LockFreeVectorTest, test1) { using namespace infinity; MemoryPool memory_pool; - LockFreeVector vec(&memory_pool); + SPLockFreeVector vec(&memory_pool); vec.PushBack(1); vec.PushBack(2); vec.PushBack(3); vec.PushBack(4); vec.PushBack(5); - EXPECT_EQ(vec.Size(), 5); + EXPECT_EQ(vec.Size(), (unsigned)5); for (u32 i = 0; i < vec.Size(); ++i) { EXPECT_EQ(vec[i], i + 1); } } -void PushTest(infinity::LockFreeVector *vec, int number_of_pushes, bool lock) { - for (int i = 0; i < number_of_pushes; i++) { +void PushTest(infinity::SPLockFreeVector *vec, unsigned number_of_pushes, bool lock) { + for (unsigned i = 0; i < number_of_pushes; i++) { vec->PushBack(i, lock); } } -void ReadTest(infinity::LockFreeVector *vec) { - for (int i = 0; i < vec->Size(); i++) { - auto v = vec->At(i); +void ReadTest(infinity::SPLockFreeVector *vec) { + for (unsigned i = 0; i < vec->Size(); i++) { + vec->At(i); } } TEST_F(LockFreeVectorTest, test2) { using namespace infinity; MemoryPool memory_pool; - LockFreeVector vec(&memory_pool); - int number_of_threads = 4; - int number_of_pushes = 100; + SPLockFreeVector vec(&memory_pool); + unsigned number_of_threads = 4; + unsigned number_of_pushes = 100; Thread push_thread_list[number_of_threads]; - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { push_thread_list[i] = Thread(PushTest, &vec, number_of_pushes, true); } - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { push_thread_list[i].join(); } EXPECT_EQ(vec.Size(), (number_of_pushes * number_of_threads)); Vector stl_vec; - for (int i = 0; i < vec.Size(); i++) { + for (unsigned i = 0; i < vec.Size(); i++) { stl_vec.push_back(vec[i]); } std::sort(stl_vec.begin(), stl_vec.end()); // Test that there are number_of_threads copies of each value in the vector - for (int i = 0; i < stl_vec.size() - number_of_threads; i += number_of_threads) { - for (int j = i + 1; j < i + number_of_threads; j++) { + for (unsigned i = 0; i < stl_vec.size() - number_of_threads; i += number_of_threads) { + for (unsigned j = i + 1; j < i + number_of_threads; j++) { EXPECT_EQ(stl_vec[j], stl_vec[i]); } } @@ -84,17 +84,53 @@ TEST_F(LockFreeVectorTest, test2) { TEST_F(LockFreeVectorTest, test3) { using namespace infinity; MemoryPool memory_pool; - LockFreeVector vec(&memory_pool); - int number_of_threads = 16; - int number_of_pushes = 100; + SPLockFreeVector vec(&memory_pool); + unsigned number_of_threads = 16; + unsigned number_of_pushes = 100; Thread push_thread = Thread(PushTest, &vec, number_of_pushes, false); Thread read_thread_list[number_of_threads]; - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { read_thread_list[i] = Thread(ReadTest, &vec); } push_thread.join(); - for (int i = 0; i < number_of_threads; i++) { + for (unsigned i = 0; i < number_of_threads; i++) { read_thread_list[i].join(); } +} + +void PushTest2(infinity::MPLockFreeVector *vec, unsigned number_of_pushes) { + for (unsigned i = 0; i < number_of_pushes; i++) { + vec->PushBack(i); + } +} +TEST_F(LockFreeVectorTest, test4) { + using namespace infinity; + MemoryPool memory_pool; + MPLockFreeVector vec(&memory_pool); + unsigned number_of_threads = 4; + unsigned number_of_pushes = 100; + + Thread push_thread_list[number_of_threads]; + for (unsigned i = 0; i < number_of_threads; i++) { + push_thread_list[i] = Thread(PushTest2, &vec, number_of_pushes); + } + for (unsigned i = 0; i < number_of_threads; i++) { + push_thread_list[i].join(); + } + EXPECT_EQ(vec.Size(), (number_of_pushes * number_of_threads)); + + Vector stl_vec; + for (unsigned i = 0; i < vec.Size(); i++) { + stl_vec.push_back(vec[i]); + } + + std::sort(stl_vec.begin(), stl_vec.end()); + + // Test that there are number_of_threads copies of each value in the vector + for (unsigned i = 0; i < stl_vec.size() - number_of_threads; i += number_of_threads) { + for (unsigned j = i + 1; j < i + number_of_threads; j++) { + EXPECT_EQ(stl_vec[j], stl_vec[i]); + } + } } \ No newline at end of file