Skip to content

Commit

Permalink
roc-streaminggh-770: Rework resize() for RingQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv committed Nov 26, 2024
1 parent 61ac37a commit d90cf31
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 71 deletions.
91 changes: 43 additions & 48 deletions src/internal_modules/roc_core/ring_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,28 +166,58 @@ template <class T, size_t EmbeddedCapacity = 0> class RingQueue : public NonCopy
if (is_empty()) {
roc_panic("ring queue: pop_back() called on empty buffer");
}
buff_[end_].~T();
end_ = (end_ - 1 + buff_len_) % buff_len_;
buff_[end_].~T();
}

//! Set ring queue size
//! Change ring queue capacity.
//! @remarks
//! Calls grow() to ensure that there is enough space in ring queue
//! After this call, capacity() is equal to the new value, and size()
//! is either the same as before or smaller if capacity decreased.
//! When needed, performs reallocation or destroys excess elements in
//! the end of the queue.
//! @returns
//! false if the allocation failed
ROC_ATTR_NODISCARD bool resize(size_t new_size) {
if (!grow(new_size)) {
return false;
//! false if the allocation failed
ROC_ATTR_NODISCARD bool resize(size_t new_capacity) {
const size_t old_capacity = capacity();
if (new_capacity == old_capacity) {
return true;
}

for (size_t n = size(); n < new_size; ++n) {
new (&buff_[end_]) T();
end_ = (end_ + 1) % buff_len_;
const size_t old_size = size();
const size_t new_size = std::min(old_size, new_capacity);

T* new_buff = allocate_(new_capacity + 1);
if (!new_buff) {
return false;
}

for (size_t n = size(); n > new_size; n--) {
end_ = (end_ - 1 + buff_len_) % buff_len_;
buff_[end_].~T();
if (new_buff != buff_) {
// Copy old objects to the beginning of the new memory.
for (size_t n = 0; n < new_size; n++) {
new (&new_buff[n]) T(buff_[(begin_ + n) % buff_len_]);
}

// Destruct objects in old memory (in reversed order).
for (size_t n = old_size; n > 0; n--) {
buff_[(begin_ + n - 1) % buff_len_].~T();
}

// Free old memory
deallocate_(buff_);

buff_ = new_buff;
buff_len_ = new_capacity + 1;

begin_ = 0;
end_ = new_size;
} else {
// Destruct old objects (in reversed order) if size decreased.
for (size_t n = old_size; n > new_size; n--) {
buff_[(begin_ + n - 1) % buff_len_].~T();
}

buff_len_ = new_capacity + 1;
}

return true;
Expand Down Expand Up @@ -223,41 +253,6 @@ template <class T, size_t EmbeddedCapacity = 0> class RingQueue : public NonCopy
}
}

bool grow(size_t min_len) {
if (min_len <= capacity()) {
return true;
}

T* new_buff = allocate_(min_len + 1);
if (!new_buff) {
return false;
}

if (new_buff != buff_) {
// Copy old objects to the beginning of the new memory.
size_t end_index = (end_ - 1 + buff_len_) % buff_len_;
for (size_t i = begin_, j = 0; i != end_index; i = (i + 1) % buff_len_, ++j) {
new (new_buff + j) T(buff_[i]);
}

for (size_t i = end_index; i != begin_; i = (i - 1 + buff_len_) % buff_len_) {
buff_[i].~T();
}

// Free old memory
if (buff_) {
deallocate_(buff_);
}

buff_ = new_buff;
end_ = size();
begin_ = 0;
}

buff_len_ = min_len + 1;
return true;
}

T* buff_;
size_t buff_len_;
size_t begin_;
Expand Down
9 changes: 7 additions & 2 deletions src/tests/roc_core/test_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@ struct Object {
static long n_objects;

size_t value;
bool valid;

explicit Object(size_t v = 0)
: value(v) {
: value(v)
, valid(true) {
n_objects++;
}

Object(const Object& other)
: value(other.value) {
: value(other.value)
, valid(true) {
n_objects++;
}

~Object() {
CHECK(valid);
valid = false;
n_objects--;
}
};
Expand Down
71 changes: 52 additions & 19 deletions src/tests/roc_core/test_ring_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@ struct Object {
static long n_objects;

size_t value;
bool valid;

explicit Object(size_t v = 0)
: value(v) {
: value(v)
, valid(true) {
n_objects++;
}

Object(const Object& other)
: value(other.value) {
: value(other.value)
, valid(true) {
n_objects++;
}

~Object() {
CHECK(valid);
valid = false;
n_objects--;
}
};
Expand All @@ -52,6 +57,7 @@ TEST_GROUP(ring_queue) {

TEST(ring_queue, is_empty_is_full) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

CHECK(queue.is_empty());
CHECK(!queue.is_full());
Expand Down Expand Up @@ -79,6 +85,7 @@ TEST(ring_queue, is_empty_is_full) {

TEST(ring_queue, push_back) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

for (size_t n = 0; n < NumObjects; n++) {
queue.push_back(Object(n));
Expand All @@ -93,6 +100,7 @@ TEST(ring_queue, push_back) {

TEST(ring_queue, push_front) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

for (size_t n = 0; n < NumObjects; n++) {
queue.push_front(Object(n));
Expand All @@ -107,6 +115,7 @@ TEST(ring_queue, push_front) {

TEST(ring_queue, pop_back) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

for (size_t n = 0; n < NumObjects; n++) {
queue.push_back(Object(n));
Expand All @@ -124,6 +133,7 @@ TEST(ring_queue, pop_back) {

TEST(ring_queue, pop_front) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

for (size_t n = 0; n < NumObjects; n++) {
queue.push_back(Object(n));
Expand All @@ -141,6 +151,7 @@ TEST(ring_queue, pop_front) {

TEST(ring_queue, front_back) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

queue.push_back(Object(0));
queue.push_back(Object(1));
Expand All @@ -165,6 +176,7 @@ TEST(ring_queue, front_back) {

TEST(ring_queue, wrap_around) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

for (size_t n = 0; n < NumObjects; n++) {
queue.push_back(Object(n));
Expand All @@ -185,6 +197,7 @@ TEST(ring_queue, wrap_around) {

TEST(ring_queue, wrap_around_loop) {
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);
CHECK(queue.is_valid());

size_t head = 0;
size_t tail = 0;
Expand Down Expand Up @@ -214,6 +227,7 @@ TEST(ring_queue, wrap_around_loop) {

TEST(ring_queue, single_element) {
RingQueue<Object, 1> queue(arena, 1);
CHECK(queue.is_valid());

CHECK(queue.is_valid());
LONGS_EQUAL(1, queue.capacity());
Expand Down Expand Up @@ -258,8 +272,8 @@ TEST(ring_queue, single_element) {

TEST(ring_queue, embedding) {
RingQueue<Object, EmbeddedCap> queue(arena, EmbeddedCap);

CHECK(queue.is_valid());

LONGS_EQUAL(EmbeddedCap, queue.capacity());
LONGS_EQUAL(0, queue.size());
LONGS_EQUAL(0, arena.num_allocations());
Expand Down Expand Up @@ -295,8 +309,8 @@ TEST(ring_queue, constructor_destructor) {

{
RingQueue<Object, EmbeddedCap> queue(arena, NumObjects);

CHECK(queue.is_valid());

LONGS_EQUAL(NumObjects, queue.capacity());
LONGS_EQUAL(0, queue.size());
LONGS_EQUAL(0, Object::n_objects);
Expand All @@ -308,29 +322,48 @@ TEST(ring_queue, constructor_destructor) {
}

TEST(ring_queue, resize) {
size_t half_NumObjects = NumObjects >> 1;
RingQueue<Object, EmbeddedCap> queue(arena, half_NumObjects);
for (size_t n = 0; n < half_NumObjects; ++n) {
queue.push_back(Object(42));
RingQueue<Object, EmbeddedCap> queue(arena, EmbeddedCap);
CHECK(queue.is_valid());

for (size_t n = 0; n < EmbeddedCap; ++n) {
queue.push_back(Object(n));
}
queue.pop_front();
queue.push_back(Object(42));
queue.push_back(Object(EmbeddedCap));

LONGS_EQUAL(0, arena.num_allocations());
LONGS_EQUAL(EmbeddedCap, queue.capacity());
LONGS_EQUAL(EmbeddedCap, queue.size());
LONGS_EQUAL(EmbeddedCap, Object::n_objects);

LONGS_EQUAL(half_NumObjects, queue.capacity());
LONGS_EQUAL(half_NumObjects, queue.size());
// EmbeddedCap => EmbeddedCap/2
CHECK(queue.resize(EmbeddedCap / 2));

CHECK(queue.resize(NumObjects));
LONGS_EQUAL(0, arena.num_allocations());
LONGS_EQUAL(EmbeddedCap / 2, queue.capacity());
LONGS_EQUAL(EmbeddedCap / 2, queue.size());
LONGS_EQUAL(EmbeddedCap / 2, Object::n_objects);

// EmbeddedCap/2 => EmbeddedCap*3
CHECK(queue.resize(EmbeddedCap * 3));

LONGS_EQUAL(1, arena.num_allocations());
LONGS_EQUAL(NumObjects, queue.capacity());
LONGS_EQUAL(NumObjects, queue.size());
LONGS_EQUAL(NumObjects, Object::n_objects);
LONGS_EQUAL(EmbeddedCap * 3, queue.capacity());
LONGS_EQUAL(EmbeddedCap / 2, queue.size());
LONGS_EQUAL(EmbeddedCap / 2, Object::n_objects);

CHECK(queue.resize(half_NumObjects - 2));
// EmbeddedCap*3 => EmbeddedCap*2
CHECK(queue.resize(EmbeddedCap * 2));

LONGS_EQUAL(NumObjects, queue.capacity());
LONGS_EQUAL(half_NumObjects - 2, queue.size());
LONGS_EQUAL(half_NumObjects - 2, Object::n_objects);
LONGS_EQUAL(1, arena.num_allocations());
LONGS_EQUAL(EmbeddedCap * 2, queue.capacity());
LONGS_EQUAL(EmbeddedCap / 2, queue.size());
LONGS_EQUAL(EmbeddedCap / 2, Object::n_objects);

for (size_t n = 0; n < EmbeddedCap / 2; ++n) {
LONGS_EQUAL(n + 1, queue.front().value);
queue.pop_front();
}
}

} // namespace core
Expand Down
9 changes: 7 additions & 2 deletions src/tests/roc_core/test_spsc_ring_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@ struct Object {
static long n_objects;

int value;
bool valid;

explicit Object(int v = 0)
: value(v) {
: value(v)
, valid(true) {
n_objects++;
}

Object(const Object& other)
: value(other.value) {
: value(other.value)
, valid(true) {
n_objects++;
}

~Object() {
CHECK(valid);
valid = false;
n_objects--;
}
};
Expand Down

0 comments on commit d90cf31

Please sign in to comment.