Skip to content

Commit

Permalink
Merge pull request #1811 from AntelopeIO/GH-1808-sync-5.0
Browse files Browse the repository at this point in the history
[5.0] P2P: Throttle over sync window
  • Loading branch information
heifner authored Oct 24, 2023
2 parents 5871209 + 4b5d4e7 commit def1432
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
32 changes: 25 additions & 7 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ namespace eosio {
size_t get_bytes_sent() const { return bytes_sent.load(); }
std::chrono::nanoseconds get_last_bytes_sent() const { return last_bytes_sent.load(); }
size_t get_block_sync_bytes_received() const { return block_sync_bytes_received.load(); }
size_t get_block_sync_bytes_sent() const { return block_sync_bytes_sent.load(); }
size_t get_block_sync_bytes_sent() const { return block_sync_total_bytes_sent.load(); }
bool get_block_sync_throttling() const { return block_sync_throttling.load(); }
boost::asio::ip::port_type get_remote_endpoint_port() const { return remote_endpoint_port.load(); }
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
Expand Down Expand Up @@ -891,7 +891,9 @@ namespace eosio {
std::atomic<std::chrono::nanoseconds> last_bytes_received{0ns};
std::atomic<size_t> bytes_sent{0};
std::atomic<size_t> block_sync_bytes_received{0};
std::atomic<size_t> block_sync_bytes_sent{0};
std::atomic<size_t> block_sync_total_bytes_sent{0};
std::chrono::nanoseconds block_sync_send_start{0ns}; // start of enqueue blocks
size_t block_sync_frame_bytes_sent{0}; // bytes sent in this set of enqueue blocks
std::atomic<bool> block_sync_throttling{false};
std::atomic<std::chrono::nanoseconds> last_bytes_sent{0ns};
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};
Expand Down Expand Up @@ -1460,6 +1462,9 @@ namespace eosio {
latest_msg_time = std::chrono::system_clock::time_point::min();
latest_blk_time = std::chrono::system_clock::time_point::min();
set_state(connection_state::closed);
block_sync_send_start = 0ns;
block_sync_frame_bytes_sent = 0;
block_sync_throttling = false;

if( reconnect && !shutdown ) {
my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ),
Expand Down Expand Up @@ -1740,25 +1745,38 @@ namespace eosio {
} FC_LOG_AND_DROP();
if( sb ) {
// Skip transmitting block this loop if threshold exceeded
if( block_sync_rate_limit > 0 && peer_syncing_from_us ) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(get_time() - connection_start_time);
auto current_rate = double(block_sync_bytes_sent) / elapsed.count();
if( current_rate >= block_sync_rate_limit ) {
if (block_sync_send_start == 0ns) { // start of enqueue blocks
block_sync_send_start = get_time();
block_sync_frame_bytes_sent = 0;
}
if( block_sync_rate_limit > 0 && block_sync_frame_bytes_sent > 0 && peer_syncing_from_us ) {
auto now = get_time();
auto elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(now - block_sync_send_start);
double current_rate_sec = (double(block_sync_frame_bytes_sent) / elapsed_us.count()) * 100000; // convert from bytes/us => bytes/sec
peer_dlog(this, "start enqueue block time ${st}, now ${t}, elapsed ${e}, rate ${r}, limit ${l}",
("st", block_sync_send_start.count())("t", now.count())("e", elapsed_us.count())("r", current_rate_sec)("l", block_sync_rate_limit));
if( current_rate_sec >= block_sync_rate_limit ) {
block_sync_throttling = true;
peer_dlog( this, "throttling block sync to peer ${host}:${port}", ("host", log_remote_endpoint_ip)("port", log_remote_endpoint_port));
return false;
}
}
block_sync_throttling = false;
block_sync_bytes_sent += enqueue_block( sb, true );
auto sent = enqueue_block( sb, true );
block_sync_total_bytes_sent += sent;
block_sync_frame_bytes_sent += sent;
++peer_requested->last;
if(num == peer_requested->end_block) {
peer_requested.reset();
block_sync_send_start = 0ns;
block_sync_frame_bytes_sent = 0;
peer_dlog( this, "completing enqueue_sync_block ${num}", ("num", num) );
}
} else {
peer_ilog( this, "enqueue sync, unable to fetch block ${num}, sending benign_other go away", ("num", num) );
peer_requested.reset(); // unable to provide requested blocks
block_sync_send_start = 0ns;
block_sync_frame_bytes_sent = 0;
no_retry = benign_other;
enqueue( go_away_message( benign_other ) );
}
Expand Down
15 changes: 8 additions & 7 deletions tests/p2p_sync_throttle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
appArgs.add(flag='--plugin',action='append',type=str,help='Run nodes with additional plugins')
appArgs.add(flag='--connection-cleanup-period',type=int,help='Interval in whole seconds to run the connection reaper and metric collection')

args=TestHelper.parse_args({"-p","-d","--keep-logs","--prod-count"
args=TestHelper.parse_args({"-d","--keep-logs"
,"--dump-error-details","-v","--leave-running"
,"--unshared"},
applicationSpecificArgs=appArgs)
pnodes=args.p
pnodes=1
delay=args.d
debug=args.v
prod_count = args.prod_count
prod_count = 2
total_nodes=4
dumpErrorDetails=args.dump_error_details

Expand Down Expand Up @@ -106,10 +106,11 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
throttlingNode = cluster.unstartedNodes[0]
i = throttlingNode.cmd.index('--p2p-listen-endpoint')
throttleListenAddr = throttlingNode.cmd[i+1]
# Using 4000 bytes per second to allow syncing of ~250 transaction blocks resulting from
# the trx generators in a reasonable amount of time, while still being able to capture
# Using 40 Kilobytes per second to allow syncing of ~250 transaction blocks at ~175 bytes per transaction
# (250*175=43750 per block or 87500 per second)
# resulting from the trx generators in a reasonable amount of time, while still being able to capture
# throttling state within the Prometheus update window (3 seconds in this test).
throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':4000B/s'
throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':40KB/s'
throttleListenIP, throttleListenPort = throttleListenAddr.split(':')
throttlingNode.cmd.append('--p2p-listen-endpoint')
throttlingNode.cmd.append(f'{throttleListenIP}:{int(throttleListenPort)+100}:1TB/s')
Expand Down Expand Up @@ -213,7 +214,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
if throttledState:
wasThrottled = True
break
assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=30), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=90), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
endThrottledSync = time.time()
response = throttledNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttled Node End State')
Expand Down

0 comments on commit def1432

Please sign in to comment.