Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support add/drop field for struct column(part1) #46451

Merged
merged 7 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/storage/metadata_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ static Status type_desc_to_pb(const std::vector<TTypeNode>& types, int* index, C
// All struct fields are nullable now
RETURN_IF_ERROR(type_desc_to_pb(types, index, field_pb));
field_pb->set_name(field.name);
if (field.__isset.id && field.id >= 0) {
field_pb->set_unique_id(field.id);
}
}
return Status::OK();
}
Expand Down
110 changes: 89 additions & 21 deletions be/src/storage/rowset/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "storage/rowset/bitmap_index_reader.h"
#include "storage/rowset/bloom_filter.h"
#include "storage/rowset/bloom_filter_index_reader.h"
#include "storage/rowset/default_value_column_iterator.h"
#include "storage/rowset/encoding_info.h"
#include "storage/rowset/json_column_iterator.h"
#include "storage/rowset/map_column_iterator.h"
Expand All @@ -71,9 +72,10 @@

namespace starrocks {

StatusOr<std::unique_ptr<ColumnReader>> ColumnReader::create(ColumnMetaPB* meta, Segment* segment) {
StatusOr<std::unique_ptr<ColumnReader>> ColumnReader::create(ColumnMetaPB* meta, Segment* segment,
const TabletColumn* column) {
auto r = std::make_unique<ColumnReader>(private_type(0), segment);
RETURN_IF_ERROR(r->_init(meta));
RETURN_IF_ERROR(r->_init(meta, column));
return std::move(r);
}

Expand Down Expand Up @@ -110,11 +112,15 @@ ColumnReader::~ColumnReader() {
MEM_TRACKER_SAFE_RELEASE(GlobalEnv::GetInstance()->column_metadata_mem_tracker(), sizeof(ColumnReader));
}

Status ColumnReader::_init(ColumnMetaPB* meta) {
Status ColumnReader::_init(ColumnMetaPB* meta, const TabletColumn* column) {
_column_type = static_cast<LogicalType>(meta->type());
_dict_page_pointer = PagePointer(meta->dict_page());
_total_mem_footprint = meta->total_mem_footprint();
_name = meta->has_name() ? meta->name() : "None";
if (column == nullptr) {
_name = meta->has_name() ? meta->name() : "None";
} else {
_name = meta->has_name() ? meta->name() : column->name();
}
_column_unique_id = meta->unique_id();

if (meta->is_nullable()) _flags |= kIsNullableMask;
Expand Down Expand Up @@ -189,7 +195,8 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
if (_column_type == LogicalType::TYPE_JSON) {
_sub_readers = std::make_unique<SubReaderList>();
for (int i = 0; i < meta->children_columns_size(); ++i) {
auto res = ColumnReader::create(meta->mutable_children_columns(i), _segment);
auto sub_column = (column != nullptr) ? column->subcolumn_ptr(i) : nullptr;
auto res = ColumnReader::create(meta->mutable_children_columns(i), _segment, sub_column);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
}
Expand All @@ -204,18 +211,19 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
}
_sub_readers->reserve(3);

auto sub_column = (column != nullptr) ? column->subcolumn_ptr(0) : nullptr;
// elements
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, sub_column);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// null flags
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(2), _segment);
res = ColumnReader::create(meta->mutable_children_columns(2), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
} else {
Expand All @@ -224,13 +232,14 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
}
_sub_readers->reserve(2);

auto sub_column = (column != nullptr) ? column->subcolumn_ptr(0) : nullptr;
// elements
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, sub_column);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
}
Expand All @@ -244,22 +253,22 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
_sub_readers->reserve(4);

// keys
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// values
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// null flags
res = ColumnReader::create(meta->mutable_children_columns(2), _segment);
res = ColumnReader::create(meta->mutable_children_columns(2), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(3), _segment);
res = ColumnReader::create(meta->mutable_children_columns(3), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
} else {
Expand All @@ -269,27 +278,29 @@ Status ColumnReader::_init(ColumnMetaPB* meta) {
_sub_readers->reserve(3);

// keys
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment);
auto res = ColumnReader::create(meta->mutable_children_columns(0), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// values
res = ColumnReader::create(meta->mutable_children_columns(1), _segment);
res = ColumnReader::create(meta->mutable_children_columns(1), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());

// offsets
res = ColumnReader::create(meta->mutable_children_columns(2), _segment);
res = ColumnReader::create(meta->mutable_children_columns(2), _segment, nullptr);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
}
return Status::OK();
} else if (_column_type == LogicalType::TYPE_STRUCT) {
_sub_readers = std::make_unique<SubReaderList>();
for (int i = 0; i < meta->children_columns_size(); ++i) {
auto res = ColumnReader::create(meta->mutable_children_columns(i), _segment);
auto sub_column = (column != nullptr) ? column->subcolumn_ptr(i) : nullptr;
wyb marked this conversation as resolved.
Show resolved Hide resolved
auto res = ColumnReader::create(meta->mutable_children_columns(i), _segment, sub_column);
RETURN_IF_ERROR(res);
_sub_readers->emplace_back(std::move(res).value());
_update_sub_reader_pos(sub_column, i);
}
return Status::OK();
} else {
Expand Down Expand Up @@ -597,7 +608,60 @@ bool ColumnReader::segment_zone_map_filter(const std::vector<const ColumnPredica
return std::all_of(predicates.begin(), predicates.end(), filter);
}

StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAccessPath* path) {
void ColumnReader::_update_sub_reader_pos(const TabletColumn* column, int pos) {
if (column == nullptr) {
return;
}
auto name = column->name();
int id = column->unique_id();
_sub_reader_pos[{std::string(name), id}] = pos;
}

// Builds an iterator for a struct column whose field layout is described by
// |column| rather than by the order of the sub-readers in this reader.
//
// For every sub-column of |column|:
//   - if a sub-reader was registered in `_sub_reader_pos` under the same
//     (name, unique id), an iterator over that sub-reader is created;
//   - otherwise a DefaultValueColumnIterator is created, which requires the
//     sub-column to either have a default value or be nullable.
//
// |path|   optional access path; each child is forwarded to the field at
//          position `child->index()` of |column|.
// |column| must be non-null (checked with DCHECK only).
//
// Returns the struct iterator, or an error status when
//   - a child of |path| has an index outside the field range of |column|,
//   - a field has no sub-reader, no default value and is not nullable,
//   - creating or initializing any underlying iterator fails.
StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::_create_merge_struct_iter(ColumnAccessPath* path,
                                                                                  const TabletColumn* column) {
    DCHECK(_column_type == LogicalType::TYPE_STRUCT);
    DCHECK(column != nullptr);
    const uint32_t num_fields = column->subcolumn_count();

    // For a nullable struct the null-flag reader is the last sub-reader.
    std::unique_ptr<ColumnIterator> null_iter;
    if (is_nullable()) {
        ASSIGN_OR_RETURN(null_iter, _sub_readers->back()->new_iterator());
    }

    // Map each child access path onto the field position it refers to.
    std::vector<ColumnAccessPath*> child_paths(num_fields, nullptr);
    if (path != nullptr) {
        for (const auto& child : path->children()) {
            const auto field_idx = static_cast<size_t>(child->index());
            // Reject an index outside the field range instead of writing past the end of the vector.
            if (field_idx >= child_paths.size()) {
                return Status::InternalError(
                        fmt::format("access path index({}) is out of range for struct column({}) with {} fields.",
                                    field_idx, column->name(), child_paths.size()));
            }
            child_paths[field_idx] = child.get();
        }
    }

    std::vector<std::unique_ptr<ColumnIterator>> field_iters;
    field_iters.reserve(num_fields);
    for (uint32_t i = 0; i < num_fields; ++i) {
        const TabletColumn* sub_column = column->subcolumn_ptr(i);
        const auto pos_it = _sub_reader_pos.find({std::string(sub_column->name()), sub_column->unique_id()});
        if (pos_it != _sub_reader_pos.end()) {
            // The field has a registered sub-reader: iterate over it.
            ASSIGN_OR_RETURN(auto field_iter,
                             (*_sub_readers)[pos_it->second]->new_iterator(child_paths[i], sub_column));
            field_iters.emplace_back(std::move(field_iter));
            continue;
        }

        // No sub-reader is registered for this field, so its values must come from a default.
        if (!sub_column->has_default_value() && !sub_column->is_nullable()) {
            return Status::InternalError(
                    fmt::format("invalid nonexistent column({}) without default value.", sub_column->name()));
        }
        const TypeInfoPtr& type_info = get_type_info(*sub_column);
        auto default_value_iter = std::make_unique<DefaultValueColumnIterator>(
                sub_column->has_default_value(), sub_column->default_value(), sub_column->is_nullable(), type_info,
                sub_column->length(), num_rows());
        ColumnIteratorOptions iter_opts;
        RETURN_IF_ERROR(default_value_iter->init(iter_opts));
        field_iters.emplace_back(std::move(default_value_iter));
    }
    return create_struct_iter(this, std::move(null_iter), std::move(field_iters), path);
}

StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAccessPath* path,
const TabletColumn* column) {
if (_column_type == LogicalType::TYPE_JSON) {
auto json_iter = std::make_unique<ScalarColumnIterator>(this);
if (path == nullptr || path->children().empty()) {
Expand Down Expand Up @@ -663,7 +727,8 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
}
}

ASSIGN_OR_RETURN(auto element_iterator, (*_sub_readers)[col++]->new_iterator(value_path));
auto sub_column = (column != nullptr) ? column->subcolumn_ptr(0) : nullptr;
ASSIGN_OR_RETURN(auto element_iterator, (*_sub_readers)[col++]->new_iterator(value_path, sub_column));

std::unique_ptr<ColumnIterator> null_iterator;
if (is_nullable()) {
Expand Down Expand Up @@ -691,7 +756,7 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces

// key must scalar type now
ASSIGN_OR_RETURN(auto keys, (*_sub_readers)[col++]->new_iterator());
ASSIGN_OR_RETURN(auto values, (*_sub_readers)[col++]->new_iterator(value_path));
ASSIGN_OR_RETURN(auto values, (*_sub_readers)[col++]->new_iterator(value_path, nullptr));
std::unique_ptr<ColumnIterator> nulls;
if (is_nullable()) {
ASSIGN_OR_RETURN(nulls, (*_sub_readers)[col++]->new_iterator());
Expand All @@ -700,6 +765,9 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
return std::make_unique<MapColumnIterator>(this, std::move(nulls), std::move(offsets), std::move(keys),
std::move(values), path);
} else if (_column_type == LogicalType::TYPE_STRUCT) {
if (column != nullptr) {
return _create_merge_struct_iter(path, column);
}
auto num_fields = _sub_readers->size();

std::unique_ptr<ColumnIterator> null_iter;
Expand Down
41 changes: 38 additions & 3 deletions be/src/storage/rowset/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,21 @@ struct NgramBloomFilterReaderOptions;
// This will cache data shared by all reader
class ColumnReader {
struct private_type;
struct SubReaderId;

public:
// Create and initialize a ColumnReader.
// This method will not take the ownership of |meta|.
// Note that |meta| is mutable, this method may change its internal state.
//
// The primary purpose of the |column| currently is to obtain the name and unique ID of the sub_column
// to support the add/drop field functionality of the struct column.
// The |column| must be consistent with the tablet schema corresponding to the segment.
// If you can ensure that this column does not involve a struct column, the |column| can be set to nullptr.
//
// To developers: keep this method lightweight, should not incur any I/O.
static StatusOr<std::unique_ptr<ColumnReader>> create(ColumnMetaPB* meta, Segment* segment);
static StatusOr<std::unique_ptr<ColumnReader>> create(ColumnMetaPB* meta, Segment* segment,
imay marked this conversation as resolved.
Show resolved Hide resolved
const TabletColumn* column);

ColumnReader(const private_type&, Segment* segment);
~ColumnReader();
Expand All @@ -105,7 +112,8 @@ class ColumnReader {
void operator=(ColumnReader&&) = delete;

// create a new column iterator.
StatusOr<std::unique_ptr<ColumnIterator>> new_iterator(ColumnAccessPath* path = nullptr);
StatusOr<std::unique_ptr<ColumnIterator>> new_iterator(ColumnAccessPath* path = nullptr,
const TabletColumn* column = nullptr);

// Caller should free returned iterator after unused.
// TODO: StatusOr<std::unique_ptr<ColumnIterator>> new_bitmap_index_iterator()
Expand Down Expand Up @@ -199,7 +207,7 @@ class ColumnReader {
constexpr static uint8_t kHasAllDictEncodedMask = 2;
constexpr static uint8_t kAllDictEncodedMask = 4;

Status _init(ColumnMetaPB* meta);
Status _init(ColumnMetaPB* meta, const TabletColumn* column);

Status _load_zonemap_index(const IndexReadOptions& opts);
Status _load_bitmap_index(const IndexReadOptions& opts);
Expand All @@ -219,6 +227,11 @@ class ColumnReader {

bool _inverted_index_loaded() const { return invoked(_inverted_index_load_once); }

StatusOr<std::unique_ptr<ColumnIterator>> _create_merge_struct_iter(ColumnAccessPath* path,
const TabletColumn* column);

void _update_sub_reader_pos(const TabletColumn* column, int pos);

// ColumnReader will be resident in memory. When there are many columns in the table,
// the meta in ColumnReader takes up a lot of memory,
// and now the content that is not needed in Meta is not saved to ColumnReader
Expand Down Expand Up @@ -247,6 +260,28 @@ class ColumnReader {

using SubReaderList = std::vector<std::unique_ptr<ColumnReader>>;
std::unique_ptr<SubReaderList> _sub_readers;
// Only used for struct column right now
// Use column names and unique IDs to distinguish sub-columns.
// The unique id is always -1 for historical data, so the column name is needed.
// After support add/drop field for struct column, the following scenarios might occur:
// 1. Drop field v1
// 2. Add field v1
// The field `v1` in step 2 is different from the `v1` in step 1 and needs to be distinguished,
// so we also need the unique id.
// Key identifying one sub-reader by field name plus unique id.
// Ordering is by `id` first and `name` second, so the type can be used as a
// std::map key; equality requires both members to match.
struct SubReaderId {
    std::string name;
    // Defaults to -1 so a default-constructed key never carries an indeterminate id.
    int32_t id = -1;

    bool operator==(const SubReaderId& other) const { return id == other.id && name == other.name; }

    bool operator<(const SubReaderId& other) const {
        if (id != other.id) {
            return id < other.id;
        }
        return name < other.name;
    }
};
std::map<SubReaderId, int> _sub_reader_pos;

// Pointer to its father segment, as the column reader
// is never released before the end of the parent's life cycle,
Expand Down
6 changes: 3 additions & 3 deletions be/src/storage/rowset/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ Status Segment::_create_column_readers(SegmentFooterPB* footer) {
continue;
}

auto res = ColumnReader::create(footer->mutable_columns(iter->second), this);
auto res = ColumnReader::create(footer->mutable_columns(iter->second), this, &column);
if (!res.ok()) {
return res.status();
}
Expand All @@ -410,7 +410,7 @@ StatusOr<std::unique_ptr<ColumnIterator>> Segment::new_column_iterator_or_defaul
ColumnAccessPath* path) {
auto id = column.unique_id();
if (_column_readers.contains(id)) {
ASSIGN_OR_RETURN(auto source_iter, _column_readers[id]->new_iterator(path));
ASSIGN_OR_RETURN(auto source_iter, _column_readers[id]->new_iterator(path, &column));
if (_column_readers[id]->column_type() == column.type()) {
return source_iter;
} else {
Expand Down Expand Up @@ -439,7 +439,7 @@ StatusOr<std::unique_ptr<ColumnIterator>> Segment::new_column_iterator(const Tab
auto id = column.unique_id();
auto iter = _column_readers.find(id);
if (iter != _column_readers.end()) {
ASSIGN_OR_RETURN(auto source_iter, iter->second->new_iterator(path));
ASSIGN_OR_RETURN(auto source_iter, iter->second->new_iterator(path, nullptr));
if (iter->second->column_type() == column.type()) {
return source_iter;
} else {
Expand Down
13 changes: 12 additions & 1 deletion be/src/storage/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,18 @@ class TabletColumn {
void add_sub_column(const TabletColumn& sub_column);
void add_sub_column(TabletColumn&& sub_column);
uint32_t subcolumn_count() const { return _extra_fields ? _extra_fields->sub_columns.size() : 0; }
const TabletColumn& subcolumn(uint32_t i) const { return _extra_fields->sub_columns[i]; }
// Returns a reference to the i-th sub-column.
// Throws std::out_of_range when |i| is not below subcolumn_count().
const TabletColumn& subcolumn(uint32_t i) const {
    if (i < subcolumn_count()) {
        return _extra_fields->sub_columns[i];
    }
    throw std::out_of_range("Index i is out of range");
}
// Returns a pointer to the i-th sub-column, or nullptr when |i| is not below
// subcolumn_count() (which is 0 when there are no extra fields).
const TabletColumn* subcolumn_ptr(uint32_t i) const {
    return (i < subcolumn_count()) ? &(_extra_fields->sub_columns[i]) : nullptr;
}

friend bool operator==(const TabletColumn& a, const TabletColumn& b);
friend bool operator!=(const TabletColumn& a, const TabletColumn& b);
Expand Down
Loading
Loading