Skip to content

Commit

Permalink
atom->3.5 (#1887)
Browse files Browse the repository at this point in the history
* Revised Timer Task Thread (#1862)

* add TimertaskManager, removed TimedscanThread.

* Rsync client support multi thread (#1866)

* define rsync related header file and proto

---------

Co-authored-by: cheniujh <[email protected]>
Co-authored-by: wangshao1 <[email protected]>
  • Loading branch information
3 people committed Aug 4, 2023
1 parent 61aa47b commit b3c5268
Show file tree
Hide file tree
Showing 16 changed files with 582 additions and 238 deletions.
3 changes: 3 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const std::string kPikaPidFile = "pika.pid";
const std::string kPikaSecretFile = "rsync.secret";
const std::string kDefaultRsyncAuth = "default";

/* Rsync */
const int kMaxRsyncParallelNum = 4;

struct DBStruct {
DBStruct(std::string tn, const uint32_t pn, std::set<uint32_t> pi)
: db_name(std::move(tn)), slot_num(pn), slot_ids(std::move(pi)) {}
Expand Down
2 changes: 2 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class SyncSlaveSlot : public SyncSlot {

std::string LocalIp();

void StopRsync();

void ActivateRsync();

bool IsRsyncRunning() {return rsync_cli_->IsRunning();}
Expand Down
105 changes: 93 additions & 12 deletions include/rsync_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#ifndef RSYNC_CLIENT_H_
#define RSYNC_CLIENT_H_

#include <glog/logging.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
Expand All @@ -16,13 +15,16 @@
#include <thread>
#include <condition_variable>

#include <glog/logging.h>

#include "net/include/bg_thread.h"
#include "net/include/net_cli.h"
#include "pstd/include/env.h"
#include "pstd/include/pstd_status.h"
#include "pstd/include/pstd_hash.h"
#include "pstd/include/pstd_string.h"
#include "pstd/include/pstd_status.h"
#include "include/pika_define.h"
#include "include/rsync_client_thread.h"
#include "include/throttle.h"
#include "rsync_service.pb.h"
Expand All @@ -35,6 +37,7 @@ namespace rsync {
class RsyncWriter;
class Session;
class WaitObject;
class WaitObjectManager;

class RsyncClient : public net::Thread {
public:
Expand All @@ -45,41 +48,51 @@ class RsyncClient : public net::Thread {
};
RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id);
void* ThreadMain() override;
void Copy(const std::set<std::string>& file_set, int index);
bool Init();
Status Start();
Status Stop();
bool IsRunning() {
return state_.load() == RUNNING;
}
bool IsStop() {
return state_.load() == STOP;
}
bool IsIdle() { return state_.load() == IDLE;}
void OnReceive(RsyncService::RsyncResponse* resp);

private:
bool Recover();
Status Wait(RsyncService::RsyncResponse*& resp);
Status CopyRemoteFile(const std::string& filename);
Status CopyRemoteFile(const std::string& filename, int index);
Status CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set);
Status LoadLocalMeta(std::string* snapshot_uuid, std::map<std::string, std::string>* file_map);
std::string GetLocalMetaFilePath();
Status FlushMetaTable();
Status CleanUpExpiredFiles(bool need_reset_path, const std::set<std::string>& files);
Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set<std::string>& expired_files, std::map<std::string, std::string>* localFileMap);
Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set<std::string>& expired_files,
std::map<std::string, std::string>* localFileMap);
void HandleRsyncMetaResponse(RsyncService::RsyncResponse* response);

private:
typedef std::unique_ptr<RsyncClientThread> NetThreadUPtr;

std::map<std::string, std::string> meta_table_;
int flush_period_ = 10;
std::set<std::string> file_set_;
std::string snapshot_uuid_;
std::string dir_;
std::string db_name_;
uint32_t slot_id_ = 0;
std::unique_ptr<RsyncClientThread> client_thread_;

NetThreadUPtr client_thread_;
std::vector<std::thread> work_threads_;
std::atomic<int> finished_work_cnt_ = 0;

std::atomic<State> state_;
int max_retries_ = 10;
std::unique_ptr<WaitObject> wo_;
std::unique_ptr<WaitObjectManager> wo_mgr_;
std::condition_variable cond_;
std::mutex mu_;

std::unique_ptr<Throttle> throttle_;
std::string master_ip_;
int master_port_;
Expand Down Expand Up @@ -129,22 +142,90 @@ class WaitObject {
public:
WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {}
~WaitObject() {}

void Reset(const std::string& filename, RsyncService::Type t, size_t offset) {
std::lock_guard<std::mutex> guard(mu_);
resp_ = nullptr;
filename_ = filename;
type_ = t;
offset_ = offset;
}
void Reset(RsyncService::Type t) {
resp_ = nullptr;
filename_ = "";
type_ = t;
offset_ = 0xFFFFFFFF;

pstd::Status Wait(RsyncService::RsyncResponse*& resp) {
pstd::Status s = Status::Timeout("rsync timeout", "timeout");
{
std::unique_lock<std::mutex> lock(mu_);
auto cv_s = cond_.wait_for(lock, std::chrono::seconds(3), [this] {
return resp_ != nullptr;
});
if (!cv_s) {
return s;
}
resp = resp_;
s = Status::OK();
}
return s;
}

void WakeUp(RsyncService::RsyncResponse* resp) {
std::unique_lock<std::mutex> lock(mu_);
resp_ = resp;
cond_.notify_all();
}

RsyncService::RsyncResponse* Response() {return resp_;}
std::string Filename() {return filename_;}
RsyncService::Type Type() {return type_;}
size_t Offset() {return offset_;}
private:
std::string filename_;
RsyncService::Type type_;
size_t offset_ = 0xFFFFFFFF;
RsyncService::RsyncResponse* resp_ = nullptr;
std::condition_variable cond_;
std::mutex mu_;
};

class WaitObjectManager {
public:
WaitObjectManager() {
wo_vec_.resize(kMaxRsyncParallelNum);
for (int i = 0; i < kMaxRsyncParallelNum; i++) {
wo_vec_[i] = new WaitObject();
}
}
~WaitObjectManager() {
for (int i = 0; i < wo_vec_.size(); i++) {
delete wo_vec_[i];
wo_vec_[i] = nullptr;
}
}

WaitObject* UpdateWaitObject(int worker_index, const std::string& filename,
RsyncService::Type type, size_t offset) {
std::lock_guard<std::mutex> guard(mu_);
wo_vec_[worker_index]->Reset(filename, type, offset);
return wo_vec_[worker_index];
}

void WakeUp(RsyncService::RsyncResponse* resp) {
std::lock_guard<std::mutex> guard(mu_);
int index = resp->reader_index();
if (wo_vec_[index] == nullptr || resp->type() != wo_vec_[index]->Type()) {
delete resp;
return;
}
if (resp->type() == RsyncService::kRsyncFile &&
(resp->file_resp().filename() != wo_vec_[index]->Filename())) {
delete resp;
return;
}
wo_vec_[index]->WakeUp(resp);
}

private:
std::vector<WaitObject*> wo_vec_;
std::mutex mu_;
};

} // end namespace rsync
Expand Down
96 changes: 94 additions & 2 deletions include/rsync_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
#include "rsync_service.pb.h"

namespace rsync {
class RsyncServerConn;
struct RsyncServerTaskArg {
std::shared_ptr<RsyncService::RsyncRequest> req;
std::shared_ptr<net::PbConn> conn;
RsyncServerTaskArg(std::shared_ptr<RsyncService::RsyncRequest> _req, std::shared_ptr<net::PbConn> _conn)
std::shared_ptr<RsyncServerConn> conn;
RsyncServerTaskArg(std::shared_ptr<RsyncService::RsyncRequest> _req, std::shared_ptr<RsyncServerConn> _conn)
: req(std::move(_req)), conn(std::move(_conn)) {}
};
class RsyncReader;
Expand Down Expand Up @@ -52,6 +53,8 @@ class RsyncServerConn : public net::PbConn {
static void HandleMetaRsyncRequest(void* arg);
static void HandleFileRsyncRequest(void* arg);
private:
std::vector<std::shared_ptr<RsyncReader> > readers_;
std::mutex mu_;
void* data_ = nullptr;
};

Expand Down Expand Up @@ -86,6 +89,95 @@ class RsyncServerThread : public net::HolyThread {
RsyncServerHandle handle_;
};

class RsyncReader {
public:
RsyncReader() {
block_data_ = new char[kBlockSize];
}
~RsyncReader() {
if (!filepath_.empty()) {
Reset();
}
delete []block_data_;
}
pstd::Status Read(const std::string filepath, const size_t offset,
const size_t count, char* data, size_t* bytes_read,
std::string* checksum, bool* is_eof) {
std::lock_guard<std::mutex> guard(mu_);
pstd::Status s = Seek(filepath, offset);
if (!s.ok()) {
return s;
}
size_t offset_in_block = offset % kBlockSize;
size_t copy_count = count > (end_offset_ - offset) ? end_offset_ - offset : count;
memcpy(data, block_data_ + offset_in_block, copy_count);
*bytes_read = copy_count;
*is_eof = (offset + copy_count == total_size_);
return pstd::Status::OK();
}
private:
pstd::Status Seek(const std::string filepath, const size_t offset) {
if (filepath == filepath_ && offset >= start_offset_ && offset < end_offset_) {
return pstd::Status::OK();
}
if (filepath != filepath_) {
Reset();
fd_ = open(filepath.c_str(), O_RDONLY);
if (fd_ < 0) {
return pstd::Status::IOError("fd open failed");
}
filepath_ = filepath;
struct stat buf;
stat(filepath.c_str(), &buf);
total_size_ = buf.st_size;
}
start_offset_ = (offset / kBlockSize) * kBlockSize;

size_t read_offset = start_offset_;
size_t read_count = kBlockSize > (total_size_ - read_offset) ? (total_size_ - read_offset) : kBlockSize;
ssize_t bytesin = 0;
char* ptr = block_data_;
while ((bytesin = pread(fd_, ptr, read_count, read_offset)) > 0) {
read_count -= bytesin;
read_offset += bytesin;
ptr += bytesin;
if (read_count <= 0) {
break;
}
}
if (bytesin < 0) {
LOG(ERROR) << "unable to read from " << filepath_;
Reset();
return pstd::Status::IOError("unable to read from " + filepath);
}
end_offset_ = start_offset_ + (ptr - block_data_);
return pstd::Status::OK();
}
void Reset() {
total_size_ = -1;
start_offset_ = 0xFFFFFFFF;
end_offset_ = 0xFFFFFFFF;
memset(block_data_, 0, kBlockSize);
md5_.reset(new pstd::MD5());
filepath_ = "";
close(fd_);
fd_ = -1;
}

private:
std::mutex mu_;
const size_t kBlockSize = 16 << 20;

char* block_data_;
size_t start_offset_ = -1;
size_t end_offset_ = -1;
size_t total_size_ = -1;

int fd_ = -1;
std::string filepath_;
std::unique_ptr<pstd::MD5> md5_;
};

} //end namespace rsync
#endif

1 change: 1 addition & 0 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class ServerThread : public Thread {

// process events in notify_queue
virtual void ProcessNotifyEvents(const NetFiredEvent* pfe);


const ServerHandle* handle_;
bool own_handle_ = false;
Expand Down
14 changes: 9 additions & 5 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ DispatchThread::DispatchThread(int port, int work_num, ConnFactory* conn_factory
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
}

DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, ConnFactory* conn_factory,
Expand All @@ -35,7 +34,6 @@ DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, Co
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
}

DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int work_num, ConnFactory* conn_factory,
Expand All @@ -47,7 +45,6 @@ DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int w
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
}

DispatchThread::~DispatchThread() = default;
Expand All @@ -67,7 +64,13 @@ int DispatchThread::StartThread() {
return ret;
}
}
timed_scan_thread.StartThread();

// Adding timer tasks and run timertaskThread
timerTaskThread_.AddTimerTask(
"blrpop_blocking_info_scan", 250, true, [this] { this->ScanExpiredBlockedConnsOfBlrpop();});


timerTaskThread_.StartThread();
return ServerThread::StartThread();
}

Expand All @@ -88,7 +91,7 @@ int DispatchThread::StopThread() {
worker_thread_[i]->private_data_ = nullptr;
}
}
timed_scan_thread.StopThread();
timerTaskThread_.StopThread();
return ServerThread::StopThread();
}

Expand Down Expand Up @@ -258,6 +261,7 @@ void DispatchThread::ScanExpiredBlockedConnsOfBlrpop() {

void DispatchThread::SetQueueLimit(int queue_limit) { queue_limit_ = queue_limit; }


extern ServerThread* NewDispatchThread(int port, int work_num, ConnFactory* conn_factory, int cron_interval,
int queue_limit, const ServerHandle* handle) {
return new DispatchThread(port, work_num, conn_factory, cron_interval, queue_limit, handle);
Expand Down
Loading

0 comments on commit b3c5268

Please sign in to comment.