diff --git a/conf/standalone.conf b/conf/standalone.conf new file mode 100644 index 0000000..06c8fef --- /dev/null +++ b/conf/standalone.conf @@ -0,0 +1,71 @@ +## redis standalone server pool define + +##StandaloneServerPool { +## [Password xxx] #default no +## [Databases number] #default 1 +## Hash atol|crc16 +## [HashTag "xx"] #default no +## Distribution modula|random +## [MasterReadPriority [0-100]] #default 50 +## [StaticSlaveReadPriority [0-100]] #default 0 +## [DynamicSlaveReadPriority [0-100]] #default 0 +## RefreshMethod fixed|sentinel # +## [RefreshInterval number[s|ms|us]] #default 1, means 1 second +## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout +## [ServerFailureLimit number] #default 10 +## [ServerRetryTimeout number[s|ms|us]] #default 1 +## [KeepAlive seconds] #default 0, server connection tcp keepalive +## Sentinels [sentinel-password] { +## + addr +## ... +## } +## Group xxx { +## [+ addr] #if RefreshMethod==fixed: the first addr is master in a group, then all addrs is slaves in this group +## ... +## } +##} + + +## Examples: +#StandaloneServerPool { +# Databases 16 +# Hash crc16 +# HashTag "{}" +# Distribution modula +# MasterReadPriority 60 +# StaticSlaveReadPriority 50 +# DynamicSlaveReadPriority 50 +# RefreshMethod sentinel +# RefreshInterval 1 +# ServerTimeout 1 +# ServerFailureLimit 10 +# ServerRetryTimeout 1 +# KeepAlive 120 +# Sentinels { +# + 10.2.2.2:7500 +# + 10.2.2.3:7500 +# + 10.2.2.4:7500 +# } +# Group shard001 { +# } +# Group shard002 { +# } +#} + +#StandaloneServerPool { +# Databases 16 +# Hash crc16 +# HashTag "{}" +# Distribution modula +# MasterReadPriority 60 +# StaticSlaveReadPriority 50 +# DynamicSlaveReadPriority 50 +# RefreshMethod fixed +# ServerTimeout 1 +# ServerFailureLimit 10 +# ServerRetryTimeout 1 +# KeepAlive 120 +# Group shard001 { +# + 10.2.3.2:6379 +# } +#} diff --git a/src/Conf.cpp b/src/Conf.cpp index 8033704..7f2d7b9 100644 --- a/src/Conf.cpp +++ b/src/Conf.cpp @@ -125,7 +125,7 @@ void Conf::setGlobal(const ConfParser::Node* node) { const ConfParser::Node* authority = nullptr; const ConfParser::Node* clusterServerPool = nullptr; - const ConfParser::Node* sentinelServerPool = nullptr; + const ConfParser::Node* standaloneServerPool = nullptr; const ConfParser::Node* dataCenter = nullptr; std::vector latencyMonitors; for (auto p = node; p; p = p->next) { @@ -162,7 +162,9 @@ void Conf::setGlobal(const ConfParser::Node* node) } else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) { clusterServerPool = p; } else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) { - sentinelServerPool = p; + standaloneServerPool = p; + } else if (strcasecmp(p->key.c_str(), "StandaloneServerPool") == 0) { + standaloneServerPool = p; } else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) { dataCenter = p; } else if (strcasecmp(p->key.c_str(), "CustomCommand") == 0) { @@ -174,14 +176,17 @@ void Conf::setGlobal(const ConfParser::Node* node) if (authority) { setAuthority(authority); } - if (clusterServerPool && sentinelServerPool) { - Throw(LogicError, "Can't define ClusterServerPool and SentinelServerPool at the same time"); + if (clusterServerPool && standaloneServerPool) { + Throw(LogicError, "Can't define ClusterServerPool/StandaloneServerPool at the same time"); } else if (clusterServerPool) { setClusterServerPool(clusterServerPool); mServerPoolType = ServerPool::Cluster; - } else if (sentinelServerPool) { - setSentinelServerPool(sentinelServerPool); - mServerPoolType = ServerPool::Sentinel; + } else if (standaloneServerPool) { + if (strcasecmp(standaloneServerPool->key.c_str(), "SentinelServerPool") == 0) { + mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::Sentinel; + } + setStandaloneServerPool(standaloneServerPool); + mServerPoolType = ServerPool::Standalone; } else { Throw(LogicError, "Must define a server pool"); } @@ -253,28 +258,21 @@ void Conf::setAuthority(const ConfParser::Node* node) bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p) { + bool ret = true; if (setStr(sp.password, "Password", p)) { - return true; } else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) { - return true; } else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) { - return true; } else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) { - return true; } else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) { - return true; } else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) { - return true; } else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) { - return true; } else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) { - return true; } else if (setInt(sp.keepalive, "KeepAlive", p, 0)) { - return true; } else if (setInt(sp.databases, "Databases", p, 1, 128)) { - return true; + } else { + ret = false; } - return false; + return ret; } void Conf::setClusterServerPool(const ConfParser::Node* node) @@ -298,41 +296,47 @@ void Conf::setClusterServerPool(const ConfParser::Node* node) } } -void Conf::setSentinelServerPool(const ConfParser::Node* node) +void Conf::setStandaloneServerPool(const ConfParser::Node* node) { if (!node->sub) { - Throw(InvalidValue, "%s:%d SentinelServerPool require scope value", node->file, node->line); + Throw(InvalidValue, "%s:%d StandaloneServerPool require scope value", node->file, node->line); } - mSentinelServerPool.hashTag[0] = '\0'; - mSentinelServerPool.hashTag[1] = '\0'; + mStandaloneServerPool.hashTag[0] = '\0'; + mStandaloneServerPool.hashTag[1] = '\0'; for (auto p = node->sub; p; p = p->next) { - if (setServerPool(mSentinelServerPool, p)) { + if (setServerPool(mStandaloneServerPool, p)) { + } else if (strcasecmp(p->key.c_str(), "RefreshMethod") == 0) { + try { + mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::parse(p->val.c_str()); + } catch (ServerPoolRefreshMethod::InvalidEnumValue& excp) { + Throw(InvalidValue, "%s:%d unknown RefreshMethod:%s", p->file, p->line, p->val.c_str()); + } } else if (strcasecmp(p->key.c_str(), "Distribution") == 0) { - mSentinelServerPool.dist = Distribution::parse(p->val.c_str()); - if (mSentinelServerPool.dist == Distribution::None) { + mStandaloneServerPool.dist = Distribution::parse(p->val.c_str()); + if (mStandaloneServerPool.dist == Distribution::None) { Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line); } } else if (strcasecmp(p->key.c_str(), "Hash") == 0) { - mSentinelServerPool.hash = Hash::parse(p->val.c_str()); - if (mSentinelServerPool.hash == Hash::None) { + mStandaloneServerPool.hash = Hash::parse(p->val.c_str()); + if (mStandaloneServerPool.hash == Hash::None) { Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line); } } else if (strcasecmp(p->key.c_str(), "HashTag") == 0) { if (p->val.empty()) { - mSentinelServerPool.hashTag[0] = '\0'; - mSentinelServerPool.hashTag[1] = '\0'; + mStandaloneServerPool.hashTag[0] = '\0'; + mStandaloneServerPool.hashTag[1] = '\0'; } else if (p->val.size() == 2) { - mSentinelServerPool.hashTag[0] = p->val[0]; - mSentinelServerPool.hashTag[1] = p->val[1]; + mStandaloneServerPool.hashTag[0] = p->val[0]; + mStandaloneServerPool.hashTag[1] = p->val[1]; } else { Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line); } - } else if (setServers(mSentinelServerPool.sentinels, "Sentinels", p)) { - mSentinelServerPool.sentinelPassword = p->val; + } else if (setServers(mStandaloneServerPool.sentinels, "Sentinels", p)) { + mStandaloneServerPool.sentinelPassword = p->val; } else if (strcasecmp(p->key.c_str(), "Group") == 0) { - mSentinelServerPool.groups.push_back(ServerGroupConf{p->val}); + mStandaloneServerPool.groups.push_back(ServerGroupConf{p->val}); if (p->sub) { - auto& g = mSentinelServerPool.groups.back(); + auto& g = mStandaloneServerPool.groups.back(); setServers(g.servers, "Group", p); } } else { @@ -340,18 +344,31 @@ void Conf::setSentinelServerPool(const ConfParser::Node* node) p->file, p->line, p->key.c_str()); } } - if (mSentinelServerPool.sentinels.empty()) { - Throw(LogicError, "SentinelServerPool no sentinel server"); + if (mStandaloneServerPool.groups.empty()) { + Throw(LogicError, "StandaloneServerPool no server group"); } - if (mSentinelServerPool.groups.empty()) { - Throw(LogicError, "SentinelServerPool no server group"); + if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::None) { + Throw(LogicError, "StandaloneServerPool must define RefreshMethod"); + } else if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::Sentinel) { + if (mStandaloneServerPool.sentinels.empty()) { + Throw(LogicError, "StandaloneServerPool with RefreshMethod(sentinel) but no sentinel servers"); + } + } else { + if (!mStandaloneServerPool.sentinels.empty()) { + Throw(LogicError, "StandaloneServerPool with Sentinels but RefreshMethod is not sentinel"); + } + for (auto& g : mStandaloneServerPool.groups) { + if (g.servers.empty()) { + Throw(LogicError, "Group(%s) must add servers", g.name.c_str()); + } + } } - if (mSentinelServerPool.groups.size() > 1) { - if (mSentinelServerPool.dist == Distribution::None) { - Throw(LogicError, "SentinelServerPool must define Dsitribution in multi groups"); + if (mStandaloneServerPool.groups.size() > 1) { + if (mStandaloneServerPool.dist == Distribution::None) { + Throw(LogicError, "StandaloneServerPool must define Dsitribution in multi groups"); } - if (mSentinelServerPool.hash == Hash::None) { - Throw(LogicError, "SentinelServerPool must define Hash in multi groups"); + if (mStandaloneServerPool.hash == Hash::None) { + Throw(LogicError, "StandaloneServerPool must define Hash in multi groups"); } } } diff --git a/src/Conf.h b/src/Conf.h index 6580344..e3fbc56 100644 --- a/src/Conf.h +++ b/src/Conf.h @@ -20,6 +20,7 @@ #include "ConfParser.h" #include "Auth.h" #include "Command.h" +#include "Enums.h" struct AuthConf { @@ -63,8 +64,9 @@ struct ClusterServerPoolConf : public ServerPoolConf std::vector servers; }; -struct SentinelServerPoolConf : public ServerPoolConf +struct StandaloneServerPoolConf : public ServerPoolConf { + ServerPoolRefreshMethod refreshMethod = ServerPoolRefreshMethod::None; Distribution dist = Distribution::None; Hash hash = Hash::None; char hashTag[2]; @@ -175,9 +177,9 @@ class Conf { return mClusterServerPool; } - const SentinelServerPoolConf& sentinelServerPool() const + const StandaloneServerPoolConf& standaloneServerPool() const { - return mSentinelServerPool; + return mStandaloneServerPool; } const std::string& localDC() const { @@ -198,7 +200,7 @@ class Conf void setGlobal(const ConfParser::Node* node); void setAuthority(const ConfParser::Node* node); void setClusterServerPool(const ConfParser::Node* node); - void setSentinelServerPool(const ConfParser::Node* node); + void setStandaloneServerPool(const ConfParser::Node* node); void setDataCenter(const ConfParser::Node* node); void check(); bool setServerPool(ServerPoolConf& sp, const ConfParser::Node* n); @@ -229,7 +231,7 @@ class Conf std::vector mAuthConfs; int mServerPoolType; ClusterServerPoolConf mClusterServerPool; - SentinelServerPoolConf mSentinelServerPool; + StandaloneServerPoolConf mStandaloneServerPool; std::vector mDCConfs; std::string mLocalDC; std::vector mLatencyMonitors; diff --git a/src/ConfParser.cpp b/src/ConfParser.cpp index 2da48f2..512c40c 100644 --- a/src/ConfParser.cpp +++ b/src/ConfParser.cpp @@ -283,20 +283,22 @@ ConfParser::Status ConfParser::parse(std::string& line, std::string& key, std::s case SValBody: return KeyVal; case VValBody: + { + auto ret = KeyVal; val.assign(line, pos, line.size() - pos); if (val.back() == '{') { val.resize(val.size() - 1); - int vsp = 0; - for (auto it = val.rbegin(); it != val.rend(); ++it) { - if (isspace(*it)) { - ++vsp; - } + ret = BeginScope; + } + int vsp = 0; + for (auto it = val.rbegin(); it != val.rend(); ++it) { + if (isspace(*it)) { + ++vsp; } - val.resize(val.size() - vsp); - return BeginScope; - } else { - return KeyVal; } + val.resize(val.size() - vsp); + return ret; + } case ScopeReady: return KeyVal; case ScopeBody: diff --git a/src/Enums.cpp b/src/Enums.cpp new file mode 100644 index 0000000..87ba358 --- /dev/null +++ b/src/Enums.cpp @@ -0,0 +1,15 @@ +/* + * predixy - A high performance and full features proxy for redis. + * Copyright (C) 2017 Joyield, Inc. + * All rights reserved. + */ + +#include "Enums.h" + +const ServerPoolRefreshMethod::TypeName +ServerPoolRefreshMethod::sPairs[3] = { + {ServerPoolRefreshMethod::None, "none"}, + {ServerPoolRefreshMethod::Fixed, "fixed"}, + {ServerPoolRefreshMethod::Sentinel, "sentinel"}, +}; + diff --git a/src/Enums.h b/src/Enums.h new file mode 100644 index 0000000..95690ef --- /dev/null +++ b/src/Enums.h @@ -0,0 +1,74 @@ +/* + * predixy - A high performance and full features proxy for redis. + * Copyright (C) 2017 Joyield, Inc. + * All rights reserved. + */ + +#ifndef _PREDIXY_ENUMS_H_ +#define _PREDIXY_ENUMS_H_ + +#include +#include +#include "Exception.h" + +template +class EnumBase +{ +public: + DefException(InvalidEnumValue); + struct TypeName + { + int type; + const char* name; + }; +public: + EnumBase(int t): + mType(t) + { + } + int value() const + { + return mType; + } + bool operator==(const T& t) const + { + return t.value() == mType; + } + bool operator!=(const T& t) const + { + return t.value() != mType; + } + const char* name() const + { + return T::sPairs[mType].name; + } + static T parse(const char* str) + { + for (auto& i : T::sPairs) { + if (strcasecmp(i.name, str) == 0) { + return T(typename T::Type(i.type)); + } + } + Throw(InvalidEnumValue, "invalid enum value:%s", str); + } +protected: + int mType; +}; + +class ServerPoolRefreshMethod : public EnumBase +{ +public: + enum Type + { + None, + Fixed, + Sentinel + }; + static const TypeName sPairs[3]; + ServerPoolRefreshMethod(Type t = None): + EnumBase(t) + { + } +}; + +#endif diff --git a/src/Makefile b/src/Makefile index 18055de..46b6787 100644 --- a/src/Makefile +++ b/src/Makefile @@ -72,6 +72,7 @@ objs = \ Buffer.o \ Command.o \ Distribution.o \ + Enums.o \ Reply.o \ ConfParser.o \ Conf.o \ @@ -87,7 +88,7 @@ objs = \ ServerPool.o \ ClusterNodesParser.o \ ClusterServerPool.o \ - SentinelServerPool.o \ + StandaloneServerPool.o \ ConnectConnectionPool.o \ Handler.o \ Proxy.o \ diff --git a/src/Proxy.cpp b/src/Proxy.cpp index eb42f7b..3ee0842 100644 --- a/src/Proxy.cpp +++ b/src/Proxy.cpp @@ -118,10 +118,10 @@ bool Proxy::init(int argc, char* argv[]) mServPool = p; } break; - case ServerPool::Sentinel: + case ServerPool::Standalone: { - SentinelServerPool* p = new SentinelServerPool(this); - p->init(mConf->sentinelServerPool()); + StandaloneServerPool* p = new StandaloneServerPool(this); + p->init(mConf->standaloneServerPool()); mServPool = p; } break; diff --git a/src/Proxy.h b/src/Proxy.h index 35d2a07..afff0dc 100644 --- a/src/Proxy.h +++ b/src/Proxy.h @@ -13,7 +13,7 @@ #include "DC.h" #include "ServerPool.h" #include "ClusterServerPool.h" -#include "SentinelServerPool.h" +#include "StandaloneServerPool.h" #include "LatencyMonitor.h" class Proxy @@ -51,15 +51,15 @@ class Proxy } bool isSplitMultiKey() const { - return mConf->sentinelServerPool().groups.size() != 1; + return mConf->standaloneServerPool().groups.size() != 1; } bool supportTransaction() const { - return mConf->sentinelServerPool().groups.size() == 1; + return mConf->standaloneServerPool().groups.size() == 1; } bool supportSubscribe() const { - return mConf->sentinelServerPool().groups.size() == 1 || + return mConf->standaloneServerPool().groups.size() == 1 || mConf->clusterServerPool().servers.size() > 0; } const std::vector& handlers() const diff --git a/src/ServerPool.h b/src/ServerPool.h index 4c12cb0..c41fe17 100644 --- a/src/ServerPool.h +++ b/src/ServerPool.h @@ -20,7 +20,7 @@ class ServerPool { Unknown, Cluster, - Sentinel + Standalone }; static const int DefaultServerRetryTimeout = 10000000; static const int DefaultRefreshInterval = 1000000; diff --git a/src/StandaloneServerPool.cpp b/src/StandaloneServerPool.cpp new file mode 100644 index 0000000..1e57367 --- /dev/null +++ b/src/StandaloneServerPool.cpp @@ -0,0 +1,482 @@ +/* + * predixy - A high performance and full features proxy for redis. + * Copyright (C) 2017 Joyield, Inc. + * All rights reserved. + */ + +#include +#include "Logger.h" +#include "ServerGroup.h" +#include "Handler.h" +#include "StandaloneServerPool.h" + +StandaloneServerPool::StandaloneServerPool(Proxy* p): + ServerPoolTmpl(p, Standalone), + mDist(Distribution::Modula) +{ + mSentinels.reserve(MaxSentinelNum); + mServPool.reserve(Const::MaxServNum); + mHashTag[0] = mHashTag[1] = '\0'; +} + +StandaloneServerPool::~StandaloneServerPool() +{ +} + +void StandaloneServerPool::init(const StandaloneServerPoolConf& conf) +{ + ServerPool::init(conf); + mRefreshMethod = conf.refreshMethod; + mDist = conf.dist; + mHash = conf.hash; + mHashTag[0] = conf.hashTag[0]; + mHashTag[1] = conf.hashTag[1]; + int i = 0; + if (conf.refreshMethod == ServerPoolRefreshMethod::Sentinel) { + mSentinels.resize(conf.sentinels.size()); + for (auto& sc : conf.sentinels) { + Server* s = new Server(this, sc.addr, true); + s->setRole(Server::Sentinel); + s->setPassword(sc.password.empty() ? conf.sentinelPassword:sc.password); + mSentinels[i++] = s; + mServs[s->addr()] = s; + } + } + mGroupPool.resize(conf.groups.size()); + i = 0; + for (auto& gc : conf.groups) { + ServerGroup* g = new ServerGroup(this, gc.name); + mGroupPool[i++] = g; + auto role = Server::Master; + for (auto& sc : gc.servers) { + Server* s = new Server(this, sc.addr, true); + s->setPassword(sc.password.empty() ? conf.password : sc.password); + mServPool.push_back(s); + mServs[s->addr()] = s; + g->add(s); + s->setGroup(g); + switch (mRefreshMethod.value()) { + case ServerPoolRefreshMethod::Fixed: + s->setOnline(true); + s->setRole(role); + role = Server::Slave; + break; + default: + s->setOnline(false); + break; + } + } + } +} + +Server* StandaloneServerPool::getServer(Handler* h, Request* req, const String& key) const +{ + FuncCallTimer(); + switch (req->type()) { + case Command::SentinelGetMaster: + case Command::SentinelSlaves: + case Command::SentinelSentinels: + if (mSentinels.empty()) { + return nullptr; + } else { + Server* s = randServer(h, mSentinels); + logDebug("sentinel server pool get server %s for sentinel command", + s->addr().data()); + return s; + } + break; + case Command::Randomkey: + return randServer(h, mServPool); + default: + break; + } + if (mGroupPool.size() == 1) { + return mGroupPool[0]->getServer(h, req); + } else if (mGroupPool.size() > 1) { + switch (mDist) { + case Distribution::Modula: + { + long idx = mHash.hash(key.data(), key.length(), mHashTag); + idx %= mGroupPool.size(); + return mGroupPool[idx]->getServer(h, req); + } + break; + case Distribution::Random: + { + int idx = h->rand() % mGroupPool.size(); + return mGroupPool[idx]->getServer(h, req); + } + break; + default: + break; + } + } + return nullptr; +} + +void StandaloneServerPool::refreshRequest(Handler* h) +{ + logDebug("h %d update standalone server pool", h->id()); + switch (mRefreshMethod.value()) { + case ServerPoolRefreshMethod::Sentinel: + for (auto g : mGroupPool) { + RequestPtr req = RequestAlloc::create(); + req->setSentinels(g->name()); + req->setData(g); + h->handleRequest(req); + req = RequestAlloc::create(); + req->setSentinelGetMaster(g->name()); + req->setData(g); + h->handleRequest(req); + req = RequestAlloc::create(); + req->setSentinelSlaves(g->name()); + req->setData(g); + h->handleRequest(req); + } + break; + default: + break; + } +} + +void StandaloneServerPool::handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res) +{ + switch (req->type()) { + case Command::SentinelSentinels: + handleSentinels(h, s, req, res); + break; + case Command::SentinelGetMaster: + handleGetMaster(h, s, req, res); + break; + case Command::SentinelSlaves: + handleSlaves(h, s, req, res); + break; + default: + break; + } +} + +class AddrParser +{ +public: + enum Status { + Ok, + Error, + Done + }; +public: + AddrParser(const Segment& res): + mRes(res), + mState(Idle), + mCnt(0), + mArgLen(0), + mIp(false), + mPort(false) + { + mRes.rewind(); + } + int count() const {return mCnt;} + Status parse(SString& addr); +private: + enum State { + Idle, + Count, + CountLF, + Arg, + ArgLen, + ArgLenLF, + SubArrayLen, + Body, + BodyLF, + Invalid, + Finished + }; +private: + Segment mRes; + State mState; + int mCnt; + int mArgLen; + bool mIp; + bool mPort; + SString<4> mKey; +}; + +AddrParser::Status AddrParser::parse(SString& addr) +{ + const char* dat; + int len; + addr.clear(); + while (mRes.get(dat, len) && mState != Invalid) { + for (int i = 0; i < len && mState != Invalid; ++i) { + char ch = dat[i]; + switch (mState) { + case Idle: + mState = ch == '*' ? Count : Invalid; + break; + case Count: + if (ch >= '0' && ch <= '9') { + mCnt = mCnt * 10 + (ch - '0'); + } else if (ch == '\r') { + if (mCnt == 0) { + mState = Finished; + return Done; + } else if (mCnt < 0) { + mState = Invalid; + return Error; + } + mState = CountLF; + } else { + mState = Invalid; + } + break; + case CountLF: + mState = ch == '\n' ? Arg : Invalid; + break; + case Arg: + if (ch == '$') { + mState = ArgLen; + mArgLen = 0; + } else if (ch == '*') { + mState = SubArrayLen; + } else { + mState = Invalid; + } + break; + case ArgLen: + if (ch >= '0' && ch <= '9') { + mArgLen = mArgLen * 10 + (ch - '0'); + } else if (ch == '\r') { + mState = ArgLenLF; + } else { + mState = Invalid; + } + break; + case ArgLenLF: + mState = ch == '\n' ? Body : Invalid; + break; + case SubArrayLen: + if (ch == '\n') { + mState = Arg; + } + break; + case Body: + if (ch == '\r') { + mState = BodyLF; + if (mPort) { + mPort = false; + mRes.use(i + 1); + return Ok; + } else if (mIp) { + mIp = false; + addr.append(':'); + } else if (mArgLen == 2 && strcmp(mKey.data(), "ip") == 0) { + mIp = true; + } else if (mArgLen == 4 && strcmp(mKey.data(), "port") == 0) { + mPort = true; + } + break; + } + if (mIp || mPort) { + addr.append(ch); + } else if (mArgLen == 2 || mArgLen == 4) { + mKey.append(ch); + } + break; + case BodyLF: + mKey.clear(); + mState = ch == '\n' ? Arg : Invalid; + break; + default: + break; + } + } + mRes.use(len); + } + return mState != Invalid ? Done : Error; +} + +static bool hasValidPort(const String& addr) +{ + const char* p = addr.data() + addr.length(); + for (int i = 0; i < addr.length(); ++i) { + if (*(--p) == ':') { + int port = atoi(p + 1); + return port > 0 && port < 65536; + } + } + return false; +} + +void StandaloneServerPool::handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res) +{ + if (!res || !res->isArray()) { + return; + } + AddrParser parser(res->body()); + SString addr; + while (true) { + auto st = parser.parse(addr); + if (st == AddrParser::Ok) { + logDebug("sentinel server pool parse sentinel %s", addr.data()); + if (!hasValidPort(addr)) { + logNotice("sentinel server pool parse sentienl %s invalid", + addr.data()); + continue; + } + auto it = mServs.find(addr); + Server* serv = it == mServs.end() ? nullptr : it->second; + if (!serv) { + if (mSentinels.size() == mSentinels.capacity()) { + logWarn("too many sentinels %d, will ignore new sentinel %s", + (int)mSentinels.size(), addr.data()); + continue; + } + serv = new Server(this, addr, false); + serv->setRole(Server::Sentinel); + serv->setPassword(password()); + mSentinels.push_back(serv); + mServs[serv->addr()] = serv; + logNotice("h %d create new sentinel %s", + h->id(), addr.data()); + } + serv->setOnline(true); + } else if (st == AddrParser::Done) { + break; + } else { + logError("sentinel server pool parse sentinel sentinels error"); + break; + } + } +} + +void StandaloneServerPool::handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res) +{ + if (!res || !res->isArray()) { + return; + } + ServerGroup* g = (ServerGroup*)req->data(); + if (!g) { + return; + } + SegmentStr str(res->body()); + if (!str.complete()) { + return; + } + if (strncmp(str.data(), "*2\r\n$", 5) != 0) { + return; + } + SString addr; + const char* p = str.data() + 5; + int len = atoi(p); + if (len <= 0) { + return; + } + p = strchr(p, '\r') + 2; + if (!addr.append(p, len)) { + return; + } + if (!addr.append(':')) { + return; + } + p += len + 3; + len = atoi(p); + if (len <= 0) { + return; + } + p = strchr(p, '\r') + 2; + if (!addr.append(p, len)) { + return; + } + logDebug("sentinel server pool group %s get master %s", + g->name().data(), addr.data()); + auto it = mServs.find(addr); + Server* serv = it == mServs.end() ? nullptr : it->second; + if (serv) { + serv->setOnline(true); + serv->setRole(Server::Master); + auto old = serv->group(); + if (old) { + if (old != g) { + old->remove(serv); + g->add(serv); + serv->setGroup(g); + } + } else { + g->add(serv); + serv->setGroup(g); + } + } else { + if (mServPool.size() == mServPool.capacity()) { + logWarn("too many servers %d, will ignore new master server %s", + (int)mServPool.size(), addr.data()); + return; + } + serv = new Server(this, addr, false); + serv->setRole(Server::Master); + serv->setPassword(password()); + mServPool.push_back(serv); + g->add(serv); + serv->setGroup(g); + mServs[serv->addr()] = serv; + logNotice("sentinel server pool group %s create master server %s %s", + g->name().data(), addr.data(), serv->dcName().data()); + } +} + +void StandaloneServerPool::handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res) +{ + if (!res || !res->isArray()) { + return; + } + ServerGroup* g = (ServerGroup*)req->data(); + if (!g) { + return; + } + AddrParser parser(res->body()); + SString addr; + while (true) { + auto st = parser.parse(addr); + if (st == AddrParser::Ok) { + logDebug("sentinel server pool group %s parse slave %s", + g->name().data(), addr.data()); + auto it = mServs.find(addr); + Server* serv = it == mServs.end() ? nullptr : it->second; + if (serv) { + serv->setOnline(true); + serv->setRole(Server::Slave); + auto old = serv->group(); + if (old) { + if (old != g) { + old->remove(serv); + g->add(serv); + serv->setGroup(g); + } + } else { + g->add(serv); + serv->setGroup(g); + } + } else { + if (mServPool.size() == mServPool.capacity()) { + logWarn("too many servers %d, will ignore new slave server %s", + (int)mServPool.size(), addr.data()); + return; + } + serv = new Server(this, addr, false); + serv->setRole(Server::Slave); + serv->setPassword(password()); + mServPool.push_back(serv); + g->add(serv); + serv->setGroup(g); + mServs[serv->addr()] = serv; + logNotice("sentinel server pool group %s create slave server %s %s", + g->name().data(), addr.data(), serv->dcName().data()); + } + } else if (st == AddrParser::Done) { + break; + } else { + logError("sentinel server pool group %s parse sentinel sentinels error", + g->name().data()); + break; + } + } +} diff --git a/src/StandaloneServerPool.h b/src/StandaloneServerPool.h new file mode 100644 index 0000000..9f952aa --- /dev/null +++ b/src/StandaloneServerPool.h @@ -0,0 +1,43 @@ +/* + * predixy - A high performance and full features proxy for redis. + * Copyright (C) 2017 Joyield, Inc. + * All rights reserved. + */ + +#ifndef _PREDIXY_STANDALONE_SERVER_POOL_H_ +#define _PREDIXY_STANDALONE_SERVER_POOL_H_ + +#include +#include "Predixy.h" +#include "ServerPool.h" + +class StandaloneServerPool : public ServerPoolTmpl +{ +public: + static const int MaxSentinelNum = 64; +public: + StandaloneServerPool(Proxy* p); + ~StandaloneServerPool(); + void init(const StandaloneServerPoolConf& conf); + Server* getServer(Handler* h, Request* req, const String& key) const; + Server* iter(int& cursor) const + { + return ServerPool::iter(mServPool, cursor); + } + void refreshRequest(Handler* h); + void handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res); +private: + void handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res); + void handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res); + void handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res); + friend class ServerPoolTmpl; +private: + ServerPoolRefreshMethod mRefreshMethod; + std::vector mSentinels; + std::vector mServPool; + Distribution mDist; + Hash mHash; + char mHashTag[2]; +}; + +#endif