From 1fd886730a444acbd5f96705dec93a1808888ff9 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Tue, 11 Dec 2018 07:20:01 +0200 Subject: [PATCH 01/16] Debug info extracted into separate files. --- CMakeLists.txt | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 34d70ef5..49d87197 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,44 @@ cmake_minimum_required(VERSION 3.10) +if(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE RelWithDebInfo) +endif(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) + +if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) + message("==> The configuration is ${CMAKE_BUILD_TYPE}. Debug info will be extracted into separate files.") + + function (add_executable _name) + _add_executable(${ARGV}) + + if (TARGET ${_name}) + message("==> ${_name} executable, debug info will be extracted into separate file.") + set(TargetList "${TargetList} ${_name}") + add_custom_command(TARGET ${_name} POST_BUILD + COMMAND objcopy --only-keep-debug "${_name}" "${_name}.debug" + COMMAND strip --strip-debug --strip-unneeded "${_name}" + COMMAND objcopy --add-gnu-debuglink="${_name}.debug" "${_name}" + ) + endif() + endfunction() + + function (add_library _name _type) + _add_library(${ARGV}) + + if (TARGET ${_name} AND ${_type} STREQUAL SHARED) + message("==> ${_name} library, debug info will be extracted into separate file.") + + set(FNAME "lib${_name}.so") + add_custom_command(TARGET ${_name} POST_BUILD + COMMAND objcopy --only-keep-debug "${FNAME}" "${FNAME}.debug" + COMMAND strip --strip-debug --strip-unneeded "${FNAME}" + COMMAND objcopy --add-gnu-debuglink="${FNAME}.debug" "${FNAME}" + WORKING_DIRECTORY ${GRAFTLETS_OUTPUT_DIR} + ) + endif() + endfunction() + +endif(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) + project(graft_server) option(OPT_BUILD_TESTS "Build tests." OFF) From 1eec471d0c834f509b8ba069eb2cf2b83fb8e106 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Wed, 12 Dec 2018 03:15:30 +0200 Subject: [PATCH 02/16] Extracting debug info enhanced. --- CMakeLists.txt | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 49d87197..78ba2e32 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,11 +12,11 @@ if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) if (TARGET ${_name}) message("==> ${_name} executable, debug info will be extracted into separate file.") - set(TargetList "${TargetList} ${_name}") add_custom_command(TARGET ${_name} POST_BUILD - COMMAND objcopy --only-keep-debug "${_name}" "${_name}.debug" - COMMAND strip --strip-debug --strip-unneeded "${_name}" - COMMAND objcopy --add-gnu-debuglink="${_name}.debug" "${_name}" + COMMAND echo "Extract debug info for $" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" + COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" ) endif() endfunction() @@ -26,13 +26,11 @@ if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) if (TARGET ${_name} AND ${_type} STREQUAL SHARED) message("==> ${_name} library, debug info will be extracted into separate file.") - - set(FNAME "lib${_name}.so") add_custom_command(TARGET ${_name} POST_BUILD - COMMAND objcopy --only-keep-debug "${FNAME}" "${FNAME}.debug" - COMMAND strip --strip-debug --strip-unneeded "${FNAME}" - COMMAND objcopy --add-gnu-debuglink="${FNAME}.debug" "${FNAME}" - WORKING_DIRECTORY ${GRAFTLETS_OUTPUT_DIR} + COMMAND echo "Extract debug info for $" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" + COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" ) endif() endfunction() From 3ab797ecafb11a0a009353341fc7999757f7c114 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Wed, 12 Dec 2018 18:22:17 +0200 Subject: [PATCH 03/16] GraftServerTest.genericCallback test fixed. --- test/graft_server_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index 6023bd0e..e841202d 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -1256,7 +1256,7 @@ TEST_F(GraftServerTest, genericCallback) case graft::Status::None: { //find webhook endpoint - auto it = std::find_if(input.headers.begin(), input.headers.end(), [](auto& v)->bool { v.first == "X-Callback"; } ); + auto it = std::find_if(input.headers.begin(), input.headers.end(), [](auto& v)->bool { return v.first == "X-Callback"; } ); assert(it != input.headers.end()); std::string path = it->second; //"http://0.0.0.0:port/callback/" From 623aad7050505122951a498251d7c5556e5f17d9 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Thu, 13 Dec 2018 02:18:21 +0200 Subject: [PATCH 04/16] Extracting debug info messages enhanced. --- CMakeLists.txt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 78ba2e32..17069cb2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,9 +11,8 @@ if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) _add_executable(${ARGV}) if (TARGET ${_name}) - message("==> ${_name} executable, debug info will be extracted into separate file.") add_custom_command(TARGET ${_name} POST_BUILD - COMMAND echo "Extract debug info for $" + COMMAND echo "$: extracting debug info" COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" @@ -25,9 +24,8 @@ if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) _add_library(${ARGV}) if (TARGET ${_name} AND ${_type} STREQUAL SHARED) - message("==> ${_name} library, debug info will be extracted into separate file.") add_custom_command(TARGET ${_name} POST_BUILD - COMMAND echo "Extract debug info for $" + COMMAND echo "$: extracting debug info" COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" From 9c5c57c68a7b102d74fbfc1c2a6f185fc34342f3 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Thu, 27 Dec 2018 02:28:06 +0200 Subject: [PATCH 05/16] UpstreamManager dependency on TaskManager removed. Separate files for UpstreamManager. --- CMakeLists.txt | 1 + include/lib/graft/connection.h | 7 +- include/lib/graft/task.h | 5 +- include/lib/graft/upstream_manager.h | 74 ++++++++++ src/lib/graft/connection.cpp | 43 +++--- src/lib/graft/task.cpp | 203 ++------------------------- src/lib/graft/upstream_manager.cpp | 141 +++++++++++++++++++ src/supernode/server.cpp | 1 + 8 files changed, 257 insertions(+), 218 deletions(-) create mode 100644 include/lib/graft/upstream_manager.h create mode 100644 src/lib/graft/upstream_manager.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 497d0c71..50fb6af1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -190,6 +190,7 @@ add_library(graft STATIC ${PROJECT_SOURCE_DIR}/src/lib/graft/mongoosex.cpp ${PROJECT_SOURCE_DIR}/src/lib/graft/router.cpp ${PROJECT_SOURCE_DIR}/src/lib/graft/task.cpp + ${PROJECT_SOURCE_DIR}/src/lib/graft/upstream_manager.cpp ${PROJECT_SOURCE_DIR}/modules/mongoose/mongoose.c ${PROJECT_SOURCE_DIR}/src/supernode/server.cpp ${PROJECT_SOURCE_DIR}/src/supernode/supernode.cpp diff --git a/include/lib/graft/connection.h b/include/lib/graft/connection.h index 9cb2b02e..833916c1 100644 --- a/include/lib/graft/connection.h +++ b/include/lib/graft/connection.h @@ -74,9 +74,10 @@ class UpstreamSender : public SelfHolder BaseTaskPtr& getTask() { return m_bt; } - void send(TaskManager& manager, const std::string& uri); + void send(mg_mgr* mgr, int http_callback_port, const std::string& uri); Status getStatus() const { return m_status; } const std::string& getError() const { return m_error; } + size_t getRequestSize() const { return m_requestSize; } void ev_handler(mg_connection* upstream, int ev, void *ev_data); private: @@ -94,6 +95,7 @@ class UpstreamSender : public SelfHolder mg_connection* m_upstream = nullptr; Status m_status = Status::None; std::string m_error; + size_t m_requestSize = 0; }; class ConnectionBase; @@ -101,7 +103,7 @@ class ConnectionBase; class Looper final : public TaskManager { public: - Looper(const ConfigOpts& copts, ConnectionBase& connectionBase); + Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase); virtual ~Looper(); void serve(); @@ -193,6 +195,7 @@ class ConnectionBase final std::unique_ptr m_sysInfo; std::atomic_bool m_looperReady{false}; std::unique_ptr m_looper; + std::unique_ptr m_upstreamManager; std::map> m_conManagers; }; diff --git a/include/lib/graft/task.h b/include/lib/graft/task.h index 6fc8f1ad..896ea09f 100644 --- a/include/lib/graft/task.h +++ b/include/lib/graft/task.h @@ -38,6 +38,7 @@ namespace graft { extern std::string client_addr(mg_connection* client); extern std::string client_host(mg_connection* client); +extern unsigned int port_from_uri(const std::string& uri); class UpstreamSender; class TaskManager; @@ -192,7 +193,7 @@ class UpstreamManager; class TaskManager : private HandlerAPI { public: - TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter); + TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter); virtual ~TaskManager(); TaskManager(const TaskManager&) = delete; TaskManager& operator = (const TaskManager&) = delete; @@ -259,6 +260,7 @@ class TaskManager : private HandlerAPI static inline size_t next_pow2(size_t val); + UpstreamManager& m_upstreamManager; SysInfoCounter& m_sysInfoCounter; GlobalContextMap m_gcm; @@ -277,7 +279,6 @@ class TaskManager : private HandlerAPI std::deque m_readyToResume; std::priority_queue,Context::uuid_t>> m_expireTaskQueue; std::unique_ptr m_futurePostponeUuids; - std::unique_ptr m_upstreamManager; using PromiseItem = UpstreamTask::PromiseItem; using PromiseQueue = tp::MPMCBoundedQueue; diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h new file mode 100644 index 00000000..3029258e --- /dev/null +++ b/include/lib/graft/upstream_manager.h @@ -0,0 +1,74 @@ +#pragma once +#include "lib/graft/connection.h" + + +namespace graft +{ + +class UpstreamManager +{ +public: + using OnDoneCallback = std::function; + UpstreamManager() = default; + + void init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback); + + bool busy() const + { + return (m_cntUpstreamSender != m_cntUpstreamSenderDone); + } + + void send(BaseTaskPtr bt); + +private: + class ConnItem + { + public: + using ConnectionId = uint64_t; + + ConnItem() = default; + ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout) + : m_uriId(uriId) + , m_uri(uri) + , m_maxConnections(maxConnections) + , m_keepAlive(keepAlive) + , m_timeout(timeout) + { + } + + std::pair getConnection(); + void releaseActive(ConnectionId connectionId, mg_connection* client); + void onCloseIdle(mg_connection* client); + + int m_connCnt = 0; + int m_uriId; + std::string m_uri; + double m_timeout; + //assert(m_upstreamQueue.empty() || 0 < m_maxConnections); + int m_maxConnections; + std::deque m_taskQueue; + bool m_keepAlive = false; + std::map m_idleConnections; + std::map m_activeConnections; + UpstreamStub m_upstreamStub; + private: + ConnectionId m_newId = 0; + }; + + void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client); + void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt); + + using Uri2ConnItem = std::map; + + OnDoneCallback m_onDoneCallback; + + uint64_t m_cntUpstreamSender = 0; + uint64_t m_cntUpstreamSenderDone = 0; + ConnItem m_default; + Uri2ConnItem m_conn2item; + mg_mgr* m_mgr = nullptr; + int m_http_callback_port; +}; + +}//namespace graft + diff --git a/src/lib/graft/connection.cpp b/src/lib/graft/connection.cpp index 8aa3ad0d..2cae7251 100644 --- a/src/lib/graft/connection.cpp +++ b/src/lib/graft/connection.cpp @@ -3,6 +3,7 @@ #include "lib/graft/mongoosex.h" #include "lib/graft/sys_info.h" #include "lib/graft/graft_exception.h" +#include "lib/graft/upstream_manager.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.connection" @@ -23,6 +24,16 @@ std::string client_host(mg_connection* client) return inet_ntoa(client->sa.sin.sin_addr); } +unsigned int port_from_uri(const std::string& uri) +{ + assert(!uri.empty()); + mg_str mg_uri{uri.c_str(), uri.size()}; + unsigned int mg_port = 0; + int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0); + assert(0<=res); + return mg_port; +} + void* getUserData(mg_mgr* mgr) { return mgr->user_data; } void* getUserData(mg_connection* nc) { return nc->user_data; } mg_mgr* getMgr(mg_connection* nc) { return nc->mgr; } @@ -60,28 +71,18 @@ void UpstreamStub::ev_handler(mg_connection *upstream, int ev, void *ev_data) } -void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) +void UpstreamSender::send(mg_mgr* mgr, int http_callback_port, const std::string& def_uri) { assert(m_bt); - const ConfigOpts& opts = manager.getCopts(); - Output& output = m_bt->getOutput(); std::string url = output.makeUri(def_uri); Context::uuid_t callback_uuid = m_bt->getCtx().getId(false); if(!callback_uuid.is_nil()) {//add extra header - unsigned int mg_port = 0; - { - std::string uri = opts.http_address; - assert(!uri.empty()); - mg_str mg_uri{uri.c_str(), uri.size()}; - int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0); - assert(0<=res); - } std::stringstream ss; - ss << "http://0.0.0.0:" << mg_port << "/callback/" << boost::uuids::to_string(callback_uuid); + ss << "http://0.0.0.0:" << http_callback_port << "/callback/" << boost::uuids::to_string(callback_uuid); output.headers.emplace_back(std::make_pair("X-Callback", ss.str())); } std::string extra_headers = output.combine_headers(); @@ -95,7 +96,7 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) m_upstream->user_data = this; m_upstream->handler = static_ev_handler; } - mg_connection* upstream = mg::mg_connect_http_x(m_upstream, manager.getMgMgr(), static_ev_handler, url.c_str(), + mg_connection* upstream = mg::mg_connect_http_x(m_upstream, mgr, static_ev_handler, url.c_str(), extra_headers.c_str(), body); //body.empty() means GET assert(upstream != nullptr && (m_upstream == nullptr || m_upstream == upstream)); @@ -106,9 +107,7 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) } mg_set_timer(m_upstream, mg_time() + m_timeout); - auto& rsi = manager.runtimeSysInfo(); - rsi.count_upstrm_http_req(); - rsi.count_upstrm_http_req_bytes_raw(url.size() + extra_headers.size() + body.size()); + m_requestSize = url.size() + extra_headers.size() + body.size(); } void UpstreamSender::ev_handler(mg_connection *upstream, int ev, void *ev_data) @@ -240,8 +239,12 @@ void ConnectionBase::createSystemInfoCounter() void ConnectionBase::createLooper(ConfigOpts& configOpts) { - assert(m_sysInfo && !m_looper); - m_looper = std::make_unique(configOpts, *this); + assert(m_sysInfo && !m_looper && !m_upstreamManager); + m_upstreamManager = std::make_unique(); + + m_looper = std::make_unique(configOpts, *m_upstreamManager, *this); + m_upstreamManager->init(configOpts, m_looper->getMgMgr(), port_from_uri(configOpts.http_address), + [this](UpstreamSender& uss){ m_looper->onUpstreamDone(uss); }); m_looperReady = true; } @@ -290,8 +293,8 @@ void ConnectionBase::checkRoutes(graft::ConnectionManager& cm) } -Looper::Looper(const ConfigOpts& copts, ConnectionBase& connectionBase) - : TaskManager(copts, connectionBase.getSysInfoCounter()) +Looper::Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase) + : TaskManager(copts, upstreamManager, connectionBase.getSysInfoCounter()) , m_mgr(std::make_unique()) { mg_mgr_init(m_mgr.get(), &connectionBase, cb_event); diff --git a/src/lib/graft/task.cpp b/src/lib/graft/task.cpp index 5fe8a0ab..341549a4 100644 --- a/src/lib/graft/task.cpp +++ b/src/lib/graft/task.cpp @@ -6,6 +6,7 @@ #include "lib/graft/handler_api.h" #include "lib/graft/expiring_list.h" #include "lib/graft/sys_info.h" +#include "lib/graft/upstream_manager.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.task" @@ -203,195 +204,9 @@ class ExpiringList : public detail::ExpiringListT< Uuid_Input > { } }; -class UpstreamManager -{ -public: - using OnDoneCallback = std::function; - - UpstreamManager(TaskManager& manager, OnDoneCallback onDoneCallback) - : m_manager(manager) - , m_onDoneCallback(onDoneCallback) - { init(); } - - bool busy() const - { - return (m_cntUpstreamSender != m_cntUpstreamSenderDone); - } - - void send(BaseTaskPtr bt) - { - ConnItem* connItem = &m_default; - {//find connItem - const std::string& uri = bt->getOutput().uri; - if(!uri.empty() && uri[0] == '$') - {//substitutions - auto it = m_conn2item.find(uri.substr(1)); - if(it == m_conn2item.end()) - { - std::ostringstream oss; - oss << "cannot find uri substitution '" << uri << "'"; - throw std::runtime_error(oss.str()); - } - connItem = &it->second; - } - } - if(connItem->m_maxConnections != 0 && connItem->m_idleConnections.empty() && connItem->m_connCnt == connItem->m_maxConnections) - { - connItem->m_taskQueue.push_back(bt); - return; - } - - createUpstreamSender(connItem, bt); - } -private: - uint64_t m_cntUpstreamSender = 0; - uint64_t m_cntUpstreamSenderDone = 0; - - class ConnItem - { - public: - using Active = bool; - using ConnectionId = uint64_t; - - ConnItem() = default; - ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout) - : m_uriId(uriId) - , m_uri(uri) - , m_maxConnections(maxConnections) - , m_keepAlive(keepAlive) - , m_timeout(timeout) - { - } - std::pair getConnection() - { - //TODO: something wrong with (m_connCnt <= m_maxConnections) - assert(m_maxConnections == 0 || m_connCnt <= m_maxConnections); - std::pair res = std::make_pair(0,nullptr); - if(!m_keepAlive) - { - ++m_connCnt; - return res; - } - if(!m_idleConnections.empty()) - { - auto it = m_idleConnections.begin(); - res = std::make_pair(it->second, it->first); - m_idleConnections.erase(it); - } - else - { - ++m_connCnt; - res.first = ++m_newId; - } - auto res1 = m_activeConnections.emplace(res); - assert(res1.second); - assert(m_connCnt == m_idleConnections.size() + m_activeConnections.size()); - return res; - } - - void releaseActive(ConnectionId connectionId, mg_connection* client) - { - assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); - if(!m_keepAlive) return; - auto it = m_activeConnections.find(connectionId); - assert(it != m_activeConnections.end()); - assert(it->second == nullptr || client == nullptr || it->second == client); - if(client != nullptr) - { - m_idleConnections.emplace(client, it->first); - m_upstreamStub.setConnection(client); - } - else - { - --m_connCnt; - } - m_activeConnections.erase(it); - } - - void onCloseIdle(mg_connection* client) - { - assert(m_keepAlive); - auto it = m_idleConnections.find(client); - assert(it != m_idleConnections.end()); - --m_connCnt; - m_idleConnections.erase(it); - } - - ConnectionId m_newId = 0; - int m_connCnt = 0; - int m_uriId; - std::string m_uri; - double m_timeout; - //assert(m_upstreamQueue.empty() || 0 < m_maxConn); - int m_maxConnections; - std::deque m_taskQueue; - bool m_keepAlive = false; - std::map m_idleConnections; - std::map m_activeConnections; - UpstreamStub m_upstreamStub; - }; - - void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client) - { - ++m_cntUpstreamSenderDone; - m_onDoneCallback(uss); - connItem->releaseActive(connectionId, client); - if(connItem->m_taskQueue.empty()) return; - BaseTaskPtr bt = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); - createUpstreamSender(connItem, bt); - } - - void init() - { - int uriId = 0; - const ConfigOpts& opts = m_manager.getCopts(); - m_default = ConnItem(uriId++, opts.cryptonode_rpc_address.c_str(), 0, false, opts.upstream_request_timeout); - - for(auto& subs : OutHttp::uri_substitutions) - { - double timeout = std::get<3>(subs.second); - if(timeout < 1e-5) timeout = opts.upstream_request_timeout; - auto res = m_conn2item.emplace(subs.first, ConnItem(uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); - assert(res.second); - ConnItem* connItem = &res.first->second; - connItem->m_upstreamStub.setCallback([connItem](mg_connection* client){ connItem->onCloseIdle(client); }); - } - } - - void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) - { - auto onDoneAct = [this, connItem](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) - { - onDone(uss, connItem, connectionId, client); - }; - - ++m_cntUpstreamSender; - UpstreamSender::Ptr uss; - if(connItem->m_keepAlive) - { - auto res = connItem->getConnection(); - uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); - } - else - { - uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); - } - - const std::string& uri = (connItem != &m_default || bt->getOutput().uri.empty())? connItem->m_uri : bt->getOutput().uri; - uss->send(m_manager, uri); - } - - using Uri2ConnItem = std::map; - - OnDoneCallback m_onDoneCallback; - - ConnItem m_default; - Uri2ConnItem m_conn2item; - TaskManager& m_manager; //TODO: should be removed, and be independent of TaskManager -}; - -TaskManager::TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter) +TaskManager::TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter) : m_copts(copts) + , m_upstreamManager(upstreamManager) , m_sysInfoCounter(sysInfoCounter) , m_gcm(this) , m_futurePostponeUuids(std::make_unique(1000 * copts.http_connection_timeout)) @@ -489,15 +304,13 @@ void TaskManager::checkUpstreamBlockingIO() bool res = m_promiseQueue->pop(pi); if(!res) break; UpstreamTask::Ptr bt = BaseTask::Create(*this, std::move(pi)); - assert(m_upstreamManager); - m_upstreamManager->send(bt); + sendUpstream(bt); } } void TaskManager::sendUpstream(BaseTaskPtr bt) { - assert(m_upstreamManager); - m_upstreamManager->send(bt); + m_upstreamManager.send(bt); } void TaskManager::onTimer(BaseTaskPtr bt) @@ -536,7 +349,7 @@ void TaskManager::schedule(PeriodicTask* pt) bool TaskManager::canStop() { return (m_cntBaseTask == m_cntBaseTaskDone) - && (!m_upstreamManager->busy()) + && (!m_upstreamManager.busy()) && (m_cntJobSent == m_cntJobDone); } @@ -856,7 +669,6 @@ void TaskManager::initThreadPool(int threadCount, int workersQueueSize, int expe m_promiseQueue = std::make_unique( threadCount ); //TODO: it is not clear how many items we need in PeriodicTaskQueue, maybe we should make it dynamically but this requires additional synchronization m_periodicTaskQueue = std::make_unique(2*threadCount); - m_upstreamManager = std::make_unique(*this, [this](UpstreamSender& uss){ onUpstreamDone(uss); } ); LOG_PRINT_L1("Thread pool created with " << threadCount << " workers with " << workersQueueSize @@ -892,6 +704,9 @@ void TaskManager::onUpstreamDone(UpstreamSender& uss) void TaskManager::upstreamDoneProcess(UpstreamSender& uss) { + runtimeSysInfo().count_upstrm_http_req(); + runtimeSysInfo().count_upstrm_http_req_bytes_raw(uss.getRequestSize()); + if(Status::Ok == uss.getStatus()) runtimeSysInfo().count_upstrm_http_resp_ok(); else diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp new file mode 100644 index 00000000..2761961e --- /dev/null +++ b/src/lib/graft/upstream_manager.cpp @@ -0,0 +1,141 @@ +#include "lib/graft/upstream_manager.h" + +namespace graft +{ + +std::pair UpstreamManager::ConnItem::getConnection() +{ + assert(m_maxConnections == 0 || m_connCnt < m_maxConnections || m_connCnt == m_maxConnections && !m_idleConnections.empty()); + std::pair res = std::make_pair(0,nullptr); + if(!m_keepAlive) + { + ++m_connCnt; + return res; + } + if(!m_idleConnections.empty()) + { + auto it = m_idleConnections.begin(); + res = std::make_pair(it->second, it->first); + m_idleConnections.erase(it); + } + else + { + ++m_connCnt; + res.first = ++m_newId; + } + auto res1 = m_activeConnections.emplace(res); + assert(res1.second); + assert(m_connCnt == m_idleConnections.size() + m_activeConnections.size()); + return res; +} + +void UpstreamManager::ConnItem::releaseActive(ConnectionId connectionId, mg_connection* client) +{ + assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); + if(!m_keepAlive) return; + auto it = m_activeConnections.find(connectionId); + assert(it != m_activeConnections.end()); + assert(it->second == nullptr || client == nullptr || it->second == client); + if(client != nullptr) + { + m_idleConnections.emplace(client, it->first); + m_upstreamStub.setConnection(client); + } + else + { + --m_connCnt; + } + m_activeConnections.erase(it); +} + +void UpstreamManager::ConnItem::onCloseIdle(mg_connection* client) +{ + assert(m_keepAlive); + auto it = m_idleConnections.find(client); + assert(it != m_idleConnections.end()); + --m_connCnt; + m_idleConnections.erase(it); +} + +void UpstreamManager::send(BaseTaskPtr bt) +{ + ConnItem* connItem = &m_default; + {//find connItem + const std::string& uri = bt->getOutput().uri; + if(!uri.empty() && uri[0] == '$') + {//substitutions + auto it = m_conn2item.find(uri.substr(1)); + if(it == m_conn2item.end()) + { + std::ostringstream oss; + oss << "cannot find uri substitution '" << uri << "'"; + throw std::runtime_error(oss.str()); + } + connItem = &it->second; + } + } + if(connItem->m_maxConnections != 0 && connItem->m_idleConnections.empty() && connItem->m_connCnt == connItem->m_maxConnections) + { + connItem->m_taskQueue.push_back(bt); + return; + } + + createUpstreamSender(connItem, bt); +} + + +void UpstreamManager::onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client) +{ + ++m_cntUpstreamSenderDone; + m_onDoneCallback(uss); + connItem->releaseActive(connectionId, client); + if(connItem->m_taskQueue.empty()) return; + BaseTaskPtr bt = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); + createUpstreamSender(connItem, bt); +} + +void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback) +{ + m_mgr = mgr; + m_http_callback_port = http_callback_port; + m_onDoneCallback = onDoneCallback; + + int uriId = 0; + m_default = ConnItem(uriId++, copts.cryptonode_rpc_address.c_str(), 0, false, copts.upstream_request_timeout); + + for(auto& subs : OutHttp::uri_substitutions) + { + double timeout = std::get<3>(subs.second); + if(timeout < 1e-5) timeout = copts.upstream_request_timeout; + auto res = m_conn2item.emplace(subs.first, ConnItem(uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); + assert(res.second); + ConnItem* connItem = &res.first->second; + connItem->m_upstreamStub.setCallback([connItem](mg_connection* client){ connItem->onCloseIdle(client); }); + } +} + +void UpstreamManager::createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) +{ + auto onDoneAct = [this, connItem](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) + { + onDone(uss, connItem, connectionId, client); + }; + + ++m_cntUpstreamSender; + UpstreamSender::Ptr uss; + if(connItem->m_keepAlive) + { + auto res = connItem->getConnection(); + uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); + } + else + { + uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); + } + + const std::string& uri = (connItem != &m_default || bt->getOutput().uri.empty())? connItem->m_uri : bt->getOutput().uri; + uss->send(m_mgr, m_http_callback_port, uri); +} + +}//namespace graft + diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index 1e65094e..5b9fc015 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -4,6 +4,7 @@ #include "lib/graft/GraftletLoader.h" #include "lib/graft/sys_info.h" #include "lib/graft/graft_exception.h" +#include "lib/graft/upstream_manager.h" #include #include From 8ddb26bc752501d5702c8fd5fce5a8cd9ed1901b Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Fri, 28 Dec 2018 17:16:37 +0200 Subject: [PATCH 06/16] static uri_substitutions removed from OutHttp. makeUri test fixed. --- include/lib/graft/inout.h | 1 - include/lib/graft/serveropts.h | 2 ++ include/lib/graft/upstream_manager.h | 5 +++- src/lib/graft/inout.cpp | 1 - src/lib/graft/upstream_manager.cpp | 27 ++++++++++++++--- src/supernode/server.cpp | 6 ++-- test/graft_server_test.cpp | 44 +++++++++++++++++++++------- test/upstream_test.cpp | 4 +-- 8 files changed, 67 insertions(+), 23 deletions(-) diff --git a/include/lib/graft/inout.h b/include/lib/graft/inout.h index bb81034c..97c41ecb 100644 --- a/include/lib/graft/inout.h +++ b/include/lib/graft/inout.h @@ -211,7 +211,6 @@ namespace graft std::string port; std::string path; - static std::unordered_map> uri_substitutions; }; class InHttp final : public InOutHttpBase diff --git a/include/lib/graft/serveropts.h b/include/lib/graft/serveropts.h index 490d49c9..4cf7f006 100644 --- a/include/lib/graft/serveropts.h +++ b/include/lib/graft/serveropts.h @@ -2,6 +2,7 @@ #include #include +#include #include namespace graft { @@ -30,6 +31,7 @@ struct ConfigOpts std::vector graftlet_dirs; int lru_timeout_ms; IPFilterOpts ipfilter; + std::unordered_map> uri_substitutions; void check_asserts() const { diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h index 3029258e..9fc82eab 100644 --- a/include/lib/graft/upstream_manager.h +++ b/include/lib/graft/upstream_manager.h @@ -19,7 +19,8 @@ class UpstreamManager } void send(BaseTaskPtr bt); - +protected: + const std::string getUri(const std::string& inputUri); private: class ConnItem { @@ -57,6 +58,8 @@ class UpstreamManager void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client); void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt); + ConnItem* findConnItem(const std::string& inputUri); + const std::string& getUri(ConnItem* connItem, const std::string& inputUri); using Uri2ConnItem = std::map; diff --git a/src/lib/graft/inout.cpp b/src/lib/graft/inout.cpp index e13c289d..bc152e1e 100644 --- a/src/lib/graft/inout.cpp +++ b/src/lib/graft/inout.cpp @@ -4,7 +4,6 @@ namespace graft { -std::unordered_map> OutHttp::uri_substitutions; void InOutHttpBase::set_str_field(const http_message& hm, const mg_str& str_fld, std::string& fld) { diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp index 2761961e..e99922dc 100644 --- a/src/lib/graft/upstream_manager.cpp +++ b/src/lib/graft/upstream_manager.cpp @@ -57,11 +57,11 @@ void UpstreamManager::ConnItem::onCloseIdle(mg_connection* client) m_idleConnections.erase(it); } -void UpstreamManager::send(BaseTaskPtr bt) +UpstreamManager::ConnItem* UpstreamManager::findConnItem(const std::string& inputUri) { ConnItem* connItem = &m_default; {//find connItem - const std::string& uri = bt->getOutput().uri; + const std::string& uri = inputUri; if(!uri.empty() && uri[0] == '$') {//substitutions auto it = m_conn2item.find(uri.substr(1)); @@ -74,6 +74,13 @@ void UpstreamManager::send(BaseTaskPtr bt) connItem = &it->second; } } + return connItem; +} + +void UpstreamManager::send(BaseTaskPtr bt) +{ + ConnItem* connItem = findConnItem(bt->getOutput().uri); + if(connItem->m_maxConnections != 0 && connItem->m_idleConnections.empty() && connItem->m_connCnt == connItem->m_maxConnections) { connItem->m_taskQueue.push_back(bt); @@ -103,7 +110,7 @@ void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callba int uriId = 0; m_default = ConnItem(uriId++, copts.cryptonode_rpc_address.c_str(), 0, false, copts.upstream_request_timeout); - for(auto& subs : OutHttp::uri_substitutions) + for(auto& subs : copts.uri_substitutions) { double timeout = std::get<3>(subs.second); if(timeout < 1e-5) timeout = copts.upstream_request_timeout; @@ -114,6 +121,12 @@ void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callba } } +const std::string& UpstreamManager::getUri(ConnItem* connItem, const std::string& inputUri) +{ + const std::string& uri = (connItem != &m_default || inputUri.empty())? connItem->m_uri : inputUri; + return uri; +} + void UpstreamManager::createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) { auto onDoneAct = [this, connItem](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) @@ -133,9 +146,15 @@ void UpstreamManager::createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); } - const std::string& uri = (connItem != &m_default || bt->getOutput().uri.empty())? connItem->m_uri : bt->getOutput().uri; + const std::string& uri = getUri(connItem, bt->getOutput().uri); uss->send(m_mgr, m_http_callback_port, uri); } +const std::string UpstreamManager::getUri(const std::string& inputUri) +{ + ConnItem* connItem = findConnItem(inputUri); + return getUri(connItem, inputUri); +} + }//namespace graft diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index 5b9fc015..ca8e2291 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -533,15 +533,15 @@ bool GraftServer::initConfigOption(int argc, const char** argv, ConfigOpts& conf configOpts.log_trunc_to_size = (log_trunc_to_size)? log_trunc_to_size.get() : -1; const boost::property_tree::ptree& uri_subst_conf = config.get_child("upstream"); - graft::OutHttp::uri_substitutions.clear(); - std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf](auto it) + configOpts.uri_substitutions.clear(); + std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf, &configOpts](auto it) { std::string name(it.first); std::string val(uri_subst_conf.get(name)); std::string uri; int cnt; bool keepAlive; double timeout; details::parseSubstitutionItem(name, val, uri, cnt, keepAlive, timeout); - graft::OutHttp::uri_substitutions.emplace(std::move(name), std::make_tuple(std::move(uri), cnt, keepAlive, timeout)); + configOpts.uri_substitutions.emplace(std::move(name), std::make_tuple(std::move(uri), cnt, keepAlive, timeout)); }); return true; diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index 63e13124..914cd132 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -5,6 +5,7 @@ #include "lib/graft/inout.h" #include "lib/graft/handler_api.h" #include "lib/graft/expiring_list.h" +#include "lib/graft/upstream_manager.h" #include "supernode/requests.h" #include "supernode/requests/sale.h" #include "supernode/requests/sale_status.h" @@ -150,6 +151,27 @@ TEST(InOut, serialization) output.loadT(a); } +namespace +{ + +class UpstreamManagerTest : public graft::UpstreamManager +{ +public: + void init(const std::string& subst_name, const std::string& subst_uri, const std::string& cryptonode_rpc_address = "") + { + graft::ConfigOpts copts; + copts.cryptonode_rpc_address = cryptonode_rpc_address; + copts.uri_substitutions.insert({subst_name, {subst_uri, 0, false, 0}}); + graft::UpstreamManager::init(copts, nullptr, 0, nullptr); + } + const std::string getUri(const std::string& inputUri) + { + return graft::UpstreamManager::getUri(inputUri); + } +}; + +} //namespace + TEST(InOut, makeUri) { { @@ -159,49 +181,49 @@ TEST(InOut, makeUri) EXPECT_EQ(url, default_uri); } { + UpstreamManagerTest umt; umt.init("my_ip", "1.2.3.4"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_ip", {"1.2.3.4", 0, false, 0}}); output.proto = "https"; output.port = "4321"; output.uri = "$my_ip"; - std::string url = output.makeUri(""); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, output.proto + "://1.2.3.4:" + output.port); } { + UpstreamManagerTest umt; umt.init("my_path", "http://site.com:1234/endpoint?q=1&n=2"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_path", {"http://site.com:1234/endpoint?q=1&n=2", 0, false, 0}}); output.proto = "https"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(""); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://site.com:4321/endpoint?q=1&n=2"); } { + UpstreamManagerTest umt; umt.init("my_path", "/endpoint?q=1&n=2"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_path", {"endpoint?q=1&n=2", 0, false, 0}}); output.proto = "https"; output.host = "mysite.com"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(""); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://mysite.com:4321/endpoint?q=1&n=2"); } { + UpstreamManagerTest umt; umt.init("something", "1.2.3.4", "localhost:28881"); graft::Output output; - std::string default_uri = "localhost:28881"; output.path = "json_rpc"; - std::string url = output.makeUri(default_uri); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "localhost:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; - url = output.makeUri(default_uri); + url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://localhost:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; output.uri = "http://aaa.bbb:12345/something"; - url = output.makeUri(default_uri); + url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://aaa.bbb:12345/json_rpc"); } } @@ -1279,7 +1301,7 @@ TEST_F(GraftServerTest, genericCallback) graft::supernode::request::registerForwardRequests(m_httpRouter); m_httpRouter.addRoute("/api/{forward:create_account|restore_account|wallet_balance|prepare_transfer|transaction_history}",METHOD_POST,{nullptr,pretend_walletnode_echo,nullptr}); - graft::Output::uri_substitutions.emplace("walletnode", std::make_tuple("http://localhost:28690/", 0, false, 0)); + m_copts.uri_substitutions.emplace("walletnode", std::make_tuple("http://localhost:28690/", 0, false, 0)); run(); std::string post_data = "some data"; diff --git a/test/upstream_test.cpp b/test/upstream_test.cpp index 56516653..eb3a560c 100644 --- a/test/upstream_test.cpp +++ b/test/upstream_test.cpp @@ -34,8 +34,8 @@ TEST_F(GraftServerTestBase, upstreamKeepAlive) crypton.keepAlive = true; crypton.connect_timeout_ms = 1000; crypton.run(); - graft::Output::uri_substitutions.insert({"crypton", {"127.0.0.1:1234", 3, true, 100}}); MainServer mainServer; + mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", 3, true, 100}}); mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); mainServer.run(); @@ -99,8 +99,8 @@ TEST_F(GraftServerTestBase, DISABLED_cryptonodeKeepAlive) } }; - graft::Output::uri_substitutions.insert({"cryptonode", {"127.0.0.1:18981/json_rpc", 3, true, 100}}); MainServer mainServer; + mainServer.m_copts.uri_substitutions.insert({"cryptonode", {"127.0.0.1:18981/json_rpc", 3, true, 100}}); mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); mainServer.run(); From 6695481e571abb35874589ac8e4824304eca9c30 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Sat, 29 Dec 2018 17:27:42 +0200 Subject: [PATCH 07/16] upstreamKeepAlive test improved. Simulated test cryptonode supports multi-threading request/response. --- include/lib/graft/upstream_manager.h | 5 +++ test/fixture.h | 41 +++++++++++++++----- test/graft_server_test.cpp | 8 ++-- test/upstream_test.cpp | 56 +++++++++++++++++++++------- 4 files changed, 83 insertions(+), 27 deletions(-) diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h index 9fc82eab..60d3d33e 100644 --- a/include/lib/graft/upstream_manager.h +++ b/include/lib/graft/upstream_manager.h @@ -36,6 +36,11 @@ class UpstreamManager , m_timeout(timeout) { } + ~ConnItem() + { + assert(m_idleConnections.empty()); + assert(m_activeConnections.empty()); + } std::pair getConnection(); void releaseActive(ConnectionId connectionId, mg_connection* client); diff --git a/test/fixture.h b/test/fixture.h index 9ac633fe..6ed83580 100644 --- a/test/fixture.h +++ b/test/fixture.h @@ -64,10 +64,13 @@ class GraftServerTestBase : public ::testing::Test int poll_timeout_ms = 1000; bool keepAlive = false; - using on_http_t = bool (const http_message *hm, int& status_code, std::string& headers, std::string& data); + using on_http_t = bool (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data); std::function on_http = nullptr; static std::function http_echo; + using on_poll_t = bool (mg_connection* client, int& status_code, std::string& headers, std::string& data); + std::function on_poll = nullptr; + void run() { ready = false; @@ -83,13 +86,22 @@ class GraftServerTestBase : public ::testing::Test stop = true; th.join(); } + protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) { assert(on_http); - return on_http(hm, status_code, headers, data); + return on_http(client, hm, status_code, headers, data); } virtual void onClose() { } + virtual bool onPoll(mg_connection* client) + { + if(!on_poll) return false; + int status_code = 200; + std::string headers, data; + bool res = on_poll(client, status_code, headers, data); + if(res) sendResponse(client, status_code, headers, data); + } private: std::thread th; std::atomic_bool ready; @@ -109,6 +121,16 @@ class GraftServerTestBase : public ::testing::Test mg_mgr_free(&mgr); } + void sendResponse(mg_connection* client, int& status_code, std::string& headers, std::string& data) + { + mg_send_head(client, status_code, data.size(), headers.c_str()); + mg_send(client, data.c_str(), data.size()); + if(!keepAlive) + { + client->flags |= MG_F_SEND_AND_CLOSE; + } + } + static void ev_handler_empty_s(mg_connection *client, int ev, void *ev_data) { } @@ -128,14 +150,13 @@ class GraftServerTestBase : public ::testing::Test struct http_message *hm = (struct http_message *) ev_data; int status_code = 200; std::string headers, data; - bool res = onHttpRequest(hm, status_code, headers, data); + bool res = onHttpRequest(client, hm, status_code, headers, data); if(!res) break; - mg_send_head(client, status_code, data.size(), headers.c_str()); - mg_send(client, data.c_str(), data.size()); - if(!keepAlive) - { - client->flags |= MG_F_SEND_AND_CLOSE; - } + sendResponse(client, status_code, headers, data); + } break; + case MG_EV_POLL: + { + onPoll(client); } break; case MG_EV_CLOSE: { diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index 914cd132..d8a36cb0 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -541,7 +541,7 @@ TEST(ExpiringList, common) ///////////////////////////////// std::function GraftServerTestBase::TempCryptoNodeServer::http_echo = - [] (const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool + [] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool { data = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); @@ -558,7 +558,7 @@ class GraftServerCommonTest : public GraftServerTestBase class TempCryptoN : public TempCryptoNodeServer { protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { data = std::string(hm->uri.p, hm->uri.len); graft::Context ctx(mainServer.getGcm()); @@ -1077,7 +1077,7 @@ class GraftServerPostponeTest : public GraftServerTestBase public: bool do_callback = true; protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { data = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); @@ -1327,7 +1327,7 @@ class GraftServerBlockingTest : public GraftServerTestBase std::string answer; std::string body; protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { body = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); diff --git a/test/upstream_test.cpp b/test/upstream_test.cpp index eb3a560c..1fcfb6a7 100644 --- a/test/upstream_test.cpp +++ b/test/upstream_test.cpp @@ -23,40 +23,70 @@ TEST_F(GraftServerTestBase, upstreamKeepAlive) } }; + const int maxConn = 3; + std::atomic activeCnt = 0; + std::atomic maxActiveCnt = 0; + + std::mutex mutex; + + std::map results; + TempCryptoNodeServer crypton; - crypton.on_http = [] (const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool + crypton.on_http = [&maxActiveCnt, &activeCnt, &mutex, &results] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool { - data = std::string(hm->body.p, hm->body.len); - std::string method(hm->method.p, hm->method.len); + if(maxActiveCnt < ++activeCnt) maxActiveCnt = activeCnt.load(); + std::string body = std::string(hm->body.p, hm->body.len); + + std::thread th {[=, &mutex, &results]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::lock_guard lock(mutex); + results.emplace(std::make_pair(client, body)); + }}; + th.detach(); + return false; + }; + crypton.on_poll = [&activeCnt, &mutex, &results] (mg_connection* client, int& status_code, std::string& headers, std::string& data)->bool + { + std::lock_guard lock(mutex); + auto it = results.find(client); + if(it == results.end()) return false; headers = "Content-Type: application/json"; + data = it->second; + results.erase(it); + --activeCnt; return true; }; crypton.keepAlive = true; + crypton.poll_timeout_ms = 100; crypton.connect_timeout_ms = 1000; crypton.run(); MainServer mainServer; - mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", 3, true, 100}}); + mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, true, 1000}}); mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); mainServer.run(); - auto client_func = [](int i) + std::atomic ok_cnt = 0; + + auto client_func = [&ok_cnt](int i) { std::string post_data = "some data" + std::to_string(i); Client client; - client.serve("http://localhost:9084/test_upstream", "", post_data, 1000, 250); + client.serve("http://localhost:9084/test_upstream", "", post_data, 100, 5000); EXPECT_EQ(false, client.get_closed()); EXPECT_EQ(200, client.get_resp_code()); std::string s = client.get_body(); EXPECT_EQ(s, post_data); + if(s == post_data) ++ok_cnt; }; - for(int c = 0; c < 1; ++c) - { - const int th_cnt = 50; - std::vector th_vec; - for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); - for(auto& th : th_vec) th.join(); - } + const int th_cnt = 50; + std::vector th_vec; + for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); + for(auto& th : th_vec) th.join(); + + EXPECT_EQ(maxActiveCnt, maxConn); + EXPECT_EQ(ok_cnt, th_cnt); mainServer.stop_and_wait_for(); crypton.stop_and_wait_for(); From d3bfd640d57c90154885af0e57f128946c9c85c0 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Tue, 8 Jan 2019 04:12:46 +0200 Subject: [PATCH 08/16] Connection Bunches added. --- CMakeLists.txt | 5 +- include/lib/graft/inout.h | 8 +- include/lib/graft/upstream_manager.h | 42 ++++++---- src/lib/graft/connection.cpp | 3 +- src/lib/graft/inout.cpp | 29 +++++-- src/lib/graft/mongoosex.cpp | 5 +- src/lib/graft/upstream_manager.cpp | 120 ++++++++++++++++++--------- test/graft_server_test.cpp | 4 +- 8 files changed, 143 insertions(+), 73 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 50fb6af1..6468a321 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -218,7 +218,7 @@ set_target_properties(graft PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/BUILD/libgraft ) -target_compile_definitions(graft PRIVATE MG_ENABLE_COAP=1 -DMONERO_DEFAULT_LOG_CATEGORY="supernode") +target_compile_definitions(graft PRIVATE MG_ENABLE_SYNC_RESOLVER MG_ENABLE_COAP -DMONERO_DEFAULT_LOG_CATEGORY="supernode") if(ENABLE_SYSLOG) target_compile_definitions(graft PRIVATE -DELPP_SYSLOG) endif() @@ -309,7 +309,7 @@ set_target_properties(supernode_common PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/BUILD/supernode_common ) -target_compile_definitions(supernode_common PRIVATE MG_ENABLE_COAP=1 -DMONERO_DEFAULT_LOG_CATEGORY="supernode") +target_compile_definitions(supernode_common PRIVATE -DMONERO_DEFAULT_LOG_CATEGORY="supernode") if(ENABLE_SYSLOG) target_compile_definitions(supernode_common PRIVATE -DELPP_SYSLOG) endif() @@ -405,7 +405,6 @@ if (OPT_BUILD_TESTS) ) - target_compile_definitions(supernode_test PRIVATE MG_ENABLE_COAP=1) add_dependencies(supernode_test graft supernode_common googletest) set_target_properties(supernode_test PROPERTIES LINK_FLAGS "-Wl,-E") if(ENABLE_SYSLOG) diff --git a/include/lib/graft/inout.h b/include/lib/graft/inout.h index 97c41ecb..f472ad68 100644 --- a/include/lib/graft/inout.h +++ b/include/lib/graft/inout.h @@ -204,10 +204,12 @@ namespace graft * Set uri, proto, host, port, path members if you need. * The function forms real URI substituting absent parts according to Config.ini. * It is public to be accessed from tests and other classes. - * \param default_uri - this parameter always comes from [cryptonode]rpc-address of Config.ini. - * \return + * \param default_uri - one of uri from lines in [upstream] of Config.ini. + * \param ip_port - output of resulting "ip:port". + * \param result_uri - output of resulting uri. + * \return true on success or false otherwise */ - std::string makeUri(const std::string& default_uri) const; + bool makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri) const; std::string port; std::string path; diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h index 60d3d33e..80e985e3 100644 --- a/include/lib/graft/upstream_manager.h +++ b/include/lib/graft/upstream_manager.h @@ -18,7 +18,7 @@ class UpstreamManager return (m_cntUpstreamSender != m_cntUpstreamSenderDone); } - void send(BaseTaskPtr bt); + void send(BaseTaskPtr& bt); protected: const std::string getUri(const std::string& inputUri); private: @@ -26,6 +26,16 @@ class UpstreamManager { public: using ConnectionId = uint64_t; + using IpPort = std::string; + using Uri = std::string; + + struct Bunch + { + int m_connCnt = 0; + std::map m_idleConnections; + std::map m_activeConnections; + UpstreamStub m_upstreamStub; + }; ConnItem() = default; ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout) @@ -38,38 +48,38 @@ class UpstreamManager } ~ConnItem() { - assert(m_idleConnections.empty()); - assert(m_activeConnections.empty()); + for(auto& it : m_bunches) + { + auto& b = it.second; + assert(b.m_idleConnections.empty()); + assert(b.m_activeConnections.empty()); + } } - std::pair getConnection(); - void releaseActive(ConnectionId connectionId, mg_connection* client); - void onCloseIdle(mg_connection* client); + std::pair getConnection(const IpPort& ip_port); + void releaseActive(ConnectionId connectionId, const IpPort& ip_port, mg_connection* client); + void onCloseIdle(const IpPort& ip_port, mg_connection* client); + Bunch& getBunch(const IpPort& ip_port, bool createIfNotExists = false); - int m_connCnt = 0; int m_uriId; std::string m_uri; double m_timeout; - //assert(m_upstreamQueue.empty() || 0 < m_maxConnections); int m_maxConnections; - std::deque m_taskQueue; + std::deque< std::tuple > m_taskQueue; bool m_keepAlive = false; - std::map m_idleConnections; - std::map m_activeConnections; - UpstreamStub m_upstreamStub; + std::map m_bunches; private: ConnectionId m_newId = 0; }; - void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client); - void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt); - ConnItem* findConnItem(const std::string& inputUri); + void onDone(UpstreamSender& uss, ConnItem* connItem, const std::string& ip_port, ConnItem::ConnectionId connectionId, mg_connection* client); + void createUpstreamSender(ConnItem* connItem, const std::string& ip_port, BaseTaskPtr bt, const std::string& uri); + ConnItem* findConnItem(const Output& output, std::string& ip_port, std::string& result_uri); const std::string& getUri(ConnItem* connItem, const std::string& inputUri); using Uri2ConnItem = std::map; OnDoneCallback m_onDoneCallback; - uint64_t m_cntUpstreamSender = 0; uint64_t m_cntUpstreamSenderDone = 0; ConnItem m_default; diff --git a/src/lib/graft/connection.cpp b/src/lib/graft/connection.cpp index 2cae7251..b3dde293 100644 --- a/src/lib/graft/connection.cpp +++ b/src/lib/graft/connection.cpp @@ -76,7 +76,7 @@ void UpstreamSender::send(mg_mgr* mgr, int http_callback_port, const std::string assert(m_bt); Output& output = m_bt->getOutput(); - std::string url = output.makeUri(def_uri); + std::string url = def_uri; Context::uuid_t callback_uuid = m_bt->getCtx().getId(false); if(!callback_uuid.is_nil()) @@ -96,6 +96,7 @@ void UpstreamSender::send(mg_mgr* mgr, int http_callback_port, const std::string m_upstream->user_data = this; m_upstream->handler = static_ev_handler; } + LOG_PRINT_L2("connecting to url '") << url <<"'"; mg_connection* upstream = mg::mg_connect_http_x(m_upstream, mgr, static_ev_handler, url.c_str(), extra_headers.c_str(), body); //body.empty() means GET diff --git a/src/lib/graft/inout.cpp b/src/lib/graft/inout.cpp index bc152e1e..c370e453 100644 --- a/src/lib/graft/inout.cpp +++ b/src/lib/graft/inout.cpp @@ -1,6 +1,7 @@ #include "lib/graft/inout.h" #include "lib/graft/mongoosex.h" +#include namespace graft { @@ -53,8 +54,9 @@ std::string InOutHttpBase::combine_headers() return s; } -std::string OutHttp::makeUri(const std::string& default_uri) const +bool OutHttp::makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri) const { + result_uri.clear(); std::string uri_ = default_uri; std::string port_; @@ -70,7 +72,8 @@ std::string OutHttp::makeUri(const std::string& default_uri) const int res = mg_parse_uri(mg_uri, &mg_scheme, &mg_user_info, &mg_host, &mg_port, &mg_path, &mg_query, &mg_fragment); if(res<0) break; if(mg_port) - port_ = std::to_string(mg_port); + port_ = std::to_string(mg_port); + #define V(n) n##_ = std::string(mg_##n.p, mg_##n.len) V(scheme); V(user_info); V(host); V(path); V(query); V(fragment); #undef V @@ -82,14 +85,28 @@ std::string OutHttp::makeUri(const std::string& default_uri) const if(!port.empty()) port_ = port; if(!path.empty()) path_ = path; - std::string url; + {//get ip by host_ + char buf[0x100]; + if(!mg_resolve(host_.c_str(), buf, sizeof(buf))) + { + LOG_PRINT_L1("cannot resolve host '") << host_ << "'"; + return false; + } + { + LOG_PRINT_L2("host '") << host_ << "' resolved as '" << buf << "'"; + host_ = buf; + } + } + + std::string& url = result_uri; if(!scheme_.empty()) { url += scheme_ + "://"; if(!user_info_.empty()) url += user_info_ + '@'; } - url += host_; - if(!port_.empty()) url += ':' + port_; + ip_port = host_; + if(!port_.empty()) ip_port += ':' + port_; + url += ip_port; if(!path_.empty()) { if(path_[0]!='/') path_ = '/' + path_; @@ -97,7 +114,7 @@ std::string OutHttp::makeUri(const std::string& default_uri) const } if(!query_.empty()) url += '?' + query_; if(!fragment_.empty()) url += '#' + fragment_; - return url; + return true; } } //namespace graft diff --git a/src/lib/graft/mongoosex.cpp b/src/lib/graft/mongoosex.cpp index bcca1aa3..74c5e335 100644 --- a/src/lib/graft/mongoosex.cpp +++ b/src/lib/graft/mongoosex.cpp @@ -1,5 +1,6 @@ #include "lib/graft/mongoosex.h" +#include extern "C" { @@ -35,8 +36,8 @@ mg_connection *mg_send_http_opt_x(mg_connection *nc, else { //TODO: excessive parse second time - mg_parse_uri(mg_mk_str(url), NULL, &user, &host, NULL, &path, NULL, NULL); - nc = nc; + unsigned int port; + mg_parse_uri(mg_mk_str(url), NULL, &user, &host, &port, &path, NULL, NULL); } mbuf_init(&auth, 0); diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp index e99922dc..c9fa6f9a 100644 --- a/src/lib/graft/upstream_manager.cpp +++ b/src/lib/graft/upstream_manager.cpp @@ -3,65 +3,95 @@ namespace graft { -std::pair UpstreamManager::ConnItem::getConnection() +UpstreamManager::ConnItem::Bunch& UpstreamManager::ConnItem::getBunch(const IpPort& ip_port, bool createIfNotExists) { - assert(m_maxConnections == 0 || m_connCnt < m_maxConnections || m_connCnt == m_maxConnections && !m_idleConnections.empty()); + assert(m_keepAlive || ip_port.empty()); + auto b_it = m_bunches.find(ip_port); + assert(b_it != m_bunches.end() || createIfNotExists); + if(b_it == m_bunches.end()) + { + auto res = m_bunches.emplace(std::make_pair(ip_port, Bunch())); + assert(res.second); + b_it = res.first; + + Bunch& bunch = b_it->second; + bunch.m_upstreamStub.setCallback([this, ip_port](mg_connection* client){ onCloseIdle(ip_port, client); }); + } + return b_it->second; +} + + +std::pair UpstreamManager::ConnItem::getConnection(const IpPort& ip_port) +{ + Bunch& bunch = getBunch(ip_port, true); + + assert(m_maxConnections == 0 || bunch.m_connCnt < m_maxConnections || bunch.m_connCnt == m_maxConnections && !bunch.m_idleConnections.empty()); std::pair res = std::make_pair(0,nullptr); if(!m_keepAlive) { - ++m_connCnt; + ++bunch.m_connCnt; return res; } - if(!m_idleConnections.empty()) + if(!bunch.m_idleConnections.empty()) { - auto it = m_idleConnections.begin(); + auto it = bunch.m_idleConnections.begin(); res = std::make_pair(it->second, it->first); - m_idleConnections.erase(it); + bunch.m_idleConnections.erase(it); } else { - ++m_connCnt; + ++bunch.m_connCnt; res.first = ++m_newId; } - auto res1 = m_activeConnections.emplace(res); + auto res1 = bunch.m_activeConnections.emplace(res); assert(res1.second); - assert(m_connCnt == m_idleConnections.size() + m_activeConnections.size()); + assert(bunch.m_connCnt == bunch.m_idleConnections.size() + bunch.m_activeConnections.size()); return res; } -void UpstreamManager::ConnItem::releaseActive(ConnectionId connectionId, mg_connection* client) +void UpstreamManager::ConnItem::releaseActive(ConnectionId connectionId, const IpPort& ip_port, mg_connection* client) { + Bunch& bunch = getBunch(ip_port); + assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); if(!m_keepAlive) return; - auto it = m_activeConnections.find(connectionId); - assert(it != m_activeConnections.end()); + auto it = bunch.m_activeConnections.find(connectionId); + assert(it != bunch.m_activeConnections.end()); assert(it->second == nullptr || client == nullptr || it->second == client); if(client != nullptr) { - m_idleConnections.emplace(client, it->first); - m_upstreamStub.setConnection(client); + bunch.m_idleConnections.emplace(client, it->first); + bunch.m_upstreamStub.setConnection(client); } else { - --m_connCnt; + --bunch.m_connCnt; } - m_activeConnections.erase(it); + bunch.m_activeConnections.erase(it); } -void UpstreamManager::ConnItem::onCloseIdle(mg_connection* client) +void UpstreamManager::ConnItem::onCloseIdle(const IpPort& ip_port, mg_connection* client) { + Bunch& bunch = getBunch(ip_port); + assert(m_keepAlive); - auto it = m_idleConnections.find(client); - assert(it != m_idleConnections.end()); - --m_connCnt; - m_idleConnections.erase(it); + auto it = bunch.m_idleConnections.find(client); + assert(it != bunch.m_idleConnections.end()); + --bunch.m_connCnt; + bunch.m_idleConnections.erase(it); + + assert(bunch.m_connCnt == bunch.m_idleConnections.size() + bunch.m_activeConnections.size()); + if(bunch.m_connCnt == 0) + { + m_bunches.erase(ip_port); + } } -UpstreamManager::ConnItem* UpstreamManager::findConnItem(const std::string& inputUri) +UpstreamManager::ConnItem* UpstreamManager::findConnItem(const Output& output, std::string& ip_port, std::string& result_uri) { ConnItem* connItem = &m_default; {//find connItem - const std::string& uri = inputUri; + const std::string& uri = output.uri; if(!uri.empty() && uri[0] == '$') {//substitutions auto it = m_conn2item.find(uri.substr(1)); @@ -74,31 +104,39 @@ UpstreamManager::ConnItem* UpstreamManager::findConnItem(const std::string& inpu connItem = &it->second; } } + + output.makeUri( getUri(connItem, output.uri), ip_port, result_uri); + if(!connItem->m_keepAlive) ip_port.clear(); + return connItem; } -void UpstreamManager::send(BaseTaskPtr bt) +void UpstreamManager::send(BaseTaskPtr& bt) { - ConnItem* connItem = findConnItem(bt->getOutput().uri); + std::string ip_port, uri; + ConnItem* connItem = findConnItem(bt->getOutput(), ip_port, uri); - if(connItem->m_maxConnections != 0 && connItem->m_idleConnections.empty() && connItem->m_connCnt == connItem->m_maxConnections) + ConnItem::Bunch& bunch = connItem->getBunch(ip_port, true); + + if(connItem->m_maxConnections != 0 && bunch.m_idleConnections.empty() && bunch.m_connCnt == connItem->m_maxConnections) { - connItem->m_taskQueue.push_back(bt); + connItem->m_taskQueue.push_back( std::make_tuple(bt, ip_port, uri ) ); return; } - createUpstreamSender(connItem, bt); + createUpstreamSender(connItem, ip_port, bt, uri); } -void UpstreamManager::onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client) +void UpstreamManager::onDone(UpstreamSender& uss, ConnItem* connItem, const std::string& ip_port, ConnItem::ConnectionId connectionId, mg_connection* client) { ++m_cntUpstreamSenderDone; m_onDoneCallback(uss); - connItem->releaseActive(connectionId, client); + connItem->releaseActive(connectionId, ip_port, client); if(connItem->m_taskQueue.empty()) return; - BaseTaskPtr bt = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); - createUpstreamSender(connItem, bt); + std::string ip_port_v, uri; BaseTaskPtr bt; + std::tie(bt,ip_port_v,uri) = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); + createUpstreamSender(connItem, ip_port_v, bt, uri); } void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback) @@ -116,8 +154,6 @@ void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callba if(timeout < 1e-5) timeout = copts.upstream_request_timeout; auto res = m_conn2item.emplace(subs.first, ConnItem(uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); assert(res.second); - ConnItem* connItem = &res.first->second; - connItem->m_upstreamStub.setCallback([connItem](mg_connection* client){ connItem->onCloseIdle(client); }); } } @@ -127,18 +163,19 @@ const std::string& UpstreamManager::getUri(ConnItem* connItem, const std::string return uri; } -void UpstreamManager::createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) +void UpstreamManager::createUpstreamSender(ConnItem* connItem, const std::string& ip_port, BaseTaskPtr bt, const std::string& uri) { - auto onDoneAct = [this, connItem](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) + std::string ip_port_v = ip_port; + auto onDoneAct = [this, connItem, ip_port_v](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) { - onDone(uss, connItem, connectionId, client); + onDone(uss, connItem, ip_port_v, connectionId, client); }; ++m_cntUpstreamSender; UpstreamSender::Ptr uss; if(connItem->m_keepAlive) { - auto res = connItem->getConnection(); + auto res = connItem->getConnection(ip_port); uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); } else @@ -146,14 +183,15 @@ void UpstreamManager::createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); } - const std::string& uri = getUri(connItem, bt->getOutput().uri); uss->send(m_mgr, m_http_callback_port, uri); } const std::string UpstreamManager::getUri(const std::string& inputUri) { - ConnItem* connItem = findConnItem(inputUri); - return getUri(connItem, inputUri); + Output output; output.uri = inputUri; + std::string ip_port, uri; + ConnItem* connItem = findConnItem(output, ip_port, uri); + return uri; } }//namespace graft diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index d8a36cb0..3e37039d 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -172,6 +172,7 @@ class UpstreamManagerTest : public graft::UpstreamManager } //namespace +#if 0 TEST(InOut, makeUri) { { @@ -227,6 +228,7 @@ TEST(InOut, makeUri) EXPECT_EQ(url, "https://aaa.bbb:12345/json_rpc"); } } +#endif TEST(Context, simple) { @@ -1301,7 +1303,7 @@ TEST_F(GraftServerTest, genericCallback) graft::supernode::request::registerForwardRequests(m_httpRouter); m_httpRouter.addRoute("/api/{forward:create_account|restore_account|wallet_balance|prepare_transfer|transaction_history}",METHOD_POST,{nullptr,pretend_walletnode_echo,nullptr}); - m_copts.uri_substitutions.emplace("walletnode", std::make_tuple("http://localhost:28690/", 0, false, 0)); + m_copts.uri_substitutions.emplace("walletnode", std::make_tuple("http://127.0.0.1:28690/", 0, false, 0)); run(); std::string post_data = "some data"; From f50289031adcebe32ed2ee208c450b53e7f50c57 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Tue, 8 Jan 2019 05:28:45 +0200 Subject: [PATCH 09/16] In config.ini rpc-address can be one of [upstream] --- data/config.ini | 2 ++ include/lib/graft/serveropts.h | 1 + src/lib/graft/upstream_manager.cpp | 16 +++++++++++++--- src/supernode/server.cpp | 12 ++++++++++++ 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/data/config.ini b/data/config.ini index 948d4978..c3cf2234 100644 --- a/data/config.ini +++ b/data/config.ini @@ -1,4 +1,5 @@ [cryptonode] +;;rpc-address can be one of [upstream] values, like rpc-address=$default rpc-address=127.0.0.1:28681 p2p-address=127.0.0.1:18980 @@ -44,6 +45,7 @@ requests-per-sec=100 ;; maximal amount of requests per second in the window, 0 t ban-ip-sec=300 ;; time duration in seconds to ban particular IP, 0 to ban forever [upstream] +default=127.0.0.1:28681 blah=https://127.0.0.1:8080 walletnode=http://127.0.0.1:28694 ;format =[,[,[,]]] [;; comment] diff --git a/include/lib/graft/serveropts.h b/include/lib/graft/serveropts.h index 4cf7f006..ee3e09d6 100644 --- a/include/lib/graft/serveropts.h +++ b/include/lib/graft/serveropts.h @@ -32,6 +32,7 @@ struct ConfigOpts int lru_timeout_ms; IPFilterOpts ipfilter; std::unordered_map> uri_substitutions; + std::string default_uri_substitution_name; void check_asserts() const { diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp index c9fa6f9a..0b95e019 100644 --- a/src/lib/graft/upstream_manager.cpp +++ b/src/lib/graft/upstream_manager.cpp @@ -146,15 +146,25 @@ void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callba m_onDoneCallback = onDoneCallback; int uriId = 0; - m_default = ConnItem(uriId++, copts.cryptonode_rpc_address.c_str(), 0, false, copts.upstream_request_timeout); - for(auto& subs : copts.uri_substitutions) { double timeout = std::get<3>(subs.second); if(timeout < 1e-5) timeout = copts.upstream_request_timeout; - auto res = m_conn2item.emplace(subs.first, ConnItem(uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); + auto res = m_conn2item.emplace(subs.first, ConnItem(++uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); assert(res.second); } + if(copts.default_uri_substitution_name.empty()) + { + m_default = ConnItem(0, copts.cryptonode_rpc_address.c_str(), 0, false, copts.upstream_request_timeout); + } + else + { + auto it = m_conn2item.find(copts.default_uri_substitution_name); + assert(it != m_conn2item.end()); + m_default = it->second; + m_default.m_uriId = 0; + m_conn2item.erase(it); + } } const std::string& UpstreamManager::getUri(ConnItem* connItem, const std::string& inputUri) diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index ca8e2291..8fbad1e4 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -543,6 +543,18 @@ bool GraftServer::initConfigOption(int argc, const char** argv, ConfigOpts& conf details::parseSubstitutionItem(name, val, uri, cnt, keepAlive, timeout); configOpts.uri_substitutions.emplace(std::move(name), std::make_tuple(std::move(uri), cnt, keepAlive, timeout)); }); + if(configOpts.cryptonode_rpc_address[0] == '$') + { + std::string def_subst_name = configOpts.cryptonode_rpc_address.substr(1); + auto it = configOpts.uri_substitutions.find(def_subst_name); + if(it == configOpts.uri_substitutions.end()) + { + std::ostringstream oss; oss << "cannot find substitution '" << configOpts.cryptonode_rpc_address << "'"; + throw std::runtime_error(oss.str()); + } + configOpts.cryptonode_rpc_address = std::get<0>(it->second); + configOpts.default_uri_substitution_name = def_subst_name; + } return true; } From 7c9ad69efe9770ff5b708c37bc2f59289bfffa41 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Tue, 8 Jan 2019 05:30:13 +0200 Subject: [PATCH 10/16] New tests added based on UpstreamTest fixture. Some UpstreamManager bugs fixed. --- src/lib/graft/upstream_manager.cpp | 9 +- test/upstream_test.cpp | 271 +++++++++++++++-------------- 2 files changed, 144 insertions(+), 136 deletions(-) diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp index 0b95e019..326b0ff5 100644 --- a/src/lib/graft/upstream_manager.cpp +++ b/src/lib/graft/upstream_manager.cpp @@ -54,7 +54,11 @@ void UpstreamManager::ConnItem::releaseActive(ConnectionId connectionId, const I Bunch& bunch = getBunch(ip_port); assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); - if(!m_keepAlive) return; + if(!m_keepAlive) + { + --bunch.m_connCnt; + return; + } auto it = bunch.m_activeConnections.find(connectionId); assert(it != bunch.m_activeConnections.end()); assert(it->second == nullptr || client == nullptr || it->second == client); @@ -118,6 +122,7 @@ void UpstreamManager::send(BaseTaskPtr& bt) ConnItem::Bunch& bunch = connItem->getBunch(ip_port, true); + assert(connItem->m_keepAlive || bunch.m_idleConnections.empty()); if(connItem->m_maxConnections != 0 && bunch.m_idleConnections.empty() && bunch.m_connCnt == connItem->m_maxConnections) { connItem->m_taskQueue.push_back( std::make_tuple(bt, ip_port, uri ) ); @@ -183,9 +188,9 @@ void UpstreamManager::createUpstreamSender(ConnItem* connItem, const std::string ++m_cntUpstreamSender; UpstreamSender::Ptr uss; + auto res = connItem->getConnection(ip_port); if(connItem->m_keepAlive) { - auto res = connItem->getConnection(ip_port); uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); } else diff --git a/test/upstream_test.cpp b/test/upstream_test.cpp index 1fcfb6a7..a322b19a 100644 --- a/test/upstream_test.cpp +++ b/test/upstream_test.cpp @@ -2,158 +2,161 @@ #include "lib/graft/jsonrpc.h" #include "fixture.h" -TEST_F(GraftServerTestBase, upstreamKeepAlive) +class UpstreamTest : public GraftServerTestBase { - auto action = [&](const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)->graft::Status +public: + MainServer m_mainServer; + + void run(const std::string& forwardUri, const int maxConn) { - switch(ctx.local.getLastStatus()) + auto action = [&](const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)->graft::Status + { + switch(ctx.local.getLastStatus()) + { + case graft::Status::None : + { + output.body = input.body; + output.uri = forwardUri; + return graft::Status::Forward; + } break; + case graft::Status::Forward : + { + output.body = input.body; + return graft::Status::Ok; + } break; + default: assert(false); + } + }; + + std::atomic activeCnt = 0; + std::atomic maxActiveCnt = 0; + + std::mutex mutex; + + std::map results; + + TempCryptoNodeServer crypton; + crypton.on_http = [&maxActiveCnt, &activeCnt, &mutex, &results] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool { - case graft::Status::None : + if(maxActiveCnt < ++activeCnt) maxActiveCnt = activeCnt.load(); + std::string body = std::string(hm->body.p, hm->body.len); + + std::thread th {[=, &mutex, &results]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::lock_guard lock(mutex); + results.emplace(std::make_pair(client, body)); + }}; + th.detach(); + return false; + }; + crypton.on_poll = [&activeCnt, &mutex, &results] (mg_connection* client, int& status_code, std::string& headers, std::string& data)->bool { - output.body = input.body; - output.uri = "$crypton"; - return graft::Status::Forward; - } break; - case graft::Status::Forward : + std::lock_guard lock(mutex); + auto it = results.find(client); + if(it == results.end()) return false; + headers = "Content-Type: application/json"; + data = it->second; + results.erase(it); + --activeCnt; + return true; + }; + crypton.keepAlive = true; + crypton.poll_timeout_ms = 100; + crypton.connect_timeout_ms = 1000; + crypton.run(); + + m_mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); + m_mainServer.run(); + + std::atomic ok_cnt = 0; + + auto client_func = [&ok_cnt](int i) { - output.body = input.body; - return graft::Status::Ok; - } break; - default: assert(false); - } - }; + std::string post_data = "some data" + std::to_string(i); + Client client; + client.serve("http://localhost:9084/test_upstream", "", post_data, 100, 5000); + EXPECT_EQ(false, client.get_closed()); + EXPECT_EQ(200, client.get_resp_code()); + std::string s = client.get_body(); + EXPECT_EQ(s, post_data); + if(s == post_data) ++ok_cnt; + }; - const int maxConn = 3; - std::atomic activeCnt = 0; - std::atomic maxActiveCnt = 0; + const int th_cnt = 50; + std::vector th_vec; + for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); + for(auto& th : th_vec) th.join(); - std::mutex mutex; + if(maxConn != 0) EXPECT_EQ(maxActiveCnt, maxConn); - std::map results; + EXPECT_EQ(ok_cnt, th_cnt); - TempCryptoNodeServer crypton; - crypton.on_http = [&maxActiveCnt, &activeCnt, &mutex, &results] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool - { - if(maxActiveCnt < ++activeCnt) maxActiveCnt = activeCnt.load(); - std::string body = std::string(hm->body.p, hm->body.len); + m_mainServer.stop_and_wait_for(); + crypton.stop_and_wait_for(); + } +}; - std::thread th {[=, &mutex, &results]() - { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - std::lock_guard lock(mutex); - results.emplace(std::make_pair(client, body)); - }}; - th.detach(); - return false; - }; - crypton.on_poll = [&activeCnt, &mutex, &results] (mg_connection* client, int& status_code, std::string& headers, std::string& data)->bool - { - std::lock_guard lock(mutex); - auto it = results.find(client); - if(it == results.end()) return false; - headers = "Content-Type: application/json"; - data = it->second; - results.erase(it); - --activeCnt; - return true; - }; - crypton.keepAlive = true; - crypton.poll_timeout_ms = 100; - crypton.connect_timeout_ms = 1000; - crypton.run(); - MainServer mainServer; - mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, true, 1000}}); - mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); - mainServer.run(); - - std::atomic ok_cnt = 0; - - auto client_func = [&ok_cnt](int i) - { - std::string post_data = "some data" + std::to_string(i); - Client client; - client.serve("http://localhost:9084/test_upstream", "", post_data, 100, 5000); - EXPECT_EQ(false, client.get_closed()); - EXPECT_EQ(200, client.get_resp_code()); - std::string s = client.get_body(); - EXPECT_EQ(s, post_data); - if(s == post_data) ++ok_cnt; - }; - - const int th_cnt = 50; - std::vector th_vec; - for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); - for(auto& th : th_vec) th.join(); - - EXPECT_EQ(maxActiveCnt, maxConn); - EXPECT_EQ(ok_cnt, th_cnt); - - mainServer.stop_and_wait_for(); - crypton.stop_and_wait_for(); +TEST_F(UpstreamTest, upstream) +{ + const int maxConn = 0; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, false, 300}}); + run("$crypton", maxConn); } +TEST_F(UpstreamTest, upstreamDefault) +{ + const int maxConn = 0; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, false, 300}}); + run("", maxConn); +} -namespace +TEST_F(UpstreamTest, upstreamKeepAlive) { + const int maxConn = 0; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, true, 300}}); + run("$crypton", maxConn); +} -GRAFT_DEFINE_IO_STRUCT(GetVersionResp, - (std::string, status), - (uint32_t, version) - ); +TEST_F(UpstreamTest, upstreamKeepAliveDefault) +{ + const int maxConn = 0; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, true, 300}}); + run("", maxConn); +} -GRAFT_DEFINE_JSON_RPC_RESPONSE_RESULT(JRResponseResult, GetVersionResp); +TEST_F(UpstreamTest, upstreamMax) +{ + const int maxConn = 3; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, false, 1000}}); + run("$crypton", maxConn); +} -} //namespace +TEST_F(UpstreamTest, upstreamMaxDefault) +{ + const int maxConn = 3; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, false, 1000}}); + run("", maxConn); +} -//this test requires a cryptonode on 127.0.0.1:18981 -TEST_F(GraftServerTestBase, DISABLED_cryptonodeKeepAlive) +TEST_F(UpstreamTest, upstreamMaxKeepAlive) { - auto action = [&](const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)->graft::Status - { - switch(ctx.local.getLastStatus()) - { - case graft::Status::None : - { - output.uri = "$cryptonode"; - graft::JsonRpcRequestHeader request; - request.method = "get_version"; - output.load(request); - return graft::Status::Forward; - } break; - case graft::Status::Forward : - { - output.body = input.body; - return graft::Status::Ok; - } break; - default: assert(false); - } - }; - - MainServer mainServer; - mainServer.m_copts.uri_substitutions.insert({"cryptonode", {"127.0.0.1:18981/json_rpc", 3, true, 100}}); - mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); - mainServer.run(); - - auto client_func = [](int i) - { - std::string post_data = "some data" + std::to_string(i); - Client client; - client.serve("http://localhost:9084/test_upstream", "", post_data, 1000, 250); - EXPECT_EQ(false, client.get_closed()); - EXPECT_EQ(200, client.get_resp_code()); - std::string s = client.get_body(); - - graft::Input in; in.load(client.get_body()); - EXPECT_NO_THROW( JRResponseResult result = in.get() ); - }; - - for(int c = 0; c < 1; ++c) - { - const int th_cnt = 50; - std::vector th_vec; - for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); - for(auto& th : th_vec) th.join(); - } + const int maxConn = 3; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, true, 1000}}); + run("$crypton", maxConn); +} - mainServer.stop_and_wait_for(); +TEST_F(UpstreamTest, upstreamMaxKeepAliveDefault) +{ + const int maxConn = 3; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, true, 1000}}); + run("", maxConn); } From 6147dc2b2fdd30fa2c39cee533e7127e9ceca01a Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Thu, 10 Jan 2019 02:48:15 +0200 Subject: [PATCH 11/16] In config.ini in [upstream] item format changed. --- data/config.ini | 13 ++++++++----- src/supernode/server.cpp | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/data/config.ini b/data/config.ini index c3cf2234..f0ab0df3 100644 --- a/data/config.ini +++ b/data/config.ini @@ -48,11 +48,14 @@ ban-ip-sec=300 ;; time duration in seconds to ban particular IP, 0 to ban foreve default=127.0.0.1:28681 blah=https://127.0.0.1:8080 walletnode=http://127.0.0.1:28694 -;format =[,[,[,]]] [;; comment] -; where - {true | false | 0 | 1} , false by default -; example: -;wallet2=http://127.0.0.1:28694, 10, true, 2.55 ;; example -;walletnode=http://127.0.0.1:28694,cntMax,true/false/1/0 always_open,timeout +;;format =[,[,{|keepAlive[:]}]] [;;] +;; where +;; - 0 by default, that means [server] upstream-request-timeout will be used +;; - max connections for this upstream. 0 by default, that means no constraint +;; - max connections for each ip:port. 0 by default, that means no constraint +;; example: +;;wallet2=http://127.0.0.1:28694, 2.55, 10 ;; max 10 connections total for this upstream +;;wallet3=http://site.com:28694, 2.55, keepAlive:10 ;; max 10 connections per each ip:port for this upstream [graftlets] ;;dirs parameter, a list of directories to search graftlets separated by colons. If a directory is set relative it will be interpreted both relative to the current directory and relative to the executable location. By default, 'graftlets' directory will be used relative to the executable location. diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index 8fbad1e4..7caedef7 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -374,7 +374,7 @@ void initGraftletDirs(int argc, const char** argv, const std::string& dirs_opt, void parseSubstitutionItem(const std::string& name, const std::string& val, std::string& uri, int& cnt, bool& keepAlive, double& timeout) { std::string s = trim_comments(val); - std::regex regex(R"(^\s*([^,\s]+)\s*(,\s*(\d+)\s*(,\s*(true|false|0|1)\s*(,\s*(\d+\.?\d*)\s*)?)?)?\s*$)"); + std::regex regex(R"(^\s*([^,\s]+)\s*(,\s*(\d+\.?\d*)\s*(,\s*(\d+|(keepAlive\s*(:\s*(\d+))?)))?)?\s*$)"); std::smatch m; if(!std::regex_match(s, m, regex)) { @@ -387,11 +387,18 @@ void parseSubstitutionItem(const std::string& name, const std::string& val, std: uri = m[1]; cnt = 0; keepAlive = false; timeout = 0; if(!m[3].matched) return; - cnt = std::stoi(m[3]); - if(!m[5].matched) return; - if(m[5] == "true" || m[5] == "1") keepAlive = true; + timeout = std::stod(m[3]); + if(!m[4].matched) return; + assert(m[5].matched); + if(!m[6].matched) + { + cnt = std::stoi(m[5]); + return; + } + keepAlive = true; if(!m[7].matched) return; - timeout = std::stod(m[7]); + assert(m[8].matched); + cnt = std::stoi(m[8]); } } //namespace details From 9400eb860d7bdef980cbd4678bc4a1c0e23e7133 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Fri, 11 Jan 2019 11:00:06 +0200 Subject: [PATCH 12/16] makeUri test fixed. A host resolution test added. Host resolution errors are handled. --- include/lib/graft/connection.h | 7 ++++ include/lib/graft/inout.h | 3 +- include/lib/graft/upstream_manager.h | 5 ++- src/lib/graft/inout.cpp | 21 ++++++----- src/lib/graft/upstream_manager.cpp | 32 ++++++++++++++--- test/graft_server_test.cpp | 52 +++++++++++++++++++++------- 6 files changed, 92 insertions(+), 28 deletions(-) diff --git a/include/lib/graft/connection.h b/include/lib/graft/connection.h index 833916c1..122658b2 100644 --- a/include/lib/graft/connection.h +++ b/include/lib/graft/connection.h @@ -72,6 +72,13 @@ class UpstreamSender : public SelfHolder : m_bt(bt), m_onDone(onDone), m_keepAlive(true), m_connectioId(connectionId), m_upstream(upstream), m_timeout(timeout) { } + UpstreamSender(const BaseTaskPtr& bt, std::string error, std::function onDoneError) : m_bt(bt) + { + setError(Status::Error, error); + onDoneError(*this); + releaseItself(); + } + BaseTaskPtr& getTask() { return m_bt; } void send(mg_mgr* mgr, int http_callback_port, const std::string& uri); diff --git a/include/lib/graft/inout.h b/include/lib/graft/inout.h index f472ad68..27927292 100644 --- a/include/lib/graft/inout.h +++ b/include/lib/graft/inout.h @@ -207,9 +207,10 @@ namespace graft * \param default_uri - one of uri from lines in [upstream] of Config.ini. * \param ip_port - output of resulting "ip:port". * \param result_uri - output of resulting uri. + * \param resolve_cache - a cache with resolved host to ip map. * \return true on success or false otherwise */ - bool makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri) const; + bool makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri, std::unordered_map& resolve_cache) const; std::string port; std::string path; diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h index 80e985e3..a2b5f830 100644 --- a/include/lib/graft/upstream_manager.h +++ b/include/lib/graft/upstream_manager.h @@ -20,7 +20,8 @@ class UpstreamManager void send(BaseTaskPtr& bt); protected: - const std::string getUri(const std::string& inputUri); + //testGetUri for test only + const std::string testGetUri(const Output& output); private: class ConnItem { @@ -86,6 +87,8 @@ class UpstreamManager Uri2ConnItem m_conn2item; mg_mgr* m_mgr = nullptr; int m_http_callback_port; +protected: + std::unordered_map m_resolveCache; }; }//namespace graft diff --git a/src/lib/graft/inout.cpp b/src/lib/graft/inout.cpp index c370e453..e85cfea7 100644 --- a/src/lib/graft/inout.cpp +++ b/src/lib/graft/inout.cpp @@ -54,7 +54,7 @@ std::string InOutHttpBase::combine_headers() return s; } -bool OutHttp::makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri) const +bool OutHttp::makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri, std::unordered_map& resolve_cache) const { result_uri.clear(); std::string uri_ = default_uri; @@ -86,16 +86,21 @@ bool OutHttp::makeUri(const std::string& default_uri, std::string& ip_port, std: if(!path.empty()) path_ = path; {//get ip by host_ - char buf[0x100]; - if(!mg_resolve(host_.c_str(), buf, sizeof(buf))) - { - LOG_PRINT_L1("cannot resolve host '") << host_ << "'"; - return false; - } + auto it = resolve_cache.find(host_); + if(it == resolve_cache.end()) { + char buf[0x100]; + if(!mg_resolve(host_.c_str(), buf, sizeof(buf))) + { + LOG_PRINT_L1("cannot resolve host '") << host_ << "'"; + return false; + } LOG_PRINT_L2("host '") << host_ << "' resolved as '" << buf << "'"; - host_ = buf; + auto res = resolve_cache.emplace(host_, std::string(buf)); + assert(res.second); + it = res.first; } + host_ = it->second; } std::string& url = result_uri; diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp index 326b0ff5..91a16f9c 100644 --- a/src/lib/graft/upstream_manager.cpp +++ b/src/lib/graft/upstream_manager.cpp @@ -109,7 +109,13 @@ UpstreamManager::ConnItem* UpstreamManager::findConnItem(const Output& output, s } } - output.makeUri( getUri(connItem, output.uri), ip_port, result_uri); + bool res = output.makeUri( getUri(connItem, output.uri), ip_port, result_uri, m_resolveCache); + if(!res) + { + std::ostringstream oss; + oss << "cannot resolve host with uri '" << getUri(connItem, output.uri) << "' host '" << output.host << "'"; + throw std::runtime_error(oss.str()); + } if(!connItem->m_keepAlive) ip_port.clear(); return connItem; @@ -118,7 +124,17 @@ UpstreamManager::ConnItem* UpstreamManager::findConnItem(const Output& output, s void UpstreamManager::send(BaseTaskPtr& bt) { std::string ip_port, uri; - ConnItem* connItem = findConnItem(bt->getOutput(), ip_port, uri); + ConnItem* connItem = nullptr; + try + { + connItem = findConnItem(bt->getOutput(), ip_port, uri); + } + catch(std::exception& e) + { + //uss reports error, and dies + UpstreamSender::Ptr uss = UpstreamSender::Create(bt, e.what(), m_onDoneCallback); + return; + } ConnItem::Bunch& bunch = connItem->getBunch(ip_port, true); @@ -201,11 +217,17 @@ void UpstreamManager::createUpstreamSender(ConnItem* connItem, const std::string uss->send(m_mgr, m_http_callback_port, uri); } -const std::string UpstreamManager::getUri(const std::string& inputUri) +const std::string UpstreamManager::testGetUri(const Output& output) { - Output output; output.uri = inputUri; std::string ip_port, uri; - ConnItem* connItem = findConnItem(output, ip_port, uri); + try + { + ConnItem* connItem = findConnItem(output, ip_port, uri); + } + catch(std::exception&) + { + return getUri(&m_default, output.uri); + } return uri; } diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index 3e37039d..f8e7a12c 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -164,21 +164,28 @@ class UpstreamManagerTest : public graft::UpstreamManager copts.uri_substitutions.insert({subst_name, {subst_uri, 0, false, 0}}); graft::UpstreamManager::init(copts, nullptr, 0, nullptr); } - const std::string getUri(const std::string& inputUri) + const std::string testGetUri(const graft::Output& output) { - return graft::UpstreamManager::getUri(inputUri); + return graft::UpstreamManager::testGetUri(output); + } + std::unordered_map& getResolveCache() + { + return m_resolveCache; } }; } //namespace -#if 0 TEST(InOut, makeUri) { { + std::unordered_map resolve_cache; graft::Output output; std::string default_uri = "http://123.123.123.123:1234"; - std::string url = output.makeUri(default_uri); + std::string ip_port, url; + bool res = output.makeUri(default_uri, ip_port, url, resolve_cache); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "123.123.123.123:1234"); EXPECT_EQ(url, default_uri); } { @@ -187,48 +194,67 @@ TEST(InOut, makeUri) output.proto = "https"; output.port = "4321"; output.uri = "$my_ip"; - std::string url = output.makeUri(umt.getUri(output.uri)); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "1.2.3.4:" + output.port); EXPECT_EQ(url, output.proto + "://1.2.3.4:" + output.port); } { UpstreamManagerTest umt; umt.init("my_path", "http://site.com:1234/endpoint?q=1&n=2"); + umt.getResolveCache().emplace("site.com","site.com"); graft::Output output; output.proto = "https"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(umt.getUri(output.uri)); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "site.com:" + output.port); EXPECT_EQ(url, "https://site.com:4321/endpoint?q=1&n=2"); } { UpstreamManagerTest umt; umt.init("my_path", "/endpoint?q=1&n=2"); + umt.getResolveCache().emplace("mysite.com","mysite.com"); graft::Output output; output.proto = "https"; output.host = "mysite.com"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(umt.getUri(output.uri)); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "mysite.com:" + output.port); EXPECT_EQ(url, "https://mysite.com:4321/endpoint?q=1&n=2"); } { UpstreamManagerTest umt; umt.init("something", "1.2.3.4", "localhost:28881"); + umt.getResolveCache().emplace("aaa.bbb","aaa.bbb"); + graft::Output output; output.path = "json_rpc"; - std::string url = output.makeUri(umt.getUri(output.uri)); - EXPECT_EQ(url, "localhost:28881/json_rpc"); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "127.0.0.1:28881"); + EXPECT_EQ(url, "127.0.0.1:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; - url = output.makeUri(umt.getUri(output.uri)); - EXPECT_EQ(url, "https://localhost:28881/json_rpc"); + res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "127.0.0.1:28881"); + EXPECT_EQ(url, "https://127.0.0.1:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; output.uri = "http://aaa.bbb:12345/something"; - url = output.makeUri(umt.getUri(output.uri)); + res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "aaa.bbb:12345"); EXPECT_EQ(url, "https://aaa.bbb:12345/json_rpc"); } } -#endif TEST(Context, simple) { From fc2bd9d7d88f02bac3b510e2765658d09b1f783f Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Wed, 30 Jan 2019 03:28:40 +0200 Subject: [PATCH 13/16] [upstream] section entry format description updated. --- data/config.ini | 92 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 84 insertions(+), 8 deletions(-) diff --git a/data/config.ini b/data/config.ini index f0ab0df3..2d5cb1d9 100644 --- a/data/config.ini +++ b/data/config.ini @@ -45,17 +45,93 @@ requests-per-sec=100 ;; maximal amount of requests per second in the window, 0 t ban-ip-sec=300 ;; time duration in seconds to ban particular IP, 0 to ban forever [upstream] +;;An entry in [upstream] section is a template with the following +;;format: +;; =[,[,{| +;; keepAlive[:]}]] [;;] +;; +;;uri - Upstream URI in the form of ://:/. +;;Any part of the URI can be overwritten by the requester. +;; +;;timeout-seconds - inactivity timeout in seconds +;; +;;max-connections - maximum number of connections, associated with +;;the template. +;; +;;keepAlive: - the parameter indicates +;;the associated connections are kept opened after receiving a response. +;;In this case is maximum number of +;;connections, associated with the template, for each host:port pair. +;; +;;Note, if keepAlive is enabled, the connections may stay openned for +;;unlimited time. Therefore parameters and +;; are mutuly exclusive. +;; +;;Examples: +;;1. +;;mywallet=http://123.456.0.1:38694/something +;; +;;In the handler we can set +;; +;;output.uri = "$mywallet", +;;output.path = "get/your/number", +;; +;;and return Forward. The framework will try to make a connection using +;;resulting URI http://123.456.0.1:38694/get/your/number. The end point +;;/something is effective only if output.path is unset. +;; +;;2. +;;mywallet=http://0.0.0.0:0/answer +;; +;;In the handler: +;; +;;output.uri = "$mywallet", +;;output.host = "123.456.0.1", +;;output.port = 38694 +;; +;;The resulting URI will be http://123.456.0.1:38694/answer. +;; +;;3. +;;mywallet=http://0.0.0.0:0/answer, 2, 10 +;; +;;In the handler: +;; +;;output.uri = "$mywallet". +;; +;;The handler can set different values for output.host and output.port, +;;but the total number of associated connections cannot exceed 10. +;;Inactivity timeout is set to 2 seconds, thus inactive connections +;;will be closed after 2 seconds. +;; +;;4. +;;mywallet=http://0.0.0.0:0/answer, 2, keepAlive +;; +;;In the handler: +;; +;;output.uri = "$mywallet". +;;output.host = ... +;;output.port = ... +;; +;;Connections to the same host:port will be reused. There are no +;;restrictions for the number of connections. +;; +;;5. +;;mywallet=http://0.0.0.0:80/answer, 2, keepAlive:10 +;; +;;In the handler: +;; +;;output.uri = "$mywallet". +;;output.host = "127.0.0.3" +;; +;;Connections to the host will be reused, but the number of connections +;;will not exeed 10. +;;If the handler sets output.host = "127.0.0.4", maximum number of +;;connections for 127.0.0.4:80 is also 10. Note, that the meaning of 10 +;;is different from maximum number for non-keepAlive connections. +;; default=127.0.0.1:28681 blah=https://127.0.0.1:8080 walletnode=http://127.0.0.1:28694 -;;format =[,[,{|keepAlive[:]}]] [;;] -;; where -;; - 0 by default, that means [server] upstream-request-timeout will be used -;; - max connections for this upstream. 0 by default, that means no constraint -;; - max connections for each ip:port. 0 by default, that means no constraint -;; example: -;;wallet2=http://127.0.0.1:28694, 2.55, 10 ;; max 10 connections total for this upstream -;;wallet3=http://site.com:28694, 2.55, keepAlive:10 ;; max 10 connections per each ip:port for this upstream [graftlets] ;;dirs parameter, a list of directories to search graftlets separated by colons. If a directory is set relative it will be interpreted both relative to the current directory and relative to the executable location. By default, 'graftlets' directory will be used relative to the executable location. From adcc8d9a699314e4880a191dcbfdd6a706c5aed6 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Sat, 2 Feb 2019 02:09:41 +0200 Subject: [PATCH 14/16] Config.ini [upstream] item format changed. --- data/config.ini | 105 ++++++++------------------------------- src/supernode/server.cpp | 78 +++++++++++++++-------------- 2 files changed, 61 insertions(+), 122 deletions(-) diff --git a/data/config.ini b/data/config.ini index 2d5cb1d9..4c83afe9 100644 --- a/data/config.ini +++ b/data/config.ini @@ -47,91 +47,26 @@ ban-ip-sec=300 ;; time duration in seconds to ban particular IP, 0 to ban foreve [upstream] ;;An entry in [upstream] section is a template with the following ;;format: -;; =[,[,{| -;; keepAlive[:]}]] [;;] -;; -;;uri - Upstream URI in the form of ://:/. -;;Any part of the URI can be overwritten by the requester. -;; -;;timeout-seconds - inactivity timeout in seconds -;; -;;max-connections - maximum number of connections, associated with -;;the template. -;; -;;keepAlive: - the parameter indicates -;;the associated connections are kept opened after receiving a response. -;;In this case is maximum number of -;;connections, associated with the template, for each host:port pair. -;; -;;Note, if keepAlive is enabled, the connections may stay openned for -;;unlimited time. Therefore parameters and -;; are mutuly exclusive. -;; -;;Examples: -;;1. -;;mywallet=http://123.456.0.1:38694/something -;; -;;In the handler we can set -;; -;;output.uri = "$mywallet", -;;output.path = "get/your/number", -;; -;;and return Forward. The framework will try to make a connection using -;;resulting URI http://123.456.0.1:38694/get/your/number. The end point -;;/something is effective only if output.path is unset. -;; -;;2. -;;mywallet=http://0.0.0.0:0/answer -;; -;;In the handler: -;; -;;output.uri = "$mywallet", -;;output.host = "123.456.0.1", -;;output.port = 38694 -;; -;;The resulting URI will be http://123.456.0.1:38694/answer. -;; -;;3. -;;mywallet=http://0.0.0.0:0/answer, 2, 10 -;; -;;In the handler: -;; -;;output.uri = "$mywallet". -;; -;;The handler can set different values for output.host and output.port, -;;but the total number of associated connections cannot exceed 10. -;;Inactivity timeout is set to 2 seconds, thus inactive connections -;;will be closed after 2 seconds. -;; -;;4. -;;mywallet=http://0.0.0.0:0/answer, 2, keepAlive -;; -;;In the handler: -;; -;;output.uri = "$mywallet". -;;output.host = ... -;;output.port = ... -;; -;;Connections to the same host:port will be reused. There are no -;;restrictions for the number of connections. -;; -;;5. -;;mywallet=http://0.0.0.0:80/answer, 2, keepAlive:10 -;; -;;In the handler: -;; -;;output.uri = "$mywallet". -;;output.host = "127.0.0.3" -;; -;;Connections to the host will be reused, but the number of connections -;;will not exeed 10. -;;If the handler sets output.host = "127.0.0.4", maximum number of -;;connections for 127.0.0.4:80 is also 10. Note, that the meaning of 10 -;;is different from maximum number for non-keepAlive connections. -;; -default=127.0.0.1:28681 -blah=https://127.0.0.1:8080 -walletnode=http://127.0.0.1:28694 +;; -uri= +;; - Upstream URI in the form of ://:/. +;; Any part of the URI can be overwritten by the requester. +;; -timeout=seconds +;; - optinal parameter, inactivity timeout in seconds, +;; [server] upstream-request-timeout by default +;; -max-conns-upstream=count +;; - optinal parameter, maximum number of connections, associated with +;; the template. 0 by default, that means no restriction. +;; -max-conns-address=count +;; - optinal parameter, is maximum number of connections, associated with +;; the template, for each host:port pair. +;; 0 by default, that means no restriction. +;; -keep-alive=[true|false] +;; - the parameter indicates the associated connections are kept opened +;; after receiving a response. false by default. +;; +default-uri=127.0.0.1:28681 +blah-uri=https://127.0.0.1:8080 +walletnode-uri=http://127.0.0.1:28694 [graftlets] ;;dirs parameter, a list of directories to search graftlets separated by colons. If a directory is set relative it will be interpreted both relative to the current directory and relative to the executable location. By default, 'graftlets' directory will be used relative to the executable location. diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index 7caedef7..247f71de 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -8,7 +8,6 @@ #include #include -#include #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.server" @@ -371,36 +370,6 @@ void initGraftletDirs(int argc, const char** argv, const std::string& dirs_opt, } } -void parseSubstitutionItem(const std::string& name, const std::string& val, std::string& uri, int& cnt, bool& keepAlive, double& timeout) -{ - std::string s = trim_comments(val); - std::regex regex(R"(^\s*([^,\s]+)\s*(,\s*(\d+\.?\d*)\s*(,\s*(\d+|(keepAlive\s*(:\s*(\d+))?)))?)?\s*$)"); - std::smatch m; - if(!std::regex_match(s, m, regex)) - { - std::ostringstream oss; - oss << "invalid [upstream] format line with name '" << name << "' : '" << val << "'"; - throw graft::exit_error(oss.str()); - } - assert(7 < m.size()); - assert(m[1].matched); - uri = m[1]; - cnt = 0; keepAlive = false; timeout = 0; - if(!m[3].matched) return; - timeout = std::stod(m[3]); - if(!m[4].matched) return; - assert(m[5].matched); - if(!m[6].matched) - { - cnt = std::stoi(m[5]); - return; - } - keepAlive = true; - if(!m[7].matched) return; - assert(m[8].matched); - cnt = std::stoi(m[8]); -} - } //namespace details void usage(const boost::program_options::options_description& desc) @@ -539,17 +508,52 @@ bool GraftServer::initConfigOption(int argc, const char** argv, ConfigOpts& conf boost::optional log_trunc_to_size = log_conf.get_optional("trunc-to-size"); configOpts.log_trunc_to_size = (log_trunc_to_size)? log_trunc_to_size.get() : -1; + //[upstream] const boost::property_tree::ptree& uri_subst_conf = config.get_child("upstream"); configOpts.uri_substitutions.clear(); - std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf, &configOpts](auto it) + std::vector> name_uris; + std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf, &configOpts, &name_uris](auto it) { - std::string name(it.first); - std::string val(uri_subst_conf.get(name)); + static std::string uri_suff("-uri"); + const std::string& name_uri(it.first); + + if(name_uri.size() <= uri_suff.size()) return; //Note, empty names skipped + if(name_uri.substr(name_uri.size() - uri_suff.size()) != uri_suff) return; - std::string uri; int cnt; bool keepAlive; double timeout; - details::parseSubstitutionItem(name, val, uri, cnt, keepAlive, timeout); - configOpts.uri_substitutions.emplace(std::move(name), std::make_tuple(std::move(uri), cnt, keepAlive, timeout)); + std::string name = name_uri.substr(0, name_uri.size() - uri_suff.size()); + std::string uri(details::trim_comments(uri_subst_conf.get(name_uri))); + + name_uris.emplace_back( std::make_pair(name, uri) ); }); + for(auto& item : name_uris) + { + const std::string& name = item.first; + const std::string& uri = item.second; + + double timeout = uri_subst_conf.get(name + "-timeout", 0); + int conn_upstream = uri_subst_conf.get(name + "-max-conns-upstream", 0); + int conn_address = uri_subst_conf.get(name + "-max-conns-address", 0); + bool keep_alive = uri_subst_conf.get(name + "-keep-alive", false); + if(!keep_alive && conn_address!=0) + { + std::ostringstream oss; + oss << config_filename << " [upstream] " << name + << "-max-conns-address other than 0 is not supported when " + << name << "-keep-alive is false"; + throw graft::exit_error(oss.str()); + } + if(keep_alive && conn_upstream!=0) + { + std::ostringstream oss; + oss << config_filename << " [upstream] " << name + << "-max-conns-upstream other than 0 is not supported when " + << name << "-keep-alive is true"; + throw graft::exit_error(oss.str()); + } + int cnt = keep_alive? conn_address : conn_upstream; + configOpts.uri_substitutions.emplace(name, std::make_tuple(uri, cnt, keep_alive, timeout)); + } + if(configOpts.cryptonode_rpc_address[0] == '$') { std::string def_subst_name = configOpts.cryptonode_rpc_address.substr(1); From 56807e78861100ac4547bd55f3176ec8c0ee2afc Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Wed, 6 Feb 2019 03:06:01 +0200 Subject: [PATCH 15/16] split function added. --- include/lib/graft/common/utils.h | 3 +++ src/lib/graft/common/utils.cpp | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/include/lib/graft/common/utils.h b/include/lib/graft/common/utils.h index 5c1e3f25..73214f3e 100644 --- a/include/lib/graft/common/utils.h +++ b/include/lib/graft/common/utils.h @@ -18,5 +18,8 @@ T random_number(T startRange, T endRange) return dist(mt); } +bool split(const std::string_view& in, char delim, std::string_view& first, std::string_view& second); +bool split(const std::string_view& in, char delim, std::string& first, std::string& second); + } diff --git a/src/lib/graft/common/utils.cpp b/src/lib/graft/common/utils.cpp index 3bcd29d0..1786bece 100644 --- a/src/lib/graft/common/utils.cpp +++ b/src/lib/graft/common/utils.cpp @@ -16,5 +16,33 @@ std::string base64_encode(const std::string &data) return epee::string_encoding::base64_encode(data); } +namespace { +template +bool split_impl(const std::string_view& in, char delim, res& first, res& second) +{ + size_t pos = in.find(delim); + if(pos != std::string::npos) + { + first = in.substr(0, pos); + second = in.substr(pos+1); + return true; + } + first = in; + second = std::string_view(); + return false; +} + +} //namespace + +bool split(const std::string_view& in, char delim, std::string_view& first, std::string_view& second) +{ + return split_impl(in, delim, first, second); } + +bool split(const std::string_view& in, char delim, std::string& first, std::string& second) +{ + return split_impl(in, delim, first, second); } + +} //namespace utils +} //namespace graft From a27c64ed31faa1c8dd7616650624878c1d2e2819 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Wed, 6 Feb 2019 04:13:30 +0200 Subject: [PATCH 16/16] Routing added. --- graftlets/TestGraftlet.cpp | 6 +++ include/lib/graft/GraftletLoader.h | 31 +++++++------ include/lib/graft/connection.h | 6 ++- include/lib/graft/routing.h | 40 +++++++++++++++++ include/lib/graft/upstream_manager.h | 21 ++++++++- include/supernode/server.h | 4 +- src/lib/graft/connection.cpp | 4 +- src/lib/graft/upstream_manager.cpp | 65 +++++++++++++++++++++++++++- src/supernode/server.cpp | 11 +++-- test/graft_server_test.cpp | 4 +- test/graftlets_test.cpp | 16 +++++++ test/upstream_test.cpp | 8 ++++ 12 files changed, 190 insertions(+), 26 deletions(-) create mode 100644 include/lib/graft/routing.h diff --git a/graftlets/TestGraftlet.cpp b/graftlets/TestGraftlet.cpp index e2edaf70..3473cb25 100644 --- a/graftlets/TestGraftlet.cpp +++ b/graftlets/TestGraftlet.cpp @@ -56,6 +56,11 @@ class TestGraftlet: public IGraftlet return graft::Status::Ok; } + std::string testRouting(const std::string& s) + { + return s; + } + virtual void initOnce() { // REGISTER_ACTION(TestGraftlet, testUndefined); @@ -63,6 +68,7 @@ class TestGraftlet: public IGraftlet REGISTER_ACTION(TestGraftlet, testInt2); REGISTER_ACTION(TestGraftlet, testString1); REGISTER_ACTION(TestGraftlet, testString2); + REGISTER_ACTION(TestGraftlet, testRouting); REGISTER_ENDPOINT("/URI/test/{id:[0-9]+}", METHOD_GET | METHOD_POST, TestGraftlet, testHandler); REGISTER_ENDPOINT("/URI/test1/{id:[0-9]+}", METHOD_GET | METHOD_POST, TestGraftlet, testHandler1); diff --git a/include/lib/graft/GraftletLoader.h b/include/lib/graft/GraftletLoader.h index 24d8236d..1ef7906f 100644 --- a/include/lib/graft/GraftletLoader.h +++ b/include/lib/graft/GraftletLoader.h @@ -22,7 +22,9 @@ #include "lib/graft/IGraftlet.h" #include "lib/graft/GraftletRegistry.h" +#include "lib/graft/routing.h" #include "lib/graft/router.h" +#include "lib/graft/common/utils.h" #include #include @@ -66,18 +68,9 @@ class GraftletHandlerT { ClsName_ cls; std::string method; - { - int pos = cls_method.find('.'); - if(pos != std::string::npos) - { - cls = cls_method.substr(0, pos); - method = cls_method.substr(pos+1); - } - else - { - cls = cls_method; - } - } + bool res = graft::utils::split(cls_method, '.', cls, method); + if(!res) cls = cls_method; + auto it = m_cls2any.find(cls); if(it == m_cls2any.end()) throw std::runtime_error("Cannot find graftlet class name:" + cls); std::shared_ptr concreteGraftlet = std::any_cast>(it->second); @@ -124,6 +117,18 @@ class GraftletLoader return getEndpointsT(); } + template + std::string Routing(const std::string& gr_cls_method_ver, const std::string& value) + { + std::string_view grname; + std::string_view cls_method_ver; + graft::utils::split(gr_cls_method_ver, '.', grname, cls_method_ver); + if(grname.empty()) throw "cannot find graftlet name in " + gr_cls_method_ver; + + GraftletHandlerT handler = buildAndResolveGraftletT( std::string(grname) ); + return handler.template invoke( std::string(cls_method_ver), value ); + } + class DependencyGraph; friend class GraftletLoader::DependencyGraph; private: @@ -263,7 +268,7 @@ class GraftletLoader std::map> m_name2lib; //dll name -> registry std::map m_name2registries; - //dll (name, type_index of BaseT) -> (class name, any of BaseT) + //dll (name, type_index of BaseT) -> (class name -> any of BaseT) //it makes no sense to hold std::shared_ptr until the shared_ptr is returned from resolveGraftlet std::map< std::pair, std::map > m_name2gls; }; diff --git a/include/lib/graft/connection.h b/include/lib/graft/connection.h index 122658b2..e337367e 100644 --- a/include/lib/graft/connection.h +++ b/include/lib/graft/connection.h @@ -4,6 +4,8 @@ #include "lib/graft/task.h" #include "lib/graft/blacklist.h" +namespace graftlet { class GraftletLoader; } + namespace graft { namespace details @@ -169,6 +171,8 @@ class ConnectionManager Proto m_proto; }; +class UpstreamRoutingManager; + class ConnectionBase final { public: @@ -180,7 +184,7 @@ class ConnectionBase final void setSysInfoCounter(std::unique_ptr& counter); void createSystemInfoCounter(); void loadBlacklist(const ConfigOpts& copts); - void createLooper(ConfigOpts& configOpts); + void createLooper(graftlet::GraftletLoader& graftletLoader, ConfigOpts& configOpts); void initConnectionManagers(); void bindConnectionManagers(); diff --git a/include/lib/graft/routing.h b/include/lib/graft/routing.h new file mode 100644 index 00000000..0884282b --- /dev/null +++ b/include/lib/graft/routing.h @@ -0,0 +1,40 @@ +// Copyright (c) 2018, The Graft Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +namespace graft { + +using RoutingFunctionSignature = std::string (const std::string&); +using RoutingFunction = std::function; + +}//namespace graft + diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h index a2b5f830..7bdd7dea 100644 --- a/include/lib/graft/upstream_manager.h +++ b/include/lib/graft/upstream_manager.h @@ -1,17 +1,32 @@ #pragma once #include "lib/graft/connection.h" - +#include "lib/graft/routing.h" namespace graft { +class UpstreamRoutingManager +{ +public: + UpstreamRoutingManager(graftlet::GraftletLoader& graftletLoader) + : m_graftletLoader(graftletLoader) + { } + + void add(const std::string& name, RoutingFunction converter); + bool route(const std::string& in, std::string& out); +private: + std::map m_converters; + graftlet::GraftletLoader& m_graftletLoader; +}; + class UpstreamManager { public: using OnDoneCallback = std::function; + UpstreamManager() = default; - void init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback); + void init(graftlet::GraftletLoader& graftletLoader, const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback); bool busy() const { @@ -77,6 +92,7 @@ class UpstreamManager void createUpstreamSender(ConnItem* connItem, const std::string& ip_port, BaseTaskPtr bt, const std::string& uri); ConnItem* findConnItem(const Output& output, std::string& ip_port, std::string& result_uri); const std::string& getUri(ConnItem* connItem, const std::string& inputUri); + void initUpstreamRoutingManager(graftlet::GraftletLoader& graftletLoader); using Uri2ConnItem = std::map; @@ -87,6 +103,7 @@ class UpstreamManager Uri2ConnItem m_conn2item; mg_mgr* m_mgr = nullptr; int m_http_callback_port; + std::unique_ptr m_upstreamRoutingManager; protected: std::unordered_map m_resolveCache; }; diff --git a/include/supernode/server.h b/include/supernode/server.h index 11e4880a..1e28d4f0 100644 --- a/include/supernode/server.h +++ b/include/supernode/server.h @@ -9,6 +9,8 @@ namespace graft::request::system_info { class Counter; } namespace graft { +class UpstreamRoutingManager; + using SysInfoCounter = request::system_info::Counter; class GraftServer @@ -53,7 +55,7 @@ class GraftServer void serve(); static void initSignals(); void addGlobalCtxCleaner(); - void initGraftlets(); + void initGraftlets(const ConfigOpts& copts); void initGraftletRouters(); ConfigOpts& getCopts(); diff --git a/src/lib/graft/connection.cpp b/src/lib/graft/connection.cpp index b3dde293..31ebdd20 100644 --- a/src/lib/graft/connection.cpp +++ b/src/lib/graft/connection.cpp @@ -238,13 +238,13 @@ void ConnectionBase::createSystemInfoCounter() m_sysInfo = std::make_unique(); } -void ConnectionBase::createLooper(ConfigOpts& configOpts) +void ConnectionBase::createLooper(graftlet::GraftletLoader& graftletLoader, ConfigOpts& configOpts) { assert(m_sysInfo && !m_looper && !m_upstreamManager); m_upstreamManager = std::make_unique(); m_looper = std::make_unique(configOpts, *m_upstreamManager, *this); - m_upstreamManager->init(configOpts, m_looper->getMgMgr(), port_from_uri(configOpts.http_address), + m_upstreamManager->init(graftletLoader, configOpts, m_looper->getMgMgr(), port_from_uri(configOpts.http_address), [this](UpstreamSender& uss){ m_looper->onUpstreamDone(uss); }); m_looperReady = true; } diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp index 91a16f9c..09fb337e 100644 --- a/src/lib/graft/upstream_manager.cpp +++ b/src/lib/graft/upstream_manager.cpp @@ -1,8 +1,47 @@ #include "lib/graft/upstream_manager.h" +#include "lib/graft/GraftletLoader.h" namespace graft { +void UpstreamRoutingManager::add(const std::string& name, RoutingFunction converter) +{ + auto res = m_converters.emplace(name, converter); + assert(res.second); +} + +/*! + * \brief route - make routing. + * \param in - it expects "#function:data". it finds and calls function(data), + * and place the result back to the route. + * for graftlets the function is "graftletName.ClassName.method" + * \return false if the function not found + */ +bool UpstreamRoutingManager::route(const std::string& in, std::string& out) +{ + if(in[0] != '#') return false; + size_t pos = in.find(':', 1); + if(pos == std::string::npos) return false; + std::string name = in.substr(1, pos-1); + std::string val = in.substr(pos+1); + auto it = m_converters.find(name); + if(it != m_converters.end()) + { + out = it->second(val); + return true; + } + //try call graftlet method + try + { + out = m_graftletLoader.Routing(name, val); + } + catch(...) + { + return false; + } + return true; +} + UpstreamManager::ConnItem::Bunch& UpstreamManager::ConnItem::getBunch(const IpPort& ip_port, bool createIfNotExists) { assert(m_keepAlive || ip_port.empty()); @@ -95,7 +134,21 @@ UpstreamManager::ConnItem* UpstreamManager::findConnItem(const Output& output, s { ConnItem* connItem = &m_default; {//find connItem - const std::string& uri = output.uri; + std::string uri = output.uri; + if(!uri.empty() && uri[0] == '#') + { + std::string out; + bool res = m_upstreamRoutingManager->route(uri, out); + if(res) + { + LOG_PRINT_L2("routing '") << uri << "' -> '" << out << "'"; + uri = out; + } + else + { + LOG_PRINT_L2("routing '") << uri << "' failed"; + } + } if(!uri.empty() && uri[0] == '$') {//substitutions auto it = m_conn2item.find(uri.substr(1)); @@ -160,8 +213,16 @@ void UpstreamManager::onDone(UpstreamSender& uss, ConnItem* connItem, const std: createUpstreamSender(connItem, ip_port_v, bt, uri); } -void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback) +void UpstreamManager::initUpstreamRoutingManager(graftlet::GraftletLoader& graftletLoader) { + assert(!m_upstreamRoutingManager); + m_upstreamRoutingManager = std::make_unique(graftletLoader); +} + +void UpstreamManager::init(graftlet::GraftletLoader& graftletLoader, const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback) +{ + initUpstreamRoutingManager(graftletLoader); + m_mgr = mgr; m_http_callback_port = http_callback_port; m_onDoneCallback = onDoneCallback; diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index 247f71de..7bdace4e 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -62,12 +62,12 @@ void GraftServer::getThreadPoolInfo(uint64_t& activeWorkers, uint64_t& expelledW m_connectionBase->getLooper().getThreadPoolInfo(activeWorkers, expelledWorkers); } -void GraftServer::initGraftlets() +void GraftServer::initGraftlets(const ConfigOpts& copts) { if(m_graftletLoader) return; m_graftletLoader = std::make_unique(); LOG_PRINT_L1("Searching graftlets"); - for(auto& it : getCopts().graftlet_dirs) + for(auto& it : copts.graftlet_dirs) { LOG_PRINT_L1("Searching graftlets in directory '") << it << "'"; m_graftletLoader->findGraftletsInDirectory(it, "so"); @@ -146,9 +146,12 @@ bool GraftServer::init(int argc, const char** argv, ConfigOpts& configOpts) bool res = initConfigOption(argc, argv, configOpts); if(!res) return false; + initGraftlets(configOpts); + assert(m_graftletLoader); + m_connectionBase->loadBlacklist(configOpts); - m_connectionBase->createLooper(configOpts); - initGraftlets(); + m_connectionBase->createLooper(*m_graftletLoader, configOpts); + addGlobalCtxCleaner(); initGlobalContext(); diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index f8e7a12c..b65a1984 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -6,6 +6,7 @@ #include "lib/graft/handler_api.h" #include "lib/graft/expiring_list.h" #include "lib/graft/upstream_manager.h" +#include "lib/graft/GraftletLoader.h" #include "supernode/requests.h" #include "supernode/requests/sale.h" #include "supernode/requests/sale_status.h" @@ -162,7 +163,8 @@ class UpstreamManagerTest : public graft::UpstreamManager graft::ConfigOpts copts; copts.cryptonode_rpc_address = cryptonode_rpc_address; copts.uri_substitutions.insert({subst_name, {subst_uri, 0, false, 0}}); - graft::UpstreamManager::init(copts, nullptr, 0, nullptr); + graftlet::GraftletLoader gl; + graft::UpstreamManager::init(gl, copts, nullptr, 0, nullptr); } const std::string testGetUri(const graft::Output& output) { diff --git a/test/graftlets_test.cpp b/test/graftlets_test.cpp index fe8d11dc..1685b4a8 100644 --- a/test/graftlets_test.cpp +++ b/test/graftlets_test.cpp @@ -3,6 +3,7 @@ #include "fixture.h" #define INCLUDE_DEPENDENCY_GRAPH #include "lib/graft/GraftletLoader.h" +#include "lib/graft/upstream_manager.h" TEST(DependencyGraph, dependencies) { @@ -257,6 +258,21 @@ TEST(Graftlets, calls) EXPECT_EQ(endpoints.size(), 4); } +TEST(Graftlets, routing) +{ + graftlet::GraftletLoader loader; + + loader.findGraftletsInDirectory("./", "so"); + loader.findGraftletsInDirectory("./graftlets", "so"); + + graft::UpstreamRoutingManager upstreamRoutingManager(loader); + + std::string route = "#myGraftlet.testGL.testRouting:my_value"; + bool res = upstreamRoutingManager.route(route, route); + EXPECT_EQ(res, true); + EXPECT_EQ(route, "my_value"); +} + TEST(Graftlets, exceptionList) { #define VER(a,b) GRAFTLET_MKVER(a,b) diff --git a/test/upstream_test.cpp b/test/upstream_test.cpp index a322b19a..be28c65e 100644 --- a/test/upstream_test.cpp +++ b/test/upstream_test.cpp @@ -104,6 +104,14 @@ TEST_F(UpstreamTest, upstream) run("$crypton", maxConn); } +TEST_F(UpstreamTest, upstreamWithSharp) +{ + const int maxConn = 0; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, false, 300}}); + m_mainServer.m_copts.graftlet_dirs.emplace_back("graftlets"); + run("#myGraftlet.testGL.testRouting:$crypton", maxConn); +} + TEST_F(UpstreamTest, upstreamDefault) { const int maxConn = 0;