From 5be1b0c66b6cdf79233dc23580d40f47e1b9c7c0 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Sun, 15 Sep 2024 14:48:20 +0800 Subject: [PATCH] Fix replay wal error in follower/learner mode (#1878) ### What problem does this PR solve? WAL replay meet empty WAL file will issue unrecoverable error. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Signed-off-by: Jin Hai --- src/main/infinity_context.cpp | 1 + src/storage/storage.cpp | 2 +- src/storage/storage.cppm | 7 ------- src/storage/wal/catalog_delta_entry.cpp | 11 +++++----- src/storage/wal/wal_manager.cpp | 20 +++++++++++++++---- src/storage/wal/wal_manager.cppm | 9 ++++++++- .../storage/wal/catalog_delta_replay.cpp | 10 ++++++++++ test/data/config/test_close_bgtask.toml | 2 ++ .../config/test_close_bgtask_vfs_off.toml | 2 ++ 9 files changed, 46 insertions(+), 18 deletions(-) diff --git a/src/main/infinity_context.cpp b/src/main/infinity_context.cpp index e4c37e3a02..a762f25b9e 100644 --- a/src/main/infinity_context.cpp +++ b/src/main/infinity_context.cpp @@ -31,6 +31,7 @@ import variables; // import python_instance; import status; import infinity_exception; +import wal_manager; namespace infinity { diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index 95e51a9524..6c58352053 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -121,7 +121,7 @@ void Storage::SetStorageMode(StorageMode target_mode) { // Must init catalog before txn manager. // Replay wal file wrap init catalog - TxnTimeStamp system_start_ts = wal_mgr_->ReplayWalFile(); + TxnTimeStamp system_start_ts = wal_mgr_->ReplayWalFile(target_mode); if (system_start_ts == 0) { // Init database, need to create default_db LOG_INFO(fmt::format("Init a new catalog")); diff --git a/src/storage/storage.cppm b/src/storage/storage.cppm index 63133e9c7a..0299c0cf0d 100644 --- a/src/storage/storage.cppm +++ b/src/storage/storage.cppm @@ -31,13 +31,6 @@ export module storage; namespace infinity { -export enum class StorageMode { - kUnInitialized, - kAdmin, - kReadable, - kWritable, -}; - class CleanupInfoTracer; export class Storage { diff --git a/src/storage/wal/catalog_delta_entry.cpp b/src/storage/wal/catalog_delta_entry.cpp index bc7ecb162a..258e0cdfa7 100644 --- a/src/storage/wal/catalog_delta_entry.cpp +++ b/src/storage/wal/catalog_delta_entry.cpp @@ -765,12 +765,12 @@ const String AddBlockEntryOp::ToString() const { } const String AddColumnEntryOp::ToString() const { - std::ostringstream oss; - oss << fmt::format("AddColumnEntryOp {} outline_infos: [", CatalogDeltaOperation::ToString()); + std::stringstream sstream; + sstream << fmt::format("AddColumnEntryOp {} outline_infos: [", CatalogDeltaOperation::ToString()); const auto &[outline_buffer_count, last_chunk_offset] = outline_info_; - oss << fmt::format("outline_buffer_count: {}, last_chunk_offset: {}", outline_buffer_count, last_chunk_offset); - oss << "]"; - return std::move(oss).str(); + sstream << fmt::format("outline_buffer_count: {}, last_chunk_offset: {}", outline_buffer_count, last_chunk_offset); + sstream << "]"; + return sstream.str(); } const String AddTableIndexEntryOp::ToString() const { @@ -1027,6 +1027,7 @@ UniquePtr CatalogDeltaEntry::ReadAdv(const char *&ptr, i32 ma UnrecoverableError(error_message); } { + LOG_INFO(fmt::format("Deserialize delta op count: {}", entry->operations_.size())); for (const auto &operation : entry->operations_) { LOG_INFO(fmt::format("Read delta op: {}", operation->ToString())); } diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index 43ae46cc3c..39788cbbf1 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -476,7 +476,7 @@ String WalManager::GetWalFilename() const { return wal_path_; } * @return int64_t The max commit timestamp of the transactions in the wal file. * */ -i64 WalManager::ReplayWalFile() { +i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) { LocalFileSystem fs; Vector wal_list{}; @@ -556,9 +556,21 @@ i64 WalManager::ReplayWalFile() { } if (last_commit_ts == 0) { - // once wal is not empty, a checkpoint should always be found. - String error_message = "No checkpoint found in wal"; - UnrecoverableError(error_message); + switch(targe_storage_mode) { + case StorageMode::kWritable: { + // once wal is not empty, a checkpoint should always be found in leader or standalone mode. + String error_message = "No checkpoint found in wal"; + UnrecoverableError(error_message); + break; + } + case StorageMode::kReadable: { + return 0; + } + default: { + String error_message = "Unreachable branch"; + UnrecoverableError(error_message); + } + } } LOG_INFO(fmt::format("Checkpoint found, replay the catalog")); auto catalog_fileinfo = CatalogFile::ParseValidCheckpointFilenames(catalog_dir, max_commit_ts); diff --git a/src/storage/wal/wal_manager.cppm b/src/storage/wal/wal_manager.cppm index 9e1f13f30d..440e86f62e 100644 --- a/src/storage/wal/wal_manager.cppm +++ b/src/storage/wal/wal_manager.cppm @@ -32,6 +32,13 @@ class TableEntry; class Txn; struct SegmentEntry; +export enum class StorageMode { + kUnInitialized, + kAdmin, + kReadable, + kWritable, +}; + export class WalManager { public: WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOptionType flush_option); @@ -63,7 +70,7 @@ public: String GetWalFilename() const; - i64 ReplayWalFile(); + i64 ReplayWalFile(StorageMode targe_storage_mode); Optional>> GetCatalogFiles() const; diff --git a/src/unit_test/storage/wal/catalog_delta_replay.cpp b/src/unit_test/storage/wal/catalog_delta_replay.cpp index 6b8f94f927..90bbfdfe56 100644 --- a/src/unit_test/storage/wal/catalog_delta_replay.cpp +++ b/src/unit_test/storage/wal/catalog_delta_replay.cpp @@ -622,8 +622,18 @@ TEST_P(CatalogDeltaReplayTest, replay_with_full_checkpoint) { txn_mgr->CommitTxn(txn_record3); EXPECT_EQ(table_entry->row_count(), 3ul); + // TODO: Need to start txn to do the delta check point + usleep(100000); WaitFlushDeltaOp(storage); } + { + auto *txn = txn_mgr->BeginTxn(MakeUnique("get table")); + auto [table_entry, table_status] = txn->GetTableByName(*db_name, *table_name); + EXPECT_TRUE(table_status.ok()); + + EXPECT_EQ(table_entry->row_count(), 3ul); + txn_mgr->CommitTxn(txn); + } infinity::InfinityContext::instance().UnInit(); } diff --git a/test/data/config/test_close_bgtask.toml b/test/data/config/test_close_bgtask.toml index a3b907802c..525232412e 100644 --- a/test/data/config/test_close_bgtask.toml +++ b/test/data/config/test_close_bgtask.toml @@ -4,6 +4,8 @@ time_zone = "utc-8" [network] [log] +log_to_stdout = true +log_level = "info" [storage] # close auto optimize diff --git a/test/data/config/test_close_bgtask_vfs_off.toml b/test/data/config/test_close_bgtask_vfs_off.toml index 82421022f6..da6ccc3d80 100644 --- a/test/data/config/test_close_bgtask_vfs_off.toml +++ b/test/data/config/test_close_bgtask_vfs_off.toml @@ -4,6 +4,8 @@ time_zone = "utc-8" [network] [log] +log_to_stdout = true +log_level = "info" [storage] data_dir = "/var/infinity/data"