Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Add some util in script for TDE debug; Fix memleak in wrenbind #51813

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "storage/del_vector.h"
#include "storage/lake/tablet.h"
#include "storage/lake/tablet_manager.h"
#include "storage/lake/tablet_metadata.h"
#include "storage/primary_key_dump.h"
#include "storage/storage_engine.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"
#include "storage/tablet_meta_manager.h"
#include "storage/tablet_updates.h"
#include "util/stack_util.h"
#include "util/url_coding.h"
#include "wrenbind17/wrenbind17.hpp"

using namespace wrenbind17;
Expand Down Expand Up @@ -289,6 +293,22 @@ class StorageEngineRef {
return ptr;
}

static std::string get_lake_tablet_metadata_json(int64_t tablet_id, int64_t version) {
auto tablet_manager = ExecEnv::GetInstance()->lake_tablet_manager();
RETURN_IF(nullptr == tablet_manager, "");
auto meta_st = tablet_manager->get_tablet_metadata(tablet_id, version, false);
RETURN_IF(!meta_st.ok(), meta_st.status().to_string());
return proto_to_json(*meta_st.value());
}

static std::string decode_encryption_meta(const std::string& meta_base64) {
EncryptionMetaPB pb;
std::string meta_bytes;
RETURN_IF(!base64_decode(meta_base64, &meta_bytes), "bad base64 string");
RETURN_IF(!pb.ParseFromString(meta_bytes), "parse encryption meta failed");
return proto_to_json(pb);
}

static std::shared_ptr<TabletBasicInfo> get_tablet_info(int64_t tablet_id) {
std::vector<TabletBasicInfo> tablet_infos;
auto manager = StorageEngine::instance()->tablet_manager();
Expand Down Expand Up @@ -563,6 +583,8 @@ class StorageEngineRef {
REG_STATIC_METHOD(StorageEngineRef, get_tablet_info);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_infos);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_meta_json);
REG_STATIC_METHOD(StorageEngineRef, get_lake_tablet_metadata_json);
REG_STATIC_METHOD(StorageEngineRef, decode_encryption_meta);
REG_STATIC_METHOD(StorageEngineRef, reset_delvec);
REG_STATIC_METHOD(StorageEngineRef, get_tablet);
REG_STATIC_METHOD(StorageEngineRef, drop_tablet);
Expand Down
18 changes: 10 additions & 8 deletions be/src/storage/lake/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,14 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, SegmentReadOptio
int index = 0;

std::vector<std::future<std::pair<StatusOr<SegmentPtr>, std::string>>> segment_futures;
auto check_status = [&](StatusOr<SegmentPtr>& segment_or, const std::string& seg_name) -> Status {
auto check_status = [&](StatusOr<SegmentPtr>& segment_or, const std::string& seg_name, int seg_id) -> Status {
if (segment_or.ok()) {
segments->emplace_back(std::move(segment_or.value()));
} else if (segment_or.status().is_not_found() && ignore_lost_segment) {
LOG(WARNING) << "Ignored lost segment " << seg_name;
} else {
return segment_or.status();
return segment_or.status().clone_and_prepend(fmt::format(
"load_segments failed tablet:{} rowset:{} segid:{}", _tablet_id, metadata().id(), seg_id));
}
return Status::OK();
};
Expand Down Expand Up @@ -470,25 +471,26 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, SegmentReadOptio
<< ", try to load segment serially, seg_id: " << seg_id;
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id, &footer_size_hint, lake_io_opts,
lake_io_opts.fill_metadata_cache, _tablet_schema);
if (auto status = check_status(segment_or, seg_name); !status.ok()) {
if (auto status = check_status(segment_or, seg_name, seg_id); !status.ok()) {
return status;
}
}
seg_id++;
segment_futures.push_back(task->get_future());
} else {
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id++, &footer_size_hint, lake_io_opts,
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id, &footer_size_hint, lake_io_opts,
lake_io_opts.fill_metadata_cache, _tablet_schema);
if (auto status = check_status(segment_or, seg_name); !status.ok()) {
if (auto status = check_status(segment_or, seg_name, seg_id); !status.ok()) {
return status;
}
seg_id++;
}
}

for (auto& fut : segment_futures) {
auto result_pair = fut.get();
for (int i = 0; i < segment_futures.size(); i++) {
auto result_pair = segment_futures[i].get();
auto segment_or = result_pair.first;
if (auto status = check_status(segment_or, result_pair.second); !status.ok()) {
if (auto status = check_status(segment_or, result_pair.second, i); !status.ok()) {
return status;
}
}
Expand Down
7 changes: 5 additions & 2 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,12 @@ Status Rowset::do_load() {
auto res = Segment::open(fs, seg_info, seg_id, _schema, &footer_size_hint,
rowset_meta()->partial_rowset_footer(seg_id));
if (!res.ok()) {
LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
auto st = res.status().clone_and_prepend(fmt::format(
"Load rowset failed tablet:{} rowset:{} rssid:{} seg:{} path:{}", _rowset_meta->tablet_id(),
rowset_id().to_string(), _rowset_meta->get_rowset_seg_id(), seg_id, seg_path));
LOG(WARNING) << st.message();
_segments.clear();
return res.status();
return st;
}
_segments.push_back(std::move(res).value());
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/thirdparty/wrenbind17/wrenbind17/vm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ class VM {
data->config.heapGrowthPercent = heapGrowth;
data->config.userData = data.get();
#if WREN_VERSION_NUMBER >= 4000 // >= 0.4.0
data->config.reallocateFn = [](void* memory, size_t newSize, void* userData) -> void* {
return std::realloc(memory, newSize);
};
data->config.loadModuleFn = [](WrenVM* vm, const char* name) -> WrenLoadModuleResult {
auto res = WrenLoadModuleResult();
auto& self = *reinterpret_cast<VM::Data*>(wrenGetUserData(vm));
Expand Down
9 changes: 9 additions & 0 deletions be/test/fs/key_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ TEST_F(KeyCacheTest, AddKey) {
key->set_id(2);
cache.add_key(key);
ASSERT_EQ(2, cache.size());
std::string result;
ASSERT_TRUE(execute_script("System.print(ExecEnv.key_cache_info())", result).ok());
ASSERT_TRUE(execute_script("System.print(StorageEngine.decode_encryption_meta("
"\"Ch4IARiztZ64BiAAKAE6EHp8EnQlAIgiy8dgbPRP53kKPAgCEAEYs7WeuAYgACgBMiyZegO5j9P16bHelpUAU"
"xEj1c5P4xWQsJSy6sc2yIKC0g/rRPqsGNumdy6WQgo0EAIgACgBMixDCSo3rP5l8oiZLcgtts8x7xJ+M4+/"
"INZvGPhCOA1m9zf2vpCRbjbVoOl2EQ==\"))",
result)
.ok());
ASSERT_TRUE(result.find("keyHierarchy") != result.npos);
}

static void wrap_unwrap_test(int num_level) {
Expand Down
3 changes: 3 additions & 0 deletions be/test/storage/lake/tablet_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
// NOTE: intend to put the following header to the end of the include section
// so that our `gutil/dynamic_annotations.h` takes precedence of the absl's.
// NOLINTNEXTLINE
#include "script/script.h"
#include "service/staros_worker.h"

namespace starrocks {
Expand Down Expand Up @@ -87,6 +88,8 @@ TEST_F(LakeTabletManagerTest, tablet_meta_write_and_read) {
rowset_meta_pb->set_num_rows(5);
EXPECT_OK(_tablet_manager->put_tablet_metadata(metadata));
EXPECT_OK(_tablet_manager->tablet_metadata_exists(12345, 2));
string result;
ASSERT_TRUE(execute_script("System.print(StorageEngine.get_lake_tablet_metadata_json(12345,2))", result).ok());
auto res = _tablet_manager->get_tablet_metadata(12345, 2);
EXPECT_TRUE(res.ok());
EXPECT_EQ(res.value()->id(), 12345);
Expand Down
2 changes: 1 addition & 1 deletion be/test/storage/lake/tablet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,8 @@ TEST_F(DISABLED_LakeLoadSegmentParallelTest, test_normal) {
CHECK_OK(_tablet_mgr->put_tablet_metadata(*_tablet_metadata));

// test reader
auto reader = std::make_shared<TabletReader>(_tablet_mgr.get(), _tablet_metadata, *_schema);
config::enable_load_segment_parallel = true;
auto reader = std::make_shared<TabletReader>(_tablet_mgr.get(), _tablet_metadata, *_schema);
ASSERT_OK(reader->prepare());
TabletReaderParams params;
ASSERT_OK(reader->open(params));
Expand Down
46 changes: 46 additions & 0 deletions be/test/storage/rowset/rowset_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,52 @@ TEST_F(RowsetTest, VerticalWriteTest) {
EXPECT_EQ(count, num_rows);
}

TEST_F(RowsetTest, LoadFailedTest) {
auto tablet_schema = TabletSchemaHelper::create_tablet_schema();

RowsetWriterContext writer_context;
create_rowset_writer_context(12345, tablet_schema, &writer_context);
writer_context.writer_type = kHorizontal;

std::unique_ptr<RowsetWriter> rowset_writer;
ASSERT_TRUE(RowsetFactory::create_rowset_writer(writer_context, &rowset_writer).ok());

int32_t chunk_size = 3000;
size_t num_rows = 10000;

std::vector<std::unique_ptr<SegmentPB>> seg_infos;
{
// k1 k2 v
std::vector<uint32_t> column_indexes{0, 1, 2};
auto schema = ChunkHelper::convert_schema(tablet_schema, column_indexes);
auto chunk = ChunkHelper::new_chunk(schema, chunk_size);
for (auto i = 0; i < num_rows / chunk_size + 1; ++i) {
chunk->reset();
auto& cols = chunk->columns();
for (auto j = 0; j < chunk_size && i * chunk_size + j < num_rows; ++j) {
cols[0]->append_datum(Datum(static_cast<int32_t>(i * chunk_size + j)));
cols[1]->append_datum(Datum(static_cast<int32_t>(i * chunk_size + j + 1)));
cols[2]->append_datum(Datum(static_cast<int32_t>(i * chunk_size + j + 2)));
}
seg_infos.emplace_back(std::make_unique<SegmentPB>());
ASSERT_OK(rowset_writer->flush_chunk(*chunk, seg_infos.back().get()));
}
}

// check rowset
RowsetSharedPtr rowset = rowset_writer->build().value();
ASSERT_EQ(num_rows, rowset->rowset_meta()->num_rows());
ASSERT_EQ(4, rowset->rowset_meta()->num_segments());

// delete segment file
string path = seg_infos[0]->path();
auto delete_st = FileSystem::Default()->delete_file(path);
LOG(INFO) << "delete file: " << path << " " << delete_st;
auto st = rowset->load();
LOG(INFO) << st;
ASSERT_FALSE(st.ok());
}

TEST_F(RowsetTest, SegmentWriteTest) {
auto tablet_schema = TabletSchemaHelper::create_tablet_schema();

Expand Down
5 changes: 5 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ void TabletUpdatesTest::test_writeread(bool enable_persistent_index) {
ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok());
ASSERT_EQ(2, _tablet->updates()->max_version());

string o;
ASSERT_TRUE(execute_script(fmt::format("StorageEngine.reset_delvec({}, {}, 2)", _tablet->tablet_id(), 0), o).ok());
ASSERT_TRUE(execute_script("System.print(ExecEnv.grep_log_as_string(0,0,\"I\",\"tablet_manager\",1))", o).ok());
LOG(INFO) << "grep log: " << o;

auto rs1 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok());
ASSERT_EQ(3, _tablet->updates()->max_version());
Expand Down
Loading