Skip to content

Commit

Permalink
[Feature] Support add/drop field for struct column in shared-data mode (
Browse files Browse the repository at this point in the history
…#48430)

Support add/drop field for struct column in shared-data mode.


Signed-off-by: sevev <[email protected]>
  • Loading branch information
sevev authored Jul 30, 2024
1 parent 576c814 commit 3c8d168
Show file tree
Hide file tree
Showing 25 changed files with 606 additions and 42 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1378,4 +1378,7 @@ CONF_mBool(skip_lake_pk_preload, "false");
// Reduce core file size by not dumping jemalloc retain pages
CONF_mBool(enable_core_file_size_optimization, "true");

// Experimental feature, this configuration will be removed after testing is complete.
CONF_mBool(lake_enable_alter_struct, "true");

} // namespace starrocks::config
5 changes: 3 additions & 2 deletions be/src/storage/lake/compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
namespace starrocks::lake {

CompactionTask::CompactionTask(VersionedTablet tablet, std::vector<std::shared_ptr<Rowset>> input_rowsets,
CompactionTaskContext* context)
CompactionTaskContext* context, std::shared_ptr<const TabletSchema> tablet_schema)
: _txn_id(context->txn_id),
_tablet(std::move(tablet)),
_input_rowsets(std::move(input_rowsets)),
_mem_tracker(std::make_unique<MemTracker>(MemTracker::COMPACTION, -1,
"Compaction-" + std::to_string(_tablet.metadata()->id()),
GlobalEnv::GetInstance()->compaction_mem_tracker())),
_context(context) {}
_context(context),
_tablet_schema(std::move(tablet_schema)) {}

Status CompactionTask::execute_index_major_compaction(TxnLogPB* txn_log) {
if (_tablet.get_schema()->keys_type() == KeysType::PRIMARY_KEYS) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/lake/compaction_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class CompactionTask {
using CancelFunc = std::function<Status()>;

explicit CompactionTask(VersionedTablet tablet, std::vector<std::shared_ptr<Rowset>> input_rowsets,
CompactionTaskContext* context);
CompactionTaskContext* context, std::shared_ptr<const TabletSchema> tablet_schema);
virtual ~CompactionTask() = default;

virtual Status execute(CancelFunc cancel_func, ThreadPool* flush_pool = nullptr) = 0;
Expand All @@ -54,6 +54,7 @@ class CompactionTask {
std::vector<std::shared_ptr<Rowset>> _input_rowsets;
std::unique_ptr<MemTracker> _mem_tracker = nullptr;
CompactionTaskContext* _context;
std::shared_ptr<const TabletSchema> _tablet_schema;
};

} // namespace starrocks::lake
20 changes: 10 additions & 10 deletions be/src/storage/lake/horizontal_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ namespace starrocks::lake {
Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flush_pool) {
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get());

auto tablet_schema = _tablet.get_schema();
int64_t total_num_rows = 0;
for (auto& rowset : _input_rowsets) {
total_num_rows += rowset->num_rows();
Expand All @@ -43,8 +42,8 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu

VLOG(3) << "Start horizontal compaction. tablet: " << _tablet.id() << ", reader chunk size: " << chunk_size;

Schema schema = ChunkHelper::convert_schema(tablet_schema);
TabletReader reader(_tablet.tablet_manager(), _tablet.metadata(), schema, _input_rowsets);
Schema schema = ChunkHelper::convert_schema(_tablet_schema);
TabletReader reader(_tablet.tablet_manager(), _tablet.metadata(), schema, _input_rowsets, _tablet_schema);
RETURN_IF_ERROR(reader.prepare());
TabletReaderParams reader_params;
reader_params.reader_type = READER_CUMULATIVE_COMPACTION;
Expand All @@ -54,7 +53,9 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
reader_params.lake_io_opts = {false, config::lake_compaction_stream_buffer_size_bytes};
RETURN_IF_ERROR(reader.open(reader_params));

ASSIGN_OR_RETURN(auto writer, _tablet.new_writer(kHorizontal, _txn_id, 0, flush_pool, true /** compaction **/))
ASSIGN_OR_RETURN(auto writer,
_tablet.new_writer_with_schema(kHorizontal, _txn_id, 0, flush_pool, true /** compaction **/,
_tablet_schema /** output rowset schema**/))
RETURN_IF_ERROR(writer->open());
DeferOp defer([&]() { writer->close(); });

Expand All @@ -78,7 +79,7 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
{
SCOPED_RAW_TIMER(&reader_time_ns);
auto st = Status::OK();
if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && enable_light_pk_compaction_publish) {
if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && enable_light_pk_compaction_publish) {
st = reader.get_next(chunk.get(), &rssid_rowids);
} else {
st = reader.get_next(chunk.get());
Expand All @@ -89,7 +90,7 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
return st;
}
}
ChunkHelper::padding_char_columns(char_field_indexes, schema, tablet_schema, chunk.get());
ChunkHelper::padding_char_columns(char_field_indexes, schema, _tablet_schema, chunk.get());
if (rssid_rowids.empty()) {
RETURN_IF_ERROR(writer->write(*chunk));
} else {
Expand Down Expand Up @@ -136,10 +137,10 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
op_compaction->set_compact_version(_tablet.metadata()->version());
RETURN_IF_ERROR(execute_index_major_compaction(txn_log.get()));
RETURN_IF_ERROR(_tablet.tablet_manager()->put_txn_log(txn_log));
if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
// preload primary key table's compaction state
Tablet t(_tablet.tablet_manager(), _tablet.id());
_tablet.tablet_manager()->update_mgr()->preload_compaction_state(*txn_log, t, tablet_schema);
_tablet.tablet_manager()->update_mgr()->preload_compaction_state(*txn_log, t, _tablet_schema);
}

LOG(INFO) << "Horizontal compaction finished. tablet: " << _tablet.id() << ", txn_id: " << _txn_id
Expand All @@ -152,7 +153,6 @@ StatusOr<int32_t> HorizontalCompactionTask::calculate_chunk_size() {
int64_t total_num_rows = 0;
int64_t total_input_segs = 0;
int64_t total_mem_footprint = 0;
auto tablet_schema = _tablet.get_schema();
for (auto& rowset : _input_rowsets) {
total_num_rows += rowset->num_rows();
total_input_segs += rowset->is_overlapped() ? rowset->num_segments() : 1;
Expand All @@ -162,7 +162,7 @@ StatusOr<int32_t> HorizontalCompactionTask::calculate_chunk_size() {
ASSIGN_OR_RETURN(auto segments, rowset->segments(lake_io_opts, fill_meta_cache));
for (auto& segment : segments) {
for (size_t i = 0; i < segment->num_columns(); ++i) {
auto uid = tablet_schema->column(i).unique_id();
auto uid = _tablet_schema->column(i).unique_id();
const auto* column_reader = segment->column_with_uid(uid);
if (column_reader == nullptr) {
continue;
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/lake/horizontal_compaction_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class TabletWriter;
class HorizontalCompactionTask : public CompactionTask {
public:
explicit HorizontalCompactionTask(VersionedTablet tablet, std::vector<std::shared_ptr<Rowset>> input_rowsets,
CompactionTaskContext* context)
: CompactionTask(std::move(tablet), std::move(input_rowsets), context) {}
CompactionTaskContext* context, std::shared_ptr<const TabletSchema> tablet_schema)
: CompactionTask(std::move(tablet), std::move(input_rowsets), context, std::move(tablet_schema)) {}

~HorizontalCompactionTask() override = default;

Expand Down
49 changes: 49 additions & 0 deletions be/src/storage/lake/meta_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ void MetaFileBuilder::apply_opwrite(const TxnLogPB_OpWrite& op_write, const std:
file_meta.set_name(orphan_file);
_tablet_meta->mutable_orphan_files()->Add(std::move(file_meta));
}
if (!_tablet_meta->rowset_to_schema().empty()) {
auto schema_id = _tablet_meta->schema().id();
(*_tablet_meta->mutable_rowset_to_schema())[rowset->id()] = schema_id;
if (_tablet_meta->historical_schemas().count(schema_id) <= 0) {
auto& item = (*_tablet_meta->mutable_historical_schemas())[schema_id];
item.CopyFrom(_tablet_meta->schema());
}
}
}

void MetaFileBuilder::apply_column_mode_partial_update(const TxnLogPB_OpWrite& op_write) {
Expand Down Expand Up @@ -210,6 +218,12 @@ void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compact
uint32_t id;
bool operator()(const uint32_t rowid) const { return rowid == id; }
};

// Unary predicate that matches a RowsetMetadata whose id() equals the stored `id`.
struct RowsetFinder {
// Rowset id to compare against.
uint32_t id;
// Returns true when `r` carries the target rowset id.
bool operator()(const RowsetMetadata& r) const { return r.id() == id; }
};

// Only used for cloud native persistent index.
std::vector<DelfileWithRowsetId> collect_del_files;
auto it = _tablet_meta->mutable_rowsets()->begin();
Expand Down Expand Up @@ -257,6 +271,8 @@ void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compact
}

// add output rowset
bool has_output_rowset = false;
uint32_t output_rowset_id = 0;
if (op_compaction.has_output_rowset() &&
(op_compaction.output_rowset().segments_size() > 0 || !collect_del_files.empty())) {
// NOTICE: we need output rowset in two scenarios:
Expand All @@ -271,6 +287,39 @@ void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compact
rowset->add_del_files()->CopyFrom(each);
}
_tablet_meta->set_next_rowset_id(_tablet_meta->next_rowset_id() + std::max(1, rowset->segments_size()));
has_output_rowset = true;
output_rowset_id = rowset->id();
}

// update rowset schema id
if (!_tablet_meta->rowset_to_schema().empty()) {
int64_t output_rowset_schema_id = _tablet_meta->schema().id();
if (has_output_rowset) {
auto last_rowset_id = op_compaction.input_rowsets(op_compaction.input_rowsets_size() - 1);
output_rowset_schema_id = _tablet_meta->rowset_to_schema().at(last_rowset_id);
}

for (int i = 0; i < op_compaction.input_rowsets_size(); i++) {
_tablet_meta->mutable_rowset_to_schema()->erase(op_compaction.input_rowsets(i));
}

if (has_output_rowset) {
_tablet_meta->mutable_rowset_to_schema()->insert({output_rowset_id, output_rowset_schema_id});
}

std::unordered_set<int64_t> schema_id;
for (auto& pair : _tablet_meta->rowset_to_schema()) {
schema_id.insert(pair.second);
}

for (auto it = _tablet_meta->mutable_historical_schemas()->begin();
it != _tablet_meta->mutable_historical_schemas()->end();) {
if (schema_id.find(it->first) == schema_id.end()) {
it = _tablet_meta->mutable_historical_schemas()->erase(it);
} else {
it++;
}
}
}

VLOG(2) << fmt::format(
Expand Down
13 changes: 11 additions & 2 deletions be/src/storage/lake/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,17 @@ Rowset::Rowset(TabletManager* tablet_mgr, TabletMetadataPtr tablet_metadata, int
_tablet_id(tablet_metadata->id()),
_metadata(&tablet_metadata->rowsets(rowset_index)),
_index(rowset_index),
_tablet_schema(GlobalTabletSchemaMap::Instance()->emplace(tablet_metadata->schema()).first),
_tablet_metadata(std::move(tablet_metadata)) {}
_tablet_metadata(std::move(tablet_metadata)) {
auto rowset_id = _tablet_metadata->rowsets(rowset_index).id();
if (_tablet_metadata->rowset_to_schema().empty()) {
_tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->schema()).first;
} else {
auto schema_id = _tablet_metadata->rowset_to_schema().at(rowset_id);
CHECK(_tablet_metadata->historical_schemas().count(schema_id) > 0);
_tablet_schema =
GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->historical_schemas().at(schema_id)).first;
}
}

Rowset::~Rowset() {
if (_tablet_metadata) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/lake/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ Status ConvertedSchemaChange::init() {
Status DirectSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_metadata) {
// create reader
auto reader = std::make_unique<TabletReader>(_base_tablet.tablet_manager(), _base_tablet.metadata(), _base_schema,
std::vector<RowsetPtr>{rowset});
std::vector<RowsetPtr>{rowset}, _base_tablet.get_schema());
RETURN_IF_ERROR(reader->prepare());
RETURN_IF_ERROR(reader->open(_read_params));

Expand Down Expand Up @@ -238,7 +238,7 @@ Status SortedSchemaChange::init() {
Status SortedSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_metadata) {
// create reader
auto reader = std::make_unique<TabletReader>(_base_tablet.tablet_manager(), _base_tablet.metadata(), _base_schema,
std::vector<RowsetPtr>{rowset});
std::vector<RowsetPtr>{rowset}, _base_tablet.get_schema());
RETURN_IF_ERROR(reader->prepare());
RETURN_IF_ERROR(reader->open(_read_params));

Expand Down
46 changes: 44 additions & 2 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,17 +579,59 @@ StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema_by_id(int64_t tablet_
}
}

// Chooses the tablet schema that the output rowset of a compaction is written with.
//
// If one rowset has many segments, compacting all of its segments at once may use a lot of memory, so in
// the future compaction will support merging only a part of the segments of one rowset. To keep all segments
// of one rowset consistent, the schema of the last input rowset is used as the output rowset schema, because
// the last rowset may have only part of its segments merged.
//
// Returns the schema built from `metadata->schema()` when `metadata` has no rowset-to-schema mapping or when
// `input_rowset` is empty. Otherwise returns the historical schema recorded for the last input rowset, or an
// InternalError when that rowset is missing from `metadata->rowsets()`, missing from the rowset-to-schema
// mapping, or when its schema id is missing from the historical schemas.
StatusOr<TabletSchemaPtr> TabletManager::get_output_rowset_schema(std::vector<uint32_t>& input_rowset,
                                                                  const TabletMetadata* metadata) {
    if (metadata->rowset_to_schema().empty() || input_rowset.empty()) {
        return GlobalTabletSchemaMap::Instance()->emplace(metadata->schema()).first;
    }

    // NOTE(review): every path below either reassigns this value or returns an error, so this initial
    // emplace looks redundant; it is kept to preserve the existing behavior.
    TabletSchemaPtr output_schema = GlobalTabletSchemaMap::Instance()->emplace(metadata->schema()).first;

    // The last input rowset decides the output schema; it must be present in the tablet metadata.
    const uint32_t last_input_id = input_rowset.back();
    const auto rowsets_end = metadata->rowsets().end();
    const auto rowset_pos = std::find_if(metadata->rowsets().begin(), rowsets_end,
                                         [last_input_id](const RowsetMetadata& r) { return r.id() == last_input_id; });
    if (UNLIKELY(rowset_pos == rowsets_end)) {
        return Status::InternalError(fmt::format("input rowset {} not found", last_input_id));
    }

    // Resolve the rowset id to the id of the schema it was written with.
    const auto mapping_pos = metadata->rowset_to_schema().find(last_input_id);
    if (mapping_pos == metadata->rowset_to_schema().end()) {
        return Status::InternalError(fmt::format("input rowset {} not exist in rowset_to_schema", last_input_id));
    }

    // Resolve the schema id to the stored historical schema.
    const auto schema_pos = metadata->historical_schemas().find(mapping_pos->second);
    if (schema_pos == metadata->historical_schemas().end()) {
        return Status::InternalError(fmt::format("can not find output rowset schema, id {}", mapping_pos->second));
    }

    output_schema = GlobalTabletSchemaMap::Instance()->emplace(schema_pos->second).first;
    return output_schema;
}

StatusOr<CompactionTaskPtr> TabletManager::compact(CompactionTaskContext* context) {
ASSIGN_OR_RETURN(auto tablet, get_tablet(context->tablet_id, context->version));
auto tablet_metadata = tablet.metadata();
ASSIGN_OR_RETURN(auto compaction_policy, CompactionPolicy::create(this, tablet_metadata));
ASSIGN_OR_RETURN(auto input_rowsets, compaction_policy->pick_rowsets());
ASSIGN_OR_RETURN(auto algorithm, compaction_policy->choose_compaction_algorithm(input_rowsets));
std::vector<uint32_t> input_rowsets_id;
for (auto& rowset : input_rowsets) {
input_rowsets_id.emplace_back(rowset->id());
}
ASSIGN_OR_RETURN(auto tablet_schema, get_output_rowset_schema(input_rowsets_id, tablet_metadata.get()));
if (algorithm == VERTICAL_COMPACTION) {
return std::make_shared<VerticalCompactionTask>(std::move(tablet), std::move(input_rowsets), context);
return std::make_shared<VerticalCompactionTask>(std::move(tablet), std::move(input_rowsets), context,
std::move(tablet_schema));
} else {
DCHECK(algorithm == HORIZONTAL_COMPACTION);
return std::make_shared<HorizontalCompactionTask>(std::move(tablet), std::move(input_rowsets), context);
return std::make_shared<HorizontalCompactionTask>(std::move(tablet), std::move(input_rowsets), context,
std::move(tablet_schema));
}
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/lake/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ class TabletManager {

StatusOr<TxnLogPtr> get_txn_vlog(const std::string& path, bool fill_cache = true);

StatusOr<TabletSchemaPtr> get_output_rowset_schema(std::vector<uint32_t>& input_rowset,
const TabletMetadata* metadata);

#ifdef USE_STAROS
bool is_tablet_in_worker(int64_t tablet_id);
#endif // USE_STAROS
Expand Down
11 changes: 8 additions & 3 deletions be/src/storage/lake/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,21 @@ TabletReader::TabletReader(TabletManager* tablet_mgr, std::shared_ptr<const Tabl
_could_split_physically(could_split_physically) {}

TabletReader::TabletReader(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> metadata, Schema schema,
std::vector<RowsetPtr> rowsets)
std::vector<RowsetPtr> rowsets, std::shared_ptr<const TabletSchema> tablet_schema)
: ChunkIterator(std::move(schema)),
_tablet_mgr(tablet_mgr),
_tablet_metadata(std::move(metadata)),
_tablet_schema(std::move(tablet_schema)),
_rowsets_inited(true),
_rowsets(std::move(rowsets)) {}

TabletReader::TabletReader(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> metadata, Schema schema,
std::vector<RowsetPtr> rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer)
std::vector<RowsetPtr> rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer,
std::shared_ptr<const TabletSchema> tablet_schema)
: ChunkIterator(std::move(schema)),
_tablet_mgr(tablet_mgr),
_tablet_metadata(std::move(metadata)),
_tablet_schema(std::move(tablet_schema)),
_rowsets_inited(true),
_rowsets(std::move(rowsets)),
_is_vertical_merge(true),
Expand All @@ -84,7 +87,9 @@ TabletReader::~TabletReader() {
}

Status TabletReader::prepare() {
_tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->schema()).first;
if (_tablet_schema == nullptr) {
_tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->schema()).first;
}
if (UNLIKELY(_tablet_schema == nullptr)) {
return Status::InternalError("failed to construct tablet schema");
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/storage/lake/tablet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ class TabletReader final : public ChunkIterator {
TabletReader(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> metadata, Schema schema,
bool need_split, bool could_split_physically);
TabletReader(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> metadata, Schema schema,
std::vector<RowsetPtr> rowsets);
std::vector<RowsetPtr> rowsets, std::shared_ptr<const TabletSchema> tablet_schema);
TabletReader(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> metadata, Schema schema,
std::vector<RowsetPtr> rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer);
std::vector<RowsetPtr> rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer,
std::shared_ptr<const TabletSchema> tablet_schema);
~TabletReader() override;

DISALLOW_COPY_AND_MOVE(TabletReader);
Expand Down
Loading

0 comments on commit 3c8d168

Please sign in to comment.