Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not pass back RTrees to clients unless it's needed #5265

Merged
merged 10 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions tiledb/sm/serialization/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,6 @@ void array_from_capnp(
// pass the right schema to deserialize fragment metadata
throw_if_not_ok(
fragment_metadata_from_capnp(schema, frag_meta_reader, meta));
if (client_side) {
meta->loaded_metadata()->set_rtree_loaded();
}
fragment_metadata.emplace_back(meta);
}
array->set_fragment_metadata(std::move(fragment_metadata));
Expand Down
4 changes: 2 additions & 2 deletions tiledb/sm/serialization/fragment_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ single_fragment_info_from_capnp(
meta->non_empty_domain(),
expanded_non_empty_domain,
meta};
// This is needed so that we don't try to load the rtree from disk
single_frag_info.meta()->loaded_metadata()->set_rtree_loaded();

return {Status::Ok(), single_frag_info};
}
Expand All @@ -270,6 +268,8 @@ Status single_fragment_info_to_capnp(
auto frag_meta_builder = single_frag_info_builder->initMeta();
RETURN_NOT_OK(
fragment_metadata_to_capnp(*single_frag_info.meta(), &frag_meta_builder));
rtree_to_capnp(
single_frag_info.meta()->loaded_metadata()->rtree(), &frag_meta_builder);

// set fragment size
single_frag_info_builder->setFragmentSize(single_frag_info.fragment_size());
Expand Down
38 changes: 23 additions & 15 deletions tiledb/sm/serialization/fragment_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ Status fragment_metadata_from_capnp(
}
frag_meta->last_tile_cell_num() = frag_meta_reader.getLastTileCellNum();

frag_meta->loaded_metadata()->set_loaded_metadata(loaded_metadata);

if (frag_meta_reader.hasRtree()) {
auto data = frag_meta_reader.getRtree();
auto& domain = fragment_array_schema->domain();
Expand All @@ -388,6 +390,8 @@ Status fragment_metadata_from_capnp(
// deserialize it as well in that way.
frag_meta->loaded_metadata()->rtree().deserialize(
deserializer, &domain, constants::format_version);

frag_meta->loaded_metadata()->set_rtree_loaded();
}

// It's important to do this here as init_domain depends on some fields
Expand All @@ -411,8 +415,6 @@ Status fragment_metadata_from_capnp(
frag_meta_reader.getGtOffsets(), frag_meta->generic_tile_offsets());
}

frag_meta->loaded_metadata()->set_loaded_metadata(loaded_metadata);

return Status::Ok();
}

Expand Down Expand Up @@ -535,6 +537,25 @@ void fragment_meta_sizes_offsets_to_capnp(
}
}
}

rtree_to_capnp(frag_meta.loaded_metadata()->rtree(), frag_meta_builder);
}

/**
 * Serializes an RTree into the `rtree` field of a Cap'n Proto
 * FragmentMetadata message.
 *
 * The field is only set when the serialized RTree is non-empty; when the
 * RTree serializes to zero bytes the builder is left untouched.
 *
 * @param rtree RTree to serialize.
 * @param frag_meta_builder Cap'n Proto builder that receives the bytes.
 */
void rtree_to_capnp(
    const RTree& rtree, capnp::FragmentMetadata::Builder* frag_meta_builder) {
  // First pass: compute how many bytes the RTree serializes to.
  SizeComputationSerializer size_computation_serializer;
  rtree.serialize(size_computation_serializer);
  const auto serialized_size = size_computation_serializer.size();
  if (serialized_size == 0) {
    // Nothing to send; leave the rtree field unset.
    return;
  }

  // Second pass: serialize into a buffer of exactly that size.
  std::vector<uint8_t> buff(serialized_size);
  Serializer serializer(buff.data(), buff.size());
  rtree.serialize(serializer);

  // The generated setter copies the bytes into the message, so a non-owning
  // view over `buff` is enough; no intermediate kj::Vector copy is needed.
  frag_meta_builder->setRtree(kj::ArrayPtr<uint8_t>(buff.data(), buff.size()));
}

Status fragment_metadata_to_capnp(
Expand Down Expand Up @@ -689,19 +710,6 @@ Status fragment_metadata_to_capnp(
frag_meta.non_empty_domain(),
frag_meta.array_schema()->dim_num()));

// TODO: Can this be done better? Does this make a lot of copies?
SizeComputationSerializer size_computation_serializer;
frag_meta.loaded_metadata()->rtree().serialize(size_computation_serializer);

std::vector<uint8_t> buff(size_computation_serializer.size());
Serializer serializer(buff.data(), buff.size());
frag_meta.loaded_metadata()->rtree().serialize(serializer);

auto vec = kj::Vector<uint8_t>();
vec.addAll(
kj::ArrayPtr<uint8_t>(static_cast<uint8_t*>(buff.data()), buff.size()));
frag_meta_builder->setRtree(vec.asPtr());

auto gt_offsets_builder = frag_meta_builder->initGtOffsets();
generic_tile_offsets_to_capnp(
frag_meta.generic_tile_offsets(), gt_offsets_builder);
Expand Down
9 changes: 9 additions & 0 deletions tiledb/sm/serialization/fragment_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ void fragment_meta_sizes_offsets_to_capnp(
const FragmentMetadata& frag_meta,
capnp::FragmentMetadata::Builder* frag_meta_builder);

/**
* Serializes an RTree into the rtree field of a Cap'n Proto FragmentMetadata
* message.
*
* The rtree field is only written when the serialized RTree is non-empty;
* otherwise the builder is left unchanged.
*
* @param rtree RTree to serialize
* @param frag_meta_builder Cap'n Proto FragmentMetadata builder to write to
*/
void rtree_to_capnp(
const RTree& rtree, capnp::FragmentMetadata::Builder* frag_meta_builder);

/**
* Convert Fragment Metadata to Cap'n Proto message
*
Expand Down
36 changes: 26 additions & 10 deletions tiledb/sm/serialization/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,8 @@ Status read_state_from_capnp(
const capnp::ReadState::Reader& read_state_reader,
Query* query,
Reader* reader,
ThreadPool* compute_tp) {
ThreadPool* compute_tp,
bool client_side) {
auto read_state = reader->read_state();

read_state->overflowed_ = read_state_reader.getOverflowed();
Expand All @@ -641,7 +642,7 @@ Status read_state_from_capnp(
// If the current partition is unsplittable, we need to make sure the
// tile_overlap for the current partition is computed because we won't
// go to the next partition
read_state->unsplittable_));
read_state->unsplittable_ && !client_side));
}

return Status::Ok();
Expand Down Expand Up @@ -669,7 +670,8 @@ Status dense_read_state_from_capnp(
const capnp::ReadState::Reader& read_state_reader,
Query* query,
DenseReader* reader,
ThreadPool* compute_tp) {
ThreadPool* compute_tp,
bool client_side) {
auto read_state = reader->read_state();

read_state->overflowed_ = read_state_reader.getOverflowed();
Expand All @@ -689,7 +691,7 @@ Status dense_read_state_from_capnp(
// If the current partition is unsplittable, we need to make sure the
// tile_overlap for the current partition is computed because we won't
// go to the next partition
read_state->unsplittable_));
read_state->unsplittable_ && !client_side));
}

return Status::Ok();
Expand Down Expand Up @@ -1097,7 +1099,8 @@ Status reader_from_capnp(
const capnp::QueryReader::Reader& reader_reader,
Query* query,
Reader* reader,
ThreadPool* compute_tp) {
ThreadPool* compute_tp,
bool client_side) {
auto array = query->array();

// Layout
Expand All @@ -1113,7 +1116,12 @@ Status reader_from_capnp(
// Read state
if (reader_reader.hasReadState())
RETURN_NOT_OK(read_state_from_capnp(
array, reader_reader.getReadState(), query, reader, compute_tp));
array,
reader_reader.getReadState(),
query,
reader,
compute_tp,
client_side));

// Query condition
if (reader_reader.hasCondition()) {
Expand Down Expand Up @@ -1174,7 +1182,8 @@ Status dense_reader_from_capnp(
const capnp::QueryReader::Reader& reader_reader,
Query* query,
DenseReader* reader,
ThreadPool* compute_tp) {
ThreadPool* compute_tp,
bool client_side) {
auto array = query->array();

// Layout
Expand All @@ -1190,7 +1199,12 @@ Status dense_reader_from_capnp(
// Read state
if (reader_reader.hasReadState())
RETURN_NOT_OK(dense_read_state_from_capnp(
array, reader_reader.getReadState(), query, reader, compute_tp));
array,
reader_reader.getReadState(),
query,
reader,
compute_tp,
client_side));

// Query condition
if (reader_reader.hasCondition()) {
Expand Down Expand Up @@ -2161,14 +2175,16 @@ Status query_from_capnp(
reader_reader,
query,
dynamic_cast<DenseReader*>(query->strategy()),
compute_tp));
compute_tp,
context == SerializationContext::CLIENT));
} else {
auto reader_reader = query_reader.getReader();
RETURN_NOT_OK(reader_from_capnp(
reader_reader,
query,
dynamic_cast<Reader*>(query->strategy()),
compute_tp));
compute_tp,
context == SerializationContext::CLIENT));
}
} else if (query_type == QueryType::WRITE) {
auto writer_reader = query_reader.getWriter();
Expand Down
Loading