Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidation Plan REST support - part 2: rest client #4537

Merged
merged 3 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 41 additions & 44 deletions test/src/test-capi-consolidation-plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* Tests the ConsolidationPlan API
*/

#include <test/support/src/vfs_helpers.h>
#include "test/support/src/helpers.h"
#include "tiledb/api/c_api/context/context_api_internal.h"
#include "tiledb/sm/c_api/tiledb.h"
Expand All @@ -42,17 +43,13 @@
using namespace tiledb;
using namespace tiledb::test;

struct ConsolidationPlanFx {
// Constants.
const char* SPARSE_ARRAY_NAME = "test_deletes_array";

// TileDB context.
Context ctx_;
VFS vfs_;

std::string key_ = "0123456789abcdeF0123456789abcdeF";
const tiledb_encryption_type_t enc_type_ = TILEDB_AES_256_GCM;
#ifndef TILEDB_TESTS_ENABLE_REST
constexpr bool rest_tests = false;
#else
constexpr bool rest_tests = true;
#endif

struct ConsolidationPlanFx {
// Constructors/destructors.
ConsolidationPlanFx();
~ConsolidationPlanFx();
Expand All @@ -69,20 +66,39 @@ struct ConsolidationPlanFx {
void remove_array(const std::string& array_name);
bool is_array(const std::string& array_name);
void check_last_error(std::string expected);

// TileDB context.
Context ctx_;
// Full URI initialized using fs_vec_ random temp directory.
std::string array_name_;

// Vector of supported filsystems
tiledb_vfs_handle_t* vfs_c_{nullptr};
tiledb_ctx_handle_t* ctx_c_{nullptr};
const std::vector<std::unique_ptr<test::SupportedFs>> fs_vec_;

std::string key_ = "0123456789abcdeF0123456789abcdeF";
const tiledb_encryption_type_t enc_type_ = TILEDB_AES_256_GCM;
};

ConsolidationPlanFx::ConsolidationPlanFx()
: vfs_(ctx_) {
: fs_vec_(test::vfs_test_get_fs_vec()) {
Config config;
config.set("sm.consolidation.buffer_size", "1000");
ctx_ = Context(config);
vfs_ = VFS(ctx_);

remove_sparse_array();
REQUIRE(
test::vfs_test_init(fs_vec_, &ctx_c_, &vfs_c_, config.ptr().get()).ok());
ctx_ = Context(ctx_c_);
std::string temp_dir = fs_vec_[0]->temp_dir();
if constexpr (rest_tests) {
array_name_ = "tiledb://unit/";
}
array_name_ += temp_dir + "test_consolidation_plan_array";
test::vfs_test_create_temp_dir(ctx_c_, vfs_c_, temp_dir);
}

ConsolidationPlanFx::~ConsolidationPlanFx() {
remove_sparse_array();
Array::delete_array(ctx_, array_name_);
REQUIRE(test::vfs_test_close(fs_vec_, ctx_c_, vfs_c_).ok());
}

void ConsolidationPlanFx::create_sparse_array(bool allows_dups, bool encrypt) {
Expand Down Expand Up @@ -115,9 +131,9 @@ void ConsolidationPlanFx::create_sparse_array(bool allows_dups, bool encrypt) {
schema.set_coords_filter_list(filter_list);

if (encrypt) {
Array::create(SPARSE_ARRAY_NAME, schema, enc_type_, key_);
Array::create(array_name_, schema, enc_type_, key_);
} else {
Array::create(SPARSE_ARRAY_NAME, schema);
Array::create(array_name_, schema);
}
}

Expand All @@ -132,16 +148,13 @@ void ConsolidationPlanFx::write_sparse(
if (encrypt) {
array = std::make_unique<Array>(
ctx_,
SPARSE_ARRAY_NAME,
array_name_,
TILEDB_WRITE,
TemporalPolicy(TimeTravel, timestamp),
EncryptionAlgorithm(AESGCM, key_.c_str()));
} else {
array = std::make_unique<Array>(
ctx_,
SPARSE_ARRAY_NAME,
TILEDB_WRITE,
TemporalPolicy(TimeTravel, timestamp));
ctx_, array_name_, TILEDB_WRITE, TemporalPolicy(TimeTravel, timestamp));
}

// Create query.
Expand All @@ -152,28 +165,12 @@ void ConsolidationPlanFx::write_sparse(
query.set_data_buffer("d2", dim2);

// Submit/finalize the query.
query.submit();
query.finalize();
query.submit_and_finalize();

// Close array.
array->close();
}

void ConsolidationPlanFx::remove_array(const std::string& array_name) {
if (!is_array(array_name))
return;

vfs_.remove_dir(array_name);
}

void ConsolidationPlanFx::remove_sparse_array() {
remove_array(SPARSE_ARRAY_NAME);
}

bool ConsolidationPlanFx::is_array(const std::string& array_name) {
return vfs_.is_dir(array_name);
}

void ConsolidationPlanFx::check_last_error(std::string expected) {
const char* msg = "unset";
tiledb_error_t* err{nullptr};
Expand All @@ -188,11 +185,11 @@ void ConsolidationPlanFx::check_last_error(std::string expected) {
TEST_CASE_METHOD(
ConsolidationPlanFx,
"CAPI: Consolidation plan",
"[capi][consolidation-plan]") {
"[capi][consolidation-plan][rest]") {
create_sparse_array();
write_sparse({0, 1, 2, 3}, {1, 1, 1, 2}, {1, 2, 4, 3}, 1);

Array array{ctx_, SPARSE_ARRAY_NAME, TILEDB_READ};
Array array{ctx_, array_name_, TILEDB_READ};

tiledb_consolidation_plan_t* consolidation_plan{};
CHECK(
Expand Down Expand Up @@ -231,11 +228,11 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
ConsolidationPlanFx,
"CAPI: Consolidation plan dump",
"[capi][consolidation-plan][dump]") {
"[capi][consolidation-plan][dump][rest]") {
create_sparse_array();
write_sparse({0, 1, 2, 3}, {1, 1, 1, 2}, {1, 2, 4, 3}, 1);

Array array{ctx_, SPARSE_ARRAY_NAME, TILEDB_READ};
Array array{ctx_, array_name_, TILEDB_READ};

tiledb_consolidation_plan_t* consolidation_plan{};
CHECK(
Expand Down
5 changes: 5 additions & 0 deletions tiledb/sm/array/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,11 @@ class Array {
/** Load array directory for non-remote arrays */
const ArrayDirectory& load_array_directory();

/* Get the REST client */
[[nodiscard]] inline shared_ptr<RestClient> rest_client() const {
return resources_.rest_client();
}

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
Expand Down
16 changes: 15 additions & 1 deletion tiledb/sm/consolidation_plan/consolidation_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "tiledb/sm/consolidation_plan/consolidation_plan.h"
#include "tiledb/common/common.h"
#include "tiledb/common/logger.h"
#include "tiledb/sm/rest/rest_client.h"

using namespace tiledb::sm;
using namespace tiledb::common;
Expand All @@ -44,7 +45,20 @@ using namespace tiledb::common;
ConsolidationPlan::ConsolidationPlan(
shared_ptr<Array> array, uint64_t fragment_size)
: desired_fragment_size_(fragment_size) {
generate(array);
if (array->is_remote()) {
auto rest_client = array->rest_client();
if (!rest_client) {
throw std::runtime_error(
"Failed to create a consolidation plan; Remote array"
"with no REST client.");
}
// reach out to the REST client to populate class members
fragment_uris_per_node_ = rest_client->post_consolidation_plan_from_rest(
array->array_uri(), array->config(), fragment_size);
num_nodes_ = fragment_uris_per_node_.size();
} else {
generate(array);
}
}

ConsolidationPlan::ConsolidationPlan(
Expand Down
49 changes: 49 additions & 0 deletions tiledb/sm/rest/rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,48 @@ Status RestClient::post_vacuum_to_rest(const URI& uri, const Config& config) {
stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
}

std::vector<std::vector<std::string>>
RestClient::post_consolidation_plan_from_rest(
const URI& uri, const Config& config, uint64_t fragment_size) {
Buffer buff;
serialization::serialize_consolidation_plan_request(
fragment_size, config, serialization_type_, buff);

// Wrap in a list
BufferList serialized;
throw_if_not_ok(serialized.add_buffer(std::move(buff)));

// Init curl and form the URL
Curl curlc(logger_);
std::string array_ns, array_uri;
throw_if_not_ok(uri.get_rest_components(&array_ns, &array_uri));
const std::string cache_key = array_ns + ":" + array_uri;
throw_if_not_ok(
curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
"/" + curlc.url_escape(array_uri) +
"/consolidate/plan";

// Get the data
Buffer returned_data;
throw_if_not_ok(curlc.post_data(
stats_,
url,
serialization_type_,
&serialized,
&returned_data,
cache_key));
if (returned_data.data() == nullptr || returned_data.size() == 0) {
throw Status_RestError(
"Error getting query plan from REST; server returned no data.");
}

// Ensure data has a null delimiter for cap'n proto if using JSON
throw_if_not_ok(ensure_json_null_delimited_string(&returned_data));
return serialization::deserialize_consolidation_plan_response(
serialization_type_, returned_data);
}

#else

RestClient::RestClient() {
Expand Down Expand Up @@ -1789,6 +1831,13 @@ Status RestClient::post_vacuum_to_rest(const URI&, const Config&) {
Status_RestError("Cannot use rest client; serialization not enabled."));
}

std::vector<std::vector<std::string>>
RestClient::post_consolidation_plan_from_rest(
const URI&, const Config&, uint64_t) {
throw StatusException(
Status_RestError("Cannot use rest client; serialization not enabled."));
}

#endif // TILEDB_SERIALIZATION

} // namespace sm
Expand Down
11 changes: 11 additions & 0 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,17 @@ class RestClient {
return rest_server_;
}

/**
* Get consolidation plan from the REST server via POST request.
*
* @param uri Array URI.
* @param config Config of the array.
* @param fragment_size Maximum fragment size for constructing the plan.
* @return The requested consolidation plan
*/
std::vector<std::vector<std::string>> post_consolidation_plan_from_rest(
const URI& uri, const Config& config, uint64_t fragment_size);

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
Expand Down
Loading