Skip to content

Commit

Permalink
Infrastructure for hs message filtering
Browse files Browse the repository at this point in the history
- connection ID of hs message sender (net plugin -> IF engine)
- connection ID to exclude when propagating (IF engine -> net plugin)
- added callback to warn of bad HS messages from a connection (IF engine -> net plugin)
- added sample use of the warning callback for discarded proposals and votes
- helpers on qc_chain to allow for easy message propagation (no propagation decisions added)
- no-ops for this feature added to test_pacemaker (which does not have multi-hop)

Closes #1605
  • Loading branch information
fcecin committed Sep 9, 2023
1 parent 5ede17a commit ffa96a6
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 95 deletions.
7 changes: 7 additions & 0 deletions libraries/chain/include/eosio/chain/hotstuff.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ namespace eosio::chain {

using hs_message = std::variant<hs_vote_message, hs_proposal_message, hs_new_block_message, hs_new_view_message>;

enum hs_message_warning : uint32_t {
discarded, // default code for dropped messages (irrelevant, redundant, ...)
duplicate_signature, // same message signature already seen
invalid_signature, // invalid message signature
invalid // invalid message (other reason)
};

struct finalizer_state {
bool chained_mode = false;
fc::sha256 b_leaf;
Expand Down
54 changes: 32 additions & 22 deletions libraries/hotstuff/chain_pacemaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,18 @@ namespace eosio { namespace hotstuff {
_head_block_state = chain->head_block_state();
}

void chain_pacemaker::register_bcast_function(std::function<void(const chain::hs_message&)> broadcast_hs_message) {
void chain_pacemaker::register_bcast_function(std::function<void(const std::optional<uint32_t>&, const chain::hs_message&)> broadcast_hs_message) {
FC_ASSERT(broadcast_hs_message, "on_hs_message must be provided");
std::lock_guard g( _hotstuff_global_mutex ); // not actually needed but doesn't hurt
bcast_hs_message = std::move(broadcast_hs_message);
}

void chain_pacemaker::register_warn_function(std::function<void(const uint32_t, const chain::hs_message_warning&)> warning_hs_message) {
FC_ASSERT(warning_hs_message, "must provide callback");
std::lock_guard g( _hotstuff_global_mutex ); // not actually needed but doesn't hurt
warn_hs_message = std::move(warning_hs_message);
}

void chain_pacemaker::get_state(finalizer_state& fs) const {
// lock-free state version check
uint64_t current_state_version = _qc_chain.get_state_version();
Expand Down Expand Up @@ -297,65 +303,69 @@ namespace eosio { namespace hotstuff {
prof.core_out();
}

void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional<uint32_t>& exclude_peer) {
bcast_hs_message(exclude_peer, msg);
}

void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional<uint32_t>& exclude_peer) {
bcast_hs_message(exclude_peer, msg);
}

void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional<uint32_t>& exclude_peer) {
bcast_hs_message(exclude_peer, msg);
}

void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional<uint32_t>& exclude_peer) {
bcast_hs_message(exclude_peer, msg);
}

void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) {
warn_hs_message(sender_peer, code);
}

// called from net threads
void chain_pacemaker::on_hs_msg(const eosio::chain::hs_message &msg) {
void chain_pacemaker::on_hs_msg(const uint32_t connection_id, const eosio::chain::hs_message &msg) {
std::visit(overloaded{
[this](const hs_vote_message& m) { on_hs_vote_msg(m); },
[this](const hs_proposal_message& m) { on_hs_proposal_msg(m); },
[this](const hs_new_block_message& m) { on_hs_new_block_msg(m); },
[this](const hs_new_view_message& m) { on_hs_new_view_msg(m); },
[this, connection_id](const hs_vote_message& m) { on_hs_vote_msg(connection_id, m); },
[this, connection_id](const hs_proposal_message& m) { on_hs_proposal_msg(connection_id, m); },
[this, connection_id](const hs_new_block_message& m) { on_hs_new_block_msg(connection_id, m); },
[this, connection_id](const hs_new_view_message& m) { on_hs_new_view_msg(connection_id, m); },
}, msg);
}

// called from net threads
void chain_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg) {
void chain_pacemaker::on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg) {
csc prof("prop");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_proposal_msg(msg);
_qc_chain.on_hs_proposal_msg(connection_id, msg);
prof.core_out();
}

// called from net threads
void chain_pacemaker::on_hs_vote_msg(const hs_vote_message& msg) {
void chain_pacemaker::on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg) {
csc prof("vote");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_vote_msg(msg);
_qc_chain.on_hs_vote_msg(connection_id, msg);
prof.core_out();
}

// called from net threads
void chain_pacemaker::on_hs_new_block_msg(const hs_new_block_message& msg) {
void chain_pacemaker::on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg) {
csc prof("nblk");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_new_block_msg(msg);
_qc_chain.on_hs_new_block_msg(connection_id, msg);
prof.core_out();
}

// called from net threads
void chain_pacemaker::on_hs_new_view_msg(const hs_new_view_message& msg) {
void chain_pacemaker::on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg) {
csc prof("view");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_new_view_msg(msg);
_qc_chain.on_hs_new_view_msg(connection_id, msg);
prof.core_out();
}

Expand Down
11 changes: 7 additions & 4 deletions libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace eosio::chain {
struct hs_vote_message;
struct hs_new_view_message;
struct hs_new_block_message;
enum hs_message_warning : uint32_t;
}

namespace eosio::hotstuff {
Expand All @@ -36,10 +37,12 @@ namespace eosio::hotstuff {
virtual std::vector<chain::name> get_finalizers() = 0;

//outbound communications; 'id' is the producer name (can be ignored if/when irrelevant to the implementer)
virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id) = 0;
virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id) = 0;
virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id) = 0;
virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id) = 0;
virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id, const std::optional<uint32_t>& exclude_peer = std::nullopt) = 0;
virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id, const std::optional<uint32_t>& exclude_peer = std::nullopt) = 0;
virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id, const std::optional<uint32_t>& exclude_peer = std::nullopt) = 0;
virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id, const std::optional<uint32_t>& exclude_peer = std::nullopt) = 0;

virtual void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code) = 0;
};

} // namespace eosio::hotstuff
26 changes: 15 additions & 11 deletions libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ namespace eosio::hotstuff {
std::set<account_name> my_producers,
chain::bls_key_map_t finalizer_keys,
fc::logger& logger);
void register_bcast_function(std::function<void(const chain::hs_message&)> broadcast_hs_message);
void register_bcast_function(std::function<void(const std::optional<uint32_t>&, const chain::hs_message&)> broadcast_hs_message);
void register_warn_function(std::function<void(const uint32_t, const chain::hs_message_warning&)> warning_hs_message);

void beat();

void on_hs_msg(const hs_message& msg);
void on_hs_msg(const uint32_t connection_id, const hs_message& msg);

void get_state(finalizer_state& fs) const;

Expand All @@ -43,19 +44,21 @@ namespace eosio::hotstuff {

uint32_t get_quorum_threshold();

void send_hs_proposal_msg(const hs_proposal_message& msg, name id);
void send_hs_vote_msg(const hs_vote_message& msg, name id);
void send_hs_new_view_msg(const hs_new_view_message& msg, name id);
void send_hs_new_block_msg(const hs_new_block_message& msg, name id);
void send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional<uint32_t>& exclude_peer);
void send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional<uint32_t>& exclude_peer);
void send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional<uint32_t>& exclude_peer);
void send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional<uint32_t>& exclude_peer);

void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code);

private:
void on_accepted_block( const block_state_ptr& blk );
void on_irreversible_block( const block_state_ptr& blk );

void on_hs_proposal_msg(const hs_proposal_message& msg); //consensus msg event handler
void on_hs_vote_msg(const hs_vote_message& msg); //confirmation msg event handler
void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler
void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); //consensus msg event handler
void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); //confirmation msg event handler
void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); //new block msg event handler
private:

//FIXME/REMOVE: for testing/debugging only
Expand Down Expand Up @@ -84,7 +87,8 @@ namespace eosio::hotstuff {
boost::signals2::scoped_connection _irreversible_block_connection;

qc_chain _qc_chain;
std::function<void(const chain::hs_message&)> bcast_hs_message;
std::function<void(const std::optional<uint32_t>&, const chain::hs_message&)> bcast_hs_message;
std::function<void(const uint32_t, const chain::hs_message_warning&)> warn_hs_message;

uint32_t _quorum_threshold = 15; //FIXME/TODO: calculate from schedule
fc::logger& _logger;
Expand Down
33 changes: 21 additions & 12 deletions libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ namespace eosio::hotstuff {

void on_beat(); //handler for pacemaker beat()

void on_hs_vote_msg(const hs_vote_message& msg); //vote msg event handler
void on_hs_proposal_msg(const hs_proposal_message& msg); //proposal msg event handler
void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler
void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); //vote msg event handler
void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); //proposal msg event handler
void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); //new block msg event handler

private:

Expand Down Expand Up @@ -142,10 +142,11 @@ namespace eosio::hotstuff {
bool am_i_leader(); //check if I am the current leader
bool am_i_finalizer(); //check if I am one of the current finalizers

void process_proposal(const hs_proposal_message& msg); //handles proposal
void process_vote(const hs_vote_message& msg); //handles vote
void process_new_view(const hs_new_view_message& msg); //handles new view
void process_new_block(const hs_new_block_message& msg); //handles new block
// is_loopback is used to check if the call is processing a self-receipt message (if so, never propagate it)
void process_proposal(const hs_proposal_message& msg, bool is_loopback = false); //handles proposal
void process_vote(const hs_vote_message& msg, bool is_loopback = false); //handles vote
void process_new_view(const hs_new_view_message& msg, bool is_loopback = false); //handles new view
void process_new_block(const hs_new_block_message& msg, bool is_loopback = false); //handles new block

hs_vote_message sign_proposal(const hs_proposal_message& proposal, name finalizer); //sign proposal

Expand All @@ -159,10 +160,12 @@ namespace eosio::hotstuff {

std::vector<hs_proposal_message> get_qc_chain(const fc::sha256& proposal_id); //get 3-phase proposal justification

void send_hs_proposal_msg(const hs_proposal_message& msg); //send vote msg
void send_hs_vote_msg(const hs_vote_message& msg); //send proposal msg
void send_hs_new_view_msg(const hs_new_view_message& msg); //send new view msg
void send_hs_new_block_msg(const hs_new_block_message& msg); //send new block msg
void send_hs_proposal_msg(const hs_proposal_message& msg, bool propagation = false); //send vote msg
void send_hs_vote_msg(const hs_vote_message& msg, bool propagation = false); //send proposal msg
void send_hs_new_view_msg(const hs_new_view_message& msg, bool propagation = false); //send new view msg
void send_hs_new_block_msg(const hs_new_block_message& msg, bool propagation = false); //send new block msg

void send_hs_message_warning(const chain::hs_message_warning code = hs_message_warning::discarded); //use generic discard reason if none given

void update(const hs_proposal_message& proposal); //update internal state
void commit(const hs_proposal_message& proposal); //commit proposal (finality)
Expand Down Expand Up @@ -236,6 +239,12 @@ namespace eosio::hotstuff {
proposal_store_type _proposal_store; //internal proposals store
#endif

// connection_id of the network peer that originally sent the message
// being processed by the qc_chain. This is used to fill in the
// exclude_peer parameter to the pacemaker when qc_chain is calling
// the pacemaker to propagate that message, which the original sender
// peer won't need to receive back.
std::optional<uint32_t> _sender_connection_id;
};

} /// eosio::hotstuff
10 changes: 6 additions & 4 deletions libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ namespace eosio { namespace hotstuff {

uint32_t get_quorum_threshold();

void send_hs_proposal_msg(const hs_proposal_message & msg, name id);
void send_hs_vote_msg(const hs_vote_message & msg, name id);
void send_hs_new_block_msg(const hs_new_block_message & msg, name id);
void send_hs_new_view_msg(const hs_new_view_message & msg, name id);
void send_hs_proposal_msg(const hs_proposal_message & msg, name id, const std::optional<uint32_t>& exclude_peer);
void send_hs_vote_msg(const hs_vote_message & msg, name id, const std::optional<uint32_t>& exclude_peer);
void send_hs_new_block_msg(const hs_new_block_message & msg, name id, const std::optional<uint32_t>& exclude_peer);
void send_hs_new_view_msg(const hs_new_view_message & msg, name id, const std::optional<uint32_t>& exclude_peer);

void send_hs_message_warning(const uint32_t sender_peer, const chain::hs_message_warning code);

std::vector<hotstuff_message> _pending_message_queue;

Expand Down
Loading

0 comments on commit ffa96a6

Please sign in to comment.