Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UpstreamManager dependency on TaskManager removed. #186

Open
wants to merge 8 commits into
base: development/v.0.0.2
Choose a base branch
from
36 changes: 36 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,40 @@
cmake_minimum_required(VERSION 3.10)

if(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE RelWithDebInfo)
endif(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE)

if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo)
message("==> The configuration is ${CMAKE_BUILD_TYPE}. Debug info will be extracted into separate files.")

function (add_executable _name)
_add_executable(${ARGV})

if (TARGET ${_name})
add_custom_command(TARGET ${_name} POST_BUILD
COMMAND echo "$<TARGET_FILE_NAME:${_name}>: extracting debug info"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --only-keep-debug "$<TARGET_FILE_NAME:${_name}>" "$<TARGET_FILE_NAME:${_name}>.debug"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> strip --strip-debug --strip-unneeded "$<TARGET_FILE_NAME:${_name}>"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --add-gnu-debuglink="$<TARGET_FILE_NAME:${_name}>.debug" "$<TARGET_FILE_NAME:${_name}>"
)
endif()
endfunction()

function (add_library _name _type)
_add_library(${ARGV})

if (TARGET ${_name} AND ${_type} STREQUAL SHARED)
add_custom_command(TARGET ${_name} POST_BUILD
COMMAND echo "$<TARGET_FILE_NAME:${_name}>: extracting debug info"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --only-keep-debug "$<TARGET_FILE_NAME:${_name}>" "$<TARGET_FILE_NAME:${_name}>.debug"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> strip --strip-debug --strip-unneeded "$<TARGET_FILE_NAME:${_name}>"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --add-gnu-debuglink="$<TARGET_FILE_NAME:${_name}>.debug" "$<TARGET_FILE_NAME:${_name}>"
)
endif()
endfunction()

endif(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo)

project(graft_server)

option(OPT_BUILD_TESTS "Build tests." OFF)
Expand Down Expand Up @@ -155,6 +190,7 @@ add_library(graft STATIC
${PROJECT_SOURCE_DIR}/src/lib/graft/mongoosex.cpp
${PROJECT_SOURCE_DIR}/src/lib/graft/router.cpp
${PROJECT_SOURCE_DIR}/src/lib/graft/task.cpp
${PROJECT_SOURCE_DIR}/src/lib/graft/upstream_manager.cpp
${PROJECT_SOURCE_DIR}/modules/mongoose/mongoose.c
${PROJECT_SOURCE_DIR}/src/supernode/server.cpp
${PROJECT_SOURCE_DIR}/src/supernode/supernode.cpp
Expand Down
7 changes: 5 additions & 2 deletions include/lib/graft/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ class UpstreamSender : public SelfHolder<UpstreamSender>

BaseTaskPtr& getTask() { return m_bt; }

void send(TaskManager& manager, const std::string& uri);
void send(mg_mgr* mgr, int http_callback_port, const std::string& uri);
Status getStatus() const { return m_status; }
const std::string& getError() const { return m_error; }
size_t getRequestSize() const { return m_requestSize; }

void ev_handler(mg_connection* upstream, int ev, void *ev_data);
private:
Expand All @@ -94,14 +95,15 @@ class UpstreamSender : public SelfHolder<UpstreamSender>
mg_connection* m_upstream = nullptr;
Status m_status = Status::None;
std::string m_error;
size_t m_requestSize = 0;
};

class ConnectionBase;

class Looper final : public TaskManager
{
public:
Looper(const ConfigOpts& copts, ConnectionBase& connectionBase);
Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase);
virtual ~Looper();

void serve();
Expand Down Expand Up @@ -193,6 +195,7 @@ class ConnectionBase final
std::unique_ptr<SysInfoCounter> m_sysInfo;
std::atomic_bool m_looperReady{false};
std::unique_ptr<Looper> m_looper;
std::unique_ptr<UpstreamManager> m_upstreamManager;
std::map<ConnectionManager::Proto, std::unique_ptr<ConnectionManager>> m_conManagers;
};

Expand Down
1 change: 0 additions & 1 deletion include/lib/graft/inout.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ namespace graft

std::string port;
std::string path;
static std::unordered_map<std::string, std::tuple<std::string,int,bool,double>> uri_substitutions;
};

class InHttp final : public InOutHttpBase
Expand Down
2 changes: 2 additions & 0 deletions include/lib/graft/serveropts.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>
#include <vector>
#include <unordered_map>
#include <cassert>

namespace graft {
Expand Down Expand Up @@ -30,6 +31,7 @@ struct ConfigOpts
std::vector<std::string> graftlet_dirs;
int lru_timeout_ms;
IPFilterOpts ipfilter;
std::unordered_map<std::string, std::tuple<std::string,int,bool,double>> uri_substitutions;

void check_asserts() const
{
Expand Down
5 changes: 3 additions & 2 deletions include/lib/graft/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace graft
{
extern std::string client_addr(mg_connection* client);
extern std::string client_host(mg_connection* client);
extern unsigned int port_from_uri(const std::string& uri);

class UpstreamSender;
class TaskManager;
Expand Down Expand Up @@ -192,7 +193,7 @@ class UpstreamManager;
class TaskManager : private HandlerAPI
{
public:
TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter);
TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter);
virtual ~TaskManager();
TaskManager(const TaskManager&) = delete;
TaskManager& operator = (const TaskManager&) = delete;
Expand Down Expand Up @@ -259,6 +260,7 @@ class TaskManager : private HandlerAPI

static inline size_t next_pow2(size_t val);

UpstreamManager& m_upstreamManager;
SysInfoCounter& m_sysInfoCounter;
GlobalContextMap m_gcm;

Expand All @@ -277,7 +279,6 @@ class TaskManager : private HandlerAPI
std::deque<BaseTaskPtr> m_readyToResume;
std::priority_queue<std::pair<std::chrono::time_point<std::chrono::steady_clock>,Context::uuid_t>> m_expireTaskQueue;
std::unique_ptr<ExpiringList> m_futurePostponeUuids;
std::unique_ptr<UpstreamManager> m_upstreamManager;

using PromiseItem = UpstreamTask::PromiseItem;
using PromiseQueue = tp::MPMCBoundedQueue<PromiseItem>;
Expand Down
82 changes: 82 additions & 0 deletions include/lib/graft/upstream_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#pragma once
#include "lib/graft/connection.h"


namespace graft
{

class UpstreamManager
{
public:
using OnDoneCallback = std::function<void(UpstreamSender& uss)>;
UpstreamManager() = default;

void init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback);

bool busy() const
{
return (m_cntUpstreamSender != m_cntUpstreamSenderDone);
}

void send(BaseTaskPtr bt);
protected:
const std::string getUri(const std::string& inputUri);
private:
class ConnItem
{
public:
using ConnectionId = uint64_t;

ConnItem() = default;
ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout)
: m_uriId(uriId)
, m_uri(uri)
, m_maxConnections(maxConnections)
, m_keepAlive(keepAlive)
, m_timeout(timeout)
{
}
~ConnItem()
{
assert(m_idleConnections.empty());
assert(m_activeConnections.empty());
}

std::pair<ConnectionId, mg_connection*> getConnection();
void releaseActive(ConnectionId connectionId, mg_connection* client);
void onCloseIdle(mg_connection* client);

int m_connCnt = 0;
int m_uriId;
std::string m_uri;
double m_timeout;
//assert(m_upstreamQueue.empty() || 0 < m_maxConnections);
int m_maxConnections;
std::deque<BaseTaskPtr> m_taskQueue;
bool m_keepAlive = false;
std::map<mg_connection*, ConnectionId> m_idleConnections;
std::map<ConnectionId, mg_connection*> m_activeConnections;
UpstreamStub m_upstreamStub;
private:
ConnectionId m_newId = 0;
};

void onDone(UpstreamSender& uss, ConnItem* connItem, ConnItem::ConnectionId connectionId, mg_connection* client);
void createUpstreamSender(ConnItem* connItem, BaseTaskPtr bt);
ConnItem* findConnItem(const std::string& inputUri);
const std::string& getUri(ConnItem* connItem, const std::string& inputUri);

using Uri2ConnItem = std::map<std::string, ConnItem>;

OnDoneCallback m_onDoneCallback;

uint64_t m_cntUpstreamSender = 0;
uint64_t m_cntUpstreamSenderDone = 0;
ConnItem m_default;
Uri2ConnItem m_conn2item;
mg_mgr* m_mgr = nullptr;
int m_http_callback_port;
};

}//namespace graft

43 changes: 23 additions & 20 deletions src/lib/graft/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "lib/graft/mongoosex.h"
#include "lib/graft/sys_info.h"
#include "lib/graft/graft_exception.h"
#include "lib/graft/upstream_manager.h"

#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "supernode.connection"
Expand All @@ -23,6 +24,16 @@ std::string client_host(mg_connection* client)
return inet_ntoa(client->sa.sin.sin_addr);
}

unsigned int port_from_uri(const std::string& uri)
{
assert(!uri.empty());
mg_str mg_uri{uri.c_str(), uri.size()};
unsigned int mg_port = 0;
int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0);
assert(0<=res);
return mg_port;
}

void* getUserData(mg_mgr* mgr) { return mgr->user_data; }
void* getUserData(mg_connection* nc) { return nc->user_data; }
mg_mgr* getMgr(mg_connection* nc) { return nc->mgr; }
Expand Down Expand Up @@ -60,28 +71,18 @@ void UpstreamStub::ev_handler(mg_connection *upstream, int ev, void *ev_data)
}


void UpstreamSender::send(TaskManager &manager, const std::string& def_uri)
void UpstreamSender::send(mg_mgr* mgr, int http_callback_port, const std::string& def_uri)
{
assert(m_bt);

const ConfigOpts& opts = manager.getCopts();

Output& output = m_bt->getOutput();
std::string url = output.makeUri(def_uri);

Context::uuid_t callback_uuid = m_bt->getCtx().getId(false);
if(!callback_uuid.is_nil())
{//add extra header
unsigned int mg_port = 0;
{
std::string uri = opts.http_address;
assert(!uri.empty());
mg_str mg_uri{uri.c_str(), uri.size()};
int res = mg_parse_uri(mg_uri, 0, 0, 0, &mg_port, 0, 0, 0);
assert(0<=res);
}
std::stringstream ss;
ss << "http://0.0.0.0:" << mg_port << "/callback/" << boost::uuids::to_string(callback_uuid);
ss << "http://0.0.0.0:" << http_callback_port << "/callback/" << boost::uuids::to_string(callback_uuid);
output.headers.emplace_back(std::make_pair("X-Callback", ss.str()));
}
std::string extra_headers = output.combine_headers();
Expand All @@ -95,7 +96,7 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri)
m_upstream->user_data = this;
m_upstream->handler = static_ev_handler<UpstreamSender>;
}
mg_connection* upstream = mg::mg_connect_http_x(m_upstream, manager.getMgMgr(), static_ev_handler<UpstreamSender>, url.c_str(),
mg_connection* upstream = mg::mg_connect_http_x(m_upstream, mgr, static_ev_handler<UpstreamSender>, url.c_str(),
extra_headers.c_str(),
body); //body.empty() means GET
assert(upstream != nullptr && (m_upstream == nullptr || m_upstream == upstream));
Expand All @@ -106,9 +107,7 @@ void UpstreamSender::send(TaskManager &manager, const std::string& def_uri)
}
mg_set_timer(m_upstream, mg_time() + m_timeout);

auto& rsi = manager.runtimeSysInfo();
rsi.count_upstrm_http_req();
rsi.count_upstrm_http_req_bytes_raw(url.size() + extra_headers.size() + body.size());
m_requestSize = url.size() + extra_headers.size() + body.size();
}

void UpstreamSender::ev_handler(mg_connection *upstream, int ev, void *ev_data)
Expand Down Expand Up @@ -240,8 +239,12 @@ void ConnectionBase::createSystemInfoCounter()

void ConnectionBase::createLooper(ConfigOpts& configOpts)
{
assert(m_sysInfo && !m_looper);
m_looper = std::make_unique<Looper>(configOpts, *this);
assert(m_sysInfo && !m_looper && !m_upstreamManager);
m_upstreamManager = std::make_unique<UpstreamManager>();

m_looper = std::make_unique<Looper>(configOpts, *m_upstreamManager, *this);
m_upstreamManager->init(configOpts, m_looper->getMgMgr(), port_from_uri(configOpts.http_address),
[this](UpstreamSender& uss){ m_looper->onUpstreamDone(uss); });
m_looperReady = true;
}

Expand Down Expand Up @@ -290,8 +293,8 @@ void ConnectionBase::checkRoutes(graft::ConnectionManager& cm)
}


Looper::Looper(const ConfigOpts& copts, ConnectionBase& connectionBase)
: TaskManager(copts, connectionBase.getSysInfoCounter())
Looper::Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase)
: TaskManager(copts, upstreamManager, connectionBase.getSysInfoCounter())
, m_mgr(std::make_unique<mg_mgr>())
{
mg_mgr_init(m_mgr.get(), &connectionBase, cb_event);
Expand Down
1 change: 0 additions & 1 deletion src/lib/graft/inout.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace graft
{
std::unordered_map<std::string, std::tuple<std::string,int,bool,double>> OutHttp::uri_substitutions;

void InOutHttpBase::set_str_field(const http_message& hm, const mg_str& str_fld, std::string& fld)
{
Expand Down
Loading