From dbed17c3b52ad7c9fa4a6ffe638decaddb8b9d51 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 19 Aug 2024 15:36:39 -0500 Subject: [PATCH] GH-583 Track sync time to know when to allow a new start sync --- plugins/net_plugin/net_plugin.cpp | 48 ++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index d64a1c5abe..db5cdf3aef 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -231,6 +231,8 @@ namespace eosio { alignas(hardware_destructive_interference_sz) std::atomic sync_state{in_sync}; + std::atomic sync_timers_active{0}; + std::atomic sync_active_time{}; std::atomic sync_ordinal{0}; // indicate that we have received blocks to catch us up to head, delay sending out handshakes until we have // applied the blocks and our controller head is updated @@ -249,6 +251,7 @@ namespace eosio { void request_next_chunk( const connection_ptr& conn = connection_ptr() ) REQUIRES(sync_mtx); connection_ptr find_next_sync_node(); // call with locked mutex void start_sync( const connection_ptr& c, uint32_t target ); // locks mutex + bool sync_recently_active() const; bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); // locks mutex uint32_t active_sync_fetch_span() const; public: @@ -261,6 +264,8 @@ namespace eosio { bool syncing_from_peer() const { return sync_state == lib_catchup; } bool is_in_sync() const { return sync_state == in_sync; } void sync_reset_lib_num( const connection_ptr& conn, bool closing ); + void sync_timeout(const connection_ptr& c, const boost::system::error_code& ec); + void sync_wait(const connection_ptr& c); void sync_reassign_fetch( const connection_ptr& c ); void rejected_block( const connection_ptr& c, uint32_t blk_num, closing_mode mode ); void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied, @@ -1078,7 +1083,6 @@ namespace eosio { void cancel_sync_wait(); void sync_wait(); - void sync_timeout(boost::system::error_code ec); void queue_write(const std::shared_ptr>& buff, std::function callback, @@ -1942,23 +1946,13 @@ namespace eosio { connection_ptr c(shared_from_this()); fc::lock_guard g( sync_response_expected_timer_mtx ); sync_response_expected_timer.expires_from_now( my_impl->resp_expected_period ); + my_impl->sync_master->sync_wait(c); sync_response_expected_timer.async_wait( boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) { - c->sync_timeout( ec ); + my_impl->sync_master->sync_timeout(c, ec); } ) ); } - // called from connection strand - void connection::sync_timeout( boost::system::error_code ec ) { - if( !ec ) { - peer_dlog(this, "sync timeout"); - my_impl->sync_master->sync_reassign_fetch( shared_from_this() ); - close(true); - } else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy - peer_elog( this, "setting timer for sync request got error ${ec}", ("ec", ec.message()) ); - } - } - // called from connection strand void connection::request_sync_blocks(uint32_t start, uint32_t end) { sync_last_requested_block = end; @@ -2175,6 +2169,7 @@ namespace eosio { sync_last_requested_num = end; sync_source = new_sync_source; request_sent = true; + sync_active_time = std::chrono::steady_clock::now(); new_sync_source->strand.post( [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() { peer_ilog( new_sync_source, "requesting range ${s} to ${e}, fhead ${h}, lib ${lib}", ("s", start)("e", end)("h", fork_head_num)("lib", lib) ); new_sync_source->request_sync_blocks( start, end ); @@ -2220,7 +2215,7 @@ namespace eosio { return; } - if( sync_state != lib_catchup ) { + if( sync_state != lib_catchup || !sync_recently_active()) { set_state( lib_catchup ); sync_last_requested_num = 0; sync_next_expected_num = chain_info.lib_num + 1; @@ -2232,6 +2227,31 @@ namespace eosio { request_next_chunk( c ); } + // thread safe + bool sync_manager::sync_recently_active() const { + return std::chrono::steady_clock::now() - sync_active_time.load() < my_impl->resp_expected_period; + } + + // called from connection strand + void sync_manager::sync_wait(const connection_ptr& c) { + ++sync_timers_active; + sync_active_time = std::chrono::steady_clock::now(); // reset when we receive a block + peer_dlog(c, "sync wait, active_timers ${t}", ("t", sync_timers_active.load())); + } + + // called from connection strand + void sync_manager::sync_timeout(const connection_ptr& c, const boost::system::error_code& ec) { + if( !ec ) { + peer_dlog(c, "sync timeout"); + sync_reassign_fetch( c ); + close(true); + } else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy + peer_elog( c, "setting timer for sync request got error ${ec}", ("ec", ec.message()) ); + } + --sync_timers_active; + peer_dlog(c, "sync timeout, active_timers ${t}", ("t", sync_timers_active.load())); + } + // called from connection strand void sync_manager::sync_reassign_fetch(const connection_ptr& c) { fc::unique_lock g( sync_mtx );