Skip to content

Commit

Permalink
roc-streaminggh-751: Implemented moving quantile using partition heap
Browse files Browse the repository at this point in the history
  • Loading branch information
novertia committed Jul 17, 2024
1 parent 5c1283e commit 2bc56a1
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 74 deletions.
174 changes: 121 additions & 53 deletions src/internal_modules/roc_core/mov_quantile.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,51 +23,53 @@ template <typename T> class MovQuantile {
public:
const size_t win_len_;
const size_t percentile_;
size_t k_;
size_t old_heap_root_index_;
const size_t heap_root_;
size_t heap_size_;
size_t max_heap_index_;
size_t min_heap_index_;
size_t elem_index_;
bool win_filled_;
bool valid_;
Array<T> heap_;
Array<T> heap_;
Array<size_t> elem_index_heap_index_;
Array<size_t> heap_index_elem_index_;

MovQuantile(IArena& arena, const size_t win_len, const size_t percentile)
: win_len_(win_len)
, percentile_(percentile)
, k_(0)
, old_heap_root_index_(0)
, heap_root_((percentile * (win_len - 1))/100)
, heap_size_(0)
, max_heap_index_(heap_root_)
, min_heap_index_(heap_root_)
, elem_index_(0)
, win_filled_(false)
, valid_(false)
, heap_(arena)
, elem_index_heap_index_(arena)
, heap_index_elem_index_(arena) {
if (win_len == 0) {
if (win_len == 0) {
roc_panic("mov quantile: window length must be greater than 0");
}
if (percentile > 100){
roc_panic("mov quantile: percentile should be between 0 and 100");
}
}
if (!heap_.resize(win_len)) {
return;
}
if (!elem_index_heap_index_.resize(win_len)) {
}
if(!elem_index_heap_index_.resize(win_len)){
return;
}
if (!heap_index_elem_index_.resize(win_len)) {
if (!heap_index_elem_index_.resize(win_len)){
return;
}
for(size_t i = 0 ; i < win_len; i++){
elem_index_heap_index_[i] = i;
heap_index_elem_index_[i] = i;
}
}
valid_ = true;
}
bool is_valid(){
return valid_;
}

void swap(size_t index_1, size_t index_2){
size_t elem_index_1 = heap_index_elem_index_[index_1];
size_t elem_index_2 = heap_index_elem_index_[index_2];
Expand All @@ -81,72 +83,138 @@ template <typename T> class MovQuantile {

elem_index_heap_index_[elem_index_1] = index_2;
elem_index_heap_index_[elem_index_2] = index_1;
}
}

void heapify_up(size_t heap_index){
//sift up
if (heap_index == 0){
void min_heapify_up(size_t heap_index){
if (heap_index == heap_root_){
return;
}
size_t parent = (heap_index - 1)/2;
size_t parent = (heap_index - heap_root_ - 1)/2 + heap_root_;
if(heap_[parent] > heap_[heap_index]){
swap(heap_index, parent);
heapify_up(parent);
min_heapify_up(parent);
}
}

void max_heapify_up(size_t heap_index){
//sift up
if (heap_index == heap_root_){
return;
}
size_t parent = heap_root_ - ((heap_root_ - heap_index - 1)/2);
if(heap_[parent] < heap_[heap_index]){
swap(heap_index, parent);
max_heapify_up(parent);
}
}

void heapify_down(size_t heap_index){
size_t largest = heap_index;
size_t left = 2*heap_index + 1;
if(left < heap_size_ && heap_[left] < heap_[largest])
void min_heapify_down(size_t heap_index){
size_t largest = heap_index;

size_t left = 2*(heap_index - heap_root_) + 1 + heap_root_;
if(left <= min_heap_index_ && heap_[left] < heap_[largest])
largest = left;
size_t right = 2*heap_index + 2;
if(right < heap_size_ && heap_[right] < heap_[largest])
size_t right = 2*(heap_index - heap_root_) + 2 + heap_root_;
if(right <= min_heap_index_ && heap_[right] < heap_[largest])
largest = right;

if(largest != heap_index){
swap(heap_index,largest);
heapify_down(largest);
min_heapify_down(largest);
}
}

void max_heapify_down(size_t heap_index){
size_t largest = heap_index;

size_t left = 2*(heap_root_ - heap_index) + 1;
if(left <= (heap_root_ - max_heap_index_) && heap_[heap_root_ - left] > heap_[largest])
largest = heap_root_ - left;
size_t right = 2*(heap_root_ - heap_index) + 2;
if(right <= (heap_root_ - max_heap_index_) && heap_[heap_root_ - right] > heap_[largest])
largest = heap_root_ - right;

if(largest != heap_index){
swap(heap_index,largest);
max_heapify_down(largest);
}
}

void heapify(size_t heap_index){
size_t parent = (heap_index-1)/2;
if(heap_index!=0 && heap_[parent] > heap_[heap_index]){
heapify_up(heap_index);
return;
if(heap_index < heap_root_){
size_t parent = heap_root_ - ((heap_root_ - heap_index - 1)/2);
if(heap_[parent] < heap_[heap_index]){
max_heapify_up(heap_index);
min_heapify_down(heap_root_);
}
else{
max_heapify_down(heap_index);
}
}
else{
heapify_down(heap_index);
return;
else if(heap_root_ == heap_index){
max_heapify_down(heap_index);
min_heapify_down(heap_root_);
}
}

else{
size_t parent = (heap_index - heap_root_ - 1)/2 + heap_root_;
if(heap_[parent] > heap_[heap_index]){
min_heapify_up(heap_index);
max_heapify_down(heap_root_);
}
else{
min_heapify_down(heap_index);
}
}
}

void add(const T& x){
if ((elem_index_ + 1) == win_len_)
if (elem_index_ == win_len_)
win_filled_ = true;
heap_size_ = elem_index_ + 1;

heap_size_ = elem_index_ + 1;
elem_index_ = (elem_index_) % win_len_;
if(win_filled_){
heap_size_ = win_len_;
heap_size_ = win_len_;
min_heap_index_ = win_len_ - 1;
max_heap_index_ = 0;
size_t heap_index = elem_index_heap_index_[elem_index_];
heap_[heap_index] = x;
heapify(heap_index);
}
elem_index_ = (elem_index_) % win_len_;
k_ = (percentile_ * heap_size_)/100;
size_t heap_index = elem_index_heap_index_[elem_index_];
heap_[heap_index] = x;
heapify(heap_index);
else{
size_t k = (percentile_ * (heap_size_ - 1))/100;
size_t heap_index;
if (elem_index_ == 0){
heap_index = heap_root_;
elem_index_heap_index_[elem_index_] = heap_index;
heap_[heap_index] = x;
heap_index_elem_index_[heap_index] = elem_index_;
}
else {
if(old_heap_root_index_ == k){
min_heap_index_ += 1;
heap_index = min_heap_index_;

}
else{
max_heap_index_ -= 1;
heap_index = max_heap_index_;
}
elem_index_heap_index_[elem_index_] = heap_index;
heap_[heap_index] = x;
heap_index_elem_index_[heap_index] = elem_index_;
heapify(heap_index);
old_heap_root_index_ = k;
}

}

elem_index_ = elem_index_ + 1;

}

T sliding_minimum(){
return heap_[0];
}

size_t k_value(){
return k_;
}
T sliding_quantile(){
return heap_[heap_root_];
}
};


Expand Down
130 changes: 109 additions & 21 deletions src/tests/roc_core/test_mov_quantile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,118 @@ TEST_GROUP(movquantile){
HeapArena arena;
};

TEST(movquantile, testing_code){
const size_t n = 7;
MovQuantile<int64_t> quant(arena,n,62);
quant.add(5);
quant.add(4);
quant.add(2);
quant.add(6);
quant.add(6);

int64_t x = quant.sliding_minimum();
size_t k1 = quant.k_value();
LONGS_EQUAL((int64_t)2,x);
CHECK_EQUAL((size_t)3,k1);
quant.add(10);
TEST(movquantile, testing_minimum){
const size_t n = 9;
MovQuantile<int64_t> quant(arena,n,0);
CHECK(quant.is_valid());
quant.add(14);
quant.add(28);
quant.add(11);
quant.add(12);
LONGS_EQUAL((int64_t)2,quant.sliding_minimum());
quant.add(18);
quant.add(15);
quant.add(25);
LONGS_EQUAL((int64_t)11,quant.sliding_quantile()); // test window incomplete
quant.add(32);
quant.add(14);
quant.add(19);
quant.add(16);
quant.add(35);
LONGS_EQUAL((int64_t)12,quant.sliding_quantile()); // test window complete
}

TEST(movquantile, testing_lower_side){
const size_t n = 12;
MovQuantile<int64_t> quant(arena,n,34);
CHECK(quant.is_valid());
quant.add(10);
quant.add(12);
quant.add(25);
quant.add(22);
quant.add(18);
quant.add(6);
quant.add(24);
LONGS_EQUAL((int64_t)12,quant.sliding_quantile()); // test window incomplete
quant.add(22);
quant.add(35);
quant.add(42);
quant.add(31);
quant.add(39);
quant.add(27);
quant.add(4);
LONGS_EQUAL((int64_t)2,quant.sliding_minimum());
quant.add(45);
quant.add(49);
quant.add(37);
int64_t x1 = quant.sliding_quantile(); //test complete window insertion
LONGS_EQUAL((int64_t)24,x1);
}

TEST(movquantile, testing_median){
const size_t n = 10;
MovQuantile<int64_t> quant(arena,n,50);
CHECK(quant.is_valid());
quant.add(18);
quant.add(12);
quant.add(55);
quant.add(72);
quant.add(25);
quant.add(6);
quant.add(37);
LONGS_EQUAL((int64_t)25,quant.sliding_quantile()); // test window incomplete
quant.add(23);
quant.add(48);
quant.add(100);
quant.add(62);
quant.add(57);
quant.add(92);
quant.add(1);
quant.add(8);
int64_t x1 = quant.sliding_minimum();
size_t k2 = quant.k_value();
CHECK_EQUAL((size_t)4,k2);
LONGS_EQUAL((int64_t)1,x1);
quant.add(72);
quant.add(83);
quant.add(37);
LONGS_EQUAL((int64_t)57,quant.sliding_quantile()); //test complete window
}

TEST(movquantile, testing_upper_side){
const size_t n = 11;
MovQuantile<int64_t> quant(arena,n,78);
quant.add(18);
quant.add(18);
quant.add(22);
quant.add(14);
quant.add(39);
quant.add(52);
quant.add(14);
quant.add(46);
LONGS_EQUAL((int64_t)39,quant.sliding_quantile()); //test incomplete window
quant.add(14);
quant.add(14);
quant.add(100);
quant.add(32);
quant.add(83);
LONGS_EQUAL((int64_t)46,quant.sliding_quantile()); //test complete window
}

TEST(movquantile, test_maximum){
const size_t n = 7;
MovQuantile<int64_t> quant(arena,n,100);
quant.add(21);
quant.add(14);
quant.add(38);
quant.add(72);
quant.add(63);
LONGS_EQUAL((int64_t)72,quant.sliding_quantile()); //test incomplete window
quant.add(35);
quant.add(76);
quant.add(42);
quant.add(13);
quant.add(15);
quant.add(11);
quant.add(102);
quant.add(56);
quant.add(20);
LONGS_EQUAL((int64_t)102,quant.sliding_quantile()); //test complete window
}

}
}

0 comments on commit 2bc56a1

Please sign in to comment.