Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IF: Hotstuff message propagation/gossip #1583

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion libraries/chain/include/eosio/chain/hotstuff.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ namespace eosio::chain {
quorum_certificate_message high_qc; //justification
};

using hs_message = std::variant<hs_vote_message, hs_proposal_message, hs_new_block_message, hs_new_view_message>;
using hs_message_inner = std::variant<hs_vote_message, hs_proposal_message, hs_new_block_message, hs_new_view_message>;

struct hs_message {
std::optional<uint32_t> connection_id;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be added to the message. This requires the hs_message_inner to be copied. Better to pass it along as extra params.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will do that.

hs_message_inner message;
};

struct finalizer_state {
bool chained_mode = false;
Expand Down
43 changes: 22 additions & 21 deletions libraries/hotstuff/chain_pacemaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,65 +292,66 @@ namespace eosio { namespace hotstuff {
prof.core_out();
}

void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional<uint32_t>& connection_id) {
bcast_hs_message( { connection_id, msg } );
}

void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional<uint32_t>& connection_id) {
bcast_hs_message( { connection_id, msg } );
}

void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional<uint32_t>& connection_id) {
bcast_hs_message( { connection_id, msg } );
}

void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) {
bcast_hs_message(msg);
void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional<uint32_t>& connection_id) {
bcast_hs_message( { connection_id, msg } );
}

// called from net threads
void chain_pacemaker::on_hs_msg(const eosio::chain::hs_message &msg) {
uint32_t connection_id = msg.connection_id.value();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would throw an exception if the connection_id optional does not contain a value. It is not clear to me when we expect the connection_id to contain a value and when not, and what is the benefit of including it in the hs_message rather than passing it as a separate parameter.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change/fix.

std::visit(overloaded{
[this](const hs_vote_message& m) { on_hs_vote_msg(m); },
[this](const hs_proposal_message& m) { on_hs_proposal_msg(m); },
[this](const hs_new_block_message& m) { on_hs_new_block_msg(m); },
[this](const hs_new_view_message& m) { on_hs_new_view_msg(m); },
}, msg);
[this, connection_id](const hs_vote_message& m) { on_hs_vote_msg(connection_id, m); },
[this, connection_id](const hs_proposal_message& m) { on_hs_proposal_msg(connection_id, m); },
[this, connection_id](const hs_new_block_message& m) { on_hs_new_block_msg(connection_id, m); },
[this, connection_id](const hs_new_view_message& m) { on_hs_new_view_msg(connection_id, m); },
Comment on lines +315 to +318
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we just call _qc_chain directly here, instead of going through these 4 almost identical functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to refactor this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed when we decide to delete the "core profiling" code.

}, msg.message);
}

// called from net threads
void chain_pacemaker::on_hs_proposal_msg(const hs_proposal_message& msg) {
void chain_pacemaker::on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg) {
csc prof("prop");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_proposal_msg(msg);
_qc_chain.on_hs_proposal_msg(msg, std::make_optional<uint32_t>(connection_id));
prof.core_out();
}

// called from net threads
void chain_pacemaker::on_hs_vote_msg(const hs_vote_message& msg) {
void chain_pacemaker::on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg) {
csc prof("vote");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_vote_msg(msg);
_qc_chain.on_hs_vote_msg(msg, std::make_optional<uint32_t>(connection_id));
prof.core_out();
}

// called from net threads
void chain_pacemaker::on_hs_new_block_msg(const hs_new_block_message& msg) {
void chain_pacemaker::on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg) {
csc prof("nblk");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_new_block_msg(msg);
_qc_chain.on_hs_new_block_msg(msg, std::make_optional<uint32_t>(connection_id));
prof.core_out();
}

// called from net threads
void chain_pacemaker::on_hs_new_view_msg(const hs_new_view_message& msg) {
void chain_pacemaker::on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg) {
csc prof("view");
std::lock_guard g( _hotstuff_global_mutex );
prof.core_in();
_qc_chain.on_hs_new_view_msg(msg);
_qc_chain.on_hs_new_view_msg(msg, std::make_optional<uint32_t>(connection_id));
prof.core_out();
}

Expand Down
8 changes: 4 additions & 4 deletions libraries/hotstuff/include/eosio/hotstuff/base_pacemaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ namespace eosio::hotstuff {
virtual std::vector<chain::name> get_finalizers() = 0;

//outbound communications; 'id' is the producer name (can be ignored if/when irrelevant to the implementer)
virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id) = 0;
virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id) = 0;
virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id) = 0;
virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id) = 0;
virtual void send_hs_proposal_msg(const chain::hs_proposal_message& msg, chain::name id, const std::optional<uint32_t>& connection_id = std::nullopt) = 0;
virtual void send_hs_vote_msg(const chain::hs_vote_message& msg, chain::name id, const std::optional<uint32_t>& connection_id = std::nullopt) = 0;
virtual void send_hs_new_view_msg(const chain::hs_new_view_message& msg, chain::name id, const std::optional<uint32_t>& connection_id = std::nullopt) = 0;
virtual void send_hs_new_block_msg(const chain::hs_new_block_message& msg, chain::name id, const std::optional<uint32_t>& connection_id = std::nullopt) = 0;
};

} // namespace eosio::hotstuff
16 changes: 8 additions & 8 deletions libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ namespace eosio::hotstuff {

uint32_t get_quorum_threshold();

void send_hs_proposal_msg(const hs_proposal_message& msg, name id);
void send_hs_vote_msg(const hs_vote_message& msg, name id);
void send_hs_new_view_msg(const hs_new_view_message& msg, name id);
void send_hs_new_block_msg(const hs_new_block_message& msg, name id);
void send_hs_proposal_msg(const hs_proposal_message& msg, name id, const std::optional<uint32_t>& connection_id);
void send_hs_vote_msg(const hs_vote_message& msg, name id, const std::optional<uint32_t>& connection_id);
void send_hs_new_view_msg(const hs_new_view_message& msg, name id, const std::optional<uint32_t>& connection_id);
void send_hs_new_block_msg(const hs_new_block_message& msg, name id, const std::optional<uint32_t>& connection_id);

private:
void on_accepted_block( const block_state_ptr& blk );

void on_hs_proposal_msg(const hs_proposal_message& msg); //consensus msg event handler
void on_hs_vote_msg(const hs_vote_message& msg); //confirmation msg event handler
void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler
void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg); //consensus msg event handler
void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg); //confirmation msg event handler
void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg); //new block msg event handler
private:

//FIXME/REMOVE: for testing/debugging only
Expand Down
25 changes: 13 additions & 12 deletions libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ namespace eosio::hotstuff {

void on_beat(); //handler for pacemaker beat()

void on_hs_vote_msg(const hs_vote_message& msg); //vote msg event handler
void on_hs_proposal_msg(const hs_proposal_message& msg); //proposal msg event handler
void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler
void on_hs_vote_msg(const hs_vote_message& msg, const std::optional<uint32_t>& connection_id = std::nullopt); //vote msg event handler
void on_hs_proposal_msg(const hs_proposal_message& msg, const std::optional<uint32_t>& connection_id = std::nullopt); //proposal msg event handler
void on_hs_new_view_msg(const hs_new_view_message& msg, const std::optional<uint32_t>& connection_id = std::nullopt); //new view msg event handler
void on_hs_new_block_msg(const hs_new_block_message& msg, const std::optional<uint32_t>& connection_id = std::nullopt); //new block msg event handler

private:

Expand Down Expand Up @@ -139,10 +139,10 @@ namespace eosio::hotstuff {
bool am_i_leader(); //check if I am the current leader
bool am_i_finalizer(); //check if I am one of the current finalizers

void process_proposal(const hs_proposal_message& msg); //handles proposal
void process_vote(const hs_vote_message& msg); //handles vote
void process_new_view(const hs_new_view_message& msg); //handles new view
void process_new_block(const hs_new_block_message& msg); //handles new block
void process_proposal(const hs_proposal_message& msg, bool is_loopback = false); //handles proposal
void process_vote(const hs_vote_message& msg, bool is_loopback = false); //handles vote
void process_new_view(const hs_new_view_message& msg, bool is_loopback = false); //handles new view
void process_new_block(const hs_new_block_message& msg, bool is_loopback = false); //handles new block

hs_vote_message sign_proposal(const hs_proposal_message& proposal, name finalizer); //sign proposal

Expand All @@ -156,10 +156,10 @@ namespace eosio::hotstuff {

std::vector<hs_proposal_message> get_qc_chain(const fc::sha256& proposal_id); //get 3-phase proposal justification

void send_hs_proposal_msg(const hs_proposal_message& msg); //send vote msg
void send_hs_vote_msg(const hs_vote_message& msg); //send proposal msg
void send_hs_new_view_msg(const hs_new_view_message& msg); //send new view msg
void send_hs_new_block_msg(const hs_new_block_message& msg); //send new block msg
void send_hs_proposal_msg(const hs_proposal_message& msg, bool is_forwarding = false); //send vote msg
void send_hs_vote_msg(const hs_vote_message& msg, bool is_forwarding = false); //send proposal msg
void send_hs_new_view_msg(const hs_new_view_message& msg, bool is_forwarding = false); //send new view msg
void send_hs_new_block_msg(const hs_new_block_message& msg, bool is_forwarding = false); //send new block msg

void update(const hs_proposal_message& proposal); //update internal state
void commit(const hs_proposal_message& proposal); //commit proposal (finality)
Expand Down Expand Up @@ -232,6 +232,7 @@ namespace eosio::hotstuff {
proposal_store_type _proposal_store; //internal proposals store
#endif

std::optional<uint32_t> _connection_id; // last net_plugin sender
};

} /// eosio::hotstuff
8 changes: 4 additions & 4 deletions libraries/hotstuff/include/eosio/hotstuff/test_pacemaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ namespace eosio { namespace hotstuff {

uint32_t get_quorum_threshold();

void send_hs_proposal_msg(const hs_proposal_message & msg, name id);
void send_hs_vote_msg(const hs_vote_message & msg, name id);
void send_hs_new_block_msg(const hs_new_block_message & msg, name id);
void send_hs_new_view_msg(const hs_new_view_message & msg, name id);
void send_hs_proposal_msg(const hs_proposal_message & msg, name id, const std::optional<uint32_t>& connection_id);
void send_hs_vote_msg(const hs_vote_message & msg, name id, const std::optional<uint32_t>& connection_id);
void send_hs_new_block_msg(const hs_new_block_message & msg, name id, const std::optional<uint32_t>& connection_id);
void send_hs_new_view_msg(const hs_new_view_message & msg, name id, const std::optional<uint32_t>& connection_id);

std::vector<hotstuff_message> _pending_message_queue;

Expand Down
71 changes: 52 additions & 19 deletions libraries/hotstuff/qc_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ namespace eosio::hotstuff {
return v_msg;
}

void qc_chain::process_proposal(const hs_proposal_message & proposal){
void qc_chain::process_proposal(const hs_proposal_message & proposal, bool is_loopback){

//auto start = fc::time_point::now();

Expand Down Expand Up @@ -412,6 +412,13 @@ namespace eosio::hotstuff {
//update internal state
update(proposal);

// Conditional propagation of proposal messages: we only rebroadcast proposal
// messages that have been considered new and relevant to this node; do not
// rebroadcast if loopback (receiving our own proposal message).
if (!is_loopback) {
send_hs_proposal_msg(proposal, true);
}

for (auto &msg : msgs) {
send_hs_vote_msg(msg);
}
Expand All @@ -423,16 +430,24 @@ namespace eosio::hotstuff {
//fc_dlog(_logger, " ... process_proposal() total time : ${total_time}", ("total_time", total_time));
}

void qc_chain::process_vote(const hs_vote_message & vote){
void qc_chain::process_vote(const hs_vote_message & vote, bool is_loopback){

//auto start = fc::time_point::now();
#warning check for duplicate or invalid vote. We will return in either case, but keep proposals for evidence of double signing
//TODO: check for duplicate or invalid vote. We will return in either case, but keep proposals for evidence of double signing

bool am_leader = am_i_leader();

if (!am_leader)
if (!am_leader) {
// Unconditional propagation of vote messages: we rebroadcast all votes,
// unless we are the leader, which is the single intended recipient.
// Do not rebroadcast if loopback (receiving our own vote message).
if (!is_loopback) {
send_hs_vote_msg(vote, true);
}
return;
}

fc_tlog(_logger, " === Process vote from ${finalizer} : current bitset ${value}" ,
("finalizer", vote.finalizer)("value", _current_qc.get_active_finalizers_string()));
// only leader need to take action on votes
Expand Down Expand Up @@ -504,21 +519,31 @@ namespace eosio::hotstuff {
//fc_tlog(_logger, " ... process_vote() total time : ${total_time}", ("total_time", total_time));
}

void qc_chain::process_new_view(const hs_new_view_message & msg){
void qc_chain::process_new_view(const hs_new_view_message & msg, bool is_loopback){
fc_tlog(_logger, " === ${id} process_new_view === ${qc}", ("qc", msg.high_qc)("id", _id));

// Unconditional propagation of new-view messages.
if (!is_loopback) { // note: is_loopback==true may never happen here
send_hs_new_view_msg(msg, true);
}

auto increment_version = fc::make_scoped_exit([this]() { ++_state_version; });
if (!update_high_qc(quorum_certificate{msg.high_qc})) {
increment_version.cancel();
}
}

void qc_chain::process_new_block(const hs_new_block_message & msg){
void qc_chain::process_new_block(const hs_new_block_message & msg, bool is_loopback){

// If I'm not a leader, I probably don't care about hs-new-block messages.
#warning check for a need to gossip/rebroadcast even if it's not for us (maybe here, maybe somewhere else).
// TODO: check for a need to gossip/rebroadcast even if it's not for us (maybe here, maybe somewhere else).
if (! am_i_leader()) {
fc_tlog(_logger, " === ${id} process_new_block === discarding because I'm not the leader; block_id : ${bid}, justify : ${just}", ("bid", msg.block_id)("just", msg.justify)("id", _id));
// Unconditional propagation of new-block messages (if not the leader, which is the final recipient).
if (!is_loopback) {
send_hs_new_block_msg(msg, true); // note: is_loopback==true may never happen here
}
return;
}

Expand Down Expand Up @@ -568,26 +593,28 @@ namespace eosio::hotstuff {
}
}

void qc_chain::send_hs_proposal_msg(const hs_proposal_message & msg){
void qc_chain::send_hs_proposal_msg(const hs_proposal_message & msg, bool is_forwarding){
fc_tlog(_logger, " === broadcast_hs_proposal ===");
_pacemaker->send_hs_proposal_msg(msg, _id);
process_proposal(msg);
_pacemaker->send_hs_proposal_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt);
if (!is_forwarding)
process_proposal(msg, true);
}

void qc_chain::send_hs_vote_msg(const hs_vote_message & msg){
void qc_chain::send_hs_vote_msg(const hs_vote_message & msg, bool is_forwarding){
fc_tlog(_logger, " === broadcast_hs_vote ===");
_pacemaker->send_hs_vote_msg(msg, _id);
process_vote(msg);
_pacemaker->send_hs_vote_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt);
if (!is_forwarding)
process_vote(msg, true);
}

void qc_chain::send_hs_new_view_msg(const hs_new_view_message & msg){
void qc_chain::send_hs_new_view_msg(const hs_new_view_message & msg, bool is_forwarding){
fc_tlog(_logger, " === broadcast_hs_new_view ===");
_pacemaker->send_hs_new_view_msg(msg, _id);
_pacemaker->send_hs_new_view_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt);
}

void qc_chain::send_hs_new_block_msg(const hs_new_block_message & msg){
void qc_chain::send_hs_new_block_msg(const hs_new_block_message & msg, bool is_forwarding){
fc_tlog(_logger, " === broadcast_hs_new_block ===");
_pacemaker->send_hs_new_block_msg(msg, _id);
_pacemaker->send_hs_new_block_msg(msg, _id, is_forwarding ? _connection_id : std::nullopt);
}

//extends predicate
Expand Down Expand Up @@ -626,6 +653,8 @@ namespace eosio::hotstuff {
// Called from the main application thread
void qc_chain::on_beat(){

_connection_id = std::nullopt;

// Non-proposing leaders do not care about on_beat(), because leaders react to a block proposal
// which comes from processing an incoming new block message from a proposer instead.
// on_beat() is called by the pacemaker, which decides when it's time to check whether we are
Expand Down Expand Up @@ -830,22 +859,26 @@ namespace eosio::hotstuff {
}

//on proposal received, called from network thread
void qc_chain::on_hs_proposal_msg(const hs_proposal_message& msg) {
void qc_chain::on_hs_proposal_msg(const hs_proposal_message& msg, const std::optional<uint32_t>& connection_id) {
_connection_id = connection_id;
process_proposal(msg);
}

//on vote received, called from network thread
void qc_chain::on_hs_vote_msg(const hs_vote_message& msg) {
void qc_chain::on_hs_vote_msg(const hs_vote_message& msg, const std::optional<uint32_t>& connection_id) {
_connection_id = connection_id;
process_vote(msg);
}

//on new view received, called from network thread
void qc_chain::on_hs_new_view_msg(const hs_new_view_message& msg) {
void qc_chain::on_hs_new_view_msg(const hs_new_view_message& msg, const std::optional<uint32_t>& connection_id) {
_connection_id = connection_id;
process_new_view(msg);
}

//on new block received, called from network thread
void qc_chain::on_hs_new_block_msg(const hs_new_block_message& msg) {
void qc_chain::on_hs_new_block_msg(const hs_new_block_message& msg, const std::optional<uint32_t>& connection_id) {
_connection_id = connection_id;
process_new_block(msg);
}

Expand Down
Loading