Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: sevev <[email protected]>
  • Loading branch information
sevev committed Jul 30, 2024
1 parent 7f7a679 commit 4ef7622
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 55 deletions.
31 changes: 23 additions & 8 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,24 +583,35 @@ StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema_by_id(int64_t tablet_
// So we will support a part of segments in one rowset to do compaction in the future.
// To keep the consistence of all segments in one rowset, we will use the last rowset tablet schema as the
// output rowset schema. This is because the last rowset may only have part of the segment merged.
StatusOr<TabletSchemaPtr> TabletManager::get_output_rowset_schema(std::vector<RowsetPtr>& input_rowset,
VersionedTablet& tablet) {
const auto& metadata = tablet.metadata();
StatusOr<TabletSchemaPtr> TabletManager::get_output_rowset_schema(std::vector<uint32_t>& input_rowset,
const TabletMetadata* metadata) {
if (metadata->rowset_to_schema().empty() || input_rowset.size() <= 0) {
return tablet.get_schema();
return GlobalTabletSchemaMap::Instance()->emplace(metadata->schema()).first;
}
TabletSchemaPtr tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(metadata->schema()).first;
struct Finder {
uint32_t id;
bool operator()(const RowsetMetadata& r) const { return r.id() == id; }
};

auto input_id = input_rowset[input_rowset.size() - 1]->id();
auto input_id = input_rowset[input_rowset.size() - 1];
auto iter = std::find_if(metadata->rowsets().begin(), metadata->rowsets().end(), Finder{input_id});
if (UNLIKELY(iter == metadata->rowsets().end())) {
return Status::InternalError(fmt::format("input rowset {} not found", input_id));
}
auto schema_id = metadata->rowset_to_schema().at(input_id);
return GlobalTabletSchemaMap::Instance()->emplace(metadata->historical_schemas().at(schema_id)).first;

auto rowset_it = metadata->rowset_to_schema().find(input_id);
if (rowset_it != metadata->rowset_to_schema().end()) {
auto schema_it = metadata->historical_schemas().find(rowset_it->second);
if (schema_it != metadata->historical_schemas().end()) {
tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(schema_it->second).first;
} else {
return Status::InternalError(fmt::format("can not find output rowset schema, id {}", rowset_it->second));
}
} else {
return Status::InternalError(fmt::format("input rowset {} not exist in rowset_to_schema", input_id));
}
return tablet_schema;
}

StatusOr<CompactionTaskPtr> TabletManager::compact(CompactionTaskContext* context) {
Expand All @@ -609,7 +620,11 @@ StatusOr<CompactionTaskPtr> TabletManager::compact(CompactionTaskContext* contex
ASSIGN_OR_RETURN(auto compaction_policy, CompactionPolicy::create(this, tablet_metadata));
ASSIGN_OR_RETURN(auto input_rowsets, compaction_policy->pick_rowsets());
ASSIGN_OR_RETURN(auto algorithm, compaction_policy->choose_compaction_algorithm(input_rowsets));
ASSIGN_OR_RETURN(auto tablet_schema, get_output_rowset_schema(input_rowsets, tablet));
std::vector<uint32_t> input_rowsets_id;
for (auto& rowset : input_rowsets) {
input_rowsets_id.emplace_back(rowset->id());
}
ASSIGN_OR_RETURN(auto tablet_schema, get_output_rowset_schema(input_rowsets_id, tablet_metadata.get()));
if (algorithm == VERTICAL_COMPACTION) {
return std::make_shared<VerticalCompactionTask>(std::move(tablet), std::move(input_rowsets), context,
std::move(tablet_schema));
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/lake/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class TabletManager {

StatusOr<TxnLogPtr> get_txn_vlog(const std::string& path, bool fill_cache = true);

StatusOr<TabletSchemaPtr> get_output_rowset_schema(std::vector<RowsetPtr>& input_rowset, VersionedTablet& tablet);
StatusOr<TabletSchemaPtr> get_output_rowset_schema(std::vector<uint32_t>& input_rowset,
const TabletMetadata* metadata);

#ifdef USE_STAROS
bool is_tablet_in_worker(int64_t tablet_id);
Expand Down
9 changes: 5 additions & 4 deletions be/src/storage/lake/txn_log_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@ Status apply_alter_meta_log(TabletMetadataPB* metadata, const TxnLogPB_OpAlterMe
(void)update_mgr->index_cache().try_remove_by_key(metadata->id());
}
// update tablet meta
// 1. rowset_to_schema is empty, maybe upgrade from old version or first time to do fast ddl
// 2. rowset_to_schema is not empty, add the origin tablet schema into historical_schemas and update the
// rowset_to_schema
// 1. rowset_to_schema is empty, maybe upgrade from old version or first time to do fast ddl. So we will
// add the tablet schema before alter into historical schema.
// 2. rowset_to_schema is not empty, no need to update historical schema because we historical schema already
// keep the tablet schema before alter.
if (alter_meta.has_tablet_schema()) {
VLOG(2) << "old schema: " << metadata->schema().DebugString()
<< " new schema: " << alter_meta.tablet_schema().DebugString();
// add/drop field for struct column is under testing, To avoid impacting the existing logic, add the
// `lake_enable_alter_struct` configuration. Once testing is complete, this configuration will be removed.
if (config::lake_enable_alter_struct) {
if (metadata->rowset_to_schema().empty()) {
if (metadata->rowset_to_schema().empty() && metadata->rowsets_size() > 0) {
metadata->mutable_historical_schemas()->clear();
auto schema_id = metadata->schema().id();
auto& item = (*metadata->mutable_historical_schemas())[schema_id];
Expand Down
30 changes: 7 additions & 23 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,27 +684,6 @@ size_t UpdateManager::get_rowset_num_deletes(int64_t tablet_id, int64_t version,
return num_dels;
}

StatusOr<TabletSchemaPtr> UpdateManager::get_output_rowset_schema(const TabletMetadata& metadata,
const TxnLogPB_OpCompaction& op_compaction) {
TabletSchemaPtr tablet_schema = std::make_shared<TabletSchema>(metadata.schema());
if (op_compaction.input_rowsets_size() != 0 && !metadata.rowset_to_schema().empty()) {
auto last_rowset_id = op_compaction.input_rowsets(op_compaction.input_rowsets_size() - 1);
auto rowset_it = metadata.rowset_to_schema().find(last_rowset_id);
if (rowset_it != metadata.rowset_to_schema().end()) {
auto schema_it = metadata.historical_schemas().find(rowset_it->second);
if (schema_it != metadata.historical_schemas().end()) {
tablet_schema = GlobalTabletSchemaMap::Instance()->emplace(schema_it->second).first;
} else {
return Status::InternalError(
fmt::format("can not find output rowset schema, id {}", rowset_it->second));
}
} else {
return Status::InternalError(fmt::format("input rowset {} not exist", last_rowset_id));
}
}
return tablet_schema;
}

bool UpdateManager::_use_light_publish_primary_compaction(int64_t tablet_id, int64_t txn_id) {
// Is config enable ?
if (!config::enable_light_pk_compaction_publish) {
Expand All @@ -724,7 +703,10 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti
int64_t base_version) {
// 1. init some state
auto& index = index_entry->value();
ASSIGN_OR_RETURN(auto tablet_schema, get_output_rowset_schema(metadata, op_compaction));
std::vector<uint32_t> input_rowsets_id(op_compaction.input_rowsets().begin(), op_compaction.input_rowsets().end());
ASSIGN_OR_RETURN(auto tablet_schema, ExecEnv::GetInstance()->lake_tablet_manager()->get_output_rowset_schema(
input_rowsets_id, &metadata));

Rowset output_rowset(tablet.tablet_mgr(), tablet.id(), &op_compaction.output_rowset(), -1 /*unused*/,
tablet_schema);
vector<std::pair<uint32_t, DelVectorPtr>> delvecs;
Expand Down Expand Up @@ -775,7 +757,9 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op
}
auto& index = index_entry->value();
// 1. iterate output rowset, update primary index and generate delvec
ASSIGN_OR_RETURN(auto tablet_schema, get_output_rowset_schema(metadata, op_compaction));
std::vector<uint32_t> input_rowsets_id(op_compaction.input_rowsets().begin(), op_compaction.input_rowsets().end());
ASSIGN_OR_RETURN(auto tablet_schema, ExecEnv::GetInstance()->lake_tablet_manager()->get_output_rowset_schema(
input_rowsets_id, &metadata));
Rowset output_rowset(tablet.tablet_mgr(), tablet.id(), &op_compaction.output_rowset(), -1 /*unused*/,
tablet_schema);
auto compaction_entry = _compaction_cache.get_or_create(cache_key(tablet.id(), txn_id));
Expand Down
3 changes: 0 additions & 3 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ class UpdateManager {
// get del nums from rowset, for compaction policy
size_t get_rowset_num_deletes(int64_t tablet_id, int64_t version, const RowsetMetadataPB& rowset_meta);

StatusOr<TabletSchemaPtr> get_output_rowset_schema(const TabletMetadata& metadata,
const TxnLogPB_OpCompaction& op_compaction);

Status publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction, int64_t txn_id,
const TabletMetadata& metadata, const Tablet& tablet, IndexEntry* index_entry,
MetaFileBuilder* builder, int64_t base_version);
Expand Down
3 changes: 1 addition & 2 deletions be/test/exprs/like_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LikeTest : public ::testing::Test {
public:
TExprNode expr_node;
};
/*

TEST_F(LikeTest, startConstPatternLike) {
auto context = FunctionContext::create_test_context();
std::unique_ptr<FunctionContext> ctx(context);
Expand Down Expand Up @@ -694,5 +694,4 @@ TEST_F(LikeTest, splitLikePatternIntoNgramSet) {
VectorizedFunctionCallExpr::split_like_string_to_ngram(pattern, options, ngram_set);
ASSERT_EQ(0, ngram_set.size());
}
*/
} // namespace starrocks
26 changes: 12 additions & 14 deletions be/test/storage/lake/tablet_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -765,10 +765,9 @@ TEST_F(LakeTabletManagerTest, test_get_output_rorwset_schema) {

{
for (int i = 0; i < 5; i++) {
std::vector<lake::RowsetPtr> input_rowsets;
auto rs = std::make_shared<lake::Rowset>(_tablet_manager, tablet_metadata, i);
input_rowsets.emplace_back(std::move(rs));
auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet);
std::vector<uint32_t> input_rowsets;
input_rowsets.emplace_back(tablet_metadata->rowsets(i).id());
auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get());
ASSERT_TRUE(res.ok());
auto schema_id = tablet_metadata->rowset_to_schema().at(tablet_metadata->rowsets(i).id());
ASSERT_EQ(res.value()->id(), schema_id);
Expand All @@ -782,26 +781,25 @@ TEST_F(LakeTabletManagerTest, test_get_output_rorwset_schema) {
auto rs5 = std::make_shared<lake::Rowset>(_tablet_manager, tablet_metadata, 4);

{
std::vector<lake::RowsetPtr> input_rowsets;
input_rowsets.emplace_back(rs1);
input_rowsets.emplace_back(rs2);
auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet);
std::vector<uint32_t> input_rowsets;
input_rowsets.emplace_back(tablet_metadata->rowsets(0).id());
input_rowsets.emplace_back(tablet_metadata->rowsets(1).id());
auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get());
ASSERT_TRUE(res.ok());
ASSERT_EQ(res.value()->id(), schema_id1);

input_rowsets.emplace_back(rs3);
res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet);
input_rowsets.emplace_back(tablet_metadata->rowsets(2).id());
res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get());
ASSERT_TRUE(res.ok());
ASSERT_EQ(res.value()->id(), tablet_metadata->schema().id());
}

{
tablet_metadata->mutable_rowset_to_schema()->clear();
for (int i = 0; i < 5; i++) {
std::vector<lake::RowsetPtr> input_rowsets;
auto rs = std::make_shared<lake::Rowset>(_tablet_manager, tablet_metadata, i);
input_rowsets.emplace_back(std::move(rs));
auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet);
std::vector<uint32_t> input_rowsets;
input_rowsets.emplace_back(tablet_metadata->rowsets(i).id());
auto res = _tablet_manager->get_output_rowset_schema(input_rowsets, tablet_metadata.get());
ASSERT_TRUE(res.ok());
ASSERT_EQ(res.value()->id(), tablet_metadata->schema().id());
}
Expand Down

0 comments on commit 4ef7622

Please sign in to comment.