diff --git a/CMakeLists.txt b/CMakeLists.txt index 0cec3f8a..6468a321 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,40 @@ cmake_minimum_required(VERSION 3.10) +if(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE RelWithDebInfo) +endif(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) + +if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) + message("==> The configuration is ${CMAKE_BUILD_TYPE}. Debug info will be extracted into separate files.") + + function (add_executable _name) + _add_executable(${ARGV}) + + if (TARGET ${_name}) + add_custom_command(TARGET ${_name} POST_BUILD + COMMAND echo "$: extracting debug info" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" + COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" + ) + endif() + endfunction() + + function (add_library _name _type) + _add_library(${ARGV}) + + if (TARGET ${_name} AND ${_type} STREQUAL SHARED) + add_custom_command(TARGET ${_name} POST_BUILD + COMMAND echo "$: extracting debug info" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --only-keep-debug "$" "$.debug" + COMMAND ${CMAKE_COMMAND} -E chdir $ strip --strip-debug --strip-unneeded "$" + COMMAND ${CMAKE_COMMAND} -E chdir $ objcopy --add-gnu-debuglink="$.debug" "$" + ) + endif() + endfunction() + +endif(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) + project(graft_server) option(OPT_BUILD_TESTS "Build tests." OFF) @@ -155,6 +190,7 @@ add_library(graft STATIC ${PROJECT_SOURCE_DIR}/src/lib/graft/mongoosex.cpp ${PROJECT_SOURCE_DIR}/src/lib/graft/router.cpp ${PROJECT_SOURCE_DIR}/src/lib/graft/task.cpp + ${PROJECT_SOURCE_DIR}/src/lib/graft/upstream_manager.cpp ${PROJECT_SOURCE_DIR}/modules/mongoose/mongoose.c ${PROJECT_SOURCE_DIR}/src/supernode/server.cpp ${PROJECT_SOURCE_DIR}/src/supernode/supernode.cpp @@ -182,7 +218,7 @@ set_target_properties(graft PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/BUILD/libgraft ) -target_compile_definitions(graft PRIVATE MG_ENABLE_COAP=1 -DMONERO_DEFAULT_LOG_CATEGORY="supernode") +target_compile_definitions(graft PRIVATE MG_ENABLE_SYNC_RESOLVER MG_ENABLE_COAP -DMONERO_DEFAULT_LOG_CATEGORY="supernode") if(ENABLE_SYSLOG) target_compile_definitions(graft PRIVATE -DELPP_SYSLOG) endif() @@ -273,7 +309,7 @@ set_target_properties(supernode_common PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/BUILD/supernode_common ) -target_compile_definitions(supernode_common PRIVATE MG_ENABLE_COAP=1 -DMONERO_DEFAULT_LOG_CATEGORY="supernode") +target_compile_definitions(supernode_common PRIVATE -DMONERO_DEFAULT_LOG_CATEGORY="supernode") if(ENABLE_SYSLOG) target_compile_definitions(supernode_common PRIVATE -DELPP_SYSLOG) endif() @@ -369,7 +405,6 @@ if (OPT_BUILD_TESTS) ) - target_compile_definitions(supernode_test PRIVATE MG_ENABLE_COAP=1) add_dependencies(supernode_test graft supernode_common googletest) set_target_properties(supernode_test PROPERTIES LINK_FLAGS "-Wl,-E") if(ENABLE_SYSLOG) diff --git a/data/config.ini b/data/config.ini index 948d4978..4c83afe9 100644 --- a/data/config.ini +++ b/data/config.ini @@ -1,4 +1,5 @@ [cryptonode] +;;rpc-address can be one of [upstream] values, like rpc-address=$default rpc-address=127.0.0.1:28681 p2p-address=127.0.0.1:18980 @@ -44,13 +45,28 @@ requests-per-sec=100 ;; maximal amount of requests per second in the window, 0 t ban-ip-sec=300 ;; time duration in seconds to ban particular IP, 0 to ban forever [upstream] -blah=https://127.0.0.1:8080 -walletnode=http://127.0.0.1:28694 -;format =[,[,[,]]] [;; comment] -; where - {true | false | 0 | 1} , false by default -; example: -;wallet2=http://127.0.0.1:28694, 10, true, 2.55 ;; example -;walletnode=http://127.0.0.1:28694,cntMax,true/false/1/0 always_open,timeout +;;An entry in [upstream] section is a template with the following +;;format: +;; -uri= +;; - Upstream URI in the form of ://:/. +;; Any part of the URI can be overwritten by the requester. +;; -timeout=seconds +;; - optinal parameter, inactivity timeout in seconds, +;; [server] upstream-request-timeout by default +;; -max-conns-upstream=count +;; - optinal parameter, maximum number of connections, associated with +;; the template. 0 by default, that means no restriction. +;; -max-conns-address=count +;; - optinal parameter, is maximum number of connections, associated with +;; the template, for each host:port pair. +;; 0 by default, that means no restriction. +;; -keep-alive=[true|false] +;; - the parameter indicates the associated connections are kept opened +;; after receiving a response. false by default. +;; +default-uri=127.0.0.1:28681 +blah-uri=https://127.0.0.1:8080 +walletnode-uri=http://127.0.0.1:28694 [graftlets] ;;dirs parameter, a list of directories to search graftlets separated by colons. If a directory is set relative it will be interpreted both relative to the current directory and relative to the executable location. By default, 'graftlets' directory will be used relative to the executable location. diff --git a/graftlets/TestGraftlet.cpp b/graftlets/TestGraftlet.cpp index e2edaf70..3473cb25 100644 --- a/graftlets/TestGraftlet.cpp +++ b/graftlets/TestGraftlet.cpp @@ -56,6 +56,11 @@ class TestGraftlet: public IGraftlet return graft::Status::Ok; } + std::string testRouting(const std::string& s) + { + return s; + } + virtual void initOnce() { // REGISTER_ACTION(TestGraftlet, testUndefined); @@ -63,6 +68,7 @@ class TestGraftlet: public IGraftlet REGISTER_ACTION(TestGraftlet, testInt2); REGISTER_ACTION(TestGraftlet, testString1); REGISTER_ACTION(TestGraftlet, testString2); + REGISTER_ACTION(TestGraftlet, testRouting); REGISTER_ENDPOINT("/URI/test/{id:[0-9]+}", METHOD_GET | METHOD_POST, TestGraftlet, testHandler); REGISTER_ENDPOINT("/URI/test1/{id:[0-9]+}", METHOD_GET | METHOD_POST, TestGraftlet, testHandler1); diff --git a/include/lib/graft/GraftletLoader.h b/include/lib/graft/GraftletLoader.h index 24d8236d..1ef7906f 100644 --- a/include/lib/graft/GraftletLoader.h +++ b/include/lib/graft/GraftletLoader.h @@ -22,7 +22,9 @@ #include "lib/graft/IGraftlet.h" #include "lib/graft/GraftletRegistry.h" +#include "lib/graft/routing.h" #include "lib/graft/router.h" +#include "lib/graft/common/utils.h" #include #include @@ -66,18 +68,9 @@ class GraftletHandlerT { ClsName_ cls; std::string method; - { - int pos = cls_method.find('.'); - if(pos != std::string::npos) - { - cls = cls_method.substr(0, pos); - method = cls_method.substr(pos+1); - } - else - { - cls = cls_method; - } - } + bool res = graft::utils::split(cls_method, '.', cls, method); + if(!res) cls = cls_method; + auto it = m_cls2any.find(cls); if(it == m_cls2any.end()) throw std::runtime_error("Cannot find graftlet class name:" + cls); std::shared_ptr concreteGraftlet = std::any_cast>(it->second); @@ -124,6 +117,18 @@ class GraftletLoader return getEndpointsT(); } + template + std::string Routing(const std::string& gr_cls_method_ver, const std::string& value) + { + std::string_view grname; + std::string_view cls_method_ver; + graft::utils::split(gr_cls_method_ver, '.', grname, cls_method_ver); + if(grname.empty()) throw "cannot find graftlet name in " + gr_cls_method_ver; + + GraftletHandlerT handler = buildAndResolveGraftletT( std::string(grname) ); + return handler.template invoke( std::string(cls_method_ver), value ); + } + class DependencyGraph; friend class GraftletLoader::DependencyGraph; private: @@ -263,7 +268,7 @@ class GraftletLoader std::map> m_name2lib; //dll name -> registry std::map m_name2registries; - //dll (name, type_index of BaseT) -> (class name, any of BaseT) + //dll (name, type_index of BaseT) -> (class name -> any of BaseT) //it makes no sense to hold std::shared_ptr until the shared_ptr is returned from resolveGraftlet std::map< std::pair, std::map > m_name2gls; }; diff --git a/include/lib/graft/common/utils.h b/include/lib/graft/common/utils.h index 5c1e3f25..73214f3e 100644 --- a/include/lib/graft/common/utils.h +++ b/include/lib/graft/common/utils.h @@ -18,5 +18,8 @@ T random_number(T startRange, T endRange) return dist(mt); } +bool split(const std::string_view& in, char delim, std::string_view& first, std::string_view& second); +bool split(const std::string_view& in, char delim, std::string& first, std::string& second); + } diff --git a/include/lib/graft/connection.h b/include/lib/graft/connection.h index 9cb2b02e..e337367e 100644 --- a/include/lib/graft/connection.h +++ b/include/lib/graft/connection.h @@ -4,6 +4,8 @@ #include "lib/graft/task.h" #include "lib/graft/blacklist.h" +namespace graftlet { class GraftletLoader; } + namespace graft { namespace details @@ -72,11 +74,19 @@ class UpstreamSender : public SelfHolder : m_bt(bt), m_onDone(onDone), m_keepAlive(true), m_connectioId(connectionId), m_upstream(upstream), m_timeout(timeout) { } + UpstreamSender(const BaseTaskPtr& bt, std::string error, std::function onDoneError) : m_bt(bt) + { + setError(Status::Error, error); + onDoneError(*this); + releaseItself(); + } + BaseTaskPtr& getTask() { return m_bt; } - void send(TaskManager& manager, const std::string& uri); + void send(mg_mgr* mgr, int http_callback_port, const std::string& uri); Status getStatus() const { return m_status; } const std::string& getError() const { return m_error; } + size_t getRequestSize() const { return m_requestSize; } void ev_handler(mg_connection* upstream, int ev, void *ev_data); private: @@ -94,6 +104,7 @@ class UpstreamSender : public SelfHolder mg_connection* m_upstream = nullptr; Status m_status = Status::None; std::string m_error; + size_t m_requestSize = 0; }; class ConnectionBase; @@ -101,7 +112,7 @@ class ConnectionBase; class Looper final : public TaskManager { public: - Looper(const ConfigOpts& copts, ConnectionBase& connectionBase); + Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase); virtual ~Looper(); void serve(); @@ -160,6 +171,8 @@ class ConnectionManager Proto m_proto; }; +class UpstreamRoutingManager; + class ConnectionBase final { public: @@ -171,7 +184,7 @@ class ConnectionBase final void setSysInfoCounter(std::unique_ptr& counter); void createSystemInfoCounter(); void loadBlacklist(const ConfigOpts& copts); - void createLooper(ConfigOpts& configOpts); + void createLooper(graftlet::GraftletLoader& graftletLoader, ConfigOpts& configOpts); void initConnectionManagers(); void bindConnectionManagers(); @@ -193,6 +206,7 @@ class ConnectionBase final std::unique_ptr m_sysInfo; std::atomic_bool m_looperReady{false}; std::unique_ptr m_looper; + std::unique_ptr m_upstreamManager; std::map> m_conManagers; }; diff --git a/include/lib/graft/inout.h b/include/lib/graft/inout.h index bb81034c..27927292 100644 --- a/include/lib/graft/inout.h +++ b/include/lib/graft/inout.h @@ -204,14 +204,16 @@ namespace graft * Set uri, proto, host, port, path members if you need. * The function forms real URI substituting absent parts according to Config.ini. * It is public to be accessed from tests and other classes. - * \param default_uri - this parameter always comes from [cryptonode]rpc-address of Config.ini. - * \return + * \param default_uri - one of uri from lines in [upstream] of Config.ini. + * \param ip_port - output of resulting "ip:port". + * \param result_uri - output of resulting uri. + * \param resolve_cache - a cache with resolved host to ip map. + * \return true on success or false otherwise */ - std::string makeUri(const std::string& default_uri) const; + bool makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri, std::unordered_map& resolve_cache) const; std::string port; std::string path; - static std::unordered_map> uri_substitutions; }; class InHttp final : public InOutHttpBase diff --git a/include/lib/graft/routing.h b/include/lib/graft/routing.h new file mode 100644 index 00000000..0884282b --- /dev/null +++ b/include/lib/graft/routing.h @@ -0,0 +1,40 @@ +// Copyright (c) 2018, The Graft Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +namespace graft { + +using RoutingFunctionSignature = std::string (const std::string&); +using RoutingFunction = std::function; + +}//namespace graft + diff --git a/include/lib/graft/serveropts.h b/include/lib/graft/serveropts.h index 490d49c9..ee3e09d6 100644 --- a/include/lib/graft/serveropts.h +++ b/include/lib/graft/serveropts.h @@ -2,6 +2,7 @@ #include #include +#include #include namespace graft { @@ -30,6 +31,8 @@ struct ConfigOpts std::vector graftlet_dirs; int lru_timeout_ms; IPFilterOpts ipfilter; + std::unordered_map> uri_substitutions; + std::string default_uri_substitution_name; void check_asserts() const { diff --git a/include/lib/graft/task.h b/include/lib/graft/task.h index 6fc8f1ad..896ea09f 100644 --- a/include/lib/graft/task.h +++ b/include/lib/graft/task.h @@ -38,6 +38,7 @@ namespace graft { extern std::string client_addr(mg_connection* client); extern std::string client_host(mg_connection* client); +extern unsigned int port_from_uri(const std::string& uri); class UpstreamSender; class TaskManager; @@ -192,7 +193,7 @@ class UpstreamManager; class TaskManager : private HandlerAPI { public: - TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter); + TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter); virtual ~TaskManager(); TaskManager(const TaskManager&) = delete; TaskManager& operator = (const TaskManager&) = delete; @@ -259,6 +260,7 @@ class TaskManager : private HandlerAPI static inline size_t next_pow2(size_t val); + UpstreamManager& m_upstreamManager; SysInfoCounter& m_sysInfoCounter; GlobalContextMap m_gcm; @@ -277,7 +279,6 @@ class TaskManager : private HandlerAPI std::deque m_readyToResume; std::priority_queue,Context::uuid_t>> m_expireTaskQueue; std::unique_ptr m_futurePostponeUuids; - std::unique_ptr m_upstreamManager; using PromiseItem = UpstreamTask::PromiseItem; using PromiseQueue = tp::MPMCBoundedQueue; diff --git a/include/lib/graft/upstream_manager.h b/include/lib/graft/upstream_manager.h new file mode 100644 index 00000000..7bdd7dea --- /dev/null +++ b/include/lib/graft/upstream_manager.h @@ -0,0 +1,112 @@ +#pragma once +#include "lib/graft/connection.h" +#include "lib/graft/routing.h" + +namespace graft +{ + +class UpstreamRoutingManager +{ +public: + UpstreamRoutingManager(graftlet::GraftletLoader& graftletLoader) + : m_graftletLoader(graftletLoader) + { } + + void add(const std::string& name, RoutingFunction converter); + bool route(const std::string& in, std::string& out); +private: + std::map m_converters; + graftlet::GraftletLoader& m_graftletLoader; +}; + +class UpstreamManager +{ +public: + using OnDoneCallback = std::function; + + UpstreamManager() = default; + + void init(graftlet::GraftletLoader& graftletLoader, const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback); + + bool busy() const + { + return (m_cntUpstreamSender != m_cntUpstreamSenderDone); + } + + void send(BaseTaskPtr& bt); +protected: + //testGetUri for test only + const std::string testGetUri(const Output& output); +private: + class ConnItem + { + public: + using ConnectionId = uint64_t; + using IpPort = std::string; + using Uri = std::string; + + struct Bunch + { + int m_connCnt = 0; + std::map m_idleConnections; + std::map m_activeConnections; + UpstreamStub m_upstreamStub; + }; + + ConnItem() = default; + ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout) + : m_uriId(uriId) + , m_uri(uri) + , m_maxConnections(maxConnections) + , m_keepAlive(keepAlive) + , m_timeout(timeout) + { + } + ~ConnItem() + { + for(auto& it : m_bunches) + { + auto& b = it.second; + assert(b.m_idleConnections.empty()); + assert(b.m_activeConnections.empty()); + } + } + + std::pair getConnection(const IpPort& ip_port); + void releaseActive(ConnectionId connectionId, const IpPort& ip_port, mg_connection* client); + void onCloseIdle(const IpPort& ip_port, mg_connection* client); + Bunch& getBunch(const IpPort& ip_port, bool createIfNotExists = false); + + int m_uriId; + std::string m_uri; + double m_timeout; + int m_maxConnections; + std::deque< std::tuple > m_taskQueue; + bool m_keepAlive = false; + std::map m_bunches; + private: + ConnectionId m_newId = 0; + }; + + void onDone(UpstreamSender& uss, ConnItem* connItem, const std::string& ip_port, ConnItem::ConnectionId connectionId, mg_connection* client); + void createUpstreamSender(ConnItem* connItem, const std::string& ip_port, BaseTaskPtr bt, const std::string& uri); + ConnItem* findConnItem(const Output& output, std::string& ip_port, std::string& result_uri); + const std::string& getUri(ConnItem* connItem, const std::string& inputUri); + void initUpstreamRoutingManager(graftlet::GraftletLoader& graftletLoader); + + using Uri2ConnItem = std::map; + + OnDoneCallback m_onDoneCallback; + uint64_t m_cntUpstreamSender = 0; + uint64_t m_cntUpstreamSenderDone = 0; + ConnItem m_default; + Uri2ConnItem m_conn2item; + mg_mgr* m_mgr = nullptr; + int m_http_callback_port; + std::unique_ptr m_upstreamRoutingManager; +protected: + std::unordered_map m_resolveCache; +}; + +}//namespace graft + diff --git a/include/supernode/server.h b/include/supernode/server.h index 11e4880a..1e28d4f0 100644 --- a/include/supernode/server.h +++ b/include/supernode/server.h @@ -9,6 +9,8 @@ namespace graft::request::system_info { class Counter; } namespace graft { +class UpstreamRoutingManager; + using SysInfoCounter = request::system_info::Counter; class GraftServer @@ -53,7 +55,7 @@ class GraftServer void serve(); static void initSignals(); void addGlobalCtxCleaner(); - void initGraftlets(); + void initGraftlets(const ConfigOpts& copts); void initGraftletRouters(); ConfigOpts& getCopts(); diff --git a/src/lib/graft/common/utils.cpp b/src/lib/graft/common/utils.cpp index 3bcd29d0..1786bece 100644 --- a/src/lib/graft/common/utils.cpp +++ b/src/lib/graft/common/utils.cpp @@ -16,5 +16,33 @@ std::string base64_encode(const std::string &data) return epee::string_encoding::base64_encode(data); } +namespace { +template +bool split_impl(const std::string_view& in, char delim, res& first, res& second) +{ + size_t pos = in.find(delim); + if(pos != std::string::npos) + { + first = in.substr(0, pos); + second = in.substr(pos+1); + return true; + } + first = in; + second = std::string_view(); + return false; +} + +} //namespace + +bool split(const std::string_view& in, char delim, std::string_view& first, std::string_view& second) +{ + return split_impl(in, delim, first, second); } + +bool split(const std::string_view& in, char delim, std::string& first, std::string& second) +{ + return split_impl(in, delim, first, second); } + +} //namespace utils +} //namespace graft diff --git a/src/lib/graft/connection.cpp b/src/lib/graft/connection.cpp index 8aa3ad0d..31ebdd20 100644 --- a/src/lib/graft/connection.cpp +++ b/src/lib/graft/connection.cpp @@ -3,6 +3,7 @@ #include "lib/graft/mongoosex.h" #include "lib/graft/sys_info.h" #include "lib/graft/graft_exception.h" +#include "lib/graft/upstream_manager.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.connection" @@ -23,6 +24,16 @@ std::string client_host(mg_connection* client) return inet_ntoa(client->sa.sin.sin_addr); } +unsigned int port_from_uri(const std::string& uri) +{ + assert(!uri.empty()); + mg_str mg_uri{uri.c_str(), uri.size()}; + unsigned int mg_port = 0; + int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0); + assert(0<=res); + return mg_port; +} + void* getUserData(mg_mgr* mgr) { return mgr->user_data; } void* getUserData(mg_connection* nc) { return nc->user_data; } mg_mgr* getMgr(mg_connection* nc) { return nc->mgr; } @@ -60,28 +71,18 @@ void UpstreamStub::ev_handler(mg_connection *upstream, int ev, void *ev_data) } -void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) +void UpstreamSender::send(mg_mgr* mgr, int http_callback_port, const std::string& def_uri) { assert(m_bt); - const ConfigOpts& opts = manager.getCopts(); - Output& output = m_bt->getOutput(); - std::string url = output.makeUri(def_uri); + std::string url = def_uri; Context::uuid_t callback_uuid = m_bt->getCtx().getId(false); if(!callback_uuid.is_nil()) {//add extra header - unsigned int mg_port = 0; - { - std::string uri = opts.http_address; - assert(!uri.empty()); - mg_str mg_uri{uri.c_str(), uri.size()}; - int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0); - assert(0<=res); - } std::stringstream ss; - ss << "http://0.0.0.0:" << mg_port << "/callback/" << boost::uuids::to_string(callback_uuid); + ss << "http://0.0.0.0:" << http_callback_port << "/callback/" << boost::uuids::to_string(callback_uuid); output.headers.emplace_back(std::make_pair("X-Callback", ss.str())); } std::string extra_headers = output.combine_headers(); @@ -95,7 +96,8 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) m_upstream->user_data = this; m_upstream->handler = static_ev_handler; } - mg_connection* upstream = mg::mg_connect_http_x(m_upstream, manager.getMgMgr(), static_ev_handler, url.c_str(), + LOG_PRINT_L2("connecting to url '") << url <<"'"; + mg_connection* upstream = mg::mg_connect_http_x(m_upstream, mgr, static_ev_handler, url.c_str(), extra_headers.c_str(), body); //body.empty() means GET assert(upstream != nullptr && (m_upstream == nullptr || m_upstream == upstream)); @@ -106,9 +108,7 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri) } mg_set_timer(m_upstream, mg_time() + m_timeout); - auto& rsi = manager.runtimeSysInfo(); - rsi.count_upstrm_http_req(); - rsi.count_upstrm_http_req_bytes_raw(url.size() + extra_headers.size() + body.size()); + m_requestSize = url.size() + extra_headers.size() + body.size(); } void UpstreamSender::ev_handler(mg_connection *upstream, int ev, void *ev_data) @@ -238,10 +238,14 @@ void ConnectionBase::createSystemInfoCounter() m_sysInfo = std::make_unique(); } -void ConnectionBase::createLooper(ConfigOpts& configOpts) +void ConnectionBase::createLooper(graftlet::GraftletLoader& graftletLoader, ConfigOpts& configOpts) { - assert(m_sysInfo && !m_looper); - m_looper = std::make_unique(configOpts, *this); + assert(m_sysInfo && !m_looper && !m_upstreamManager); + m_upstreamManager = std::make_unique(); + + m_looper = std::make_unique(configOpts, *m_upstreamManager, *this); + m_upstreamManager->init(graftletLoader, configOpts, m_looper->getMgMgr(), port_from_uri(configOpts.http_address), + [this](UpstreamSender& uss){ m_looper->onUpstreamDone(uss); }); m_looperReady = true; } @@ -290,8 +294,8 @@ void ConnectionBase::checkRoutes(graft::ConnectionManager& cm) } -Looper::Looper(const ConfigOpts& copts, ConnectionBase& connectionBase) - : TaskManager(copts, connectionBase.getSysInfoCounter()) +Looper::Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase) + : TaskManager(copts, upstreamManager, connectionBase.getSysInfoCounter()) , m_mgr(std::make_unique()) { mg_mgr_init(m_mgr.get(), &connectionBase, cb_event); diff --git a/src/lib/graft/inout.cpp b/src/lib/graft/inout.cpp index e13c289d..e85cfea7 100644 --- a/src/lib/graft/inout.cpp +++ b/src/lib/graft/inout.cpp @@ -1,10 +1,10 @@ #include "lib/graft/inout.h" #include "lib/graft/mongoosex.h" +#include namespace graft { -std::unordered_map> OutHttp::uri_substitutions; void InOutHttpBase::set_str_field(const http_message& hm, const mg_str& str_fld, std::string& fld) { @@ -54,8 +54,9 @@ std::string InOutHttpBase::combine_headers() return s; } -std::string OutHttp::makeUri(const std::string& default_uri) const +bool OutHttp::makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri, std::unordered_map& resolve_cache) const { + result_uri.clear(); std::string uri_ = default_uri; std::string port_; @@ -71,7 +72,8 @@ std::string OutHttp::makeUri(const std::string& default_uri) const int res = mg_parse_uri(mg_uri, &mg_scheme, &mg_user_info, &mg_host, &mg_port, &mg_path, &mg_query, &mg_fragment); if(res<0) break; if(mg_port) - port_ = std::to_string(mg_port); + port_ = std::to_string(mg_port); + #define V(n) n##_ = std::string(mg_##n.p, mg_##n.len) V(scheme); V(user_info); V(host); V(path); V(query); V(fragment); #undef V @@ -83,14 +85,33 @@ std::string OutHttp::makeUri(const std::string& default_uri) const if(!port.empty()) port_ = port; if(!path.empty()) path_ = path; - std::string url; + {//get ip by host_ + auto it = resolve_cache.find(host_); + if(it == resolve_cache.end()) + { + char buf[0x100]; + if(!mg_resolve(host_.c_str(), buf, sizeof(buf))) + { + LOG_PRINT_L1("cannot resolve host '") << host_ << "'"; + return false; + } + LOG_PRINT_L2("host '") << host_ << "' resolved as '" << buf << "'"; + auto res = resolve_cache.emplace(host_, std::string(buf)); + assert(res.second); + it = res.first; + } + host_ = it->second; + } + + std::string& url = result_uri; if(!scheme_.empty()) { url += scheme_ + "://"; if(!user_info_.empty()) url += user_info_ + '@'; } - url += host_; - if(!port_.empty()) url += ':' + port_; + ip_port = host_; + if(!port_.empty()) ip_port += ':' + port_; + url += ip_port; if(!path_.empty()) { if(path_[0]!='/') path_ = '/' + path_; @@ -98,7 +119,7 @@ std::string OutHttp::makeUri(const std::string& default_uri) const } if(!query_.empty()) url += '?' + query_; if(!fragment_.empty()) url += '#' + fragment_; - return url; + return true; } } //namespace graft diff --git a/src/lib/graft/mongoosex.cpp b/src/lib/graft/mongoosex.cpp index bcca1aa3..74c5e335 100644 --- a/src/lib/graft/mongoosex.cpp +++ b/src/lib/graft/mongoosex.cpp @@ -1,5 +1,6 @@ #include "lib/graft/mongoosex.h" +#include extern "C" { @@ -35,8 +36,8 @@ mg_connection *mg_send_http_opt_x(mg_connection *nc, else { //TODO: excessive parse second time - mg_parse_uri(mg_mk_str(url), NULL, &user, &host, NULL, &path, NULL, NULL); - nc = nc; + unsigned int port; + mg_parse_uri(mg_mk_str(url), NULL, &user, &host, &port, &path, NULL, NULL); } mbuf_init(&auth, 0); diff --git a/src/lib/graft/task.cpp b/src/lib/graft/task.cpp index 5fe8a0ab..341549a4 100644 --- a/src/lib/graft/task.cpp +++ b/src/lib/graft/task.cpp @@ -6,6 +6,7 @@ #include "lib/graft/handler_api.h" #include "lib/graft/expiring_list.h" #include "lib/graft/sys_info.h" +#include "lib/graft/upstream_manager.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.task" @@ -203,195 +204,9 @@ class ExpiringList : public detail::ExpiringListT< Uuid_Input > { } }; -class UpstreamManager -{ -public: - using OnDoneCallback = std::function; - - UpstreamManager(TaskManager& manager, OnDoneCallback onDoneCallback) - : m_manager(manager) - , m_onDoneCallback(onDoneCallback) - { init(); } - - bool busy() const - { - return (m_cntUpstreamSender != m_cntUpstreamSenderDone); - } - - void send(BaseTaskPtr bt) - { - ConnItem* connItem = &m_default; - {//find connItem - const std::string& uri = bt->getOutput().uri; - if(!uri.empty() && uri[0] == '$') - {//substitutions - auto it = m_conn2item.find(uri.substr(1)); - if(it == m_conn2item.end()) - { - std::ostringstream oss; - oss << "cannot find uri substitution '" << uri << "'"; - throw std::runtime_error(oss.str()); - } - connItem = &it->second; - } - } - if(connItem->m_maxConnections != 0 && connItem->m_idleConnections.empty() && connItem->m_connCnt == connItem->m_maxConnections) - { - connItem->m_taskQueue.push_back(bt); - return; - } - - createUpstreamSender(connItem, bt); - } -private: - uint64_t m_cntUpstreamSender = 0; - uint64_t m_cntUpstreamSenderDone = 0; - - class ConnItem - { - public: - using Active = bool; - using ConnectionId = uint64_t; - - ConnItem() = default; - ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout) - : m_uriId(uriId) - , m_uri(uri) - , m_maxConnections(maxConnections) - , m_keepAlive(keepAlive) - , m_timeout(timeout) - { - } - std::pair getConnection() - { - //TODO: something wrong with (m_connCnt <= m_maxConnections) - assert(m_maxConnections == 0 || m_connCnt <= m_maxConnections); - std::pair res = std::make_pair(0,nullptr); - if(!m_keepAlive) - { - ++m_connCnt; - return res; - } - if(!m_idleConnections.empty()) - { - auto it = m_idleConnections.begin(); - res = std::make_pair(it->second, it->first); - m_idleConnections.erase(it); - } - else - { - ++m_connCnt; - res.first = ++m_newId; - } - auto res1 = m_activeConnections.emplace(res); - assert(res1.second); - assert(m_connCnt == m_idleConnections.size() + m_activeConnections.size()); - return res; - } - - void releaseActive(ConnectionId connectionId, mg_connection* client) - { - assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); - if(!m_keepAlive) return; - auto it = m_activeConnections.find(connectionId); - assert(it != m_activeConnections.end()); - assert(it->second == nullptr || client == nullptr || it->second == client); - if(client != nullptr) - { - m_idleConnections.emplace(client, it->first); - m_upstreamStub.setConnection(client); - } - else - { - --m_connCnt; - } - m_activeConnections.erase(it); - } - - void onCloseIdle(mg_connection* client) - { - assert(m_keepAlive); - auto it = m_idleConnections.find(client); - assert(it != m_idleConnections.end()); - --m_connCnt; - m_idleConnections.erase(it); - } - - ConnectionId m_newId = 0; - int m_connCnt = 0; - int m_uriId; - std::string m_uri; - double m_timeout; - //assert(m_upstreamQueue.empty() || 0 < m_maxConn); - int m_maxConnections; - std::deque m_taskQueue; - bool m_keepAlive = false; - std::map m_idleConnections; - std::map m_activeConnections; - UpstreamStub m_upstreamStub; - }; - - void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client) - { - ++m_cntUpstreamSenderDone; - m_onDoneCallback(uss); - connItem->releaseActive(connectionId, client); - if(connItem->m_taskQueue.empty()) return; - BaseTaskPtr bt = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); - createUpstreamSender(connItem, bt); - } - - void init() - { - int uriId = 0; - const ConfigOpts& opts = m_manager.getCopts(); - m_default = ConnItem(uriId++, opts.cryptonode_rpc_address.c_str(), 0, false, opts.upstream_request_timeout); - - for(auto& subs : OutHttp::uri_substitutions) - { - double timeout = std::get<3>(subs.second); - if(timeout < 1e-5) timeout = opts.upstream_request_timeout; - auto res = m_conn2item.emplace(subs.first, ConnItem(uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); - assert(res.second); - ConnItem* connItem = &res.first->second; - connItem->m_upstreamStub.setCallback([connItem](mg_connection* client){ connItem->onCloseIdle(client); }); - } - } - - void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt) - { - auto onDoneAct = [this, connItem](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) - { - onDone(uss, connItem, connectionId, client); - }; - - ++m_cntUpstreamSender; - UpstreamSender::Ptr uss; - if(connItem->m_keepAlive) - { - auto res = connItem->getConnection(); - uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); - } - else - { - uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); - } - - const std::string& uri = (connItem != &m_default || bt->getOutput().uri.empty())? connItem->m_uri : bt->getOutput().uri; - uss->send(m_manager, uri); - } - - using Uri2ConnItem = std::map; - - OnDoneCallback m_onDoneCallback; - - ConnItem m_default; - Uri2ConnItem m_conn2item; - TaskManager& m_manager; //TODO: should be removed, and be independent of TaskManager -}; - -TaskManager::TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter) +TaskManager::TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter) : m_copts(copts) + , m_upstreamManager(upstreamManager) , m_sysInfoCounter(sysInfoCounter) , m_gcm(this) , m_futurePostponeUuids(std::make_unique(1000 * copts.http_connection_timeout)) @@ -489,15 +304,13 @@ void TaskManager::checkUpstreamBlockingIO() bool res = m_promiseQueue->pop(pi); if(!res) break; UpstreamTask::Ptr bt = BaseTask::Create(*this, std::move(pi)); - assert(m_upstreamManager); - m_upstreamManager->send(bt); + sendUpstream(bt); } } void TaskManager::sendUpstream(BaseTaskPtr bt) { - assert(m_upstreamManager); - m_upstreamManager->send(bt); + m_upstreamManager.send(bt); } void TaskManager::onTimer(BaseTaskPtr bt) @@ -536,7 +349,7 @@ void TaskManager::schedule(PeriodicTask* pt) bool TaskManager::canStop() { return (m_cntBaseTask == m_cntBaseTaskDone) - && (!m_upstreamManager->busy()) + && (!m_upstreamManager.busy()) && (m_cntJobSent == m_cntJobDone); } @@ -856,7 +669,6 @@ void TaskManager::initThreadPool(int threadCount, int workersQueueSize, int expe m_promiseQueue = std::make_unique( threadCount ); //TODO: it is not clear how many items we need in PeriodicTaskQueue, maybe we should make it dynamically but this requires additional synchronization m_periodicTaskQueue = std::make_unique(2*threadCount); - m_upstreamManager = std::make_unique(*this, [this](UpstreamSender& uss){ onUpstreamDone(uss); } ); LOG_PRINT_L1("Thread pool created with " << threadCount << " workers with " << workersQueueSize @@ -892,6 +704,9 @@ void TaskManager::onUpstreamDone(UpstreamSender& uss) void TaskManager::upstreamDoneProcess(UpstreamSender& uss) { + runtimeSysInfo().count_upstrm_http_req(); + runtimeSysInfo().count_upstrm_http_req_bytes_raw(uss.getRequestSize()); + if(Status::Ok == uss.getStatus()) runtimeSysInfo().count_upstrm_http_resp_ok(); else diff --git a/src/lib/graft/upstream_manager.cpp b/src/lib/graft/upstream_manager.cpp new file mode 100644 index 00000000..09fb337e --- /dev/null +++ b/src/lib/graft/upstream_manager.cpp @@ -0,0 +1,296 @@ +#include "lib/graft/upstream_manager.h" +#include "lib/graft/GraftletLoader.h" + +namespace graft +{ + +void UpstreamRoutingManager::add(const std::string& name, RoutingFunction converter) +{ + auto res = m_converters.emplace(name, converter); + assert(res.second); +} + +/*! + * \brief route - make routing. + * \param in - it expects "#function:data". it finds and calls function(data), + * and place the result back to the route. + * for graftlets the function is "graftletName.ClassName.method" + * \return false if the function not found + */ +bool UpstreamRoutingManager::route(const std::string& in, std::string& out) +{ + if(in[0] != '#') return false; + size_t pos = in.find(':', 1); + if(pos == std::string::npos) return false; + std::string name = in.substr(1, pos-1); + std::string val = in.substr(pos+1); + auto it = m_converters.find(name); + if(it != m_converters.end()) + { + out = it->second(val); + return true; + } + //try call graftlet method + try + { + out = m_graftletLoader.Routing(name, val); + } + catch(...) + { + return false; + } + return true; +} + +UpstreamManager::ConnItem::Bunch& UpstreamManager::ConnItem::getBunch(const IpPort& ip_port, bool createIfNotExists) +{ + assert(m_keepAlive || ip_port.empty()); + auto b_it = m_bunches.find(ip_port); + assert(b_it != m_bunches.end() || createIfNotExists); + if(b_it == m_bunches.end()) + { + auto res = m_bunches.emplace(std::make_pair(ip_port, Bunch())); + assert(res.second); + b_it = res.first; + + Bunch& bunch = b_it->second; + bunch.m_upstreamStub.setCallback([this, ip_port](mg_connection* client){ onCloseIdle(ip_port, client); }); + } + return b_it->second; +} + + +std::pair UpstreamManager::ConnItem::getConnection(const IpPort& ip_port) +{ + Bunch& bunch = getBunch(ip_port, true); + + assert(m_maxConnections == 0 || bunch.m_connCnt < m_maxConnections || bunch.m_connCnt == m_maxConnections && !bunch.m_idleConnections.empty()); + std::pair res = std::make_pair(0,nullptr); + if(!m_keepAlive) + { + ++bunch.m_connCnt; + return res; + } + if(!bunch.m_idleConnections.empty()) + { + auto it = bunch.m_idleConnections.begin(); + res = std::make_pair(it->second, it->first); + bunch.m_idleConnections.erase(it); + } + else + { + ++bunch.m_connCnt; + res.first = ++m_newId; + } + auto res1 = bunch.m_activeConnections.emplace(res); + assert(res1.second); + assert(bunch.m_connCnt == bunch.m_idleConnections.size() + bunch.m_activeConnections.size()); + return res; +} + +void UpstreamManager::ConnItem::releaseActive(ConnectionId connectionId, const IpPort& ip_port, mg_connection* client) +{ + Bunch& bunch = getBunch(ip_port); + + assert(m_keepAlive || ((connectionId == 0) && (client == nullptr))); + if(!m_keepAlive) + { + --bunch.m_connCnt; + return; + } + auto it = bunch.m_activeConnections.find(connectionId); + assert(it != bunch.m_activeConnections.end()); + assert(it->second == nullptr || client == nullptr || it->second == client); + if(client != nullptr) + { + bunch.m_idleConnections.emplace(client, it->first); + bunch.m_upstreamStub.setConnection(client); + } + else + { + --bunch.m_connCnt; + } + bunch.m_activeConnections.erase(it); +} + +void UpstreamManager::ConnItem::onCloseIdle(const IpPort& ip_port, mg_connection* client) +{ + Bunch& bunch = getBunch(ip_port); + + assert(m_keepAlive); + auto it = bunch.m_idleConnections.find(client); + assert(it != bunch.m_idleConnections.end()); + --bunch.m_connCnt; + bunch.m_idleConnections.erase(it); + + assert(bunch.m_connCnt == bunch.m_idleConnections.size() + bunch.m_activeConnections.size()); + if(bunch.m_connCnt == 0) + { + m_bunches.erase(ip_port); + } +} + +UpstreamManager::ConnItem* UpstreamManager::findConnItem(const Output& output, std::string& ip_port, std::string& result_uri) +{ + ConnItem* connItem = &m_default; + {//find connItem + std::string uri = output.uri; + if(!uri.empty() && uri[0] == '#') + { + std::string out; + bool res = m_upstreamRoutingManager->route(uri, out); + if(res) + { + LOG_PRINT_L2("routing '") << uri << "' -> '" << out << "'"; + uri = out; + } + else + { + LOG_PRINT_L2("routing '") << uri << "' failed"; + } + } + if(!uri.empty() && uri[0] == '$') + {//substitutions + auto it = m_conn2item.find(uri.substr(1)); + if(it == m_conn2item.end()) + { + std::ostringstream oss; + oss << "cannot find uri substitution '" << uri << "'"; + throw std::runtime_error(oss.str()); + } + connItem = &it->second; + } + } + + bool res = output.makeUri( getUri(connItem, output.uri), ip_port, result_uri, m_resolveCache); + if(!res) + { + std::ostringstream oss; + oss << "cannot resolve host with uri '" << getUri(connItem, output.uri) << "' host '" << output.host << "'"; + throw std::runtime_error(oss.str()); + } + if(!connItem->m_keepAlive) ip_port.clear(); + + return connItem; +} + +void UpstreamManager::send(BaseTaskPtr& bt) +{ + std::string ip_port, uri; + ConnItem* connItem = nullptr; + try + { + connItem = findConnItem(bt->getOutput(), ip_port, uri); + } + catch(std::exception& e) + { + //uss reports error, and dies + UpstreamSender::Ptr uss = UpstreamSender::Create(bt, e.what(), m_onDoneCallback); + return; + } + + ConnItem::Bunch& bunch = connItem->getBunch(ip_port, true); + + assert(connItem->m_keepAlive || bunch.m_idleConnections.empty()); + if(connItem->m_maxConnections != 0 && bunch.m_idleConnections.empty() && bunch.m_connCnt == connItem->m_maxConnections) + { + connItem->m_taskQueue.push_back( std::make_tuple(bt, ip_port, uri ) ); + return; + } + + createUpstreamSender(connItem, ip_port, bt, uri); +} + + +void UpstreamManager::onDone(UpstreamSender& uss, ConnItem* connItem, const std::string& ip_port, ConnItem::ConnectionId connectionId, mg_connection* client) +{ + ++m_cntUpstreamSenderDone; + m_onDoneCallback(uss); + connItem->releaseActive(connectionId, ip_port, client); + if(connItem->m_taskQueue.empty()) return; + std::string ip_port_v, uri; BaseTaskPtr bt; + std::tie(bt,ip_port_v,uri) = connItem->m_taskQueue.front(); connItem->m_taskQueue.pop_front(); + createUpstreamSender(connItem, ip_port_v, bt, uri); +} + +void UpstreamManager::initUpstreamRoutingManager(graftlet::GraftletLoader& graftletLoader) +{ + assert(!m_upstreamRoutingManager); + m_upstreamRoutingManager = std::make_unique(graftletLoader); +} + +void UpstreamManager::init(graftlet::GraftletLoader& graftletLoader, const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback) +{ + initUpstreamRoutingManager(graftletLoader); + + m_mgr = mgr; + m_http_callback_port = http_callback_port; + m_onDoneCallback = onDoneCallback; + + int uriId = 0; + for(auto& subs : copts.uri_substitutions) + { + double timeout = std::get<3>(subs.second); + if(timeout < 1e-5) timeout = copts.upstream_request_timeout; + auto res = m_conn2item.emplace(subs.first, ConnItem(++uriId, std::get<0>(subs.second), std::get<1>(subs.second), std::get<2>(subs.second), timeout)); + assert(res.second); + } + if(copts.default_uri_substitution_name.empty()) + { + m_default = ConnItem(0, copts.cryptonode_rpc_address.c_str(), 0, false, copts.upstream_request_timeout); + } + else + { + auto it = m_conn2item.find(copts.default_uri_substitution_name); + assert(it != m_conn2item.end()); + m_default = it->second; + m_default.m_uriId = 0; + m_conn2item.erase(it); + } +} + +const std::string& UpstreamManager::getUri(ConnItem* connItem, const std::string& inputUri) +{ + const std::string& uri = (connItem != &m_default || inputUri.empty())? connItem->m_uri : inputUri; + return uri; +} + +void UpstreamManager::createUpstreamSender(ConnItem* connItem, const std::string& ip_port, BaseTaskPtr bt, const std::string& uri) +{ + std::string ip_port_v = ip_port; + auto onDoneAct = [this, connItem, ip_port_v](UpstreamSender& uss, uint64_t connectionId, mg_connection* client) + { + onDone(uss, connItem, ip_port_v, connectionId, client); + }; + + ++m_cntUpstreamSender; + UpstreamSender::Ptr uss; + auto res = connItem->getConnection(ip_port); + if(connItem->m_keepAlive) + { + uss = UpstreamSender::Create(bt, onDoneAct, res.first, res.second, connItem->m_timeout); + } + else + { + uss = UpstreamSender::Create(bt, onDoneAct, connItem->m_timeout); + } + + uss->send(m_mgr, m_http_callback_port, uri); +} + +const std::string UpstreamManager::testGetUri(const Output& output) +{ + std::string ip_port, uri; + try + { + ConnItem* connItem = findConnItem(output, ip_port, uri); + } + catch(std::exception&) + { + return getUri(&m_default, output.uri); + } + return uri; +} + +}//namespace graft + diff --git a/src/supernode/server.cpp b/src/supernode/server.cpp index 1e65094e..7bdace4e 100644 --- a/src/supernode/server.cpp +++ b/src/supernode/server.cpp @@ -4,10 +4,10 @@ #include "lib/graft/GraftletLoader.h" #include "lib/graft/sys_info.h" #include "lib/graft/graft_exception.h" +#include "lib/graft/upstream_manager.h" #include #include -#include #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.server" @@ -62,12 +62,12 @@ void GraftServer::getThreadPoolInfo(uint64_t& activeWorkers, uint64_t& expelledW m_connectionBase->getLooper().getThreadPoolInfo(activeWorkers, expelledWorkers); } -void GraftServer::initGraftlets() +void GraftServer::initGraftlets(const ConfigOpts& copts) { if(m_graftletLoader) return; m_graftletLoader = std::make_unique(); LOG_PRINT_L1("Searching graftlets"); - for(auto& it : getCopts().graftlet_dirs) + for(auto& it : copts.graftlet_dirs) { LOG_PRINT_L1("Searching graftlets in directory '") << it << "'"; m_graftletLoader->findGraftletsInDirectory(it, "so"); @@ -146,9 +146,12 @@ bool GraftServer::init(int argc, const char** argv, ConfigOpts& configOpts) bool res = initConfigOption(argc, argv, configOpts); if(!res) return false; + initGraftlets(configOpts); + assert(m_graftletLoader); + m_connectionBase->loadBlacklist(configOpts); - m_connectionBase->createLooper(configOpts); - initGraftlets(); + m_connectionBase->createLooper(*m_graftletLoader, configOpts); + addGlobalCtxCleaner(); initGlobalContext(); @@ -370,29 +373,6 @@ void initGraftletDirs(int argc, const char** argv, const std::string& dirs_opt, } } -void parseSubstitutionItem(const std::string& name, const std::string& val, std::string& uri, int& cnt, bool& keepAlive, double& timeout) -{ - std::string s = trim_comments(val); - std::regex regex(R"(^\s*([^,\s]+)\s*(,\s*(\d+)\s*(,\s*(true|false|0|1)\s*(,\s*(\d+\.?\d*)\s*)?)?)?\s*$)"); - std::smatch m; - if(!std::regex_match(s, m, regex)) - { - std::ostringstream oss; - oss << "invalid [upstream] format line with name '" << name << "' : '" << val << "'"; - throw graft::exit_error(oss.str()); - } - assert(7 < m.size()); - assert(m[1].matched); - uri = m[1]; - cnt = 0; keepAlive = false; timeout = 0; - if(!m[3].matched) return; - cnt = std::stoi(m[3]); - if(!m[5].matched) return; - if(m[5] == "true" || m[5] == "1") keepAlive = true; - if(!m[7].matched) return; - timeout = std::stod(m[7]); -} - } //namespace details void usage(const boost::program_options::options_description& desc) @@ -531,17 +511,64 @@ bool GraftServer::initConfigOption(int argc, const char** argv, ConfigOpts& conf boost::optional log_trunc_to_size = log_conf.get_optional("trunc-to-size"); configOpts.log_trunc_to_size = (log_trunc_to_size)? log_trunc_to_size.get() : -1; + //[upstream] const boost::property_tree::ptree& uri_subst_conf = config.get_child("upstream"); - graft::OutHttp::uri_substitutions.clear(); - std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf](auto it) + configOpts.uri_substitutions.clear(); + std::vector> name_uris; + std::for_each(uri_subst_conf.begin(), uri_subst_conf.end(),[&uri_subst_conf, &configOpts, &name_uris](auto it) { - std::string name(it.first); - std::string val(uri_subst_conf.get(name)); + static std::string uri_suff("-uri"); + const std::string& name_uri(it.first); + + if(name_uri.size() <= uri_suff.size()) return; //Note, empty names skipped + if(name_uri.substr(name_uri.size() - uri_suff.size()) != uri_suff) return; - std::string uri; int cnt; bool keepAlive; double timeout; - details::parseSubstitutionItem(name, val, uri, cnt, keepAlive, timeout); - graft::OutHttp::uri_substitutions.emplace(std::move(name), std::make_tuple(std::move(uri), cnt, keepAlive, timeout)); + std::string name = name_uri.substr(0, name_uri.size() - uri_suff.size()); + std::string uri(details::trim_comments(uri_subst_conf.get(name_uri))); + + name_uris.emplace_back( std::make_pair(name, uri) ); }); + for(auto& item : name_uris) + { + const std::string& name = item.first; + const std::string& uri = item.second; + + double timeout = uri_subst_conf.get(name + "-timeout", 0); + int conn_upstream = uri_subst_conf.get(name + "-max-conns-upstream", 0); + int conn_address = uri_subst_conf.get(name + "-max-conns-address", 0); + bool keep_alive = uri_subst_conf.get(name + "-keep-alive", false); + if(!keep_alive && conn_address!=0) + { + std::ostringstream oss; + oss << config_filename << " [upstream] " << name + << "-max-conns-address other than 0 is not supported when " + << name << "-keep-alive is false"; + throw graft::exit_error(oss.str()); + } + if(keep_alive && conn_upstream!=0) + { + std::ostringstream oss; + oss << config_filename << " [upstream] " << name + << "-max-conns-upstream other than 0 is not supported when " + << name << "-keep-alive is true"; + throw graft::exit_error(oss.str()); + } + int cnt = keep_alive? conn_address : conn_upstream; + configOpts.uri_substitutions.emplace(name, std::make_tuple(uri, cnt, keep_alive, timeout)); + } + + if(configOpts.cryptonode_rpc_address[0] == '$') + { + std::string def_subst_name = configOpts.cryptonode_rpc_address.substr(1); + auto it = configOpts.uri_substitutions.find(def_subst_name); + if(it == configOpts.uri_substitutions.end()) + { + std::ostringstream oss; oss << "cannot find substitution '" << configOpts.cryptonode_rpc_address << "'"; + throw std::runtime_error(oss.str()); + } + configOpts.cryptonode_rpc_address = std::get<0>(it->second); + configOpts.default_uri_substitution_name = def_subst_name; + } return true; } diff --git a/test/fixture.h b/test/fixture.h index 9ac633fe..6ed83580 100644 --- a/test/fixture.h +++ b/test/fixture.h @@ -64,10 +64,13 @@ class GraftServerTestBase : public ::testing::Test int poll_timeout_ms = 1000; bool keepAlive = false; - using on_http_t = bool (const http_message *hm, int& status_code, std::string& headers, std::string& data); + using on_http_t = bool (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data); std::function on_http = nullptr; static std::function http_echo; + using on_poll_t = bool (mg_connection* client, int& status_code, std::string& headers, std::string& data); + std::function on_poll = nullptr; + void run() { ready = false; @@ -83,13 +86,22 @@ class GraftServerTestBase : public ::testing::Test stop = true; th.join(); } + protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) { assert(on_http); - return on_http(hm, status_code, headers, data); + return on_http(client, hm, status_code, headers, data); } virtual void onClose() { } + virtual bool onPoll(mg_connection* client) + { + if(!on_poll) return false; + int status_code = 200; + std::string headers, data; + bool res = on_poll(client, status_code, headers, data); + if(res) sendResponse(client, status_code, headers, data); + } private: std::thread th; std::atomic_bool ready; @@ -109,6 +121,16 @@ class GraftServerTestBase : public ::testing::Test mg_mgr_free(&mgr); } + void sendResponse(mg_connection* client, int& status_code, std::string& headers, std::string& data) + { + mg_send_head(client, status_code, data.size(), headers.c_str()); + mg_send(client, data.c_str(), data.size()); + if(!keepAlive) + { + client->flags |= MG_F_SEND_AND_CLOSE; + } + } + static void ev_handler_empty_s(mg_connection *client, int ev, void *ev_data) { } @@ -128,14 +150,13 @@ class GraftServerTestBase : public ::testing::Test struct http_message *hm = (struct http_message *) ev_data; int status_code = 200; std::string headers, data; - bool res = onHttpRequest(hm, status_code, headers, data); + bool res = onHttpRequest(client, hm, status_code, headers, data); if(!res) break; - mg_send_head(client, status_code, data.size(), headers.c_str()); - mg_send(client, data.c_str(), data.size()); - if(!keepAlive) - { - client->flags |= MG_F_SEND_AND_CLOSE; - } + sendResponse(client, status_code, headers, data); + } break; + case MG_EV_POLL: + { + onPoll(client); } break; case MG_EV_CLOSE: { diff --git a/test/graft_server_test.cpp b/test/graft_server_test.cpp index 50983b5e..b65a1984 100644 --- a/test/graft_server_test.cpp +++ b/test/graft_server_test.cpp @@ -5,6 +5,8 @@ #include "lib/graft/inout.h" #include "lib/graft/handler_api.h" #include "lib/graft/expiring_list.h" +#include "lib/graft/upstream_manager.h" +#include "lib/graft/GraftletLoader.h" #include "supernode/requests.h" #include "supernode/requests/sale.h" #include "supernode/requests/sale_status.h" @@ -150,58 +152,108 @@ TEST(InOut, serialization) output.loadT(a); } +namespace +{ + +class UpstreamManagerTest : public graft::UpstreamManager +{ +public: + void init(const std::string& subst_name, const std::string& subst_uri, const std::string& cryptonode_rpc_address = "") + { + graft::ConfigOpts copts; + copts.cryptonode_rpc_address = cryptonode_rpc_address; + copts.uri_substitutions.insert({subst_name, {subst_uri, 0, false, 0}}); + graftlet::GraftletLoader gl; + graft::UpstreamManager::init(gl, copts, nullptr, 0, nullptr); + } + const std::string testGetUri(const graft::Output& output) + { + return graft::UpstreamManager::testGetUri(output); + } + std::unordered_map& getResolveCache() + { + return m_resolveCache; + } +}; + +} //namespace + TEST(InOut, makeUri) { { + std::unordered_map resolve_cache; graft::Output output; std::string default_uri = "http://123.123.123.123:1234"; - std::string url = output.makeUri(default_uri); + std::string ip_port, url; + bool res = output.makeUri(default_uri, ip_port, url, resolve_cache); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "123.123.123.123:1234"); EXPECT_EQ(url, default_uri); } { + UpstreamManagerTest umt; umt.init("my_ip", "1.2.3.4"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_ip", {"1.2.3.4", 0, false, 0}}); output.proto = "https"; output.port = "4321"; output.uri = "$my_ip"; - std::string url = output.makeUri(""); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "1.2.3.4:" + output.port); EXPECT_EQ(url, output.proto + "://1.2.3.4:" + output.port); } { + UpstreamManagerTest umt; umt.init("my_path", "http://site.com:1234/endpoint?q=1&n=2"); + umt.getResolveCache().emplace("site.com","site.com"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_path", {"http://site.com:1234/endpoint?q=1&n=2", 0, false, 0}}); output.proto = "https"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(""); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "site.com:" + output.port); EXPECT_EQ(url, "https://site.com:4321/endpoint?q=1&n=2"); } { + UpstreamManagerTest umt; umt.init("my_path", "/endpoint?q=1&n=2"); + umt.getResolveCache().emplace("mysite.com","mysite.com"); graft::Output output; - graft::Output::uri_substitutions.insert({"my_path", {"endpoint?q=1&n=2", 0, false, 0}}); output.proto = "https"; output.host = "mysite.com"; output.port = "4321"; output.uri = "$my_path"; - std::string url = output.makeUri(""); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "mysite.com:" + output.port); EXPECT_EQ(url, "https://mysite.com:4321/endpoint?q=1&n=2"); } { + UpstreamManagerTest umt; umt.init("something", "1.2.3.4", "localhost:28881"); + umt.getResolveCache().emplace("aaa.bbb","aaa.bbb"); + graft::Output output; - std::string default_uri = "localhost:28881"; output.path = "json_rpc"; - std::string url = output.makeUri(default_uri); - EXPECT_EQ(url, "localhost:28881/json_rpc"); + std::string ip_port, url; + bool res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "127.0.0.1:28881"); + EXPECT_EQ(url, "127.0.0.1:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; - url = output.makeUri(default_uri); - EXPECT_EQ(url, "https://localhost:28881/json_rpc"); + res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "127.0.0.1:28881"); + EXPECT_EQ(url, "https://127.0.0.1:28881/json_rpc"); output.path = "/json_rpc"; output.proto = "https"; output.uri = "http://aaa.bbb:12345/something"; - url = output.makeUri(default_uri); + res = output.makeUri(umt.testGetUri(output), ip_port, url, umt.getResolveCache()); + EXPECT_EQ(res, true); + EXPECT_EQ(ip_port, "aaa.bbb:12345"); EXPECT_EQ(url, "https://aaa.bbb:12345/json_rpc"); } } @@ -519,7 +571,7 @@ TEST(ExpiringList, common) ///////////////////////////////// std::function GraftServerTestBase::TempCryptoNodeServer::http_echo = - [] (const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool + [] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool { data = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); @@ -536,7 +588,7 @@ class GraftServerCommonTest : public GraftServerTestBase class TempCryptoN : public TempCryptoNodeServer { protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { data = std::string(hm->uri.p, hm->uri.len); graft::Context ctx(mainServer.getGcm()); @@ -1055,7 +1107,7 @@ class GraftServerPostponeTest : public GraftServerTestBase public: bool do_callback = true; protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { data = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); @@ -1256,7 +1308,7 @@ TEST_F(GraftServerTest, genericCallback) case graft::Status::None: { //find webhook endpoint - auto it = std::find_if(input.headers.begin(), input.headers.end(), [](auto& v)->bool { v.first == "X-Callback"; } ); + auto it = std::find_if(input.headers.begin(), input.headers.end(), [](auto& v)->bool { return v.first == "X-Callback"; } ); assert(it != input.headers.end()); std::string path = it->second; //"http://0.0.0.0:port/callback/" @@ -1279,7 +1331,7 @@ TEST_F(GraftServerTest, genericCallback) graft::supernode::request::registerForwardRequests(m_httpRouter); m_httpRouter.addRoute("/api/{forward:create_account|restore_account|wallet_balance|prepare_transfer|transaction_history}",METHOD_POST,{nullptr,pretend_walletnode_echo,nullptr}); - graft::Output::uri_substitutions.emplace("walletnode", std::make_tuple("http://localhost:28690/", 0, false, 0)); + m_copts.uri_substitutions.emplace("walletnode", std::make_tuple("http://127.0.0.1:28690/", 0, false, 0)); run(); std::string post_data = "some data"; @@ -1305,7 +1357,7 @@ class GraftServerBlockingTest : public GraftServerTestBase std::string answer; std::string body; protected: - virtual bool onHttpRequest(const http_message *hm, int& status_code, std::string& headers, std::string& data) override + virtual bool onHttpRequest(mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) override { body = std::string(hm->body.p, hm->body.len); std::string method(hm->method.p, hm->method.len); diff --git a/test/graftlets_test.cpp b/test/graftlets_test.cpp index fe8d11dc..1685b4a8 100644 --- a/test/graftlets_test.cpp +++ b/test/graftlets_test.cpp @@ -3,6 +3,7 @@ #include "fixture.h" #define INCLUDE_DEPENDENCY_GRAPH #include "lib/graft/GraftletLoader.h" +#include "lib/graft/upstream_manager.h" TEST(DependencyGraph, dependencies) { @@ -257,6 +258,21 @@ TEST(Graftlets, calls) EXPECT_EQ(endpoints.size(), 4); } +TEST(Graftlets, routing) +{ + graftlet::GraftletLoader loader; + + loader.findGraftletsInDirectory("./", "so"); + loader.findGraftletsInDirectory("./graftlets", "so"); + + graft::UpstreamRoutingManager upstreamRoutingManager(loader); + + std::string route = "#myGraftlet.testGL.testRouting:my_value"; + bool res = upstreamRoutingManager.route(route, route); + EXPECT_EQ(res, true); + EXPECT_EQ(route, "my_value"); +} + TEST(Graftlets, exceptionList) { #define VER(a,b) GRAFTLET_MKVER(a,b) diff --git a/test/upstream_test.cpp b/test/upstream_test.cpp index 56516653..be28c65e 100644 --- a/test/upstream_test.cpp +++ b/test/upstream_test.cpp @@ -2,128 +2,169 @@ #include "lib/graft/jsonrpc.h" #include "fixture.h" -TEST_F(GraftServerTestBase, upstreamKeepAlive) +class UpstreamTest : public GraftServerTestBase { - auto action = [&](const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)->graft::Status +public: + MainServer m_mainServer; + + void run(const std::string& forwardUri, const int maxConn) { - switch(ctx.local.getLastStatus()) + auto action = [&](const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)->graft::Status { - case graft::Status::None : + switch(ctx.local.getLastStatus()) + { + case graft::Status::None : + { + output.body = input.body; + output.uri = forwardUri; + return graft::Status::Forward; + } break; + case graft::Status::Forward : + { + output.body = input.body; + return graft::Status::Ok; + } break; + default: assert(false); + } + }; + + std::atomic activeCnt = 0; + std::atomic maxActiveCnt = 0; + + std::mutex mutex; + + std::map results; + + TempCryptoNodeServer crypton; + crypton.on_http = [&maxActiveCnt, &activeCnt, &mutex, &results] (mg_connection* client, const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool { - output.body = input.body; - output.uri = "$crypton"; - return graft::Status::Forward; - } break; - case graft::Status::Forward : + if(maxActiveCnt < ++activeCnt) maxActiveCnt = activeCnt.load(); + std::string body = std::string(hm->body.p, hm->body.len); + + std::thread th {[=, &mutex, &results]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::lock_guard lock(mutex); + results.emplace(std::make_pair(client, body)); + }}; + th.detach(); + return false; + }; + crypton.on_poll = [&activeCnt, &mutex, &results] (mg_connection* client, int& status_code, std::string& headers, std::string& data)->bool { - output.body = input.body; - return graft::Status::Ok; - } break; - default: assert(false); - } - }; - - TempCryptoNodeServer crypton; - crypton.on_http = [] (const http_message *hm, int& status_code, std::string& headers, std::string& data) -> bool - { - data = std::string(hm->body.p, hm->body.len); - std::string method(hm->method.p, hm->method.len); - headers = "Content-Type: application/json"; - return true; - }; - crypton.keepAlive = true; - crypton.connect_timeout_ms = 1000; - crypton.run(); - graft::Output::uri_substitutions.insert({"crypton", {"127.0.0.1:1234", 3, true, 100}}); - MainServer mainServer; - mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); - mainServer.run(); - - auto client_func = [](int i) - { - std::string post_data = "some data" + std::to_string(i); - Client client; - client.serve("http://localhost:9084/test_upstream", "", post_data, 1000, 250); - EXPECT_EQ(false, client.get_closed()); - EXPECT_EQ(200, client.get_resp_code()); - std::string s = client.get_body(); - EXPECT_EQ(s, post_data); - }; - - for(int c = 0; c < 1; ++c) - { + std::lock_guard lock(mutex); + auto it = results.find(client); + if(it == results.end()) return false; + headers = "Content-Type: application/json"; + data = it->second; + results.erase(it); + --activeCnt; + return true; + }; + crypton.keepAlive = true; + crypton.poll_timeout_ms = 100; + crypton.connect_timeout_ms = 1000; + crypton.run(); + + m_mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); + m_mainServer.run(); + + std::atomic ok_cnt = 0; + + auto client_func = [&ok_cnt](int i) + { + std::string post_data = "some data" + std::to_string(i); + Client client; + client.serve("http://localhost:9084/test_upstream", "", post_data, 100, 5000); + EXPECT_EQ(false, client.get_closed()); + EXPECT_EQ(200, client.get_resp_code()); + std::string s = client.get_body(); + EXPECT_EQ(s, post_data); + if(s == post_data) ++ok_cnt; + }; + const int th_cnt = 50; std::vector th_vec; for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); for(auto& th : th_vec) th.join(); + + if(maxConn != 0) EXPECT_EQ(maxActiveCnt, maxConn); + + EXPECT_EQ(ok_cnt, th_cnt); + + m_mainServer.stop_and_wait_for(); + crypton.stop_and_wait_for(); } +}; - mainServer.stop_and_wait_for(); - crypton.stop_and_wait_for(); +TEST_F(UpstreamTest, upstream) +{ + const int maxConn = 0; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, false, 300}}); + run("$crypton", maxConn); } +TEST_F(UpstreamTest, upstreamWithSharp) +{ + const int maxConn = 0; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, false, 300}}); + m_mainServer.m_copts.graftlet_dirs.emplace_back("graftlets"); + run("#myGraftlet.testGL.testRouting:$crypton", maxConn); +} -namespace +TEST_F(UpstreamTest, upstreamDefault) { + const int maxConn = 0; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, false, 300}}); + run("", maxConn); +} -GRAFT_DEFINE_IO_STRUCT(GetVersionResp, - (std::string, status), - (uint32_t, version) - ); +TEST_F(UpstreamTest, upstreamKeepAlive) +{ + const int maxConn = 0; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, true, 300}}); + run("$crypton", maxConn); +} -GRAFT_DEFINE_JSON_RPC_RESPONSE_RESULT(JRResponseResult, GetVersionResp); +TEST_F(UpstreamTest, upstreamKeepAliveDefault) +{ + const int maxConn = 0; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, true, 300}}); + run("", maxConn); +} -} //namespace +TEST_F(UpstreamTest, upstreamMax) +{ + const int maxConn = 3; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, false, 1000}}); + run("$crypton", maxConn); +} -//this test requires a cryptonode on 127.0.0.1:18981 -TEST_F(GraftServerTestBase, DISABLED_cryptonodeKeepAlive) +TEST_F(UpstreamTest, upstreamMaxDefault) { - auto action = [&](const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)->graft::Status - { - switch(ctx.local.getLastStatus()) - { - case graft::Status::None : - { - output.uri = "$cryptonode"; - graft::JsonRpcRequestHeader request; - request.method = "get_version"; - output.load(request); - return graft::Status::Forward; - } break; - case graft::Status::Forward : - { - output.body = input.body; - return graft::Status::Ok; - } break; - default: assert(false); - } - }; - - graft::Output::uri_substitutions.insert({"cryptonode", {"127.0.0.1:18981/json_rpc", 3, true, 100}}); - MainServer mainServer; - mainServer.m_router.addRoute("/test_upstream", METHOD_POST, {nullptr, action, nullptr}); - mainServer.run(); - - auto client_func = [](int i) - { - std::string post_data = "some data" + std::to_string(i); - Client client; - client.serve("http://localhost:9084/test_upstream", "", post_data, 1000, 250); - EXPECT_EQ(false, client.get_closed()); - EXPECT_EQ(200, client.get_resp_code()); - std::string s = client.get_body(); - - graft::Input in; in.load(client.get_body()); - EXPECT_NO_THROW( JRResponseResult result = in.get() ); - }; - - for(int c = 0; c < 1; ++c) - { - const int th_cnt = 50; - std::vector th_vec; - for(int i = 0; i < th_cnt; ++i) th_vec.emplace_back(std::thread([i, client_func](){ client_func(i); })); - for(auto& th : th_vec) th.join(); - } + const int maxConn = 3; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, false, 1000}}); + run("", maxConn); +} - mainServer.stop_and_wait_for(); +TEST_F(UpstreamTest, upstreamMaxKeepAlive) +{ + const int maxConn = 3; + m_mainServer.m_copts.uri_substitutions.insert({"crypton", {"127.0.0.1:1234", maxConn, true, 1000}}); + run("$crypton", maxConn); +} + +TEST_F(UpstreamTest, upstreamMaxKeepAliveDefault) +{ + const int maxConn = 3; + m_mainServer.m_copts.cryptonode_rpc_address = ""; + m_mainServer.m_copts.default_uri_substitution_name = "default"; + m_mainServer.m_copts.uri_substitutions.insert({"default", {"127.0.0.1:1234", maxConn, true, 1000}}); + run("", maxConn); }