diff --git a/libraries/chain/include/eosio/chain/hotstuff.hpp b/libraries/chain/include/eosio/chain/hotstuff.hpp index 2db473827d..a3267f1d6b 100644 --- a/libraries/chain/include/eosio/chain/hotstuff.hpp +++ b/libraries/chain/include/eosio/chain/hotstuff.hpp @@ -54,7 +54,12 @@ namespace eosio::chain { quorum_certificate_message high_qc; //justification }; - using hs_message = std::variant; + using hs_message_inner = std::variant; + + struct hs_message { + std::optional connection_id; + hs_message_inner message; + }; struct finalizer_state { bool chained_mode = false; diff --git a/libraries/hotstuff/chain_pacemaker.cpp b/libraries/hotstuff/chain_pacemaker.cpp index 1e55bda11a..aac831d5e2 100644 --- a/libraries/hotstuff/chain_pacemaker.cpp +++ b/libraries/hotstuff/chain_pacemaker.cpp @@ -292,65 +292,66 @@ namespace eosio { namespace hotstuff { prof.core_out(); } - void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& connection_id) { + bcast_hs_message( { connection_id, msg } ); } - void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& connection_id) { + bcast_hs_message( { connection_id, msg } ); } - void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& connection_id) { + bcast_hs_message( { connection_id, msg } ); } - void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& connection_id) { + bcast_hs_message( { connection_id, msg } ); } // called from net threads void chain_pacemaker::on_hs_msg(const eosio::chain::hs_message &msg) { + uint32_t connection_id = msg.connection_id.value(); std::visit(overloaded{ - [this](const hs_vote_message& m) { on_hs_vote_msg(m); }, - [this](const hs_proposal_message& m) { on_hs_proposal_msg(m); }, - [this](const hs_new_block_message& m) { on_hs_new_block_msg(m); }, - [this](const hs_new_view_message& m) { on_hs_new_view_msg(m); }, - }, msg); + [this, connection_id](const hs_vote_message& m) { on_hs_vote_msg(connection_id, m); }, + [this, connection_id](const hs_proposal_message& m) { on_hs_proposal_msg(connection_id, m); }, + [this, connection_id](const hs_new_block_message& m) { on_hs_new_block_msg(connection_id, m); }, + [this, connection_id](const hs_new_view_message& m) { on_hs_new_view_msg(connection_id, m); }, + }, msg.message); } // called from net threads - void chain_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg) { + void chain_pacemaker::on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg) { csc prof("prop"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_proposal_msg(msg); + _qc_chain.on_hs_proposal_msg(msg, std::make_optional(connection_id)); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_vote_msg(const hs_vote_message& msg) { + void chain_pacemaker::on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg) { csc prof("vote"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_vote_msg(msg); + _qc_chain.on_hs_vote_msg(msg, std::make_optional(connection_id)); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_new_block_msg(const hs_new_block_message& msg) { + void chain_pacemaker::on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg) { csc prof("nblk"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_new_block_msg(msg); + _qc_chain.on_hs_new_block_msg(msg, std::make_optional(connection_id)); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_new_view_msg(const hs_new_view_message& msg) { + void chain_pacemaker::on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg) { csc prof("view"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_new_view_msg(msg); + _qc_chain.on_hs_new_view_msg(msg, std::make_optional(connection_id)); prof.core_out(); } diff --git a/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp index 7fce189948..3dace899b6 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp @@ -36,10 +36,10 @@ namespace eosio::hotstuff { virtual std::vector get_finalizers() = 0; //outbound communications; 'id' is the producer name (can be ignored if/when irrelevant to the implementer) - virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id) = 0; - virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id) = 0; - virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id) = 0; - virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id) = 0; + virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id, const std::optional& connection_id = std::nullopt) = 0; + virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id, const std::optional& connection_id = std::nullopt) = 0; + virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id, const std::optional& connection_id = std::nullopt) = 0; + virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id, const std::optional& connection_id = std::nullopt) = 0; }; } // namespace eosio::hotstuff diff --git a/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp index a16a583a7f..685946acc8 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp @@ -40,18 +40,18 @@ namespace eosio::hotstuff { uint32_t get_quorum_threshold(); - void send_hs_proposal_msg(const hs_proposal_message& msg, name id); - void send_hs_vote_msg(const hs_vote_message& msg, name id); - void send_hs_new_view_msg(const hs_new_view_message& msg, name id); - void send_hs_new_block_msg(const hs_new_block_message& msg, name id); + void send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& connection_id); + void send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& connection_id); + void send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& connection_id); + void send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& connection_id); private: void on_accepted_block( const block_state_ptr& blk ); - void on_hs_proposal_msg(const hs_proposal_message& msg); //consensus msg event handler - void on_hs_vote_msg(const hs_vote_message& msg); //confirmation msg event handler - void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler - void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler + void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); //consensus msg event handler + void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); //confirmation msg event handler + void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); //new view msg event handler + void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); //new block msg event handler private: //FIXME/REMOVE: for testing/debugging only diff --git a/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp b/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp index c410d7d769..13e79d15e2 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp @@ -107,10 +107,10 @@ namespace eosio::hotstuff { void on_beat(); //handler for pacemaker beat() - void on_hs_vote_msg(const hs_vote_message& msg); //vote msg event handler - void on_hs_proposal_msg(const hs_proposal_message& msg); //proposal msg event handler - void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler - void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler + void on_hs_vote_msg(const hs_vote_message& msg, const std::optional& connection_id = std::nullopt); //vote msg event handler + void on_hs_proposal_msg(const hs_proposal_message& msg, const std::optional& connection_id = std::nullopt); //proposal msg event handler + void on_hs_new_view_msg(const hs_new_view_message& msg, const std::optional& connection_id = std::nullopt); //new view msg event handler + void on_hs_new_block_msg(const hs_new_block_message& msg, const std::optional& connection_id = std::nullopt); //new block msg event handler private: @@ -139,10 +139,10 @@ namespace eosio::hotstuff { bool am_i_leader(); //check if I am the current leader bool am_i_finalizer(); //check if I am one of the current finalizers - void process_proposal(const hs_proposal_message& msg); //handles proposal - void process_vote(const hs_vote_message& msg); //handles vote - void process_new_view(const hs_new_view_message& msg); //handles new view - void process_new_block(const hs_new_block_message& msg); //handles new block + void process_proposal(const hs_proposal_message& msg, bool is_loopback = false); //handles proposal + void process_vote(const hs_vote_message& msg, bool is_loopback = false); //handles vote + void process_new_view(const hs_new_view_message& msg, bool is_loopback = false); //handles new view + void process_new_block(const hs_new_block_message& msg, bool is_loopback = false); //handles new block hs_vote_message sign_proposal(const hs_proposal_message& proposal, name finalizer); //sign proposal @@ -156,10 +156,10 @@ namespace eosio::hotstuff { std::vector get_qc_chain(const fc::sha256& proposal_id); //get 3-phase proposal justification - void send_hs_proposal_msg(const hs_proposal_message& msg); //send vote msg - void send_hs_vote_msg(const hs_vote_message& msg); //send proposal msg - void send_hs_new_view_msg(const hs_new_view_message& msg); //send new view msg - void send_hs_new_block_msg(const hs_new_block_message& msg); //send new block msg + void send_hs_proposal_msg(const hs_proposal_message& msg, bool is_forwarding = false); //send vote msg + void send_hs_vote_msg(const hs_vote_message& msg, bool is_forwarding = false); //send proposal msg + void send_hs_new_view_msg(const hs_new_view_message& msg, bool is_forwarding = false); //send new view msg + void send_hs_new_block_msg(const hs_new_block_message& msg, bool is_forwarding = false); //send new block msg void update(const hs_proposal_message& proposal); //update internal state void commit(const hs_proposal_message& proposal); //commit proposal (finality) @@ -232,6 +232,7 @@ namespace eosio::hotstuff { proposal_store_type _proposal_store; //internal proposals store #endif + std::optional _connection_id; // last net_plugin sender }; } /// eosio::hotstuff diff --git a/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp index 567eb44465..063442d04c 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp @@ -57,10 +57,10 @@ namespace eosio { namespace hotstuff { uint32_t get_quorum_threshold(); - void send_hs_proposal_msg(const hs_proposal_message & msg, name id); - void send_hs_vote_msg(const hs_vote_message & msg, name id); - void send_hs_new_block_msg(const hs_new_block_message & msg, name id); - void send_hs_new_view_msg(const hs_new_view_message & msg, name id); + void send_hs_proposal_msg(const hs_proposal_message & msg, name id, const std::optional& connection_id); + void send_hs_vote_msg(const hs_vote_message & msg, name id, const std::optional& connection_id); + void send_hs_new_block_msg(const hs_new_block_message & msg, name id, const std::optional& connection_id); + void send_hs_new_view_msg(const hs_new_view_message & msg, name id, const std::optional& connection_id); std::vector _pending_message_queue; diff --git a/libraries/hotstuff/qc_chain.cpp b/libraries/hotstuff/qc_chain.cpp index 2ee77ad3b1..89511cf65f 100644 --- a/libraries/hotstuff/qc_chain.cpp +++ b/libraries/hotstuff/qc_chain.cpp @@ -286,7 +286,7 @@ namespace eosio::hotstuff { return v_msg; } - void qc_chain::process_proposal(const hs_proposal_message & proposal){ + void qc_chain::process_proposal(const hs_proposal_message & proposal, bool is_loopback){ //auto start = fc::time_point::now(); @@ -412,6 +412,13 @@ namespace eosio::hotstuff { //update internal state update(proposal); + // Conditional propagation of proposal messages: we only rebroadcast proposal + // messages that have been considered new and relevant to this node; do not + // rebroadcast if loopback (receiving our own proposal message). + if (!is_loopback) { + send_hs_proposal_msg(proposal, true); + } + for (auto &msg : msgs) { send_hs_vote_msg(msg); } @@ -423,7 +430,7 @@ namespace eosio::hotstuff { //fc_dlog(_logger, " ... process_proposal() total time : ${total_time}", ("total_time", total_time)); } - void qc_chain::process_vote(const hs_vote_message & vote){ + void qc_chain::process_vote(const hs_vote_message & vote, bool is_loopback){ //auto start = fc::time_point::now(); #warning check for duplicate or invalid vote. We will return in either case, but keep proposals for evidence of double signing @@ -431,8 +438,16 @@ namespace eosio::hotstuff { bool am_leader = am_i_leader(); - if (!am_leader) + if (!am_leader) { + // Unconditional propagation of vote messages: we rebroadcast all votes, + // unless we are the leader, which is the single intended recipient. + // Do not rebroadcast if loopback (receiving our own vote message). + if (!is_loopback) { + send_hs_vote_msg(vote, true); + } return; + } + fc_tlog(_logger, " === Process vote from ${finalizer} : current bitset ${value}" , ("finalizer", vote.finalizer)("value", _current_qc.get_active_finalizers_string())); // only leader need to take action on votes @@ -504,21 +519,31 @@ namespace eosio::hotstuff { //fc_tlog(_logger, " ... process_vote() total time : ${total_time}", ("total_time", total_time)); } - void qc_chain::process_new_view(const hs_new_view_message & msg){ + void qc_chain::process_new_view(const hs_new_view_message & msg, bool is_loopback){ fc_tlog(_logger, " === ${id} process_new_view === ${qc}", ("qc", msg.high_qc)("id", _id)); + + // Unconditional propagation of new-view messages. + if (!is_loopback) { // note: is_loopback==true may never happen here + send_hs_new_view_msg(msg, true); + } + auto increment_version = fc::make_scoped_exit([this]() { ++_state_version; }); if (!update_high_qc(quorum_certificate{msg.high_qc})) { increment_version.cancel(); } } - void qc_chain::process_new_block(const hs_new_block_message & msg){ + void qc_chain::process_new_block(const hs_new_block_message & msg, bool is_loopback){ // If I'm not a leader, I probably don't care about hs-new-block messages. #warning check for a need to gossip/rebroadcast even if it's not for us (maybe here, maybe somewhere else). // TODO: check for a need to gossip/rebroadcast even if it's not for us (maybe here, maybe somewhere else). if (! am_i_leader()) { fc_tlog(_logger, " === ${id} process_new_block === discarding because I'm not the leader; block_id : ${bid}, justify : ${just}", ("bid", msg.block_id)("just", msg.justify)("id", _id)); + // Unconditional propagation of new-block messages (if not the leader, which is the final recipient). + if (!is_loopback) { + send_hs_new_block_msg(msg, true); // note: is_loopback==true may never happen here + } return; } @@ -568,26 +593,28 @@ namespace eosio::hotstuff { } } - void qc_chain::send_hs_proposal_msg(const hs_proposal_message & msg){ + void qc_chain::send_hs_proposal_msg(const hs_proposal_message & msg, bool is_forwarding){ fc_tlog(_logger, " === broadcast_hs_proposal ==="); - _pacemaker->send_hs_proposal_msg(msg, _id); - process_proposal(msg); + _pacemaker->send_hs_proposal_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt); + if (!is_forwarding) + process_proposal(msg, true); } - void qc_chain::send_hs_vote_msg(const hs_vote_message & msg){ + void qc_chain::send_hs_vote_msg(const hs_vote_message & msg, bool is_forwarding){ fc_tlog(_logger, " === broadcast_hs_vote ==="); - _pacemaker->send_hs_vote_msg(msg, _id); - process_vote(msg); + _pacemaker->send_hs_vote_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt); + if (!is_forwarding) + process_vote(msg, true); } - void qc_chain::send_hs_new_view_msg(const hs_new_view_message & msg){ + void qc_chain::send_hs_new_view_msg(const hs_new_view_message & msg, bool is_forwarding){ fc_tlog(_logger, " === broadcast_hs_new_view ==="); - _pacemaker->send_hs_new_view_msg(msg, _id); + _pacemaker->send_hs_new_view_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt); } - void qc_chain::send_hs_new_block_msg(const hs_new_block_message & msg){ + void qc_chain::send_hs_new_block_msg(const hs_new_block_message & msg, bool is_forwarding){ fc_tlog(_logger, " === broadcast_hs_new_block ==="); - _pacemaker->send_hs_new_block_msg(msg, _id); + _pacemaker->send_hs_new_block_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt); } //extends predicate @@ -626,6 +653,8 @@ namespace eosio::hotstuff { // Called from the main application thread void qc_chain::on_beat(){ + _connection_id = std::nullopt; + // Non-proposing leaders do not care about on_beat(), because leaders react to a block proposal // which comes from processing an incoming new block message from a proposer instead. // on_beat() is called by the pacemaker, which decides when it's time to check whether we are @@ -830,22 +859,26 @@ namespace eosio::hotstuff { } //on proposal received, called from network thread - void qc_chain::on_hs_proposal_msg(const hs_proposal_message& msg) { + void qc_chain::on_hs_proposal_msg(const hs_proposal_message& msg, const std::optional& connection_id) { + _connection_id = connection_id; process_proposal(msg); } //on vote received, called from network thread - void qc_chain::on_hs_vote_msg(const hs_vote_message& msg) { + void qc_chain::on_hs_vote_msg(const hs_vote_message& msg, const std::optional& connection_id) { + _connection_id = connection_id; process_vote(msg); } //on new view received, called from network thread - void qc_chain::on_hs_new_view_msg(const hs_new_view_message& msg) { + void qc_chain::on_hs_new_view_msg(const hs_new_view_message& msg, const std::optional& connection_id) { + _connection_id = connection_id; process_new_view(msg); } //on new block received, called from network thread - void qc_chain::on_hs_new_block_msg(const hs_new_block_message& msg) { + void qc_chain::on_hs_new_block_msg(const hs_new_block_message& msg, const std::optional& connection_id) { + _connection_id = connection_id; process_new_block(msg); } diff --git a/libraries/hotstuff/test/test_pacemaker.cpp b/libraries/hotstuff/test/test_pacemaker.cpp index de42e32ff1..3f97605c8f 100644 --- a/libraries/hotstuff/test/test_pacemaker.cpp +++ b/libraries/hotstuff/test/test_pacemaker.cpp @@ -158,20 +158,24 @@ namespace eosio::hotstuff { _qcc_store.emplace( name, qcc_ptr ); }; - void test_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) { - _pending_message_queue.push_back(std::make_pair(id, msg)); + void test_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& connection_id) { + if (!connection_id.has_value()) + _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) { - _pending_message_queue.push_back(std::make_pair(id, msg)); + void test_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& connection_id) { + if (!connection_id.has_value()) + _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) { - _pending_message_queue.push_back(std::make_pair(id, msg)); + void test_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& connection_id) { + if (!connection_id.has_value()) + _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) { - _pending_message_queue.push_back(std::make_pair(id, msg)); + void test_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& connection_id) { + if (!connection_id.has_value()) + _pending_message_queue.push_back(std::make_pair(id, msg)); }; void test_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg, name id) { @@ -180,7 +184,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_proposal_msg(msg); + qcc_ptr->on_hs_proposal_msg(msg, std::make_optional(0)); qc_itr++; } } @@ -191,7 +195,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_vote_msg(msg); + qcc_ptr->on_hs_vote_msg(msg, std::make_optional(0)); qc_itr++; } } @@ -202,7 +206,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_new_block_msg(msg); + qcc_ptr->on_hs_new_block_msg(msg, std::make_optional(0)); qc_itr++; } } @@ -213,7 +217,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_new_view_msg(msg); + qcc_ptr->on_hs_new_view_msg(msg, std::make_optional(0)); qc_itr++; } } diff --git a/libraries/libfc/include/fc/crypto/bls_signature.hpp b/libraries/libfc/include/fc/crypto/bls_signature.hpp index e87f2f6253..a457088eeb 100644 --- a/libraries/libfc/include/fc/crypto/bls_signature.hpp +++ b/libraries/libfc/include/fc/crypto/bls_signature.hpp @@ -8,6 +8,8 @@ #include #include +#include + namespace fc::crypto::blslib { namespace config { @@ -34,6 +36,21 @@ namespace fc::crypto::blslib { } // fc::crypto::blslib +// for std::unordered_set +namespace std { + template <> + struct hash { + size_t operator()(const fc::crypto::blslib::bls_signature& obj) const { + size_t seed = 0; + const auto& x_c0_d = obj._sig.x.c0.d; + for (const auto& val : x_c0_d) seed ^= val; + const auto& x_c1_d = obj._sig.x.c1.d; + for (const auto& val : x_c1_d) seed ^= val; + return seed; + } + }; +} + namespace fc { void to_variant(const crypto::blslib::bls_signature& var, variant& vo, const yield_function_t& yield = yield_function_t()); @@ -43,4 +60,4 @@ namespace fc { FC_REFLECT(bls12_381::fp, (d)) FC_REFLECT(bls12_381::fp2, (c0)(c1)) FC_REFLECT(bls12_381::g2, (x)(y)(z)) -FC_REFLECT(crypto::blslib::bls_signature, (_sig) ) \ No newline at end of file +FC_REFLECT(crypto::blslib::bls_signature, (_sig) ) diff --git a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp index 811885768a..35b3935ccb 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp @@ -142,7 +142,7 @@ namespace eosio { sync_request_message, signed_block, packed_transaction, - hs_message>; + hs_message_inner>; } // namespace eosio diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index a8bb78dc00..e2834c7a7e 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -301,7 +301,7 @@ namespace eosio { bool have_txn( const transaction_id_type& tid ) const; void expire_txns(); - void bcast_msg( send_buffer_type msg ); + void bcast_msg( const std::optional& connection_id, send_buffer_type msg ); void add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) { std::optional rm_blk_id = unlinkable_block_cache.add_unlinkable_block(std::move(b), id); @@ -335,6 +335,7 @@ namespace eosio { constexpr uint32_t signed_block_which = fc::get_index(); // see protocol net_message constexpr uint32_t packed_transaction_which = fc::get_index(); // see protocol net_message + constexpr uint32_t hs_message_which = fc::get_index(); // see protocol net_message class connections_manager { alignas(hardware_destructive_interference_size) @@ -483,6 +484,11 @@ namespace eosio { mutable fc::mutex chain_info_mtx; // protects chain_info_t chain_info_t chain_info GUARDED_BY(chain_info_mtx); + mutable fc::mutex seen_hs_msgs_mtx; + std::vector> seen_hs_msgs_buckets GUARDED_BY(seen_hs_msgs_mtx); + uint64_t seen_hs_msgs_current_bucket = 0 GUARDED_BY(seen_hs_msgs_mtx); + const uint64_t seen_hs_msgs_bucket_max_size = 65536; + public: void update_chain_info(); chain_info_t get_chain_info() const; @@ -496,6 +502,8 @@ namespace eosio { void on_irreversible_block( const block_state_ptr& block ); void bcast_hs_message( const hs_message& msg ); + void add_seen_hs_message( const fc::sha256& hash ); + bool check_seen_hs_message( const fc::sha256& hash ); void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); void start_expire_timer(); @@ -1035,7 +1043,7 @@ namespace eosio { void handle_message( const block_id_type& id, signed_block_ptr ptr ); void handle_message( const packed_transaction& msg ) = delete; // packed_transaction_ptr overload used instead void handle_message( packed_transaction_ptr trx ); - void handle_message( const hs_message& msg ); + void handle_message( const hs_message_inner& msg ); // returns calculated number of blocks combined latency uint32_t calc_block_latency(); @@ -1117,7 +1125,7 @@ namespace eosio { c->handle_message( msg ); } - void operator()( const hs_message& msg ) const { + void operator()( const hs_message_inner& msg ) const { // continue call to handle_message on connection strand peer_dlog( c, "handle hs_message" ); c->handle_message( msg ); @@ -2531,9 +2539,10 @@ namespace eosio { } ); } - void dispatch_manager::bcast_msg( send_buffer_type msg ) { - my_impl->connections.for_each_block_connection( [msg{std::move(msg)}]( auto& cp ) { + void dispatch_manager::bcast_msg( const std::optional& connection_id, send_buffer_type msg ) { + my_impl->connections.for_each_block_connection( [connection_id, msg{std::move(msg)}]( auto& cp ) { if( !cp->current() ) return true; + if( connection_id.has_value() && cp->connection_id == connection_id.value() ) return true; cp->strand.post( [cp, msg]() { if (cp->protocol_version >= proto_instant_finality) cp->enqueue_buffer( msg, no_reason ); @@ -2946,13 +2955,23 @@ namespace eosio { } else if( which == packed_transaction_which ) { return process_next_trx_message( message_length ); } else { + // Hotstuff messages are only received if this node has not seen them recently + if( which == hs_message_which ) { + auto hash_ds = pending_message_buffer.create_peek_datastream(); + char buf[message_length]; + hash_ds.read(buf, message_length); + sha256 hash = fc::sha256::hash(buf, message_length); + if (my_impl->check_seen_hs_message(hash)) + return true; // discard duplicate message + my_impl->add_seen_hs_message(hash); + } + auto ds = pending_message_buffer.create_datastream(); net_message msg; fc::raw::unpack( ds, msg ); msg_handler m( shared_from_this() ); std::visit( m, msg ); } - } catch( const fc::exception& e ) { peer_elog( this, "Exception in handling message: ${s}", ("s", e.to_detail_string()) ); close(); @@ -3561,9 +3580,9 @@ namespace eosio { } } - void connection::handle_message( const hs_message& msg ) { + void connection::handle_message( const hs_message_inner& msg ) { peer_dlog(this, "received hs: ${msg}", ("msg", msg)); - my_impl->chain_plug->notify_hs_message(msg); + my_impl->chain_plug->notify_hs_message( { connection_id, msg } ); } size_t calc_trx_size( const packed_transaction_ptr& trx ) { @@ -3818,16 +3837,36 @@ namespace eosio { } void net_plugin_impl::bcast_hs_message( const hs_message& msg ) { - fc_dlog(logger, "sending hs msg: ${msg}", ("msg", msg)); + fc_dlog(logger, "sending hs msg: ${msg}", ("msg", msg.message)); buffer_factory buff_factory; - auto send_buffer = buff_factory.get_send_buffer( msg ); + auto send_buffer = buff_factory.get_send_buffer( msg.message ); + + // record having seen own message + sha256 hash = fc::sha256::hash(send_buffer->data() + message_header_size, send_buffer->size() - message_header_size); + add_seen_hs_message(hash); - dispatcher->strand.post( [this, msg{std::move(send_buffer)}]() mutable { - dispatcher->bcast_msg( std::move(msg) ); + // optional connection_id is excluded from the broadcast + dispatcher->strand.post( [this, connection_id = msg.connection_id, msg{std::move(send_buffer)}]() mutable { + dispatcher->bcast_msg( connection_id, std::move(msg) ); }); } + void net_plugin_impl::add_seen_hs_message( const fc::sha256& hash ) { + fc::lock_guard g( seen_hs_msgs_mtx ); + if (seen_hs_msgs_buckets[seen_hs_msgs_current_bucket].size() >= seen_hs_msgs_bucket_max_size) { + seen_hs_msgs_current_bucket = 1 - seen_hs_msgs_current_bucket; // swap bucket 0 <-> 1 + seen_hs_msgs_buckets[seen_hs_msgs_current_bucket].clear(); + } + seen_hs_msgs_buckets[seen_hs_msgs_current_bucket].insert(hash); + } + + bool net_plugin_impl::check_seen_hs_message( const fc::sha256& hash ) { + fc::lock_guard g( seen_hs_msgs_mtx ); + return seen_hs_msgs_buckets[seen_hs_msgs_current_bucket].find(hash) != seen_hs_msgs_buckets[seen_hs_msgs_current_bucket].end() + || seen_hs_msgs_buckets[1 - seen_hs_msgs_current_bucket].find(hash) != seen_hs_msgs_buckets[1 - seen_hs_msgs_current_bucket].end(); + } + // called from application thread void net_plugin_impl::on_irreversible_block( const block_state_ptr& block) { fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block->block_num)("id", block->id) ); @@ -4158,6 +4197,8 @@ namespace eosio { void net_plugin_impl::plugin_startup() { fc_ilog( logger, "my node_id is ${id}", ("id", node_id )); + seen_hs_msgs_buckets.clear(); + seen_hs_msgs_buckets.resize(2); chain_plug->register_pacemaker_bcast_function( [my = shared_from_this()](const hs_message& s) { my->bcast_hs_message(s);