diff --git a/include/pika_define.h b/include/pika_define.h index fb3a1fed7a..8c5270ae6d 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -40,6 +40,9 @@ const std::string kPikaPidFile = "pika.pid"; const std::string kPikaSecretFile = "rsync.secret"; const std::string kDefaultRsyncAuth = "default"; +/* Rsync */ +const int kMaxRsyncParallelNum = 4; + struct DBStruct { DBStruct(std::string tn, const uint32_t pn, std::set pi) : db_name(std::move(tn)), slot_num(pn), slot_ids(std::move(pi)) {} diff --git a/include/pika_rm.h b/include/pika_rm.h index dc1d0c25fc..dd1a74f826 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -157,6 +157,8 @@ class SyncSlaveSlot : public SyncSlot { std::string LocalIp(); + void StopRsync(); + void ActivateRsync(); bool IsRsyncRunning() {return rsync_cli_->IsRunning();} diff --git a/include/rsync_client.h b/include/rsync_client.h index 45d9894e3e..23e8c323c5 100644 --- a/include/rsync_client.h +++ b/include/rsync_client.h @@ -6,7 +6,6 @@ #ifndef RSYNC_CLIENT_H_ #define RSYNC_CLIENT_H_ -#include #include #include #include @@ -16,6 +15,8 @@ #include #include +#include + #include "net/include/bg_thread.h" #include "net/include/net_cli.h" #include "pstd/include/env.h" @@ -23,6 +24,7 @@ #include "pstd/include/pstd_hash.h" #include "pstd/include/pstd_string.h" #include "pstd/include/pstd_status.h" +#include "include/pika_define.h" #include "include/rsync_client_thread.h" #include "include/throttle.h" #include "rsync_service.pb.h" @@ -35,6 +37,7 @@ namespace rsync { class RsyncWriter; class Session; class WaitObject; +class WaitObjectManager; class RsyncClient : public net::Thread { public: @@ -45,41 +48,51 @@ class RsyncClient : public net::Thread { }; RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id); void* ThreadMain() override; + void Copy(const std::set& file_set, int index); bool Init(); Status Start(); Status Stop(); bool IsRunning() { return state_.load() == RUNNING; } + bool IsStop() { + return state_.load() == STOP; + } bool IsIdle() { return state_.load() == IDLE;} void OnReceive(RsyncService::RsyncResponse* resp); private: bool Recover(); - Status Wait(RsyncService::RsyncResponse*& resp); - Status CopyRemoteFile(const std::string& filename); + Status CopyRemoteFile(const std::string& filename, int index); Status CopyRemoteMeta(std::string* snapshot_uuid, std::set* file_set); Status LoadLocalMeta(std::string* snapshot_uuid, std::map* file_map); std::string GetLocalMetaFilePath(); Status FlushMetaTable(); Status CleanUpExpiredFiles(bool need_reset_path, const std::set& files); - Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set& expired_files, std::map* localFileMap); + Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set& expired_files, + std::map* localFileMap); void HandleRsyncMetaResponse(RsyncService::RsyncResponse* response); private: + typedef std::unique_ptr NetThreadUPtr; + std::map meta_table_; - int flush_period_ = 10; std::set file_set_; std::string snapshot_uuid_; std::string dir_; std::string db_name_; uint32_t slot_id_ = 0; - std::unique_ptr client_thread_; + + NetThreadUPtr client_thread_; + std::vector work_threads_; + std::atomic finished_work_cnt_ = 0; + std::atomic state_; int max_retries_ = 10; - std::unique_ptr wo_; + std::unique_ptr wo_mgr_; std::condition_variable cond_; std::mutex mu_; + std::unique_ptr throttle_; std::string master_ip_; int master_port_; @@ -129,22 +142,90 @@ class WaitObject { public: WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {} ~WaitObject() {} + void Reset(const std::string& filename, RsyncService::Type t, size_t offset) { + std::lock_guard guard(mu_); resp_ = nullptr; filename_ = filename; type_ = t; offset_ = offset; } - void Reset(RsyncService::Type t) { - resp_ = nullptr; - filename_ = ""; - type_ = t; - offset_ = 0xFFFFFFFF; + + pstd::Status Wait(RsyncService::RsyncResponse*& resp) { + pstd::Status s = Status::Timeout("rsync timeout", "timeout"); + { + std::unique_lock lock(mu_); + auto cv_s = cond_.wait_for(lock, std::chrono::seconds(3), [this] { + return resp_ != nullptr; + }); + if (!cv_s) { + return s; + } + resp = resp_; + s = Status::OK(); + } + return s; } + + void WakeUp(RsyncService::RsyncResponse* resp) { + std::unique_lock lock(mu_); + resp_ = resp; + cond_.notify_all(); + } + + RsyncService::RsyncResponse* Response() {return resp_;} + std::string Filename() {return filename_;} + RsyncService::Type Type() {return type_;} + size_t Offset() {return offset_;} +private: std::string filename_; RsyncService::Type type_; size_t offset_ = 0xFFFFFFFF; RsyncService::RsyncResponse* resp_ = nullptr; + std::condition_variable cond_; + std::mutex mu_; +}; + +class WaitObjectManager { +public: + WaitObjectManager() { + wo_vec_.resize(kMaxRsyncParallelNum); + for (int i = 0; i < kMaxRsyncParallelNum; i++) { + wo_vec_[i] = new WaitObject(); + } + } + ~WaitObjectManager() { + for (int i = 0; i < wo_vec_.size(); i++) { + delete wo_vec_[i]; + wo_vec_[i] = nullptr; + } + } + + WaitObject* UpdateWaitObject(int worker_index, const std::string& filename, + RsyncService::Type type, size_t offset) { + std::lock_guard guard(mu_); + wo_vec_[worker_index]->Reset(filename, type, offset); + return wo_vec_[worker_index]; + } + + void WakeUp(RsyncService::RsyncResponse* resp) { + std::lock_guard guard(mu_); + int index = resp->reader_index(); + if (wo_vec_[index] == nullptr || resp->type() != wo_vec_[index]->Type()) { + delete resp; + return; + } + if (resp->type() == RsyncService::kRsyncFile && + (resp->file_resp().filename() != wo_vec_[index]->Filename())) { + delete resp; + return; + } + wo_vec_[index]->WakeUp(resp); + } + +private: + std::vector wo_vec_; + std::mutex mu_; }; } // end namespace rsync diff --git a/include/rsync_server.h b/include/rsync_server.h index eef55fc661..d01cb47c8d 100644 --- a/include/rsync_server.h +++ b/include/rsync_server.h @@ -21,10 +21,11 @@ #include "rsync_service.pb.h" namespace rsync { +class RsyncServerConn; struct RsyncServerTaskArg { std::shared_ptr req; - std::shared_ptr conn; - RsyncServerTaskArg(std::shared_ptr _req, std::shared_ptr _conn) + std::shared_ptr conn; + RsyncServerTaskArg(std::shared_ptr _req, std::shared_ptr _conn) : req(std::move(_req)), conn(std::move(_conn)) {} }; class RsyncReader; @@ -52,6 +53,8 @@ class RsyncServerConn : public net::PbConn { static void HandleMetaRsyncRequest(void* arg); static void HandleFileRsyncRequest(void* arg); private: + std::vector > readers_; + std::mutex mu_; void* data_ = nullptr; }; @@ -86,6 +89,95 @@ class RsyncServerThread : public net::HolyThread { RsyncServerHandle handle_; }; +class RsyncReader { +public: + RsyncReader() { + block_data_ = new char[kBlockSize]; + } + ~RsyncReader() { + if (!filepath_.empty()) { + Reset(); + } + delete []block_data_; + } + pstd::Status Read(const std::string filepath, const size_t offset, + const size_t count, char* data, size_t* bytes_read, + std::string* checksum, bool* is_eof) { + std::lock_guard guard(mu_); + pstd::Status s = Seek(filepath, offset); + if (!s.ok()) { + return s; + } + size_t offset_in_block = offset % kBlockSize; + size_t copy_count = count > (end_offset_ - offset) ? end_offset_ - offset : count; + memcpy(data, block_data_ + offset_in_block, copy_count); + *bytes_read = copy_count; + *is_eof = (offset + copy_count == total_size_); + return pstd::Status::OK(); + } +private: + pstd::Status Seek(const std::string filepath, const size_t offset) { + if (filepath == filepath_ && offset >= start_offset_ && offset < end_offset_) { + return pstd::Status::OK(); + } + if (filepath != filepath_) { + Reset(); + fd_ = open(filepath.c_str(), O_RDONLY); + if (fd_ < 0) { + return pstd::Status::IOError("fd open failed"); + } + filepath_ = filepath; + struct stat buf; + stat(filepath.c_str(), &buf); + total_size_ = buf.st_size; + } + start_offset_ = (offset / kBlockSize) * kBlockSize; + + size_t read_offset = start_offset_; + size_t read_count = kBlockSize > (total_size_ - read_offset) ? (total_size_ - read_offset) : kBlockSize; + ssize_t bytesin = 0; + char* ptr = block_data_; + while ((bytesin = pread(fd_, ptr, read_count, read_offset)) > 0) { + read_count -= bytesin; + read_offset += bytesin; + ptr += bytesin; + if (read_count <= 0) { + break; + } + } + if (bytesin < 0) { + LOG(ERROR) << "unable to read from " << filepath_; + Reset(); + return pstd::Status::IOError("unable to read from " + filepath); + } + end_offset_ = start_offset_ + (ptr - block_data_); + return pstd::Status::OK(); + } + void Reset() { + total_size_ = -1; + start_offset_ = 0xFFFFFFFF; + end_offset_ = 0xFFFFFFFF; + memset(block_data_, 0, kBlockSize); + md5_.reset(new pstd::MD5()); + filepath_ = ""; + close(fd_); + fd_ = -1; + } + +private: + std::mutex mu_; + const size_t kBlockSize = 16 << 20; + + char* block_data_; + size_t start_offset_ = -1; + size_t end_offset_ = -1; + size_t total_size_ = -1; + + int fd_ = -1; + std::string filepath_; + std::unique_ptr md5_; +}; + } //end namespace rsync #endif diff --git a/src/net/include/server_thread.h b/src/net/include/server_thread.h index 5202b9a7e0..d0d6d63612 100644 --- a/src/net/include/server_thread.h +++ b/src/net/include/server_thread.h @@ -175,6 +175,7 @@ class ServerThread : public Thread { // process events in notify_queue virtual void ProcessNotifyEvents(const NetFiredEvent* pfe); + const ServerHandle* handle_; bool own_handle_ = false; diff --git a/src/net/src/dispatch_thread.cc b/src/net/src/dispatch_thread.cc index 272e1cd9c2..7011aa1aa3 100644 --- a/src/net/src/dispatch_thread.cc +++ b/src/net/src/dispatch_thread.cc @@ -23,7 +23,6 @@ DispatchThread::DispatchThread(int port, int work_num, ConnFactory* conn_factory for (int i = 0; i < work_num_; i++) { worker_thread_.emplace_back(std::make_unique(conn_factory, this, queue_limit, cron_interval)); } - timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();}); } DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, ConnFactory* conn_factory, @@ -35,7 +34,6 @@ DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, Co for (int i = 0; i < work_num_; i++) { worker_thread_.emplace_back(std::make_unique(conn_factory, this, queue_limit, cron_interval)); } - timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();}); } DispatchThread::DispatchThread(const std::set& ips, int port, int work_num, ConnFactory* conn_factory, @@ -47,7 +45,6 @@ DispatchThread::DispatchThread(const std::set& ips, int port, int w for (int i = 0; i < work_num_; i++) { worker_thread_.emplace_back(std::make_unique(conn_factory, this, queue_limit, cron_interval)); } - timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();}); } DispatchThread::~DispatchThread() = default; @@ -67,7 +64,13 @@ int DispatchThread::StartThread() { return ret; } } - timed_scan_thread.StartThread(); + + // Adding timer tasks and run timertaskThread + timerTaskThread_.AddTimerTask( + "blrpop_blocking_info_scan", 250, true, [this] { this->ScanExpiredBlockedConnsOfBlrpop();}); + + + timerTaskThread_.StartThread(); return ServerThread::StartThread(); } @@ -88,7 +91,7 @@ int DispatchThread::StopThread() { worker_thread_[i]->private_data_ = nullptr; } } - timed_scan_thread.StopThread(); + timerTaskThread_.StopThread(); return ServerThread::StopThread(); } @@ -258,6 +261,7 @@ void DispatchThread::ScanExpiredBlockedConnsOfBlrpop() { void DispatchThread::SetQueueLimit(int queue_limit) { queue_limit_ = queue_limit; } + extern ServerThread* NewDispatchThread(int port, int work_num, ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle) { return new DispatchThread(port, work_num, conn_factory, cron_interval, queue_limit, handle); diff --git a/src/net/src/dispatch_thread.h b/src/net/src/dispatch_thread.h index bf45ba1265..15af2fe952 100644 --- a/src/net/src/dispatch_thread.h +++ b/src/net/src/dispatch_thread.h @@ -54,27 +54,6 @@ class BlockedConnNode { BlockKeyType block_type_; }; -class TimedScanThread : public Thread { - public: - template - void SetTimedTask(double interval, F&& f, Args&&... args) { - time_interval_ = interval; - timed_task_ = [f = std::forward(f), args = std::make_tuple(std::forward(args)...)]{ - std::apply(f, args); - }; - } - private: - void* ThreadMain() override{ - while(!should_stop()){ - timed_task_(); - sleep(time_interval_); - } - return nullptr; - } - std::function timed_task_; - // unit in seconds - double time_interval_; -}; class DispatchThread : public ServerThread { public: @@ -129,9 +108,9 @@ class DispatchThread : public ServerThread { return blocked_conn_to_keys_; } std::shared_mutex& GetBlockMtx() { return block_mtx_; }; - // BlPop/BrPop used end + private: /* * Here we used auto poll to find the next work thread, @@ -168,9 +147,7 @@ class DispatchThread : public ServerThread { */ std::shared_mutex block_mtx_; - //used for blpop/brpop currently - TimedScanThread timed_scan_thread; - + TimerTaskThread timerTaskThread_; }; // class DispatchThread } // namespace net diff --git a/src/net/src/net_util.cc b/src/net/src/net_util.cc index 6bb167994a..791df31e86 100644 --- a/src/net/src/net_util.cc +++ b/src/net/src/net_util.cc @@ -26,4 +26,123 @@ int Setnonblocking(int sockfd) { return flags; } +uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, + const std::function& task) { + TimedTask new_task = {last_task_id_++, task_name, interval_ms, repeat_exec, task}; + id_to_task_[new_task.task_id] = new_task; + + int64_t next_expired_time = NowInMs() + interval_ms; + exec_queue_.insert({next_expired_time, new_task.task_id}); + + if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) { + min_interval_ms_ = interval_ms; + } + // return the id of this task + return new_task.task_id; +} +int64_t TimerTaskManager::NowInMs() { + auto now = std::chrono::system_clock::now(); + return std::chrono::time_point_cast(now).time_since_epoch().count(); +} +int TimerTaskManager::ExecTimerTask() { + std::vector fired_tasks_; + int64_t now_in_ms = NowInMs(); + // traverse in ascending order + for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) { + if (pair->exec_ts <= now_in_ms) { + auto it = id_to_task_.find(pair->id); + assert(it != id_to_task_.end()); + it->second.fun(); + fired_tasks_.push_back({pair->exec_ts, pair->id}); + now_in_ms = NowInMs(); + } else { + break; + } + } + for (auto task : fired_tasks_) { + exec_queue_.erase(task); + auto it = id_to_task_.find(task.id); + assert(it != id_to_task_.end()); + if (it->second.repeat_exec) { + // this task need to be repeatedly exec, register it again + exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id}); + } else { + // this task only need to be exec once, completely remove this task + int interval_del = it->second.interval_ms; + id_to_task_.erase(task.id); + if (interval_del == min_interval_ms_) { + RenewMinIntervalMs(); + } + } + } + return min_interval_ms_; +} +bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) { + // remove the task + auto task_to_del = id_to_task_.find(task_id); + if (task_to_del == id_to_task_.end()) { + return false; + } + int interval_del = task_to_del->second.interval_ms; + id_to_task_.erase(task_to_del); + + // renew the min_interval_ms_ + if (interval_del == min_interval_ms_) { + RenewMinIntervalMs(); + } + + // remove from exec queue + ExecTsWithId target_key = {-1, 0}; + for (auto pair : exec_queue_) { + if (pair.id == task_id) { + target_key = {pair.exec_ts, pair.id}; + break; + } + } + if (target_key.exec_ts != -1) { + exec_queue_.erase(target_key); + } + return true; +} + +void TimerTaskManager::RenewMinIntervalMs() { + min_interval_ms_ = -1; + for (auto pair : id_to_task_) { + if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) { + min_interval_ms_ = pair.second.interval_ms; + } + } +} + +TimerTaskThread::~TimerTaskThread() { + if (!timer_task_manager_.Empty()) { + LOG(INFO) << "TimerTaskThread exit !!!"; + } +} +int TimerTaskThread::StartThread() { + if (timer_task_manager_.Empty()) { + LOG(INFO) << "No Timer task registered, TimerTaskThread won't be created."; + // if there is no timer task registered, no need of start the thread + return -1; + } + LOG(INFO) << "TimerTaskThread Starting..."; + return Thread::StartThread(); +} +int TimerTaskThread::StopThread() { + if (timer_task_manager_.Empty()) { + LOG(INFO) << "TimerTaskThread::StopThread : TimerTaskThread didn't create, no need to stop it."; + // if there is no timer task registered, the thread didn't even start + return -1; + } + return Thread::StopThread(); +} + +void* TimerTaskThread::ThreadMain() { + int timeout; + while (!should_stop()) { + timeout = timer_task_manager_.ExecTimerTask(); + net_multiplexer_->NetPoll(timeout); + } + return nullptr; +} } // namespace net diff --git a/src/net/src/net_util.h b/src/net/src/net_util.h index 80e2d68f1a..a6fcbdc932 100644 --- a/src/net/src/net_util.h +++ b/src/net/src/net_util.h @@ -6,16 +6,95 @@ #ifndef NET_SRC_NET_UTIL_H_ #define NET_SRC_NET_UTIL_H_ #include +#include +#include #include -#include +#include +#include #include +#include #include +#include +#include "net/src/net_multiplexer.h" +#include "net/include/net_thread.h" namespace net { int Setnonblocking(int sockfd); +struct TimedTask{ + uint32_t task_id; + std::string task_name; + int interval_ms; + bool repeat_exec; + std::function fun; +}; +struct ExecTsWithId { + //the next exec time of the task, unit in ms + int64_t exec_ts; + //id of the task to be exec + uint32_t id; + + bool operator<(const ExecTsWithId& other) const{ + if(exec_ts == other.exec_ts){ + return id < other.id; + } + return exec_ts < other.exec_ts; + } + bool operator==(const ExecTsWithId& other) const { + return exec_ts == other.exec_ts && id == other.id; + } +}; + +class TimerTaskManager { + public: + TimerTaskManager() = default; + ~TimerTaskManager() = default; + + uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function &task); + //return the newest min_minterval_ms + int ExecTimerTask(); + bool DelTimerTaskByTaskId(uint32_t task_id); + int GetMinIntervalMs() const { return min_interval_ms_; } + int64_t NowInMs(); + void RenewMinIntervalMs(); + bool Empty(){ return 0 == last_task_id_; } + + private: + //items stored in std::set are ascending ordered, we regard it as an auto sorted queue + std::set exec_queue_; + std::unordered_map id_to_task_; + uint32_t last_task_id_{0}; + int min_interval_ms_{-1}; +}; + + + +class TimerTaskThread : public Thread { + public: + TimerTaskThread(){ + net_multiplexer_.reset(CreateNetMultiplexer()); + net_multiplexer_->Initialize(); + } + ~TimerTaskThread() override; + int StartThread() override; + int StopThread() override; + + uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function &task){ + return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task); + }; + + bool DelTimerTaskByTaskId(uint32_t task_id){ + return timer_task_manager_.DelTimerTaskByTaskId(task_id); +}; + + private: + void* ThreadMain() override; + + TimerTaskManager timer_task_manager_; + std::unique_ptr net_multiplexer_; +}; } // namespace net diff --git a/src/net/src/server_thread.cc b/src/net/src/server_thread.cc index e28db3ce50..6c9e894cf3 100644 --- a/src/net/src/server_thread.cc +++ b/src/net/src/server_thread.cc @@ -177,7 +177,6 @@ void* ServerThread::ThreadMain() { char port_buf[32]; char ip_addr[INET_ADDRSTRLEN] = ""; - while (!should_stop()) { if (cron_interval_ > 0) { gettimeofday(&now, nullptr); diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 3db4ad478a..f264b5cceb 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -164,6 +164,7 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) { slave_slot->SetMasterSessionId(session_id); std::string slot_name = slave_slot->SlotName(); + slave_slot->StopRsync(); slave_slot->SetReplState(ReplState::kWaitDBSync); LOG(INFO) << "Slot: " << slot_name << " Need Wait To Sync"; } diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 7e2485e382..555fa84405 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -639,6 +639,10 @@ std::string SyncSlaveSlot::LocalIp() { return local_ip_; } +void SyncSlaveSlot::StopRsync() { + rsync_cli_->Stop(); +} + void SyncSlaveSlot::ActivateRsync() { if (!rsync_cli_->IsIdle()) { return; diff --git a/src/rsync_client.cc b/src/rsync_client.cc index bbf45a8f46..aec8caceac 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -17,13 +17,41 @@ using namespace RsyncService; extern PikaServer* g_pika_server; +const int kFlushIntervalUs = 10 * 1000 * 1000; +const int kThrottleBytesPerSecond = 300 << 20; +const int kBytesPerRequest = 4 << 20; +const int kThrottleCheckCycle = 10; + namespace rsync { RsyncClient::RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id) - : flush_period_(10), snapshot_uuid_(""), dir_(dir), db_name_(db_name), slot_id_(slot_id), + : snapshot_uuid_(""), dir_(dir), db_name_(db_name), slot_id_(slot_id), state_(IDLE), max_retries_(10), master_ip_(""), master_port_(0) { - client_thread_ = std::make_unique(10 * 1000, 1 * 1000, this); - wo_.reset(new WaitObject()); - throttle_.reset(new Throttle()); + wo_mgr_.reset(new WaitObjectManager()); + client_thread_ = std::make_unique(10 * 1000, 60 * 1000, wo_mgr_.get()); + work_threads_.resize(kMaxRsyncParallelNum); + throttle_.reset(new Throttle(kThrottleBytesPerSecond, kThrottleCheckCycle)); + finished_work_cnt_.store(0); +} + +void RsyncClient::Copy(const std::set& file_set, int index) { + Status s = Status::OK(); + for (const auto& file : file_set) { + while (state_.load() == RUNNING) { + LOG(INFO) << "copy remote file, filename: " << file; + s = CopyRemoteFile(file, index); + if (!s.ok()) { + LOG(WARNING) << "copy remote file failed, msg: " << s.ToString(); + continue; + } + break; + } + if (state_.load() != RUNNING) { + break; + } + } + LOG(INFO) << "work_thread index: " << index << " copy remote files done"; + finished_work_cnt_.fetch_add(1); + cond_.notify_all(); } bool RsyncClient::Init() { @@ -37,113 +65,106 @@ bool RsyncClient::Init() { client_thread_->StartThread(); bool ret = Recover(); if (!ret) { - LOG(WARNING) << "RsyncClient recover failed..."; + LOG(WARNING) << "RsyncClient recover failed"; client_thread_->StopThread(); return false; } - LOG(INFO) << "RsyncClient recover success..."; + finished_work_cnt_.store(0); + LOG(INFO) << "RsyncClient recover success"; return true; } void* RsyncClient::ThreadMain() { - int cnt = 0; - int period = 0; - Status s = Status::OK(); - LOG(INFO) << "RsyncClient ThreadMain..."; if (file_set_.empty()) { + LOG(INFO) << "No remote files need copy, RsyncClient exit"; state_.store(STOP); return nullptr; } + Status s = Status::OK(); + LOG(INFO) << "RsyncClient begin to copy remote files"; + std::vector > file_vec(kMaxRsyncParallelNum); + int index = 0; + for (const auto& file : file_set_) { + file_vec[index++ % kMaxRsyncParallelNum].insert(file); + } + + for (int i = 0; i < kMaxRsyncParallelNum; i++) { + work_threads_[i] = std::move(std::thread(&RsyncClient::Copy, this, file_vec[i], i)); + } + std::string meta_file_path = GetLocalMetaFilePath(); std::ofstream outfile; - outfile.open(meta_file_path, std::ios_base::app); // append instead of overwrite + outfile.open(meta_file_path, std::ios_base::app); std::string meta_rep; + uint64_t start_time = pstd::NowMicros(); + + while (state_.load() == RUNNING) { + uint64_t now = pstd::NowMicros(); + uint64_t elapse = pstd::NowMicros() - start_time; + if (elapse < kFlushIntervalUs) { + int wait_for_us = kFlushIntervalUs - elapse; + std::unique_lock lock(mu_); + cond_.wait_for(lock, std::chrono::microseconds(wait_for_us)); + } - for (const auto& file : file_set_) { - LOG(INFO) << "CopyRemoteFile: " << file; - while (state_.load() == RUNNING) { - s = CopyRemoteFile(file); - if (!s.ok()) { - LOG(WARNING) << "rsync CopyRemoteFile failed, filename: " << file; - continue; - } + if (state_.load() != RUNNING) { break; } - if (state_.load(std::memory_order_relaxed) != RUNNING) { - break; + + start_time = pstd::NowMicros(); + std::map files_map; + { + std::lock_guard guard(mu_); + files_map.swap(meta_table_); } - meta_rep.append(file + ":" + meta_table_[file]); - meta_rep.append("\n"); - if (++period == flush_period_) { - period = 0; - outfile << meta_rep; - outfile.flush(); - meta_rep.clear(); + for (const auto& file : files_map) { + meta_rep.append(file.first + ":" + file.second); + meta_rep.append("\n"); } - } - if (!meta_rep.empty()) { outfile << meta_rep; outfile.flush(); - } - state_.store(STOP); - LOG(INFO) << "RsyncClient fetch copy remote files done..."; - return nullptr; -} + meta_rep.clear(); -void RsyncClient::OnReceive(RsyncResponse* resp) { - std::unique_lock lock(mu_); - if (resp->type() != wo_->type_) { - delete resp; - resp = nullptr; - return; - } - if (resp->type() == kRsyncFile && - (resp->file_resp().filename() != wo_->filename_ || resp->file_resp().offset() != wo_->offset_)) { - delete resp; - resp = nullptr; - return; + if (finished_work_cnt_.load() == kMaxRsyncParallelNum) { + break; + } } - wo_->resp_ = resp; - cond_.notify_all(); -} -Status RsyncClient::Wait(RsyncResponse*& resp) { - Status s = Status::Timeout("rsync timeout", "timeout"); - { - std::unique_lock lock(mu_); - auto cv_s = cond_.wait_for(lock, std::chrono::seconds(3), [this] { - return this->wo_->resp_ != nullptr; - }); - if (!cv_s) { - return s; - } - resp = wo_->resp_; - s = Status::OK(); + for (int i = 0; i < kMaxRsyncParallelNum; i++) { + work_threads_[i].join(); } - return s; + finished_work_cnt_.store(0); + state_.store(STOP); + LOG(INFO) << "RsyncClient copy remote files done"; + return nullptr; } -Status RsyncClient::CopyRemoteFile(const std::string& filename) { - Status s; - int retries = 0; - size_t offset = 0; - size_t copy_file_begin_time = pstd::NowMicros(); - size_t count = throttle_->ThrottledByThroughput(4 * 1024 * 1024); - MD5 md5; +Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { std::unique_ptr writer(new RsyncWriter(dir_ + "/" + filename)); + Status s = Status::OK(); + size_t offset = 0; + int retries = 0; + DEFER { - if (writer) { - writer->Close(); - writer.reset(); - } - if (!s.ok()) { - DeleteFile(filename); - } + if (writer) { + writer->Close(); + writer.reset(); + } + if (!s.ok()) { + DeleteFile(filename); + } }; while (retries < max_retries_) { + size_t copy_file_begin_time = pstd::NowMicros(); + size_t count = throttle_->ThrottledByThroughput(kBytesPerRequest); + if (count == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000 / kThrottleCheckCycle)); + continue; + } RsyncRequest request; + request.set_reader_index(index); request.set_type(kRsyncFile); request.set_db_name(db_name_); request.set_slot_id(slot_id_); @@ -151,35 +172,35 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename) { file_req->set_filename(filename); file_req->set_offset(offset); file_req->set_count(count); + std::string to_send; request.SerializeToString(&to_send); - + WaitObject* wo = wo_mgr_->UpdateWaitObject(index, filename, kRsyncFile, offset); s = client_thread_->Write(master_ip_, master_port_, to_send); if (!s.ok()) { LOG(WARNING) << "send rsync request failed"; continue; } - { - std::lock_guard lock(mu_); - wo_->Reset(filename, kRsyncFile, offset); - } - RsyncResponse* resp = nullptr; - s = Wait(resp); + DEFER { + if (resp) { + delete resp; + } + }; + s = wo->Wait(resp); if (s.IsTimeout() || resp == nullptr) { LOG(WARNING) << "rsync request timeout"; retries++; continue; } - size_t copy_file_end_time = pstd::NowMicros(); - size_t elaspe_time_us = copy_file_end_time - copy_file_begin_time; - throttle_->ReturnUnusedThroughput(count, resp->file_resp().count(), elaspe_time_us); + size_t ret_count = resp->file_resp().count(); + size_t elaspe_time_us = pstd::NowMicros() - copy_file_begin_time; + throttle_->ReturnUnusedThroughput(count, ret_count, elaspe_time_us); if (resp->code() != RsyncService::kOk) { //TODO: handle different error - delete resp; continue; } @@ -187,26 +208,17 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename) { LOG(WARNING) << "receive newer dump, reset state to STOP, local_snapshot_uuid:" << snapshot_uuid_ << "remote snapshot uuid: " << resp->snapshot_uuid(); state_.store(STOP); - delete resp; return s; } - size_t ret_count = resp->file_resp().count(); - resp->file_resp().data(); s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str()); if (!s.ok()) { LOG(WARNING) << "rsync client write file error"; break; } - md5.update(resp->file_resp().data().c_str(), ret_count); offset += resp->file_resp().count(); if (resp->file_resp().eof()) { - if (md5.finalize().hexdigest() != resp->file_resp().checksum()) { - LOG(WARNING) << "mismatch file checksum for file: " << filename; - s = Status::IOError("mismatch checksum", "mismatch checksum"); - return s; - } s = writer->Fsync(); if (!s.ok()) { return s; @@ -216,7 +228,9 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename) { return s; } writer.reset(); - meta_table_[filename] = resp->file_resp().checksum(); + mu_.lock(); + meta_table_[filename] = ""; + mu_.unlock(); break; } retries = 0; @@ -236,6 +250,7 @@ Status RsyncClient::Stop() { } LOG(WARNING) << "RsyncClient stop ..."; state_ = STOP; + cond_.notify_all(); StopThread(); client_thread_->StopThread(); JoinThread(); @@ -245,11 +260,11 @@ Status RsyncClient::Stop() { } bool RsyncClient::Recover() { + std::string local_snapshot_uuid; std::string remote_snapshot_uuid; + std::set local_file_set; std::set remote_file_set; - std::string local_snapshot_uuid; std::map local_file_map; - std::set local_file_set; Status s = CopyRemoteMeta(&remote_snapshot_uuid, &remote_file_set); if (!s.ok()) { @@ -272,13 +287,12 @@ bool RsyncClient::Recover() { file_set_ = remote_file_set; expired_files = local_file_set; } else { - for_each(remote_file_set.begin(), remote_file_set.end(), [](auto& file) {LOG(WARNING) << "remote_file_set: " << file;}); - for_each(local_file_set.begin(), local_file_set.end(), [](auto& file) {LOG(WARNING) << "local_file_set: " << file;}); - std::set newly_files; - set_difference(remote_file_set.begin(), remote_file_set.end(), local_file_set.begin(), local_file_set.end(), + set_difference(remote_file_set.begin(), remote_file_set.end(), + local_file_set.begin(), local_file_set.end(), inserter(newly_files, newly_files.begin())); - set_difference(local_file_set.begin(), local_file_set.end(), remote_file_set.begin(), remote_file_set.end(), + set_difference(local_file_set.begin(), local_file_set.end(), + remote_file_set.begin(), remote_file_set.end(), inserter(expired_files, expired_files.begin())); file_set_.insert(newly_files.begin(), newly_files.end()); } @@ -295,12 +309,17 @@ bool RsyncClient::Recover() { } state_ = RUNNING; - LOG(INFO) << "copy meta data done, slot_id: " << slot_id_ << "snapshot_uuid: " << snapshot_uuid_ - << "file count: " << file_set_.size() << "expired file count: " << expired_files.size() - << ", local file count: " << local_file_set.size() << "remote file count: " << remote_file_set.size() - << "remote snapshot_uuid: " << remote_snapshot_uuid << "local snapshot_uuid: " << local_snapshot_uuid - << "file_set_: " << file_set_.size(); - for_each(file_set_.begin(), file_set_.end(), [](auto& file) {LOG(WARNING) << "file_set: " << file;}); + LOG(INFO) << "copy meta data done, slot_id: " << slot_id_ + << " snapshot_uuid: " << snapshot_uuid_ + << " file count: " << file_set_.size() + << " expired file count: " << expired_files.size() + << " local file count: " << local_file_set.size() + << " remote file count: " << remote_file_set.size() + << " remote snapshot_uuid: " << remote_snapshot_uuid + << " local snapshot_uuid: " << local_snapshot_uuid + << " file_set_: " << file_set_.size(); + for_each(file_set_.begin(), file_set_.end(), + [](auto& file) {LOG(WARNING) << "file_set: " << file;}); return true; } @@ -308,22 +327,25 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::setUpdateWaitObject(0, "", kRsyncMeta, 0xFFFFFFFF); s = client_thread_->Write(master_ip_, master_port_, to_send); if (!s.ok()) { retries++; } - { - std::lock_guard lock(mu_); - wo_->Reset(kRsyncMeta); - } RsyncResponse* resp = nullptr; - s = Wait(resp); + DEFER { + if (resp) { + delete resp; + } + }; + s = wo->Wait(resp); if (s.IsTimeout() || resp == nullptr) { LOG(WARNING) << "rsync CopyRemoteMeta request timeout, " << "retry times: " << retries; @@ -333,7 +355,6 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::setcode() != RsyncService::kOk) { //TODO: handle different error - delete resp; continue; } LOG(INFO) << "receive rsync meta infos, snapshot_uuid: " << resp->snapshot_uuid() @@ -346,8 +367,6 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::setmeta_resp().filenames_size(); i++) { file_set->insert(resp->meta_resp().filenames(i)); } - delete resp; - resp = nullptr; break; } return s; @@ -356,6 +375,7 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::set* file_map) { std::string meta_file_path = GetLocalMetaFilePath(); if (!FileExists(meta_file_path)) { + LOG(WARNING) << kDumpMetaFileName << " not exist"; return Status::OK(); } diff --git a/src/rsync_client_thread.cc b/src/rsync_client_thread.cc index 1672f906fe..8e93a4c69b 100644 --- a/src/rsync_client_thread.cc +++ b/src/rsync_client_thread.cc @@ -31,8 +31,8 @@ int RsyncClientConn::DealMessage() { << " msg_len: " << header_len_; return -1; } - RsyncClient* handler = (RsyncClient*)cb_handler_; - handler->OnReceive(response); + WaitObjectManager* handler = (WaitObjectManager*)cb_handler_; + handler->WakeUp(response); return 0; } diff --git a/src/rsync_server.cc b/src/rsync_server.cc index 526c82b6aa..c8d28f51fb 100644 --- a/src/rsync_server.cc +++ b/src/rsync_server.cc @@ -17,63 +17,8 @@ extern PikaServer* g_pika_server; namespace rsync { using namespace net; -using namespace RsyncService; using namespace pstd; - -//TODO: optimzie file read and calculate checksum, maybe use RsyncReader prefeching file content -Status ReadDumpFile(const std::string filepath, const size_t offset, const size_t count, - char* data, size_t* bytes_read, std::string* checksum) { - int fd = open(filepath.c_str(), O_RDONLY); - if (fd < 0) { - return Status::IOError("fd open failed"); - } - DEFER { close(fd); }; - - const int kMaxCopyBlockSize = 1 << 20; - size_t read_offset = offset; - size_t read_count = count; - if (read_count > kMaxCopyBlockSize) { - read_count = kMaxCopyBlockSize; - } - ssize_t bytesin = 0; - size_t left_read_count = count; - - while ((bytesin = pread(fd, data, read_count, read_offset)) > 0) { - left_read_count -= bytesin; - if (left_read_count < 0) { - break ; - } - if (read_count > left_read_count) { - read_count = left_read_count; - } - - data += bytesin; - *bytes_read += bytesin; - read_offset += bytesin; - } - - if (bytesin == -1) { - LOG(ERROR) << "unable to read from " << filepath; - return pstd::Status::IOError("unable to read from " + filepath); - } - - if (bytesin == 0) { - char* buffer = new char[kMaxCopyBlockSize]; - pstd::MD5 md5; - - while ((bytesin = read(fd, buffer, kMaxCopyBlockSize)) > 0) { - md5.update(buffer, bytesin); - } - if (bytesin == -1) { - LOG(ERROR) << "unable to read from " << filepath; - delete []buffer; - return pstd::Status::IOError("unable to read from " + filepath); - } - delete []buffer; - *checksum = md5.finalize().hexdigest(); - } - return pstd::Status::OK(); -} +using namespace RsyncService; void RsyncWriteResp(RsyncService::RsyncResponse& response, std::shared_ptr conn) { std::string reply_str; @@ -122,9 +67,19 @@ int RsyncServer::Stop() { RsyncServerConn::RsyncServerConn(int connfd, const std::string& ip_port, Thread* thread, void* worker_specific_data, NetMultiplexer* mpx) - : PbConn(connfd, ip_port, thread, mpx), data_(worker_specific_data) {} + : PbConn(connfd, ip_port, thread, mpx), data_(worker_specific_data) { + readers_.resize(kMaxRsyncParallelNum); + for (int i = 0; i < kMaxRsyncParallelNum; i++) { + readers_[i].reset(new RsyncReader()); + } +} -RsyncServerConn::~RsyncServerConn() {} +RsyncServerConn::~RsyncServerConn() { + std::lock_guard guard(mu_); + for (int i = 0; i < readers_.size(); i++) { + readers_[i].reset(); + } +} int RsyncServerConn::DealMessage() { std::shared_ptr req = std::make_shared(); @@ -166,6 +121,7 @@ void RsyncServerConn::HandleMetaRsyncRequest(void* arg) { } RsyncService::RsyncResponse response; + response.set_reader_index(req->reader_index()); response.set_code(RsyncService::kOk); response.set_type(RsyncService::kRsyncMeta); response.set_db_name(db_name); @@ -192,7 +148,7 @@ void RsyncServerConn::HandleMetaRsyncRequest(void* arg) { void RsyncServerConn::HandleFileRsyncRequest(void* arg) { std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; - std::shared_ptr conn = task_arg->conn; + std::shared_ptr conn = task_arg->conn; uint32_t slot_id = req->slot_id(); std::string db_name = req->db_name(); @@ -201,6 +157,7 @@ void RsyncServerConn::HandleFileRsyncRequest(void* arg) { size_t count = req->file_req().count(); RsyncService::RsyncResponse response; + response.set_reader_index(req->reader_index()); response.set_code(RsyncService::kOk); response.set_type(RsyncService::kRsyncFile); response.set_db_name(db_name); @@ -226,9 +183,12 @@ void RsyncServerConn::HandleFileRsyncRequest(void* arg) { const std::string filepath = slot->bgsave_info().path + "/" + filename; char* buffer = new char[req->file_req().count() + 1]; - std::string checksum = ""; size_t bytes_read{0}; - s = ReadDumpFile(filepath, offset, count, buffer, &bytes_read, &checksum); + std::string checksum = ""; + bool is_eof = false; + std::shared_ptr reader = conn->readers_[req->reader_index()]; + s = reader->Read(filepath, offset, count, buffer, + &bytes_read, &checksum, &is_eof); if (!s.ok()) { response.set_code(RsyncService::kErr); RsyncWriteResp(response, conn); @@ -238,7 +198,7 @@ void RsyncServerConn::HandleFileRsyncRequest(void* arg) { RsyncService::FileResponse* file_resp = response.mutable_file_resp(); file_resp->set_data(buffer, bytes_read); - file_resp->set_eof(bytes_read != count); + file_resp->set_eof(is_eof); file_resp->set_checksum(checksum); file_resp->set_filename(filename); file_resp->set_count(bytes_read); diff --git a/src/rsync_service.proto b/src/rsync_service.proto index 73f6005bd6..ee23b3e8a4 100644 --- a/src/rsync_service.proto +++ b/src/rsync_service.proto @@ -32,18 +32,20 @@ message FileResponse { message RsyncRequest { required Type type = 1; - required string db_name = 2; - required uint32 slot_id = 3; - optional FileRequest file_req = 4; + required int32 reader_index = 2; + required string db_name = 3; + required uint32 slot_id = 4; + optional FileRequest file_req = 5; } message RsyncResponse { required Type type = 1; - required string snapshot_uuid = 2; - required string db_name = 3; - required uint32 slot_id = 4; - required StatusCode code = 5; - optional MetaResponse meta_resp = 6; - optional FileResponse file_resp = 7; + required int32 reader_index = 2; + required string snapshot_uuid = 3; + required string db_name = 4; + required uint32 slot_id = 5; + required StatusCode code = 6; + optional MetaResponse meta_resp = 7; + optional FileResponse file_resp = 8; }