diff --git a/libraries/chain/include/eosio/chain/hotstuff.hpp b/libraries/chain/include/eosio/chain/hotstuff.hpp index b580387d51..27fb8491f6 100644 --- a/libraries/chain/include/eosio/chain/hotstuff.hpp +++ b/libraries/chain/include/eosio/chain/hotstuff.hpp @@ -57,6 +57,13 @@ namespace eosio::chain { using hs_message = std::variant; + enum class hs_message_warning { + discarded, // default code for dropped messages (irrelevant, redundant, ...) + duplicate_signature, // same message signature already seen + invalid_signature, // invalid message signature + invalid // invalid message (other reason) + }; + struct finalizer_state { bool chained_mode = false; fc::sha256 b_leaf; diff --git a/libraries/hotstuff/chain_pacemaker.cpp b/libraries/hotstuff/chain_pacemaker.cpp index 2940f08cc8..e034d9b1fc 100644 --- a/libraries/hotstuff/chain_pacemaker.cpp +++ b/libraries/hotstuff/chain_pacemaker.cpp @@ -118,12 +118,18 @@ namespace eosio { namespace hotstuff { _head_block_state = chain->head_block_state(); } - void chain_pacemaker::register_bcast_function(std::function broadcast_hs_message) { + void chain_pacemaker::register_bcast_function(std::function&, const chain::hs_message&)> broadcast_hs_message) { FC_ASSERT(broadcast_hs_message, "on_hs_message must be provided"); - std::lock_guard g( _hotstuff_global_mutex ); // not actually needed but doesn't hurt + // no need to std::lock_guard g( _hotstuff_global_mutex ); here since pre-comm init bcast_hs_message = std::move(broadcast_hs_message); } + void chain_pacemaker::register_warn_function(std::function warning_hs_message) { + FC_ASSERT(warning_hs_message, "must provide callback"); + // no need to std::lock_guard g( _hotstuff_global_mutex ); here since pre-comm init + warn_hs_message = std::move(warning_hs_message); + } + void chain_pacemaker::get_state(finalizer_state& fs) const { // lock-free state version check uint64_t current_state_version = _qc_chain.get_state_version(); @@ -297,65 +303,69 @@ namespace eosio { namespace hotstuff { prof.core_out(); } - void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); + } + + void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); } - void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); } - void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& exclude_peer) { + bcast_hs_message(exclude_peer, msg); } - void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) { - bcast_hs_message(msg); + void chain_pacemaker::send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) { + warn_hs_message(sender_peer, code); } // called from net threads - void chain_pacemaker::on_hs_msg(const eosio::chain::hs_message &msg) { + void chain_pacemaker::on_hs_msg(const uint32_t connection_id, const eosio::chain::hs_message &msg) { std::visit(overloaded{ - [this](const hs_vote_message& m) { on_hs_vote_msg(m); }, - [this](const hs_proposal_message& m) { on_hs_proposal_msg(m); }, - [this](const hs_new_block_message& m) { on_hs_new_block_msg(m); }, - [this](const hs_new_view_message& m) { on_hs_new_view_msg(m); }, + [this, connection_id](const hs_vote_message& m) { on_hs_vote_msg(connection_id, m); }, + [this, connection_id](const hs_proposal_message& m) { on_hs_proposal_msg(connection_id, m); }, + [this, connection_id](const hs_new_block_message& m) { on_hs_new_block_msg(connection_id, m); }, + [this, connection_id](const hs_new_view_message& m) { on_hs_new_view_msg(connection_id, m); }, }, msg); } // called from net threads - void chain_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg) { + void chain_pacemaker::on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg) { csc prof("prop"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_proposal_msg(msg); + _qc_chain.on_hs_proposal_msg(connection_id, msg); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_vote_msg(const hs_vote_message& msg) { + void chain_pacemaker::on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg) { csc prof("vote"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_vote_msg(msg); + _qc_chain.on_hs_vote_msg(connection_id, msg); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_new_block_msg(const hs_new_block_message& msg) { + void chain_pacemaker::on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg) { csc prof("nblk"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_new_block_msg(msg); + _qc_chain.on_hs_new_block_msg(connection_id, msg); prof.core_out(); } // called from net threads - void chain_pacemaker::on_hs_new_view_msg(const hs_new_view_message& msg) { + void chain_pacemaker::on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg) { csc prof("view"); std::lock_guard g( _hotstuff_global_mutex ); prof.core_in(); - _qc_chain.on_hs_new_view_msg(msg); + _qc_chain.on_hs_new_view_msg(connection_id, msg); prof.core_out(); } diff --git a/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp index 7fce189948..98339ef58b 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp @@ -2,16 +2,10 @@ #include #include +#include #include -namespace eosio::chain { - struct hs_proposal_message; - struct hs_vote_message; - struct hs_new_view_message; - struct hs_new_block_message; -} - namespace eosio::hotstuff { // Abstract pacemaker; a reference of this type will only be used by qc_chain, as qc_chain @@ -36,10 +30,12 @@ namespace eosio::hotstuff { virtual std::vector get_finalizers() = 0; //outbound communications; 'id' is the producer name (can be ignored if/when irrelevant to the implementer) - virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id) = 0; - virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id) = 0; - virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id) = 0; - virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id) = 0; + virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id, const std::optional& exclude_peer = std::nullopt) = 0; + + virtual void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) = 0; }; } // namespace eosio::hotstuff diff --git a/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp index c2c2331278..64bff0dd07 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp @@ -24,11 +24,12 @@ namespace eosio::hotstuff { std::set my_producers, chain::bls_key_map_t finalizer_keys, fc::logger& logger); - void register_bcast_function(std::function broadcast_hs_message); + void register_bcast_function(std::function&, const chain::hs_message&)> broadcast_hs_message); + void register_warn_function(std::function warning_hs_message); void beat(); - void on_hs_msg(const hs_message& msg); + void on_hs_msg(const uint32_t connection_id, const hs_message& msg); void get_state(finalizer_state& fs) const; @@ -43,19 +44,21 @@ namespace eosio::hotstuff { uint32_t get_quorum_threshold(); - void send_hs_proposal_msg(const hs_proposal_message& msg, name id); - void send_hs_vote_msg(const hs_vote_message& msg, name id); - void send_hs_new_view_msg(const hs_new_view_message& msg, name id); - void send_hs_new_block_msg(const hs_new_block_message& msg, name id); + void send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& exclude_peer); + void send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& exclude_peer); + void send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& exclude_peer); + void send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& exclude_peer); + + void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code); private: void on_accepted_block( const block_state_ptr& blk ); void on_irreversible_block( const block_state_ptr& blk ); - void on_hs_proposal_msg(const hs_proposal_message& msg); //consensus msg event handler - void on_hs_vote_msg(const hs_vote_message& msg); //confirmation msg event handler - void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler - void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler + void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); //consensus msg event handler + void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); //confirmation msg event handler + void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); //new view msg event handler + void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); //new block msg event handler private: //FIXME/REMOVE: for testing/debugging only @@ -84,7 +87,8 @@ namespace eosio::hotstuff { boost::signals2::scoped_connection _irreversible_block_connection; qc_chain _qc_chain; - std::function bcast_hs_message; + std::function&, const chain::hs_message&)> bcast_hs_message; + std::function warn_hs_message; uint32_t _quorum_threshold = 15; //FIXME/TODO: calculate from schedule fc::logger& _logger; diff --git a/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp b/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp index e696280009..6169b1efa5 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp @@ -100,20 +100,20 @@ namespace eosio::hotstuff { chain::bls_key_map_t finalizer_keys, fc::logger& logger); - uint64_t get_state_version() const { return _state_version; } // calling this w/ thread sync is optional + uint64_t get_state_version() const { return _state_version; } // no lock required - name get_id_i() const { return _id; } // so far, only ever relevant in a test environment (no sync) + name get_id_i() const { return _id; } // only for testing // Calls to the following methods should be thread-synchronized externally: void get_state(finalizer_state& fs) const; - void on_beat(); //handler for pacemaker beat() + void on_beat(); - void on_hs_vote_msg(const hs_vote_message& msg); //vote msg event handler - void on_hs_proposal_msg(const hs_proposal_message& msg); //proposal msg event handler - void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler - void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler + void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); + void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); + void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); + void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); private: @@ -126,48 +126,59 @@ namespace eosio::hotstuff { hs_bitset update_bitset(const hs_bitset& finalizer_set, name finalizer); - digest_type get_digest_to_sign(const block_id_type& block_id, uint8_t phase_counter, const fc::sha256& final_on_qc); //get digest to sign from proposal data + //get digest to sign from proposal data + digest_type get_digest_to_sign(const block_id_type& block_id, uint8_t phase_counter, const fc::sha256& final_on_qc); - void reset_qc(const fc::sha256& proposal_id); //reset current internal qc + void reset_qc(const fc::sha256& proposal_id); - bool evaluate_quorum(const extended_schedule& es, const hs_bitset& finalizers, const fc::crypto::blslib::bls_signature& agg_sig, const hs_proposal_message& proposal); //evaluate quorum for a proposal + //evaluate quorum for a proposal + bool evaluate_quorum(const extended_schedule& es, const hs_bitset& finalizers, const fc::crypto::blslib::bls_signature& agg_sig, const hs_proposal_message& proposal); - // qc.quorum_met has to be updated by the caller (if it wants to) based on the return value of this method - bool is_quorum_met(const quorum_certificate& qc, const extended_schedule& schedule, const hs_proposal_message& proposal); //check if quorum has been met over a proposal + //check if quorum has been met over a proposal + bool is_quorum_met(const quorum_certificate& qc, const extended_schedule& schedule, const hs_proposal_message& proposal); - hs_proposal_message new_proposal_candidate(const block_id_type& block_id, uint8_t phase_counter); //create new proposal message - hs_new_block_message new_block_candidate(const block_id_type& block_id); //create new block message + hs_proposal_message new_proposal_candidate(const block_id_type& block_id, uint8_t phase_counter); + hs_new_block_message new_block_candidate(const block_id_type& block_id); - bool am_i_proposer(); //check if I am the current proposer - bool am_i_leader(); //check if I am the current leader - bool am_i_finalizer(); //check if I am one of the current finalizers + bool am_i_proposer(); + bool am_i_leader(); + bool am_i_finalizer(); - void process_proposal(const hs_proposal_message& msg); //handles proposal - void process_vote(const hs_vote_message& msg); //handles vote - void process_new_view(const hs_new_view_message& msg); //handles new view - void process_new_block(const hs_new_block_message& msg); //handles new block + // connection_id.has_value() when processing a non-loopback message + void process_proposal(const std::optional& connection_id, const hs_proposal_message& msg); + void process_vote(const std::optional& connection_id, const hs_vote_message& msg); + void process_new_view(const std::optional& connection_id, const hs_new_view_message& msg); + void process_new_block(const std::optional& connection_id, const hs_new_block_message& msg); - hs_vote_message sign_proposal(const hs_proposal_message& proposal, name finalizer); //sign proposal + hs_vote_message sign_proposal(const hs_proposal_message& proposal, name finalizer); - bool extends(const fc::sha256& descendant, const fc::sha256& ancestor); //verify that a proposal descends from another + //verify that a proposal descends from another + bool extends(const fc::sha256& descendant, const fc::sha256& ancestor); - bool update_high_qc(const quorum_certificate& high_qc); //check if update to our high qc is required + //update high qc if required + bool update_high_qc(const quorum_certificate& high_qc); - void leader_rotation_check(); //check if leader rotation is required + //rotate leader if required + void leader_rotation_check(); - bool is_node_safe(const hs_proposal_message& proposal); //verify if a proposal should be signed + //verify if a proposal should be signed + bool is_node_safe(const hs_proposal_message& proposal); - std::vector get_qc_chain(const fc::sha256& proposal_id); //get 3-phase proposal justification + //get 3-phase proposal justification + std::vector get_qc_chain(const fc::sha256& proposal_id); - void send_hs_proposal_msg(const hs_proposal_message& msg); //send vote msg - void send_hs_vote_msg(const hs_vote_message& msg); //send proposal msg - void send_hs_new_view_msg(const hs_new_view_message& msg); //send new view msg - void send_hs_new_block_msg(const hs_new_block_message& msg); //send new block msg + // connection_id.has_value() when just propagating a received message + void send_hs_proposal_msg(const std::optional& connection_id, const hs_proposal_message& msg); + void send_hs_vote_msg(const std::optional& connection_id, const hs_vote_message& msg); + void send_hs_new_view_msg(const std::optional& connection_id, const hs_new_view_message& msg); + void send_hs_new_block_msg(const std::optional& connection_id, const hs_new_block_message& msg); - void update(const hs_proposal_message& proposal); //update internal state - void commit(const hs_proposal_message& proposal); //commit proposal (finality) + void send_hs_message_warning(const std::optional& connection_id, const chain::hs_message_warning code); - void gc_proposals(uint64_t cutoff); //garbage collection of old proposals + void update(const hs_proposal_message& proposal); + void commit(const hs_proposal_message& proposal); + + void gc_proposals(uint64_t cutoff); #warning remove. bls12-381 key used for testing purposes //todo : remove. bls12-381 key used for testing purposes @@ -235,7 +246,6 @@ namespace eosio::hotstuff { proposal_store_type _proposal_store; //internal proposals store #endif - }; } /// eosio::hotstuff diff --git a/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp b/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp index 567eb44465..cfa6b05239 100644 --- a/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp +++ b/libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp @@ -57,10 +57,12 @@ namespace eosio { namespace hotstuff { uint32_t get_quorum_threshold(); - void send_hs_proposal_msg(const hs_proposal_message & msg, name id); - void send_hs_vote_msg(const hs_vote_message & msg, name id); - void send_hs_new_block_msg(const hs_new_block_message & msg, name id); - void send_hs_new_view_msg(const hs_new_view_message & msg, name id); + void send_hs_proposal_msg(const hs_proposal_message & msg, name id, const std::optional& exclude_peer); + void send_hs_vote_msg(const hs_vote_message & msg, name id, const std::optional& exclude_peer); + void send_hs_new_block_msg(const hs_new_block_message & msg, name id, const std::optional& exclude_peer); + void send_hs_new_view_msg(const hs_new_view_message & msg, name id, const std::optional& exclude_peer); + + void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code); std::vector _pending_message_queue; diff --git a/libraries/hotstuff/qc_chain.cpp b/libraries/hotstuff/qc_chain.cpp index e62381fb0e..667a0836cf 100644 --- a/libraries/hotstuff/qc_chain.cpp +++ b/libraries/hotstuff/qc_chain.cpp @@ -290,7 +290,7 @@ namespace eosio::hotstuff { return v_msg; } - void qc_chain::process_proposal(const hs_proposal_message & proposal){ + void qc_chain::process_proposal(const std::optional& connection_id, const hs_proposal_message& proposal){ //auto start = fc::time_point::now(); @@ -299,6 +299,7 @@ namespace eosio::hotstuff { const hs_proposal_message *jp = get_proposal( proposal.justify.proposal_id ); if (jp == nullptr) { fc_elog(_logger, " *** ${id} proposal justification unknown : ${proposal_id}", ("id",_id)("proposal_id", proposal.justify.proposal_id)); + send_hs_message_warning(connection_id, hs_message_warning::discarded); // example; to be tuned to actual need return; //can't recognize a proposal with an unknown justification } } @@ -318,6 +319,7 @@ namespace eosio::hotstuff { } + send_hs_message_warning(connection_id, hs_message_warning::discarded); // example; to be tuned to actual need return; //already aware of proposal, nothing to do } @@ -417,7 +419,7 @@ namespace eosio::hotstuff { update(proposal); for (auto &msg : msgs) { - send_hs_vote_msg(msg); + send_hs_vote_msg( std::nullopt, msg ); } //check for leader change @@ -427,7 +429,7 @@ namespace eosio::hotstuff { //fc_dlog(_logger, " ... process_proposal() total time : ${total_time}", ("total_time", total_time)); } - void qc_chain::process_vote(const hs_vote_message & vote){ + void qc_chain::process_vote(const std::optional& connection_id, const hs_vote_message& vote){ //auto start = fc::time_point::now(); #warning check for duplicate or invalid vote. We will return in either case, but keep proposals for evidence of double signing @@ -440,12 +442,15 @@ namespace eosio::hotstuff { fc_tlog(_logger, " === Process vote from ${finalizer} : current bitset ${value}" , ("finalizer", vote.finalizer)("value", _current_qc.get_active_finalizers_string())); // only leader need to take action on votes - if (vote.proposal_id != _current_qc.get_proposal_id()) + if (vote.proposal_id != _current_qc.get_proposal_id()) { + send_hs_message_warning(connection_id, hs_message_warning::discarded); // example; to be tuned to actual need return; + } const hs_proposal_message *p = get_proposal( vote.proposal_id ); if (p == nullptr) { fc_elog(_logger, " *** ${id} couldn't find proposal, vote : ${vote}", ("id",_id)("vote", vote)); + send_hs_message_warning(connection_id, hs_message_warning::discarded); // example; to be tuned to actual need return; } @@ -498,7 +503,7 @@ namespace eosio::hotstuff { _pending_proposal_block = {}; _b_leaf = proposal_candidate.proposal_id; - send_hs_proposal_msg(proposal_candidate); + send_hs_proposal_msg( std::nullopt, proposal_candidate ); fc_tlog(_logger, " === ${id} _b_leaf updated (process_vote): ${proposal_id}", ("proposal_id", proposal_candidate.proposal_id)("id", _id)); } } @@ -508,7 +513,7 @@ namespace eosio::hotstuff { //fc_tlog(_logger, " ... process_vote() total time : ${total_time}", ("total_time", total_time)); } - void qc_chain::process_new_view(const hs_new_view_message & msg){ + void qc_chain::process_new_view(const std::optional& connection_id, const hs_new_view_message& msg){ fc_tlog(_logger, " === ${id} process_new_view === ${qc}", ("qc", msg.high_qc)("id", _id)); auto increment_version = fc::make_scoped_exit([this]() { ++_state_version; }); if (!update_high_qc(quorum_certificate{msg.high_qc})) { @@ -516,7 +521,7 @@ namespace eosio::hotstuff { } } - void qc_chain::process_new_block(const hs_new_block_message & msg){ + void qc_chain::process_new_block(const std::optional& connection_id, const hs_new_block_message& msg){ // If I'm not a leader, I probably don't care about hs-new-block messages. #warning check for a need to gossip/rebroadcast even if it's not for us (maybe here, maybe somewhere else). @@ -566,32 +571,39 @@ namespace eosio::hotstuff { _pending_proposal_block = {}; _b_leaf = proposal_candidate.proposal_id; - send_hs_proposal_msg(proposal_candidate); + send_hs_proposal_msg( std::nullopt, proposal_candidate ); fc_tlog(_logger, " === ${id} _b_leaf updated (on_beat): ${proposal_id}", ("proposal_id", proposal_candidate.proposal_id)("id", _id)); } } - void qc_chain::send_hs_proposal_msg(const hs_proposal_message & msg){ + void qc_chain::send_hs_proposal_msg(const std::optional& connection_id, const hs_proposal_message & msg){ fc_tlog(_logger, " === broadcast_hs_proposal ==="); - _pacemaker->send_hs_proposal_msg(msg, _id); - process_proposal(msg); + _pacemaker->send_hs_proposal_msg(msg, _id, connection_id); + if (!connection_id.has_value()) + process_proposal( std::nullopt, msg ); } - void qc_chain::send_hs_vote_msg(const hs_vote_message & msg){ + void qc_chain::send_hs_vote_msg(const std::optional& connection_id, const hs_vote_message & msg){ fc_tlog(_logger, " === broadcast_hs_vote ==="); - _pacemaker->send_hs_vote_msg(msg, _id); - process_vote(msg); + _pacemaker->send_hs_vote_msg(msg, _id, connection_id); + if (!connection_id.has_value()) + process_vote( std::nullopt, msg ); } - void qc_chain::send_hs_new_view_msg(const hs_new_view_message & msg){ + void qc_chain::send_hs_new_view_msg(const std::optional& connection_id, const hs_new_view_message & msg){ fc_tlog(_logger, " === broadcast_hs_new_view ==="); - _pacemaker->send_hs_new_view_msg(msg, _id); + _pacemaker->send_hs_new_view_msg(msg, _id, connection_id); } - void qc_chain::send_hs_new_block_msg(const hs_new_block_message & msg){ + void qc_chain::send_hs_new_block_msg(const std::optional& connection_id, const hs_new_block_message & msg){ fc_tlog(_logger, " === broadcast_hs_new_block ==="); - _pacemaker->send_hs_new_block_msg(msg, _id); + _pacemaker->send_hs_new_block_msg(msg, _id, connection_id); + } + + void qc_chain::send_hs_message_warning(const std::optional& connection_id, const chain::hs_message_warning code) { + if (connection_id.has_value()) + _pacemaker->send_hs_message_warning(connection_id.value(), code); } //extends predicate @@ -654,7 +666,7 @@ namespace eosio::hotstuff { fc_tlog(_logger, " === I am a leader-proposer that is proposing a block for itself to lead"); // Hardwired consumption by self; no networking. - process_new_block( block_candidate ); + process_new_block( std::nullopt, block_candidate ); } else { @@ -662,7 +674,7 @@ namespace eosio::hotstuff { // the network, until it reaches the leader. fc_tlog(_logger, " === broadcasting new block = #${block_num} ${proposal_id}", ("proposal_id", block_candidate.block_id)("block_num",(block_header::num_from_id(block_candidate.block_id)))); - send_hs_new_block_msg( block_candidate ); + send_hs_new_block_msg( std::nullopt, block_candidate ); } } @@ -730,7 +742,7 @@ namespace eosio::hotstuff { new_view.high_qc = _high_qc.to_msg(); - send_hs_new_view_msg(new_view); + send_hs_new_view_msg( std::nullopt, new_view ); } } @@ -834,23 +846,23 @@ namespace eosio::hotstuff { } //on proposal received, called from network thread - void qc_chain::on_hs_proposal_msg(const hs_proposal_message& msg) { - process_proposal(msg); + void qc_chain::on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg) { + process_proposal( std::optional(connection_id), msg ); } //on vote received, called from network thread - void qc_chain::on_hs_vote_msg(const hs_vote_message& msg) { - process_vote(msg); + void qc_chain::on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg) { + process_vote( std::optional(connection_id), msg ); } //on new view received, called from network thread - void qc_chain::on_hs_new_view_msg(const hs_new_view_message& msg) { - process_new_view(msg); + void qc_chain::on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg) { + process_new_view( std::optional(connection_id), msg ); } //on new block received, called from network thread - void qc_chain::on_hs_new_block_msg(const hs_new_block_message& msg) { - process_new_block(msg); + void qc_chain::on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg) { + process_new_block( std::optional(connection_id), msg ); } void qc_chain::update(const hs_proposal_message& proposal) { diff --git a/libraries/hotstuff/test/test_pacemaker.cpp b/libraries/hotstuff/test/test_pacemaker.cpp index de42e32ff1..87aa3822a1 100644 --- a/libraries/hotstuff/test/test_pacemaker.cpp +++ b/libraries/hotstuff/test/test_pacemaker.cpp @@ -158,29 +158,31 @@ namespace eosio::hotstuff { _qcc_store.emplace( name, qcc_ptr ); }; - void test_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) { + void test_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) { + void test_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) { + void test_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; - void test_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) { + void test_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional& exclude_peer) { _pending_message_queue.push_back(std::make_pair(id, msg)); }; + void test_pacemaker::send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) { } + void test_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg, name id) { auto qc_itr = _qcc_store.begin(); while (qc_itr != _qcc_store.end()){ const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_proposal_msg(msg); + qcc_ptr->on_hs_proposal_msg(0, msg); qc_itr++; } } @@ -191,7 +193,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_vote_msg(msg); + qcc_ptr->on_hs_vote_msg(0, msg); qc_itr++; } } @@ -202,7 +204,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_new_block_msg(msg); + qcc_ptr->on_hs_new_block_msg(0, msg); qc_itr++; } } @@ -213,7 +215,7 @@ namespace eosio::hotstuff { const name & qcc_name = qc_itr->first; std::shared_ptr & qcc_ptr = qc_itr->second; if (qcc_ptr->get_id_i() != id && is_qc_chain_active(qcc_name) ) - qcc_ptr->on_hs_new_view_msg(msg); + qcc_ptr->on_hs_new_view_msg(0, msg); qc_itr++; } } diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index c64962b468..8a07e7af3e 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -1122,11 +1122,15 @@ void chain_plugin::create_pacemaker(std::set my_producers, my->_chain_pacemaker.emplace(&chain(), std::move(my_producers), std::move(finalizer_keys), hotstuff_logger); } -void chain_plugin::register_pacemaker_bcast_function(std::function bcast_hs_message) { +void chain_plugin::register_pacemaker_bcast_function(std::function&, const chain::hs_message&)> bcast_hs_message) { EOS_ASSERT( my->_chain_pacemaker, plugin_config_exception, "chain_pacemaker not created" ); my->_chain_pacemaker->register_bcast_function(std::move(bcast_hs_message)); } +void chain_plugin::register_pacemaker_warn_function(std::function warn_hs_message) { + EOS_ASSERT( my->_chain_pacemaker, plugin_config_exception, "chain_pacemaker not created" ); + my->_chain_pacemaker->register_warn_function(std::move(warn_hs_message)); +} void chain_plugin::plugin_initialize(const variables_map& options) { handle_sighup(); // Sets loggers @@ -2685,8 +2689,8 @@ read_only::get_finalizer_state(const get_finalizer_state_params&, const fc::time } // namespace chain_apis // called from net threads -void chain_plugin::notify_hs_message( const hs_message& msg ) { - my->_chain_pacemaker->on_hs_msg(msg); +void chain_plugin::notify_hs_message( const uint32_t connection_id, const hs_message& msg ) { + my->_chain_pacemaker->on_hs_msg(connection_id, msg); }; void chain_plugin::notify_hs_block_produced() { diff --git a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp index e4d96b2a4a..7ab17137a7 100644 --- a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp +++ b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp @@ -1032,8 +1032,9 @@ class chain_plugin : public plugin { const controller& chain() const; void create_pacemaker(std::set my_producers, chain::bls_key_map_t finalizer_keys); - void register_pacemaker_bcast_function(std::function bcast_hs_message); - void notify_hs_message( const chain::hs_message& msg ); + void register_pacemaker_bcast_function(std::function&, const chain::hs_message&)> bcast_hs_message); + void register_pacemaker_warn_function(std::function warn_hs_message); + void notify_hs_message( const uint32_t connection_id, const chain::hs_message& msg ); void notify_hs_block_produced(); chain::chain_id_type get_chain_id() const; diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index a8bb78dc00..e8db373f81 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -301,7 +301,7 @@ namespace eosio { bool have_txn( const transaction_id_type& tid ) const; void expire_txns(); - void bcast_msg( send_buffer_type msg ); + void bcast_msg( const std::optional& exclude_peer, send_buffer_type msg ); void add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) { std::optional rm_blk_id = unlinkable_block_cache.add_unlinkable_block(std::move(b), id); @@ -495,7 +495,8 @@ namespace eosio { void transaction_ack(const std::pair&); void on_irreversible_block( const block_state_ptr& block ); - void bcast_hs_message( const hs_message& msg ); + void bcast_hs_message( const std::optional& exclude_peer, const hs_message& msg ); + void warn_hs_message( const uint32_t sender_peer, const hs_message_warning& code ); void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); void start_expire_timer(); @@ -2531,9 +2532,10 @@ namespace eosio { } ); } - void dispatch_manager::bcast_msg( send_buffer_type msg ) { - my_impl->connections.for_each_block_connection( [msg{std::move(msg)}]( auto& cp ) { + void dispatch_manager::bcast_msg( const std::optional& exclude_peer, send_buffer_type msg ) { + my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) { if( !cp->current() ) return true; + if( exclude_peer.has_value() && cp->connection_id == exclude_peer.value() ) return true; cp->strand.post( [cp, msg]() { if (cp->protocol_version >= proto_instant_finality) cp->enqueue_buffer( msg, no_reason ); @@ -3563,7 +3565,7 @@ namespace eosio { void connection::handle_message( const hs_message& msg ) { peer_dlog(this, "received hs: ${msg}", ("msg", msg)); - my_impl->chain_plug->notify_hs_message(msg); + my_impl->chain_plug->notify_hs_message(connection_id, msg); } size_t calc_trx_size( const packed_transaction_ptr& trx ) { @@ -3817,17 +3819,21 @@ namespace eosio { on_active_schedule(chain_plug->chain().active_producers()); } - void net_plugin_impl::bcast_hs_message( const hs_message& msg ) { + void net_plugin_impl::bcast_hs_message( const std::optional& exclude_peer, const hs_message& msg ) { fc_dlog(logger, "sending hs msg: ${msg}", ("msg", msg)); buffer_factory buff_factory; auto send_buffer = buff_factory.get_send_buffer( msg ); - dispatcher->strand.post( [this, msg{std::move(send_buffer)}]() mutable { - dispatcher->bcast_msg( std::move(msg) ); + dispatcher->strand.post( [this, exclude_peer, msg{std::move(send_buffer)}]() mutable { + dispatcher->bcast_msg( exclude_peer, std::move(msg) ); }); } + void net_plugin_impl::warn_hs_message( const uint32_t sender_peer, const hs_message_warning& code ) { + // potentially react to (repeated) receipt of invalid, irrelevant, duplicate, etc. hotstuff messages from sender_peer (connection ID) here + } + // called from application thread void net_plugin_impl::on_irreversible_block( const block_state_ptr& block) { fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block->block_num)("id", block->id) ); @@ -4159,8 +4165,12 @@ namespace eosio { fc_ilog( logger, "my node_id is ${id}", ("id", node_id )); chain_plug->register_pacemaker_bcast_function( - [my = shared_from_this()](const hs_message& s) { - my->bcast_hs_message(s); + [my = shared_from_this()](const std::optional& c, const hs_message& s) { + my->bcast_hs_message(c, s); + } ); + chain_plug->register_pacemaker_warn_function( + [my = shared_from_this()](const uint32_t c, const hs_message_warning& s) { + my->warn_hs_message(c, s); } ); producer_plug = app().find_plugin();