Skip to content

Commit

Permalink
Refactor cleanup ts (#2181)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change

- [x] Refactoring

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Nov 6, 2024
1 parent 8f743de commit 9637f27
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ TxnTimeStamp Txn::Commit() {
txn_store_.PrepareCommit1(); // Only for import and compact, pre-commit segment
// LOG_INFO(fmt::format("Txn {} commit ts: {}", txn_id_, commit_ts));

if (txn_mgr_->CheckConflict(this)) {
if (txn_mgr_->CheckTxnConflict(this)) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}", txn_id_, commit_ts));
wal_entry_ = nullptr;
txn_mgr_->SendToWAL(this);
Expand Down
37 changes: 21 additions & 16 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import global_resource_usage;
namespace infinity {

TxnManager::TxnManager(BufferManager *buffer_mgr, WalManager *wal_mgr, TxnTimeStamp start_ts)
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false) {
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), current_ts_(start_ts), is_running_(false) {
#ifdef INFINITY_DEBUG
GlobalResourceUsage::IncrObjectCount("TxnManager");
#endif
Expand All @@ -65,7 +65,7 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text, bool ckp_txn) {
u64 new_txn_id = ++catalog_ptr->next_txn_id_;

// Record the start ts of the txn
TxnTimeStamp begin_ts = start_ts_ + 1;
TxnTimeStamp begin_ts = current_ts_ + 1;
if (ckp_txn) {
if (ckp_begin_ts_ != UNCOMMIT_TS) {
// not set ckp_begin_ts_ may not truncate the wal file.
Expand Down Expand Up @@ -120,23 +120,23 @@ bool TxnManager::CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts)
// Prepare to commit ReadTxn
TxnTimeStamp TxnManager::GetReadCommitTS(Txn *txn) {
std::lock_guard guard(locker_);
TxnTimeStamp commit_ts = start_ts_ + 1;
TxnTimeStamp commit_ts = current_ts_ + 1;
txn->SetTxnRead();
return commit_ts;
}

// Prepare to commit WriteTxn
TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) {
std::lock_guard guard(locker_);
start_ts_ += 2;
TxnTimeStamp commit_ts = start_ts_;
current_ts_ += 2;
TxnTimeStamp commit_ts = current_ts_;
wait_conflict_ck_.emplace(commit_ts, nullptr);
committing_txns_.emplace(txn);
txn->SetTxnWrite();
return commit_ts;
}

bool TxnManager::CheckConflict(Txn *txn) {
bool TxnManager::CheckTxnConflict(Txn *txn) {
TxnTimeStamp commit_ts = txn->CommitTS();
Vector<SharedPtr<Txn>> candidate_txns;
TxnTimeStamp min_checking_ts = UNCOMMIT_TS;
Expand Down Expand Up @@ -286,13 +286,14 @@ UniquePtr<TxnInfo> TxnManager::GetTxnInfoByID(TransactionID txn_id) const {
return MakeUnique<TxnInfo>(iter->first, iter->second->GetTxnText());
}

TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; }
TxnTimeStamp TxnManager::CurrentTS() const { return current_ts_; }

TxnTimeStamp TxnManager::GetNewTimeStamp() { return start_ts_ + 1; }
TxnTimeStamp TxnManager::GetNewTimeStamp() { return current_ts_ + 1; }

TxnTimeStamp TxnManager::GetCleanupScanTS() {
std::lock_guard guard(locker_);
TxnTimeStamp first_uncommitted_begin_ts = start_ts_;
TxnTimeStamp first_uncommitted_begin_ts = current_ts_;
// Get earliest txn of the ongoing transactions
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
if (first_txn.get() != nullptr) {
Expand All @@ -301,19 +302,23 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() {
}
beginned_txns_.pop_front();
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);

// Get the earlier txn ts between current ongoing txn and last checkpoint ts
TxnTimeStamp last_checkpoint_ts = wal_mgr_->LastCheckpointTS();
TxnTimeStamp least_ts = std::min(first_uncommitted_begin_ts, last_checkpoint_ts);

// Get the earlier txn ts between current least txn and checking conflict TS
if (!checking_ts_.empty()) {
res = std::min(res, *checking_ts_.begin());
least_ts = std::min(least_ts, *checking_ts_.begin());
}

LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts));
return res;
LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", least_ts, last_checkpoint_ts));
return least_ts;
}

void TxnManager::CleanupTxn(Txn *txn) {
TxnType txn_type = txn->GetTxnType();
switch(txn_type) {
switch (txn_type) {
case TxnType::kRead: {
// For read-only Txn only remove txn from txn_map
std::lock_guard guard(locker_);
Expand All @@ -323,7 +328,7 @@ void TxnManager::CleanupTxn(Txn *txn) {
case TxnType::kWrite: {
// For write txn, we need to update the state: committing->committed, rollbacking->rollbacked
TxnState txn_state = txn->GetTxnState();
switch(txn_state) {
switch (txn_state) {
case TxnState::kCommitting: {
txn->SetTxnCommitted();
break;
Expand Down
8 changes: 4 additions & 4 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public:

TxnTimeStamp GetWriteCommitTS(Txn *txn);

bool CheckConflict(Txn *txn);
bool CheckTxnConflict(Txn *txn);

void SendToWAL(Txn *txn);

Expand Down Expand Up @@ -103,7 +103,7 @@ public:
bool InCheckpointProcess(TxnTimeStamp commit_ts);

// Only used by follower and learner when received the replicated log from leader
void SetStartTS(TxnTimeStamp new_start_ts) { start_ts_ = new_start_ts; }
void SetStartTS(TxnTimeStamp new_start_ts) { current_ts_ = new_start_ts; }

private:
mutable std::mutex locker_{};
Expand All @@ -118,8 +118,8 @@ private:

Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Atomic<TxnTimeStamp> start_ts_{}; // The next txn ts
TxnTimeStamp ckp_begin_ts_ = UNCOMMIT_TS; // cur ckp begin ts, 0 if no ckp is happening
Atomic<TxnTimeStamp> current_ts_{}; // The next txn ts
TxnTimeStamp ckp_begin_ts_ = UNCOMMIT_TS; // current ckp begin ts, UNCOMMIT_TS if no ckp is happening, UNCOMMIT_TS is a maximum u64 integer

// For stop the txn manager
atomic_bool is_running_{false};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void WalManager::PutEntries(Vector<WalEntry *> wal_entries) {
wait_flush_.EnqueueBulk(wal_entries);
}

TxnTimeStamp WalManager::GetCheckpointedTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_; }
TxnTimeStamp WalManager::LastCheckpointTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_; }

Vector<SharedPtr<String>> WalManager::GetDiffWalEntryString(TxnTimeStamp start_timestamp) const {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/wal_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public:

void ReplayWalEntry(const WalEntry &entry, bool on_startup, bool is_replay);

TxnTimeStamp GetCheckpointedTS();
TxnTimeStamp LastCheckpointTS();

Vector<SharedPtr<String>> GetDiffWalEntryString(TxnTimeStamp timestamp) const;
void UpdateCommitState(TxnTimeStamp commit_ts, i64 wal_size);
Expand Down

0 comments on commit 9637f27

Please sign in to comment.