Skip to content

Commit

Permalink
Merge pull request #210 from graft-project/alpha4
Browse files Browse the repository at this point in the history
Merge Alpha4 to development/v.0.0.2
  • Loading branch information
AlexanderSuprunenko authored Feb 14, 2019
2 parents 45c4db0 + d1a66ea commit e93c20f
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 23 deletions.
1 change: 1 addition & 0 deletions data/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ data-dir=
stake-wallet-name=stake-wallet
testnet=true
stake-wallet-refresh-interval-ms=50000
stake-wallet-refresh-interval-random-factor=0

[ipfilter]
;; path to ipfilter rules file
Expand Down
3 changes: 1 addition & 2 deletions include/lib/graft/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ std::string base64_encode(const std::string &data);
template <typename T>
T random_number(T startRange, T endRange)
{
std::random_device rd;
std::mt19937 mt(rd());
static std::mt19937 mt(std::random_device{}());
std::uniform_int_distribution<T> dist(startRange, endRange);
return dist(mt);
}
Expand Down
5 changes: 3 additions & 2 deletions include/lib/graft/handler_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ class HandlerAPI
public:
virtual void sendUpstreamBlocking(Output& output, Input& input, std::string& err) = 0;
virtual bool addPeriodicTask(const Router::Handler& h_worker,
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max()) = 0;
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max(),
double random_factor = 0) = 0;
virtual request::system_info::Counter& runtimeSysInfo() = 0;
virtual const ConfigOpts& configOpts() const = 0;
};
Expand Down
19 changes: 13 additions & 6 deletions include/lib/graft/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,23 @@ class PeriodicTask : public BaseTask
PeriodicTask(
TaskManager& manager, const Router::Handler3& h3,
std::chrono::milliseconds timeout_ms,
std::chrono::milliseconds initial_timeout_ms
std::chrono::milliseconds initial_timeout_ms,
double random_factor = 0
) : BaseTask(manager, Router::JobParams({Input(), Router::vars_t(), h3}))
, m_timeout_ms(timeout_ms), m_initial_timeout_ms(initial_timeout_ms)
, m_random_factor(random_factor)
{
}

PeriodicTask(TaskManager& manager, const Router::Handler3& h3, std::chrono::milliseconds timeout_ms)
: PeriodicTask(manager, h3, timeout_ms, timeout_ms)
PeriodicTask(TaskManager& manager, const Router::Handler3& h3,
std::chrono::milliseconds timeout_ms, double random_factor = 0)
: PeriodicTask(manager, h3, timeout_ms, timeout_ms, random_factor)
{
}

std::chrono::milliseconds m_timeout_ms;
std::chrono::milliseconds m_initial_timeout_ms;
double m_random_factor;
bool m_initial_run {true};

public:
Expand Down Expand Up @@ -199,7 +203,9 @@ class TaskManager : private HandlerAPI

void sendUpstream(BaseTaskPtr bt);
void addPeriodicTask(const Router::Handler3& h3,
std::chrono::milliseconds interval_ms, std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max());
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max(),
double random_factor = 0);

////getters
virtual mg_mgr* getMgMgr() = 0;
Expand All @@ -220,7 +226,8 @@ class TaskManager : private HandlerAPI
virtual void sendUpstreamBlocking(Output& output, Input& input, std::string& err) override;
virtual bool addPeriodicTask(const Router::Handler& h_worker,
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max()) override;
std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max(),
double random_factor = 0 ) override;
virtual request::system_info::Counter& runtimeSysInfo() override;
virtual const ConfigOpts& configOpts() const override;

Expand Down Expand Up @@ -282,7 +289,7 @@ class TaskManager : private HandlerAPI
using PromiseItem = UpstreamTask::PromiseItem;
using PromiseQueue = tp::MPMCBoundedQueue<PromiseItem>;

using PeridicTaskItem = std::tuple<Router::Handler3, std::chrono::milliseconds, std::chrono::milliseconds>;
using PeridicTaskItem = std::tuple<Router::Handler3, std::chrono::milliseconds, std::chrono::milliseconds, double>;
using PeriodicTaskQueue = tp::MPMCBoundedQueue<PeridicTaskItem>;

std::unique_ptr<PromiseQueue> m_promiseQueue;
Expand Down
1 change: 1 addition & 0 deletions include/supernode/supernode.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Supernode : public GraftServer
std::string data_dir;
std::string stake_wallet_name;
size_t stake_wallet_refresh_interval_ms;
double stake_wallet_refresh_interval_random_factor;
// runtime parameters.
// path to watch-only wallets (supernodes)
std::string watchonly_wallets_path;
Expand Down
29 changes: 19 additions & 10 deletions src/lib/graft/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "lib/graft/handler_api.h"
#include "lib/graft/expiring_list.h"
#include "lib/graft/sys_info.h"
#include "lib/graft/common/utils.h"

#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "supernode.task"
Expand Down Expand Up @@ -419,17 +420,18 @@ inline size_t TaskManager::next_pow2(size_t val)
}

bool TaskManager::addPeriodicTask(const Router::Handler& h_worker,
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms)
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms,
double random_factor)
{
if(io_thread)
{//it is called from pre_action or post_action, and we can call requestAddPeriodicTask directly
addPeriodicTask({nullptr, h_worker, nullptr}, interval_ms, initial_interval_ms);
addPeriodicTask({nullptr, h_worker, nullptr}, interval_ms, initial_interval_ms, random_factor);
return true;
}
else
{
PeridicTaskItem item = std::make_tuple(Router::Handler3(nullptr, h_worker, nullptr), interval_ms, initial_interval_ms);
PeridicTaskItem item = std::make_tuple(Router::Handler3(nullptr, h_worker, nullptr), interval_ms, initial_interval_ms, random_factor);
bool ok = m_periodicTaskQueue->push( std::move(item) );
if(!ok) return false;
notifyJobReady();
Expand Down Expand Up @@ -457,7 +459,8 @@ void TaskManager::checkPeriodicTaskIO()
Router::Handler3& h3 = std::get<0>(pti);
std::chrono::milliseconds& interval_ms = std::get<1>(pti);
std::chrono::milliseconds& initial_interval_ms = std::get<2>(pti);
addPeriodicTask(h3, interval_ms, initial_interval_ms);
double& random_factor = std::get<3>(pti);
addPeriodicTask(h3, interval_ms, initial_interval_ms, random_factor);
}
}

Expand Down Expand Up @@ -816,11 +819,11 @@ void TaskManager::processOk(BaseTaskPtr bt)
}

void TaskManager::addPeriodicTask(
const Router::Handler3& h3, std::chrono::milliseconds interval_ms, std::chrono::milliseconds initial_interval_ms)
const Router::Handler3& h3, std::chrono::milliseconds interval_ms, std::chrono::milliseconds initial_interval_ms, double random_factor)
{
if(initial_interval_ms == std::chrono::milliseconds::max()) initial_interval_ms = interval_ms;

BaseTask* bt = BaseTask::Create<PeriodicTask>(*this, h3, interval_ms, initial_interval_ms).get();
BaseTask* bt = BaseTask::Create<PeriodicTask>(*this, h3, interval_ms, initial_interval_ms, random_factor).get();
PeriodicTask* pt = dynamic_cast<PeriodicTask*>(bt);
assert(pt);
schedule(pt);
Expand Down Expand Up @@ -976,9 +979,15 @@ void PeriodicTask::finalize()

std::chrono::milliseconds PeriodicTask::getTimeout()
{
auto ret = (m_initial_run) ? m_initial_timeout_ms : m_timeout_ms;
m_initial_run = false;
return ret;
if(m_initial_run)
{
m_initial_run = false;
return m_initial_timeout_ms;
}
if(m_random_factor < 0.0001) return m_timeout_ms;
using i_type = decltype(m_timeout_ms.count());
i_type v = graft::utils::random_number(m_timeout_ms.count(), (i_type)(m_timeout_ms.count()*(1.0 + m_random_factor)));
return std::chrono::milliseconds(v);
}

ClientTask::ClientTask(ConnectionManager* connectionManager, mg_connection *client, Router::JobParams& prms)
Expand Down
4 changes: 3 additions & 1 deletion src/supernode/supernode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ bool Supernode::initConfigOption(int argc, const char** argv, ConfigOpts& config
m_configEx.stake_wallet_refresh_interval_ms = server_conf.get<size_t>("stake-wallet-refresh-interval-ms",
consts::DEFAULT_STAKE_WALLET_REFRESH_INTERFAL_MS);
m_configEx.testnet = server_conf.get<bool>("testnet", false);
m_configEx.stake_wallet_refresh_interval_random_factor = server_conf.get<double>("stake-wallet-refresh-interval-random-factor", 0);
return res;
}

Expand Down Expand Up @@ -137,7 +138,8 @@ void Supernode::startSupernodePeriodicTasks()
getLooper().addPeriodicTask(
graft::Router::Handler3(nullptr, graft::supernode::request::sendAnnounce, nullptr),
std::chrono::milliseconds(m_configEx.stake_wallet_refresh_interval_ms),
std::chrono::milliseconds(initial_interval_ms)
std::chrono::milliseconds(initial_interval_ms),
m_configEx.stake_wallet_refresh_interval_random_factor
);
}
}
Expand Down
5 changes: 3 additions & 2 deletions test/sys_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ class HandlerAPIImpl : public graft::HandlerAPI
public:
virtual void sendUpstreamBlocking(Output& output, Input& input, std::string& err) override { }
virtual bool addPeriodicTask(const Router::Handler& h_worker,
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max()) override { }
std::chrono::milliseconds interval_ms,
std::chrono::milliseconds initial_interval_ms = std::chrono::milliseconds::max(),
double random_factor = 0) override { }
virtual graft::request::system_info::Counter& runtimeSysInfo() override
{
return m_sic;
Expand Down

0 comments on commit e93c20f

Please sign in to comment.