Skip to content

Commit

Permalink
Use write and rename to flush file to avoid zero length catalog file. (
Browse files Browse the repository at this point in the history
…#755)

### What problem does this PR solve?

1. Use write-then-rename when flushing the catalog file, to avoid leaving a zero-length catalog file behind after a crash.
2. Fix a restart crash caused by a dropped db_entry having no `dir` field.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Mar 11, 2024
1 parent 972fa70 commit 24907d0
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 37 deletions.
4 changes: 2 additions & 2 deletions docs/build_from_source.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ echo 'deb https://apt.llvm.org/jammy/ llvm-toolchain-jammy-17 main' | sudo tee /
wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key | sudo tee /etc/apt/trusted.gpg.d/apt.llvm.org.asc
sudo add-apt-repository -P ppa:ubuntu-toolchain-r/test
sudo add-apt-repository -P ppa:mhier/libboost-latest
sudo apt update && sudo apt install g++-13 clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev libthrift-dev
sudo apt update && sudo apt install g++-13 clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev
ln -s /usr/lib/llvm-17/bin/clang-scan-deps /usr/bin/clang-scan-deps
```

Expand Down Expand Up @@ -87,7 +87,7 @@ sudo apt update && sudo apt install -y git wget
wget https://cmake.org/files/v3.28/cmake-3.28.1-linux-x86_64.tar.gz
tar zxvf cmake-3.28.1-linux-x86_64.tar.gz
sudo cp -rf cmake-3.28.1-linux-x86_64/bin/* /usr/local/bin && sudo cp -rf cmake-3.28.1-linux-x86_64/share/* /usr/local/share && rm -rf cmake-3.28.1-linux-x86_64
sudo apt install -y ninja-build clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev libthrift-dev
sudo apt install -y ninja-build clang-17 clang-tools-17 flex libboost1.81-dev liblz4-dev libevent-dev liburing-dev
ln -s /usr/lib/llvm-17/bin/clang-scan-deps /usr/bin/clang-scan-deps
```

Expand Down
2 changes: 2 additions & 0 deletions src/storage/io/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ i64 FileHandler::Read(void *data, u64 nbytes) { return file_system_.Read(*this,

// Write nbytes from data through the backing file system; returns bytes written.
i64 FileHandler::Write(const void *data, u64 nbytes) {
    return file_system_.Write(*this, data, nbytes);
}

// Rename old_name to new_name via the backing file system.
void FileHandler::Rename(const String &old_name, const String &new_name) {
    file_system_.Rename(old_name, new_name);
}

// Flush this handler's buffered data to durable storage.
void FileHandler::Sync() {
    file_system_.SyncFile(*this);
}

// Close this handler through the backing file system.
void FileHandler::Close() {
    file_system_.Close(*this);
}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/io/file_system.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public:

i64 Write(const void *data, u64 nbytes);

void Rename(const String& old_name, const String& new_name);

void Sync();

void Close();
Expand Down
47 changes: 27 additions & 20 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -793,48 +793,61 @@ void Catalog::Deserialize(const nlohmann::json &catalog_json, BufferManager *buf
}
}

void Catalog::SaveAsFile(const String &catalog_path, TxnTimeStamp max_commit_ts) {
UniquePtr<String> Catalog::SaveFullCatalog(const String &catalog_dir, TxnTimeStamp max_commit_ts) {
UniquePtr<String> catalog_path_ptr = MakeUnique<String>(fmt::format("{}/META_CATALOG.{}.json", catalog_dir, max_commit_ts));
String catalog_tmp_path = String(fmt::format("{}/_META_CATALOG.{}.json", catalog_dir, max_commit_ts));

// Serialize catalog to string
nlohmann::json catalog_json = Serialize(max_commit_ts, true);
String catalog_str = catalog_json.dump();

// Save catalog to tmp file.
// FIXME: Temp implementation, will be replaced by async task.
LocalFileSystem fs;

u8 fileflags = FileFlags::WRITE_FLAG;
if (!fs.Exists(catalog_path)) {

if (!fs.Exists(catalog_tmp_path)) {
fileflags |= FileFlags::CREATE_FLAG;
}

UniquePtr<FileHandler> catalog_file_handler = fs.OpenFile(catalog_path, fileflags, FileLockType::kWriteLock);
UniquePtr<FileHandler> catalog_file_handler = fs.OpenFile(catalog_tmp_path, fileflags, FileLockType::kWriteLock);

// TODO: Save as a temp filename, then rename it to the real filename.
SizeT n_bytes = catalog_file_handler->Write(catalog_str.data(), catalog_str.size());
if (n_bytes != catalog_str.size()) {
LOG_ERROR(fmt::format("Saving catalog file failed: {}", catalog_path));
RecoverableError(Status::CatalogCorrupted(catalog_path));
LOG_ERROR(fmt::format("Saving catalog file failed: {}", catalog_tmp_path));
RecoverableError(Status::CatalogCorrupted(catalog_tmp_path));
}
catalog_file_handler->Sync();
catalog_file_handler->Close();

LOG_INFO(fmt::format("Saved catalog to: {}", catalog_path));
// Rename temp file to regular catalog file
catalog_file_handler->Rename(catalog_tmp_path, *catalog_path_ptr);

LOG_INFO(fmt::format("Saved catalog to: {}", *catalog_path_ptr));
return catalog_path_ptr;
}

// called by bg_task
bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, TxnTimeStamp max_commit_ts, bool is_full_checkpoint) {
LOG_INFO("FLUSH GLOBAL DELTA CATALOG ENTRY");
LOG_INFO(fmt::format("Global catalog delta entry commit ts:{}, checkpoint max commit ts:{}.",
bool Catalog::SaveDeltaCatalog(const String &delta_catalog_path, TxnTimeStamp max_commit_ts) {
LOG_INFO("SAVING DELTA CATALOG");
LOG_INFO(fmt::format("Save delta catalog commit ts:{}, checkpoint max commit ts:{}.",
global_catalog_delta_entry_->commit_ts(),
max_commit_ts));

// Check the SegmentEntry's for flush the data to disk.
UniquePtr<CatalogDeltaEntry> flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts);

if (flush_delta_entry->operations().empty()) {
LOG_INFO("Global catalog delta entry ops is empty. Skip flush.");
LOG_INFO("Save delta catalog ops is empty. Skip flush.");
return true;
}
Vector<TransactionID> flushed_txn_ids;
HashSet<TransactionID> flushed_unique_txn_ids;
for (auto &op : flush_delta_entry->operations()) {
flushed_unique_txn_ids.insert(op->txn_id());

switch (op->GetType()) {
case CatalogDeltaOpType::ADD_TABLE_ENTRY: {
auto add_table_entry_op = static_cast<AddTableEntryOp *>(op.get());
Expand All @@ -844,7 +857,7 @@ bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, Txn
case CatalogDeltaOpType::ADD_SEGMENT_ENTRY: {
auto add_segment_entry_op = static_cast<AddSegmentEntryOp *>(op.get());
LOG_TRACE(fmt::format("Flush segment entry: {}", add_segment_entry_op->ToString()));
add_segment_entry_op->FlushDataToDisk(max_commit_ts, is_full_checkpoint);
add_segment_entry_op->FlushDataToDisk(max_commit_ts);
break;
}
case CatalogDeltaOpType::ADD_SEGMENT_INDEX_ENTRY: {
Expand All @@ -866,23 +879,17 @@ bool Catalog::FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, Txn
flush_delta_entry->WriteAdv(ptr);
i32 act_size = ptr - buf.data();
if (exp_size != act_size) {
UnrecoverableError(fmt::format("Flush global catalog delta entry failed, exp_size: {}, act_size: {}", exp_size, act_size));
UnrecoverableError(fmt::format("Save delta catalog failed, exp_size: {}, act_size: {}", exp_size, act_size));
}

std::ofstream outfile;
outfile.open(delta_catalog_path, std::ios::binary);
outfile.write((reinterpret_cast<const char *>(buf.data())), act_size);
outfile.close();

LOG_INFO(fmt::format("Flush global catalog delta entry to: {}, size: {}.", delta_catalog_path, act_size));
LOG_INFO(fmt::format("Save delta catalog to: {}, size: {}.", delta_catalog_path, act_size));

if (!is_full_checkpoint) {
HashSet<TransactionID> flushed_txn_ids;
for (auto &op : flush_delta_entry->operations()) {
flushed_txn_ids.insert(op->txn_id());
}
txn_mgr_->RemoveWaitFlushTxns(Vector<TransactionID>(flushed_txn_ids.begin(), flushed_txn_ids.end()));
}
txn_mgr_->RemoveWaitFlushTxns(Vector<TransactionID>(flushed_unique_txn_ids.begin(), flushed_unique_txn_ids.end()));

return false;
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/meta/catalog.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ public:
// Serialization and Deserialization
nlohmann::json Serialize(TxnTimeStamp max_commit_ts, bool is_full_checkpoint);

void SaveAsFile(const String &catalog_path, TxnTimeStamp max_commit_ts);
UniquePtr<String> SaveFullCatalog(const String &catalog_dir, TxnTimeStamp max_commit_ts);

bool FlushGlobalCatalogDeltaEntry(const String &delta_catalog_path, TxnTimeStamp max_commit_ts, bool is_full_checkpoint);
bool SaveDeltaCatalog(const String &catalog_dir, TxnTimeStamp max_commit_ts);

static void Deserialize(const nlohmann::json &catalog_json, BufferManager *buffer_mgr, UniquePtr<Catalog> &catalog);

Expand Down
8 changes: 6 additions & 2 deletions src/storage/meta/entry/db_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,15 @@ nlohmann::json DBEntry::Serialize(TxnTimeStamp max_commit_ts, bool is_full_check
UniquePtr<DBEntry> DBEntry::Deserialize(const nlohmann::json &db_entry_json, DBMeta *db_meta, BufferManager *buffer_mgr) {
nlohmann::json json_res;

SharedPtr<String> db_entry_dir = MakeShared<String>(db_entry_json["db_entry_dir"]);
bool deleted = db_entry_json["deleted"];
SharedPtr<String> db_name = MakeShared<String>(db_entry_json["db_name"]);
TransactionID txn_id = db_entry_json["txn_id"];
u64 begin_ts = db_entry_json["begin_ts"];
bool deleted = db_entry_json["deleted"];

SharedPtr<String> db_entry_dir{};
if(!deleted) {
db_entry_dir = MakeShared<String>(db_entry_json["db_entry_dir"]);
}
UniquePtr<DBEntry> res = MakeUnique<DBEntry>(db_meta, deleted, db_entry_dir, db_name, txn_id, begin_ts);

u64 commit_ts = db_entry_json["commit_ts"];
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ SharedPtr<SegmentEntry> SegmentEntry::Deserialize(const nlohmann::json &segment_
return segment_entry;
}

void SegmentEntry::FlushDataToDisk(TxnTimeStamp max_commit_ts, bool is_full_checkpoint) {
void SegmentEntry::FlushDataToDisk(TxnTimeStamp max_commit_ts) {
auto block_entry_iter = BlockEntryIter(this);
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
block_entry->Flush(max_commit_ts);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public:

void FlushNewData(TxnTimeStamp flush_ts);

void FlushDataToDisk(TxnTimeStamp max_commit_ts, bool is_full_checkpoint);
void FlushDataToDisk(TxnTimeStamp max_commit_ts);

static bool CheckDeleteConflict(Vector<Pair<SegmentEntry *, Vector<SegmentOffset>>> &&segments, TransactionID txn_id);

Expand Down
17 changes: 11 additions & 6 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ void Txn::DeltaCheckpoint(const TxnTimeStamp max_commit_ts) {
String dir_name = *txn_mgr_->GetBufferMgr()->BaseDir().get() + "/catalog";
String delta_path = String(fmt::format("{}/CATALOG_DELTA_ENTRY.DELTA.{}", dir_name, max_commit_ts));
// only save the catalog delta entry
bool skip = catalog_->FlushGlobalCatalogDeltaEntry(delta_path, max_commit_ts, false);
bool skip = catalog_->SaveDeltaCatalog(delta_path, max_commit_ts);
if (skip) {
return;
}
Expand All @@ -546,12 +546,17 @@ void Txn::DeltaCheckpoint(const TxnTimeStamp max_commit_ts) {

/// Perform a full checkpoint at `max_commit_ts`: persist the complete catalog
/// snapshot and record the checkpoint in the WAL.
/// A full checkpoint does not need an incremental (delta) catalog flush.
void Txn::FullCheckpoint(const TxnTimeStamp max_commit_ts) {
    String dir_name = *txn_mgr_->GetBufferMgr()->BaseDir().get() + "/catalog";

    // SaveFullCatalog writes to a temp file and renames it, returning the final path.
    UniquePtr<String> catalog_path_ptr = catalog_->SaveFullCatalog(dir_name, max_commit_ts);
    // is_full_checkpoint = true: replaying this WAL entry restores from the snapshot alone.
    wal_entry_->cmds_.push_back(MakeShared<WalCmdCheckpoint>(max_commit_ts, true, *catalog_path_ptr));
}

void Txn::AddDBStore(DBEntry *db_entry) { txn_dbs_.insert(db_entry); }
Expand Down
4 changes: 2 additions & 2 deletions src/storage/wal/catalog_delta_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,8 @@ const String UpdateSegmentBloomFilterDataOp::ToString() const {
return fmt::format("UpdateSegmentBloomFilterDataOp db_name: {} table_name: {} segment_id: {}", db_name_, table_name_, segment_id_);
}

/// Flush this op's segment data (all blocks up to `max_commit_ts`) to disk, then
/// try to mark the segment deprecated if it was compacted earlier.
void AddSegmentEntryOp::FlushDataToDisk(TxnTimeStamp max_commit_ts) {
    this->segment_entry_->FlushDataToDisk(max_commit_ts);
    LOG_TRACE("Segment has flushed to disk, now try set to deprecated if it was compacted before");
    this->segment_entry_->TrySetDeprecated();
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/catalog_delta_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public:
// Build the op's unique index key: "<op-type>#<txn-id>#<db>#<table>#<segment-id>".
const String EncodeIndex() const final {
    auto encoded = fmt::format("{}#{}#{}#{}#{}", i32(GetType()), txn_id_, *db_name_, *table_name_, segment_id_);
    return String(std::move(encoded));
}
void FlushDataToDisk(TxnTimeStamp max_commit_ts, bool is_full_checkpoint);
void FlushDataToDisk(TxnTimeStamp max_commit_ts);

public:
const String &db_name() const { return *db_name_; }
Expand Down

0 comments on commit 24907d0

Please sign in to comment.