Skip to content

Commit

Permalink
[Enhancement] Add some util in script for TDE debug; Fix memleak in w…
Browse files Browse the repository at this point in the history
…renbind (#51813)

Add some util in script for TDE debug;
Add more info in segment load error log;
Fix memleak in wrenbind;

Signed-off-by: Binglin Chang <[email protected]>
(cherry picked from commit e837fe3)
  • Loading branch information
decster committed Oct 15, 2024
1 parent 1ec2953 commit 107ba81
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 13 deletions.
22 changes: 22 additions & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "storage/del_vector.h"
#include "storage/lake/tablet.h"
#include "storage/lake/tablet_manager.h"
#include "storage/lake/tablet_metadata.h"
#include "storage/primary_key_dump.h"
#include "storage/storage_engine.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"
#include "storage/tablet_meta_manager.h"
#include "storage/tablet_updates.h"
#include "util/stack_util.h"
#include "util/url_coding.h"
#include "wrenbind17/wrenbind17.hpp"

using namespace wrenbind17;
Expand Down Expand Up @@ -282,6 +286,22 @@ class StorageEngineRef {
return ptr;
}

static std::string get_lake_tablet_metadata_json(int64_t tablet_id, int64_t version) {
auto tablet_manager = ExecEnv::GetInstance()->lake_tablet_manager();
RETURN_IF(nullptr == tablet_manager, "");
auto meta_st = tablet_manager->get_tablet_metadata(tablet_id, version, false);
RETURN_IF(!meta_st.ok(), meta_st.status().to_string());
return proto_to_json(*meta_st.value());
}

static std::string decode_encryption_meta(const std::string& meta_base64) {
EncryptionMetaPB pb;
std::string meta_bytes;
RETURN_IF(!base64_decode(meta_base64, &meta_bytes), "bad base64 string");
RETURN_IF(!pb.ParseFromString(meta_bytes), "parse encryption meta failed");
return proto_to_json(pb);
}

static std::shared_ptr<TabletBasicInfo> get_tablet_info(int64_t tablet_id) {
std::vector<TabletBasicInfo> tablet_infos;
auto manager = StorageEngine::instance()->tablet_manager();
Expand Down Expand Up @@ -555,6 +575,8 @@ class StorageEngineRef {
REG_STATIC_METHOD(StorageEngineRef, get_tablet_info);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_infos);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_meta_json);
REG_STATIC_METHOD(StorageEngineRef, get_lake_tablet_metadata_json);
REG_STATIC_METHOD(StorageEngineRef, decode_encryption_meta);
REG_STATIC_METHOD(StorageEngineRef, reset_delvec);
REG_STATIC_METHOD(StorageEngineRef, get_tablet);
REG_STATIC_METHOD(StorageEngineRef, drop_tablet);
Expand Down
16 changes: 9 additions & 7 deletions be/src/storage/lake/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,14 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, const LakeIOOpti
int index = 0;

std::vector<std::future<std::pair<StatusOr<SegmentPtr>, std::string>>> segment_futures;
auto check_status = [&](StatusOr<SegmentPtr>& segment_or, const std::string& seg_name) -> Status {
auto check_status = [&](StatusOr<SegmentPtr>& segment_or, const std::string& seg_name, int seg_id) -> Status {
if (segment_or.ok()) {
segments->emplace_back(std::move(segment_or.value()));
} else if (segment_or.status().is_not_found() && ignore_lost_segment) {
LOG(WARNING) << "Ignored lost segment " << seg_name;
} else {
return segment_or.status();
return segment_or.status().clone_and_prepend(fmt::format(
"load_segments failed tablet:{} rowset:{} segid:{}", _tablet_id, metadata().id(), seg_id));
}
return Status::OK();
};
Expand Down Expand Up @@ -450,7 +451,7 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, const LakeIOOpti
<< ", try to load segment serially, seg_id: " << seg_id;
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id, &footer_size_hint, lake_io_opts,
fill_metadata_cache, _tablet_schema);
if (auto status = check_status(segment_or, seg_name); !status.ok()) {
if (auto status = check_status(segment_or, seg_name, seg_id); !status.ok()) {
return status;
}
}
Expand All @@ -459,16 +460,17 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, const LakeIOOpti
} else {
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id++, &footer_size_hint, lake_io_opts,
fill_metadata_cache, _tablet_schema);
if (auto status = check_status(segment_or, seg_name); !status.ok()) {
if (auto status = check_status(segment_or, seg_name, seg_id); !status.ok()) {
return status;
}
seg_id++;
}
}

for (auto& fut : segment_futures) {
auto result_pair = fut.get();
for (int i = 0; i < segment_futures.size(); i++) {
auto result_pair = segment_futures[i].get();
auto segment_or = result_pair.first;
if (auto status = check_status(segment_or, result_pair.second); !status.ok()) {
if (auto status = check_status(segment_or, result_pair.second, i); !status.ok()) {
return status;
}
}
Expand Down
7 changes: 5 additions & 2 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,12 @@ Status Rowset::do_load() {
auto res = Segment::open(fs, seg_info, seg_id, _schema, &footer_size_hint,
rowset_meta()->partial_rowset_footer(seg_id));
if (!res.ok()) {
LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
auto st = res.status().clone_and_prepend(fmt::format(
"Load rowset failed tablet:{} rowset:{} rssid:{} seg:{} path:{}", _rowset_meta->tablet_id(),
rowset_id().to_string(), _rowset_meta->get_rowset_seg_id(), seg_id, seg_path));
LOG(WARNING) << st.message();
_segments.clear();
return res.status();
return st;
}
_segments.push_back(std::move(res).value());
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/thirdparty/wrenbind17/wrenbind17/vm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ class VM {
data->config.heapGrowthPercent = heapGrowth;
data->config.userData = data.get();
#if WREN_VERSION_NUMBER >= 4000 // >= 0.4.0
data->config.reallocateFn = [](void* memory, size_t newSize, void* userData) -> void* {
return std::realloc(memory, newSize);
};
data->config.loadModuleFn = [](WrenVM* vm, const char* name) -> WrenLoadModuleResult {
auto res = WrenLoadModuleResult();
auto& self = *reinterpret_cast<VM::Data*>(wrenGetUserData(vm));
Expand Down
9 changes: 9 additions & 0 deletions be/test/fs/key_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ TEST_F(KeyCacheTest, AddKey) {
key->set_id(2);
cache.add_key(key);
ASSERT_EQ(2, cache.size());
std::string result;
ASSERT_TRUE(execute_script("System.print(ExecEnv.key_cache_info())", result).ok());
ASSERT_TRUE(execute_script("System.print(StorageEngine.decode_encryption_meta("
"\"Ch4IARiztZ64BiAAKAE6EHp8EnQlAIgiy8dgbPRP53kKPAgCEAEYs7WeuAYgACgBMiyZegO5j9P16bHelpUAU"
"xEj1c5P4xWQsJSy6sc2yIKC0g/rRPqsGNumdy6WQgo0EAIgACgBMixDCSo3rP5l8oiZLcgtts8x7xJ+M4+/"
"INZvGPhCOA1m9zf2vpCRbjbVoOl2EQ==\"))",
result)
.ok());
ASSERT_TRUE(result.find("keyHierarchy") != result.npos);
}

static void wrap_unwrap_test(int num_level) {
Expand Down
3 changes: 3 additions & 0 deletions be/test/storage/lake/tablet_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
// NOTE: intend to put the following header to the end of the include section
// so that our `gutil/dynamic_annotations.h` takes precedence of the absl's.
// NOLINTNEXTLINE
#include "script/script.h"
#include "service/staros_worker.h"

namespace starrocks {
Expand Down Expand Up @@ -88,6 +89,8 @@ TEST_F(LakeTabletManagerTest, tablet_meta_write_and_read) {
rowset_meta_pb->set_num_rows(5);
EXPECT_OK(_tablet_manager->put_tablet_metadata(metadata));
EXPECT_OK(_tablet_manager->tablet_metadata_exists(12345, 2));
string result;
ASSERT_TRUE(execute_script("System.print(StorageEngine.get_lake_tablet_metadata_json(12345,2))", result).ok());
auto res = _tablet_manager->get_tablet_metadata(12345, 2);
EXPECT_TRUE(res.ok());
EXPECT_EQ(res.value()->id(), 12345);
Expand Down
2 changes: 1 addition & 1 deletion be/test/storage/lake/tablet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,8 @@ TEST_F(DISABLED_LakeLoadSegmentParallelTest, test_normal) {
CHECK_OK(_tablet_mgr->put_tablet_metadata(*_tablet_metadata));

// test reader
auto reader = std::make_shared<TabletReader>(_tablet_mgr.get(), _tablet_metadata, *_schema);
config::enable_load_segment_parallel = true;
auto reader = std::make_shared<TabletReader>(_tablet_mgr.get(), _tablet_metadata, *_schema);
ASSERT_OK(reader->prepare());
TabletReaderParams params;
ASSERT_OK(reader->open(params));
Expand Down
46 changes: 46 additions & 0 deletions be/test/storage/rowset/rowset_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,52 @@ TEST_F(RowsetTest, VerticalWriteTest) {
EXPECT_EQ(count, num_rows);
}

TEST_F(RowsetTest, LoadFailedTest) {
auto tablet_schema = TabletSchemaHelper::create_tablet_schema();

RowsetWriterContext writer_context;
create_rowset_writer_context(12345, tablet_schema, &writer_context);
writer_context.writer_type = kHorizontal;

std::unique_ptr<RowsetWriter> rowset_writer;
ASSERT_TRUE(RowsetFactory::create_rowset_writer(writer_context, &rowset_writer).ok());

int32_t chunk_size = 3000;
size_t num_rows = 10000;

std::vector<std::unique_ptr<SegmentPB>> seg_infos;
{
// k1 k2 v
std::vector<uint32_t> column_indexes{0, 1, 2};
auto schema = ChunkHelper::convert_schema(tablet_schema, column_indexes);
auto chunk = ChunkHelper::new_chunk(schema, chunk_size);
for (auto i = 0; i < num_rows / chunk_size + 1; ++i) {
chunk->reset();
auto& cols = chunk->columns();
for (auto j = 0; j < chunk_size && i * chunk_size + j < num_rows; ++j) {
cols[0]->append_datum(Datum(static_cast<int32_t>(i * chunk_size + j)));
cols[1]->append_datum(Datum(static_cast<int32_t>(i * chunk_size + j + 1)));
cols[2]->append_datum(Datum(static_cast<int32_t>(i * chunk_size + j + 2)));
}
seg_infos.emplace_back(std::make_unique<SegmentPB>());
ASSERT_OK(rowset_writer->flush_chunk(*chunk, seg_infos.back().get()));
}
}

// check rowset
RowsetSharedPtr rowset = rowset_writer->build().value();
ASSERT_EQ(num_rows, rowset->rowset_meta()->num_rows());
ASSERT_EQ(4, rowset->rowset_meta()->num_segments());

// delete segment file
string path = seg_infos[0]->path();
auto delete_st = FileSystem::Default()->delete_file(path);
LOG(INFO) << "delete file: " << path << " " << delete_st;
auto st = rowset->load();
LOG(INFO) << st;
ASSERT_FALSE(st.ok());
}

TEST_F(RowsetTest, SegmentWriteTest) {
auto tablet_schema = TabletSchemaHelper::create_tablet_schema();

Expand Down
5 changes: 5 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ void TabletUpdatesTest::test_writeread(bool enable_persistent_index) {
ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok());
ASSERT_EQ(2, _tablet->updates()->max_version());

string o;
ASSERT_TRUE(execute_script(fmt::format("StorageEngine.reset_delvec({}, {}, 2)", _tablet->tablet_id(), 0), o).ok());
ASSERT_TRUE(execute_script("System.print(ExecEnv.grep_log_as_string(0,0,\"I\",\"tablet_manager\",1))", o).ok());
LOG(INFO) << "grep log: " << o;

auto rs1 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok());
ASSERT_EQ(3, _tablet->updates()->max_version());
Expand Down

0 comments on commit 107ba81

Please sign in to comment.