Skip to content

Commit

Permalink
Merge pull request #1676 from AntelopeIO/hs_1548_propag_msgs
Browse files Browse the repository at this point in the history
IF: Implement message propagation
  • Loading branch information
fcecin authored Nov 16, 2023
2 parents 1a1bbe3 + c5295b3 commit ad5ba52
Show file tree
Hide file tree
Showing 4 changed files with 406 additions and 33 deletions.
31 changes: 31 additions & 0 deletions libraries/hotstuff/include/eosio/hotstuff/qc_chain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ namespace eosio::hotstuff {
bool quorum_met = false; // not serialized across network
};

struct seen_votes {
fc::sha256 proposal_id; // id of proposal being voted on
uint64_t height; // height of the proposal (for GC)
std::set<fc::crypto::blslib::bls_public_key> finalizers; // finalizers that have voted on the proposal
};

// Concurrency note: qc_chain is a single-threaded and lock-free decision engine.
// All thread synchronization, if any, is external.
class qc_chain {
Expand All @@ -118,6 +124,10 @@ namespace eosio::hotstuff {
void on_hs_vote_msg(const uint32_t connection_id, const hs_vote_message& msg);
void on_hs_proposal_msg(const uint32_t connection_id, const hs_proposal_message& msg);
void on_hs_new_view_msg(const uint32_t connection_id, const hs_new_view_message& msg);

// NOTE: The hotstuff New Block message is not ever propagated (multi-hop) by this method.
// Unit tests do not use network topology emulation for this message.
// The live network does not actually dispatch this message to the wire; this is a local callback.
void on_hs_new_block_msg(const uint32_t connection_id, const hs_new_block_message& msg);

private:
Expand Down Expand Up @@ -240,6 +250,27 @@ namespace eosio::hotstuff {

proposal_store_type _proposal_store; //internal proposals store
#endif

// Possible optimization: merge _proposal_store and _seen_votes_store.
// Store a struct { set<name> seen_votes; hs_proposal_message p; } in the (now single) multi-index.
struct by_seen_votes_proposal_id{};
struct by_seen_votes_proposal_height{};
typedef multi_index_container<
seen_votes,
indexed_by<
hashed_unique<
tag<by_seen_votes_proposal_id>,
BOOST_MULTI_INDEX_MEMBER(seen_votes,fc::sha256,proposal_id)
>,
ordered_non_unique<
tag<by_seen_votes_proposal_height>,
BOOST_MULTI_INDEX_MEMBER(seen_votes,uint64_t,height)
>
>
> seen_votes_store_type;

// given a height, store a map of proposal IDs at that height and the seen votes for it
seen_votes_store_type _seen_votes_store;
};

} /// eosio::hotstuff
60 changes: 49 additions & 11 deletions libraries/hotstuff/qc_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ namespace eosio::hotstuff {
//update internal state
update(proposal);

//propagate this proposal since it was new to us
send_hs_proposal_msg(connection_id, proposal);

for (auto &msg : msgs) {
send_hs_vote_msg( std::nullopt, msg );
}
Expand All @@ -401,23 +404,45 @@ namespace eosio::hotstuff {

bool am_leader = am_i_leader();

if (!am_leader)
return;
fc_tlog(_logger, " === Process vote from ${finalizer_key} : current bitset ${value}" ,
("finalizer_key", vote.finalizer_key)("value", _current_qc.get_active_finalizers_string()));
// only leader need to take action on votes
if (vote.proposal_id != _current_qc.get_proposal_id()) {
send_hs_message_warning(connection_id, hs_message_warning::discarded); // example; to be tuned to actual need
return;
}
if (am_leader) {
if (vote.proposal_id != _current_qc.get_proposal_id()) {
send_hs_message_warning(connection_id, hs_message_warning::discarded); // example; to be tuned to actual need
return;
}
}

const hs_proposal_message *p = get_proposal( vote.proposal_id );
if (p == nullptr) {
fc_elog(_logger, " *** ${id} couldn't find proposal, vote : ${vote}", ("id",_id)("vote", vote));
if (am_leader)
fc_elog(_logger, " *** ${id} couldn't find proposal, vote : ${vote}", ("id",_id)("vote", vote));
send_hs_message_warning(connection_id, hs_message_warning::discarded); // example; to be tuned to actual need
return;
}

// if not leader, check message propagation and quit
if (! am_leader) {
seen_votes_store_type::nth_index<0>::type::iterator itr = _seen_votes_store.get<by_seen_votes_proposal_id>().find( p->proposal_id );
bool propagate = false;
if (itr == _seen_votes_store.get<by_seen_votes_proposal_id>().end()) {
seen_votes sv = { p->proposal_id, p->get_height(), { vote.finalizer_key } };
_seen_votes_store.insert(sv);
propagate = true;
} else {
_seen_votes_store.get<by_seen_votes_proposal_id>().modify(itr, [&](seen_votes& sv) {
if (sv.finalizers.count(vote.finalizer_key) == 0) {
sv.finalizers.insert(vote.finalizer_key);
propagate = true;
}
});
}
if (propagate)
send_hs_vote_msg(connection_id, vote);
return;
}

fc_tlog(_logger, " === Process vote from ${finalizer_key} : current bitset ${value}" ,
("finalizer_key", vote.finalizer_key)("value", _current_qc.get_active_finalizers_string()));

bool quorum_met = _current_qc.is_quorum_met(); //check if quorum already met

// If quorum is already met, we don't need to do anything else. Otherwise, we aggregate the signature.
Expand Down Expand Up @@ -490,6 +515,11 @@ namespace eosio::hotstuff {
auto increment_version = fc::make_scoped_exit([this]() { ++_state_version; });
if (!update_high_qc(quorum_certificate{msg.high_qc, 21})) { // TODO: use active schedule size
increment_version.cancel();
} else {
// Always propagate a view that's newer than ours.
// If it's not newer, then we have already propagated ours.
// If the recipient doesn't think ours is newer, it has already propagated its own, and so on.
send_hs_new_view_msg(connection_id, msg);
}
}

Expand Down Expand Up @@ -662,7 +692,12 @@ namespace eosio::hotstuff {
_b_leaf = _high_qc.get_proposal_id();

fc_tlog(_logger, " === ${id} _b_leaf updated (update_high_qc) : ${proposal_id}", ("proposal_id", _high_qc.get_proposal_id())("id", _id));
return true;

// avoid looping message propagation when receiving a new-view message with a high_qc.get_proposal_id().empty().
// not sure if high_qc.get_proposal_id().empty() + _high_qc.get_proposal_id().empty() is something that actually ever happens in the real world.
// not sure if high_qc.get_proposal_id().empty() should be tested and always rejected (return false + no _high_qc / _b_leaf update).
// if this returns false, we won't update the get_finality_status information, but I don't think we care about that at all.
return !high_qc.get_proposal_id().empty();
} else {
const hs_proposal_message *old_high_qc_prop = get_proposal( _high_qc.get_proposal_id() );
const hs_proposal_message *new_high_qc_prop = get_proposal( high_qc.get_proposal_id() );
Expand Down Expand Up @@ -952,6 +987,9 @@ namespace eosio::hotstuff {
void qc_chain::gc_proposals(uint64_t cutoff){
//fc_tlog(_logger, " === garbage collection on old data");

auto& seen_votes_index = _seen_votes_store.get<by_seen_votes_proposal_height>();
seen_votes_index.erase(seen_votes_index.begin(), seen_votes_index.upper_bound(cutoff));

#ifdef QC_CHAIN_SIMPLE_PROPOSAL_STORE
ps_height_iterator psh_it = _proposal_stores_by_height.begin();
while (psh_it != _proposal_stores_by_height.end()) {
Expand Down
Loading

0 comments on commit ad5ba52

Please sign in to comment.