Skip to content

Commit

Permalink
[Feature] Support add/drop field for struct column(part1) (backport #…
Browse files Browse the repository at this point in the history
…46451) (#47110)

Signed-off-by: sevev <[email protected]>
Co-authored-by: zhangqiang <[email protected]>
  • Loading branch information
mergify[bot] and sevev authored Jun 27, 2024
1 parent e1fecb3 commit 33c4a74
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 37 deletions.
3 changes: 3 additions & 0 deletions be/src/storage/metadata_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ static Status type_desc_to_pb(const std::vector<TTypeNode>& types, int* index, C
// All struct fields all nullable now
RETURN_IF_ERROR(type_desc_to_pb(types, index, field_pb));
field_pb->set_name(field.name);
if (field.__isset.id && field.id >= 0) {
field_pb->set_unique_id(field.id);
}
}
return Status::OK();
}
Expand Down
101 changes: 82 additions & 19 deletions be/src/storage/rowset/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "storage/rowset/bitmap_index_reader.h"
#include "storage/rowset/bloom_filter.h"
#include "storage/rowset/bloom_filter_index_reader.h"
#include "storage/rowset/default_value_column_iterator.h"
#include "storage/rowset/encoding_info.h"
#include "storage/rowset/map_column_iterator.h"
#include "storage/rowset/page_handle.h"
Expand All @@ -64,9 +65,10 @@

namespace starrocks {

StatusOr<std::unique_ptr<ColumnReader>> ColumnReader::create(ColumnMetaPB* meta, Segment* segment) {
StatusOr<std::unique_ptr<ColumnReader>> ColumnReader::create(ColumnMetaPB* meta, Segment* segment,
const TabletColumn* column) {
auto r = std::make_unique<ColumnReader>(private_type(0), segment);
RETURN_IF_ERROR(r->_init(meta));
RETURN_IF_ERROR(r->_init(meta, column));
return std::move(r);
}

Expand Down Expand Up @@ -103,7 +105,7 @@ ColumnReader::~ColumnReader() {
MEM_TRACKER_SAFE_RELEASE(GlobalEnv::GetInstance()->column_metadata_mem_tracker(), sizeof(ColumnReader));
}

Status ColumnReader::_init(ColumnMetaPB* meta) {
Status ColumnReader::_init(ColumnMetaPB* meta, const TabletColumn* column) {
_column_type = static_cast<LogicalType>(meta->type());
_dict_page_pointer = PagePointer(meta->dict_page());
_total_mem_footprint = meta->total_mem_footprint();
Expand Down Expand Up @@ -185,18 +187,19 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
}
_sub_readers->reserve(3);

auto sub_column = (column != nullptr) ? column->subcolumn_ptr(0) : nullptr;
// elements
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, sub_column);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// null flags
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(2), _segment);
res = ColumnReader::create(meta->mutable_children_columns(2), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
} else {
Expand All @@ -205,13 +208,14 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
}
_sub_readers->reserve(2);

auto sub_column = (column != nullptr) ? column->subcolumn_ptr(0) : nullptr;
// elements
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, sub_column);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
}
Expand All @@ -225,22 +229,22 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
_sub_readers->reserve(4);

// keys
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// values
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// null flags
res = ColumnReader::create(meta->mutable_children_columns(2), _segment);
res = ColumnReader::create(meta->mutable_children_columns(2), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(3), _segment);
res = ColumnReader::create(meta->mutable_children_columns(3), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
} else {
Expand All @@ -250,27 +254,29 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
_sub_readers->reserve(3);

// keys
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// values
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(2), _segment);
res = ColumnReader::create(meta->mutable_children_columns(2), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
}
return Status::OK();
} else if (_column_type == LogicalType::TYPE_STRUCT) {
_sub_readers = std::make_unique<SubReaderList>();
for (int i = 0; i < meta->children_columns_size(); ++i) {
auto res = ColumnReader::create(meta->mutable_children_columns(i), _segment);
auto sub_column = (column != nullptr) ? column->subcolumn_ptr(i) : nullptr;
auto res = ColumnReader::create(meta->mutable_children_columns(i), _segment, sub_column);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
_update_sub_reader_pos(sub_column, i);
}
return Status::OK();
} else {
Expand Down Expand Up @@ -488,7 +494,60 @@ bool ColumnReader::segment_zone_map_filter(const std::vector<const ColumnPredica
return std::all_of(predicates.begin(), predicates.end(), filter);
}

StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAccessPath* path) {
void ColumnReader::_update_sub_reader_pos(const TabletColumn* column, int pos) {
if (column == nullptr) {
return;
}
auto name = column->name();
int id = column->unique_id();
_sub_reader_pos[{std::string(name), id}] = pos;
}

StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::_create_merge_struct_iter(ColumnAccessPath* path,
const TabletColumn* column) {
DCHECK(_column_type == LogicalType::TYPE_STRUCT);
DCHECK(column != nullptr);
auto num_fields = column->subcolumn_count();

std::unique_ptr<ColumnIterator> null_iter;
if (is_nullable()) {
ASSIGN_OR_RETURN(null_iter, (*_sub_readers)[_sub_readers->size() - 1]->new_iterator());
}

std::vector<ColumnAccessPath*> child_paths(num_fields, nullptr);
if (path != nullptr && !path->children().empty()) {
for (const auto& child : path->children()) {
child_paths[child->index()] = child.get();
}
}

std::vector<std::unique_ptr<ColumnIterator>> field_iters;
for (int i = 0; i < num_fields; ++i) {
auto sub_column = column->subcolumn_ptr(i);
auto iter = _sub_reader_pos.find({std::string(sub_column->name()), sub_column->unique_id()});
if (iter != _sub_reader_pos.end()) {
ASSIGN_OR_RETURN(auto iter, (*_sub_readers)[iter->second]->new_iterator(child_paths[i], sub_column));
field_iters.emplace_back(std::move(iter));
} else {
if (!sub_column->has_default_value() && !sub_column->is_nullable()) {
return Status::InternalError(
fmt::format("invalid nonexistent column({}) without default value.", sub_column->name()));
} else {
const TypeInfoPtr& type_info = get_type_info(*sub_column);
auto default_value_iter = std::make_unique<DefaultValueColumnIterator>(
sub_column->has_default_value(), sub_column->default_value(), sub_column->is_nullable(),
type_info, sub_column->length(), num_rows());
ColumnIteratorOptions iter_opts;
RETURN_IF_ERROR(default_value_iter->init(iter_opts));
field_iters.emplace_back(std::move(default_value_iter));
}
}
}
return create_struct_iter(this, std::move(null_iter), std::move(field_iters), path);
}

StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAccessPath* path,
const TabletColumn* column) {
if (is_scalar_field_type(delegate_type(_column_type))) {
return std::make_unique<ScalarColumnIterator>(this);
} else if (_column_type == LogicalType::TYPE_ARRAY) {
Expand All @@ -507,7 +566,8 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
}
}

ASSIGN_OR_RETURN(auto element_iterator, (*_sub_readers)[col++]->new_iterator(value_path));
auto sub_column = (column != nullptr) ? column->subcolumn_ptr(0) : nullptr;
ASSIGN_OR_RETURN(auto element_iterator, (*_sub_readers)[col++]->new_iterator(value_path, sub_column));

std::unique_ptr<ColumnIterator> null_iterator;
if (is_nullable()) {
Expand Down Expand Up @@ -535,7 +595,7 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces

// key must scalar type now
ASSIGN_OR_RETURN(auto keys, (*_sub_readers)[col++]->new_iterator());
ASSIGN_OR_RETURN(auto values, (*_sub_readers)[col++]->new_iterator(value_path));
ASSIGN_OR_RETURN(auto values, (*_sub_readers)[col++]->new_iterator(value_path, nullptr));
std::unique_ptr<ColumnIterator> nulls;
if (is_nullable()) {
ASSIGN_OR_RETURN(nulls, (*_sub_readers)[col++]->new_iterator());
Expand All @@ -544,6 +604,9 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
return std::make_unique<MapColumnIterator>(this, std::move(nulls), std::move(offsets), std::move(keys),
std::move(values), path);
} else if (_column_type == LogicalType::TYPE_STRUCT) {
if (column != nullptr) {
return _create_merge_struct_iter(path, column);
}
auto num_fields = _sub_readers->size();

std::unique_ptr<ColumnIterator> null_iter;
Expand Down
41 changes: 38 additions & 3 deletions be/src/storage/rowset/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,21 @@ class Segment;
// This will cache data shared by all reader
class ColumnReader {
struct private_type;
struct SubReaderId;

public:
// Create and initialize a ColumnReader.
// This method will not take the ownership of |meta|.
// Note that |meta| is mutable, this method may change its internal state.
//
// The primary purpose of the |column| currently is to obtain the name and unique ID of the sub_column
// to support the add/drop field functionality of the struct column.
// It is important that the |column| needs to be consistent with the tablet schema corresponding to the segment.
// If you can ensure that this column does not involve a struct column, the |column| can be set to nullptr.
//
// To developers: keep this method lightweight, should not incur any I/O.
static StatusOr<std::unique_ptr<ColumnReader>> create(ColumnMetaPB* meta, Segment* segment);
static StatusOr<std::unique_ptr<ColumnReader>> create(ColumnMetaPB* meta, Segment* segment,
const TabletColumn* column);

ColumnReader(const private_type&, Segment* segment);
~ColumnReader();
Expand All @@ -102,7 +109,8 @@ class ColumnReader {
void operator=(ColumnReader&&) = delete;

// create a new column iterator. Caller should free the returned iterator after unused.
StatusOr<std::unique_ptr<ColumnIterator>> new_iterator(ColumnAccessPath* path = nullptr);
StatusOr<std::unique_ptr<ColumnIterator>> new_iterator(ColumnAccessPath* path = nullptr,
const TabletColumn* column = nullptr);

// Caller should free returned iterator after unused.
// TODO: StatusOr<std::unique_ptr<ColumnIterator>> new_bitmap_index_iterator()
Expand Down Expand Up @@ -167,7 +175,7 @@ class ColumnReader {
constexpr static uint8_t kHasAllDictEncodedMask = 2;
constexpr static uint8_t kAllDictEncodedMask = 4;

Status _init(ColumnMetaPB* meta);
Status _init(ColumnMetaPB* meta, const TabletColumn* column);

Status _load_zonemap_index(const IndexReadOptions& opts);
Status _load_bitmap_index(const IndexReadOptions& opts);
Expand All @@ -180,6 +188,11 @@ class ColumnReader {
Status _zone_map_filter(const std::vector<const ColumnPredicate*>& predicates, const ColumnPredicate* del_predicate,
std::unordered_set<uint32_t>* del_partial_filtered_pages, std::vector<uint32_t>* pages);

StatusOr<std::unique_ptr<ColumnIterator>> _create_merge_struct_iter(ColumnAccessPath* path,
const TabletColumn* column);

void _update_sub_reader_pos(const TabletColumn* column, int pos);

// ColumnReader will be resident in memory. When there are many columns in the table,
// the meta in ColumnReader takes up a lot of memory,
// and now the content that is not needed in Meta is not saved to ColumnReader
Expand All @@ -205,6 +218,28 @@ class ColumnReader {

using SubReaderList = std::vector<std::unique_ptr<ColumnReader>>;
std::unique_ptr<SubReaderList> _sub_readers;
// Only used for struct column right now
// Use column names and unique IDs to distinguish sub-columns.
// The unnique id is always -1 for historical data, so the column name is needed.
// After support add/drop field for struct column, the following scenarios might occur:
// 1. Drop field v1
// 2. Add field v1
// The field `v1` in step 2 is different from the `v1` in step 1 and needs to be distinguished,
// So we also need to unqiue id.
struct SubReaderId {
std::string name;
int32_t id;

bool operator==(const SubReaderId& other) const { return id == other.id && name == other.name; }

bool operator<(const SubReaderId& other) const {
if (id != other.id) {
return id < other.id;
}
return name < other.name;
}
};
std::map<SubReaderId, int> _sub_reader_pos;

// Pointer to its father segment, as the column reader
// is never released before the end of the parent's life cycle,
Expand Down
6 changes: 3 additions & 3 deletions be/src/storage/rowset/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ Status Segment::_create_column_readers(SegmentFooterPB* footer) {
continue;
}

auto res = ColumnReader::create(footer->mutable_columns(iter->second), this);
auto res = ColumnReader::create(footer->mutable_columns(iter->second), this, &column);
if (!res.ok()) {
return res.status();
}
Expand All @@ -371,7 +371,7 @@ StatusOr<std::unique_ptr<ColumnIterator>> Segment::new_column_iterator_or_defaul
ColumnAccessPath* path) {
auto id = column.unique_id();
if (_column_readers.contains(id)) {
return _column_readers.at(id)->new_iterator(path);
return _column_readers.at(id)->new_iterator(path, &column);
} else if (!column.has_default_value() && !column.is_nullable()) {
return Status::InternalError(
fmt::format("invalid nonexistent column({}) without default value.", column.name()));
Expand All @@ -389,7 +389,7 @@ StatusOr<std::unique_ptr<ColumnIterator>> Segment::new_column_iterator_or_defaul
StatusOr<std::unique_ptr<ColumnIterator>> Segment::new_column_iterator(ColumnUID id, ColumnAccessPath* path) {
auto iter = _column_readers.find(id);
if (iter != _column_readers.end()) {
return iter->second->new_iterator(path);
return iter->second->new_iterator(path, nullptr);
} else {
return Status::NotFound(fmt::format("{} does not contain column of id {}", _segment_file_info.path, id));
}
Expand Down
13 changes: 12 additions & 1 deletion be/src/storage/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,18 @@ class TabletColumn {
void add_sub_column(const TabletColumn& sub_column);
void add_sub_column(TabletColumn&& sub_column);
uint32_t subcolumn_count() const { return _extra_fields ? _extra_fields->sub_columns.size() : 0; }
const TabletColumn& subcolumn(uint32_t i) const { return _extra_fields->sub_columns[i]; }
const TabletColumn& subcolumn(uint32_t i) const {
if (i >= subcolumn_count()) {
throw std::out_of_range("Index i is out of range");
}
return _extra_fields->sub_columns[i];
}
const TabletColumn* subcolumn_ptr(uint32_t i) const {
if (i >= subcolumn_count()) {
return nullptr;
}
return &(_extra_fields->sub_columns[i]);
}

friend bool operator==(const TabletColumn& a, const TabletColumn& b);
friend bool operator!=(const TabletColumn& a, const TabletColumn& b);
Expand Down
8 changes: 4 additions & 4 deletions be/test/storage/rowset/column_reader_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class ColumnReaderWriterTest : public testing::Test {
// read and check
{
// read and check
auto res = ColumnReader::create(&meta, segment.get());
auto res = ColumnReader::create(&meta, segment.get(), nullptr);
ASSERT_TRUE(res.ok());
auto reader = std::move(res).value();

Expand Down Expand Up @@ -386,7 +386,7 @@ class ColumnReaderWriterTest : public testing::Test {

// read and check
{
auto res = ColumnReader::create(&meta, segment.get());
auto res = ColumnReader::create(&meta, segment.get(), nullptr);
ASSERT_TRUE(res.ok());
auto reader = std::move(res).value();

Expand Down Expand Up @@ -709,7 +709,7 @@ TEST_F(ColumnReaderWriterTest, test_scalar_column_total_mem_footprint) {
// read and check
{
// read and check
auto res = ColumnReader::create(&meta, segment.get());
auto res = ColumnReader::create(&meta, segment.get(), nullptr);
ASSERT_TRUE(res.ok());
auto reader = std::move(res).value();
ASSERT_EQ(1024, meta.num_rows());
Expand Down Expand Up @@ -770,7 +770,7 @@ TEST_F(ColumnReaderWriterTest, test_large_varchar_column_writer) {
ASSERT_TRUE(wfile->close().ok());
// read and check result
auto segment = create_dummy_segment(fs, fname);
auto res = ColumnReader::create(&meta, segment.get());
auto res = ColumnReader::create(&meta, segment.get(), nullptr);
ASSERT_TRUE(res.ok());
auto reader = std::move(res).value();
ASSIGN_OR_ABORT(auto iter, reader->new_iterator());
Expand Down
Loading

0 comments on commit 33c4a74

Please sign in to comment.