Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FullSupernodeList: sync on framework's global context instead of mutex #220

Open
wants to merge 2 commits into
base: alpha4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/lib/graft/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class Looper final : public TaskManager
bool stopped() const { return m_stop; }

virtual mg_mgr* getMgMgr() override { return m_mgr.get(); }
void setOnStopHandler(std::function<void (void)> handler);

protected:
std::unique_ptr<mg_mgr> m_mgr;
private:
Expand All @@ -121,6 +123,7 @@ class Looper final : public TaskManager
std::atomic_bool m_ready {false};
std::atomic_bool m_stop {false};
std::atomic_bool m_forceStop {false};
std::function<void (void)> m_onStopHandler;
};

class ConnectionManager
Expand Down
6 changes: 4 additions & 2 deletions include/rta/fullsupernodelist.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "rta/supernode.h"
#include "rta/DaemonRpcClient.h"
#include "lib/graft/context.h"

#include <cryptonote_config.h>
#include <string>
Expand Down Expand Up @@ -30,8 +31,9 @@ class FullSupernodeList
static constexpr int64_t AUTH_SAMPLE_HASH_HEIGHT = 20; // block number for calculating auth sample should be calculated as current block height - AUTH_SAMPLE_HASH_HEIGHT;
static constexpr int64_t ANNOUNCE_TTL_SECONDS = 60 * 60; // if more than ANNOUNCE_TTL_SECONDS passed from last annouce - supernode excluded from auth sample selection

FullSupernodeList(const std::string &daemon_address, boost::shared_ptr<boost::asio::io_service> ios, bool testnet = false);
FullSupernodeList(const std::string &daemon_address, boost::shared_ptr<boost::asio::io_service> ios, graft::GlobalContextMap & ctxMap, bool testnet = false);
~FullSupernodeList();
void close();
/**
* @brief add - adds supernode object to a list and owns it. caller doesn't need to delete an object
* @param item - pointer to a Supernode object
Expand Down Expand Up @@ -135,9 +137,9 @@ class FullSupernodeList
std::string m_daemon_address;
bool m_testnet;
DaemonRpcClient m_rpc_client;
mutable boost::shared_mutex m_access;
std::unique_ptr<utils::ThreadPool> m_tp;
std::atomic_size_t m_refresh_counter;
mutable graft::Context m_ctx;
};

using FullSupernodeListPtr = boost::shared_ptr<FullSupernodeList>;
Expand Down
11 changes: 11 additions & 0 deletions src/lib/graft/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,14 @@ void Looper::serve()
if( stopped() && (m_forceStop || canStop()) ) break;
}

if (m_onStopHandler) {
m_onStopHandler();
}

setIOThread(false);



LOG_PRINT_L0("Server shutdown.");
}

Expand All @@ -354,6 +360,11 @@ void Looper::stop(bool force)
if(force) m_forceStop = true;
}

void Looper::setOnStopHandler(std::function<void ()> handler)
{
m_onStopHandler = handler;
}

void Looper::notifyJobReady()
{
mg_notify(m_mgr.get());
Expand Down
121 changes: 82 additions & 39 deletions src/rta/fullsupernodelist.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "rta/fullsupernodelist.h"
#include "supernode/requestdefines.h"

#include <wallet/api/wallet_manager.h>
#include <cryptonote_basic/cryptonote_basic_impl.h>
Expand Down Expand Up @@ -123,19 +124,29 @@ constexpr int32_t FullSupernodeList::TIERS, FullSupernodeList::ITEMS_PER_TIER, F
constexpr int64_t FullSupernodeList::AUTH_SAMPLE_HASH_HEIGHT, FullSupernodeList::ANNOUNCE_TTL_SECONDS;
#endif

FullSupernodeList::FullSupernodeList(const string &daemon_address, boost::shared_ptr<boost::asio::io_service> ios, bool testnet)
FullSupernodeList::FullSupernodeList(const string &daemon_address, boost::shared_ptr<boost::asio::io_service> ios,
graft::GlobalContextMap &ctxMap, bool testnet)
: m_daemon_address(daemon_address)
, m_testnet(testnet)
, m_rpc_client(daemon_address, "", "", ios)
, m_tp(new utils::ThreadPool())
, m_ctx(ctxMap)
{
m_refresh_counter = 0;
}

FullSupernodeList::~FullSupernodeList()
{
boost::unique_lock<boost::shared_mutex> writerLock(m_access);
m_list.clear();

}

void FullSupernodeList::close()
{
std::function f = [&](FullSupernodeListPtr &) {
m_list.clear();
return true;
};
m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
}

bool FullSupernodeList::add(Supernode *item)
Expand All @@ -149,9 +160,12 @@ bool FullSupernodeList::add(SupernodePtr item)
LOG_ERROR("item already exists: " << item->walletAddress());
return false;
}
std::function f = [&](FullSupernodeListPtr &)->bool {
m_list.insert(std::make_pair(item->walletAddress(), item));
return true;
};
m_ctx.global.apply("fsl", f);

boost::unique_lock<boost::shared_mutex> writerLock(m_access);
m_list.insert(std::make_pair(item->walletAddress(), item));
LOG_PRINT_L1("added supernode: " << item->walletAddress());
LOG_PRINT_L1("list size: " << m_list.size());
return true;
Expand Down Expand Up @@ -188,42 +202,63 @@ size_t FullSupernodeList::loadFromDirThreaded(const string &base_dir, size_t &fo

bool FullSupernodeList::remove(const string &address)
{
boost::unique_lock<boost::shared_mutex> readerLock(m_access);
return m_list.erase(address) > 0;
bool result = false;
std::function f = [&](FullSupernodeListPtr &)->bool {
result = m_list.erase(address) > 0;
return true;
};
m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
return result;
}

size_t FullSupernodeList::size() const
{
boost::shared_lock<boost::shared_mutex> readerLock(m_access);
return m_list.size();
size_t result = 0;
std::function f = [&](FullSupernodeListPtr &)->bool {
result = m_list.size();
return true;
};
m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
return result;
}

bool FullSupernodeList::exists(const string &address) const
{

boost::shared_lock<boost::shared_mutex> readerLock(m_access);
return m_list.find(address) != m_list.end();
bool result = false;
std::function f = [&](FullSupernodeListPtr &)->bool {
result = m_list.find(address) != m_list.end();
return true;
};
m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
return result;
}

bool FullSupernodeList::update(const string &address, const vector<Supernode::SignedKeyImage> &key_images)
{

boost::unique_lock<boost::shared_mutex> writerLock(m_access);
auto it = m_list.find(address);
if (it != m_list.end()) {
uint64_t height = 0;
return it->second->importKeyImages(key_images, height);
}
return false;
bool result = false;
std::function f = [&](FullSupernodeListPtr &)->bool {
auto it = m_list.find(address);
if (it != m_list.end()) {
uint64_t height = 0;
result = it->second->importKeyImages(key_images, height);
}
return true;
};
m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
return result;
}

SupernodePtr FullSupernodeList::get(const string &address) const
{
boost::shared_lock<boost::shared_mutex> readerLock(m_access);
auto it = m_list.find(address);
if (it != m_list.end())
return it->second;
return SupernodePtr(nullptr);
SupernodePtr result(nullptr);
std::function f = [&](FullSupernodeListPtr &)->bool {
auto it = m_list.find(address);
if (it != m_list.end())
result = it->second;
return true;
};
m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
return result;
}

bool FullSupernodeList::buildAuthSample(uint64_t height, vector<SupernodePtr> &out)
Expand All @@ -242,17 +277,21 @@ bool FullSupernodeList::buildAuthSample(uint64_t height, vector<SupernodePtr> &o

std::array<std::vector<SupernodePtr>, TIERS> tier_supernodes;
{
boost::shared_lock<boost::shared_mutex> readerLock(m_access);
int64_t now = static_cast<int64_t>(std::time(nullptr));
int64_t cutoff_time = now - ANNOUNCE_TTL_SECONDS;
for (const auto &sn_pair : m_list) {
const auto &sn = sn_pair.second;
const auto tier = sn->tier();
MTRACE("checking supernode " << sn_pair.first << ", updated: " << (now - sn->lastUpdateTime()) << "s ago"
<< ", tier: " << tier);
if (tier > 0 && sn->lastUpdateTime() >= cutoff_time)
tier_supernodes[tier - 1].push_back(sn);
}
std::function f = [&](FullSupernodeListPtr &)->bool {
int64_t now = static_cast<int64_t>(std::time(nullptr));
int64_t cutoff_time = now - ANNOUNCE_TTL_SECONDS;
for (const auto &sn_pair : m_list) {
const auto &sn = sn_pair.second;
const auto tier = sn->tier();
MTRACE("checking supernode " << sn_pair.first << ", updated: " << (now - sn->lastUpdateTime()) << "s ago"
<< ", tier: " << tier);
if (tier > 0 && sn->lastUpdateTime() >= cutoff_time)
tier_supernodes[tier - 1].push_back(sn);
}
return true;
};

m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
}

array<int, TIERS> select;
Expand Down Expand Up @@ -315,10 +354,14 @@ vector<string> FullSupernodeList::items() const
{
vector<string> result;
result.reserve(m_list.size());
boost::shared_lock<boost::shared_mutex> readerLock(m_access);
for (auto const& it: m_list)
result.push_back(it.first);

std::function f = [&](FullSupernodeListPtr &)->bool {
for (auto const& it: m_list)
result.push_back(it.first);
return true;
};

m_ctx.global.apply(CONTEXT_KEY_FULLSUPERNODELIST, f);
return result;
}

Expand Down
15 changes: 12 additions & 3 deletions src/supernode/supernode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,30 @@ void Supernode::prepareDataDir()
);

supernode->setNetworkAddress(m_configEx.http_address + "/dapi/v2.0");
//put fsl into global context


// create fullsupernode list instance and put it into global context
// TODO: doesn't really need to be a shared_ptr here, could be raw pointer
graft::FullSupernodeListPtr fsl = boost::make_shared<graft::FullSupernodeList>(
m_configEx.cryptonode_rpc_address, graft::Supernode::getIoService(),
getLooper().getGcm(),
m_configEx.testnet);

fsl->add(supernode);

//put fsl into global context
getLooper().setOnStopHandler([fsl]() {
fsl->close();
});
Context ctx(getLooper().getGcm());
ctx.global["supernode"] = supernode;
ctx.global[CONTEXT_KEY_FULLSUPERNODELIST] = fsl;
ctx.global["testnet"] = m_configEx.testnet;
ctx.global["watchonly_wallets_path"] = m_configEx.watchonly_wallets_path;
ctx.global["cryptonode_rpc_address"] = m_configEx.cryptonode_rpc_address;

fsl->add(supernode);



}

void Supernode::initMisc(ConfigOpts& configOpts)
Expand Down
7 changes: 5 additions & 2 deletions test/rta_classes_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ struct FullSupernodeListTest : public ::testing::Test
}
};


#if 0
TEST_F(FullSupernodeListTest, basic)
{

MGINFO_YELLOW("*** This test requires running cryptonode RPC on localhost:28881. If not running, test will fail ***");


const std::string wallet_path1 = "supernode_tier1_1";
const std::string wallet_path2 = "supernode_tier1_2";

Expand Down Expand Up @@ -211,6 +213,7 @@ TEST_F(FullSupernodeListTest, basic)
EXPECT_TRUE(sn_list.remove(sn1.walletAddress()));

EXPECT_TRUE(sn_list.size() == 0);

}


Expand Down Expand Up @@ -544,4 +547,4 @@ TEST_F(FullSupernodeListTest, announce1)
ASSERT_TRUE(watch_only_sn2.get() == nullptr);

}

#endif