Skip to content

Commit

Permalink
add timestamp for deleting suffix when flushdb
Browse files Browse the repository at this point in the history
  • Loading branch information
cheniujh committed Jul 10, 2024
1 parent 7b125d8 commit 86578e2
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 42 deletions.
4 changes: 0 additions & 4 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
bool IsBgSaving();
BgSaveInfo bgsave_info();
pstd::Status GetKeyNum(std::vector<storage::KeyInfo>* key_info);
void ResetIsFlushingDBToFalse() {
is_flushing_db_.store(false);
}

private:
bool opened_ = false;
Expand All @@ -163,7 +160,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
pstd::Mutex key_info_protector_;
std::atomic<bool> binlog_io_error_;
std::shared_mutex dbs_rw_;
std::atomic<bool> is_flushing_db_;
// class may be shared, using shared_ptr would be a better choice
std::shared_ptr<pstd::lock::LockMgr> lock_mgr_;
std::shared_ptr<storage::Storage> storage_;
Expand Down
1 change: 0 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ class PikaServer : public pstd::noncopyable {
* Flushall & Flushdb used
*/
void PurgeDir(const std::string& path);
void PurgeDirWithCallBack(const std::string& path, const std::function<void()>& call_back);
void PurgeDirTaskSchedule(void (*function)(void*), void* arg);

/*
Expand Down
14 changes: 7 additions & 7 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,14 @@ bool FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
if (!db) {
LOG(ERROR) << "Flushall, but DB not found";
return false;
} else {
bool success = db->FlushDBWithoutLock();
if (!success) {
//if the db is not flushed, return before clear the cache
return success;
}
DoUpdateCache(db);
}
bool success = db->FlushDBWithoutLock();
if (!success) {
// if the db is not flushed, return before clear the cache
return success;
}
DoUpdateCache(db);

return true;
}
void FlushallCmd::DoBinlog() {
Expand Down
17 changes: 6 additions & 11 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ DB::DB(std::string db_name, const std::string& db_path,
pstd::CreatePath(log_path_);
lock_mgr_ = std::make_shared<pstd::lock::LockMgr>(1000, 0, std::make_shared<pstd::lock::MutexFactoryImpl>());
binlog_io_error_.store(false);
is_flushing_db_.store(false);
opened_ = s.ok();
assert(storage_);
assert(s.ok());
Expand Down Expand Up @@ -198,20 +197,18 @@ bool DB::FlushDBWithoutLock() {
return false;
}

bool expected = false;
if (!is_flushing_db_.compare_exchange_strong(expected, true, std::memory_order::memory_order_seq_cst)) {
LOG(WARNING) << db_name_ << ": Aborting curr FlushDB task due to the previous FlushDB task is still executing (deleting old db path is an async task)";
return false;
}

LOG(INFO) << db_name_ << " Delete old db...";
storage_.reset();

std::string dbpath = db_path_;
if (dbpath[dbpath.length() - 1] == '/') {
dbpath.erase(dbpath.length() - 1);
}
dbpath.append("_deleting/");
std::string delete_suffix("_deleting_");
delete_suffix.append(std::to_string(NowMicros()));
delete_suffix.append("/");
dbpath.append(delete_suffix);
LOG(INFO) << "remaned:" << dbpath;
auto rename_success = pstd::RenameFile(db_path_, dbpath);
storage_ = std::make_shared<storage::Storage>(g_pika_conf->db_instance_num(),
g_pika_conf->default_slot_num(), g_pika_conf->classic_mode());
Expand All @@ -225,9 +222,7 @@ bool DB::FlushDBWithoutLock() {
}
LOG(INFO) << db_name_ << " Open new db success";

//when dbpath successfully deleted, a call back fun will reset flushing db flag to false, so the next FlushDB task can be exec
auto reset_flag_call_back = [this]{ this->ResetIsFlushingDBToFalse(); };
g_pika_server->PurgeDirWithCallBack(dbpath, reset_flag_call_back);
g_pika_server->PurgeDir(dbpath);
return true;
}

Expand Down
19 changes: 0 additions & 19 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,6 @@ void DoPurgeDir(void* arg) {
LOG(INFO) << "Delete dir: " << *path << " done";
}

struct PurgeDirArgsWithCallBackArgs {
PurgeDirArgsWithCallBackArgs(std::string path, const std::function<void()>& call_back)
: path_(std::move(path)), call_back_(call_back) {}
std::string path_;
std::function<void()> call_back_;
};

void DoPurgeDirWithCallBack(void* arg) {
std::unique_ptr<PurgeDirArgsWithCallBackArgs> args(static_cast<PurgeDirArgsWithCallBackArgs*>(arg));
LOG(INFO) << "Delete dir: " << args->path_ << " start";
pstd::DeleteDir(args->path_);
LOG(INFO) << "Delete dir: " << args->path_ << " done";
args->call_back_();
}


PikaServer::PikaServer()
: exit_(false),
Expand Down Expand Up @@ -810,10 +795,6 @@ void PikaServer::PurgeDir(const std::string& path) {
PurgeDirTaskSchedule(&DoPurgeDir, static_cast<void*>(dir_path));
}

void PikaServer::PurgeDirWithCallBack(const std::string& path, const std::function<void()>& call_back) {
auto args = new PurgeDirArgsWithCallBackArgs(path, call_back);
PurgeDirTaskSchedule(&DoPurgeDirWithCallBack, static_cast<void*>(args));
}

void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
purge_thread_.set_thread_name("PurgeDirTask");
Expand Down

0 comments on commit 86578e2

Please sign in to comment.