From 24907d04a4e93abed74aa0c352c20d8c39570394 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 11 Mar 2024 18:02:25 +0800 Subject: [PATCH] Use write and rename to flush file to avoid zero length catalog file. (#755) ### What problem does this PR solve? 1. Use write and rename to flush file to avoid zero length catalog file. 2. Fix drop db_entry no dir field introduce restart crash issue. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Refactoring --------- Signed-off-by: Jin Hai --- docs/build_from_source.md | 4 +- src/storage/io/file_system.cpp | 2 + src/storage/io/file_system.cppm | 2 + src/storage/meta/catalog.cpp | 47 +++++++++++++---------- src/storage/meta/catalog.cppm | 4 +- src/storage/meta/entry/db_entry.cpp | 8 +++- src/storage/meta/entry/segment_entry.cpp | 2 +- src/storage/meta/entry/segment_entry.cppm | 2 +- src/storage/txn/txn.cpp | 17 +++++--- src/storage/wal/catalog_delta_entry.cpp | 4 +- src/storage/wal/catalog_delta_entry.cppm | 2 +- 11 files changed, 57 insertions(+), 37 deletions(-) diff --git a/docs/build_from_source.md b/docs/build_from_source.md index 96b718270d..d2a66af362 100644 --- a/docs/build_from_source.md +++ b/docs/build_from_source.md @@ -48,7 +48,7 @@ echo 'deb https://apt.llvm.org/jammy/ llvm-toolchain-jammy-17 main' | sudo tee / wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key | sudo tee /etc/apt/trusted.gpg.d/apt.llvm.org.asc sudo add-apt-repository -P ppa:ubuntu-toolchain-r/test sudo add-apt-repository -P ppa:mhier/libboost-latest -sudo apt update && sudo apt install g++-13 clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev libthrift-dev +sudo apt update && sudo apt install g++-13 clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev ln -s /usr/lib/llvm-17/bin/clang-scan-deps /usr/bin/clang-scan-deps ``` @@ -87,7 +87,7 @@ sudo apt update && sudo apt install -y git wget wget https://cmake.org/files/v3.28/cmake-3.28.1-linux-x86_64.tar.gz tar zxvf cmake-3.28.1-linux-x86_64.tar.gz sudo cp -rf cmake-3.28.1-linux-x86_64/bin/* /usr/local/bin && sudo cp -rf cmake-3.28.1-linux-x86_64/share/* /usr/local/share && rm -rf cmake-3.28.1-linux-x86_64 -sudo apt install -y ninja-build clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev libthrift-dev +sudo apt install -y ninja-build clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev ln -s /usr/lib/llvm-17/bin/clang-scan-deps /usr/bin/clang-scan-deps ``` diff --git a/src/storage/io/file_system.cpp b/src/storage/io/file_system.cpp index 98067b560a..562e428011 100644 --- a/src/storage/io/file_system.cpp +++ b/src/storage/io/file_system.cpp @@ -24,6 +24,8 @@ i64 FileHandler::Read(void *data, u64 nbytes) { return file_system_.Read(*this, i64 FileHandler::Write(const void *data, u64 nbytes) { return file_system_.Write(*this, data, nbytes); } +void FileHandler::Rename(const String &old_name, const String &new_name) { return file_system_.Rename(old_name, new_name); } + void FileHandler::Sync() { return file_system_.SyncFile(*this); } void FileHandler::Close() { return file_system_.Close(*this); } diff --git a/src/storage/io/file_system.cppm b/src/storage/io/file_system.cppm index f6736475f6..bca2c223de 100644 --- a/src/storage/io/file_system.cppm +++ b/src/storage/io/file_system.cppm @@ -35,6 +35,8 @@ public: i64 Write(const void *data, u64 nbytes); + void Rename(const String& old_name, const String& new_name); + void Sync(); void Close(); diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index c7b3808806..7230ef8ed4 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -793,36 +793,46 @@ void Catalog::Deserialize(const nlohmann::json &catalog_json, BufferManager *buf } } -void Catalog::SaveAsFile(const String &catalog_path, TxnTimeStamp max_commit_ts) { +UniquePtr Catalog::SaveFullCatalog(const String &catalog_dir, TxnTimeStamp max_commit_ts) { + UniquePtr catalog_path_ptr = MakeUnique(fmt::format("{}/META_CATALOG.{}.json", catalog_dir, max_commit_ts)); + String catalog_tmp_path = String(fmt::format("{}/_META_CATALOG.{}.json", catalog_dir, max_commit_ts)); + + // Serialize catalog to string nlohmann::json catalog_json = Serialize(max_commit_ts, true); String catalog_str = catalog_json.dump(); + // Save catalog to tmp file. // FIXME: Temp implementation, will be replaced by async task. LocalFileSystem fs; u8 fileflags = FileFlags::WRITE_FLAG; - if (!fs.Exists(catalog_path)) { + + if (!fs.Exists(catalog_tmp_path)) { fileflags |= FileFlags::CREATE_FLAG; } - UniquePtr catalog_file_handler = fs.OpenFile(catalog_path, fileflags, FileLockType::kWriteLock); + UniquePtr catalog_file_handler = fs.OpenFile(catalog_tmp_path, fileflags, FileLockType::kWriteLock); // TODO: Save as a temp filename, then rename it to the real filename. SizeT n_bytes = catalog_file_handler->Write(catalog_str.data(), catalog_str.size()); if (n_bytes != catalog_str.size()) { - LOG_ERROR(fmt::format("Saving catalog file failed: {}", catalog_path)); - RecoverableError(Status::CatalogCorrupted(catalog_path)); + LOG_ERROR(fmt::format("Saving catalog file failed: {}", catalog_tmp_path)); + RecoverableError(Status::CatalogCorrupted(catalog_tmp_path)); } catalog_file_handler->Sync(); catalog_file_handler->Close(); - LOG_INFO(fmt::format("Saved catalog to: {}", catalog_path)); + // Rename temp file to regular catalog file + catalog_file_handler->Rename(catalog_tmp_path, *catalog_path_ptr); + + LOG_INFO(fmt::format("Saved catalog to: {}", *catalog_path_ptr)); + return catalog_path_ptr; } // called by bg_task -bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, TxnTimeStamp max_commit_ts, bool is_full_checkpoint) { - LOG_INFO("FLUSH GLOBAL DELTA CATALOG ENTRY"); - LOG_INFO(fmt::format("Global catalog delta entry commit ts:{}, checkpoint max commit ts:{}.", +bool Catalog::SaveDeltaCatalog(const String &delta_catalog_path, TxnTimeStamp max_commit_ts) { + LOG_INFO("SAVING DELTA CATALOG"); + LOG_INFO(fmt::format("Save delta catalog commit ts:{}, checkpoint max commit ts:{}.", global_catalog_delta_entry_->commit_ts(), max_commit_ts)); @@ -830,11 +840,14 @@ bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, Txn UniquePtr flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts); if (flush_delta_entry->operations().empty()) { - LOG_INFO("Global catalog delta entry ops is empty. Skip flush."); + LOG_INFO("Save delta catalog ops is empty. Skip flush."); return true; } Vector flushed_txn_ids; + HashSet flushed_unique_txn_ids; for (auto &op : flush_delta_entry->operations()) { + flushed_unique_txn_ids.insert(op->txn_id()); + switch (op->GetType()) { case CatalogDeltaOpType::ADD_TABLE_ENTRY: { auto add_table_entry_op = static_cast(op.get()); @@ -844,7 +857,7 @@ bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, Txn case CatalogDeltaOpType::ADD_SEGMENT_ENTRY: { auto add_segment_entry_op = static_cast(op.get()); LOG_TRACE(fmt::format("Flush segment entry: {}", add_segment_entry_op->ToString())); - add_segment_entry_op->FlushDataToDisk(max_commit_ts, is_full_checkpoint); + add_segment_entry_op->FlushDataToDisk(max_commit_ts); break; } case CatalogDeltaOpType::ADD_SEGMENT_INDEX_ENTRY: { @@ -866,7 +879,7 @@ bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, Txn flush_delta_entry->WriteAdv(ptr); i32 act_size = ptr - buf.data(); if (exp_size != act_size) { - UnrecoverableError(fmt::format("Flush global catalog delta entry failed, exp_size: {}, act_size: {}", exp_size, act_size)); + UnrecoverableError(fmt::format("Save delta catalog failed, exp_size: {}, act_size: {}", exp_size, act_size)); } std::ofstream outfile; @@ -874,15 +887,9 @@ bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, Txn outfile.write((reinterpret_cast(buf.data())), act_size); outfile.close(); - LOG_INFO(fmt::format("Flush global catalog delta entry to: {}, size: {}.", delta_catalog_path, act_size)); + LOG_INFO(fmt::format("Save delta catalog to: {}, size: {}.", delta_catalog_path, act_size)); - if (!is_full_checkpoint) { - HashSet flushed_txn_ids; - for (auto &op : flush_delta_entry->operations()) { - flushed_txn_ids.insert(op->txn_id()); - } - txn_mgr_->RemoveWaitFlushTxns(Vector(flushed_txn_ids.begin(), flushed_txn_ids.end())); - } + txn_mgr_->RemoveWaitFlushTxns(Vector(flushed_unique_txn_ids.begin(), flushed_unique_txn_ids.end())); return false; } diff --git a/src/storage/meta/catalog.cppm b/src/storage/meta/catalog.cppm index 08afc7c750..9b09e7a385 100644 --- a/src/storage/meta/catalog.cppm +++ b/src/storage/meta/catalog.cppm @@ -209,9 +209,9 @@ public: // Serialization and Deserialization nlohmann::json Serialize(TxnTimeStamp max_commit_ts, bool is_full_checkpoint); - void SaveAsFile(const String &catalog_path, TxnTimeStamp max_commit_ts); + UniquePtr SaveFullCatalog(const String &catalog_dir, TxnTimeStamp max_commit_ts); - bool FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, TxnTimeStamp max_commit_ts, bool is_full_checkpoint); + bool SaveDeltaCatalog(const String &catalog_dir, TxnTimeStamp max_commit_ts); static void Deserialize(const nlohmann::json &catalog_json, BufferManager *buffer_mgr, UniquePtr &catalog); diff --git a/src/storage/meta/entry/db_entry.cpp b/src/storage/meta/entry/db_entry.cpp index f7f356fd09..b478b5bc57 100644 --- a/src/storage/meta/entry/db_entry.cpp +++ b/src/storage/meta/entry/db_entry.cpp @@ -206,11 +206,15 @@ nlohmann::json DBEntry::Serialize(TxnTimeStamp max_commit_ts, bool is_full_check UniquePtr DBEntry::Deserialize(const nlohmann::json &db_entry_json, DBMeta *db_meta, BufferManager *buffer_mgr) { nlohmann::json json_res; - SharedPtr db_entry_dir = MakeShared(db_entry_json["db_entry_dir"]); + bool deleted = db_entry_json["deleted"]; SharedPtr db_name = MakeShared(db_entry_json["db_name"]); TransactionID txn_id = db_entry_json["txn_id"]; u64 begin_ts = db_entry_json["begin_ts"]; - bool deleted = db_entry_json["deleted"]; + + SharedPtr db_entry_dir{}; + if(!deleted) { + db_entry_dir = MakeShared(db_entry_json["db_entry_dir"]); + } UniquePtr res = MakeUnique(db_meta, deleted, db_entry_dir, db_name, txn_id, begin_ts); u64 commit_ts = db_entry_json["commit_ts"]; diff --git a/src/storage/meta/entry/segment_entry.cpp b/src/storage/meta/entry/segment_entry.cpp index aaacda4e77..b9145ccd1e 100644 --- a/src/storage/meta/entry/segment_entry.cpp +++ b/src/storage/meta/entry/segment_entry.cpp @@ -479,7 +479,7 @@ SharedPtr SegmentEntry::Deserialize(const nlohmann::json &segment_ return segment_entry; } -void SegmentEntry::FlushDataToDisk(TxnTimeStamp max_commit_ts, bool is_full_checkpoint) { +void SegmentEntry::FlushDataToDisk(TxnTimeStamp max_commit_ts) { auto block_entry_iter = BlockEntryIter(this); for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) { block_entry->Flush(max_commit_ts); diff --git a/src/storage/meta/entry/segment_entry.cppm b/src/storage/meta/entry/segment_entry.cppm index fc6c5c4809..1382e25d04 100644 --- a/src/storage/meta/entry/segment_entry.cppm +++ b/src/storage/meta/entry/segment_entry.cppm @@ -119,7 +119,7 @@ public: void FlushNewData(TxnTimeStamp flush_ts); - void FlushDataToDisk(TxnTimeStamp max_commit_ts, bool is_full_checkpoint); + void FlushDataToDisk(TxnTimeStamp max_commit_ts); static bool CheckDeleteConflict(Vector>> &&segments, TransactionID txn_id); diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index c66acae8c7..7c05b9f584 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -537,7 +537,7 @@ void Txn::DeltaCheckpoint(const TxnTimeStamp max_commit_ts) { String dir_name = *txn_mgr_->GetBufferMgr()->BaseDir().get() + "/catalog"; String delta_path = String(fmt::format("{}/CATALOG_DELTA_ENTRY.DELTA.{}", dir_name, max_commit_ts)); // only save the catalog delta entry - bool skip = catalog_->FlushGlobalCatalogDeltaEntry(delta_path, max_commit_ts, false); + bool skip = catalog_->SaveDeltaCatalog(delta_path, max_commit_ts); if (skip) { return; } @@ -546,12 +546,17 @@ void Txn::DeltaCheckpoint(const TxnTimeStamp max_commit_ts) { void Txn::FullCheckpoint(const TxnTimeStamp max_commit_ts) { String dir_name = *txn_mgr_->GetBufferMgr()->BaseDir().get() + "/catalog"; - String delta_path = String(fmt::format("{}/CATALOG_DELTA_ENTRY.FULL.{}", dir_name, max_commit_ts)); - String catalog_path = String(fmt::format("{}/META_CATALOG.{}.json", dir_name, max_commit_ts)); - catalog_->FlushGlobalCatalogDeltaEntry(delta_path, max_commit_ts, true); - catalog_->SaveAsFile(catalog_path, max_commit_ts); - wal_entry_->cmds_.push_back(MakeShared(max_commit_ts, true, catalog_path)); +// String delta_path = String(fmt::format("{}/META_CATALOG.{}.delta", dir_name, max_commit_ts)); +// String delta_tmp_path = String(fmt::format("{}/_META_CATALOG.{}.delta", dir_name, max_commit_ts)); +// +// String catalog_path = String(fmt::format("{}/META_CATALOG.{}.json", dir_name, max_commit_ts)); +// String catalog_tmp_path = String(fmt::format("{}/_META_CATALOG.{}.json", dir_name, max_commit_ts)); + + // Full Check point don't need increment checkpoint + // catalog_->SaveDeltaCatalog(delta_path, max_commit_ts, true); + UniquePtr catalog_path_ptr = catalog_->SaveFullCatalog(dir_name, max_commit_ts); + wal_entry_->cmds_.push_back(MakeShared(max_commit_ts, true, *catalog_path_ptr)); } void Txn::AddDBStore(DBEntry *db_entry) { txn_dbs_.insert(db_entry); } diff --git a/src/storage/wal/catalog_delta_entry.cpp b/src/storage/wal/catalog_delta_entry.cpp index c38eb32291..676dcb976d 100644 --- a/src/storage/wal/catalog_delta_entry.cpp +++ b/src/storage/wal/catalog_delta_entry.cpp @@ -654,8 +654,8 @@ const String UpdateSegmentBloomFilterDataOp::ToString() const { return fmt::format("UpdateSegmentBloomFilterDataOp db_name: {} table_name: {} segment_id: {}", db_name_, table_name_, segment_id_); } -void AddSegmentEntryOp::FlushDataToDisk(TxnTimeStamp max_commit_ts, bool is_full_checkpoint) { - this->segment_entry_->FlushDataToDisk(max_commit_ts, is_full_checkpoint); +void AddSegmentEntryOp::FlushDataToDisk(TxnTimeStamp max_commit_ts) { + this->segment_entry_->FlushDataToDisk(max_commit_ts); LOG_TRACE("Segment has flushed to disk, now try set to deprecated if it was compacted before"); this->segment_entry_->TrySetDeprecated(); } diff --git a/src/storage/wal/catalog_delta_entry.cppm b/src/storage/wal/catalog_delta_entry.cppm index d0147c7fab..4f7693dfd4 100644 --- a/src/storage/wal/catalog_delta_entry.cppm +++ b/src/storage/wal/catalog_delta_entry.cppm @@ -362,7 +362,7 @@ public: const String EncodeIndex() const final { return String(fmt::format("{}#{}#{}#{}#{}", i32(GetType()), txn_id_, *this->db_name_, *this->table_name_, this->segment_id_)); } - void FlushDataToDisk(TxnTimeStamp max_commit_ts, bool is_full_checkpoint); + void FlushDataToDisk(TxnTimeStamp max_commit_ts); public: const String &db_name() const { return *db_name_; }