Skip to content

Commit

Permalink
Fix delete_fragments handlers according to new model.
Browse files Browse the repository at this point in the history
This updates the handlers for delete_fragments and delete_fragments_list to the new handler model that takes in an opened array.

---
TYPE: IMPROVEMENT
DESC: Fix delete_fragments handlers according to new model.
  • Loading branch information
KiterLuc committed Oct 12, 2023
1 parent e3967fa commit 8aa7201
Show file tree
Hide file tree
Showing 11 changed files with 9,460 additions and 6,496 deletions.
14 changes: 8 additions & 6 deletions test/src/unit-capi-array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2693,14 +2693,15 @@ TEST_CASE_METHOD(

SECTION("delete_fragments") {
// Serialize fragment timestamps and deserialize delete request
tiledb::sm::serialization::fragments_timestamps_serialize(
array_name,
tiledb::sm::serialization::serialize_delete_fragments_timestamps_request(
array->array_->config(),
start_timestamp,
end_timestamp,
tiledb::sm::SerializationType::CAPNP,
&buff->buffer());
rc = tiledb_deserialize_array_delete_fragments_timestamps_request(
rc = tiledb_handle_array_delete_fragments_timestamps_request(
ctx_,
array,
(tiledb_serialization_type_t)tiledb::sm::SerializationType::CAPNP,
buff);
REQUIRE(rc == TILEDB_OK);
Expand Down Expand Up @@ -2729,13 +2730,14 @@ TEST_CASE_METHOD(
fragments.emplace_back(URI(uri2));

// Serialize fragments list and deserialize delete request
tiledb::sm::serialization::fragments_list_serialize(
array_name,
tiledb::sm::serialization::serialize_delete_fragments_list_request(
array->array_->config(),
fragments,
tiledb::sm::SerializationType::CAPNP,
&buff->buffer());
rc = tiledb_deserialize_array_delete_fragments_list_request(
rc = tiledb_handle_array_delete_fragments_list_request(
ctx_,
array,
(tiledb_serialization_type_t)tiledb::sm::SerializationType::CAPNP,
buff);
REQUIRE(rc == TILEDB_OK);
Expand Down
6 changes: 3 additions & 3 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,8 @@ void Array::delete_fragments(
throw ArrayException(
"[delete_fragments] Remote array with no REST client.");
}
rest_client->delete_fragments_from_rest(
uri, timestamp_start, timestamp_end);
rest_client->post_delete_fragments_to_rest(
uri, this, timestamp_start, timestamp_end);
} else {
storage_manager_->delete_fragments(
uri.c_str(), timestamp_start, timestamp_end);
Expand All @@ -578,7 +578,7 @@ void Array::delete_fragments_list(
throw ArrayException(
"[delete_fragments_list] Remote array with no REST client.");
}
rest_client->delete_fragments_list_from_rest(uri, fragment_uris);
rest_client->post_delete_fragments_list_to_rest(uri, this, fragment_uris);
} else {
auto array_dir = ArrayDirectory(
resources_, uri, 0, std::numeric_limits<uint64_t>::max());
Expand Down
123 changes: 35 additions & 88 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4022,112 +4022,58 @@ int32_t tiledb_serialize_array_max_buffer_sizes(
return TILEDB_OK;
}

capi_return_t tiledb_deserialize_array_delete_fragments_timestamps_request(
capi_return_t tiledb_handle_array_delete_fragments_timestamps_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialize_type,
const tiledb_buffer_t* buffer) {
api::ensure_buffer_is_valid(buffer);

// Deserialize buffer
auto [uri_str, timestamp_start, timestamp_end] =
tiledb::sm::serialization::fragments_timestamps_deserialize(
(tiledb::sm::SerializationType)serialize_type, buffer->buffer());

auto uri = tiledb::sm::URI(uri_str);
if (uri.is_invalid()) {
throw api::CAPIStatusException(
"Failed to delete fragments; Invalid input uri");
}

// Allocate an array object
tiledb_array_t* array = new (std::nothrow) tiledb_array_t;
try {
array->array_ =
make_shared<tiledb::sm::Array>(HERE(), uri, ctx->storage_manager());
} catch (...) {
delete array;
array = nullptr;
throw api::CAPIStatusException("Failed to create array");
const tiledb_buffer_t* request) {
if (sanity_check(ctx, array) == TILEDB_ERR) {
throw std::invalid_argument("Array paramter must be valid.");
}

// Set array open timestamps
array->array_->set_timestamp_start(timestamp_start);
array->array_->set_timestamp_end(timestamp_end);
api::ensure_buffer_is_valid(request);

// Open the array for exclusive modification
throw_if_not_ok(array->array_->open(
static_cast<tiledb::sm::QueryType>(TILEDB_MODIFY_EXCLUSIVE),
static_cast<tiledb::sm::EncryptionType>(TILEDB_NO_ENCRYPTION),
nullptr,
0));
// Deserialize buffer
auto [timestamp_start, timestamp_end] = tiledb::sm::serialization::
deserialize_delete_fragments_timestamps_request(
(tiledb::sm::SerializationType)serialize_type, request->buffer());

// Delete fragments
try {
array->array_->delete_fragments(uri, timestamp_start, timestamp_end);
array->array_->delete_fragments(
array->array_->array_uri(), timestamp_start, timestamp_end);
} catch (...) {
throw_if_not_ok(array->array_->close());
delete array;
array = nullptr;
throw api::CAPIStatusException("Failed to delete fragments");
}

// Close and delete the array
throw_if_not_ok(array->array_->close());
delete array;
array = nullptr;

return TILEDB_OK;
}

capi_return_t tiledb_deserialize_array_delete_fragments_list_request(
capi_return_t tiledb_handle_array_delete_fragments_list_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialize_type,
const tiledb_buffer_t* buffer) {
api::ensure_buffer_is_valid(buffer);

// Deserialize buffer
auto [uri_str, uris] = tiledb::sm::serialization::fragments_list_deserialize(
(tiledb::sm::SerializationType)serialize_type, buffer->buffer());

auto uri = tiledb::sm::URI(uri_str);
if (uri.is_invalid()) {
throw api::CAPIStatusException(
"Failed to delete_fragments_list; Invalid input uri");
const tiledb_buffer_t* request) {
if (sanity_check(ctx, array) == TILEDB_ERR) {
throw std::invalid_argument("Array paramter must be valid.");
}

// Allocate an array object
tiledb_array_t* array = new (std::nothrow) tiledb_array_t;
try {
array->array_ =
make_shared<tiledb::sm::Array>(HERE(), uri, ctx->storage_manager());
} catch (...) {
delete array;
array = nullptr;
throw api::CAPIStatusException("Failed to create array");
}
api::ensure_buffer_is_valid(request);

// Open the array for exclusive modification
throw_if_not_ok(array->array_->open(
static_cast<tiledb::sm::QueryType>(TILEDB_MODIFY_EXCLUSIVE),
static_cast<tiledb::sm::EncryptionType>(TILEDB_NO_ENCRYPTION),
nullptr,
0));
// Deserialize buffer
auto uris =
tiledb::sm::serialization::deserialize_delete_fragments_list_request(
array->array_->array_uri(),
(tiledb::sm::SerializationType)serialize_type,
request->buffer());

// Delete fragments list
try {
array->array_->delete_fragments_list(uri, uris);
array->array_->delete_fragments_list(array->array_->array_uri(), uris);
} catch (...) {
throw_if_not_ok(array->array_->close());
delete array;
array = nullptr;
throw api::CAPIStatusException("Failed to delete fragments_list");
}

// Close and delete the array
throw_if_not_ok(array->array_->close());
delete array;
array = nullptr;

return TILEDB_OK;
}

Expand Down Expand Up @@ -7033,23 +6979,24 @@ int32_t tiledb_serialize_array_max_buffer_sizes(
ctx, array, subarray, serialize_type, buffer);
}

capi_return_t tiledb_deserialize_array_delete_fragments_timestamps_request(
capi_return_t tiledb_handle_array_delete_fragments_timestamps_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialize_type,
const tiledb_buffer_t* buffer) noexcept {
const tiledb_buffer_t* request) noexcept {
return api_entry<
tiledb::api::
tiledb_deserialize_array_delete_fragments_timestamps_request>(
ctx, serialize_type, buffer);
tiledb::api::tiledb_handle_array_delete_fragments_timestamps_request>(
ctx, array, serialize_type, request);
}

capi_return_t tiledb_deserialize_array_delete_fragments_list_request(
capi_return_t tiledb_handle_array_delete_fragments_list_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialize_type,
const tiledb_buffer_t* buffer) noexcept {
const tiledb_buffer_t* request) noexcept {
return api_entry<
tiledb::api::tiledb_deserialize_array_delete_fragments_list_request>(
ctx, serialize_type, buffer);
tiledb::api::tiledb_handle_array_delete_fragments_list_request>(
ctx, array, serialize_type, request);
}

int32_t tiledb_serialize_array_metadata(
Expand Down
18 changes: 11 additions & 7 deletions tiledb/sm/c_api/tiledb_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,32 +432,36 @@ TILEDB_EXPORT int32_t tiledb_serialize_array_max_buffer_sizes(
tiledb_buffer_t** buffer) TILEDB_NOEXCEPT;

/**
* Deserializes a delete fragments request from the given buffer.
* Process a delete fragments request.
*
* @param ctx The TileDB context.
* @param array The TileDB Array.
* @param serialization_type Type of serialization to use
* @param buffer Buffer containing serialized fragment timestamps.
* @param request Buffer containing serialized fragment timestamps.
* @return `TILEDB_OK` for success and `TILEDB_ERR` for error.
*/
TILEDB_EXPORT
capi_return_t tiledb_deserialize_array_delete_fragments_timestamps_request(
capi_return_t tiledb_handle_array_delete_fragments_timestamps_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* buffer) TILEDB_NOEXCEPT;
const tiledb_buffer_t* request) TILEDB_NOEXCEPT;

/**
* Deserializes a delete fragments list request from the given buffer.
* Process a delete fragments list request.
*
* @param ctx The TileDB context.
* @param array The TileDB Array.
* @param serialization_type Type of serialization to use
* @param buffer Buffer containing serialized fragments list.
* @return `TILEDB_OK` for success and `TILEDB_ERR` for error.
*/
TILEDB_EXPORT
capi_return_t tiledb_deserialize_array_delete_fragments_list_request(
capi_return_t tiledb_handle_array_delete_fragments_list_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* buffer) TILEDB_NOEXCEPT;
const tiledb_buffer_t* request) TILEDB_NOEXCEPT;

/**
* Serializes the array metadata into the given buffer.
Expand Down
26 changes: 15 additions & 11 deletions tiledb/sm/rest/rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,14 @@ void RestClient::delete_array_from_rest(const URI& uri) {
stats_, url, serialization_type_, &returned_data, cache_key));
}

void RestClient::delete_fragments_from_rest(
const URI& uri, uint64_t timestamp_start, uint64_t timestamp_end) {
void RestClient::post_delete_fragments_to_rest(
const URI& uri,
Array* array,
uint64_t timestamp_start,
uint64_t timestamp_end) {
Buffer buff;
serialization::fragments_timestamps_serialize(
uri.to_string(),
serialization::serialize_delete_fragments_timestamps_request(
array->config(),
timestamp_start,
timestamp_end,
serialization_type_,
Expand Down Expand Up @@ -429,11 +432,11 @@ void RestClient::delete_fragments_from_rest(
cache_key));
}

void RestClient::delete_fragments_list_from_rest(
const URI& uri, const std::vector<URI>& fragment_uris) {
void RestClient::post_delete_fragments_list_to_rest(
const URI& uri, Array* array, const std::vector<URI>& fragment_uris) {
Buffer buff;
serialization::fragments_list_serialize(
uri.to_string(), fragment_uris, serialization_type_, &buff);
serialization::serialize_delete_fragments_list_request(
array->config(), fragment_uris, serialization_type_, &buff);
// Wrap in a list
BufferList serialized;
throw_if_not_ok(serialized.add_buffer(std::move(buff)));
Expand Down Expand Up @@ -1555,13 +1558,14 @@ void RestClient::delete_array_from_rest(const URI&) {
Status_RestError("Cannot use rest client; serialization not enabled."));
}

void RestClient::delete_fragments_from_rest(const URI&, uint64_t, uint64_t) {
void RestClient::post_delete_fragments_to_rest(
const URI&, Array*, uint64_t, uint64_t) {
throw StatusException(
Status_RestError("Cannot use rest client; serialization not enabled."));
}

void RestClient::delete_fragments_list_from_rest(
const URI&, const std::vector<URI>&) {
void RestClient::post_delete_fragments_list_to_rest(
const URI&, Array*, const std::vector<URI>&) {
throw StatusException(
Status_RestError("Cannot use rest client; serialization not enabled."));
}
Expand Down
13 changes: 9 additions & 4 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,30 @@ class RestClient {
* at the given URI from the REST server.
*
* @param uri Array URI to delete fragments from
* @param array Array to delete fragments from
* @param timestamp_start The start timestamp at which to delete fragments
* @param timestamp_end The end timestamp at which to delete fragments
*
* #TODO Implement API endpoint on TileDBCloud.
*/
void delete_fragments_from_rest(
const URI& uri, uint64_t timestamp_start, uint64_t timestamp_end);
void post_delete_fragments_to_rest(
const URI& uri,
Array* array,
uint64_t timestamp_start,
uint64_t timestamp_end);

/**
* Deletes the fragments with the given URIs from the array at the given URI
* from the REST server.
*
* @param uri Array URI to delete fragments from
* @param array Array to delete fragments from
* @param fragment_uris The uris of the fragments to be deleted
*
* #TODO Implement API endpoint on TileDBCloud.
*/
void delete_fragments_list_from_rest(
const URI& uri, const std::vector<URI>& fragment_uris);
void post_delete_fragments_list_to_rest(
const URI& uri, Array* array, const std::vector<URI>& fragment_uris);

/**
* Deregisters an array at the given URI from the REST server.
Expand Down
Loading

0 comments on commit 8aa7201

Please sign in to comment.