Skip to content

Commit

Permalink
Revert "Feature/redis sentinel (#302)" (#303)
Browse files Browse the repository at this point in the history
This reverts commit 48c31e8.
  • Loading branch information
davehorton authored Jul 7, 2023
1 parent 48c31e8 commit 0457f3b
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 316 deletions.
6 changes: 3 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
path = deps/sofia-sip
url = https://github.com/davehorton/sofia-sip.git
branch = main
[submodule "deps/cpp-bredis"]
path = deps/cpp-bredis
url = https://github.com/basiliscos/cpp-bredis.git
[submodule "deps/boost"]
path = deps/boost
url = https://github.com/boostorg/boost.git
[submodule "deps/jansson"]
path = deps/jansson
url = https://github.com/akheron/jansson.git
[submodule "deps/redis"]
path = deps/redis
url = https://github.com/boostorg/redis.git
8 changes: 4 additions & 4 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ INCLUDES= -I${srcdir}/deps/sofia-sip/libsofia-sip-ua/su -I${srcdir}/deps/sofia-s
-I${srcdir}/deps/sofia-sip/libsofia-sip-ua/sip -I${srcdir}/deps/sofia-sip/libsofia-sip-ua/msg \
-I${srcdir}/deps/sofia-sip/libsofia-sip-ua/url -I${srcdir}/deps/sofia-sip/libsofia-sip-ua/tport \
-I${srcdir}/deps/sofia-sip/libsofia-sip-ua/bnf -I${srcdir}/deps/jansson/src \
-I${srcdir}/deps/redis/include \
-I${srcdir}/deps/cpp-bredis/include \
-I${srcdir}/deps/prometheus-cpp/build/include -I/usr/local/include

AM_LDFLAGS= -L/usr/local/lib -L${srcdir}/deps/prometheus-cpp/build/lib
LDADD= -lredis++ -lhiredis -lboost_thread -lpthread -lssl -lcrypto -lz
LDADD= -lboost_thread -lpthread -lssl -lcrypto -lz

if DEP_BOOST
INCLUDES += -I${srcdir}/deps/boost
Expand All @@ -24,8 +24,8 @@ drachtio_SOURCES= src/main.cpp src/controller.cpp src/drachtio-config.cpp \
src/timer-queue.cpp src/cdr.cpp src/timer-queue-manager.cpp src/sip-transports.cpp \
src/request-handler.cpp src/request-router.cpp src/stats-collector.cpp \
src/invite-in-progress.cpp src/blacklist.cpp src/ua-invalid.cpp
drachtio_CPPFLAGS=-D_REENTRANT -DDRACHTIO_VERSION=\"$(MYVERSION)\" \
-DBOOST_ALLOW_DEPRECATED_HEADERS -Wno-deprecated-declarations
drachtio_CPPFLAGS=-D_REENTRANT -DDRACHTIO_VERSION=\"$(MYVERSION)\" -Wno-error=deprecated-declarations \
-DBOOST_ALLOW_DEPRECATED_HEADERS -O2 -Wno-stringop-overflow
drachtio_LDADD= ${srcdir}/deps/sofia-sip/libsofia-sip-ua/.libs/libsofia-sip-ua.a ${srcdir}/deps/jansson/src/.libs/libjansson.a -lcurl -lpthread -lssl -lcrypto -lz
if ! DEP_BOOST
drachtio_CPPFLAGS+=-DBOOST_LOG_DYN_LINK
Expand Down
1 change: 1 addition & 0 deletions deps/cpp-bredis
Submodule cpp-bredis added at d42625
1 change: 0 additions & 1 deletion deps/redis
Submodule redis deleted from 465253
235 changes: 91 additions & 144 deletions src/blacklist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,164 +19,111 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include <boost/algorithm/string.hpp>
#include <boost/variant.hpp>
#include <boost/redis/src.hpp>
#include <boost/redis/logger.hpp>
#include <boost/system/error_code.hpp>
#include <iostream>
#include <iterator>
#include <regex>

#include <string>
#include <iostream>


namespace net = boost::asio;
using boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using boost::redis::address;
using boost::redis::ignore_t;
#include "bredis.hpp"

#include "blacklist.hpp"
#include "controller.hpp"

namespace {
logger l(logger::level::err);

std::vector<address> parseAddressList(const std::string& addrList) {
std::vector<address> addresses;
std::vector<std::string> hostPortPairs;

// Split the string on commas to get a list of host:port pairs.
boost::split(hostPortPairs, addrList, boost::is_any_of(","));

for(const std::string& pair : hostPortPairs) {
std::vector<std::string> hostPort;
// Split each pair on ':' to separate the host and the port.
boost::split(hostPort, pair, boost::is_any_of(":"));
if(hostPort.size() == 2) {
address addr = { hostPort[0], hostPort[1] };
addresses.push_back(addr);
}
}
return addresses;
}

}
namespace r = bredis;
using socket_t = boost::asio::ip::tcp::socket;
using Buffer = boost::asio::streambuf;
using Iterator = typename r::to_iterator<Buffer>::iterator_t;
using Policy = r::parsing_policy::keep_result;
using result_t = r::parse_result_mapper_t<Iterator, Policy>;

namespace drachtio {

void Blacklist::start(void) {
srand (time(NULL));
std::thread t(&Blacklist::threadFunc, this) ;
m_thread.swap( t ) ;
}
void Blacklist::threadFunc() {
DR_LOG(log_debug) << "Blacklist thread id: " << std::this_thread::get_id() ;
while (true) {
static bool QueryRedis(
boost::asio::io_context& ioservice,
std::string redisKey,
const boost::asio::ip::tcp::endpoint& endpoint,
std::unordered_set<std::string>& ips
) {
try {
address addr;
if (m_redisSentinels.length() > 0) {
std::string ip, port;
querySentinels(ip, port);
addr.host = ip;
addr.port = port;
}
else {
addr.host = m_redisAddress;
addr.port = m_redisPort;
Buffer rx_buff;
socket_t socket(ioservice, endpoint.protocol());
DR_LOG(log_debug) << "Blacklist: connecting to " << endpoint.address() ;
socket.connect(endpoint) ;
DR_LOG(log_debug) << "Blacklist: successfully connected to redis" ;

r::Connection<socket_t> c(std::move(socket));
c.write(r::single_command_t{"SMEMBERS", redisKey});
auto r = c.read(rx_buff);
auto extract = boost::apply_visitor(r::extractor<Iterator>(), r.result);
rx_buff.consume(r.consumed);
auto &reply = boost::get<r::extracts::array_holder_t>(extract);
DR_LOG(log_info) << "Blacklist: got " << reply.elements.size() << " IPs to blacklist" ;
ips.clear();
BOOST_FOREACH(auto& member, reply.elements) {
auto &reply_str = boost::get<r::extracts::string_t>(member);
ips.insert(reply_str.str);
}
DR_LOG(log_debug) << "querying redis for blacklisted ips at " << addr.host << ":" << addr.port ;

config cfg;
request req;
response< std::list<std::string> > resp;
net::io_context ioc;
connection conn{ioc};
socket.close();
return true;
} catch( std::exception& e) {
DR_LOG(log_info) << "Blacklist::QueryRedis - Error: connecting to " << endpoint.address() << " " << std::string( e.what() ) ;
return false;
}
}

Blacklist::Blacklist(std::string& redisAddress, unsigned int redisPort, std::string& redisKey, unsigned int refreshSecs) :
m_redisKey(redisKey),
m_refreshSecs(refreshSecs),
m_redisAddress(redisAddress),
m_redisPort(redisPort)
{
}

void Blacklist::start() {
srand (time(NULL));
std::thread t(&Blacklist::threadFunc, this) ;
m_thread.swap( t ) ;
}

cfg.addr = addr;
if (!m_redisUsername.empty()) {
cfg.username = m_redisUsername;
Blacklist::~Blacklist() {
stop() ;
}
void Blacklist::threadFunc() {
bool initialized = false;
DR_LOG(log_debug) << "Blacklist thread id: " << std::this_thread::get_id() ;

while (true) {
unsigned int interval = m_refreshSecs;

/* get redis endpoint */
boost::system::error_code ec;
boost::asio::ip::address ip_address =
boost::asio::ip::address::from_string(m_redisAddress, ec);
if (ec.value() != 0) {
/* must be a dns name */
DR_LOG(log_debug) << "Blacklist resolving " << m_redisAddress ;

boost::asio::ip::tcp::resolver resolver(m_ioservice);
boost::asio::ip::tcp::resolver::results_type results = resolver.resolve(
m_redisAddress,
boost::lexical_cast<std::string>(m_redisPort),
ec);
for (boost::asio::ip::tcp::endpoint const& endpoint : results) {
DR_LOG(log_debug) << "Blacklist resolved to " << endpoint.address() ;
if (QueryRedis(m_ioservice, m_redisKey, endpoint, m_ips)) initialized = true;
break;
}
}
if (!m_redisPassword.empty()) {
cfg.password = m_redisPassword;
else {
boost::asio::ip::tcp::endpoint endpoint(ip_address, m_redisPort);
DR_LOG(log_debug) << "Connecting to redis at " << m_redisAddress << ":" << m_redisPort ;
if (QueryRedis(m_ioservice, m_redisKey, endpoint, m_ips)) initialized = true;
}

req.push("SMEMBERS", m_redisKey);
conn.async_run(cfg, l, net::detached);
conn.async_exec(req, resp, [&](auto ec, auto) {
if (!ec) {
auto& arr = std::get<0>(resp).value();
DR_LOG(log_debug) << "Found:" << arr.size() << " blacklisted ips in redis";
for (auto& r : arr) {
m_ips.insert(r);
}
}
conn.cancel();
});
ioc.run();

int sleepSecs = m_ips.size() > 0 ? m_refreshSecs : 300;
DR_LOG(log_debug) << "sleeping for " << sleepSecs << " seconds";
std::this_thread::sleep_for (std::chrono::seconds(sleepSecs));

} catch (const boost::system::system_error& e) {
DR_LOG(log_error) << "Caught boost system error in threadFunc: " << e.what();
return;
} catch (const std::exception& e) {
DR_LOG(log_error) << "Caught exception in Blacklist::threadFunc: " << e.what();
return;
} catch (...) {
DR_LOG(log_error) << "Caught unknown exception in Blacklist::threadFunc";
return;
if (!initialized) interval = 60;
std::this_thread::sleep_for (std::chrono::seconds(interval));
}
}
}

void Blacklist::querySentinels(std::string& ip, std::string& port) {
try {
config cfg;
request req;
response<std::optional<std::array<std::string, 2>>, ignore_t> resp;
net::io_context ioc;
connection conn{ioc};

auto addresses = parseAddressList(m_redisSentinels);
auto addr = addresses.front();
req.push("SENTINEL", "get-master-addr-by-name", m_redisServiceName.c_str());

DR_LOG(log_debug) << "querying sentinels " << addr.host << ":" << addr.port;
cfg.addr = addresses.front();
conn.async_run(cfg, l, net::detached);
conn.async_exec(req, resp, [&](auto ec, auto) {
if (!ec && std::get<0>(resp)) {
ip = std::get<0>(resp).value().value().at(0);
port = std::get<0>(resp).value().value().at(1);
DR_LOG(log_debug) << "sentinel reports master at " << ip << " port " << port;
}
else if (ec) {
DR_LOG(log_debug) << "error querying sentinel: " << ec.message();
}
else {
DR_LOG(log_debug) << "sentinel reports no master";
}
conn.cancel();
});
ioc.run();
} catch (const boost::system::system_error& e) {
DR_LOG(log_error) << "Caught boost system error in Blacklist::querySentinels: " << e.what();
return;
} catch (const std::exception& e) {
DR_LOG(log_error) << "Caught exception in Blacklist::Blacklist::querySentinels: " << e.what();
return;
} catch (...) {
DR_LOG(log_error) << "Caught unknown exception in Blacklist::Blacklist::querySentinels";
return;
}
}

}
void Blacklist::stop() {
//m_thread.join() ;
}

}
}
40 changes: 11 additions & 29 deletions src/blacklist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,50 +30,32 @@ THE SOFTWARE.

#include "drachtio.h"

using socket_t = boost::asio::ip::tcp::socket;

namespace drachtio {

class Blacklist {
public:
Blacklist(string& redisKey, unsigned int refreshSecs = 3600) : m_redisKey(redisKey), m_refreshSecs(refreshSecs) {
}
~Blacklist(){};

void start(void);
Blacklist(string& redisAddress, unsigned int redisPort, string& redisKey, unsigned int refreshSecs = 3600);
~Blacklist();

void start();
void stop() ;
void threadFunc(void) ;

void redisAddress(std::string& address, std::string& port) {
m_redisAddress = address;
m_redisPort = port;
}
void sentinelAddresses(std::string& addresses) {
m_redisSentinels = addresses;
}
void redisServiceName(std::string& serviceName) {
m_redisServiceName = serviceName;
}
void username(std::string& username) {
m_redisUsername = username;
}
void password(std::string& password) {
m_redisPassword = password;
}
void threadFunc(void) ;
bool isBlackListed(const char* srcAddress) {
return m_ips.end() != m_ips.find(srcAddress);
}

private:
void querySentinels(std::string& ip, std::string& port);

std::thread m_thread ;
boost::asio::io_context m_ioservice;
std::string m_redisAddress;
std::string m_redisPort;
unsigned int m_redisPort;
std::string& m_redisKey;
unsigned int m_refreshSecs;
std::string m_redisSentinels;
std::string m_redisServiceName;
std::string m_redisUsername;
std::string m_redisPassword;
std::unordered_set<std::string> m_ips ;
std::thread m_thread ;
} ;
}

Expand Down
Loading

0 comments on commit 0457f3b

Please sign in to comment.