Skip to content

Commit

Permalink
Add REST client support for delete_fragments and delete_fragments_list (
Browse files Browse the repository at this point in the history
#3923)

* Rebase on dev

* Add deserialization and test for fragments_list

* Add serialization for FragmentTimestamps

* Add deserialization C APIs

* Add win32 capnp files

* Use pointers to output params, return data in serialization

* Add class FragmentsList with OL fragment for FragmentsList Handle class

* Add Handle class for FragmentsList and use handle for deserialization

* Rebase on dev

* Address comments

* Calculate char length with strlen

* Use string_view in tiledb_fragments_list_get_fragment_uri

* Rebase on dev

* Address minor comments

* Rename capnp structures

* Fix CI failure

* Add tiledb_array_delete_fragments_v2 which directly calls the Rest API

* Move Rest call into C API for delete_fragments_list

* Add uri to ArrayDeleteFragmentsTimestampsRequest

* Add uri to FragmentsList capnp struct

* Delete fragments inside of tiledb_deserialize_array_delete_fragments_timestamps_request

* Delete fragments list in tiledb_deserialize_array_delete_fragments_list_request

* Remove FragmentsList class and its handle class

* Error handling optimization

* Fix test issue, open between timestamps
  • Loading branch information
bekadavis9 authored Sep 29, 2023
1 parent 35df03d commit 553979c
Show file tree
Hide file tree
Showing 20 changed files with 2,171 additions and 70 deletions.
2 changes: 2 additions & 0 deletions test/regression/targets/sc-25116.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@ TEST_CASE(
FragmentInfo fi(ctx, array_name);
fi.load();
REQUIRE(fi.fragment_num() == 1);

remove_array();
}
94 changes: 89 additions & 5 deletions test/src/unit-capi-array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
#endif

#include <test/support/tdb_catch.h>
#include "tiledb/sm/c_api/tiledb.h"

#include <iostream>

#include "test/support/src/helpers.h"
#include "test/support/src/serialization_wrappers.h"
#include "test/support/src/vfs_helpers.h"
Expand All @@ -59,6 +55,7 @@
#include "tiledb/sm/enums/serialization_type.h"
#include "tiledb/sm/global_state/unit_test_config.h"
#include "tiledb/sm/serialization/array.h"
#include "tiledb/sm/serialization/fragments.h"
#include "tiledb/storage_format/uri/parse_uri.h"

#include <chrono>
Expand Down Expand Up @@ -385,7 +382,7 @@ void ArrayFx::create_dense_array(const std::string& path) {

void ArrayFx::write_fragment(tiledb_array_t* array, uint64_t timestamp) {
// Open the array at the given timestamp
int rc = tiledb_array_set_open_timestamp_start(ctx_, array, timestamp);
int rc = tiledb_array_set_open_timestamp_end(ctx_, array, timestamp);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_array_open(ctx_, array, TILEDB_WRITE);
REQUIRE(rc == TILEDB_OK);
Expand Down Expand Up @@ -2666,3 +2663,90 @@ TEST_CASE_METHOD(
remove_temp_dir(local_fs.file_prefix() + local_fs.temp_dir());
#endif
}

TEST_CASE_METHOD(
ArrayFx,
"Test array fragments serialization",
"[array][fragments][serialization]") {
#ifdef TILEDB_SERIALIZATION
SupportedFsLocal local_fs;
std::string array_name = local_fs.file_prefix() + local_fs.temp_dir() +
"array_fragments_serialization";
create_temp_dir(local_fs.file_prefix() + local_fs.temp_dir());
create_dense_vector(array_name);

// Write fragments at timestamps 1, 2
tiledb_array_t* array;
int rc = tiledb_array_alloc(ctx_, array_name.c_str(), &array);
REQUIRE(rc == TILEDB_OK);
uint64_t start_timestamp = 1;
uint64_t end_timestamp = 2;
write_fragment(array, start_timestamp);
write_fragment(array, end_timestamp);
CHECK(tiledb::test::num_commits(array_name) == 2);
CHECK(tiledb::test::num_fragments(array_name) == 2);

// ALlocate buffer
tiledb_buffer_t* buff;
rc = tiledb_buffer_alloc(ctx_, &buff);
REQUIRE(rc == TILEDB_OK);

SECTION("delete_fragments") {
// Serialize fragment timestamps and deserialize delete request
tiledb::sm::serialization::fragments_timestamps_serialize(
array_name,
start_timestamp,
end_timestamp,
tiledb::sm::SerializationType::CAPNP,
&buff->buffer());
rc = tiledb_deserialize_array_delete_fragments_timestamps_request(
ctx_,
(tiledb_serialization_type_t)tiledb::sm::SerializationType::CAPNP,
buff);
REQUIRE(rc == TILEDB_OK);
CHECK(tiledb::test::num_commits(array_name) == 0);
CHECK(tiledb::test::num_fragments(array_name) == 0);
}

SECTION("delete_fragments_list") {
// Get the fragment info object
tiledb_fragment_info_t* fragment_info = nullptr;
rc = tiledb_fragment_info_alloc(ctx_, array_name.c_str(), &fragment_info);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_fragment_info_load(ctx_, fragment_info);
REQUIRE(rc == TILEDB_OK);

// Get the fragment URIs
const char* uri1;
rc = tiledb_fragment_info_get_fragment_uri(ctx_, fragment_info, 0, &uri1);
REQUIRE(rc == TILEDB_OK);
const char* uri2;
rc = tiledb_fragment_info_get_fragment_uri(ctx_, fragment_info, 1, &uri2);
REQUIRE(rc == TILEDB_OK);

std::vector<URI> fragments;
fragments.emplace_back(URI(uri1));
fragments.emplace_back(URI(uri2));

// Serialize fragments list and deserialize delete request
tiledb::sm::serialization::fragments_list_serialize(
array_name,
fragments,
tiledb::sm::SerializationType::CAPNP,
&buff->buffer());
rc = tiledb_deserialize_array_delete_fragments_list_request(
ctx_,
(tiledb_serialization_type_t)tiledb::sm::SerializationType::CAPNP,
buff);
REQUIRE(rc == TILEDB_OK);
CHECK(tiledb::test::num_commits(array_name) == 0);
CHECK(tiledb::test::num_fragments(array_name) == 0);
tiledb_fragment_info_free(&fragment_info);
}

// Clean up
tiledb_array_free(&array);
tiledb_buffer_free(&buff);
remove_temp_dir(local_fs.file_prefix() + local_fs.temp_dir());
#endif
}
23 changes: 3 additions & 20 deletions test/src/unit-cppapi-deletes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1766,16 +1766,6 @@ TEST_CASE_METHOD(
write_sparse({0, 1, 2, 3}, {1, 1, 1, 2}, {1, 2, 4, 3}, 3);
CHECK(tiledb::test::num_fragments(SPARSE_ARRAY_NAME) == 2);

// Open array in WRITE mode and try to delete fragments
std::unique_ptr<Array> array =
std::make_unique<Array>(ctx_, SPARSE_ARRAY_NAME, TILEDB_WRITE);
REQUIRE_THROWS_WITH(
array->delete_fragments(SPARSE_ARRAY_NAME, 0, UINT64_MAX),
Catch::Matchers::ContainsSubstring(
"Query type must be MODIFY_EXCLUSIVE"));
CHECK(tiledb::test::num_fragments(SPARSE_ARRAY_NAME) == 2);
array->close();

// Try to delete a fragment uri that doesn't exist
std::string extraneous_fragment =
std::string(SPARSE_ARRAY_NAME) + "/" +
Expand All @@ -1784,8 +1774,7 @@ TEST_CASE_METHOD(
REQUIRE_THROWS_WITH(
Array::delete_fragments_list(
ctx_, SPARSE_ARRAY_NAME, extraneous_fragments, 1),
Catch::Matchers::ContainsSubstring(
"is not a fragment of the ArrayDirectory"));
Catch::Matchers::ContainsSubstring("Failed to delete fragments_list"));
CHECK(tiledb::test::num_fragments(SPARSE_ARRAY_NAME) == 2);

remove_sparse_array();
Expand Down Expand Up @@ -1836,10 +1825,7 @@ TEST_CASE_METHOD(

// Delete fragments
SECTION("delete fragments by timestamps") {
std::unique_ptr<Array> array = std::make_unique<Array>(
ctx_, SPARSE_ARRAY_NAME, TILEDB_MODIFY_EXCLUSIVE);
array->delete_fragments(SPARSE_ARRAY_NAME, 2, 6);
array->close();
Array::delete_fragments(ctx_, SPARSE_ARRAY_NAME, 2, 6);
}

SECTION("delete fragments by uris") {
Expand Down Expand Up @@ -1926,17 +1912,14 @@ TEST_CASE_METHOD(
CHECK(tiledb::test::num_fragments(SPARSE_ARRAY_NAME) == num_fragments);

// Delete fragments at timestamps 2 - 4
std::unique_ptr<Array> array =
std::make_unique<Array>(ctx_, SPARSE_ARRAY_NAME, TILEDB_MODIFY_EXCLUSIVE);
array->delete_fragments(SPARSE_ARRAY_NAME, 2, 4);
Array::delete_fragments(ctx_, SPARSE_ARRAY_NAME, 2, 4);
if (!vacuum) {
// Vacuum after deletion
auto config = ctx_.config();
Array::vacuum(ctx_, SPARSE_ARRAY_NAME, &config);
num_commits -= 2;
num_fragments -= 2;
}
array->close();

// Validate working directory
CHECK(tiledb::test::num_commits(SPARSE_ARRAY_NAME) == num_commits);
Expand Down
3 changes: 3 additions & 0 deletions tiledb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ set(TILEDB_CORE_SOURCES
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/enumeration.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_info.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragments.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/query.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/consolidation.cc
Expand Down Expand Up @@ -335,6 +336,7 @@ if (TILEDB_SERIALIZATION)
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/enumeration.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_info.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragments.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/query.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/consolidation.cc
Expand All @@ -355,6 +357,7 @@ if (TILEDB_SERIALIZATION)
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/enumeration.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_info.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragments.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/query.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/consolidation.cc
Expand Down
21 changes: 14 additions & 7 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,14 @@ void Array::delete_fragments(
ensure_array_is_valid_for_delete(uri);

// Delete fragments
// #TODO Add rest support for delete_fragments
if (remote_) {
throw ArrayException(
"[delete_fragments] Remote arrays currently unsupported.");
auto rest_client = resources_.rest_client();
if (rest_client == nullptr) {
throw ArrayException(
"[delete_fragments] Remote array with no REST client.");
}
rest_client->delete_fragments_from_rest(
uri, timestamp_start, timestamp_end);
} else {
storage_manager_->delete_fragments(
uri.c_str(), timestamp_start, timestamp_end);
Expand All @@ -567,11 +571,14 @@ void Array::delete_fragments_list(
// Check that data deletion is allowed
ensure_array_is_valid_for_delete(uri);

// Delete fragments
// #TODO Add rest support for delete_fragments_list
// Delete fragments_list
if (remote_) {
throw ArrayException(
"[delete_fragments_list] Remote arrays currently unsupported.");
auto rest_client = resources_.rest_client();
if (rest_client == nullptr) {
throw ArrayException(
"[delete_fragments_list] Remote array with no REST client.");
}
rest_client->delete_fragments_list_from_rest(uri, fragment_uris);
} else {
auto array_dir = ArrayDirectory(
resources_, uri, 0, std::numeric_limits<uint64_t>::max());
Expand Down
Loading

0 comments on commit 553979c

Please sign in to comment.