From 3c8d168723e5162b26d5bd89e4a5051721cd2001 Mon Sep 17 00:00:00 2001 From: zhangqiang Date: Tue, 30 Jul 2024 21:54:47 +0800 Subject: [PATCH] [Feature] Support add/drop filed for struct column in share-data mode (#48430) Support add/drop field for struct column in shared-data mode. Signed-off-by: sevev --- be/src/common/config.h | 3 + be/src/storage/lake/compaction_task.cpp | 5 +- be/src/storage/lake/compaction_task.h | 3 +- .../lake/horizontal_compaction_task.cpp | 20 +- .../storage/lake/horizontal_compaction_task.h | 4 +- be/src/storage/lake/meta_file.cpp | 49 +++ be/src/storage/lake/rowset.cpp | 13 +- be/src/storage/lake/schema_change.cpp | 4 +- be/src/storage/lake/tablet_manager.cpp | 46 ++- be/src/storage/lake/tablet_manager.h | 3 + be/src/storage/lake/tablet_reader.cpp | 11 +- be/src/storage/lake/tablet_reader.h | 5 +- be/src/storage/lake/txn_log_applier.cpp | 62 ++++ be/src/storage/lake/update_manager.cpp | 21 +- be/src/storage/lake/versioned_tablet.cpp | 7 +- be/src/storage/lake/versioned_tablet.h | 5 + .../storage/lake/vertical_compaction_task.cpp | 8 +- .../storage/lake/vertical_compaction_task.h | 5 +- be/test/exprs/like_test.cpp | 1 - .../storage/lake/alter_tablet_meta_test.cpp | 283 ++++++++++++++++++ be/test/storage/lake/tablet_manager_test.cpp | 71 +++++ .../starrocks/alter/SchemaChangeHandler.java | 5 +- .../java/com/starrocks/common/Config.java | 3 + .../analyzer/AlterTableClauseAnalyzer.java | 4 +- gensrc/proto/lake_types.proto | 7 + 25 files changed, 606 insertions(+), 42 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 7faf7c2d95035..2b143984e2353 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1378,4 +1378,7 @@ CONF_mBool(skip_lake_pk_preload, "false"); // Reduce core file size by not dumping jemalloc retain pages CONF_mBool(enable_core_file_size_optimization, "true"); +// Experimental feature, this configuration will be removed after testing is complete. +CONF_mBool(lake_enable_alter_struct, "true"); + } // namespace starrocks::config diff --git a/be/src/storage/lake/compaction_task.cpp b/be/src/storage/lake/compaction_task.cpp index 8dcd5ea175485..60c9bd8302a76 100644 --- a/be/src/storage/lake/compaction_task.cpp +++ b/be/src/storage/lake/compaction_task.cpp @@ -21,14 +21,15 @@ namespace starrocks::lake { CompactionTask::CompactionTask(VersionedTablet tablet, std::vector> input_rowsets, - CompactionTaskContext* context) + CompactionTaskContext* context, std::shared_ptr tablet_schema) : _txn_id(context->txn_id), _tablet(std::move(tablet)), _input_rowsets(std::move(input_rowsets)), _mem_tracker(std::make_unique(MemTracker::COMPACTION, -1, "Compaction-" + std::to_string(_tablet.metadata()->id()), GlobalEnv::GetInstance()->compaction_mem_tracker())), - _context(context) {} + _context(context), + _tablet_schema(std::move(tablet_schema)) {} Status CompactionTask::execute_index_major_compaction(TxnLogPB* txn_log) { if (_tablet.get_schema()->keys_type() == KeysType::PRIMARY_KEYS) { diff --git a/be/src/storage/lake/compaction_task.h b/be/src/storage/lake/compaction_task.h index b6d3dbc729b91..a6898f574e8c1 100644 --- a/be/src/storage/lake/compaction_task.h +++ b/be/src/storage/lake/compaction_task.h @@ -38,7 +38,7 @@ class CompactionTask { using CancelFunc = std::function; explicit CompactionTask(VersionedTablet tablet, std::vector> input_rowsets, - CompactionTaskContext* context); + CompactionTaskContext* context, std::shared_ptr tablet_schema); virtual ~CompactionTask() = default; virtual Status execute(CancelFunc cancel_func, ThreadPool* flush_pool = nullptr) = 0; @@ -54,6 +54,7 @@ class CompactionTask { std::vector> _input_rowsets; std::unique_ptr _mem_tracker = nullptr; CompactionTaskContext* _context; + std::shared_ptr _tablet_schema; }; } // namespace starrocks::lake diff --git a/be/src/storage/lake/horizontal_compaction_task.cpp b/be/src/storage/lake/horizontal_compaction_task.cpp index 24f3425e7bf47..fcd924fe59ba5 100644 --- a/be/src/storage/lake/horizontal_compaction_task.cpp +++ b/be/src/storage/lake/horizontal_compaction_task.cpp @@ -33,7 +33,6 @@ namespace starrocks::lake { Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flush_pool) { SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get()); - auto tablet_schema = _tablet.get_schema(); int64_t total_num_rows = 0; for (auto& rowset : _input_rowsets) { total_num_rows += rowset->num_rows(); @@ -43,8 +42,8 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu VLOG(3) << "Start horizontal compaction. tablet: " << _tablet.id() << ", reader chunk size: " << chunk_size; - Schema schema = ChunkHelper::convert_schema(tablet_schema); - TabletReader reader(_tablet.tablet_manager(), _tablet.metadata(), schema, _input_rowsets); + Schema schema = ChunkHelper::convert_schema(_tablet_schema); + TabletReader reader(_tablet.tablet_manager(), _tablet.metadata(), schema, _input_rowsets, _tablet_schema); RETURN_IF_ERROR(reader.prepare()); TabletReaderParams reader_params; reader_params.reader_type = READER_CUMULATIVE_COMPACTION; @@ -54,7 +53,9 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu reader_params.lake_io_opts = {false, config::lake_compaction_stream_buffer_size_bytes}; RETURN_IF_ERROR(reader.open(reader_params)); - ASSIGN_OR_RETURN(auto writer, _tablet.new_writer(kHorizontal, _txn_id, 0, flush_pool, true /** compaction **/)) + ASSIGN_OR_RETURN(auto writer, + _tablet.new_writer_with_schema(kHorizontal, _txn_id, 0, flush_pool, true /** compaction **/, + _tablet_schema /** output rowset schema**/)) RETURN_IF_ERROR(writer->open()); DeferOp defer([&]() { writer->close(); }); @@ -78,7 +79,7 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu { SCOPED_RAW_TIMER(&reader_time_ns); auto st = Status::OK(); - if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && enable_light_pk_compaction_publish) { + if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && enable_light_pk_compaction_publish) { st = reader.get_next(chunk.get(), &rssid_rowids); } else { st = reader.get_next(chunk.get()); @@ -89,7 +90,7 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu return st; } } - ChunkHelper::padding_char_columns(char_field_indexes, schema, tablet_schema, chunk.get()); + ChunkHelper::padding_char_columns(char_field_indexes, schema, _tablet_schema, chunk.get()); if (rssid_rowids.empty()) { RETURN_IF_ERROR(writer->write(*chunk)); } else { @@ -136,10 +137,10 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu op_compaction->set_compact_version(_tablet.metadata()->version()); RETURN_IF_ERROR(execute_index_major_compaction(txn_log.get())); RETURN_IF_ERROR(_tablet.tablet_manager()->put_txn_log(txn_log)); - if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) { + if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) { // preload primary key table's compaction state Tablet t(_tablet.tablet_manager(), _tablet.id()); - _tablet.tablet_manager()->update_mgr()->preload_compaction_state(*txn_log, t, tablet_schema); + _tablet.tablet_manager()->update_mgr()->preload_compaction_state(*txn_log, t, _tablet_schema); } LOG(INFO) << "Horizontal compaction finished. tablet: " << _tablet.id() << ", txn_id: " << _txn_id @@ -152,7 +153,6 @@ StatusOr HorizontalCompactionTask::calculate_chunk_size() { int64_t total_num_rows = 0; int64_t total_input_segs = 0; int64_t total_mem_footprint = 0; - auto tablet_schema = _tablet.get_schema(); for (auto& rowset : _input_rowsets) { total_num_rows += rowset->num_rows(); total_input_segs += rowset->is_overlapped() ? rowset->num_segments() : 1; @@ -162,7 +162,7 @@ StatusOr HorizontalCompactionTask::calculate_chunk_size() { ASSIGN_OR_RETURN(auto segments, rowset->segments(lake_io_opts, fill_meta_cache)); for (auto& segment : segments) { for (size_t i = 0; i < segment->num_columns(); ++i) { - auto uid = tablet_schema->column(i).unique_id(); + auto uid = _tablet_schema->column(i).unique_id(); const auto* column_reader = segment->column_with_uid(uid); if (column_reader == nullptr) { continue; diff --git a/be/src/storage/lake/horizontal_compaction_task.h b/be/src/storage/lake/horizontal_compaction_task.h index b0503395916f4..9328257cead06 100644 --- a/be/src/storage/lake/horizontal_compaction_task.h +++ b/be/src/storage/lake/horizontal_compaction_task.h @@ -31,8 +31,8 @@ class TabletWriter; class HorizontalCompactionTask : public CompactionTask { public: explicit HorizontalCompactionTask(VersionedTablet tablet, std::vector> input_rowsets, - CompactionTaskContext* context) - : CompactionTask(std::move(tablet), std::move(input_rowsets), context) {} + CompactionTaskContext* context, std::shared_ptr tablet_schema) + : CompactionTask(std::move(tablet), std::move(input_rowsets), context, std::move(tablet_schema)) {} ~HorizontalCompactionTask() override = default; diff --git a/be/src/storage/lake/meta_file.cpp b/be/src/storage/lake/meta_file.cpp index b03b1fadf1c6d..6601a22ea6ce3 100644 --- a/be/src/storage/lake/meta_file.cpp +++ b/be/src/storage/lake/meta_file.cpp @@ -135,6 +135,14 @@ void MetaFileBuilder::apply_opwrite(const TxnLogPB_OpWrite& op_write, const std: file_meta.set_name(orphan_file); _tablet_meta->mutable_orphan_files()->Add(std::move(file_meta)); } + if (!_tablet_meta->rowset_to_schema().empty()) { + auto schema_id = _tablet_meta->schema().id(); + (*_tablet_meta->mutable_rowset_to_schema())[rowset->id()] = schema_id; + if (_tablet_meta->historical_schemas().count(schema_id) <= 0) { + auto& item = (*_tablet_meta->mutable_historical_schemas())[schema_id]; + item.CopyFrom(_tablet_meta->schema()); + } + } } void MetaFileBuilder::apply_column_mode_partial_update(const TxnLogPB_OpWrite& op_write) { @@ -210,6 +218,12 @@ void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compact uint32_t id; bool operator()(const uint32_t rowid) const { return rowid == id; } }; + + struct RowsetFinder { + uint32_t id; + bool operator()(const RowsetMetadata& r) const { return r.id() == id; } + }; + // Only used for cloud native persistent index. std::vector collect_del_files; auto it = _tablet_meta->mutable_rowsets()->begin(); @@ -257,6 +271,8 @@ void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compact } // add output rowset + bool has_output_rowset = false; + uint32_t output_rowset_id = 0; if (op_compaction.has_output_rowset() && (op_compaction.output_rowset().segments_size() > 0 || !collect_del_files.empty())) { // NOTICE: we need output rowset in two scenarios: @@ -271,6 +287,39 @@ void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compact rowset->add_del_files()->CopyFrom(each); } _tablet_meta->set_next_rowset_id(_tablet_meta->next_rowset_id() + std::max(1, rowset->segments_size())); + has_output_rowset = true; + output_rowset_id = rowset->id(); + } + + // update rowset schema id + if (!_tablet_meta->rowset_to_schema().empty()) { + int64_t output_rowset_schema_id = _tablet_meta->schema().id(); + if (has_output_rowset) { + auto last_rowset_id = op_compaction.input_rowsets(op_compaction.input_rowsets_size() - 1); + output_rowset_schema_id = _tablet_meta->rowset_to_schema().at(last_rowset_id); + } + + for (int i = 0; i < op_compaction.input_rowsets_size(); i++) { + _tablet_meta->mutable_rowset_to_schema()->erase(op_compaction.input_rowsets(i)); + } + + if (has_output_rowset) { + _tablet_meta->mutable_rowset_to_schema()->insert({output_rowset_id, output_rowset_schema_id}); + } + + std::unordered_set schema_id; + for (auto& pair : _tablet_meta->rowset_to_schema()) { + schema_id.insert(pair.second); + } + + for (auto it = _tablet_meta->mutable_historical_schemas()->begin(); + it != _tablet_meta->mutable_historical_schemas()->end();) { + if (schema_id.find(it->first) == schema_id.end()) { + it = _tablet_meta->mutable_historical_schemas()->erase(it); + } else { + it++; + } + } } VLOG(2) << fmt::format( diff --git a/be/src/storage/lake/rowset.cpp b/be/src/storage/lake/rowset.cpp index f8a759a10ef6f..54d5a73d8b0cd 100644 --- a/be/src/storage/lake/rowset.cpp +++ b/be/src/storage/lake/rowset.cpp @@ -45,8 +45,17 @@ Rowset::Rowset(TabletManager* tablet_mgr, TabletMetadataPtr tablet_metadata, int _tablet_id(tablet_metadata->id()), _metadata(&tablet_metadata->rowsets(rowset_index)), _index(rowset_index), - _tablet_schema(GlobalTabletSchemaMap::Instance()->emplace(tablet_metadata->schema()).first), - _tablet_metadata(std::move(tablet_metadata)) {} + _tablet_metadata(std::move(tablet_metadata)) { + auto rowset_id = _tablet_metadata->rowsets(rowset_index).id(); + if (_tablet_metadata->rowset_to_schema().empty()) { + _tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->schema()).first; + } else { + auto schema_id = _tablet_metadata->rowset_to_schema().at(rowset_id); + CHECK(_tablet_metadata->historical_schemas().count(schema_id) > 0); + _tablet_schema = + GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->historical_schemas().at(schema_id)).first; + } +} Rowset::~Rowset() { if (_tablet_metadata) { diff --git a/be/src/storage/lake/schema_change.cpp b/be/src/storage/lake/schema_change.cpp index e82d61d5d0be4..886a0fd13d9e3 100644 --- a/be/src/storage/lake/schema_change.cpp +++ b/be/src/storage/lake/schema_change.cpp @@ -166,7 +166,7 @@ Status ConvertedSchemaChange::init() { Status DirectSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_metadata) { // create reader auto reader = std::make_unique(_base_tablet.tablet_manager(), _base_tablet.metadata(), _base_schema, - std::vector{rowset}); + std::vector{rowset}, _base_tablet.get_schema()); RETURN_IF_ERROR(reader->prepare()); RETURN_IF_ERROR(reader->open(_read_params)); @@ -238,7 +238,7 @@ Status SortedSchemaChange::init() { Status SortedSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_metadata) { // create reader auto reader = std::make_unique(_base_tablet.tablet_manager(), _base_tablet.metadata(), _base_schema, - std::vector{rowset}); + std::vector{rowset}, _base_tablet.get_schema()); RETURN_IF_ERROR(reader->prepare()); RETURN_IF_ERROR(reader->open(_read_params)); diff --git a/be/src/storage/lake/tablet_manager.cpp b/be/src/storage/lake/tablet_manager.cpp index 391a3b25da473..92b928ba37a66 100644 --- a/be/src/storage/lake/tablet_manager.cpp +++ b/be/src/storage/lake/tablet_manager.cpp @@ -579,17 +579,59 @@ StatusOr TabletManager::get_tablet_schema_by_id(int64_t tablet_ } } +// If one rowset has much segments, we may use a lot of memory if we compaction all segments once a time. +// So we will support a part of segments in one rowset to do compaction in the future. +// To keep the consistence of all segments in one rowset, we will use the last rowset tablet schema as the +// output rowset schema. This is because the last rowset may only have part of the segment merged. +StatusOr TabletManager::get_output_rowset_schema(std::vector& input_rowset, + const TabletMetadata* metadata) { + if (metadata->rowset_to_schema().empty() || input_rowset.size() <= 0) { + return GlobalTabletSchemaMap::Instance()->emplace(metadata->schema()).first; + } + TabletSchemaPtr tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(metadata->schema()).first; + struct Finder { + uint32_t id; + bool operator()(const RowsetMetadata& r) const { return r.id() == id; } + }; + + auto input_id = input_rowset[input_rowset.size() - 1]; + auto iter = std::find_if(metadata->rowsets().begin(), metadata->rowsets().end(), Finder{input_id}); + if (UNLIKELY(iter == metadata->rowsets().end())) { + return Status::InternalError(fmt::format("input rowset {} not found", input_id)); + } + + auto rowset_it = metadata->rowset_to_schema().find(input_id); + if (rowset_it != metadata->rowset_to_schema().end()) { + auto schema_it = metadata->historical_schemas().find(rowset_it->second); + if (schema_it != metadata->historical_schemas().end()) { + tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(schema_it->second).first; + } else { + return Status::InternalError(fmt::format("can not find output rowset schema, id {}", rowset_it->second)); + } + } else { + return Status::InternalError(fmt::format("input rowset {} not exist in rowset_to_schema", input_id)); + } + return tablet_schema; +} + StatusOr TabletManager::compact(CompactionTaskContext* context) { ASSIGN_OR_RETURN(auto tablet, get_tablet(context->tablet_id, context->version)); auto tablet_metadata = tablet.metadata(); ASSIGN_OR_RETURN(auto compaction_policy, CompactionPolicy::create(this, tablet_metadata)); ASSIGN_OR_RETURN(auto input_rowsets, compaction_policy->pick_rowsets()); ASSIGN_OR_RETURN(auto algorithm, compaction_policy->choose_compaction_algorithm(input_rowsets)); + std::vector input_rowsets_id; + for (auto& rowset : input_rowsets) { + input_rowsets_id.emplace_back(rowset->id()); + } + ASSIGN_OR_RETURN(auto tablet_schema, get_output_rowset_schema(input_rowsets_id, tablet_metadata.get())); if (algorithm == VERTICAL_COMPACTION) { - return std::make_shared(std::move(tablet), std::move(input_rowsets), context); + return std::make_shared(std::move(tablet), std::move(input_rowsets), context, + std::move(tablet_schema)); } else { DCHECK(algorithm == HORIZONTAL_COMPACTION); - return std::make_shared(std::move(tablet), std::move(input_rowsets), context); + return std::make_shared(std::move(tablet), std::move(input_rowsets), context, + std::move(tablet_schema)); } } diff --git a/be/src/storage/lake/tablet_manager.h b/be/src/storage/lake/tablet_manager.h index dec23d587c225..46f513727a113 100644 --- a/be/src/storage/lake/tablet_manager.h +++ b/be/src/storage/lake/tablet_manager.h @@ -114,6 +114,9 @@ class TabletManager { StatusOr get_txn_vlog(const std::string& path, bool fill_cache = true); + StatusOr get_output_rowset_schema(std::vector& input_rowset, + const TabletMetadata* metadata); + #ifdef USE_STAROS bool is_tablet_in_worker(int64_t tablet_id); #endif // USE_STAROS diff --git a/be/src/storage/lake/tablet_reader.cpp b/be/src/storage/lake/tablet_reader.cpp index 428ef176b78eb..b48fd75fe2012 100644 --- a/be/src/storage/lake/tablet_reader.cpp +++ b/be/src/storage/lake/tablet_reader.cpp @@ -59,18 +59,21 @@ TabletReader::TabletReader(TabletManager* tablet_mgr, std::shared_ptr metadata, Schema schema, - std::vector rowsets) + std::vector rowsets, std::shared_ptr tablet_schema) : ChunkIterator(std::move(schema)), _tablet_mgr(tablet_mgr), _tablet_metadata(std::move(metadata)), + _tablet_schema(std::move(tablet_schema)), _rowsets_inited(true), _rowsets(std::move(rowsets)) {} TabletReader::TabletReader(TabletManager* tablet_mgr, std::shared_ptr metadata, Schema schema, - std::vector rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer) + std::vector rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer, + std::shared_ptr tablet_schema) : ChunkIterator(std::move(schema)), _tablet_mgr(tablet_mgr), _tablet_metadata(std::move(metadata)), + _tablet_schema(std::move(tablet_schema)), _rowsets_inited(true), _rowsets(std::move(rowsets)), _is_vertical_merge(true), @@ -84,7 +87,9 @@ TabletReader::~TabletReader() { } Status TabletReader::prepare() { - _tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->schema()).first; + if (_tablet_schema == nullptr) { + _tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(_tablet_metadata->schema()).first; + } if (UNLIKELY(_tablet_schema == nullptr)) { return Status::InternalError("failed to construct tablet schema"); } diff --git a/be/src/storage/lake/tablet_reader.h b/be/src/storage/lake/tablet_reader.h index febda5b31bf54..9288a7fa8de90 100644 --- a/be/src/storage/lake/tablet_reader.h +++ b/be/src/storage/lake/tablet_reader.h @@ -58,9 +58,10 @@ class TabletReader final : public ChunkIterator { TabletReader(TabletManager* tablet_mgr, std::shared_ptr metadata, Schema schema, bool need_split, bool could_split_physically); TabletReader(TabletManager* tablet_mgr, std::shared_ptr metadata, Schema schema, - std::vector rowsets); + std::vector rowsets, std::shared_ptr tablet_schema); TabletReader(TabletManager* tablet_mgr, std::shared_ptr metadata, Schema schema, - std::vector rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer); + std::vector rowsets, bool is_key, RowSourceMaskBuffer* mask_buffer, + std::shared_ptr tablet_schema); ~TabletReader() override; DISALLOW_COPY_AND_MOVE(TabletReader); diff --git a/be/src/storage/lake/txn_log_applier.cpp b/be/src/storage/lake/txn_log_applier.cpp index 581e5ec640f3e..528d5b38a62cf 100644 --- a/be/src/storage/lake/txn_log_applier.cpp +++ b/be/src/storage/lake/txn_log_applier.cpp @@ -44,9 +44,28 @@ Status apply_alter_meta_log(TabletMetadataPB* metadata, const TxnLogPB_OpAlterMe // But it will be remove from index cache after apply is finished (void)update_mgr->index_cache().try_remove_by_key(metadata->id()); } + // update tablet meta + // 1. rowset_to_schema is empty, maybe upgrade from old version or first time to do fast ddl. So we will + // add the tablet schema before alter into historical schema. + // 2. rowset_to_schema is not empty, no need to update historical schema because we historical schema already + // keep the tablet schema before alter. if (alter_meta.has_tablet_schema()) { VLOG(2) << "old schema: " << metadata->schema().DebugString() << " new schema: " << alter_meta.tablet_schema().DebugString(); + // add/drop field for struct column is under testing, To avoid impacting the existing logic, add the + // `lake_enable_alter_struct` configuration. Once testing is complete, this configuration will be removed. + if (config::lake_enable_alter_struct) { + if (metadata->rowset_to_schema().empty() && metadata->rowsets_size() > 0) { + metadata->mutable_historical_schemas()->clear(); + auto schema_id = metadata->schema().id(); + auto& item = (*metadata->mutable_historical_schemas())[schema_id]; + item.CopyFrom(metadata->schema()); + for (int i = 0; i < metadata->rowsets_size(); i++) { + (*metadata->mutable_rowset_to_schema())[metadata->rowsets(i).id()] = schema_id; + } + } + // no need to update + } metadata->mutable_schema()->CopyFrom(alter_meta.tablet_schema()); } } @@ -358,6 +377,15 @@ class NonPrimaryKeyTxnLogApplier : public TxnLogApplier { rowset->CopyFrom(op_write.rowset()); rowset->set_id(_metadata->next_rowset_id()); _metadata->set_next_rowset_id(_metadata->next_rowset_id() + std::max(1, rowset->segments_size())); + if (!_metadata->rowset_to_schema().empty()) { + auto schema_id = _metadata->schema().id(); + (*_metadata->mutable_rowset_to_schema())[rowset->id()] = schema_id; + // first rowset of latest schema + if (_metadata->historical_schemas().count(schema_id) <= 0) { + auto& item = (*_metadata->mutable_historical_schemas())[schema_id]; + item.CopyFrom(_metadata->schema()); + } + } } return Status::OK(); } @@ -404,6 +432,8 @@ class NonPrimaryKeyTxnLogApplier : public TxnLogApplier { } auto first_idx = static_cast(first_input_pos - _metadata->mutable_rowsets()->begin()); + bool has_output_rowset = false; + uint32_t output_rowset_id = 0; if (op_compaction.has_output_rowset() && op_compaction.output_rowset().num_rows() > 0) { // Replace the first input rowset with output rowset auto output_rowset = _metadata->mutable_rowsets(first_idx); @@ -411,10 +441,42 @@ class NonPrimaryKeyTxnLogApplier : public TxnLogApplier { output_rowset->set_id(_metadata->next_rowset_id()); _metadata->set_next_rowset_id(_metadata->next_rowset_id() + output_rowset->segments_size()); ++first_input_pos; + has_output_rowset = true; + output_rowset_id = output_rowset->id(); } // Erase input rowsets from _metadata _metadata->mutable_rowsets()->erase(first_input_pos, end_input_pos); + // Update historical schema and rowset schema id + if (!_metadata->rowset_to_schema().empty()) { + int64_t output_rowset_schema_id = _metadata->schema().id(); + if (has_output_rowset) { + auto last_rowset_id = op_compaction.input_rowsets(op_compaction.input_rowsets_size() - 1); + output_rowset_schema_id = _metadata->rowset_to_schema().at(last_rowset_id); + } + for (int i = 0; i < op_compaction.input_rowsets_size(); i++) { + _metadata->mutable_rowset_to_schema()->erase(op_compaction.input_rowsets(i)); + } + + if (has_output_rowset) { + (*_metadata->mutable_rowset_to_schema())[output_rowset_id] = output_rowset_schema_id; + } + + std::unordered_set schema_id; + for (auto& pair : _metadata->rowset_to_schema()) { + schema_id.insert(pair.second); + } + + for (auto it = _metadata->mutable_historical_schemas()->begin(); + it != _metadata->mutable_historical_schemas()->end();) { + if (schema_id.find(it->first) == schema_id.end()) { + it = _metadata->mutable_historical_schemas()->erase(it); + } else { + it++; + } + } + } + // Set new cumulative point uint32_t new_cumulative_point = 0; // size tiered compaction policy does not need cumulative point diff --git a/be/src/storage/lake/update_manager.cpp b/be/src/storage/lake/update_manager.cpp index b13a5aa947405..30f30ad72154b 100644 --- a/be/src/storage/lake/update_manager.cpp +++ b/be/src/storage/lake/update_manager.cpp @@ -32,6 +32,7 @@ #include "storage/rowset/default_value_column_iterator.h" #include "storage/tablet_manager.h" #include "testutil/sync_point.h" +#include "util/failpoint/fail_point.h" #include "util/pretty_printer.h" #include "util/trace.h" @@ -186,11 +187,16 @@ void UpdateManager::unload_and_remove_primary_index(int64_t tablet_id) { } } +DEFINE_FAIL_POINT(hook_publish_primary_key_tablet); // |metadata| contain last tablet meta info with new version Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_write, int64_t txn_id, const TabletMetadataPtr& metadata, Tablet* tablet, IndexEntry* index_entry, MetaFileBuilder* builder, int64_t base_version) { + FAIL_POINT_TRIGGER_EXECUTE(hook_publish_primary_key_tablet, { + builder->apply_opwrite(op_write, {}, {}); + return Status::OK(); + }); auto& index = index_entry->value(); // 1. load rowset update data to cache, get upsert and delete list const uint32_t rowset_id = metadata->next_rowset_id(); @@ -697,7 +703,10 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti int64_t base_version) { // 1. init some state auto& index = index_entry->value(); - std::shared_ptr tablet_schema = std::make_shared(metadata.schema()); + std::vector input_rowsets_id(op_compaction.input_rowsets().begin(), op_compaction.input_rowsets().end()); + ASSIGN_OR_RETURN(auto tablet_schema, ExecEnv::GetInstance()->lake_tablet_manager()->get_output_rowset_schema( + input_rowsets_id, &metadata)); + Rowset output_rowset(tablet.tablet_mgr(), tablet.id(), &op_compaction.output_rowset(), -1 /*unused*/, tablet_schema); vector> delvecs; @@ -728,10 +737,16 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti return Status::OK(); } +DEFINE_FAIL_POINT(hook_publish_primary_key_tablet_compaction); Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction, int64_t txn_id, const TabletMetadata& metadata, const Tablet& tablet, IndexEntry* index_entry, MetaFileBuilder* builder, int64_t base_version) { + FAIL_POINT_TRIGGER_EXECUTE(hook_publish_primary_key_tablet_compaction, { + builder->apply_opcompaction(op_compaction, *std::max_element(op_compaction.input_rowsets().begin(), + op_compaction.input_rowsets().end())); + return Status::OK(); + }); if (CompactionUpdateConflictChecker::conflict_check(op_compaction, txn_id, metadata, builder)) { // conflict happens return Status::OK(); @@ -742,7 +757,9 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op } auto& index = index_entry->value(); // 1. iterate output rowset, update primary index and generate delvec - std::shared_ptr tablet_schema = std::make_shared(metadata.schema()); + std::vector input_rowsets_id(op_compaction.input_rowsets().begin(), op_compaction.input_rowsets().end()); + ASSIGN_OR_RETURN(auto tablet_schema, ExecEnv::GetInstance()->lake_tablet_manager()->get_output_rowset_schema( + input_rowsets_id, &metadata)); Rowset output_rowset(tablet.tablet_mgr(), tablet.id(), &op_compaction.output_rowset(), -1 /*unused*/, tablet_schema); auto compaction_entry = _compaction_cache.get_or_create(cache_key(tablet.id(), txn_id)); diff --git a/be/src/storage/lake/versioned_tablet.cpp b/be/src/storage/lake/versioned_tablet.cpp index 0dabb13dc3fad..ee486d30bd056 100644 --- a/be/src/storage/lake/versioned_tablet.cpp +++ b/be/src/storage/lake/versioned_tablet.cpp @@ -38,7 +38,12 @@ int64_t VersionedTablet::version() const { StatusOr> VersionedTablet::new_writer(WriterType type, int64_t txn_id, uint32_t max_rows_per_segment, ThreadPool* flush_pool, bool is_compaction) { - auto tablet_schema = get_schema(); + return new_writer_with_schema(type, txn_id, max_rows_per_segment, flush_pool, is_compaction, get_schema()); +} + +StatusOr> VersionedTablet::new_writer_with_schema( + WriterType type, int64_t txn_id, uint32_t max_rows_per_segment, ThreadPool* flush_pool, bool is_compaction, + const std::shared_ptr& tablet_schema) { if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) { if (type == kHorizontal) { return std::make_unique(_tablet_mgr, id(), tablet_schema, txn_id, flush_pool, diff --git a/be/src/storage/lake/versioned_tablet.h b/be/src/storage/lake/versioned_tablet.h index 2477552fb38b3..96214ceed2cec 100644 --- a/be/src/storage/lake/versioned_tablet.h +++ b/be/src/storage/lake/versioned_tablet.h @@ -67,6 +67,11 @@ class VersionedTablet { RowsetList get_rowsets() const; // `segment_max_rows` is used in vertical writer + // create a tablet writer with given `tablet_schema` + StatusOr> new_writer_with_schema( + WriterType type, int64_t txn_id, uint32_t max_rows_per_segment, ThreadPool* flush_pool, bool is_compaction, + const std::shared_ptr& tablet_schema); + StatusOr> new_writer(WriterType type, int64_t txn_id, uint32_t max_rows_per_segment = 0, ThreadPool* flush_pool = nullptr, bool is_compaction = false); diff --git a/be/src/storage/lake/vertical_compaction_task.cpp b/be/src/storage/lake/vertical_compaction_task.cpp index e4c44741000c1..52d538c0a32aa 100644 --- a/be/src/storage/lake/vertical_compaction_task.cpp +++ b/be/src/storage/lake/vertical_compaction_task.cpp @@ -34,7 +34,6 @@ namespace starrocks::lake { Status VerticalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flush_pool) { SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get()); - _tablet_schema = _tablet.get_schema(); for (auto& rowset : _input_rowsets) { _total_num_rows += rowset->num_rows(); _total_data_size += rowset->data_size(); @@ -48,8 +47,8 @@ Status VerticalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flush uint32_t max_rows_per_segment = CompactionUtils::get_segment_max_rows(config::max_segment_file_size, _total_num_rows, _total_data_size); - ASSIGN_OR_RETURN(auto writer, _tablet.new_writer(kVertical, _txn_id, max_rows_per_segment, flush_pool, - true /** is compaction**/)); + ASSIGN_OR_RETURN(auto writer, _tablet.new_writer_with_schema(kVertical, _txn_id, max_rows_per_segment, flush_pool, + true /** is compaction**/, _tablet_schema)); RETURN_IF_ERROR(writer->open()); DeferOp defer([&]() { writer->close(); }); @@ -156,7 +155,8 @@ Status VerticalCompactionTask::compact_column_group(bool is_key, int column_grou ? ChunkHelper::convert_schema(_tablet_schema, column_group) : ChunkHelper::get_sort_key_schema(_tablet_schema)) : ChunkHelper::convert_schema(_tablet_schema, column_group); - TabletReader reader(_tablet.tablet_manager(), _tablet.metadata(), schema, _input_rowsets, is_key, mask_buffer); + TabletReader reader(_tablet.tablet_manager(), _tablet.metadata(), schema, _input_rowsets, is_key, mask_buffer, + _tablet_schema); RETURN_IF_ERROR(reader.prepare()); TabletReaderParams reader_params; reader_params.reader_type = READER_CUMULATIVE_COMPACTION; diff --git a/be/src/storage/lake/vertical_compaction_task.h b/be/src/storage/lake/vertical_compaction_task.h index 3ba9e400a4fd7..91acad908a910 100644 --- a/be/src/storage/lake/vertical_compaction_task.h +++ b/be/src/storage/lake/vertical_compaction_task.h @@ -34,8 +34,8 @@ class TabletWriter; class VerticalCompactionTask : public CompactionTask { public: explicit VerticalCompactionTask(VersionedTablet tablet, std::vector> input_rowsets, - CompactionTaskContext* context) - : CompactionTask(std::move(tablet), std::move(input_rowsets), context) {} + CompactionTaskContext* context, std::shared_ptr tablet_schema) + : CompactionTask(std::move(tablet), std::move(input_rowsets), context, std::move(tablet_schema)) {} ~VerticalCompactionTask() override = default; @@ -48,7 +48,6 @@ class VerticalCompactionTask : public CompactionTask { const std::vector& column_group, std::unique_ptr& writer, RowSourceMaskBuffer* mask_buffer, std::vector* source_masks, const CancelFunc& cancel_func); - std::shared_ptr _tablet_schema; int64_t _total_num_rows = 0; int64_t _total_data_size = 0; int64_t _total_input_segs = 0; diff --git a/be/test/exprs/like_test.cpp b/be/test/exprs/like_test.cpp index 37b718a40a510..0951923dd424b 100644 --- a/be/test/exprs/like_test.cpp +++ b/be/test/exprs/like_test.cpp @@ -694,5 +694,4 @@ TEST_F(LikeTest, splitLikePatternIntoNgramSet) { VectorizedFunctionCallExpr::split_like_string_to_ngram(pattern, options, ngram_set); ASSERT_EQ(0, ngram_set.size()); } - } // namespace starrocks diff --git a/be/test/storage/lake/alter_tablet_meta_test.cpp b/be/test/storage/lake/alter_tablet_meta_test.cpp index 61b3076c94b61..9ff589274c91d 100644 --- a/be/test/storage/lake/alter_tablet_meta_test.cpp +++ b/be/test/storage/lake/alter_tablet_meta_test.cpp @@ -18,10 +18,13 @@ #include "fs/fs_util.h" #include "storage/chunk_helper.h" #include "storage/lake/schema_change.h" +#include "storage/lake/tablet.h" #include "storage/lake/tablet_manager.h" #include "storage/lake/tablet_writer.h" +#include "storage/lake/txn_log_applier.h" #include "test_util.h" #include "testutil/id_generator.h" +#include "util/failpoint/fail_point.h" namespace starrocks::lake { @@ -43,6 +46,8 @@ class AlterTabletMetaTest : public TestBase { void TearDown() override { remove_test_dir_ignore_error(); } + void test_alter_update_tablet_schema(KeysType keys_type); + protected: constexpr static const char* const kTestDirectory = "test_alter_tablet_meta"; @@ -181,4 +186,282 @@ TEST_F(AlterTabletMetaTest, test_alter_enable_persistent_index_not_change) { ASSERT_EQ(true, new_tablet_meta2.value()->enable_persistent_index()); } +void AlterTabletMetaTest::test_alter_update_tablet_schema(KeysType keys_type) { + std::shared_ptr tablet_metadata = generate_simple_tablet_metadata(keys_type); + auto rs1 = tablet_metadata->add_rowsets(); + rs1->set_id(next_id()); + + // write new rowset + { + TxnLogPB log; + auto op_write_meta = log.mutable_op_write(); + auto rs_meta = op_write_meta->mutable_rowset(); + rs_meta->set_id(next_id()); + rs_meta->set_num_rows(10); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 0); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 0); + ASSERT_TRUE(tablet_metadata->rowsets_size() == 2); + } + + // update meta + auto schema_id1 = tablet_metadata->schema().id(); + { + TxnLogPB log; + auto alter_metadata = log.mutable_op_alter_metadata(); + auto update_info = alter_metadata->add_metadata_update_infos(); + auto tablet_schema_pb = update_info->mutable_tablet_schema(); + tablet_schema_pb->CopyFrom(tablet_metadata->schema()); + tablet_schema_pb->set_id(next_id()); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + + auto rowset_id0 = tablet_metadata->rowsets(0).id(); + auto rowset_id1 = tablet_metadata->rowsets(1).id(); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 2); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id1); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 1); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id1) > 0); + } + + // add rowset + auto schema_id2 = tablet_metadata->schema().id(); + { + TxnLogPB log; + auto op_write_meta = log.mutable_op_write(); + auto rs_meta = op_write_meta->mutable_rowset(); + rs_meta->set_id(next_id()); + rs_meta->set_num_rows(10); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + + auto rowset_id0 = tablet_metadata->rowsets(0).id(); + auto rowset_id1 = tablet_metadata->rowsets(1).id(); + auto rowset_id2 = tablet_metadata->rowsets(2).id(); + ASSERT_TRUE(tablet_metadata->rowsets_size() == 3); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 3); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 2); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id1) > 0); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id2) > 0); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id2) == schema_id2); + } + + // update meta + { + TxnLogPB log; + auto alter_metadata = log.mutable_op_alter_metadata(); + auto update_info = alter_metadata->add_metadata_update_infos(); + auto tablet_schema_pb = update_info->mutable_tablet_schema(); + tablet_schema_pb->CopyFrom(tablet_metadata->schema()); + tablet_schema_pb->set_id(next_id()); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + + auto rowset_id0 = tablet_metadata->rowsets(0).id(); + auto rowset_id1 = tablet_metadata->rowsets(1).id(); + auto rowset_id2 = tablet_metadata->rowsets(2).id(); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 3); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 2); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id1) > 0); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id2) > 0); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id2) == schema_id2); + } + + // compaction + { + TxnLogPB log; + auto op_compaction_meta = log.mutable_op_compaction(); + op_compaction_meta->add_input_rowsets(tablet_metadata->rowsets(1).id()); + op_compaction_meta->add_input_rowsets(tablet_metadata->rowsets(2).id()); + auto rs_meta = op_compaction_meta->mutable_output_rowset(); + auto rs_id = next_id(); + rs_meta->set_id(rs_id); + rs_meta->set_num_rows(10); + rs_meta->add_segments("segment1"); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + auto rowset_id0 = tablet_metadata->rowsets(0).id(); + auto rowset_id1 = tablet_metadata->rowsets(1).id(); + ASSERT_TRUE(tablet_metadata->rowsets_size() == 2); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 2); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 2); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id1) > 0); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id2) > 0); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id2); + } + + // compaction one rowset + { + TxnLogPB log; + auto op_compaction_meta = log.mutable_op_compaction(); + + int32_t input_rowset_idx = 0; + auto input_rs = tablet_metadata->mutable_rowsets(input_rowset_idx); + input_rs->set_num_rows(0); + op_compaction_meta->add_input_rowsets(input_rs->id()); + tablet_metadata->mutable_rowsets(0)->clear_segments(); + tablet_metadata->mutable_rowsets(1)->clear_segments(); + + auto rs_meta = op_compaction_meta->mutable_output_rowset(); + rs_meta->set_id(next_id()); + rs_meta->set_num_rows(10); + rs_meta->add_segments("segment1"); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + auto rowset_id0 = tablet_metadata->rowsets(0).id(); + auto rowset_id1 = tablet_metadata->rowsets(1).id(); + ASSERT_TRUE(tablet_metadata->rowsets_size() == 2); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 2); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 2); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id1) > 0); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id2) > 0); + if (keys_type == PRIMARY_KEYS) { + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id2); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id1); + } else { + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id2); + } + } + + auto schema_id3 = tablet_metadata->schema().id(); + { + TxnLogPB log; + auto op_write_meta = log.mutable_op_write(); + auto rs_meta = op_write_meta->mutable_rowset(); + rs_meta->set_id(next_id()); + rs_meta->set_num_rows(10); + + tablet_metadata->mutable_rowsets(0)->clear_segments(); + tablet_metadata->mutable_rowsets(1)->clear_segments(); + tablet_metadata->mutable_rowsets(0)->set_num_rows(0); + tablet_metadata->mutable_rowsets(1)->set_num_rows(0); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + auto rowset_id0 = tablet_metadata->rowsets(0).id(); + auto rowset_id1 = tablet_metadata->rowsets(1).id(); + auto rowset_id2 = tablet_metadata->rowsets(2).id(); + ASSERT_TRUE(tablet_metadata->rowsets_size() == 3); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 3); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 3); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id1) > 0); + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id2) > 0); + if (keys_type == PRIMARY_KEYS) { + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id2); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id1); + } else { + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id2); + } + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id2) == schema_id3); + } + + { + TxnLogPB log; + auto op_compaction_meta = log.mutable_op_compaction(); + op_compaction_meta->add_input_rowsets(tablet_metadata->rowsets(1).id()); + op_compaction_meta->add_input_rowsets(tablet_metadata->rowsets(2).id()); + auto rs_meta = op_compaction_meta->mutable_output_rowset(); + auto rs_id = next_id(); + rs_meta->set_id(rs_id); + rs_meta->set_num_rows(10); + rs_meta->add_segments("segment1"); + + tablet_metadata->mutable_rowsets(0)->clear_segments(); + tablet_metadata->mutable_rowsets(1)->clear_segments(); + tablet_metadata->mutable_rowsets(2)->clear_segments(); + tablet_metadata->mutable_rowsets(0)->set_num_rows(0); + tablet_metadata->mutable_rowsets(1)->set_num_rows(0); + tablet_metadata->mutable_rowsets(2)->set_num_rows(0); + + auto tablet_id = tablet_metadata->id(); + auto version = tablet_metadata->version() + 1; + std::unique_ptr log_applier = + new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version); + + ASSERT_OK(log_applier->apply(log)); + auto rowset_id0 = tablet_metadata->rowsets(0).id(); + auto rowset_id1 = tablet_metadata->rowsets(1).id(); + ASSERT_TRUE(tablet_metadata->rowsets_size() == 2); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().size() == 2); + ASSERT_TRUE(tablet_metadata->historical_schemas().size() == 2); + if (keys_type == PRIMARY_KEYS) { + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id2) > 0); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id2); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id3); + } else { + ASSERT_TRUE(tablet_metadata->historical_schemas().count(schema_id1) > 0); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id0) == schema_id1); + ASSERT_TRUE(tablet_metadata->rowset_to_schema().at(rowset_id1) == schema_id3); + } + } +} + +TEST_F(AlterTabletMetaTest, test_alter_non_pk_update_tablet_schema) { + test_alter_update_tablet_schema(DUP_KEYS); +} + +TEST_F(AlterTabletMetaTest, test_alter_pk_update_tablet_schema) { + PFailPointTriggerMode trigger_mode; + trigger_mode.set_mode(FailPointTriggerModeType::ENABLE); + // enable hook_publish_primary_key_tablet + auto fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("hook_publish_primary_key_tablet"); + fp->setMode(trigger_mode); + + fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("hook_publish_primary_key_tablet_compaction"); + fp->setMode(trigger_mode); + + test_alter_update_tablet_schema(PRIMARY_KEYS); + + // disable hook_publish_primary_key_tablet + trigger_mode.set_mode(FailPointTriggerModeType::DISABLE); + fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("hook_publish_primary_key_tablet"); + fp->setMode(trigger_mode); + + fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("hook_publish_primary_key_tablet_compaction"); + fp->setMode(trigger_mode); +} + } // namespace starrocks::lake diff --git a/be/test/storage/lake/tablet_manager_test.cpp b/be/test/storage/lake/tablet_manager_test.cpp index 8f7d778092e16..b42a7c8940491 100644 --- a/be/test/storage/lake/tablet_manager_test.cpp +++ b/be/test/storage/lake/tablet_manager_test.cpp @@ -31,6 +31,7 @@ #include "storage/lake/versioned_tablet.h" #include "storage/options.h" #include "storage/tablet_schema.h" +#include "test_util.h" #include "testutil/assert.h" #include "testutil/id_generator.h" #include "util/bthreads/util.h" @@ -735,6 +736,76 @@ TEST_F(LakeTabletManagerTest, test_in_writing_data_size) { ASSERT_EQ(_tablet_manager->in_writing_data_size(1), 0); } +TEST_F(LakeTabletManagerTest, test_get_output_rorwset_schema) { + std::shared_ptr tablet_metadata = lake::generate_simple_tablet_metadata(DUP_KEYS); + for (int i = 0; i < 5; i++) { + auto rs = tablet_metadata->add_rowsets(); + rs->set_id(next_id()); + } + + // set historical schema + auto schema_id1 = next_id(); + auto& schema_pb1 = (*tablet_metadata->mutable_historical_schemas())[schema_id1]; + schema_pb1.set_id(schema_id1); + + auto schema_id2 = next_id(); + auto& schema_pb2 = (*tablet_metadata->mutable_historical_schemas())[schema_id2]; + schema_pb2.set_id(schema_id2); + + auto schema_id3 = tablet_metadata->schema().id(); + auto& schema_pb3 = (*tablet_metadata->mutable_historical_schemas())[schema_id3]; + schema_pb3.set_id(schema_id3); + + (*tablet_metadata->mutable_rowset_to_schema())[tablet_metadata->rowsets(0).id()] = schema_id3; + (*tablet_metadata->mutable_rowset_to_schema())[tablet_metadata->rowsets(1).id()] = schema_id1; + (*tablet_metadata->mutable_rowset_to_schema())[tablet_metadata->rowsets(2).id()] = schema_id3; + (*tablet_metadata->mutable_rowset_to_schema())[tablet_metadata->rowsets(3).id()] = schema_id2; + (*tablet_metadata->mutable_rowset_to_schema())[tablet_metadata->rowsets(4).id()] = schema_id3; + lake::VersionedTablet tablet(_tablet_manager, tablet_metadata); + + { + for (int i = 0; i < 5; i++) { + std::vector input_rowsets; + input_rowsets.emplace_back(tablet_metadata->rowsets(i).id()); + auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get()); + ASSERT_TRUE(res.ok()); + auto schema_id = tablet_metadata->rowset_to_schema().at(tablet_metadata->rowsets(i).id()); + ASSERT_EQ(res.value()->id(), schema_id); + } + } + + auto rs1 = std::make_shared(_tablet_manager, tablet_metadata, 0); + auto rs2 = std::make_shared(_tablet_manager, tablet_metadata, 1); + auto rs3 = std::make_shared(_tablet_manager, tablet_metadata, 2); + auto rs4 = std::make_shared(_tablet_manager, tablet_metadata, 3); + auto rs5 = std::make_shared(_tablet_manager, tablet_metadata, 4); + + { + std::vector input_rowsets; + input_rowsets.emplace_back(tablet_metadata->rowsets(0).id()); + input_rowsets.emplace_back(tablet_metadata->rowsets(1).id()); + auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get()); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(res.value()->id(), schema_id1); + + input_rowsets.emplace_back(tablet_metadata->rowsets(2).id()); + res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get()); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(res.value()->id(), tablet_metadata->schema().id()); + } + + { + tablet_metadata->mutable_rowset_to_schema()->clear(); + for (int i = 0; i < 5; i++) { + std::vector input_rowsets; + input_rowsets.emplace_back(tablet_metadata->rowsets(i).id()); + auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get()); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(res.value()->id(), tablet_metadata->schema().id()); + } + } +} + #endif // USE_STAROS } // namespace starrocks diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java index cda592d2b5f0a..bc93c418c857a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java @@ -1912,7 +1912,7 @@ public int getAsInt() { // modify column fastSchemaEvolution &= processModifyColumn(modifyColumnClause, olapTable, indexSchemaMap); } else if (alterClause instanceof AddFieldClause) { - if (RunMode.isSharedDataMode()) { + if (RunMode.isSharedDataMode() && !Config.enable_alter_struct_column) { throw new DdlException("Add field for struct column not support shared-data mode so far"); } if (!fastSchemaEvolution) { @@ -1921,11 +1921,10 @@ public int getAsInt() { AddFieldClause addFieldClause = (AddFieldClause) alterClause; modifyFieldColumns = Set.of(addFieldClause.getColName()); checkModifiedColumWithMaterializedViews(olapTable, modifyFieldColumns); - int id = colUniqueIdSupplier.getAsInt(); processAddField((AddFieldClause) alterClause, olapTable, indexSchemaMap, id, newIndexes); } else if (alterClause instanceof DropFieldClause) { - if (RunMode.isSharedDataMode()) { + if (RunMode.isSharedDataMode() && !Config.enable_alter_struct_column) { throw new DdlException("Drop field for struct column not support shared-data mode so far"); } if (!fastSchemaEvolution) { diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index bec9cf504d12e..4caaeacf85bff 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -3071,4 +3071,7 @@ public class Config extends ConfigBase { // backuped table is colocated @ConfField(mutable = true) public static boolean enable_colocate_restore = false; + + @ConfField + public static boolean enable_alter_struct_column = true; } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java index f7af108e60f8b..348b531092b51 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java @@ -725,7 +725,7 @@ public Void visitAddFieldClause(AddFieldClause clause, ConnectContext context) { throw new SemanticException(PARSER_ERROR_MSG.invalidColFormat(columnName)); } - if (!table.isOlapTable()) { + if (!table.isOlapTable() && !table.isCloudNativeTable()) { throw new SemanticException("Add field only support olap table"); } @@ -746,7 +746,7 @@ public Void visitDropFieldClause(DropFieldClause clause, ConnectContext context) throw new SemanticException(PARSER_ERROR_MSG.invalidColFormat(columnName)); } - if (!table.isOlapTable()) { + if (!table.isOlapTable() && !table.isCloudNativeTable()) { throw new SemanticException("Drop field only support olap table"); } diff --git a/gensrc/proto/lake_types.proto b/gensrc/proto/lake_types.proto index 00f0b2ab0bf24..55f88ca14f1d6 100644 --- a/gensrc/proto/lake_types.proto +++ b/gensrc/proto/lake_types.proto @@ -141,6 +141,13 @@ message TabletMetadataPB { optional TabletSchemaPB source_schema = 14; optional PersistentIndexSstableMetaPB sstable_meta = 15; optional DeltaColumnGroupMetadataPB dcg_meta = 16; + // Keep the tablet schema before each Alter operation + // If no rowset use the tablet schema any more, it will be gc by compaction + // The latest schema is also saved here, and it is same with the above schema + // It can be removed in future versions + map historical_schemas = 17; + // rowset_id -> schema_id + map rowset_to_schema = 18; } message MetadataUpdateInfoPB {