diff --git a/CMakeLists.txt b/CMakeLists.txt index 0cec3f8a..50fb6af1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,40 @@ cmake_minimum_required(VERSION 3.10) +if(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE RelWithDebInfo) +endif(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) + +if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) + message("==> The configuration is ${CMAKE_BUILD_TYPE}. Debug info will be extracted into separate files.") + + function (add_executable _name) + _add_executable(${ARGV}) + + if (TARGET ${_name}) + add_custom_command(TARGET ${_name} POST_BUILD + COMMAND echo "$: extracting debug info" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" + COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" + ) + endif() + endfunction() + + function (add_library _name _type) + _add_library(${ARGV}) + + if (TARGET ${_name} AND ${_type} STREQUAL SHARED) + add_custom_command(TARGET ${_name} POST_BUILD + COMMAND echo "$: extracting debug info" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" + COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" + ) + endif() + endfunction() + +endif(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) + project(graft_server) option(OPT_BUILD_TESTS "Build tests." OFF) @@ -155,6 +190,7 @@ add_library(graft STATIC ${PROJECT_SOURCE_DIR}/src/lib/graft/mongoosex.cpp ${PROJECT_SOURCE_DIR}/src/lib/graft/router.cpp ${PROJECT_SOURCE_DIR}/src/lib/graft/task.cpp + ${PROJECT_SOURCE_DIR}/src/lib/graft/upstream_manager.cpp ${PROJECT_SOURCE_DIR}/modules/mongoose/mongoose.c ${PROJECT_SOURCE_DIR}/src/supernode/server.cpp ${PROJECT_SOURCE_DIR}/src/supernode/supernode.cpp diff --git a/include/lib/graft/connection.h b/include/lib/graft/connection.h index 9cb2b02e..833916c1 100644 --- a/include/lib/graft/connection.h +++ b/include/lib/graft/connection.h @@ -74,9 +74,10 @@ class UpstreamSender : public SelfHolder BaseTaskPtr& getTask() { return m_bt; } - void send(TaskManager& manager, const std::string& uri); + void send(mg_mgr* mgr, int http_callback_port, const std::string& uri); Status getStatus() const { return m_status; } const std::string& getError() const { return m_error; } + size_t getRequestSize() const { return m_requestSize; } void ev_handler(mg_connection* upstream, int ev, void *ev_data); private: @@ -94,6 +95,7 @@ class UpstreamSender : public SelfHolder mg_connection* m_upstream = nullptr; Status m_status = Status::None; std::string m_error; + size_t m_requestSize = 0; }; class ConnectionBase; @@ -101,7 +103,7 @@ class ConnectionBase; class Looper final : public TaskManager { public: - Looper(const ConfigOpts& copts, ConnectionBase& connectionBase); + Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase); virtual ~Looper(); void serve(); @@ -193,6 +195,7 @@ class ConnectionBase final std::unique_ptr m_sysInfo; std::atomic_bool m_looperReady{false}; std::unique_ptr m_looper; + std::unique_ptr m_upstreamManager; std::map> m_conManagers; }; diff --git a/include/lib/graft/inout.h b/include/lib/graft/inout.h index bb81034c..97c41ecb 100644 --- a/include/lib/graft/inout.h +++ b/include/lib/graft/inout.h @@ -211,7 +211,6 @@ namespace graft std::string port; std::string path; - static std::unordered_map> uri_substitutions; }; class InHttp final : public InOutHttpBase diff --git a/include/lib/graft/serveropts.h b/include/lib/graft/serveropts.h index 490d49c9..4cf7f006 100644 --- a/include/lib/graft/serveropts.h +++ b/include/lib/graft/serveropts.h @@ -2,6 +2,7 @@ #include #include +#include #include namespace graft { @@ -30,6 +31,7 @@ struct ConfigOpts std::vector graftlet_dirs; int lru_timeout_ms; IPFilterOpts ipfilter; + std::unordered_map> uri_substitutions; void check_asserts() const { diff --git a/include/lib/graft/task.h b/include/lib/graft/task.h index 6fc8f1ad..896ea09f 100644 --- a/include/lib/graft/task.h +++ b/include/lib/graft/task.h @@ -38,6 +38,7 @@ namespace graft { extern std::string client_addr(mg_connection* client); extern std::string client_host(mg_connection* client); +extern unsigned int port_from_uri(const std::string& uri); class UpstreamSender; class TaskManager; @@ -192,7 +193,7 @@ class UpstreamManager; class TaskManager : private HandlerAPI { public: - TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter); + TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter); virtual ~TaskManager(); TaskManager(const TaskManager&) = delete; TaskManager& operator = (const TaskManager&) = delete; @@ -259,6 +260,7 @@ class TaskManager : private HandlerAPI static inline size_t next_pow2(size_t val); + UpstreamManager& m_upstreamManager; SysInfoCounter& m_sysInfoCounter; GlobalContextMap m_gcm; @@ -277,7 +279,6 @@ class TaskManager : private HandlerAPI std::deque m_readyToResume; std::priority_queue,Context::uuid_t>> m_expireTaskQueue; std::unique_ptr m_futurePostponeUuids; - std::unique_ptr m_upstreamManager; using PromiseItem = UpstreamTask::PromiseItem; using PromiseQueue = tp::MPMCBoundedQueue; diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h new file mode 100644 index 00000000..60d3d33e --- /dev/null +++ b/include/lib/graft/upstream_manager.h @@ -0,0 +1,82 @@ +#pragma once +#include "lib/graft/connection.h" + + +namespace graft +{ + +class UpstreamManager +{ +public: + using OnDoneCallback = std::function; + UpstreamManager() = default; + + void init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback); + + bool busy() const + { + return (m_cntUpstreamSender != m_cntUpstreamSenderDone); + } + + void send(BaseTaskPtr bt); +protected: + const std::string getUri(const std::string& inputUri); +private: + class ConnItem + { + public: + using ConnectionId = uint64_t; + + ConnItem() = default; + ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout) + : m_uriId(uriId) + , m_uri(uri) + , m_maxConnections(maxConnections) + , m_keepAlive(keepAlive) + , m_timeout(timeout) + { + } + ~ConnItem() + { + assert(m_idleConnections.empty()); + assert(m_activeConnections.empty()); + } + + std::pair getConnection(); + void releaseActive(ConnectionId connectionId, mg_connection* client); + void onCloseIdle(mg_connection* client); + + int m_connCnt = 0; + int m_uriId; + std::string m_uri; + double m_timeout; + //assert(m_upstreamQueue.empty() || 0 < m_maxConnections); + int m_maxConnections; + std::deque m_taskQueue; + bool m_keepAlive = false; + std::map m_idleConnections; + std::map m_activeConnections; + UpstreamStub m_upstreamStub; + private: + ConnectionId m_newId = 0; + }; + + void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client); + void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt); + ConnItem* findConnItem(const std::string& inputUri); + const std::string& getUri(ConnItem* connItem, const std::string& inputUri); + + using Uri2ConnItem = std::map; + + OnDoneCallback m_onDoneCallback; + + uint64_t m_cntUpstreamSender = 0; + uint64_t m_cntUpstreamSenderDone = 0; + ConnItem m_default; + Uri2ConnItem m_conn2item; + mg_mgr* m_mgr = nullptr; + int m_http_callback_port; +}; + +}//namespace graft + diff --git a/src/lib/graft/connection.cpp b/src/lib/graft/connection.cpp index 8aa3ad0d..2cae7251 100644 --- a/src/lib/graft/connection.cpp +++ b/src/lib/graft/connection.cpp @@ -3,6 +3,7 @@ #include "lib/graft/mongoosex.h" #include "lib/graft/sys_info.h" #include "lib/graft/graft_exception.h" +#include "lib/graft/upstream_manager.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.connection" @@ -23,6 +24,16 @@ std::string client_host(mg_connection* client) return inet_ntoa(client->sa.sin.sin_addr); } +unsigned int port_from_uri(const std::string& uri) +{ + assert(!uri.empty()); + mg_str mg_uri{uri.c_str(), uri.size()}; + unsigned int mg_port = 0; + int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0); + assert(0<=res); + return mg_port; +} + void* getUserData(mg_mgr* mgr) { return mgr->user_data; } void* getUserData(mg_connection* nc) { return nc->user_data; } mg_mgr* getMgr(mg_connection* nc) { return nc->mgr; } @@ -60,28 +71,18 @@ void UpstreamStub::ev_handler(mg_connection *upstream, int ev, void *ev_data) } -void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) +void UpstreamSender::send(mg_mgr* mgr, int http_callback_port, const std::string& def_uri) { assert(m_bt); - const ConfigOpts& opts = manager.getCopts(); - Output& output = m_bt->getOutput(); std::string url = output.makeUri(def_uri); Context::uuid_t callback_uuid = m_bt->getCtx().getId(false); if(!callback_uuid.is_nil()) {//add extra header - unsigned int mg_port = 0; - { - std::string uri = opts.http_address; - assert(!uri.empty()); - mg_str mg_uri{uri.c_str(), uri.size()}; - int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0); - assert(0<=res); - } std::stringstream ss; - ss << "http://0.0.0.0:" << mg_port << "/callback/" << boost::uuids::to_string(callback_uuid); + ss << "http://0.0.0.0:" << http_callback_port << "/callback/" << boost::uuids::to_string(callback_uuid); output.headers.emplace_back(std::make_pair("X-Callback", ss.str())); } std::string extra_headers = output.combine_headers(); @@ -95,7 +96,7 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) m_upstream->user_data = this; m_upstream->handler = static_ev_handler; } - mg_connection* upstream = mg::mg_connect_http_x(m_upstream, manager.getMgMgr(), static_ev_handler, url.c_str(), + mg_connection* upstream = mg::mg_connect_http_x(m_upstream, mgr, static_ev_handler, url.c_str(), extra_headers.c_str(), body); //body.empty() means GET assert(upstream != nullptr && (m_upstream == nullptr || m_upstream == upstream)); @@ -106,9 +107,7 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) } mg_set_timer(m_upstream, mg_time() + m_timeout); - auto& rsi = manager.runtimeSysInfo(); - rsi.count_upstrm_http_req(); - rsi.count_upstrm_http_req_bytes_raw(url.size() + extra_headers.size() + body.size()); + m_requestSize = url.size() + extra_headers.size() + body.size(); } void UpstreamSender::ev_handler(mg_connection *upstream, int ev, void *ev_data) @@ -240,8 +239,12 @@ void ConnectionBase::createSystemInfoCounter() void ConnectionBase::createLooper(ConfigOpts& configOpts) { - assert(m_sysInfo && !m_looper); - m_looper = std::make_unique(configOpts, *this); + assert(m_sysInfo && !m_looper && !m_upstreamManager); + m_upstreamManager = std::make_unique(); + + m_looper = std::make_unique(configOpts, *m_upstreamManager, *this); + m_upstreamManager->init(configOpts, m_looper->getMgMgr(), port_from_uri(configOpts.http_address), + [this](UpstreamSender& uss){ m_looper->onUpstreamDone(uss); }); m_looperReady = true; } @@ -290,8 +293,8 @@ void ConnectionBase::checkRoutes(graft::ConnectionManager& cm) } -Looper::Looper(const ConfigOpts& copts, ConnectionBase& connectionBase) - : TaskManager(copts, connectionBase.getSysInfoCounter()) +Looper::Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase) + : TaskManager(copts, upstreamManager, connectionBase.getSysInfoCounter()) , m_mgr(std::make_unique()) { mg_mgr_init(m_mgr.get(), &connectionBase, cb_event); diff --git a/src/lib/graft/inout.cpp b/src/lib/graft/inout.cpp index e13c289d..bc152e1e 100644 --- a/src/lib/graft/inout.cpp +++ b/src/lib/graft/inout.cpp @@ -4,7 +4,6 @@ namespace graft { -std::unordered_map> OutHttp::uri_substitutions; void InOutHttpBase::set_str_field(const http_message& hm, const mg_str& str_fld, std::string& fld) { diff --git a/src/lib/graft/task.cpp b/src/lib/graft/task.cpp index 5fe8a0ab..341549a4 100644 --- a/src/lib/graft/task.cpp +++ b/src/lib/graft/task.cpp @@ -6,6 +6,7 @@ #include "lib/graft/handler_api.h" #include "lib/graft/expiring_list.h" #include "lib/graft/sys_info.h" +#include "lib/graft/upstream_manager.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.task" @@ -203,195 +204,9 @@ class ExpiringList : public detail::ExpiringListT< Uuid_Input > { } }; -class UpstreamManager -{ -public: - using OnDoneCallback = std::function; - - UpstreamManager(TaskManager& manager, OnDoneCallback onDoneCallback) - : m_manager(manager) - , m_onDoneCallback(onDoneCallback) - { init(); } - - bool busy() const - { - return (m_cntUpstreamSender != m_cntUpstreamSenderDone); - } - - void send(BaseTaskPtr bt) - { - ConnItem* connItem = &m_default; - {//find connItem - const std::string& uri = bt->getOutput().uri; - if(!uri.empty() && uri[0] == '$') - {//substitutions - auto it = m_conn2item.find(uri.substr(1)); - if(it == m_conn2item.end()) - { - std::ostringstream oss; - oss << "cannot find uri substitution '" << uri << "'"; - throw std::runtime_error(oss.str()); - } - connItem = &it->second; - } - } - if(connItem->m_maxConnections != 0 && connItem->m_idleConnections.empty() && connItem->m_connCnt == connItem->m_maxConnections) - { - connItem->m_taskQueue.push_back(bt); - return; - } - - createUpstreamSender(connItem, bt); - } -private: - uint64_t m_cntUpstreamSender = 0; - uint64_t m_cntUpstreamSenderDone = 0; - - class ConnItem - { - public: - using Active = bool; - using ConnectionId = uint64_t; - - ConnItem() = default; - ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout) - : m_uriId(uriId) - , m_uri(uri) - , m_maxConnections(maxConnections) - , m_keepAlive(keepAlive) - , m_timeout(timeout) - { - } - std::pair getConnection() - { - //TODO: something wrong with (m_connCnt <= m_maxConnections) - assert(m_maxConnections == 0 || m_connCnt <= m_maxConnections); - std::pair res = std::make_pair(0,nullptr); - if(!m_keepAlive) - { - ++m_connCnt; - return res; - } - if(!m_idleConnections.empty()) - { - auto it = m_idleConnections.begin(); - res = std::make_pair(it->second, it->first); - m_idleConnections.erase(it); - } - else - { - ++m_connCnt; - res.first = ++m_newId; - } - auto res1 = m_activeConnections.emplace(res); - assert(res1.second); - assert(m_connCnt == m_idleConnections.size() + m_activeConnections.size()); - return res; - } - - void releaseActive(ConnectionId connectionId, mg_connection* client) - { - assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); - if(!m_keepAlive) return; - auto it = m_activeConnections.find(connectionId); - assert(it != m_activeConnections.end()); - assert(it->second == nullptr || client == nullptr || it->second == client); - if(client != nullptr) - { - m_idleConnections.emplace(client, it->first); - m_upstreamStub.setConnection(client); - } - else - { - --m_connCnt; - } - m_activeConnections.erase(it); - } - - void onCloseIdle(mg_connection* client) - { - assert(m_keepAlive); - auto it = m_idleConnections.find(client); - assert(it != m_idleConnections.end()); - --m_connCnt; - m_idleConnections.erase(it); - } - - ConnectionId m_newId = 0; - int m_connCnt = 0; - int m_uriId; - std::string m_uri; - double m_timeout; - //assert(m_upstreamQueue.empty() || 0 < m_maxConn); - int m_maxConnections; - std::deque m_taskQueue; - bool m_keepAlive = false; - std::map m_idleConnections; - std::map m_activeConnections; - UpstreamStub m_upstreamStub; - }; - - void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client) - { - ++m_cntUpstreamSenderDone; - m_onDoneCallback(uss); - connItem->releaseActive(connectionId, client); - if(connItem->m_taskQueue.empty()) return; - BaseTaskPtr bt = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); - createUpstreamSender(connItem, bt); - } - - void init() - { - int uriId = 0; - const ConfigOpts& opts = m_manager.getCopts(); - m_default = ConnItem(uriId++, opts.cryptonode_rpc_address.c_str(), 0, false, opts.upstream_request_timeout); - - for(auto& subs : OutHttp::uri_substitutions) - { - double timeout = std::get<3>(subs.second); - if(timeout < 1e-5) timeout = opts.upstream_request_timeout; - auto res = m_conn2item.emplace(subs.first, ConnItem(uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); - assert(res.second); - ConnItem* connItem = &res.first->second; - connItem->m_upstreamStub.setCallback([connItem](mg_connection* client){ connItem->onCloseIdle(client); }); - } - } - - void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) - { - auto onDoneAct = [this, connItem](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) - { - onDone(uss, connItem, connectionId, client); - }; - - ++m_cntUpstreamSender; - UpstreamSender::Ptr uss; - if(connItem->m_keepAlive) - { - auto res = connItem->getConnection(); - uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); - } - else - { - uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); - } - - const std::string& uri = (connItem != &m_default || bt->getOutput().uri.empty())? connItem->m_uri : bt->getOutput().uri; - uss->send(m_manager, uri); - } - - using Uri2ConnItem = std::map; - - OnDoneCallback m_onDoneCallback; - - ConnItem m_default; - Uri2ConnItem m_conn2item; - TaskManager& m_manager; //TODO: should be removed, and be independent of TaskManager -}; - -TaskManager::TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter) +TaskManager::TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter) : m_copts(copts) + , m_upstreamManager(upstreamManager) , m_sysInfoCounter(sysInfoCounter) , m_gcm(this) , m_futurePostponeUuids(std::make_unique(1000 * copts.http_connection_timeout)) @@ -489,15 +304,13 @@ void TaskManager::checkUpstreamBlockingIO() bool res = m_promiseQueue->pop(pi); if(!res) break; UpstreamTask::Ptr bt = BaseTask::Create(*this, std::move(pi)); - assert(m_upstreamManager); - m_upstreamManager->send(bt); + sendUpstream(bt); } } void TaskManager::sendUpstream(BaseTaskPtr bt) { - assert(m_upstreamManager); - m_upstreamManager->send(bt); + m_upstreamManager.send(bt); } void TaskManager::onTimer(BaseTaskPtr bt) @@ -536,7 +349,7 @@ void TaskManager::schedule(PeriodicTask* pt) bool TaskManager::canStop() { return (m_cntBaseTask == m_cntBaseTaskDone) - && (!m_upstreamManager->busy()) + && (!m_upstreamManager.busy()) && (m_cntJobSent == m_cntJobDone); } @@ -856,7 +669,6 @@ void TaskManager::initThreadPool(int threadCount, int workersQueueSize, int expe m_promiseQueue = std::make_unique( threadCount ); //TODO: it is not clear how many items we need in PeriodicTaskQueue, maybe we should make it dynamically but this requires additional synchronization m_periodicTaskQueue = std::make_unique(2*threadCount); - m_upstreamManager = std::make_unique(*this, [this](UpstreamSender& uss){ onUpstreamDone(uss); } ); LOG_PRINT_L1("Thread pool created with " << threadCount << " workers with " << workersQueueSize @@ -892,6 +704,9 @@ void TaskManager::onUpstreamDone(UpstreamSender& uss) void TaskManager::upstreamDoneProcess(UpstreamSender& uss) { + runtimeSysInfo().count_upstrm_http_req(); + runtimeSysInfo().count_upstrm_http_req_bytes_raw(uss.getRequestSize()); + if(Status::Ok == uss.getStatus()) runtimeSysInfo().count_upstrm_http_resp_ok(); else diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp new file mode 100644 index 00000000..e99922dc --- /dev/null +++ b/src/lib/graft/upstream_manager.cpp @@ -0,0 +1,160 @@ +#include "lib/graft/upstream_manager.h" + +namespace graft +{ + +std::pair UpstreamManager::ConnItem::getConnection() +{ + assert(m_maxConnections == 0 || m_connCnt < m_maxConnections || m_connCnt == m_maxConnections && !m_idleConnections.empty()); + std::pair res = std::make_pair(0,nullptr); + if(!m_keepAlive) + { + ++m_connCnt; + return res; + } + if(!m_idleConnections.empty()) + { + auto it = m_idleConnections.begin(); + res = std::make_pair(it->second, it->first); + m_idleConnections.erase(it); + } + else + { + ++m_connCnt; + res.first = ++m_newId; + } + auto res1 = m_activeConnections.emplace(res); + assert(res1.second); + assert(m_connCnt == m_idleConnections.size() + m_activeConnections.size()); + return res; +} + +void UpstreamManager::ConnItem::releaseActive(ConnectionId connectionId, mg_connection* client) +{ + assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); + if(!m_keepAlive) return; + auto it = m_activeConnections.find(connectionId); + assert(it != m_activeConnections.end()); + assert(it->second == nullptr || client == nullptr || it->second == client); + if(client != nullptr) + { + m_idleConnections.emplace(client, it->first); + m_upstreamStub.setConnection(client); + } + else + { + --m_connCnt; + } + m_activeConnections.erase(it); +} + +void UpstreamManager::ConnItem::onCloseIdle(mg_connection* client) +{ + assert(m_keepAlive); + auto it = m_idleConnections.find(client); + assert(it != m_idleConnections.end()); + --m_connCnt; + m_idleConnections.erase(it); +} + +UpstreamManager::ConnItem* UpstreamManager::findConnItem(const std::string& inputUri) +{ + ConnItem* connItem = &m_default; + {//find connItem + const std::string& uri = inputUri; + if(!uri.empty() && uri[0] == '$') + {//substitutions + auto it = m_conn2item.find(uri.substr(1)); + if(it == m_conn2item.end()) + { + std::ostringstream oss; + oss << "cannot find uri substitution '" << uri << "'"; + throw std::runtime_error(oss.str()); + } + connItem = &it->second; + } + } + return connItem; +} + +void UpstreamManager::send(BaseTaskPtr bt) +{ + ConnItem* connItem = findConnItem(bt->getOutput().uri); + + if(connItem->m_maxConnections != 0 && connItem->m_idleConnections.empty() && connItem->m_connCnt == connItem->m_maxConnections) + { + connItem->m_taskQueue.push_back(bt); + return; + } + + createUpstreamSender(connItem, bt); +} + + +void UpstreamManager::onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client) +{ + ++m_cntUpstreamSenderDone; + m_onDoneCallback(uss); + connItem->releaseActive(connectionId, client); + if(connItem->m_taskQueue.empty()) return; + BaseTaskPtr bt = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); + createUpstreamSender(connItem, bt); +} + +void UpstreamManager::init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback) +{ + m_mgr = mgr; + m_http_callback_port = http_callback_port; + m_onDoneCallback = onDoneCallback; + + int uriId = 0; + m_default = ConnItem(uriId++, copts.cryptonode_rpc_address.c_str(), 0, false, copts.upstream_request_timeout); + + for(auto& subs : copts.uri_substitutions) + { + double timeout = std::get<3>(subs.second); + if(timeout < 1e-5) timeout = copts.upstream_request_timeout; + auto res = m_conn2item.emplace(subs.first, ConnItem(uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); + assert(res.second); + ConnItem* connItem = &res.first->second; + connItem->m_upstreamStub.setCallback([connItem](mg_connection* client){ connItem->onCloseIdle(client); }); + } +} + +const std::string& UpstreamManager::getUri(ConnItem* connItem, const std::string& inputUri) +{ + const std::string& uri = (connItem != &m_default || inputUri.empty())? connItem->m_uri : inputUri; + return uri; +} + +void UpstreamManager::createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) +{ + auto onDoneAct = [this, connItem](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) + { + onDone(uss, connItem, connectionId, client); + }; + + ++m_cntUpstreamSender; + UpstreamSender::Ptr uss; + if(connItem->m_keepAlive) + { + auto res = connItem->getConnection(); + uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); + } + else + { + uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); + } + + const std::string& uri = getUri(connItem, bt->getOutput().uri); + uss->send(m_mgr, m_http_callback_port, uri); +} + +const std::string UpstreamManager::getUri(const std::string& inputUri) +{ + ConnItem* connItem = findConnItem(inputUri); + return getUri(connItem, inputUri); +} + +}//namespace graft + diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index 1e65094e..ca8e2291 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -4,6 +4,7 @@ #include "lib/graft/GraftletLoader.h" #include "lib/graft/sys_info.h" #include "lib/graft/graft_exception.h" +#include "lib/graft/upstream_manager.h" #include #include @@ -532,15 +533,15 @@ bool GraftServer::initConfigOption(int argc, const char** argv, ConfigOpts& conf configOpts.log_trunc_to_size = (log_trunc_to_size)? log_trunc_to_size.get() : -1; const boost::property_tree::ptree& uri_subst_conf = config.get_child("upstream"); - graft::OutHttp::uri_substitutions.clear(); - std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf](auto it) + configOpts.uri_substitutions.clear(); + std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf, &configOpts](auto it) { std::string name(it.first); std::string val(uri_subst_conf.get(name)); std::string uri; int cnt; bool keepAlive; double timeout; details::parseSubstitutionItem(name, val, uri, cnt, keepAlive, timeout); - graft::OutHttp::uri_substitutions.emplace(std::move(name), std::make_tuple(std::move(uri), cnt, keepAlive, timeout)); + configOpts.uri_substitutions.emplace(std::move(name), std::make_tuple(std::move(uri), cnt, keepAlive, timeout)); }); return true; diff --git a/test/fixture.h b/test/fixture.h index 9ac633fe..6ed83580 100644 --- a/test/fixture.h +++ b/test/fixture.h @@ -64,10 +64,13 @@ class GraftServerTestBase : public ::testing::Test int poll_timeout_ms = 1000; bool keepAlive = false; - using on_http_t = bool (const http_message *hm, int& status_code, std::string& headers, std::string& data); + using on_http_t = bool (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data); std::function on_http = nullptr; static std::function http_echo; + using on_poll_t = bool (mg_connection* client, int& status_code, std::string& headers, std::string& data); + std::function on_poll = nullptr; + void run() { ready = false; @@ -83,13 +86,22 @@ class GraftServerTestBase : public ::testing::Test stop = true; th.join(); } + protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) { assert(on_http); - return on_http(hm, status_code, headers, data); + return on_http(client, hm, status_code, headers, data); } virtual void onClose() { } + virtual bool onPoll(mg_connection* client) + { + if(!on_poll) return false; + int status_code = 200; + std::string headers, data; + bool res = on_poll(client, status_code, headers, data); + if(res) sendResponse(client, status_code, headers, data); + } private: std::thread th; std::atomic_bool ready; @@ -109,6 +121,16 @@ class GraftServerTestBase : public ::testing::Test mg_mgr_free(&mgr); } + void sendResponse(mg_connection* client, int& status_code, std::string& headers, std::string& data) + { + mg_send_head(client, status_code, data.size(), headers.c_str()); + mg_send(client, data.c_str(), data.size()); + if(!keepAlive) + { + client->flags |= MG_F_SEND_AND_CLOSE; + } + } + static void ev_handler_empty_s(mg_connection *client, int ev, void *ev_data) { } @@ -128,14 +150,13 @@ class GraftServerTestBase : public ::testing::Test struct http_message *hm = (struct http_message *) ev_data; int status_code = 200; std::string headers, data; - bool res = onHttpRequest(hm, status_code, headers, data); + bool res = onHttpRequest(client, hm, status_code, headers, data); if(!res) break; - mg_send_head(client, status_code, data.size(), headers.c_str()); - mg_send(client, data.c_str(), data.size()); - if(!keepAlive) - { - client->flags |= MG_F_SEND_AND_CLOSE; - } + sendResponse(client, status_code, headers, data); + } break; + case MG_EV_POLL: + { + onPoll(client); } break; case MG_EV_CLOSE: { diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index 50983b5e..d8a36cb0 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -5,6 +5,7 @@ #include "lib/graft/inout.h" #include "lib/graft/handler_api.h" #include "lib/graft/expiring_list.h" +#include "lib/graft/upstream_manager.h" #include "supernode/requests.h" #include "supernode/requests/sale.h" #include "supernode/requests/sale_status.h" @@ -150,6 +151,27 @@ TEST(InOut, serialization) output.loadT(a); } +namespace +{ + +class UpstreamManagerTest : public graft::UpstreamManager +{ +public: + void init(const std::string& subst_name, const std::string& subst_uri, const std::string& cryptonode_rpc_address = "") + { + graft::ConfigOpts copts; + copts.cryptonode_rpc_address = cryptonode_rpc_address; + copts.uri_substitutions.insert({subst_name, {subst_uri, 0, false, 0}}); + graft::UpstreamManager::init(copts, nullptr, 0, nullptr); + } + const std::string getUri(const std::string& inputUri) + { + return graft::UpstreamManager::getUri(inputUri); + } +}; + +} //namespace + TEST(InOut, makeUri) { { @@ -159,49 +181,49 @@ TEST(InOut, makeUri) EXPECT_EQ(url, default_uri); } { + UpstreamManagerTest umt; umt.init("my_ip", "1.2.3.4"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_ip", {"1.2.3.4", 0, false, 0}}); output.proto = "https"; output.port = "4321"; output.uri = "$my_ip"; - std::string url = output.makeUri(""); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, output.proto + "://1.2.3.4:" + output.port); } { + UpstreamManagerTest umt; umt.init("my_path", "http://site.com:1234/endpoint?q=1&n=2"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_path", {"http://site.com:1234/endpoint?q=1&n=2", 0, false, 0}}); output.proto = "https"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(""); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://site.com:4321/endpoint?q=1&n=2"); } { + UpstreamManagerTest umt; umt.init("my_path", "/endpoint?q=1&n=2"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_path", {"endpoint?q=1&n=2", 0, false, 0}}); output.proto = "https"; output.host = "mysite.com"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(""); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://mysite.com:4321/endpoint?q=1&n=2"); } { + UpstreamManagerTest umt; umt.init("something", "1.2.3.4", "localhost:28881"); graft::Output output; - std::string default_uri = "localhost:28881"; output.path = "json_rpc"; - std::string url = output.makeUri(default_uri); + std::string url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "localhost:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; - url = output.makeUri(default_uri); + url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://localhost:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; output.uri = "http://aaa.bbb:12345/something"; - url = output.makeUri(default_uri); + url = output.makeUri(umt.getUri(output.uri)); EXPECT_EQ(url, "https://aaa.bbb:12345/json_rpc"); } } @@ -519,7 +541,7 @@ TEST(ExpiringList, common) ///////////////////////////////// std::function GraftServerTestBase::TempCryptoNodeServer::http_echo = - [] (const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool + [] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool { data = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); @@ -536,7 +558,7 @@ class GraftServerCommonTest : public GraftServerTestBase class TempCryptoN : public TempCryptoNodeServer { protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { data = std::string(hm->uri.p, hm->uri.len); graft::Context ctx(mainServer.getGcm()); @@ -1055,7 +1077,7 @@ class GraftServerPostponeTest : public GraftServerTestBase public: bool do_callback = true; protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { data = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); @@ -1256,7 +1278,7 @@ TEST_F(GraftServerTest, genericCallback) case graft::Status::None: { //find webhook endpoint - auto it = std::find_if(input.headers.begin(), input.headers.end(), [](auto& v)->bool { v.first == "X-Callback"; } ); + auto it = std::find_if(input.headers.begin(), input.headers.end(), [](auto& v)->bool { return v.first == "X-Callback"; } ); assert(it != input.headers.end()); std::string path = it->second; //"http://0.0.0.0:port/callback/" @@ -1279,7 +1301,7 @@ TEST_F(GraftServerTest, genericCallback) graft::supernode::request::registerForwardRequests(m_httpRouter); m_httpRouter.addRoute("/api/{forward:create_account|restore_account|wallet_balance|prepare_transfer|transaction_history}",METHOD_POST,{nullptr,pretend_walletnode_echo,nullptr}); - graft::Output::uri_substitutions.emplace("walletnode", std::make_tuple("http://localhost:28690/", 0, false, 0)); + m_copts.uri_substitutions.emplace("walletnode", std::make_tuple("http://localhost:28690/", 0, false, 0)); run(); std::string post_data = "some data"; @@ -1305,7 +1327,7 @@ class GraftServerBlockingTest : public GraftServerTestBase std::string answer; std::string body; protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { body = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); diff --git a/test/upstream_test.cpp b/test/upstream_test.cpp index 56516653..1fcfb6a7 100644 --- a/test/upstream_test.cpp +++ b/test/upstream_test.cpp @@ -23,40 +23,70 @@ TEST_F(GraftServerTestBase, upstreamKeepAlive) } }; + const int maxConn = 3; + std::atomic activeCnt = 0; + std::atomic maxActiveCnt = 0; + + std::mutex mutex; + + std::map results; + TempCryptoNodeServer crypton; - crypton.on_http = [] (const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool + crypton.on_http = [&maxActiveCnt, &activeCnt, &mutex, &results] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool { - data = std::string(hm->body.p, hm->body.len); - std::string method(hm->method.p, hm->method.len); + if(maxActiveCnt < ++activeCnt) maxActiveCnt = activeCnt.load(); + std::string body = std::string(hm->body.p, hm->body.len); + + std::thread th {[=, &mutex, &results]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::lock_guard lock(mutex); + results.emplace(std::make_pair(client, body)); + }}; + th.detach(); + return false; + }; + crypton.on_poll = [&activeCnt, &mutex, &results] (mg_connection* client, int& status_code, std::string& headers, std::string& data)->bool + { + std::lock_guard lock(mutex); + auto it = results.find(client); + if(it == results.end()) return false; headers = "Content-Type: application/json"; + data = it->second; + results.erase(it); + --activeCnt; return true; }; crypton.keepAlive = true; + crypton.poll_timeout_ms = 100; crypton.connect_timeout_ms = 1000; crypton.run(); - graft::Output::uri_substitutions.insert({"crypton", {"127.0.0.1:1234", 3, true, 100}}); MainServer mainServer; + mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, true, 1000}}); mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); mainServer.run(); - auto client_func = [](int i) + std::atomic ok_cnt = 0; + + auto client_func = [&ok_cnt](int i) { std::string post_data = "some data" + std::to_string(i); Client client; - client.serve("http://localhost:9084/test_upstream", "", post_data, 1000, 250); + client.serve("http://localhost:9084/test_upstream", "", post_data, 100, 5000); EXPECT_EQ(false, client.get_closed()); EXPECT_EQ(200, client.get_resp_code()); std::string s = client.get_body(); EXPECT_EQ(s, post_data); + if(s == post_data) ++ok_cnt; }; - for(int c = 0; c < 1; ++c) - { - const int th_cnt = 50; - std::vector th_vec; - for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); - for(auto& th : th_vec) th.join(); - } + const int th_cnt = 50; + std::vector th_vec; + for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); + for(auto& th : th_vec) th.join(); + + EXPECT_EQ(maxActiveCnt, maxConn); + EXPECT_EQ(ok_cnt, th_cnt); mainServer.stop_and_wait_for(); crypton.stop_and_wait_for(); @@ -99,8 +129,8 @@ TEST_F(GraftServerTestBase, DISABLED_cryptonodeKeepAlive) } }; - graft::Output::uri_substitutions.insert({"cryptonode", {"127.0.0.1:18981/json_rpc", 3, true, 100}}); MainServer mainServer; + mainServer.m_copts.uri_substitutions.insert({"cryptonode", {"127.0.0.1:18981/json_rpc", 3, true, 100}}); mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); mainServer.run();