Skip to content

Commit

Permalink
Merge branch 'OpenAtom-Transaction' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Nov 21, 2023
2 parents 5b18c5f + f5a591f commit 4c140cb
Show file tree
Hide file tree
Showing 28 changed files with 1,336 additions and 105 deletions.
12 changes: 11 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "storage/storage.h"

#include "include/pika_command.h"
#include "pika_db.h"

/*
* Admin
Expand Down Expand Up @@ -149,6 +150,7 @@ class SelectCmd : public Cmd {
void DoInitial() override;
void Clear() override { db_name_.clear(); }
std::string db_name_;
std::shared_ptr<DB> select_db_;
};

class FlushallCmd : public Cmd {
Expand All @@ -158,11 +160,13 @@ class FlushallCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }

void Execute() override;
void FlushAllWithoutLock();
private:
void DoInitial() override;
std::string ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum,
uint64_t offset) override;
void DoWithoutLock(std::shared_ptr<Slot> slot);
};

class FlushdbCmd : public Cmd {
Expand All @@ -172,11 +176,15 @@ class FlushdbCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllSlotsWithoutLock(std::shared_ptr<DB> db);
void Execute() override;
std::string GetFlushDname() { return db_name_; }

private:
std::string db_name_;
void DoInitial() override;
void Clear() override { db_name_.clear(); }
void DoWithoutLock(std::shared_ptr<Slot> slot);
};

class ClientCmd : public Cmd {
Expand Down Expand Up @@ -219,6 +227,7 @@ class InfoCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new InfoCmd(*this); }
void Execute() override;

private:
InfoSection info_section_;
Expand Down Expand Up @@ -280,6 +289,7 @@ class ConfigCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new ConfigCmd(*this); }
void Execute() override;

private:
std::vector<std::string> config_args_v_;
Expand Down
41 changes: 37 additions & 4 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_CLIENT_CONN_H_
#define PIKA_CLIENT_CONN_H_

#include <bitset>
#include <utility>

#include "include/pika_command.h"
Expand Down Expand Up @@ -54,6 +55,14 @@ class PikaClientConn : public net::RedisConn {
uint32_t slot_id;
};

struct TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
static constexpr uint8_t InitCmdFailed = 1;
static constexpr uint8_t WatchFailed = 2;
static constexpr uint8_t Execing = 3;
};

// Auth related
class AuthStat {
public:
Expand All @@ -72,7 +81,7 @@ class PikaClientConn : public net::RedisConn {

PikaClientConn(int fd, const std::string& ip_port, net::Thread* server_thread, net::NetMultiplexer* mpx,
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() override = default;
~PikaClientConn() = default;

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) override;
Expand All @@ -84,10 +93,30 @@ class PikaClientConn : public net::RedisConn {

bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentTable(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override{ return current_db_; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override { return current_db_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }

// Txn
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void ClearTxnCmdQue();
bool IsInTxn();
bool IsTxnFailed();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
bool IsTxnExecing(void);
void SetTxnWatchFailState(bool is_failed);
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);

void AddKeysToWatch(const std::vector<std::string> &db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void ExitTxn();

net::ServerThread* server_thread() { return server_thread_; }

AuthStat& auth_stat() { return auth_stat_; }
Expand All @@ -101,14 +130,18 @@ class PikaClientConn : public net::RedisConn {
std::string current_db_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_ = false;
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_;
std::bitset<16> txn_state_;
std::unordered_set<std::string> watched_db_keys_;
std::mutex txn_state_mu_;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr<std::string>& resp_ptr);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void TryWriteResp();

AuthStat auth_stat_;
Expand Down
33 changes: 28 additions & 5 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_COMMAND_H_
#define PIKA_COMMAND_H_

#include <string>
#include <memory>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -198,6 +199,13 @@ const std::string kCmdNameSDiffstore = "sdiffstore";
const std::string kCmdNameSMove = "smove";
const std::string kCmdNameSRandmember = "srandmember";

// transation
const std::string kCmdNameMulti = "multi";
const std::string kCmdNameExec = "exec";
const std::string kCmdNameDiscard = "discard";
const std::string kCmdNameWatch = "watch";
const std::string kCmdNameUnWatch = "unwatch";

// HyperLogLog
const std::string kCmdNamePfAdd = "pfadd";
const std::string kCmdNamePfCount = "pfcount";
Expand Down Expand Up @@ -298,6 +306,9 @@ class CmdRes {
kInconsistentHashTag,
kErrOther,
KIncrByOverFlow,
kInvalidTransaction,
kTxnQueued,
kTxnAbort,
};

CmdRes() = default;
Expand Down Expand Up @@ -369,6 +380,17 @@ class CmdRes {
result.append(message_);
result.append("'\r\n");
break;
case kInvalidTransaction:
return "-ERR WATCH inside MULTI is not allowed\r\n";
case kTxnQueued:
result = "+QUEUED";
result.append("\r\n");
break;
case kTxnAbort:
result = "-EXECABORT ";
result.append(message_);
result.append(kNewLine);
break;
case kErrOther:
result = "-ERR ";
result.append(message_);
Expand Down Expand Up @@ -403,7 +425,9 @@ class CmdRes {
message_ = content;
}
}

CmdRet GetCmdRet() const {
return ret_;
}
private:
std::string message_;
CmdRet ret_ = kNone;
Expand Down Expand Up @@ -447,11 +471,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {

virtual std::vector<std::string> current_key() const;
virtual void Execute();
virtual void ProcessFlushDBCmd();
virtual void ProcessFlushAllCmd();
virtual void ProcessSingleSlotCmd();
virtual void ProcessMultiSlotCmd();
virtual void ProcessDoNotSpecifySlotCmd();
virtual void Do(std::shared_ptr<Slot> slot = nullptr) = 0;
virtual Cmd* Clone() = 0;
// used for execute multikey command into different slots
Expand All @@ -470,6 +491,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool is_multi_slot() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
void SetDbName(const std::string& db_name) { db_name_ = db_name; }
std::string GetDBName() { return db_name_; }

std::string name() const;
CmdRes& res();
Expand Down Expand Up @@ -507,7 +530,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
protected:
CmdRes res_;
PikaCmdArgsType argv_;
std::string db_name_;
std::string db_name_{};

std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
friend class InfoCmd;
friend class PkClusterInfoCmd;
friend class PikaServer;
friend class ExecCmd;
friend class FlushdbCmd;
friend class FlushallCmd;

std::string GetDBName();
void BgSaveDB();
Expand Down
1 change: 0 additions & 1 deletion include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class SetCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SetCmd(*this); }

private:
std::string key_;
std::string value_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ class RPopLPushCmd : public BlockingBaseCmd {
}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(source_);
res.push_back(receiver_);
res.push_back(source_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ class PikaReplicaManager {
~PikaReplicaManager() = default;

friend Cmd;
friend class FlushdbCmd;
friend class FlushallCmd;
friend class ExecCmd;

void Start();
void Stop();
Expand Down
5 changes: 4 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
#include "include/pika_migrate_thread.h"
#include "include/rsync_server.h"
#include "include/pika_statistic.h"
#include "include/pika_slot_command.h"
#include "include/pika_migrate_thread.h"
#include "include/pika_transaction.h"
#include "include/pika_cmd_table_manager.h"


Expand Down Expand Up @@ -507,6 +508,8 @@ class PikaServer : public pstd::noncopyable {
friend class InfoCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;
friend class FlushallCmd;
friend class ExecCmd;

private:
/*
Expand Down
2 changes: 2 additions & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
// FlushDB & FlushSubDB use
bool FlushDB();
bool FlushSubDB(const std::string& db_name);
bool FlushDBWithoutLock();
bool FlushSubDBWithoutLock(const std::string& db_name);

// key scan info use
pstd::Status GetKeyNum(std::vector<storage::KeyInfo>* key_info);
Expand Down
Loading

0 comments on commit 4c140cb

Please sign in to comment.