Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.0 -> main] P2P: Track sync time to know when to allow a new start sync #593

Merged
merged 3 commits into from
Aug 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ namespace eosio {

alignas(hardware_destructive_interference_sz)
std::atomic<stages> sync_state{in_sync};
// Count of sync response timers currently outstanding: incremented by sync_manager::sync_wait()
// and decremented by sync_manager::sync_timeout().
std::atomic<int32_t> sync_timers_active{0};
// Steady-clock time of the most recent recorded sync activity; written when a range request is issued
// and in sync_manager::sync_wait(), read by sync_recently_active().
std::atomic<std::chrono::steady_clock::time_point> sync_active_time{};
std::atomic<uint32_t> sync_ordinal{0};
// Indicates that we have received blocks to catch us up to head; delay sending out handshakes until we have
// applied the blocks and our controller head is updated.
Expand All @@ -249,6 +251,7 @@ namespace eosio {
void request_next_chunk( const connection_ptr& conn = connection_ptr() ) REQUIRES(sync_mtx);
connection_ptr find_next_sync_node(); // call with locked mutex
void start_sync( const connection_ptr& c, uint32_t target ); // locks mutex
// Returns true when less than my_impl->resp_expected_period has elapsed since sync_active_time was last set.
bool sync_recently_active() const;
bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); // locks mutex
uint32_t active_sync_fetch_span() const;
public:
Expand All @@ -261,6 +264,8 @@ namespace eosio {
bool syncing_from_peer() const { return sync_state == lib_catchup; }
bool is_in_sync() const { return sync_state == in_sync; }
void sync_reset_lib_num( const connection_ptr& conn, bool closing );
// Completion handler for connection c's sync response timer; ec is the timer's completion status.
// Decrements sync_timers_active. Called from connection strand.
void sync_timeout(const connection_ptr& c, const boost::system::error_code& ec);
// Records that a sync response wait has started for connection c: increments sync_timers_active and
// refreshes sync_active_time. Called from connection strand.
void sync_wait(const connection_ptr& c);
void sync_reassign_fetch( const connection_ptr& c );
void rejected_block( const connection_ptr& c, uint32_t blk_num, closing_mode mode );
void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied,
Expand Down Expand Up @@ -1078,7 +1083,6 @@ namespace eosio {

void cancel_sync_wait();
void sync_wait();
void sync_timeout(boost::system::error_code ec);

void queue_write(const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback,
Expand Down Expand Up @@ -1942,23 +1946,13 @@ namespace eosio {
connection_ptr c(shared_from_this());
fc::lock_guard g( sync_response_expected_timer_mtx );
sync_response_expected_timer.expires_from_now( my_impl->resp_expected_period );
my_impl->sync_master->sync_wait(c);
sync_response_expected_timer.async_wait(
boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) {
c->sync_timeout( ec );
my_impl->sync_master->sync_timeout(c, ec);
} ) );
}

// called from connection strand
// Completion handler for this connection's sync response timer.
void connection::sync_timeout( boost::system::error_code ec ) {
   if( ec ) {
      // don't log on operation_aborted, called on destroy
      if( ec != boost::asio::error::operation_aborted ) {
         peer_elog( this, "setting timer for sync request got error ${ec}", ("ec", ec.message()) );
      }
      return;
   }

   // Timer completed without error: hand the fetch back to the sync manager, then close this connection.
   peer_dlog(this, "sync timeout");
   my_impl->sync_master->sync_reassign_fetch( shared_from_this() );
   close(true);
}

// called from connection strand
void connection::request_sync_blocks(uint32_t start, uint32_t end) {
sync_last_requested_block = end;
Expand Down Expand Up @@ -2175,6 +2169,7 @@ namespace eosio {
sync_last_requested_num = end;
sync_source = new_sync_source;
request_sent = true;
sync_active_time = std::chrono::steady_clock::now();
new_sync_source->strand.post( [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() {
peer_ilog( new_sync_source, "requesting range ${s} to ${e}, fhead ${h}, lib ${lib}", ("s", start)("e", end)("h", fork_head_num)("lib", lib) );
new_sync_source->request_sync_blocks( start, end );
Expand Down Expand Up @@ -2220,7 +2215,7 @@ namespace eosio {
return;
}

if( sync_state != lib_catchup ) {
if( sync_state != lib_catchup || !sync_recently_active()) {
set_state( lib_catchup );
sync_last_requested_num = 0;
sync_next_expected_num = chain_info.lib_num + 1;
Expand All @@ -2234,6 +2229,31 @@ namespace eosio {
request_next_chunk( c );
}

// thread safe
// Reports whether sync_active_time was set less than my_impl->resp_expected_period ago.
bool sync_manager::sync_recently_active() const {
   const auto last_active = sync_active_time.load();
   const auto elapsed = std::chrono::steady_clock::now() - last_active;
   return elapsed < my_impl->resp_expected_period;
}

// called from connection strand
// Records the start of a sync response wait for connection c: bumps the outstanding-timer count
// and refreshes the last-activity timestamp.
void sync_manager::sync_wait(const connection_ptr& c) {
   sync_timers_active.fetch_add(1);
   // reset when we receive a block
   sync_active_time.store(std::chrono::steady_clock::now());
   peer_dlog(c, "sync wait, active_timers ${t}", ("t", sync_timers_active.load()));
}

// called from connection strand
// Completion handler for connection c's sync response timer.
//   - ec is success (timer completed): reassign the outstanding fetch and close connection c
//   - ec is operation_aborted: ignored without logging
//   - any other ec: logged as an error
// In every case sync_timers_active is decremented and the new count is logged.
void sync_manager::sync_timeout(const connection_ptr& c, const boost::system::error_code& ec) {
   if( !ec ) {
      peer_dlog(c, "sync timeout");
      sync_reassign_fetch( c );
      // Must be invoked on the connection: this code now lives in sync_manager, so an
      // unqualified close(true) would not act on c.
      c->close(true);
   } else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy
      peer_elog( c, "setting timer for sync request got error ${ec}", ("ec", ec.message()) );
   }
   --sync_timers_active;
   peer_dlog(c, "sync timeout, active_timers ${t}", ("t", sync_timers_active.load()));
}

// called from connection strand
void sync_manager::sync_reassign_fetch(const connection_ptr& c) {
fc::unique_lock g( sync_mtx );
Expand Down