Skip to content

Commit

Permalink
Aggregates pushdown: adding incomplete tests. (#4301)
Browse files Browse the repository at this point in the history
* Aggregates pushdown: adding incomplete tests.

This adds tests that make sure the aggregates pipeline handles incompletes properly.

---
TYPE: IMPROVEMENT
DESC: Aggregates pushdown: adding incomplete tests.

* Fixed number of iterations.
  • Loading branch information
KiterLuc authored Sep 16, 2023
1 parent f412aac commit 86b61b8
Showing 1 changed file with 186 additions and 9 deletions.
195 changes: 186 additions & 9 deletions test/src/test-cppapi-aggregates.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ struct CppAggregatesFx {
std::vector<uint64_t>& dim1,
std::vector<uint64_t>& dim2,
std::vector<uint8_t>& a1,
std::vector<uint8_t>& a1_validity);
std::vector<uint8_t>& a1_validity,
const bool validate_count = true);
void validate_data_var(
Query& query,
std::vector<uint64_t>& dim1,
Expand Down Expand Up @@ -764,7 +765,8 @@ void CppAggregatesFx<T>::validate_data(
std::vector<uint64_t>& dim1,
std::vector<uint64_t>& dim2,
std::vector<uint8_t>& a1,
std::vector<uint8_t>& a1_validity) {
std::vector<uint8_t>& a1_validity,
const bool validate_count) {
uint64_t expected_count = 0;
std::vector<uint64_t> expected_dim1;
std::vector<uint64_t> expected_dim2;
Expand Down Expand Up @@ -892,13 +894,6 @@ void CppAggregatesFx<T>::validate_data(
CppAggregatesFx<T>::STRING_CELL_VAL_NUM :
1;

auto result_el = query.result_buffer_elements_nullable();
CHECK(std::get<1>(result_el["d1"]) == expected_count);
CHECK(std::get<1>(result_el["d2"]) == expected_count);
CHECK(std::get<1>(result_el["a1"]) == expected_count * cell_val_num);
if (nullable_) {
CHECK(std::get<2>(result_el["a1"]) == expected_count);
}
dim1.resize(expected_dim1.size());
dim2.resize(expected_dim2.size());
a1.resize(expected_a1.size());
Expand All @@ -910,6 +905,16 @@ void CppAggregatesFx<T>::validate_data(
a1_validity.resize(expected_a1_validity.size());
CHECK(a1_validity == expected_a1_validity);
}

if (validate_count) {
auto result_el = query.result_buffer_elements_nullable();
CHECK(std::get<1>(result_el["d1"]) == expected_count);
CHECK(std::get<1>(result_el["d2"]) == expected_count);
CHECK(std::get<1>(result_el["a1"]) == expected_count * cell_val_num);
if (nullable_) {
CHECK(std::get<2>(result_el["a1"]) == expected_count);
}
}
}

template <class T>
Expand Down Expand Up @@ -2148,6 +2153,178 @@ TEST_CASE_METHOD(
}
}

// Close array.
array.close();
}

TEMPLATE_LIST_TEST_CASE_METHOD(
CppAggregatesFx,
"C++ API: Aggregates incomplete test",
"[cppapi][aggregates][incomplete]",
SumFixedTypesUnderTest) {
typedef TestType T;
CppAggregatesFx<T>::generate_test_params();
CppAggregatesFx<T>::create_array_and_write_fragments();

Array array{
CppAggregatesFx<T>::ctx_, CppAggregatesFx<T>::ARRAY_NAME, TILEDB_READ};

for (bool set_ranges : {true, false}) {
CppAggregatesFx<T>::set_ranges_ = set_ranges;
for (bool set_qc : CppAggregatesFx<T>::set_qc_values_) {
CppAggregatesFx<T>::set_qc_ = set_qc;
for (tiledb_layout_t layout : CppAggregatesFx<T>::layout_values_) {
CppAggregatesFx<T>::layout_ = layout;
Query query(CppAggregatesFx<T>::ctx_, array, TILEDB_READ);

// TODO: Change to real CPPAPI. Add a count aggregator to the query.
// We add both sum and count as they are processed separately in the
// dense case.
query.ptr()->query_->add_aggregator_to_default_channel(
"Count", std::make_shared<tiledb::sm::CountAggregator>());

query.ptr()->query_->add_aggregator_to_default_channel(
"Sum",
std::make_shared<tiledb::sm::SumAggregator<T>>(
tiledb::sm::FieldInfo(
"a1", false, CppAggregatesFx<T>::nullable_, 1)));

CppAggregatesFx<T>::set_ranges_and_condition_if_needed(
array, query, false);

// Set the data buffer for the aggregator.
uint64_t cell_size = sizeof(T);
std::vector<uint64_t> count(1);
std::vector<typename tiledb::sm::sum_type_data<T>::sum_type> sum(1);
std::vector<uint8_t> sum_validity(1);
std::vector<uint64_t> dim1(100);
std::vector<uint64_t> dim2(100);
std::vector<uint8_t> a1(100 * cell_size);
std::vector<uint8_t> a1_validity(100);
query.set_layout(layout);
uint64_t count_data_size = sizeof(uint64_t);
uint64_t sum_data_size =
sizeof(typename tiledb::sm::sum_type_data<T>::sum_type);
// TODO: Change to real CPPAPI.
CHECK(query.ptr()
->query_
->set_data_buffer("Count", count.data(), &count_data_size)
.ok());
CHECK(query.ptr()
->query_->set_data_buffer("Sum", sum.data(), &sum_data_size)
.ok());
uint64_t returned_validity_size = 1;
if (CppAggregatesFx<T>::nullable_) {
// TODO: Change to real CPPAPI. Use set_validity_buffer from the
// internal query directly because the CPPAPI doesn't know what is
// an aggregate and what the size of an aggregate should be.
CHECK(query.ptr()
->query_
->set_validity_buffer(
"Sum", sum_validity.data(), &returned_validity_size)
.ok());
}

// Here we run a few iterations until the query completes and update the
// buffers as we go.
uint64_t curr_elem = 0;
uint64_t num_elems = CppAggregatesFx<T>::dense_ ? 20 : 4;
uint64_t num_iters = 0;
if (CppAggregatesFx<T>::dense_) {
num_iters = 2;
} else {
if (CppAggregatesFx<T>::set_ranges_) {
num_iters = 2;
} else {
num_iters = 4;
}
}

for (uint64_t iter = 0; iter < num_iters; iter++) {
query.set_data_buffer("d1", dim1.data() + curr_elem, num_elems);
query.set_data_buffer("d2", dim2.data() + curr_elem, num_elems);
query.set_data_buffer(
"a1",
static_cast<void*>(a1.data() + curr_elem * cell_size),
num_elems);

if (CppAggregatesFx<T>::nullable_) {
query.set_validity_buffer(
"a1", a1_validity.data() + curr_elem, num_elems);
}

// Submit the query.
query.submit();

// Adjust current element count;
auto result_el = query.result_buffer_elements_nullable();
curr_elem += std::get<1>(result_el["d1"]);

// Stop on query completion.
if (iter < num_iters - 1) {
CHECK(query.query_status() == Query::Status::INCOMPLETE);
} else {
CHECK(query.query_status() == Query::Status::COMPLETE);
}
}

// Check the results.
uint64_t expected_count;
if (CppAggregatesFx<T>::dense_) {
expected_count = set_ranges ? 24 : 36;
} else {
if (set_ranges) {
expected_count = CppAggregatesFx<T>::allow_dups_ ? 8 : 7;
} else {
expected_count = CppAggregatesFx<T>::allow_dups_ ? 16 : 15;
}
}

typename tiledb::sm::sum_type_data<T>::sum_type expected_sum;
if (CppAggregatesFx<T>::dense_) {
if (CppAggregatesFx<T>::nullable_) {
if (set_ranges) {
expected_sum = set_qc ? 197 : 201;
} else {
expected_sum = set_qc ? 315 : 319;
}
} else {
if (set_ranges) {
expected_sum = set_qc ? 398 : 402;
} else {
expected_sum = set_qc ? 591 : 630;
}
}
} else {
if (CppAggregatesFx<T>::nullable_) {
if (set_ranges) {
expected_sum = 42;
} else {
expected_sum = 56;
}
} else {
if (set_ranges) {
expected_sum = CppAggregatesFx<T>::allow_dups_ ? 88 : 81;
} else {
expected_sum = CppAggregatesFx<T>::allow_dups_ ? 120 : 113;
}
}
}

// TODO: use 'std::get<1>(result_el["Count"]) == 1' once we use the
// set_data_buffer api.
CHECK(count[0] == expected_count);

// TODO: use 'std::get<1>(result_el["Sum"]) == 1' once we use the
// set_data_buffer api.
CHECK(sum[0] == expected_sum);

CppAggregatesFx<T>::validate_data(
query, dim1, dim2, a1, a1_validity, false);
}
}
}

// Close array.
array.close();
}

0 comments on commit 86b61b8

Please sign in to comment.