Skip to content

Commit

Permalink
Implement v1 serialization for Enumerations
Browse files Browse the repository at this point in the history
This change adds support for the new
tiledb_handle_load_enumerations_request and adds the corresponding REST
client APIs to make use of it. The changes in this PR cannot be tested
directly until after TileDB-Cloud-REST has merged support for the new
HTTP endpoint that will connect these two new functions. Although I have
tested these locally using REST-CI tests and the basic tests all pass
just fine.
  • Loading branch information
davisp committed Aug 30, 2023
1 parent 466bb62 commit b66c1a0
Show file tree
Hide file tree
Showing 19 changed files with 1,842 additions and 124 deletions.
28 changes: 5 additions & 23 deletions test/src/unit-enumerations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,30 +598,21 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
EnumerationFx,
"Array - Get Enumeration Repeated",
"[enumeration][array][get-enumeration]") {
"[enumeration][array][get-enumeration][repeated]") {
create_array();
auto array = get_array(QueryType::READ);
auto enmr1 = array->get_enumeration("test_enmr");
auto enmr2 = array->get_enumeration("test_enmr");
REQUIRE(enmr1 == enmr2);
}

TEST_CASE_METHOD(
EnumerationFx,
"Array - Get Enumeration Error - REMOTE NOT YET SUPPORTED",
"[enumeration][array][error][get-remote]") {
std::string uri_str = "tiledb://namespace/array_name";
auto array = make_shared<Array>(HERE(), URI(uri_str), ctx_.storage_manager());
auto matcher = Catch::Matchers::ContainsSubstring("Array is remote");
REQUIRE_THROWS_WITH(array->get_enumeration("something_here"), matcher);
}

TEST_CASE_METHOD(
EnumerationFx,
"Array - Get Enumeration Error - Not Open",
"[enumeration][array][error][not-open]") {
auto array = make_shared<Array>(HERE(), uri_, ctx_.storage_manager());
REQUIRE_THROWS(array->get_enumeration("foo"));
auto matcher = Catch::Matchers::ContainsSubstring("Array is not open");
REQUIRE_THROWS(array->get_enumeration("foo"), matcher);
}

TEST_CASE_METHOD(
Expand Down Expand Up @@ -662,21 +653,12 @@ TEST_CASE_METHOD(
REQUIRE(schema->is_enumeration_loaded("test_enmr") == true);
}

TEST_CASE_METHOD(
EnumerationFx,
"Array - Load All Enumerations Error - REMOTE NOT YET SUPPORTED",
"[enumeration][array][error][get-remote]") {
std::string uri_str = "tiledb://namespace/array_name";
auto array = make_shared<Array>(HERE(), URI(uri_str), ctx_.storage_manager());
auto matcher = Catch::Matchers::ContainsSubstring("Array is remote");
REQUIRE_THROWS_WITH(array->load_all_enumerations(), matcher);
}

TEST_CASE_METHOD(
EnumerationFx,
"Array - Load All Enumerations Error - Not Open",
"[enumeration][array][error][not-open]") {
auto array = make_shared<Array>(HERE(), uri_, ctx_.storage_manager());
auto matcher = Catch::Matchers::ContainsSubstring("Array is not open");
REQUIRE_THROWS(array->load_all_enumerations());
}

Expand All @@ -687,7 +669,7 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
EnumerationFx,
"ArrayDirectory - Load Enumerations From Paths",
"[enumeration][array-directory][load-enumeration]") {
"[enumeration][array-directory][load-enumerations-from-paths]") {
create_array();

auto schema = get_array_schema_latest();
Expand Down
5 changes: 4 additions & 1 deletion tiledb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ if (TILEDB_CPP_API)
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/array_schema.h
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/array_schema_evolution.h
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/array_schema_experimental.h
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/as_built_experimental.h
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/as_built_experimental.h
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/attribute.h
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/attribute_experimental.h
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/cpp_api/config.h
Expand Down Expand Up @@ -278,6 +278,7 @@ set(TILEDB_CORE_SOURCES
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/array_schema.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/array_schema_evolution.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/config.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/enumeration.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_info.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
Expand Down Expand Up @@ -328,6 +329,7 @@ if (TILEDB_SERIALIZATION)
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/array_schema.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/array_schema_evolution.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/config.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/enumeration.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_info.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
Expand All @@ -347,6 +349,7 @@ if (TILEDB_SERIALIZATION)
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/array_schema.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/array_schema_evolution.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/config.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/enumeration.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_info.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
Expand Down
47 changes: 28 additions & 19 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,6 @@ shared_ptr<const Enumeration> Array::get_enumeration(

std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(
const std::vector<std::string>& enumeration_names) {
if (remote_) {
throw ArrayException("Unable to load enumerations; Array is remote.");
}

if (!is_open_) {
throw ArrayException("Unable to load enumerations; Array is not open.");
}
Expand All @@ -600,19 +596,36 @@ std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(
deduped.insert(enmr_name);
}

// Create a vector of paths to be loaded.
std::vector<std::string> paths_to_load;
for (auto& enmr_name : deduped) {
if (array_schema_latest_->is_enumeration_loaded(enmr_name)) {
continue;
std::vector<shared_ptr<const Enumeration>> loaded;

if (remote_) {
auto rest_client = resources_.rest_client();
if (rest_client == nullptr) {
throw ArrayException(
"Error loading enumerations; "
"Remote array with no REST client.");
}
loaded = rest_client->post_enumerations_from_rest(
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
this,
enumeration_names);
} else {
// Create a vector of paths to be loaded.
std::vector<std::string> paths_to_load;
for (auto& enmr_name : deduped) {
if (array_schema_latest_->is_enumeration_loaded(enmr_name)) {
continue;
}
auto path = array_schema_latest_->get_enumeration_path_name(enmr_name);
paths_to_load.push_back(path);
}
auto path = array_schema_latest_->get_enumeration_path_name(enmr_name);
paths_to_load.push_back(path);
}

// Load the enumerations from storage
auto loaded = array_dir_.load_enumerations_from_paths(
paths_to_load, get_encryption_key());
// Load the enumerations from storage
loaded = array_dir_.load_enumerations_from_paths(
paths_to_load, get_encryption_key());
}

// Store the loaded enumerations in the schema
for (auto& enmr : loaded) {
Expand All @@ -628,10 +641,6 @@ std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(
}

void Array::load_all_enumerations() {
if (remote_) {
throw ArrayException("Unable to load enumerations; Array is remote.");
}

if (!is_open_) {
throw ArrayException("Unable to load all enumerations; Array is not open.");
}
Expand Down
8 changes: 4 additions & 4 deletions tiledb/sm/array/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,14 @@ class Array {
const std::string& enumeration_name);

/**
* Get the enumerations for the given names.
* Get the enumerations with the given names.
*
* This function retrieves the enumerations for the given names. If any of the
* This function retrieves the enumerations with the given names. If the
* corresponding enumerations have not been loaded from storage they are
* loaded before this function returns.
*
* @param enumeration_names The name of the enumeration.
* @return std::vector<shared_ptr<const Enumeration>> The enumerations.
* @param enumeration_names The names of the enumerations.
* @return std::vector<shared_ptr<const Enumeration>> The loaded enumerations.
*/
std::vector<shared_ptr<const Enumeration>> get_enumerations(
const std::vector<std::string>& enumeration_names);
Expand Down
5 changes: 2 additions & 3 deletions tiledb/sm/array/array_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class ArrayDirectory {
load_all_array_schemas(const EncryptionKey& encryption_key) const;

/**
* Load all enumerations for the given schema.
* Load the enumerations from the provided list of paths.
*
* @param enumeration_paths The list of enumeration paths to load.
* @param encryption_key The encryption key to use.
Expand Down Expand Up @@ -818,9 +818,8 @@ class ArrayDirectory {
bool consolidation_with_timestamps_supported(const URI& uri) const;

/**
* Load an enumeration from schema with the given name.
* Load an enumeration from the given path.
*
* @param schema The ArraySchema that references the enumeration name.
* @param enumeration_path The enumeration path to load.
* @param encryption_key The encryption key to use.
* @return shared_ptr<Enumeration> The loaded enumeration.
Expand Down
38 changes: 38 additions & 0 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "tiledb/sm/serialization/array_schema.h"
#include "tiledb/sm/serialization/array_schema_evolution.h"
#include "tiledb/sm/serialization/config.h"
#include "tiledb/sm/serialization/enumeration.h"
#include "tiledb/sm/serialization/fragment_info.h"
#include "tiledb/sm/serialization/query.h"
#include "tiledb/sm/stats/global_stats.h"
Expand Down Expand Up @@ -4324,6 +4325,33 @@ int32_t tiledb_deserialize_fragment_info(
return TILEDB_OK;
}

capi_return_t tiledb_handle_load_enumerations_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* request,
tiledb_buffer_t* response) {
if (sanity_check(ctx, array) == TILEDB_ERR) {
throw std::invalid_argument("Array paramter must be valid.");
}

api::ensure_buffer_is_valid(request);
api::ensure_buffer_is_valid(response);

auto enumeration_names =
tiledb::sm::serialization::deserialize_load_enumerations_request(
static_cast<tiledb::sm::SerializationType>(serialization_type),
request->buffer());
auto enumerations = array->array_->get_enumerations(enumeration_names);

tiledb::sm::serialization::serialize_load_enumerations_response(
enumerations,
static_cast<tiledb::sm::SerializationType>(serialization_type),
response->buffer());

return TILEDB_OK;
}

/* ****************************** */
/* C++ API */
/* ****************************** */
Expand Down Expand Up @@ -6973,6 +7001,16 @@ int32_t tiledb_deserialize_fragment_info(
ctx, buffer, serialize_type, array_uri, client_side, fragment_info);
}

capi_return_t tiledb_handle_load_enumerations_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* request,
tiledb_buffer_t* response) noexcept {
return api_entry<tiledb::api::tiledb_handle_load_enumerations_request>(
ctx, array, serialization_type, request, response);
}

/* ****************************** */
/* C++ API */
/* ****************************** */
Expand Down
18 changes: 18 additions & 0 deletions tiledb/sm/c_api/tiledb_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,24 @@ TILEDB_EXPORT int32_t tiledb_deserialize_group_metadata(
tiledb_group_t* group,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* buffer) TILEDB_NOEXCEPT;

/**
* Process a load enumerations request.
*
* @param ctx The TileDB context.
* @param array The TileDB Array.
* @param request A buffer containing the LoadEnumerationsRequest Capnp message.
* @param response An allocated buffer that will contain the
* LoadEnumerationsResponse Capnp message.
* @return capi_return_t TILEDB_OK on success, TILEDB_ERR on error.
*/
TILEDB_EXPORT capi_return_t tiledb_handle_load_enumerations_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* request,
tiledb_buffer_t* response) TILEDB_NOEXCEPT;

#ifdef __cplusplus
}
#endif
Expand Down
58 changes: 58 additions & 0 deletions tiledb/sm/rest/rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "tiledb/sm/serialization/array.h"
#include "tiledb/sm/serialization/config.h"
#include "tiledb/sm/serialization/consolidation.h"
#include "tiledb/sm/serialization/enumeration.h"
#include "tiledb/sm/serialization/fragment_info.h"
#include "tiledb/sm/serialization/group.h"
#include "tiledb/sm/serialization/query.h"
Expand Down Expand Up @@ -511,6 +512,58 @@ Status RestClient::post_array_metadata_to_rest(
stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
}

std::vector<shared_ptr<const Enumeration>>
RestClient::post_enumerations_from_rest(
const URI& uri,
uint64_t timestamp_start,
uint64_t timestamp_end,
Array* array,
const std::vector<std::string>& enumeration_names) {
if (array == nullptr) {
throw Status_RestError(
"Error getting enumerations from REST; array is null.");
}

Buffer buf;
serialization::serialize_load_enumerations_request(
array->config(), enumeration_names, serialization_type_, buf);

// Wrap in a list
BufferList serialized;
throw_if_not_ok(serialized.add_buffer(std::move(buf)));

// Init curl and form the URL
Curl curlc(logger_);
std::string array_ns, array_uri;
throw_if_not_ok(uri.get_rest_components(&array_ns, &array_uri));
const std::string cache_key = array_ns + ":" + array_uri;
throw_if_not_ok(
curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
"/" + curlc.url_escape(array_uri) + "/enumerations?" +
"start_timestamp=" + std::to_string(timestamp_start) +
"&end_timestamp=" + std::to_string(timestamp_end);

// Get the data
Buffer returned_data;
throw_if_not_ok(curlc.post_data(
stats_,
url,
serialization_type_,
&serialized,
&returned_data,
cache_key));
if (returned_data.data() == nullptr || returned_data.size() == 0) {
throw Status_RestError(
"Error getting enumerations from REST; server returned no data.");
}

// Ensure data has a null delimiter for cap'n proto if using JSON
throw_if_not_ok(ensure_json_null_delimited_string(&returned_data));
return serialization::deserialize_load_enumerations_response(
serialization_type_, returned_data);
}

Status RestClient::submit_query_to_rest(const URI& uri, Query* query) {
// Local state tracking for the current offsets into the user's query buffers.
// This allows resubmission of incomplete queries while appending to the
Expand Down Expand Up @@ -1458,6 +1511,11 @@ Status RestClient::post_array_metadata_to_rest(
Status_RestError("Cannot use rest client; serialization not enabled."));
}

void RestClient::post_enumerations_from_rest(
const URI&, uint64_t, uint64_t, Array*, const std::vector<std::string>&) {
throw Status_RestError("Cannot use rest client; serialization not enabled.");
}

Status RestClient::submit_query_to_rest(const URI&, Query*) {
return LOG_STATUS(
Status_RestError("Cannot use rest client; serialization not enabled."));
Expand Down
16 changes: 16 additions & 0 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ class RestClient {
uint64_t timestamp_end,
Array* array);

/**
* Get the requested enumerations from the REST server via POST request.
*
* @param uri Array URI.
* @param timestamp_start Inclusive starting timestamp at which to open array.
* @param timestamp_end Inclusive ending timestamp at which to open array.
* @param array Array to fetch metadata for.
* @param enumeration_names The names of the enumerations to get.
*/
std::vector<shared_ptr<const Enumeration>> post_enumerations_from_rest(
const URI& uri,
uint64_t timestamp_start,
uint64_t timestamp_end,
Array* array,
const std::vector<std::string>& enumeration_names);

/**
* Post a data query to rest server
*
Expand Down
Loading

0 comments on commit b66c1a0

Please sign in to comment.