diff --git a/libraries/chain/include/eosio/chain/hotstuff.hpp b/libraries/chain/include/eosio/chain/hotstuff.hpp index b580387d51..10a19b8c7a 100644 --- a/libraries/chain/include/eosio/chain/hotstuff.hpp +++ b/libraries/chain/include/eosio/chain/hotstuff.hpp @@ -57,6 +57,13 @@ namespace eosio::chain { using hs_message = std::variant; + enum hs_message_warning : uint32_t { + discarded, // default code for dropped messages (irrelevant, redundant, ...) + duplicate_signature, // same message signature already seen + invalid_signature, // invalid message signature + invalid // invalid message (other reason) + }; + struct finalizer_state { bool chained_mode = false; fc::sha256 b_leaf; diff --git a/libraries/hotstuff/chain_pacemaker.cpp b/libraries/hotstuff/chain_pacemaker.cpp index 2940f08cc8..8a40aac050 100644 --- a/libraries/hotstuff/chain_pacemaker.cpp +++ b/libraries/hotstuff/chain_pacemaker.cpp @@ -118,12 +118,18 @@ namespace eosio { namespace hotstuff { _head_block_state = chain->head_block_state(); } - void chain_pacemaker::register_bcast_function(std::function broadcast_hs_message) { + void chain_pacemaker::register_bcast_function(std::function&, const chain::hs_message&)> broadcast_hs_message) { FC_ASSERT(broadcast_hs_message, "on_hs_message must be provided"); std::lock_guard g( _hotstuff_global_mutex ); // not actually needed but doesn't hurt bcast_hs_message = std::move(broadcast_hs_message); } + void chain_pacemaker::register_warn_function(std::function warning_hs_message) { + FC_ASSERT(warning_hs_message, "must provide callback"); + std::lock_guard g( _hotstuff_global_mutex ); // not actually needed but doesn't hurt + warn_hs_message = std::move(warning_hs_message); + } + void chain_pacemaker::get_state(finalizer_state& fs) const { // lock-free state version check uint64_t current_state_version = _qc_chain.get_state_version(); @@ -297,65 +303,69 @@ namespace eosio { namespace hotstuff { prof.core_out(); } - void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); + } + + void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); } - void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); } - void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); } - void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) { + warn_hs_message(sender_peer, code); } // called from net threads - void chain_pacemaker::on_hs_msg(const eosio::chain::hs_message &msg) { + void chain_pacemaker::on_hs_msg(const uint32_t connection_id, const eosio::chain::hs_message &msg) { std::visit(overloaded{ - [this](const hs_vote_message& m) { on_hs_vote_msg(m); }, - [this](const hs_proposal_message& m) { on_hs_proposal_msg(m); }, - [this](const hs_new_block_message& m) { on_hs_new_block_msg(m); }, - [this](const hs_new_view_message& m) { on_hs_new_view_msg(m); }, + [this, connection_id](const hs_vote_message& m) { on_hs_vote_msg(connection_id, m); }, + [this, connection_id](const hs_proposal_message& m) { on_hs_proposal_msg(connection_id, m); }, + [this, connection_id](const hs_new_block_message& m) { on_hs_new_block_msg(connection_id, m); }, + [this, connection_id](const hs_new_view_message& m) { on_hs_new_view_msg(connection_id, m); }, }, msg); } // called from net threads - void chain_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg) { + void chain_pacemaker::on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg) { csc prof("prop"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_proposal_msg(msg); + _qc_chain.on_hs_proposal_msg(connection_id, msg); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_vote_msg(const hs_vote_message& msg) { + void chain_pacemaker::on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg) { csc prof("vote"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_vote_msg(msg); + _qc_chain.on_hs_vote_msg(connection_id, msg); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_new_block_msg(const hs_new_block_message& msg) { + void chain_pacemaker::on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg) { csc prof("nblk"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_new_block_msg(msg); + _qc_chain.on_hs_new_block_msg(connection_id, msg); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_new_view_msg(const hs_new_view_message& msg) { + void chain_pacemaker::on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg) { csc prof("view"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_new_view_msg(msg); + _qc_chain.on_hs_new_view_msg(connection_id, msg); prof.core_out(); } diff --git a/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp index 7fce189948..989e1f478e 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp @@ -10,6 +10,7 @@ namespace eosio::chain { struct hs_vote_message; struct hs_new_view_message; struct hs_new_block_message; + enum hs_message_warning : uint32_t; } namespace eosio::hotstuff { @@ -36,10 +37,12 @@ namespace eosio::hotstuff { virtual std::vector get_finalizers() = 0; //outbound communications; 'id' is the producer name (can be ignored if/when irrelevant to the implementer) - virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id) = 0; - virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id) = 0; - virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id) = 0; - virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id) = 0; + virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + + virtual void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) = 0; }; } // namespace eosio::hotstuff diff --git a/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp index c2c2331278..64bff0dd07 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp @@ -24,11 +24,12 @@ namespace eosio::hotstuff { std::set my_producers, chain::bls_key_map_t finalizer_keys, fc::logger& logger); - void register_bcast_function(std::function broadcast_hs_message); + void register_bcast_function(std::function&, const chain::hs_message&)> broadcast_hs_message); + void register_warn_function(std::function warning_hs_message); void beat(); - void on_hs_msg(const hs_message& msg); + void on_hs_msg(const uint32_t connection_id, const hs_message& msg); void get_state(finalizer_state& fs) const; @@ -43,19 +44,21 @@ namespace eosio::hotstuff { uint32_t get_quorum_threshold(); - void send_hs_proposal_msg(const hs_proposal_message& msg, name id); - void send_hs_vote_msg(const hs_vote_message& msg, name id); - void send_hs_new_view_msg(const hs_new_view_message& msg, name id); - void send_hs_new_block_msg(const hs_new_block_message& msg, name id); + void send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& exclude_peer); + void send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& exclude_peer); + void send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& exclude_peer); + void send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& exclude_peer); + + void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code); private: void on_accepted_block( const block_state_ptr& blk ); void on_irreversible_block( const block_state_ptr& blk ); - void on_hs_proposal_msg(const hs_proposal_message& msg); //consensus msg event handler - void on_hs_vote_msg(const hs_vote_message& msg); //confirmation msg event handler - void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler - void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler + void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); //consensus msg event handler + void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); //confirmation msg event handler + void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); //new view msg event handler + void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); //new block msg event handler private: //FIXME/REMOVE: for testing/debugging only @@ -84,7 +87,8 @@ namespace eosio::hotstuff { boost::signals2::scoped_connection _irreversible_block_connection; qc_chain _qc_chain; - std::function bcast_hs_message; + std::function&, const chain::hs_message&)> bcast_hs_message; + std::function warn_hs_message; uint32_t _quorum_threshold = 15; //FIXME/TODO: calculate from schedule fc::logger& _logger; diff --git a/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp b/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp index e696280009..14f8ce65a0 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp @@ -110,10 +110,10 @@ namespace eosio::hotstuff { void on_beat(); //handler for pacemaker beat() - void on_hs_vote_msg(const hs_vote_message& msg); //vote msg event handler - void on_hs_proposal_msg(const hs_proposal_message& msg); //proposal msg event handler - void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler - void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler + void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); //vote msg event handler + void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); //proposal msg event handler + void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); //new view msg event handler + void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); //new block msg event handler private: @@ -142,10 +142,11 @@ namespace eosio::hotstuff { bool am_i_leader(); //check if I am the current leader bool am_i_finalizer(); //check if I am one of the current finalizers - void process_proposal(const hs_proposal_message& msg); //handles proposal - void process_vote(const hs_vote_message& msg); //handles vote - void process_new_view(const hs_new_view_message& msg); //handles new view - void process_new_block(const hs_new_block_message& msg); //handles new block + // is_loopback is used to check if the call is processing a self-receipt message (if so, never propagate it) + void process_proposal(const hs_proposal_message& msg, bool is_loopback = false); //handles proposal + void process_vote(const hs_vote_message& msg, bool is_loopback = false); //handles vote + void process_new_view(const hs_new_view_message& msg, bool is_loopback = false); //handles new view + void process_new_block(const hs_new_block_message& msg, bool is_loopback = false); //handles new block hs_vote_message sign_proposal(const hs_proposal_message& proposal, name finalizer); //sign proposal @@ -159,10 +160,12 @@ namespace eosio::hotstuff { std::vector get_qc_chain(const fc::sha256& proposal_id); //get 3-phase proposal justification - void send_hs_proposal_msg(const hs_proposal_message& msg); //send vote msg - void send_hs_vote_msg(const hs_vote_message& msg); //send proposal msg - void send_hs_new_view_msg(const hs_new_view_message& msg); //send new view msg - void send_hs_new_block_msg(const hs_new_block_message& msg); //send new block msg + void send_hs_proposal_msg(const hs_proposal_message& msg, bool propagation = false); //send vote msg + void send_hs_vote_msg(const hs_vote_message& msg, bool propagation = false); //send proposal msg + void send_hs_new_view_msg(const hs_new_view_message& msg, bool propagation = false); //send new view msg + void send_hs_new_block_msg(const hs_new_block_message& msg, bool propagation = false); //send new block msg + + void send_hs_message_warning(const chain::hs_message_warning code = hs_message_warning::discarded); //use generic discard reason if none given void update(const hs_proposal_message& proposal); //update internal state void commit(const hs_proposal_message& proposal); //commit proposal (finality) @@ -236,6 +239,12 @@ namespace eosio::hotstuff { proposal_store_type _proposal_store; //internal proposals store #endif + // connection_id of the network peer that originally sent the message + // being processed by the qc_chain. This is used to fill in the + // exclude_peer parameter to the pacemaker when qc_chain is calling + // the pacemaker to propagate that message, which the original sender + // peer won't need to receive back. + std::optional _sender_connection_id; }; } /// eosio::hotstuff diff --git a/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp index 567eb44465..cfa6b05239 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp @@ -57,10 +57,12 @@ namespace eosio { namespace hotstuff { uint32_t get_quorum_threshold(); - void send_hs_proposal_msg(const hs_proposal_message & msg, name id); - void send_hs_vote_msg(const hs_vote_message & msg, name id); - void send_hs_new_block_msg(const hs_new_block_message & msg, name id); - void send_hs_new_view_msg(const hs_new_view_message & msg, name id); + void send_hs_proposal_msg(const hs_proposal_message & msg, name id, const std::optional& exclude_peer); + void send_hs_vote_msg(const hs_vote_message & msg, name id, const std::optional& exclude_peer); + void send_hs_new_block_msg(const hs_new_block_message & msg, name id, const std::optional& exclude_peer); + void send_hs_new_view_msg(const hs_new_view_message & msg, name id, const std::optional& exclude_peer); + + void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code); std::vector _pending_message_queue; diff --git a/libraries/hotstuff/qc_chain.cpp b/libraries/hotstuff/qc_chain.cpp index e62381fb0e..a8c37d9f77 100644 --- a/libraries/hotstuff/qc_chain.cpp +++ b/libraries/hotstuff/qc_chain.cpp @@ -290,7 +290,7 @@ namespace eosio::hotstuff { return v_msg; } - void qc_chain::process_proposal(const hs_proposal_message & proposal){ + void qc_chain::process_proposal(const hs_proposal_message & proposal, bool is_loopback){ //auto start = fc::time_point::now(); @@ -299,6 +299,7 @@ namespace eosio::hotstuff { const hs_proposal_message *jp = get_proposal( proposal.justify.proposal_id ); if (jp == nullptr) { fc_elog(_logger, " *** ${id} proposal justification unknown : ${proposal_id}", ("id",_id)("proposal_id", proposal.justify.proposal_id)); + send_hs_message_warning(); // example; to be tuned to actual need return; //can't recognize a proposal with an unknown justification } } @@ -318,6 +319,7 @@ namespace eosio::hotstuff { } + send_hs_message_warning(); // example; to be tuned to actual need return; //already aware of proposal, nothing to do } @@ -427,7 +429,7 @@ namespace eosio::hotstuff { //fc_dlog(_logger, " ... process_proposal() total time : ${total_time}", ("total_time", total_time)); } - void qc_chain::process_vote(const hs_vote_message & vote){ + void qc_chain::process_vote(const hs_vote_message & vote, bool is_loopback){ //auto start = fc::time_point::now(); #warning check for duplicate or invalid vote. We will return in either case, but keep proposals for evidence of double signing @@ -440,12 +442,15 @@ namespace eosio::hotstuff { fc_tlog(_logger, " === Process vote from ${finalizer} : current bitset ${value}" , ("finalizer", vote.finalizer)("value", _current_qc.get_active_finalizers_string())); // only leader need to take action on votes - if (vote.proposal_id != _current_qc.get_proposal_id()) + if (vote.proposal_id != _current_qc.get_proposal_id()) { + send_hs_message_warning(); // example; to be tuned to actual need return; + } const hs_proposal_message *p = get_proposal( vote.proposal_id ); if (p == nullptr) { fc_elog(_logger, " *** ${id} couldn't find proposal, vote : ${vote}", ("id",_id)("vote", vote)); + send_hs_message_warning(); // example; to be tuned to actual need return; } @@ -508,7 +513,7 @@ namespace eosio::hotstuff { //fc_tlog(_logger, " ... process_vote() total time : ${total_time}", ("total_time", total_time)); } - void qc_chain::process_new_view(const hs_new_view_message & msg){ + void qc_chain::process_new_view(const hs_new_view_message & msg, bool is_loopback){ fc_tlog(_logger, " === ${id} process_new_view === ${qc}", ("qc", msg.high_qc)("id", _id)); auto increment_version = fc::make_scoped_exit([this]() { ++_state_version; }); if (!update_high_qc(quorum_certificate{msg.high_qc})) { @@ -516,7 +521,7 @@ namespace eosio::hotstuff { } } - void qc_chain::process_new_block(const hs_new_block_message & msg){ + void qc_chain::process_new_block(const hs_new_block_message & msg, bool is_loopback){ // If I'm not a leader, I probably don't care about hs-new-block messages. #warning check for a need to gossip/rebroadcast even if it's not for us (maybe here, maybe somewhere else). @@ -572,26 +577,33 @@ namespace eosio::hotstuff { } } - void qc_chain::send_hs_proposal_msg(const hs_proposal_message & msg){ + void qc_chain::send_hs_proposal_msg(const hs_proposal_message & msg, bool propagation){ fc_tlog(_logger, " === broadcast_hs_proposal ==="); - _pacemaker->send_hs_proposal_msg(msg, _id); - process_proposal(msg); + _pacemaker->send_hs_proposal_msg(msg, _id, propagation ? _sender_connection_id : std::nullopt); + if (!propagation) + process_proposal(msg, true); } - void qc_chain::send_hs_vote_msg(const hs_vote_message & msg){ + void qc_chain::send_hs_vote_msg(const hs_vote_message & msg, bool propagation){ fc_tlog(_logger, " === broadcast_hs_vote ==="); - _pacemaker->send_hs_vote_msg(msg, _id); - process_vote(msg); + _pacemaker->send_hs_vote_msg(msg, _id, propagation ? _sender_connection_id : std::nullopt); + if (!propagation) + process_vote(msg, true); } - void qc_chain::send_hs_new_view_msg(const hs_new_view_message & msg){ + void qc_chain::send_hs_new_view_msg(const hs_new_view_message & msg, bool propagation){ fc_tlog(_logger, " === broadcast_hs_new_view ==="); - _pacemaker->send_hs_new_view_msg(msg, _id); + _pacemaker->send_hs_new_view_msg(msg, _id, propagation ? _sender_connection_id : std::nullopt); } - void qc_chain::send_hs_new_block_msg(const hs_new_block_message & msg){ + void qc_chain::send_hs_new_block_msg(const hs_new_block_message & msg, bool propagation){ fc_tlog(_logger, " === broadcast_hs_new_block ==="); - _pacemaker->send_hs_new_block_msg(msg, _id); + _pacemaker->send_hs_new_block_msg(msg, _id, propagation ? _sender_connection_id : std::nullopt); + } + + void qc_chain::send_hs_message_warning(const chain::hs_message_warning code) { + EOS_ASSERT( _sender_connection_id.has_value() , chain_exception, "qc_chain processed a message without a sender" ); + _pacemaker->send_hs_message_warning(_sender_connection_id.value(), code); } //extends predicate @@ -630,6 +642,8 @@ namespace eosio::hotstuff { // Called from the main application thread void qc_chain::on_beat(){ + _sender_connection_id = std::nullopt; + // Non-proposing leaders do not care about on_beat(), because leaders react to a block proposal // which comes from processing an incoming new block message from a proposer instead. // on_beat() is called by the pacemaker, which decides when it's time to check whether we are @@ -834,22 +848,26 @@ namespace eosio::hotstuff { } //on proposal received, called from network thread - void qc_chain::on_hs_proposal_msg(const hs_proposal_message& msg) { + void qc_chain::on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg) { + _sender_connection_id = connection_id; process_proposal(msg); } //on vote received, called from network thread - void qc_chain::on_hs_vote_msg(const hs_vote_message& msg) { + void qc_chain::on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg) { + _sender_connection_id = connection_id; process_vote(msg); } //on new view received, called from network thread - void qc_chain::on_hs_new_view_msg(const hs_new_view_message& msg) { + void qc_chain::on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg) { + _sender_connection_id = connection_id; process_new_view(msg); } //on new block received, called from network thread - void qc_chain::on_hs_new_block_msg(const hs_new_block_message& msg) { + void qc_chain::on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg) { + _sender_connection_id = connection_id; process_new_block(msg); } diff --git a/libraries/hotstuff/test/test_pacemaker.cpp b/libraries/hotstuff/test/test_pacemaker.cpp index de42e32ff1..87aa3822a1 100644 --- a/libraries/hotstuff/test/test_pacemaker.cpp +++ b/libraries/hotstuff/test/test_pacemaker.cpp @@ -158,29 +158,31 @@ namespace eosio::hotstuff { _qcc_store.emplace( name, qcc_ptr ); }; - void test_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) { + void test_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) { + void test_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) { + void test_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) { + void test_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; + void test_pacemaker::send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) { } + void test_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg, name id) { auto qc_itr = _qcc_store.begin(); while (qc_itr != _qcc_store.end()){ const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_proposal_msg(msg); + qcc_ptr->on_hs_proposal_msg(0, msg); qc_itr++; } } @@ -191,7 +193,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_vote_msg(msg); + qcc_ptr->on_hs_vote_msg(0, msg); qc_itr++; } } @@ -202,7 +204,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_new_block_msg(msg); + qcc_ptr->on_hs_new_block_msg(0, msg); qc_itr++; } } @@ -213,7 +215,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_new_view_msg(msg); + qcc_ptr->on_hs_new_view_msg(0, msg); qc_itr++; } } diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index c64962b468..8a07e7af3e 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -1122,11 +1122,15 @@ void chain_plugin::create_pacemaker(std::set my_producers, my->_chain_pacemaker.emplace(&chain(), std::move(my_producers), std::move(finalizer_keys), hotstuff_logger); } -void chain_plugin::register_pacemaker_bcast_function(std::function bcast_hs_message) { +void chain_plugin::register_pacemaker_bcast_function(std::function&, const chain::hs_message&)> bcast_hs_message) { EOS_ASSERT( my->_chain_pacemaker, plugin_config_exception, "chain_pacemaker not created" ); my->_chain_pacemaker->register_bcast_function(std::move(bcast_hs_message)); } +void chain_plugin::register_pacemaker_warn_function(std::function warn_hs_message) { + EOS_ASSERT( my->_chain_pacemaker, plugin_config_exception, "chain_pacemaker not created" ); + my->_chain_pacemaker->register_warn_function(std::move(warn_hs_message)); +} void chain_plugin::plugin_initialize(const variables_map& options) { handle_sighup(); // Sets loggers @@ -2685,8 +2689,8 @@ read_only::get_finalizer_state(const get_finalizer_state_params&, const fc::time } // namespace chain_apis // called from net threads -void chain_plugin::notify_hs_message( const hs_message& msg ) { - my->_chain_pacemaker->on_hs_msg(msg); +void chain_plugin::notify_hs_message( const uint32_t connection_id, const hs_message& msg ) { + my->_chain_pacemaker->on_hs_msg(connection_id, msg); }; void chain_plugin::notify_hs_block_produced() { diff --git a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp index e4d96b2a4a..7ab17137a7 100644 --- a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp +++ b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp @@ -1032,8 +1032,9 @@ class chain_plugin : public plugin { const controller& chain() const; void create_pacemaker(std::set my_producers, chain::bls_key_map_t finalizer_keys); - void register_pacemaker_bcast_function(std::function bcast_hs_message); - void notify_hs_message( const chain::hs_message& msg ); + void register_pacemaker_bcast_function(std::function&, const chain::hs_message&)> bcast_hs_message); + void register_pacemaker_warn_function(std::function warn_hs_message); + void notify_hs_message( const uint32_t connection_id, const chain::hs_message& msg ); void notify_hs_block_produced(); chain::chain_id_type get_chain_id() const; diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index a8bb78dc00..e8db373f81 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -301,7 +301,7 @@ namespace eosio { bool have_txn( const transaction_id_type& tid ) const; void expire_txns(); - void bcast_msg( send_buffer_type msg ); + void bcast_msg( const std::optional& exclude_peer, send_buffer_type msg ); void add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) { std::optional rm_blk_id = unlinkable_block_cache.add_unlinkable_block(std::move(b), id); @@ -495,7 +495,8 @@ namespace eosio { void transaction_ack(const std::pair&); void on_irreversible_block( const block_state_ptr& block ); - void bcast_hs_message( const hs_message& msg ); + void bcast_hs_message( const std::optional& exclude_peer, const hs_message& msg ); + void warn_hs_message( const uint32_t sender_peer, const hs_message_warning& code ); void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); void start_expire_timer(); @@ -2531,9 +2532,10 @@ namespace eosio { } ); } - void dispatch_manager::bcast_msg( send_buffer_type msg ) { - my_impl->connections.for_each_block_connection( [msg{std::move(msg)}]( auto& cp ) { + void dispatch_manager::bcast_msg( const std::optional& exclude_peer, send_buffer_type msg ) { + my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) { if( !cp->current() ) return true; + if( exclude_peer.has_value() && cp->connection_id == exclude_peer.value() ) return true; cp->strand.post( [cp, msg]() { if (cp->protocol_version >= proto_instant_finality) cp->enqueue_buffer( msg, no_reason ); @@ -3563,7 +3565,7 @@ namespace eosio { void connection::handle_message( const hs_message& msg ) { peer_dlog(this, "received hs: ${msg}", ("msg", msg)); - my_impl->chain_plug->notify_hs_message(msg); + my_impl->chain_plug->notify_hs_message(connection_id, msg); } size_t calc_trx_size( const packed_transaction_ptr& trx ) { @@ -3817,17 +3819,21 @@ namespace eosio { on_active_schedule(chain_plug->chain().active_producers()); } - void net_plugin_impl::bcast_hs_message( const hs_message& msg ) { + void net_plugin_impl::bcast_hs_message( const std::optional& exclude_peer, const hs_message& msg ) { fc_dlog(logger, "sending hs msg: ${msg}", ("msg", msg)); buffer_factory buff_factory; auto send_buffer = buff_factory.get_send_buffer( msg ); - dispatcher->strand.post( [this, msg{std::move(send_buffer)}]() mutable { - dispatcher->bcast_msg( std::move(msg) ); + dispatcher->strand.post( [this, exclude_peer, msg{std::move(send_buffer)}]() mutable { + dispatcher->bcast_msg( exclude_peer, std::move(msg) ); }); } + void net_plugin_impl::warn_hs_message( const uint32_t sender_peer, const hs_message_warning& code ) { + // potentially react to (repeated) receipt of invalid, irrelevant, duplicate, etc. hotstuff messages from sender_peer (connection ID) here + } + // called from application thread void net_plugin_impl::on_irreversible_block( const block_state_ptr& block) { fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block->block_num)("id", block->id) ); @@ -4159,8 +4165,12 @@ namespace eosio { fc_ilog( logger, "my node_id is ${id}", ("id", node_id )); chain_plug->register_pacemaker_bcast_function( - [my = shared_from_this()](const hs_message& s) { - my->bcast_hs_message(s); + [my = shared_from_this()](const std::optional& c, const hs_message& s) { + my->bcast_hs_message(c, s); + } ); + chain_plug->register_pacemaker_warn_function( + [my = shared_from_this()](const uint32_t c, const hs_message_warning& s) { + my->warn_hs_message(c, s); } ); producer_plug = app().find_plugin();