diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp
index 9b3ed88405..154d5c23f3 100644
--- a/plugins/net_plugin/net_plugin.cpp
+++ b/plugins/net_plugin/net_plugin.cpp
@@ -852,7 +852,7 @@ namespace eosio {
       size_t get_bytes_sent() const { return bytes_sent.load(); }
       std::chrono::nanoseconds get_last_bytes_sent() const { return last_bytes_sent.load(); }
       size_t get_block_sync_bytes_received() const { return block_sync_bytes_received.load(); }
-      size_t get_block_sync_bytes_sent() const { return block_sync_bytes_sent.load(); }
+      size_t get_block_sync_bytes_sent() const { return block_sync_total_bytes_sent.load(); }
       bool get_block_sync_throttling() const { return block_sync_throttling.load(); }
       boost::asio::ip::port_type get_remote_endpoint_port() const { return remote_endpoint_port.load(); }
       void set_heartbeat_timeout(std::chrono::milliseconds msec) {
@@ -891,7 +891,9 @@ namespace eosio {
       std::atomic<std::chrono::nanoseconds> last_bytes_received{0ns};
       std::atomic<size_t> bytes_sent{0};
       std::atomic<size_t> block_sync_bytes_received{0};
-      std::atomic<size_t> block_sync_bytes_sent{0};
+      std::atomic<size_t> block_sync_total_bytes_sent{0};
+      std::chrono::nanoseconds block_sync_send_start{0ns}; // start of enqueue blocks
+      size_t block_sync_frame_bytes_sent{0};               // bytes sent in this set of enqueue blocks
       std::atomic<bool> block_sync_throttling{false};
       std::atomic<std::chrono::nanoseconds> last_bytes_sent{0ns};
       std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};
@@ -1460,6 +1462,9 @@ namespace eosio {
      latest_msg_time = std::chrono::system_clock::time_point::min();
      latest_blk_time = std::chrono::system_clock::time_point::min();
      set_state(connection_state::closed);
+     block_sync_send_start = 0ns;
+     block_sync_frame_bytes_sent = 0;
+     block_sync_throttling = false;

      if( reconnect && !shutdown ) {
         my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ),
@@ -1740,25 +1745,38 @@ namespace eosio {
      } FC_LOG_AND_DROP();
      if( sb ) {
         // Skip transmitting block this loop if threshold exceeded
-        if( block_sync_rate_limit > 0 && peer_syncing_from_us ) {
-           auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(get_time() - connection_start_time);
-           auto current_rate = double(block_sync_bytes_sent) / elapsed.count();
-           if( current_rate >= block_sync_rate_limit ) {
+        if (block_sync_send_start == 0ns) { // start of enqueue blocks
+           block_sync_send_start = get_time();
+           block_sync_frame_bytes_sent = 0;
+        }
+        if( block_sync_rate_limit > 0 && block_sync_frame_bytes_sent > 0 && peer_syncing_from_us ) {
+           auto now = get_time();
+           auto elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(now - block_sync_send_start);
+           double current_rate_sec = (double(block_sync_frame_bytes_sent) / elapsed_us.count()) * 100000; // convert from bytes/us => bytes/sec
+           peer_dlog(this, "start enqueue block time ${st}, now ${t}, elapsed ${e}, rate ${r}, limit ${l}",
+                     ("st", block_sync_send_start.count())("t", now.count())("e", elapsed_us.count())("r", current_rate_sec)("l", block_sync_rate_limit));
+           if( current_rate_sec >= block_sync_rate_limit ) {
               block_sync_throttling = true;
               peer_dlog( this, "throttling block sync to peer ${host}:${port}", ("host", log_remote_endpoint_ip)("port", log_remote_endpoint_port));
               return false;
            }
         }
         block_sync_throttling = false;
-        block_sync_bytes_sent += enqueue_block( sb, true );
+        auto sent = enqueue_block( sb, true );
+        block_sync_total_bytes_sent += sent;
+        block_sync_frame_bytes_sent += sent;
         ++peer_requested->last;
         if(num == peer_requested->end_block) {
            peer_requested.reset();
+           block_sync_send_start = 0ns;
+           block_sync_frame_bytes_sent = 0;
            peer_dlog( this, "completing enqueue_sync_block ${num}", ("num", num) );
         }
      } else {
         peer_ilog( this, "enqueue sync, unable to fetch block ${num}, sending benign_other go away", ("num", num) );
         peer_requested.reset(); // unable to provide requested blocks
+        block_sync_send_start = 0ns;
+        block_sync_frame_bytes_sent = 0;
         no_retry = benign_other;
         enqueue( go_away_message( benign_other ) );
      }
diff --git a/tests/p2p_sync_throttle_test.py b/tests/p2p_sync_throttle_test.py
index 9205b81086..421d411243 100755
--- a/tests/p2p_sync_throttle_test.py
+++ b/tests/p2p_sync_throttle_test.py
@@ -24,14 +24,14 @@
 appArgs.add(flag='--plugin',action='append',type=str,help='Run nodes with additional plugins')
 appArgs.add(flag='--connection-cleanup-period',type=int,help='Interval in whole seconds to run the connection reaper and metric collection')

-args=TestHelper.parse_args({"-p","-d","--keep-logs","--prod-count"
-                            ,"--dump-error-details","-v","--leave-running"
-                            ,"--unshared"},
-                           applicationSpecificArgs=appArgs)
-pnodes=args.p
+args=TestHelper.parse_args({"-d","--keep-logs"
+                            ,"--dump-error-details","-v","--leave-running"
+                            ,"--unshared"},
+                           applicationSpecificArgs=appArgs)
+pnodes=1
 delay=args.d
 debug=args.v
-prod_count = args.prod_count
+prod_count = 2
 total_nodes=4
 dumpErrorDetails=args.dump_error_details
@@ -106,10 +106,11 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
     throttlingNode = cluster.unstartedNodes[0]
     i = throttlingNode.cmd.index('--p2p-listen-endpoint')
     throttleListenAddr = throttlingNode.cmd[i+1]
-    # Using 4000 bytes per second to allow syncing of ~250 transaction blocks resulting from
-    # the trx generators in a reasonable amount of time, while still being able to capture
+    # Using 40 Kilobytes per second to allow syncing of ~250 transaction blocks at ~175 bytes per transaction
+    # (250*175=43750 per block or 87500 per second)
+    # resulting from the trx generators in a reasonable amount of time, while still being able to capture
     # throttling state within the Prometheus update window (3 seconds in this test).
-    throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':4000B/s'
+    throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':40KB/s'
     throttleListenIP, throttleListenPort = throttleListenAddr.split(':')
     throttlingNode.cmd.append('--p2p-listen-endpoint')
     throttlingNode.cmd.append(f'{throttleListenIP}:{int(throttleListenPort)+100}:1TB/s')
@@ -213,7 +214,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
         if throttledState:
             wasThrottled = True
             break
-    assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=30), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
+    assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=90), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
     endThrottledSync = time.time()
     response = throttledNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
     Print('Throttled Node End State')