Skip to content

Commit

Permalink
More sort and finalize fixes (#1799)
Browse files Browse the repository at this point in the history
#### Reference Issues/PRs
<!--Example: Fixes #1234. See also #3456.-->
Fixes #1738 
Fixes #1781 
Fixes #1466 
Fixes #1795 
Fixes #1797
Fixes #1807
Fixes #1828

A notable change is that staged writes no longer validate that the index is
sorted. The validation is instead performed at the moment
compact_incompletes/finalize_staged_data/sort_and_finalize_staged_data
is called. This is because sort_and_finalize_staged_data does not
require the staged segments to be sorted, yet both finalization paths
currently share the same call for adding a staged segment. We should add
a separate call for the sorted case.

Note also that all incomplete keys for a symbol are deleted if any of
the finalize calls fail. The other option is to leave the segments. In
that case the user will have the responsibility of calling
`delete_staged_data`.
#### What does this implement or fix?

#### Any other comments?

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->

---------

Co-authored-by: Vasil Pashov <[email protected]>
  • Loading branch information
vasil-pashov and Vasil Pashov authored Sep 24, 2024
1 parent b157c92 commit d607f94
Show file tree
Hide file tree
Showing 36 changed files with 1,224 additions and 354 deletions.
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ set(arcticdb_srcs
util/offset_string.cpp
util/sparse_utils.cpp
util/string_utils.cpp
util/timer.cpp
util/trace.cpp
util/type_handler.cpp
version/local_versioned_engine.cpp
Expand Down
11 changes: 8 additions & 3 deletions cpp/arcticdb/column_store/column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,15 @@ class Column {
return TypedColumnIterator<TagType, const RawType>(*this, false);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(ssize_t row_offset, T val) {
util::check(sizeof(T) == get_type_size(type_.data_type()), "Type mismatch in set_scalar, expected {}",
get_type_size(type_.data_type()));
util::check(
sizeof(T) == get_type_size(type_.data_type()),
"Type mismatch in set_scalar, expected {} byte scalar got {} byte scalar",
get_type_size(type_.data_type()),
sizeof(T)
);

auto prev_logical_row = last_logical_row_;
last_logical_row_ = row_offset;
Expand Down
28 changes: 17 additions & 11 deletions cpp/arcticdb/column_store/memory_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ class SegmentInMemory {
impl_->end_row();
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(position_t idx, T val) {
impl_->set_scalar(idx, val);
}

template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
void set_scalar(position_t idx, T val) {
template<typename T>
requires std::same_as<std::decay_t<T>, std::string>
void set_scalar(position_t idx, const T& val) {
impl_->set_string(idx, val);
}

Expand Down Expand Up @@ -117,16 +119,14 @@ class SegmentInMemory {
impl_->init_column_map();
}

template<class T, template<class> class Tensor, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
template<class T, template<class> class Tensor>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, Tensor<T> &val) {
impl_->set_array(pos, val);
}

template<class T, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, py::array_t<T>& val) {
impl_->set_array(pos, val);
}
Expand Down Expand Up @@ -251,12 +251,14 @@ class SegmentInMemory {
impl_->sparsify();
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_external_block(position_t idx, T *val, size_t size) {
impl_->set_external_block(idx, val, size);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_sparse_block(position_t idx, T *val, size_t rows_to_write) {
impl_->set_sparse_block(idx, val, rows_to_write);
}
Expand Down Expand Up @@ -460,6 +462,10 @@ class SegmentInMemory {
return output;
}

// Removes every data column that holds no values; delegates to the
// underlying segment implementation (see SegmentInMemoryImpl::drop_empty_columns).
void drop_empty_columns() {
impl_->drop_empty_columns();
}

private:
explicit SegmentInMemory(std::shared_ptr<SegmentInMemoryImpl> impl) :
impl_(std::move(impl)) {}
Expand Down
25 changes: 24 additions & 1 deletion cpp/arcticdb/column_store/memory_segment_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <arcticdb/column_store/string_pool.hpp>
#include <arcticdb/entity/type_utils.hpp>
#include <arcticdb/pipeline/string_pool_utils.hpp>
#include <arcticdb/util/preconditions.hpp>

#include <google/protobuf/any.h>
#include <google/protobuf/any.pb.h>

namespace arcticdb {
Expand Down Expand Up @@ -684,4 +684,27 @@ const google::protobuf::Any* SegmentInMemoryImpl::metadata() const {
return metadata_.get();
}

void SegmentInMemoryImpl::drop_empty_columns() {
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
row_count() > 0,
"Dropping all empty columns from an empty segment would result in removing all columns"
);
const StreamDescriptor& original = descriptor();
StreamDescriptor only_non_empty_cols;
only_non_empty_cols.set_id(original.id());
only_non_empty_cols.set_index(descriptor().index());
size_t field_index = 0;
while (field_index < original.index().field_count()) {
only_non_empty_cols.add_field(original.field(field_index++));
}
while (field_index < original.field_count()) {
const Column& col = column(field_index);
if (col.row_count() > 0) {
only_non_empty_cols.add_field(original.field(field_index));
}
field_index++;
}
change_schema(std::move(only_non_empty_cols));
}

}
49 changes: 26 additions & 23 deletions cpp/arcticdb/column_store/memory_segment_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,31 +67,31 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) const {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_));
return c(parent_->scalar_at<RawType>(row_id_, column_id_));
});
}

template<class Callable>
auto visit_string(Callable &&c) const {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DTT = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
if constexpr(is_sequence_type(DTT::data_type))
return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)));
return c(parent_->string_at(row_id_, position_t(column_id_)));
});
}

template<class Callable>
auto visit_field(Callable &&c) const {
const auto& field = parent_->descriptor().field(column_id_);
return entity::visit_field(parent_->descriptor().field(column_id_), [&field, that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(field, [&field, this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DataTypeTag = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
using RawType = typename DataTypeTag::raw_type;
if constexpr (is_sequence_type(DataTypeTag::data_type))
return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)), std::string_view{field.name()}, field.type());
return c(parent_->string_at(row_id_, position_t(column_id_)), std::string_view{field.name()}, type_desc_tag);
else if constexpr (is_numeric_type(DataTypeTag::data_type) || is_bool_type(DataTypeTag::data_type))
return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_), std::string_view{field.name()}, field.type());
return c(parent_->scalar_at<RawType>(row_id_, column_id_), std::string_view{field.name()}, type_desc_tag);
else if constexpr(is_empty_type(DataTypeTag::data_type))
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("visit_field does not support empty-type columns");
else
Expand All @@ -101,9 +101,9 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
return c(that->parent_->reference_at<RawType>(that->row_id_, that->column_id_));
return c(parent_->reference_at<RawType>(row_id_, column_id_));
});
}

Expand Down Expand Up @@ -454,18 +454,21 @@ class SegmentInMemoryImpl {
});
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
void set_scalar(position_t idx, T val) {
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(position_t idx, T val) {
ARCTICDB_TRACE(log::version(), "Segment setting scalar {} at row {} column {}", val, row_id_ + 1, idx);
column(idx).set_scalar(row_id_ + 1, val);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
void set_external_block(position_t idx, T *val, size_t size) {
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_external_block(position_t idx, T *val, size_t size) {
column_unchecked(idx).set_external_block(row_id_ + 1, val, size);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_sparse_block(position_t idx, T *val, size_t rows_to_write) {
column_unchecked(idx).set_sparse_block(row_id_ + 1, val, rows_to_write);
}
Expand All @@ -478,23 +481,22 @@ class SegmentInMemoryImpl {
column_unchecked(idx).set_sparse_block(std::move(buffer), std::move(shapes), std::move(bitset));
}

template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
void set_scalar(position_t idx, T val) {
template<class T>
requires std::same_as<std::decay_t<T>, std::string>
void set_scalar(position_t idx, const T& val) {
set_string(idx, val);
}

template<class T, template<class> class Tensor, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
void set_array(position_t pos, Tensor<T> &val) {
template<class T, template<class> class Tensor>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, Tensor<T> &val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
column_unchecked(pos).set_array(row_id_ + 1, val);
}

template<class T, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, py::array_t<T>& val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
Expand Down Expand Up @@ -786,6 +788,7 @@ class SegmentInMemoryImpl {
const std::vector<uint64_t>& segment_counts) const;

std::vector<std::shared_ptr<SegmentInMemoryImpl>> split(size_t rows) const;
void drop_empty_columns();

private:
ssize_t row_id_ = -1;
Expand Down
21 changes: 19 additions & 2 deletions cpp/arcticdb/entity/merge_descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace arcticdb {
StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
const std::vector<std::shared_ptr<FieldCollection>> &entries,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::unordered_set<std::string_view> &filtered_set,
const std::optional<IndexDescriptorImpl>& default_index) {
using namespace arcticdb::stream;
Expand All @@ -34,6 +34,7 @@ StreamDescriptor merge_descriptors(
merged_fields.emplace_back(idx.name());
merged_fields_map.try_emplace(idx.name(), TypeDescriptor{typename IndexType::TypeDescTag{}});
});
index = default_index_type_from_descriptor(*default_index);
} else {
util::raise_rte("Descriptor has uninitialized index and no default supplied");
}
Expand Down Expand Up @@ -71,7 +72,12 @@ StreamDescriptor merge_descriptors(
if(new_descriptor) {
merged_fields_map[field.name()] = *new_descriptor;
} else {
util::raise_rte("No valid common type between {} and {} for column {}", existing_type_desc, type_desc, field.name());
schema::raise<ErrorCode::E_DESCRIPTOR_MISMATCH>(
"No valid common type between {} and {} for column {}",
existing_type_desc,
type_desc,
field.name()
);
}
}
} else {
Expand Down Expand Up @@ -99,6 +105,17 @@ StreamDescriptor merge_descriptors(
return merge_descriptors(original, entries, filtered_set, default_index);
}

StreamDescriptor merge_descriptors(
const StreamDescriptor& original,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::optional<std::vector<std::string>>& filtered_columns,
const std::optional<IndexDescriptorImpl>& default_index) {
// Convert the optional column filter into the set form expected by the
// primary overload; an absent filter means "keep everything".
std::unordered_set<std::string_view> filtered_set;
if (filtered_columns.has_value()) {
filtered_set.insert(filtered_columns->begin(), filtered_columns->end());
}
return merge_descriptors(original, entries, filtered_set, default_index);
}

StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
const std::vector<pipelines::SliceAndKey> &entries,
Expand Down
9 changes: 8 additions & 1 deletion cpp/arcticdb/entity/merge_descriptors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

#include <arcticdb/entity/stream_descriptor.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <span>

namespace arcticdb {
StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
const std::vector<std::shared_ptr<FieldCollection>> &entries,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::unordered_set<std::string_view> &filtered_set,
const std::optional<IndexDescriptorImpl>& default_index);

Expand All @@ -21,6 +22,12 @@ entity::StreamDescriptor merge_descriptors(
const std::optional<std::vector<std::string>> &filtered_columns,
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);

entity::StreamDescriptor merge_descriptors(
const entity::StreamDescriptor& original,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::optional<std::vector<std::string>>& filtered_columns,
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);

entity::StreamDescriptor merge_descriptors(
const entity::StreamDescriptor &original,
const std::vector<pipelines::SliceAndKey> &entries,
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,11 @@ struct TypeDescriptor {
template<typename Callable>
constexpr auto visit_tag(Callable &&callable) const;

bool operator==(const TypeDescriptor &o) const {
[[nodiscard]] constexpr bool operator==(const TypeDescriptor& o) const {
return data_type_ == o.data_type_ && dimension_ == o.dimension_;
}

bool operator!=(const TypeDescriptor &o) const {
[[nodiscard]] constexpr bool operator!=(const TypeDescriptor& o) const {
return !(*this == o);
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/pipeline/index_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ TimeseriesDescriptor get_merged_tsd(
}
else if (dynamic_schema) {
// In case of dynamic schema
const std::array fields_ptr = {new_frame->desc.fields_ptr()};
merged_descriptor = merge_descriptors(
existing_descriptor,
std::vector<std::shared_ptr<FieldCollection>>{new_frame->desc.fields_ptr()},
fields_ptr,
{}
);
} else {
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/pipeline/pipeline_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ struct PipelineContext : public std::enable_shared_from_this<PipelineContext> {
// written in, desc_ will be modified such that the return matches what's requested, and this'll be set to the
// original value. It's only set in this edge case.
std::optional<StreamDescriptor> orig_desc_;
// When there are staged segments this holds the combined stream descriptor for all staged segments
// This can be different than desc_ in case dynamic schema is used. Otherwise they must be the same.
std::optional<StreamDescriptor> staged_descriptor_;
StreamId stream_id_;
VersionId version_id_ = 0;
size_t total_rows_ = 0;
Expand Down Expand Up @@ -200,6 +203,7 @@ struct PipelineContext : public std::enable_shared_from_this<PipelineContext> {
swap(left.segment_descriptors_, right.segment_descriptors_);
swap(left.filter_columns_set_, right.filter_columns_set_);
swap(left.compacted_, right.compacted_);
swap(left.staged_descriptor_, right.staged_descriptor_);
}

using iterator = PipelineContextIterator<PipelineContextRow>;
Expand Down
4 changes: 3 additions & 1 deletion cpp/arcticdb/pipeline/read_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ struct ReduceColumnTask : async::BaseTask {
} else {
column.default_initialize_rows(0, frame_.row_count(), false);
}
} else {
} else if (column_data != slice_map_->columns_.end()) {
if(dynamic_schema_) {
NullValueReducer null_reducer{column, context_, frame_, shared_data_, handler_data_};
for (const auto &row : column_data->second) {
Expand All @@ -623,6 +623,8 @@ struct ReduceColumnTask : async::BaseTask {

column.set_inflated(frame_.row_count());
}
} else if (!dynamic_schema_ && column_data == slice_map_->columns_.end() && is_sequence_type(column.type().data_type())) {
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Column with index {} is not in static schema slice map.", column_index_);
}
return folly::Unit{};
}
Expand Down
Loading

0 comments on commit d607f94

Please sign in to comment.