Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support consolidation with max_frag_size in Dense. #5190

Open
wants to merge 14 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions test/src/unit-cppapi-max-fragment-size.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,203 @@ TEST_CASE(

array.close();
}

// Writes a 3x3 dense array one row per fragment, consolidates it with a
// maximum fragment size configured, and verifies both the resulting fragment
// count and that the cell values are unchanged. Each SECTION re-runs the test
// with a different consolidation buffer size.
TEST_CASE(
    "Setting max_fragment_size in Dense consolidation",
    "[global-order-writer][max-frag-size-consolidation]") {
  std::string array_name = "cpp_max_fragment_size_bug";
  Context ctx;

  // Deletes the array if (and only if) it currently exists.
  auto remove_array_if_present = [&ctx, &array_name]() {
    auto object = Object::object(ctx, array_name);
    if (object.type() != Object::Type::Array) {
      return;
    }
    Object::remove(ctx, array_name);
  };

  // Start from a clean slate in case a previous run left the array behind.
  remove_array_if_present();
  std::string cons_buffer_size;

  SECTION("More than one loop write, one row or more at the time") {
    cons_buffer_size = "30";
  }

  SECTION("More than one loop write, one or two tiles at the time") {
    cons_buffer_size = "10";
  }

  SECTION("One loop write") {
    cons_buffer_size = "10000";  // needed only to speed up consolidation
  }

  // Remove the array again when the test scope exits.
  ScopedExecutor deferred(remove_array_if_present);

  // Schema: 3x3 int32 domain with tile extent 1 on both dimensions, i.e.
  // exactly 9 single-cell tiles, and one int32 attribute "a".
  Domain domain(ctx);
  ArraySchema schema(ctx, TILEDB_DENSE);
  auto rows_dim = tiledb::Dimension::create<int32_t>(ctx, "d1", {{0, 2}}, 1);
  auto cols_dim = tiledb::Dimension::create<int32_t>(ctx, "d2", {{0, 2}}, 1);
  domain.add_dimension(rows_dim);
  domain.add_dimension(cols_dim);

  auto attr = tiledb::Attribute::create<int32_t>(ctx, "a");
  schema.add_attribute(attr);

  schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
  schema.set_domain(domain);

  Array::create(array_name, schema);

  // Write the values 1..9, one row (three cells) per write query.
  for (int row = 0; row < 3; ++row) {
    Array write_array(ctx, array_name, TILEDB_WRITE);
    Query write_query(ctx, write_array);
    write_query.set_layout(TILEDB_ROW_MAJOR);
    tiledb::Subarray row_range(ctx, write_array);
    row_range.set_subarray({row, row, 0, 2});
    write_query.set_subarray(row_range);
    // Row r holds 3r+1, 3r+2, 3r+3.
    const int first_in_row = 3 * row + 1;
    std::vector<int32_t> row_values = {
        first_in_row, first_in_row + 1, first_in_row + 2};
    write_query.set_data_buffer("a", row_values);
    write_query.submit();
    write_array.close();
  }

  // Each of the three writes is expected to have produced its own fragment.
  CHECK(tiledb::test::num_fragments(array_name) == 3);

  // Read the whole array back and validate the written data.
  Array array_before(ctx, array_name, TILEDB_READ);
  tiledb::Subarray full_range_before(ctx, array_before);
  full_range_before.set_subarray({0, 2, 0, 2});
  std::vector<int32_t> cells_before(9);
  Query read_before(ctx, array_before, TILEDB_READ);
  read_before.set_subarray(full_range_before);
  read_before.set_layout(TILEDB_ROW_MAJOR);
  read_before.set_data_buffer("a", cells_before);
  read_before.submit();
  array_before.close();

  {
    int expected = 0;
    for (const int32_t cell : cells_before) {
      CHECK(cell == ++expected);
    }
  }

  // Consolidate with a size limitation for the fragment. This will result in
  // the creation of two new fragments.
  tiledb::Config cfg;
  cfg.set("sm.consolidation.max_fragment_size", "150");
  cfg.set("sm.consolidation.buffer_size", cons_buffer_size);

  // Swap in a context carrying the consolidation settings.
  ctx = Context(cfg);
  Array::consolidate(ctx, array_name);
  Array::vacuum(ctx, array_name);

  // Check that we now have 2 fragments instead of 3.
  CHECK(tiledb::test::num_fragments(array_name) == 2);

  // Read the whole array again: consolidation must not change the data.
  Array array_after(ctx, array_name, TILEDB_READ);
  tiledb::Subarray full_range_after(ctx, array_after);
  full_range_after.set_subarray({0, 2, 0, 2});
  std::vector<int32_t> cells_after(9);
  Query read_after(ctx, array_after, TILEDB_READ);
  read_after.set_subarray(full_range_after);
  read_after.set_layout(TILEDB_ROW_MAJOR);
  read_after.set_data_buffer("a", cells_after);
  read_after.submit();
  array_after.close();

  {
    int expected = 0;
    for (const int32_t cell : cells_after) {
      CHECK(cell == ++expected);
    }
  }
}

// One-dimensional variant of the dense max_fragment_size consolidation test:
// writes a 9-cell dense array as three fragments (one tile each), consolidates
// with a maximum fragment size configured, and verifies both the resulting
// fragment count and that the cell values are unchanged.
TEST_CASE(
    "Setting max_fragment_size in Dense consolidation one dim",
    "[global-order-writer][max-frag-size-consolidation]") {
  std::string array_name = "cpp_max_fragment_size_bug";
  Context ctx;

  // Deletes the array if (and only if) it currently exists.
  auto cleanup = [&]() {
    auto obj = Object::object(ctx, array_name);
    if (obj.type() == Object::Type::Array) {
      Object::remove(ctx, array_name);
    }
  };

  // Start from a clean slate in case a previous run left the array behind.
  cleanup();

  // Remove the array at the end of this test.
  ScopedExecutor deferred(cleanup);

  // Create an array with exactly 3 tiles: domain [1, 9] with tile extent 3.
  Domain domain(ctx);
  ArraySchema schema(ctx, TILEDB_DENSE);
  auto d1 = tiledb::Dimension::create<int32_t>(ctx, "d1", {{1, 9}}, 3);
  domain.add_dimension(d1);

  auto a1 = tiledb::Attribute::create<int32_t>(ctx, "a");
  schema.add_attribute(a1);

  schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
  schema.set_domain(domain);

  Array::create(array_name, schema);

  // Populate array with data from 1 to 9, one tile (three cells) per write.
  int value = 0;
  for (int i = 1; i < 10; i += 3) {
    Array array(ctx, array_name, TILEDB_WRITE);
    Query query(ctx, array);
    query.set_layout(TILEDB_ROW_MAJOR);
    tiledb::Subarray sub(ctx, array);
    sub.set_subarray({i, i + 2});
    query.set_subarray(sub);
    // Elements of a braced initializer list are evaluated left to right, so
    // this yields three consecutive values.
    std::vector<int32_t> data = {++value, ++value, ++value};
    query.set_data_buffer("a", data);
    query.submit();
    array.close();
  }

  // Validate the number of fragments before consolidation, so that the
  // "2 instead of 3" check below compares against a verified baseline.
  CHECK(tiledb::test::num_fragments(array_name) == 3);

  // Read data to validate the writes.
  Array array(ctx, array_name, TILEDB_READ);
  tiledb::Subarray sub(ctx, array);
  sub.set_subarray({1, 9});
  std::vector<int32_t> a(9);
  Query query(ctx, array, TILEDB_READ);
  query.set_subarray(sub).set_layout(TILEDB_ROW_MAJOR).set_data_buffer("a", a);
  query.submit();
  array.close();

  for (int i = 0; i < 9; i++) {
    CHECK(a[i] == i + 1);
  }

  // Consolidate with a size limitation for the fragment. This will result in
  // the creation of two new fragments.
  tiledb::Config cfg;
  cfg.set("sm.consolidation.max_fragment_size", "80");
  cfg.set("sm.consolidation.buffer_size", "10000");  // speed up consolidation
  // Swap in a context carrying the consolidation settings.
  ctx = Context(cfg);
  Array::consolidate(ctx, array_name);
  Array::vacuum(ctx, array_name);

  // Check that we now have 2 fragments instead of 3
  CHECK(tiledb::test::num_fragments(array_name) == 2);

  // Read data to validate correctness: consolidation must not change it.
  Array array2(ctx, array_name, TILEDB_READ);
  tiledb::Subarray sub2(ctx, array2);
  sub2.set_subarray({1, 9});
  std::vector<int32_t> a2(9);
  Query query2(ctx, array2, TILEDB_READ);
  query2.set_subarray(sub2)
      .set_layout(TILEDB_ROW_MAJOR)
      .set_data_buffer("a", a2);
  query2.submit();
  array2.close();

  for (int i = 0; i < 9; i++) {
    CHECK(a2[i] == i + 1);
  }
}
7 changes: 7 additions & 0 deletions tiledb/sm/enums/datatype.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ inline bool datatype_is_string(Datatype type) {
type == Datatype::STRING_UCS2 || type == Datatype::STRING_UCS4);
}

/**
 * Checks whether the input datatype is an unsigned integer type.
 *
 * @param type The datatype to inspect.
 * @return `true` if `type` is UINT8, UINT16, UINT32 or UINT64; `false`
 *     otherwise.
 */
inline bool datatype_is_unsigned(Datatype type) {
  switch (type) {
    case Datatype::UINT8:
    case Datatype::UINT16:
    case Datatype::UINT32:
    case Datatype::UINT64:
      return true;
    default:
      return false;
  }
}

/** Returns true if the input datatype is an integer type. */
inline bool datatype_is_integer(Datatype type) {
return (
Expand Down
4 changes: 2 additions & 2 deletions tiledb/sm/fragment/fragment_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ void FragmentMetadata::init_domain(const NDRange& non_empty_domain) {

// Sanity check
assert(!non_empty_domain.empty());
assert(non_empty_domain_.empty());
assert(domain_.empty());
// assert(non_empty_domain_.empty()); todo, this might cause problems
// assert(domain_.empty());

// Set non-empty domain for dense arrays (for sparse it will be calculated
// via the MBRs)
Expand Down
Loading
Loading