Skip to content

Commit

Permalink
Merge pull request #48 from joyieldInc/StandaloneServerPool
Browse files Browse the repository at this point in the history
support redis standalone backend
  • Loading branch information
fortrue authored Sep 22, 2018
2 parents 63596e9 + fb1ac64 commit 57de3c2
Show file tree
Hide file tree
Showing 12 changed files with 774 additions and 67 deletions.
71 changes: 71 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
## redis standalone server pool define

##StandaloneServerPool {
## [Password xxx] #default no
## [Databases number] #default 1
## Hash atol|crc16
## [HashTag "xx"] #default no
## Distribution modula|random
## [MasterReadPriority [0-100]] #default 50
## [StaticSlaveReadPriority [0-100]] #default 0
## [DynamicSlaveReadPriority [0-100]] #default 0
## RefreshMethod fixed|sentinel #
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
## [ServerFailureLimit number] #default 10
## [ServerRetryTimeout number[s|ms|us]] #default 1
## [KeepAlive seconds] #default 0, server connection tcp keepalive
## Sentinels [sentinel-password] {
## + addr
## ...
## }
## Group xxx {
## [+ addr] #if RefreshMethod==fixed: the first addr is master in a group, then all addrs is slaves in this group
## ...
## }
##}


## Examples:
#StandaloneServerPool {
# Databases 16
# Hash crc16
# HashTag "{}"
# Distribution modula
# MasterReadPriority 60
# StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50
# RefreshMethod sentinel
# RefreshInterval 1
# ServerTimeout 1
# ServerFailureLimit 10
# ServerRetryTimeout 1
# KeepAlive 120
# Sentinels {
# + 10.2.2.2:7500
# + 10.2.2.3:7500
# + 10.2.2.4:7500
# }
# Group shard001 {
# }
# Group shard002 {
# }
#}

#StandaloneServerPool {
# Databases 16
# Hash crc16
# HashTag "{}"
# Distribution modula
# MasterReadPriority 60
# StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50
# RefreshMethod fixed
# ServerTimeout 1
# ServerFailureLimit 10
# ServerRetryTimeout 1
# KeepAlive 120
# Group shard001 {
# + 10.2.3.2:6379
# }
#}
105 changes: 61 additions & 44 deletions src/Conf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void Conf::setGlobal(const ConfParser::Node* node)
{
const ConfParser::Node* authority = nullptr;
const ConfParser::Node* clusterServerPool = nullptr;
const ConfParser::Node* sentinelServerPool = nullptr;
const ConfParser::Node* standaloneServerPool = nullptr;
const ConfParser::Node* dataCenter = nullptr;
std::vector<const ConfParser::Node*> latencyMonitors;
for (auto p = node; p; p = p->next) {
Expand Down Expand Up @@ -162,7 +162,9 @@ void Conf::setGlobal(const ConfParser::Node* node)
} else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) {
clusterServerPool = p;
} else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) {
sentinelServerPool = p;
standaloneServerPool = p;
} else if (strcasecmp(p->key.c_str(), "StandaloneServerPool") == 0) {
standaloneServerPool = p;
} else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) {
dataCenter = p;
} else if (strcasecmp(p->key.c_str(), "CustomCommand") == 0) {
Expand All @@ -174,14 +176,17 @@ void Conf::setGlobal(const ConfParser::Node* node)
if (authority) {
setAuthority(authority);
}
if (clusterServerPool && sentinelServerPool) {
Throw(LogicError, "Can't define ClusterServerPool and SentinelServerPool at the same time");
if (clusterServerPool && standaloneServerPool) {
Throw(LogicError, "Can't define ClusterServerPool/StandaloneServerPool at the same time");
} else if (clusterServerPool) {
setClusterServerPool(clusterServerPool);
mServerPoolType = ServerPool::Cluster;
} else if (sentinelServerPool) {
setSentinelServerPool(sentinelServerPool);
mServerPoolType = ServerPool::Sentinel;
} else if (standaloneServerPool) {
if (strcasecmp(standaloneServerPool->key.c_str(), "SentinelServerPool") == 0) {
mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::Sentinel;
}
setStandaloneServerPool(standaloneServerPool);
mServerPoolType = ServerPool::Standalone;
} else {
Throw(LogicError, "Must define a server pool");
}
Expand Down Expand Up @@ -253,28 +258,21 @@ void Conf::setAuthority(const ConfParser::Node* node)

bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p)
{
bool ret = true;
if (setStr(sp.password, "Password", p)) {
return true;
} else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) {
return true;
} else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) {
return true;
} else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) {
return true;
} else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) {
return true;
} else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) {
return true;
} else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) {
return true;
} else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) {
return true;
} else if (setInt(sp.keepalive, "KeepAlive", p, 0)) {
return true;
} else if (setInt(sp.databases, "Databases", p, 1, 128)) {
return true;
} else {
ret = false;
}
return false;
return ret;
}

void Conf::setClusterServerPool(const ConfParser::Node* node)
Expand All @@ -298,60 +296,79 @@ void Conf::setClusterServerPool(const ConfParser::Node* node)
}
}

void Conf::setSentinelServerPool(const ConfParser::Node* node)
void Conf::setStandaloneServerPool(const ConfParser::Node* node)
{
if (!node->sub) {
Throw(InvalidValue, "%s:%d SentinelServerPool require scope value", node->file, node->line);
Throw(InvalidValue, "%s:%d StandaloneServerPool require scope value", node->file, node->line);
}
mSentinelServerPool.hashTag[0] = '\0';
mSentinelServerPool.hashTag[1] = '\0';
mStandaloneServerPool.hashTag[0] = '\0';
mStandaloneServerPool.hashTag[1] = '\0';
for (auto p = node->sub; p; p = p->next) {
if (setServerPool(mSentinelServerPool, p)) {
if (setServerPool(mStandaloneServerPool, p)) {
} else if (strcasecmp(p->key.c_str(), "RefreshMethod") == 0) {
try {
mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::parse(p->val.c_str());
} catch (ServerPoolRefreshMethod::InvalidEnumValue& excp) {
Throw(InvalidValue, "%s:%d unknown RefreshMethod:%s", p->file, p->line, p->val.c_str());
}
} else if (strcasecmp(p->key.c_str(), "Distribution") == 0) {
mSentinelServerPool.dist = Distribution::parse(p->val.c_str());
if (mSentinelServerPool.dist == Distribution::None) {
mStandaloneServerPool.dist = Distribution::parse(p->val.c_str());
if (mStandaloneServerPool.dist == Distribution::None) {
Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line);
}
} else if (strcasecmp(p->key.c_str(), "Hash") == 0) {
mSentinelServerPool.hash = Hash::parse(p->val.c_str());
if (mSentinelServerPool.hash == Hash::None) {
mStandaloneServerPool.hash = Hash::parse(p->val.c_str());
if (mStandaloneServerPool.hash == Hash::None) {
Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line);
}
} else if (strcasecmp(p->key.c_str(), "HashTag") == 0) {
if (p->val.empty()) {
mSentinelServerPool.hashTag[0] = '\0';
mSentinelServerPool.hashTag[1] = '\0';
mStandaloneServerPool.hashTag[0] = '\0';
mStandaloneServerPool.hashTag[1] = '\0';
} else if (p->val.size() == 2) {
mSentinelServerPool.hashTag[0] = p->val[0];
mSentinelServerPool.hashTag[1] = p->val[1];
mStandaloneServerPool.hashTag[0] = p->val[0];
mStandaloneServerPool.hashTag[1] = p->val[1];
} else {
Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line);
}
} else if (setServers(mSentinelServerPool.sentinels, "Sentinels", p)) {
mSentinelServerPool.sentinelPassword = p->val;
} else if (setServers(mStandaloneServerPool.sentinels, "Sentinels", p)) {
mStandaloneServerPool.sentinelPassword = p->val;
} else if (strcasecmp(p->key.c_str(), "Group") == 0) {
mSentinelServerPool.groups.push_back(ServerGroupConf{p->val});
mStandaloneServerPool.groups.push_back(ServerGroupConf{p->val});
if (p->sub) {
auto& g = mSentinelServerPool.groups.back();
auto& g = mStandaloneServerPool.groups.back();
setServers(g.servers, "Group", p);
}
} else {
Throw(UnknownKey, "%s:%d unknown key %s",
p->file, p->line, p->key.c_str());
}
}
if (mSentinelServerPool.sentinels.empty()) {
Throw(LogicError, "SentinelServerPool no sentinel server");
if (mStandaloneServerPool.groups.empty()) {
Throw(LogicError, "StandaloneServerPool no server group");
}
if (mSentinelServerPool.groups.empty()) {
Throw(LogicError, "SentinelServerPool no server group");
if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::None) {
Throw(LogicError, "StandaloneServerPool must define RefreshMethod");
} else if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::Sentinel) {
if (mStandaloneServerPool.sentinels.empty()) {
Throw(LogicError, "StandaloneServerPool with RefreshMethod(sentinel) but no sentinel servers");
}
} else {
if (!mStandaloneServerPool.sentinels.empty()) {
Throw(LogicError, "StandaloneServerPool with Sentinels but RefreshMethod is not sentinel");
}
for (auto& g : mStandaloneServerPool.groups) {
if (g.servers.empty()) {
Throw(LogicError, "Group(%s) must add servers", g.name.c_str());
}
}
}
if (mSentinelServerPool.groups.size() > 1) {
if (mSentinelServerPool.dist == Distribution::None) {
Throw(LogicError, "SentinelServerPool must define Dsitribution in multi groups");
if (mStandaloneServerPool.groups.size() > 1) {
if (mStandaloneServerPool.dist == Distribution::None) {
Throw(LogicError, "StandaloneServerPool must define Dsitribution in multi groups");
}
if (mSentinelServerPool.hash == Hash::None) {
Throw(LogicError, "SentinelServerPool must define Hash in multi groups");
if (mStandaloneServerPool.hash == Hash::None) {
Throw(LogicError, "StandaloneServerPool must define Hash in multi groups");
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions src/Conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "ConfParser.h"
#include "Auth.h"
#include "Command.h"
#include "Enums.h"

struct AuthConf
{
Expand Down Expand Up @@ -63,8 +64,9 @@ struct ClusterServerPoolConf : public ServerPoolConf
std::vector<ServerConf> servers;
};

struct SentinelServerPoolConf : public ServerPoolConf
struct StandaloneServerPoolConf : public ServerPoolConf
{
ServerPoolRefreshMethod refreshMethod = ServerPoolRefreshMethod::None;
Distribution dist = Distribution::None;
Hash hash = Hash::None;
char hashTag[2];
Expand Down Expand Up @@ -175,9 +177,9 @@ class Conf
{
return mClusterServerPool;
}
const SentinelServerPoolConf& sentinelServerPool() const
const StandaloneServerPoolConf& standaloneServerPool() const
{
return mSentinelServerPool;
return mStandaloneServerPool;
}
const std::string& localDC() const
{
Expand All @@ -198,7 +200,7 @@ class Conf
void setGlobal(const ConfParser::Node* node);
void setAuthority(const ConfParser::Node* node);
void setClusterServerPool(const ConfParser::Node* node);
void setSentinelServerPool(const ConfParser::Node* node);
void setStandaloneServerPool(const ConfParser::Node* node);
void setDataCenter(const ConfParser::Node* node);
void check();
bool setServerPool(ServerPoolConf& sp, const ConfParser::Node* n);
Expand Down Expand Up @@ -229,7 +231,7 @@ class Conf
std::vector<AuthConf> mAuthConfs;
int mServerPoolType;
ClusterServerPoolConf mClusterServerPool;
SentinelServerPoolConf mSentinelServerPool;
StandaloneServerPoolConf mStandaloneServerPool;
std::vector<DCConf> mDCConfs;
std::string mLocalDC;
std::vector<LatencyMonitorConf> mLatencyMonitors;
Expand Down
20 changes: 11 additions & 9 deletions src/ConfParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,20 +283,22 @@ ConfParser::Status ConfParser::parse(std::string& line, std::string& key, std::s
case SValBody:
return KeyVal;
case VValBody:
{
auto ret = KeyVal;
val.assign(line, pos, line.size() - pos);
if (val.back() == '{') {
val.resize(val.size() - 1);
int vsp = 0;
for (auto it = val.rbegin(); it != val.rend(); ++it) {
if (isspace(*it)) {
++vsp;
}
ret = BeginScope;
}
int vsp = 0;
for (auto it = val.rbegin(); it != val.rend(); ++it) {
if (isspace(*it)) {
++vsp;
}
val.resize(val.size() - vsp);
return BeginScope;
} else {
return KeyVal;
}
val.resize(val.size() - vsp);
return ret;
}
case ScopeReady:
return KeyVal;
case ScopeBody:
Expand Down
15 changes: 15 additions & 0 deletions src/Enums.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <[email protected]>
* All rights reserved.
*/

#include "Enums.h"

const ServerPoolRefreshMethod::TypeName
ServerPoolRefreshMethod::sPairs[3] = {
{ServerPoolRefreshMethod::None, "none"},
{ServerPoolRefreshMethod::Fixed, "fixed"},
{ServerPoolRefreshMethod::Sentinel, "sentinel"},
};

Loading

0 comments on commit 57de3c2

Please sign in to comment.