Skip to content

Commit

Permalink
Add lock free vector with multiple producer for real time indexing (#470
Browse files Browse the repository at this point in the history
)

Rename previous LockFreeVector to SPLockFreeVector which means lfvector for single producer(lock is required if multiple writer is used)

Add MPLockFreeVector which allows for multiple producers

SPLockFreeVector has higher performance which will be used for near real time indexing.
  • Loading branch information
yingfeng authored Jan 24, 2024
1 parent 171b3de commit cd7f90e
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 92 deletions.
14 changes: 10 additions & 4 deletions src/common/stl.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,19 @@ using std::is_same;
using std::fill;
using std::lower_bound;

using std::shared_mutex;
using std::condition_variable;
using std::lock_guard;
using std::memory_order;
using std::memory_order_acq_rel;
using std::memory_order_acquire;
using std::memory_order_consume;
using std::memory_order_relaxed;
using std::memory_order_release;
using std::memory_order_seq_cst;
using std::mutex;
using std::shared_lock;
using std::shared_mutex;
using std::unique_lock;
using std::lock_guard;
using std::condition_variable;
using std::memory_order;

using std::forward_list;
using std::isalpha;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/common/lock_free_vector.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ module;
export module lock_free_vector;

namespace infinity {
export using infinity::LockFreeVector;
export using infinity::SPLockFreeVector;
export using infinity::MPLockFreeVector;
}
222 changes: 155 additions & 67 deletions src/storage/invertedindex/common/lock_free_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#pragma once

#include <math.h>
import stl;
import spinlock;
import memory_pool;
Expand All @@ -23,24 +23,24 @@ import infinity_exception;
namespace infinity {

// According to "Lock-free dynamically resizable arrays,D Dechev, 2006"
template <class T>
class LockFreeVector {
template <typename T>
class SPLockFreeVector {
using ScopedWrite = std::unique_lock<SpinLock>;

public:
static u32 constexpr INITIAL_CAPACITY = 32;
static u32 constexpr INITIAL_SIZE = 8;
static u32 constexpr BUCKET_COUNT = 32;
static u32 constexpr INITIAL_BUCKET_SIZE = 8;
static u32 constexpr INIT_BIT = 3;

LockFreeVector(MemoryPool *pool);
SPLockFreeVector(MemoryPool *pool);

LockFreeVector(const u32 n, MemoryPool *pool);
SPLockFreeVector(const u32 n, MemoryPool *pool);

LockFreeVector(const LockFreeVector<T> &other);
SPLockFreeVector(const SPLockFreeVector<T> &other);

~LockFreeVector();
~SPLockFreeVector();

LockFreeVector<T> &operator=(const LockFreeVector<T> &);
SPLockFreeVector<T> &operator=(const SPLockFreeVector<T> &);

u32 Size() const { return size_; }

Expand Down Expand Up @@ -84,40 +84,40 @@ class LockFreeVector {

PoolAllocator<T> alloc_;

T *slots_[INITIAL_CAPACITY];
T *buckets_[BUCKET_COUNT];

u32 size_;

SpinLock lock_;
};

template <class T>
LockFreeVector<T>::LockFreeVector(MemoryPool *pool) : alloc_(pool), size_(0) {
std::memset(slots_, 0, INITIAL_CAPACITY * sizeof(T *));
template <typename T>
SPLockFreeVector<T>::SPLockFreeVector(MemoryPool *pool) : alloc_(pool), size_(0) {
std::memset(buckets_, 0, BUCKET_COUNT * sizeof(T *));
}

template <class T>
LockFreeVector<T>::LockFreeVector(const u32 n, MemoryPool *pool) : alloc_(pool), size_(n) {
template <typename T>
SPLockFreeVector<T>::SPLockFreeVector(const u32 n, MemoryPool *pool) : alloc_(pool), size_(n) {
InitMemory(n);
}

template <typename T>
LockFreeVector<T>::LockFreeVector(const LockFreeVector<T> &other) : alloc_(other.alloc_), size_(other.size_) {
SPLockFreeVector<T>::SPLockFreeVector(const SPLockFreeVector<T> &other) : alloc_(other.alloc_), size_(other.size_) {
InitMemory(size_);
for (u32 idx = 0; idx < size_; ++idx) {
InternalAt(idx) = other.InternalAt(idx);
}
}

template <typename T>
LockFreeVector<T>::~LockFreeVector() {
for (u32 i = 0; i < INITIAL_CAPACITY; ++i)
if (slots_[i] != nullptr)
alloc_.deallocate(slots_[i], INITIAL_SIZE * (1 << i));
SPLockFreeVector<T>::~SPLockFreeVector() {
for (u32 i = 0; i < BUCKET_COUNT; ++i)
if (buckets_[i] != nullptr)
alloc_.deallocate(buckets_[i], INITIAL_BUCKET_SIZE * (1 << i));
}

template <typename T>
LockFreeVector<T> &LockFreeVector<T>::operator=(const LockFreeVector<T> &rhs) {
SPLockFreeVector<T> &SPLockFreeVector<T>::operator=(const SPLockFreeVector<T> &rhs) {
if (*this == rhs)
return *this;
Clear();
Expand All @@ -129,121 +129,209 @@ LockFreeVector<T> &LockFreeVector<T>::operator=(const LockFreeVector<T> &rhs) {
return *this;
}

template <class T>
void LockFreeVector<T>::InitMemory(u32 n) {
std::memset(slots_, 0, INITIAL_CAPACITY * sizeof(T *));
i32 slot = HighestBit(n + INITIAL_SIZE - 1) - INIT_BIT;
for (u32 i = 0; i <= slot; ++i) {
u32 slot_size = INITIAL_SIZE * (1 << slot);
slots_[i] = alloc_.allocate(slot_size);
template <typename T>
void SPLockFreeVector<T>::InitMemory(u32 n) {
std::memset(buckets_, 0, BUCKET_COUNT * sizeof(T *));
i32 bucket = HighestBit(n + INITIAL_BUCKET_SIZE - 1) - INIT_BIT;
for (u32 i = 0; i <= bucket; ++i) {
u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket);
buckets_[i] = alloc_.allocate(bucket_size);
}
}

template <typename T>
T &LockFreeVector<T>::InternalAt(u32 n) {
u32 pos = n + INITIAL_SIZE;
T &SPLockFreeVector<T>::InternalAt(u32 n) {
u32 pos = n + INITIAL_BUCKET_SIZE;
u32 hi_bit = HighestBit(pos);
u32 idx = pos ^ (1 << hi_bit);
return slots_[hi_bit - INIT_BIT][idx];
return buckets_[hi_bit - INIT_BIT][idx];
}

template <typename T>
const T &LockFreeVector<T>::InternalAt(u32 n) const {
u32 pos = n + INITIAL_SIZE;
const T &SPLockFreeVector<T>::InternalAt(u32 n) const {
u32 pos = n + INITIAL_BUCKET_SIZE;
u32 hi_bit = HighestBit(pos);
u32 idx = pos ^ (1 << hi_bit);
return slots_[hi_bit - INIT_BIT][idx];
return buckets_[hi_bit - INIT_BIT][idx];
}

template <typename T>
const T &LockFreeVector<T>::At(u32 n) const {
const T &SPLockFreeVector<T>::At(u32 n) const {
if (n < 0 || n >= size_)
UnrecoverableError("Out of vector range.");
return InternalAt(n);
}

template <typename T>
T &LockFreeVector<T>::At(u32 n) {
T &SPLockFreeVector<T>::At(u32 n) {
if (n < 0 || n >= size_)
UnrecoverableError("Out of vector range.");
return InternalAt(n);
}

template <class T>
T &LockFreeVector<T>::operator[](u32 n) {
template <typename T>
T &SPLockFreeVector<T>::operator[](u32 n) {
if (n < 0 || n >= size_)
UnrecoverableError("Out of vector range.");
return InternalAt(n);
}

template <class T>
const T &LockFreeVector<T>::operator[](u32 n) const {
template <typename T>
const T &SPLockFreeVector<T>::operator[](u32 n) const {
if (n < 0 || n >= size_)
UnrecoverableError("Out of vector range.");
return InternalAt(n);
}

template <class T>
T &LockFreeVector<T>::Front() {
return slots_[0][0];
template <typename T>
T &SPLockFreeVector<T>::Front() {
return buckets_[0][0];
}

template <class T>
T &LockFreeVector<T>::Back() {
template <typename T>
T &SPLockFreeVector<T>::Back() {
return (*this)[size_ - 1];
}

template <class T>
void LockFreeVector<T>::PushBack(const T &elem, bool lock) {
template <typename T>
void SPLockFreeVector<T>::PushBack(const T &elem, bool lock) {
if (lock) {
ScopedWrite mutex(lock_);
PushBackUnlocked(elem);
} else
PushBackUnlocked(elem);
}

template <class T>
void LockFreeVector<T>::PushBackUnlocked(const T &elem) {
u32 bucket = HighestBit(size_ + INITIAL_SIZE) - INIT_BIT;
if (slots_[bucket] == nullptr) {
u32 bucket_size = INITIAL_SIZE * (1 << bucket);
slots_[bucket] = alloc_.allocate(bucket_size);
template <typename T>
void SPLockFreeVector<T>::PushBackUnlocked(const T &elem) {
u32 bucket = HighestBit(size_ + INITIAL_BUCKET_SIZE) - INIT_BIT;
if (buckets_[bucket] == nullptr) {
u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket);
buckets_[bucket] = alloc_.allocate(bucket_size);
}
InternalAt(size_) = elem;
++size_;
}

template <class T>
void LockFreeVector<T>::Clear() {
template <typename T>
void SPLockFreeVector<T>::Clear() {
ScopedWrite mutex(lock_);
size_ = 0;
for (u32 i = 0; i < INITIAL_CAPACITY; ++i) {
alloc_.deallocate(slots_[i], INITIAL_SIZE * (1 << i));
slots_[i] = 0;
for (u32 i = 0; i < BUCKET_COUNT; ++i) {
alloc_.deallocate(buckets_[i], INITIAL_BUCKET_SIZE * (1 << i));
buckets_[i] = 0;
}
}

template <class T>
void LockFreeVector<T>::Resize(u32 s) {
template <typename T>
void SPLockFreeVector<T>::Resize(u32 s) {
if (s < 0 || s == size_)
return;
ScopedWrite mutex(lock_);

const i32 expect_bucket = HighestBit(s + INITIAL_SIZE - 1) - INIT_BIT;
const i32 current_bucket = HighestBit(size_ + INITIAL_SIZE - 1) - INIT_BIT;
const i32 expect_bucket = HighestBit(s + INITIAL_BUCKET_SIZE - 1) - INIT_BIT;
const i32 current_bucket = HighestBit(size_ + INITIAL_BUCKET_SIZE - 1) - INIT_BIT;
if (current_bucket > expect_bucket) {
size_ = s;
for (i32 bucket = expect_bucket + 1; bucket <= current_bucket; ++bucket) {
alloc_.deallocate(slots_[bucket], INITIAL_SIZE * (1 << bucket));
slots_[bucket] = 0;
alloc_.deallocate(buckets_[bucket], INITIAL_BUCKET_SIZE * (1 << bucket));
buckets_[bucket] = 0;
}
} else {
for (i32 bucket = current_bucket + 1; bucket <= expect_bucket; ++bucket) {
u32 bucket_size = INITIAL_SIZE * (1 << bucket);
slots_[bucket] = alloc_.allocate(bucket_size);
u32 bucket_size = INITIAL_BUCKET_SIZE * (1 << bucket);
buckets_[bucket] = alloc_.allocate(bucket_size);
}
size_ = s;
}
}

// Lock free vector with multiple writer
template <typename T, u32 INITIAL_BUCKET_SIZE = 3, u32 BUCKET_COUNT = 32>
class MPLockFreeVector {
public:
using Bucket = std::pair<u32, Atomic<T *>>;

MPLockFreeVector(MemoryPool *pool) noexcept : alloc_(pool), size_{}, capacity_{}, used_bucket_count_{} {
for (auto &i : buckets_) {
i.first = 0;
i.second.store(nullptr);
}
AllocateBucket(0);
}

~MPLockFreeVector() noexcept {
for (auto &i : buckets_) {
if (i.second)
alloc_.deallocate(i.second, i.first);
}
}

T &operator[](const u32 idx_) { return At(idx_); }

const T &operator[](const u32 idx_) const { return At(idx_); }

void PushBack(const T &value) {
const u32 index = size_.fetch_add(1, std::memory_order_acq_rel);
const auto [bucket, b_idx] = GetBucket(index);
if (buckets_[bucket].first == 0)
AllocateBucket(bucket);

buckets_[bucket].second.load(std::memory_order_acquire)[b_idx] = value;
}

u32 Size() const { return size_.load(std::memory_order_acquire); }

u32 Capacity() const { return capacity_; }

u32 BucketCount() const { return used_bucket_count_.load(std::memory_order_acquire) + 1; }

private:
constexpr std::pair<u32, u32> GetBucket(const u32 i) const {
const u32 pos = i + INITIAL_BUCKET_SIZE;
const u32 high_bit = HighestBit(pos);
const u32 bucket = high_bit - HighestBit(INITIAL_BUCKET_SIZE);
const u32 idx = pos ^ static_cast<u32>(pow(2, high_bit));
return {bucket, idx};
}

T &At(const u32 i) {
auto [bucket, idx] = GetBucket(i);
T *arr = buckets_[bucket].second.load(std::memory_order_acquire);
return arr[idx];
}

u32 HighestBit(u32 value) const {
#ifdef __x86_64__
u32 b = 0;
__asm__ __volatile__("bsrl %1, %0" : "=r"(b) : "r"(value));
return b;
#else
return 31 - __builtin_clz(value);
#endif
}

void AllocateBucket(const u32 bucket) {
T *L_VALUE_NULLPTR = nullptr;
if (bucket >= BUCKET_COUNT)
UnrecoverableError("Out of max bucket range.");
const u32 bucket_size = round(pow(INITIAL_BUCKET_SIZE, bucket + 1));
T *new_mem_block = alloc_.allocate(bucket_size);
if (!buckets_[bucket].second.compare_exchange_strong(L_VALUE_NULLPTR, new_mem_block)) {
alloc_.deallocate(new_mem_block, bucket_size);
} else {
buckets_[bucket].first = bucket_size;
used_bucket_count_.store(std::max(bucket, used_bucket_count_.load(std::memory_order_acquire)), std::memory_order_release);
capacity_ += bucket_size;
}
}

private:
PoolAllocator<T> alloc_;
Vector<Bucket> buckets_{BUCKET_COUNT};
Atomic<u32> size_;
u32 capacity_;
Atomic<u32> used_bucket_count_;
};

} // namespace infinity
Loading

0 comments on commit cd7f90e

Please sign in to comment.