diff --git a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp index b5122f80aa..d88c47729e 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp @@ -145,7 +145,7 @@ class bp_connection_manager { // Only called from connection strand std::size_t num_established_clients() const { uint32_t num_clients = 0; - self()->connections.for_each_connection([&num_clients](auto&& conn) { + self()->connections.for_each_connection([&num_clients](const std::shared_ptr& conn) { if (established_client_connection(conn)) { ++num_clients; } @@ -157,7 +157,7 @@ class bp_connection_manager { // Only called from connection strand // This should only be called after the first handshake message is received to check if an incoming connection // has exceeded the pre-configured max_client_count limit. - bool exceeding_connection_limit(Connection* new_connection) const { + bool exceeding_connection_limit(std::shared_ptr new_connection) const { return auto_bp_peering_enabled() && self()->connections.get_max_client_count() != 0 && established_client_connection(new_connection) && num_established_clients() > self()->connections.get_max_client_count(); } @@ -182,7 +182,7 @@ class bp_connection_manager { fc_dlog(self()->get_logger(), "pending_downstream_neighbors: ${pending_downstream_neighbors}", ("pending_downstream_neighbors", to_string(pending_downstream_neighbors))); - for (auto neighbor : pending_downstream_neighbors) { self()->connections.connect(config.bp_peer_addresses[neighbor], *self()->p2p_addresses.begin() ); } + for (auto neighbor : pending_downstream_neighbors) { self()->connections.resolve_and_connect(config.bp_peer_addresses[neighbor], self()->get_first_p2p_address() ); } pending_neighbors = std::move(pending_downstream_neighbors); finder.add_upstream_neighbors(pending_neighbors); diff --git a/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp b/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp index d0c482e5b1..1548006803 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp @@ -54,6 +54,9 @@ namespace eosio { std::chrono::nanoseconds last_bytes_received{0}; size_t bytes_sent{0}; std::chrono::nanoseconds last_bytes_sent{0}; + size_t block_sync_bytes_received{0}; + size_t block_sync_bytes_sent{0}; + bool block_sync_throttling{false}; std::chrono::nanoseconds connection_start_time{0}; std::string log_p2p_address; }; diff --git a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp index 5ca2ba1456..d37fdbc18d 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp @@ -16,9 +16,11 @@ namespace eosio { // Longest domain name is 253 characters according to wikipedia. // Addresses include ":port" where max port is 65535, which adds 6 chars. + // Addresses may also include ":bitrate" with suffix and separators, which adds 30 chars, + // for the maximum comma-separated value that fits in a size_t expressed in decimal plus a suffix. // We also add our own extentions of "[:trx|:blk] - xxxxxxx", which adds 14 chars, total= 273. // Allow for future extentions as well, hence 384. - constexpr size_t max_p2p_address_length = 253 + 6; + constexpr size_t max_p2p_address_length = 253 + 6 + 30; constexpr size_t max_handshake_str_length = 384; struct handshake_message { diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 807660bd6f..449edfd68a 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -24,11 +24,13 @@ #include #include #include +#include #include #include #include #include +#include // should be defined for c++17, but clang++16 still has not implemented it #ifdef __cpp_lib_hardware_interference_size @@ -68,6 +70,7 @@ namespace eosio { using boost::asio::ip::address_v4; using boost::asio::ip::host_name; using boost::multi_index_container; + using namespace boost::multi_index; using fc::time_point; using fc::time_point_sec; @@ -339,14 +342,39 @@ namespace eosio { constexpr uint32_t packed_transaction_which = fc::get_index(); // see protocol net_message class connections_manager { + public: + struct connection_detail { + std::string host; + connection_ptr c; + tcp::endpoint active_ip; + tcp::resolver::results_type ips; + }; + + using connection_details_index = multi_index_container< + connection_detail, + indexed_by< + ordered_non_unique< + tag, + key<&connection_detail::host> + >, + ordered_unique< + tag, + key<&connection_detail::c> + > + > + >; + enum class timer_type { check, stats }; + private: alignas(hardware_destructive_interference_size) mutable std::shared_mutex connections_mtx; - chain::flat_set connections; + connection_details_index connections; chain::flat_set supplied_peers; alignas(hardware_destructive_interference_size) fc::mutex connector_check_timer_mtx; unique_ptr connector_check_timer GUARDED_BY(connector_check_timer_mtx); + fc::mutex connection_stats_timer_mtx; + unique_ptr connection_stats_timer GUARDED_BY(connection_stats_timer_mtx); /// thread safe, only modified on startup std::chrono::milliseconds heartbeat_timeout{def_keepalive_interval*2}; @@ -357,10 +385,9 @@ namespace eosio { private: // must call with held mutex connection_ptr find_connection_i(const string& host) const; - void add_i(connection_ptr&& c); - void connect_i(const string& peer, const string& p2p_address); void connection_monitor(const std::weak_ptr& from_connection); + void connection_statistics_monitor(const std::weak_ptr& from_connection); public: size_t number_connections() const; @@ -380,18 +407,26 @@ namespace eosio { void connect_supplied_peers(const string& p2p_address); - void start_conn_timer(); - void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); - void stop_conn_timer(); + void start_conn_timers(); + void start_conn_timer(boost::asio::steady_timer::duration du, + std::weak_ptr from_connection, + timer_type which); + void stop_conn_timers(); void add(connection_ptr c); string connect(const string& host, const string& p2p_address); + string resolve_and_connect(const string& host, const string& p2p_address); + void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint); + void connect(const connection_ptr& c); string disconnect(const string& host); void close_all(); std::optional status(const string& host) const; vector connection_statuses() const; + template + bool any_of_supplied_peers(Function&& f) const; + template void for_each_connection(Function&& f) const; @@ -403,7 +438,7 @@ namespace eosio { template bool any_of_block_connections(UnaryPredicate&& p) const; - }; + }; // connections_manager class net_plugin_impl : public std::enable_shared_from_this, public auto_bp_peering::bp_connection_manager { @@ -420,6 +455,7 @@ namespace eosio { */ vector p2p_addresses; vector p2p_server_addresses; + const string& get_first_p2p_address() const; vector allowed_peers; ///< peer keys allowed to connect std::map increment_dropped_trxs; private: + inline static const std::map prefix_multipliers{ + {"",1},{"K",pow(10,3)},{"M",pow(10,6)},{"G",pow(10, 9)},{"T",pow(10, 12)}, + {"Ki",pow(2,10)},{"Mi",pow(2,20)},{"Gi",pow(2,30)},{"Ti",pow(2,40)} + }; alignas(hardware_destructive_interference_size) mutable fc::mutex chain_info_mtx; // protects chain_info_t chain_info_t chain_info GUARDED_BY(chain_info_mtx); @@ -535,14 +575,18 @@ namespace eosio { constexpr static uint16_t to_protocol_version(uint16_t v); + std::tuple parse_listen_address(const std::string& peer) const; + size_t parse_connection_rate_limit(const string& limit_str) const; void plugin_initialize(const variables_map& options); void plugin_startup(); void plugin_shutdown(); bool in_sync() const; fc::logger& get_logger() { return logger; } - void create_session(tcp::socket&& socket, const string listen_address); - }; + void create_session(tcp::socket&& socket, const string listen_address, size_t limit); + + std::string empty{}; + }; //net_plugin_impl // peer_[x]log must be called from thread in connection strand #define peer_dlog( PEER, FORMAT, ... ) \ @@ -767,7 +811,8 @@ namespace eosio { /// assignment not allowed block_status_monitor& operator=( const block_status_monitor& ) = delete; block_status_monitor& operator=( block_status_monitor&& ) = delete; - }; + }; // block_status_monitor + class connection : public std::enable_shared_from_this { public: @@ -777,7 +822,7 @@ namespace eosio { /// @brief ctor /// @param socket created by boost::asio in fc::listener /// @param address identifier of listen socket which accepted this new connection - explicit connection( tcp::socket&& socket, const string& listen_address ); + explicit connection( tcp::socket&& socket, const string& listen_address, size_t block_sync_rate_limit ); ~connection() = default; connection( const connection& ) = delete; @@ -806,6 +851,9 @@ namespace eosio { std::chrono::nanoseconds get_last_bytes_received() const { return last_bytes_received.load(); } size_t get_bytes_sent() const { return bytes_sent.load(); } std::chrono::nanoseconds get_last_bytes_sent() const { return last_bytes_sent.load(); } + size_t get_block_sync_bytes_received() const { return block_sync_bytes_received.load(); } + size_t get_block_sync_bytes_sent() const { return block_sync_bytes_sent.load(); } + bool get_block_sync_throttling() const { return block_sync_throttling.load(); } boost::asio::ip::port_type get_remote_endpoint_port() const { return remote_endpoint_port.load(); } void set_heartbeat_timeout(std::chrono::milliseconds msec) { hb_timeout = msec; @@ -825,7 +873,6 @@ namespace eosio { std::atomic conn_state{connection_state::connecting}; - string listen_address; // address sent to peer in handshake const string peer_addr; enum connection_types : char { both, @@ -833,6 +880,8 @@ namespace eosio { blocks_only }; + size_t block_sync_rate_limit{0}; // bytes/second, default unlimited + std::atomic connection_type{both}; std::atomic peer_start_block_num{0}; std::atomic peer_head_block_num{0}; @@ -841,6 +890,9 @@ namespace eosio { std::atomic bytes_received{0}; std::atomic last_bytes_received{0ns}; std::atomic bytes_sent{0}; + std::atomic block_sync_bytes_received{0}; + std::atomic block_sync_bytes_sent{0}; + std::atomic block_sync_throttling{false}; std::atomic last_bytes_sent{0ns}; std::atomic remote_endpoint_port{0}; @@ -855,6 +907,7 @@ namespace eosio { fc::sha256 conn_node_id; string short_conn_node_id; + string listen_address; // address sent to peer in handshake string log_p2p_address; string log_remote_endpoint_ip; string log_remote_endpoint_port; @@ -933,13 +986,13 @@ namespace eosio { bool process_next_block_message(uint32_t message_length); bool process_next_trx_message(uint32_t message_length); - void update_endpoints(); + void update_endpoints(const tcp::endpoint& endpoint = tcp::endpoint()); public: bool populate_handshake( handshake_message& hello ) const; - bool resolve_and_connect(); - void connect( const std::shared_ptr& resolver, const tcp::resolver::results_type& endpoints ); + bool reconnect(); + void connect( const tcp::resolver::results_type& endpoints ); void start_read_message(); /** \brief Process the next message from the pending message buffer @@ -968,10 +1021,11 @@ namespace eosio { void send_time(const time_message& msg); /** \brief Read system time and convert to a 64 bit integer. * - * There are five calls to this routine in the program. One + * There are six calls to this routine in the program. One * when a packet arrives from the network, one when a packet - * is placed on the send queue, one during start session, and - * one each when data is counted as received or sent. + * is placed on the send queue, one during start session, one + * when a sync block is queued and one each when data is + * counted as received or sent. * Calls the kernel time of day routine and converts to * a (at least) 64 bit integer. */ @@ -986,7 +1040,7 @@ namespace eosio { void stop_send(); void enqueue( const net_message &msg ); - void enqueue_block( const signed_block_ptr& sb, bool to_sync_queue = false); + size_t enqueue_block( const signed_block_ptr& sb, bool to_sync_queue = false); void enqueue_buffer( const std::shared_ptr>& send_buffer, go_away_reason close_after_send, bool to_sync_queue = false); @@ -1138,18 +1192,28 @@ namespace eosio { } + template + bool connections_manager::any_of_supplied_peers( Function&& f ) const { + std::shared_lock g( connections_mtx ); + return std::any_of(supplied_peers.begin(), supplied_peers.end(), std::forward(f)); + } + template void connections_manager::for_each_connection( Function&& f ) const { std::shared_lock g( connections_mtx ); - std::for_each(connections.begin(), connections.end(), std::forward(f)); + auto& index = connections.get(); + for( const connection_detail& cd : index ) { + f(cd.c); + } } template void connections_manager::for_each_block_connection( Function&& f ) const { std::shared_lock g( connections_mtx ); - for( auto& c : connections ) { - if (c->is_blocks_connection()) { - f(c); + auto& index = connections.get(); + for( const connection_detail& cd : index ) { + if (cd.c->is_blocks_connection()) { + f(cd.c); } } } @@ -1157,15 +1221,21 @@ namespace eosio { template bool connections_manager::any_of_connections(UnaryPredicate&& p) const { std::shared_lock g(connections_mtx); - return std::any_of(connections.cbegin(), connections.cend(), std::forward(p)); + auto& index = connections.get(); + for( const connection_detail& cd : index ) { + if (p(cd.c)) + return true; + } + return false; } template bool connections_manager::any_of_block_connections(UnaryPredicate&& p) const { std::shared_lock g( connections_mtx ); - for( auto& c : connections ) { - if( c->is_blocks_connection() ) { - if (p(c)) + auto& index = connections.get(); + for( const connection_detail& cd : index ) { + if( cd.c->is_blocks_connection() ) { + if (p(cd.c)) return true; } } @@ -1176,10 +1246,10 @@ namespace eosio { //--------------------------------------------------------------------------- connection::connection( const string& endpoint, const string& listen_address ) - : listen_address( listen_address ), - peer_addr( endpoint ), + : peer_addr( endpoint ), strand( my_impl->thread_pool.get_executor() ), socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ), + listen_address( listen_address ), log_p2p_address( endpoint ), connection_id( ++my_impl->current_connection_id ), response_expected_timer( my_impl->thread_pool.get_executor() ), @@ -1191,11 +1261,12 @@ namespace eosio { fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) ); } - connection::connection(tcp::socket&& s, const string& listen_address) - : listen_address( listen_address ), - peer_addr(), + connection::connection(tcp::socket&& s, const string& listen_address, size_t block_sync_rate_limit) + : peer_addr(), + block_sync_rate_limit(block_sync_rate_limit), strand( my_impl->thread_pool.get_executor() ), socket( new tcp::socket( std::move(s) ) ), + listen_address( listen_address ), connection_id( ++my_impl->current_connection_id ), response_expected_timer( my_impl->thread_pool.get_executor() ), last_handshake_recv(), @@ -1205,11 +1276,10 @@ namespace eosio { fc_dlog( logger, "new connection object created for peer ${address}:${port} from listener ${addr}", ("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) ); } - // called from connection strand - void connection::update_endpoints() { + void connection::update_endpoints(const tcp::endpoint& endpoint) { boost::system::error_code ec; boost::system::error_code ec2; - auto rep = socket->remote_endpoint(ec); + auto rep = endpoint == tcp::endpoint() ? socket->remote_endpoint(ec) : endpoint; auto lep = socket->local_endpoint(ec2); remote_endpoint_port = ec ? 0 : rep.port(); log_remote_endpoint_ip = ec ? unknown : rep.address().to_string(); @@ -1389,7 +1459,9 @@ namespace eosio { set_state(connection_state::closed); if( reconnect && !shutdown ) { - my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() ); + my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ), + connection_wptr(), + connections_manager::timer_type::check ); } } @@ -1655,7 +1727,7 @@ namespace eosio { } else { peer_dlog( this, "enqueue sync block ${num}", ("num", peer_requested->last + 1) ); } - uint32_t num = ++peer_requested->last; + uint32_t num = peer_requested->last + 1; if(num == peer_requested->end_block) { peer_requested.reset(); peer_dlog( this, "completing enqueue_sync_block ${num}", ("num", num) ); @@ -1667,14 +1739,25 @@ namespace eosio { sb = cc.fetch_block_by_number( num ); // thread-safe } FC_LOG_AND_DROP(); if( sb ) { - enqueue_block( sb, true ); + // Skip transmitting block this loop if threshold exceeded + if( block_sync_rate_limit > 0 && peer_syncing_from_us ) { + auto elapsed = std::chrono::duration_cast(get_time() - connection_start_time); + auto current_rate = double(block_sync_bytes_sent) / elapsed.count(); + if( current_rate >= block_sync_rate_limit ) { + block_sync_throttling = true; + peer_dlog( this, "throttling block sync to peer ${host}:${port}", ("host", log_remote_endpoint_ip)("port", log_remote_endpoint_port)); + return false; + } + } + block_sync_throttling = false; + block_sync_bytes_sent += enqueue_block( sb, true ); + ++peer_requested->last; } else { peer_ilog( this, "enqueue sync, unable to fetch block ${num}, sending benign_other go away", ("num", num) ); peer_requested.reset(); // unable to provide requested blocks no_retry = benign_other; enqueue( go_away_message( benign_other ) ); } - return true; } @@ -1787,7 +1870,7 @@ namespace eosio { } // called from connection strand - void connection::enqueue_block( const signed_block_ptr& b, bool to_sync_queue) { + size_t connection::enqueue_block( const signed_block_ptr& b, bool to_sync_queue) { peer_dlog( this, "enqueue block ${num}", ("num", b->block_num()) ); verify_strand_in_this_thread( strand, __func__, __LINE__ ); @@ -1795,6 +1878,7 @@ namespace eosio { auto sb = buff_factory.get_send_buffer( b ); latest_blk_time = std::chrono::system_clock::now(); enqueue_buffer( sb, no_reason, to_sync_queue); + return sb->size(); } // called from connection strand @@ -2075,7 +2159,7 @@ namespace eosio { // static, thread safe void sync_manager::send_handshakes() { - my_impl->connections.for_each_connection( []( auto& ci ) { + my_impl->connections.for_each_connection( []( const connection_ptr& ci ) { if( ci->current() ) { ci->send_handshake(); } @@ -2551,7 +2635,7 @@ namespace eosio { void dispatch_manager::bcast_transaction(const packed_transaction_ptr& trx) { trx_buffer_factory buff_factory; const fc::time_point_sec now{fc::time_point::now()}; - my_impl->connections.for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) { + my_impl->connections.for_each_connection( [this, &trx, &now, &buff_factory]( const connection_ptr& cp ) { if( !cp->is_transactions_connection() || !cp->current() ) { return; } @@ -2649,8 +2733,7 @@ namespace eosio { //------------------------------------------------------------------------ - // called from any thread - bool connection::resolve_and_connect() { + bool connection::reconnect() { switch ( no_retry ) { case no_reason: case wrong_version: @@ -2661,15 +2744,6 @@ namespace eosio { fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry ))); return false; } - - string::size_type colon = peer_address().find(':'); - if (colon == std::string::npos || colon == 0) { - fc_elog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_address()) ); - return false; - } - - connection_ptr c = shared_from_this(); - if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) { fc::microseconds connector_period = my_impl->connections.get_connector_period(); fc::lock_guard g( conn_mtx ); @@ -2677,40 +2751,24 @@ namespace eosio { return true; // true so doesn't remove from valid connections } } - + connection_ptr c = shared_from_this(); strand.post([c]() { - auto [host, port, type] = split_host_port_type(c->peer_address()); - c->set_connection_type( c->peer_address() ); - - auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); - connection_wptr weak_conn = c; - resolver->async_resolve(host, port, boost::asio::bind_executor( c->strand, - [resolver, weak_conn, host = host, port = port]( const boost::system::error_code& err, const tcp::resolver::results_type& endpoints ) { - auto c = weak_conn.lock(); - if( !c ) return; - if( !err ) { - c->connect( resolver, endpoints ); - } else { - fc_elog( logger, "Unable to resolve ${host}:${port} ${error}", - ("host", host)("port", port)( "error", err.message() ) ); - c->set_state(connection_state::closed); - ++c->consecutive_immediate_connection_close; - } - } ) ); - } ); + my_impl->connections.connect(c); + }); return true; } // called from connection strand - void connection::connect( const std::shared_ptr& resolver, const tcp::resolver::results_type& endpoints ) { + void connection::connect( const tcp::resolver::results_type& endpoints ) { set_state(connection_state::connecting); pending_message_buffer.reset(); buffer_queue.clear_out_queue(); boost::asio::async_connect( *socket, endpoints, boost::asio::bind_executor( strand, - [resolver, c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) { + [c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) { if( !err && socket->is_open() && socket == c->socket ) { - c->update_endpoints(); + my_impl->connections.update_connection_endpoint(c, endpoint); + c->update_endpoints(endpoint); if( c->start_session() ) { c->send_handshake(); c->send_time(); @@ -2726,7 +2784,11 @@ namespace eosio { } - void net_plugin_impl::create_session(tcp::socket&& socket, const string listen_address) { + const string& net_plugin_impl::get_first_p2p_address() const { + return p2p_addresses.size() > 0 ? *p2p_addresses.begin() : empty; + } + + void net_plugin_impl::create_session(tcp::socket&& socket, const string listen_address, size_t limit) { uint32_t visitors = 0; uint32_t from_addr = 0; boost::system::error_code rec; @@ -2736,7 +2798,7 @@ namespace eosio { fc_elog(logger, "Error getting remote endpoint: ${m}", ("m", rec.message())); } else { paddr_str = paddr_add.to_string(); - connections.for_each_connection([&visitors, &from_addr, &paddr_str](auto& conn) { + connections.for_each_connection([&visitors, &from_addr, &paddr_str](const connection_ptr& conn) { if (conn->socket_is_open()) { if (conn->peer_address().empty()) { ++visitors; @@ -2752,7 +2814,19 @@ namespace eosio { visitors < connections.get_max_client_count())) { fc_ilog(logger, "Accepted new connection: " + paddr_str); - connection_ptr new_connection = std::make_shared(std::move(socket), listen_address); + connections.any_of_supplied_peers([&listen_address, &paddr_str, &limit](const string& peer_addr) { + auto [host, port, type] = split_host_port_type(peer_addr); + if (host == paddr_str) { + if (limit > 0) { + fc_dlog(logger, "Connection inbound to ${la} from ${a} is a configured p2p-peer-address and will not be throttled", ("la", listen_address)("a", paddr_str)); + } + limit = 0; + return true; + } + return false; + }); + + connection_ptr new_connection = std::make_shared(std::move(socket), listen_address, limit); new_connection->strand.post([new_connection, this]() { if (new_connection->start_session()) { connections.add(new_connection); @@ -2953,7 +3027,6 @@ namespace eosio { fc::raw::unpack( peek_ds, which ); // throw away block_header bh; fc::raw::unpack( peek_ds, bh ); - const block_id_type blk_id = bh.calculate_id(); const uint32_t blk_num = last_received_block_num = block_header::num_from_id(blk_id); // don't add_peer_block because we have not validated this block header yet @@ -2987,6 +3060,7 @@ namespace eosio { return true; } } else { + block_sync_bytes_received += message_length; my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, false); } @@ -3065,7 +3139,7 @@ namespace eosio { void net_plugin_impl::plugin_shutdown() { in_shutdown = true; - connections.stop_conn_timer(); + connections.stop_conn_timers(); { fc::lock_guard g( expire_timer_mtx ); if( expire_timer ) @@ -3183,7 +3257,7 @@ namespace eosio { log_p2p_address = msg.p2p_address; my_impl->mark_bp_connection(this); - if (my_impl->exceeding_connection_limit(this)) { + if (my_impl->exceeding_connection_limit(shared_from_this())) { // When auto bp peering is enabled, create_session() check doesn't have enough information to determine // if a client is a BP peer. In create_session(), it only has the peer address which a node is connecting // from, but it would be different from the address it is listening. The only way to make sure is when the @@ -3200,7 +3274,7 @@ namespace eosio { set_connection_type( msg.p2p_address ); peer_dlog( this, "checking for duplicate" ); - auto is_duplicate = [&](const auto& check) { + auto is_duplicate = [&](const connection_ptr& check) { if(check.get() == this) return false; fc::unique_lock g_check_conn( check->conn_mtx ); @@ -3754,7 +3828,7 @@ namespace eosio { } auto current_time = std::chrono::system_clock::now(); - my->connections.for_each_connection( [current_time]( auto& c ) { + my->connections.for_each_connection( [current_time]( const connection_ptr& c ) { if( c->socket_is_open() ) { c->strand.post([c, current_time]() { c->check_heartbeat(current_time); @@ -3769,7 +3843,7 @@ namespace eosio { fc::lock_guard g( expire_timer_mtx ); expire_timer = std::make_unique( my_impl->thread_pool.get_executor() ); } - connections.start_conn_timer(); + connections.start_conn_timers(); start_expire_timer(); } @@ -3935,7 +4009,17 @@ namespace eosio { void net_plugin::set_program_options( options_description& /*cli*/, options_description& cfg ) { cfg.add_options() - ( "p2p-listen-endpoint", bpo::value< vector >()->default_value( vector(1, string("0.0.0.0:9876")) ), "The actual host:port used to listen for incoming p2p connections. May be used multiple times.") + ( "p2p-listen-endpoint", bpo::value< vector >()->default_value( vector(1, string("0.0.0.0:9876:0")) ), "The actual host:port[:] used to listen for incoming p2p connections. May be used multiple times. " + " The optional rate cap will limit per connection block sync bandwidth to the specified rate. Total " + " allowed bandwidth is the rate-cap multiplied by the connection count limit. A number alone will be " + " interpreted as bytes per second. The number may be suffixed with units. Supported units are: " + " 'B/s', 'KB/s', 'MB/s, 'GB/s', 'TB/s', 'KiB/s', 'MiB/s', 'GiB/s', 'TiB/s'." + " Transactions and blocks outside of sync mode are not throttled." + " Examples:\n" + " 192.168.0.100:9876:1MiB/s\n" + " node.eos.io:9876:1512KB/s\n" + " node.eos.io:9876:0.5GB/s\n" + " [2001:db8:85a3:8d3:1319:8a2e:370:7348]:9876:250KB/s") ( "p2p-server-address", bpo::value< vector >(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint. May be used as many times as p2p-listen-endpoint. If provided, the first address will be used in handshakes with other nodes. Otherwise the default is used.") ( "p2p-peer-address", bpo::value< vector >()->composing(), "The public endpoint of a peer node to connect to. Use multiple p2p-peer-address options as needed to compose a network.\n" @@ -3991,6 +4075,52 @@ namespace eosio { return fc::json::from_string(s).as(); } + std::tuple net_plugin_impl::parse_listen_address( const std::string& address ) const { + auto listen_addr = address; + auto limit = string("0"); + auto last_colon_location = address.rfind(':'); + if( auto right_bracket_location = address.find(']'); right_bracket_location != address.npos ) { + if( std::count(address.begin()+right_bracket_location, address.end(), ':') > 1 ) { + listen_addr = std::string(address, 0, last_colon_location); + limit = std::string(address, last_colon_location+1); + } + } else { + if( auto colon_count = std::count(address.begin(), address.end(), ':'); colon_count > 1 ) { + EOS_ASSERT( colon_count <= 2, plugin_config_exception, "Invalid address specification ${addr}; IPv6 addresses must be enclosed in square brackets.", ("addr", address)); + listen_addr = std::string(address, 0, last_colon_location); + limit = std::string(address, last_colon_location+1); + } + } + auto block_sync_rate_limit = parse_connection_rate_limit(limit); + + return {listen_addr, block_sync_rate_limit}; + } + + size_t net_plugin_impl::parse_connection_rate_limit( const std::string& limit_str) const { + std::istringstream in(limit_str); + double limit{0}; + in >> limit; + EOS_ASSERT(limit >= 0.0, plugin_config_exception, "block sync rate limit must not be negative: ${limit}", ("limit", limit_str)); + size_t block_sync_rate_limit = 0; + if( limit > 0.0 ) { + std::string units; + in >> units; + std::regex units_regex{"([KMGT]?[i]?)B/s"}; + std::smatch units_match; + std::regex_match(units, units_match, units_regex); + if( units.length() > 0 ) { + EOS_ASSERT(units_match.size() == 2, plugin_config_exception, "invalid block sync rate limit specification: ${limit}", ("limit", units)); + try { + block_sync_rate_limit = boost::numeric_cast(limit * prefix_multipliers.at(units_match[1].str())); + fc_ilog( logger, "setting block_sync_rate_limit to ${limit} megabytes per second", ("limit", double(block_sync_rate_limit)/1000000)); + } catch (boost::numeric::bad_numeric_cast&) { + EOS_THROW(plugin_config_exception, "block sync rate limit specification overflowed: ${limit}", ("limit", limit_str)); + } + } + } + return block_sync_rate_limit; + } + void net_plugin_impl::plugin_initialize( const variables_map& options ) { try { fc_ilog( logger, "Initialize net plugin" ); @@ -4138,7 +4268,6 @@ namespace eosio { set_producer_accounts(producer_plug->producer_accounts()); thread_pool.start( thread_pool_size, []( const fc::exception& e ) { - fc_elog( logger, "Exception in net plugin thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); app().quit(); } ); @@ -4206,10 +4335,16 @@ namespace eosio { std::string extra_listening_log_info = ", max clients is " + std::to_string(my->connections.get_max_client_count()); - + + auto [listen_addr, block_sync_rate_limit] = my->parse_listen_address(address); + fc::create_listener( - my->thread_pool.get_executor(), logger, accept_timeout, address, extra_listening_log_info, - [my = my, addr = p2p_addr](tcp::socket&& socket) { my->create_session(std::move(socket), addr); }); + my->thread_pool.get_executor(), logger, accept_timeout, listen_addr, extra_listening_log_info, + [my = my, addr = p2p_addr, block_sync_rate_limit = block_sync_rate_limit](tcp::socket&& socket) { fc_dlog( logger, "start listening on ${addr} with peer sync throttle ${limit}", ("addr", addr)("limit", block_sync_rate_limit)); my->create_session(std::move(socket), addr, block_sync_rate_limit); }); + } catch (const plugin_config_exception& e) { + fc_elog( logger, "${msg}", ("msg", e.top_message())); + app().quit(); + return; } catch (const std::exception& e) { fc_elog( logger, "net_plugin::plugin_startup failed to listen on ${addr}, ${what}", ("addr", address)("what", e.what()) ); @@ -4222,7 +4357,7 @@ namespace eosio { my->ticker(); my->start_monitors(); my->update_chain_info(); - my->connections.connect_supplied_peers(*my->p2p_addresses.begin()); // attribute every outbound connection to the first listen port + my->connections.connect_supplied_peers(my->get_first_p2p_address()); // attribute every outbound connection to the first listen port when one exists }); } @@ -4259,7 +4394,7 @@ namespace eosio { /// RPC API string net_plugin::connect( const string& host ) { - return my->connections.connect( host, *my->p2p_addresses.begin() ); + return my->connections.connect( host, my->get_first_p2p_address() ); } /// RPC API @@ -4334,35 +4469,99 @@ namespace eosio { } void connections_manager::connect_supplied_peers(const string& p2p_address) { - std::lock_guard g(connections_mtx); - for (const auto& peer : supplied_peers) { - connect_i(peer, p2p_address); + std::unique_lock g(connections_mtx); + chain::flat_set peers = supplied_peers; + g.unlock(); + for (const auto& peer : peers) { + resolve_and_connect(peer, p2p_address); } } void connections_manager::add( connection_ptr c ) { std::lock_guard g( connections_mtx ); - add_i( std::move(c) ); + boost::system::error_code ec; + auto endpoint = c->socket->remote_endpoint(ec); + connections.insert( connection_detail{ + .host = c->peer_address(), + .c = std::move(c), + .active_ip = endpoint} ); } // called by API string connections_manager::connect( const string& host, const string& p2p_address ) { + std::unique_lock g( connections_mtx ); + supplied_peers.insert(host); + g.unlock(); + return resolve_and_connect( host, p2p_address ); + } + + string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) { + string::size_type colon = peer_address.find(':'); + if (colon == std::string::npos || colon == 0) { + fc_elog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_address) ); + return "invalid peer address"; + } + std::lock_guard g( connections_mtx ); - if( find_connection_i( host ) ) + if( find_connection_i( peer_address ) ) return "already connected"; - connect_i( host, p2p_address ); - supplied_peers.insert(host); + auto [host, port, type] = split_host_port_type(peer_address); + + auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); + + resolver->async_resolve(host, port, + [resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) { + connection_ptr c = std::make_shared( peer_address, listen_address ); + c->set_heartbeat_timeout( heartbeat_timeout ); + std::lock_guard g( connections_mtx ); + auto [it, inserted] = connections.emplace( connection_detail{ + .host = peer_address, + .c = std::move(c), + .ips = results + }); + if( !err ) { + it->c->connect( results ); + } else { + fc_elog( logger, "Unable to resolve ${host}:${port} ${error}", + ("host", host)("port", port)( "error", err.message() ) ); + it->c->set_state(connection::connection_state::closed); + ++(it->c->consecutive_immediate_connection_close); + } + } ); + return "added connection"; } + void connections_manager::update_connection_endpoint(connection_ptr c, + const tcp::endpoint& endpoint) { + std::unique_lock g( connections_mtx ); + auto& index = connections.get(); + const auto& it = index.find(c); + if( it != index.end() ) { + index.modify(it, [endpoint](connection_detail& cd) { + cd.active_ip = endpoint; + }); + } + } + + void connections_manager::connect(const connection_ptr& c) { + std::lock_guard g( connections_mtx ); + const auto& index = connections.get(); + const auto& it = index.find(c); + if( it != index.end() ) { + it->c->connect( it->ips ); + } + } + // called by API string connections_manager::disconnect( const string& host ) { std::lock_guard g( connections_mtx ); - if( auto c = find_connection_i( host ) ) { - fc_ilog( logger, "disconnecting: ${cid}", ("cid", c->connection_id) ); - c->close(); - connections.erase(c); + auto& index = connections.get(); + if( auto i = index.find( host ); i != index.end() ) { + fc_ilog( logger, "disconnecting: ${cid}", ("cid", i->c->connection_id) ); + i->c->close(); + connections.erase(i); supplied_peers.erase(host); return "connection removed"; } @@ -4370,11 +4569,12 @@ namespace eosio { } void connections_manager::close_all() { - fc_ilog( logger, "close all ${s} connections", ("s", connections.size()) ); std::lock_guard g( connections_mtx ); - for( auto& con : connections ) { - fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) ); - con->close( false, true ); + auto& index = connections.get(); + fc_ilog( logger, "close all ${s} connections", ("s", index.size()) ); + for( const connection_detail& cd : index ) { + fc_dlog( logger, "close: ${cid}", ("cid", cd.c->connection_id) ); + cd.c->close( false, true ); } connections.clear(); } @@ -4391,143 +4591,177 @@ namespace eosio { vector connections_manager::connection_statuses()const { vector result; std::shared_lock g( connections_mtx ); - result.reserve( connections.size() ); - for( const auto& c : connections ) { - result.push_back( c->get_status() ); + auto& index = connections.get(); + result.reserve( index.size() ); + for( const connection_detail& cd : index ) { + result.emplace_back( cd.c->get_status() ); } return result; } // call with connections_mtx connection_ptr connections_manager::find_connection_i( const string& host )const { - for( const auto& c : connections ) { - if (c->peer_address() == host) - return c; - } + auto& index = connections.get(); + auto iter = index.find(host); + if(iter != index.end()) + return iter->c; return {}; } - // call with connections_mtx - void connections_manager::connect_i( const string& host, const string& p2p_address ) { - connection_ptr c = std::make_shared( host, p2p_address ); - fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); - if( c->resolve_and_connect() ) { - fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) ); - add_i( std::move(c) ); - } - } - - // call with connections_mtx - void connections_manager::add_i(connection_ptr&& c) { - c->set_heartbeat_timeout( heartbeat_timeout ); - connections.insert( std::move(c) ); - } - // called from any thread - void connections_manager::start_conn_timer() { - start_conn_timer(connector_period, {}); // this locks mutex + void connections_manager::start_conn_timers() { + start_conn_timer(connector_period, {}, timer_type::check); // this locks mutex + start_conn_timer(connector_period, {}, timer_type::stats); // this locks mutex + if (update_p2p_connection_metrics) { + start_conn_timer(connector_period + connector_period / 2, {}, timer_type::stats); // this locks mutex + } } // called from any thread - void connections_manager::start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection) { - fc::lock_guard g( connector_check_timer_mtx ); - if (!connector_check_timer) { - connector_check_timer = std::make_unique( my_impl->thread_pool.get_executor() ); - } - connector_check_timer->expires_from_now( du ); - connector_check_timer->async_wait( [this, from_connection{std::move(from_connection)}](boost::system::error_code ec) mutable { + void connections_manager::start_conn_timer(boost::asio::steady_timer::duration du, + std::weak_ptr from_connection, + timer_type which) { + auto& mtx = which == timer_type::check ? connector_check_timer_mtx : connection_stats_timer_mtx; + auto& timer = which == timer_type::check ? connector_check_timer : connection_stats_timer; + const auto& func = which == timer_type::check ? &connections_manager::connection_monitor : &connections_manager::connection_statistics_monitor; + fc::lock_guard g( mtx ); + if (!timer) { + timer = std::make_unique( my_impl->thread_pool.get_executor() ); + } + timer->expires_from_now( du ); + timer->async_wait( [this, from_connection{std::move(from_connection)}, f = func](boost::system::error_code ec) mutable { if( !ec ) { - connection_monitor(from_connection); + (this->*f)(from_connection); } }); } - void connections_manager::stop_conn_timer() { - fc::lock_guard g( connector_check_timer_mtx ); - if (connector_check_timer) { - connector_check_timer->cancel(); + void connections_manager::stop_conn_timers() { + { + fc::lock_guard g( connector_check_timer_mtx ); + if (connector_check_timer) { + connector_check_timer->cancel(); + } + } + { + fc::lock_guard g( connection_stats_timer_mtx ); + if (connection_stats_timer) { + connection_stats_timer->cancel(); + } } } // called from any thread void connections_manager::connection_monitor(const std::weak_ptr& from_connection) { + size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0; + auto cleanup = [&num_peers, &num_rm, this](vector&& reconnecting, + vector&& removing) { + for( auto& c : reconnecting ) { + if (!c->reconnect()) { + --num_peers; + ++num_rm; + removing.push_back(c); + } + } + std::scoped_lock g( connections_mtx ); + auto& index = connections.get(); + for( auto& c : removing ) { + index.erase(c); + } + }; auto max_time = fc::time_point::now().safe_add(max_cleanup_time); + std::vector reconnecting, removing; auto from = from_connection.lock(); std::unique_lock g( connections_mtx ); - auto it = (from ? connections.find(from) : connections.begin()); - if (it == connections.end()) it = connections.begin(); - size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0; - net_plugin::p2p_per_connection_metrics per_connection(connections.size()); - while (it != connections.end()) { + auto& index = connections.get(); + auto it = (from ? index.find(from) : index.begin()); + if (it == index.end()) it = index.begin(); + while (it != index.end()) { if (fc::time_point::now() >= max_time) { - connection_wptr wit = *it; + connection_wptr wit = (*it).c; g.unlock(); + cleanup(std::move(reconnecting), std::move(removing)); fc_dlog( logger, "Exiting connection monitor early, ran out of time: ${t}", ("t", max_time - fc::time_point::now()) ); fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}", ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size()) ); - start_conn_timer( std::chrono::milliseconds( 1 ), wit ); // avoid exhausting + start_conn_timer( std::chrono::milliseconds( 1 ), wit, timer_type::check ); // avoid exhausting return; } - if ((*it)->is_bp_connection) { + const connection_ptr& c = it->c; + if (c->is_bp_connection) { ++num_bp_peers; - } else if ((*it)->incoming()) { + } else if (c->incoming()) { ++num_clients; } else { ++num_peers; } - if (update_p2p_connection_metrics) { - fc::unique_lock g_conn((*it)->conn_mtx); - boost::asio::ip::address_v6::bytes_type addr = (*it)->remote_endpoint_ip_array; - g_conn.unlock(); - net_plugin::p2p_per_connection_metrics::connection_metric metrics{ - .connection_id = (*it)->connection_id - , .address = addr - , .port = (*it)->get_remote_endpoint_port() - , .accepting_blocks = (*it)->is_blocks_connection() - , .last_received_block = (*it)->get_last_received_block_num() - , .first_available_block = (*it)->get_peer_start_block_num() - , .last_available_block = (*it)->get_peer_head_block_num() - , .unique_first_block_count = (*it)->get_unique_blocks_rcvd_count() - , .latency = (*it)->get_peer_ping_time_ns() - , .bytes_received = (*it)->get_bytes_received() - , .last_bytes_received = (*it)->get_last_bytes_received() - , .bytes_sent = (*it)->get_bytes_sent() - , .last_bytes_sent = (*it)->get_last_bytes_sent() - , .connection_start_time = (*it)->connection_start_time - , .log_p2p_address = (*it)->log_p2p_address - }; - per_connection.peers.push_back(metrics); - } - if (!(*it)->socket_is_open() && (*it)->state() != connection::connection_state::connecting) { - if (!(*it)->incoming()) { - if (!(*it)->resolve_and_connect()) { - it = connections.erase(it); - --num_peers; - ++num_rm; - continue; - } + if (!c->socket_is_open() && c->state() != connection::connection_state::connecting) { + if (!c->incoming()) { + reconnecting.push_back(c); } else { --num_clients; ++num_rm; - it = connections.erase(it); - continue; + removing.push_back(c); } } ++it; } g.unlock(); - - if (update_p2p_connection_metrics) { - update_p2p_connection_metrics({num_peers, num_clients, std::move(per_connection)}); - } + cleanup(std::move(reconnecting), std::move(removing)); if( num_clients > 0 || num_peers > 0 ) { fc_ilog(logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}, block producer peers: ${num_bp_peers}", ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size())("num_bp_peers", num_bp_peers)); } fc_dlog( logger, "connection monitor, removed ${n} connections", ("n", num_rm) ); - start_conn_timer( connector_period, {}); + start_conn_timer( connector_period, {}, timer_type::check ); } -} // namespace eosio + // called from any thread + void connections_manager::connection_statistics_monitor(const std::weak_ptr& from_connection) { + assert(update_p2p_connection_metrics); + auto from = from_connection.lock(); + std::shared_lock g(connections_mtx); + auto& index = connections.get(); + size_t num_clients = 0, num_peers = 0, num_bp_peers = 0; + net_plugin::p2p_per_connection_metrics per_connection(index.size()); + for (auto it = index.begin(); it != index.end(); ++it) { + const connection_ptr& c = it->c; + if(c->is_bp_connection) { + ++num_bp_peers; + } else if(c->incoming()) { + ++num_clients; + } else { + ++num_peers; + } + fc::unique_lock g_conn(c->conn_mtx); + boost::asio::ip::address_v6::bytes_type addr = c->remote_endpoint_ip_array; + g_conn.unlock(); + per_connection.peers.emplace_back( + net_plugin::p2p_per_connection_metrics::connection_metric{ + .connection_id = c->connection_id + , .address = addr + , .port = c->get_remote_endpoint_port() + , .accepting_blocks = c->is_blocks_connection() + , .last_received_block = c->get_last_received_block_num() + , .first_available_block = c->get_peer_start_block_num() + , .last_available_block = c->get_peer_head_block_num() + , .unique_first_block_count = c->get_unique_blocks_rcvd_count() + , .latency = c->get_peer_ping_time_ns() + , .bytes_received = c->get_bytes_received() + , .last_bytes_received = c->get_last_bytes_received() + , .bytes_sent = c->get_bytes_sent() + , .last_bytes_sent = c->get_last_bytes_sent() + , .block_sync_bytes_received = c->get_block_sync_bytes_received() + , .block_sync_bytes_sent = c->get_block_sync_bytes_sent() + , .block_sync_throttling = c->get_block_sync_throttling() + , .connection_start_time = c->connection_start_time + , .log_p2p_address = c->log_p2p_address + }); + } + g.unlock(); + update_p2p_connection_metrics({num_peers+num_bp_peers, num_clients, std::move(per_connection)}); + start_conn_timer( connector_period, {}, timer_type::stats ); + } +} // namespace eosio \ No newline at end of file diff --git a/plugins/net_plugin/tests/CMakeLists.txt b/plugins/net_plugin/tests/CMakeLists.txt index bcabe6428f..210a748e07 100644 --- a/plugins/net_plugin/tests/CMakeLists.txt +++ b/plugins/net_plugin/tests/CMakeLists.txt @@ -5,4 +5,12 @@ target_link_libraries(auto_bp_peering_unittest eosio_chain) target_include_directories(auto_bp_peering_unittest PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" ) -add_test(auto_bp_peering_unittest auto_bp_peering_unittest) \ No newline at end of file +add_test(auto_bp_peering_unittest auto_bp_peering_unittest) + +add_executable(rate_limit_parse_unittest rate_limit_parse_unittest.cpp) + +target_link_libraries(rate_limit_parse_unittest net_plugin) + +target_include_directories(rate_limit_parse_unittest PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") + +add_test(rate_limit_parse_unittest rate_limit_parse_unittest) diff --git a/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp b/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp index ddfeba7b1c..57c7a8f6a1 100644 --- a/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp +++ b/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp @@ -6,6 +6,11 @@ struct mock_connection { bool is_bp_connection = false; bool is_open = false; bool handshake_received = false; + mock_connection(bool bp_connection, bool open, bool received) + : is_bp_connection(bp_connection) + , is_open(open) + , handshake_received(received) + {} bool socket_is_open() const { return is_open; } bool incoming_and_handshake_received() const { return handshake_received; } @@ -16,9 +21,9 @@ using namespace std::literals::string_literals; struct mock_connections_manager { uint32_t max_client_count = 0; - std::vector connections; + std::vector> connections; - std::function connect; + std::function resolve_and_connect; std::function disconnect; uint32_t get_max_client_count() const { return max_client_count; } @@ -26,7 +31,7 @@ struct mock_connections_manager { template void for_each_connection(Function&& func) const { for (auto c : connections) { - if (!func(&c)) + if (!func(c)) return; } } @@ -37,6 +42,7 @@ struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager p2p_addresses{"0.0.0.0:9876"}; + const std::string& get_first_p2p_address() const { return *p2p_addresses.begin(); } bool in_sync() { return is_in_sync; } @@ -166,7 +172,7 @@ BOOST_AUTO_TEST_CASE(test_on_pending_schedule) { std::vector connected_hosts; - plugin.connections.connect = [&connected_hosts](std::string host, std::string p2p_address) { connected_hosts.push_back(host); }; + plugin.connections.resolve_and_connect = [&connected_hosts](std::string host, std::string p2p_address) { connected_hosts.push_back(host); }; // make sure nothing happens when it is not in_sync plugin.is_in_sync = false; @@ -210,7 +216,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule1) { plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n }; - plugin.connections.connect = [](std::string host, std::string p2p_address) {}; + plugin.connections.resolve_and_connect = [](std::string host, std::string p2p_address) {}; std::vector disconnected_hosts; plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; @@ -246,7 +252,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule2) { plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n }; - plugin.connections.connect = [](std::string host, std::string p2p_address) {}; + plugin.connections.resolve_and_connect = [](std::string host, std::string p2p_address) {}; std::vector disconnected_hosts; plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; @@ -272,24 +278,24 @@ BOOST_AUTO_TEST_CASE(test_exceeding_connection_limit) { plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; plugin.connections.max_client_count = 1; plugin.connections.connections = { - { .is_bp_connection = true, .is_open = true, .handshake_received = true }, // 0 - { .is_bp_connection = true, .is_open = true, .handshake_received = false }, // 1 - { .is_bp_connection = true, .is_open = false, .handshake_received = true }, // 2 - { .is_bp_connection = true, .is_open = false, .handshake_received = false }, // 3 - { .is_bp_connection = false, .is_open = true, .handshake_received = true }, // 4 - { .is_bp_connection = false, .is_open = true, .handshake_received = false }, // 5 - { .is_bp_connection = false, .is_open = true, .handshake_received = true }, // 6 - { .is_bp_connection = false, .is_open = false, .handshake_received = false } // 7 + std::make_shared( true, true, true ), // 0 + std::make_shared( true, true, false ), // 1 + std::make_shared( true, false, true ), // 2 + std::make_shared( true, false, false ), // 3 + std::make_shared( false, true, true ), // 4 + std::make_shared( false, true, false ), // 5 + std::make_shared( false, true, true ), // 6 + std::make_shared( false, false, false ) // 7 }; BOOST_CHECK_EQUAL(plugin.num_established_clients(), 2u); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[0])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[1])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[2])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[3])); - BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections.connections[4])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[5])); - BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections.connections[6])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[7])); + BOOST_CHECK(!plugin.exceeding_connection_limit(plugin.connections.connections[0])); + BOOST_CHECK(!plugin.exceeding_connection_limit(plugin.connections.connections[1])); + BOOST_CHECK(!plugin.exceeding_connection_limit(plugin.connections.connections[2])); + BOOST_CHECK(!plugin.exceeding_connection_limit(plugin.connections.connections[3])); + BOOST_CHECK(plugin.exceeding_connection_limit(plugin.connections.connections[4])); + BOOST_CHECK(!plugin.exceeding_connection_limit(plugin.connections.connections[5])); + BOOST_CHECK(plugin.exceeding_connection_limit(plugin.connections.connections[6])); + BOOST_CHECK(!plugin.exceeding_connection_limit(plugin.connections.connections[7])); } diff --git a/plugins/net_plugin/tests/rate_limit_parse_unittest.cpp b/plugins/net_plugin/tests/rate_limit_parse_unittest.cpp new file mode 100644 index 0000000000..01c84e8a67 --- /dev/null +++ b/plugins/net_plugin/tests/rate_limit_parse_unittest.cpp @@ -0,0 +1,54 @@ +#define BOOST_TEST_MODULE rate_limit_parsing +#include +#include "../net_plugin.cpp" + +BOOST_AUTO_TEST_CASE(test_parse_rate_limit) { + eosio::net_plugin_impl plugin_impl; + std::vector p2p_addresses = { + "0.0.0.0:9876" + , "0.0.0.0:9776:0" + , "0.0.0.0:9877:640KB/s" + , "192.168.0.1:9878:20MiB/s" + , "localhost:9879:0.5KB/s" + , "[2001:db8:85a3:8d3:1319:8a2e:370:7348]:9876:250KB/s" + , "[::1]:9876:250KB/s" + , "2001:db8:85a3:8d3:1319:8a2e:370:7348:9876:250KB/s" + , "[::1]:9876:-250KB/s" + , "0.0.0.0:9877:640Kb/s" + , "0.0.0.0:9877:999999999999999999999999999TiB/s" + }; + size_t which = 0; + auto [listen_addr, block_sync_rate_limit] = plugin_impl.parse_listen_address(p2p_addresses[which++]); + BOOST_CHECK_EQUAL(listen_addr, "0.0.0.0:9876"); + BOOST_CHECK_EQUAL(block_sync_rate_limit, 0); + std::tie(listen_addr, block_sync_rate_limit) = plugin_impl.parse_listen_address(p2p_addresses[which++]); + BOOST_CHECK_EQUAL(listen_addr, "0.0.0.0:9776"); + BOOST_CHECK_EQUAL(block_sync_rate_limit, 0); + std::tie(listen_addr, block_sync_rate_limit) = plugin_impl.parse_listen_address(p2p_addresses[which++]); + BOOST_CHECK_EQUAL(listen_addr, "0.0.0.0:9877"); + BOOST_CHECK_EQUAL(block_sync_rate_limit, 640000); + std::tie(listen_addr, block_sync_rate_limit) = plugin_impl.parse_listen_address(p2p_addresses[which++]); + BOOST_CHECK_EQUAL(listen_addr, "192.168.0.1:9878"); + BOOST_CHECK_EQUAL(block_sync_rate_limit, 20971520); + std::tie(listen_addr, block_sync_rate_limit) = plugin_impl.parse_listen_address(p2p_addresses[which++]); + BOOST_CHECK_EQUAL(listen_addr, "localhost:9879"); + BOOST_CHECK_EQUAL(block_sync_rate_limit, 500); + std::tie(listen_addr, block_sync_rate_limit) = plugin_impl.parse_listen_address(p2p_addresses[which++]); + BOOST_CHECK_EQUAL(listen_addr, "[2001:db8:85a3:8d3:1319:8a2e:370:7348]:9876"); + BOOST_CHECK_EQUAL(block_sync_rate_limit, 250000); + std::tie(listen_addr, block_sync_rate_limit) = plugin_impl.parse_listen_address(p2p_addresses[which++]); + BOOST_CHECK_EQUAL(listen_addr, "[::1]:9876"); + BOOST_CHECK_EQUAL(block_sync_rate_limit, 250000); + BOOST_CHECK_EXCEPTION(plugin_impl.parse_listen_address(p2p_addresses[which++]), eosio::chain::plugin_config_exception, + [](const eosio::chain::plugin_config_exception& e) + {return std::strstr(e.top_message().c_str(), "IPv6 addresses must be enclosed in square brackets");}); + BOOST_CHECK_EXCEPTION(plugin_impl.parse_listen_address(p2p_addresses[which++]), eosio::chain::plugin_config_exception, + [](const eosio::chain::plugin_config_exception& e) + {return std::strstr(e.top_message().c_str(), "block sync rate limit must not be negative");}); + BOOST_CHECK_EXCEPTION(plugin_impl.parse_listen_address(p2p_addresses[which++]), eosio::chain::plugin_config_exception, + [](const eosio::chain::plugin_config_exception& e) + {return std::strstr(e.top_message().c_str(), "invalid block sync rate limit specification");}); + BOOST_CHECK_EXCEPTION(plugin_impl.parse_listen_address(p2p_addresses[which++]), eosio::chain::plugin_config_exception, + [](const eosio::chain::plugin_config_exception& e) + {return std::strstr(e.top_message().c_str(), "block sync rate limit specification overflowed");}); +} diff --git a/plugins/prometheus_plugin/metrics.hpp b/plugins/prometheus_plugin/metrics.hpp index b9de6b6435..9c0fb3ac88 100644 --- a/plugins/prometheus_plugin/metrics.hpp +++ b/plugins/prometheus_plugin/metrics.hpp @@ -187,6 +187,9 @@ struct catalog_type { add_and_set_gauge("last_bytes_received", peer.last_bytes_received.count()); add_and_set_gauge("bytes_sent", peer.bytes_sent); add_and_set_gauge("last_bytes_sent", peer.last_bytes_sent.count()); + add_and_set_gauge("block_sync_bytes_received", peer.block_sync_bytes_received); + add_and_set_gauge("block_sync_bytes_sent", peer.block_sync_bytes_sent); + add_and_set_gauge("block_sync_throttling", peer.block_sync_throttling); add_and_set_gauge("connection_start_time", peer.connection_start_time.count()); add_and_set_gauge(peer.log_p2p_address, 0); // Empty gauge; we only want the label } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 967f9ca202..fb26b0c350 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -51,6 +51,8 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_B configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_high_latency_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_high_latency_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_multiple_listen_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_multiple_listen_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_no_listen_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_no_listen_test.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_sync_throttle_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_sync_throttle_test.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_sync_throttle_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/p2p_sync_throttle_test_shape.json COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/compute_transaction_test.py ${CMAKE_CURRENT_BINARY_DIR}/compute_transaction_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/subjective_billing_test.py ${CMAKE_CURRENT_BINARY_DIR}/subjective_billing_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/get_account_test.py ${CMAKE_CURRENT_BINARY_DIR}/get_account_test.py COPYONLY) @@ -187,6 +189,8 @@ add_test(NAME p2p_multiple_listen_test COMMAND tests/p2p_multiple_listen_test.py set_property(TEST p2p_multiple_listen_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME p2p_no_listen_test COMMAND tests/p2p_no_listen_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST p2p_no_listen_test PROPERTY LABELS nonparallelizable_tests) +add_test(NAME p2p_sync_throttle_test COMMAND tests/p2p_sync_throttle_test.py -v -d 2 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_property(TEST p2p_sync_throttle_test PROPERTY LABELS nonparallelizable_tests) # needs iproute-tc or iproute2 depending on platform #add_test(NAME p2p_high_latency_test COMMAND tests/p2p_high_latency_test.py -v WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/tests/TestHarness/testUtils.py b/tests/TestHarness/testUtils.py index 8e50c79cd6..3894d00268 100644 --- a/tests/TestHarness/testUtils.py +++ b/tests/TestHarness/testUtils.py @@ -256,6 +256,9 @@ def waitForObj(lam, timeout=None, sleepTime=1, reporter=None): if reporter is not None: reporter() time.sleep(sleepTime) + else: + if timeout == 60: + raise RuntimeError('waitForObj reached 60 second timeout') finally: if needsNewLine: Utils.Print() diff --git a/tests/block_log_util_test.py b/tests/block_log_util_test.py index bd7bff144e..042e1467aa 100755 --- a/tests/block_log_util_test.py +++ b/tests/block_log_util_test.py @@ -70,7 +70,7 @@ def verifyBlockLog(expected_block_num, trimmedBlockLog): node0.kill(signal.SIGTERM) Print("Wait for node0's head block to become irreversible") - node1.waitForBlock(headBlockNum, blockType=BlockType.lib) + node1.waitForBlock(headBlockNum, blockType=BlockType.lib, timeout=90) infoAfter=node1.getInfo(exitOnError=True) headBlockNumAfter=infoAfter["head_block_num"] diff --git a/tests/p2p_multiple_listen_test.py b/tests/p2p_multiple_listen_test.py index 62f1534c63..7f537e9d35 100755 --- a/tests/p2p_multiple_listen_test.py +++ b/tests/p2p_multiple_listen_test.py @@ -75,7 +75,7 @@ assert conn['last_handshake']['p2p_address'].split()[0] == 'localhost:9878', f"Connected node is listening on '{conn['last_handshake']['p2p_address'].split()[0]}' instead of port 9878" elif conn['last_handshake']['agent'] == 'node-04': assert conn['last_handshake']['p2p_address'].split()[0] == 'localhost:9880', f"Connected node is listening on '{conn['last_handshake']['p2p_address'].split()[0]}' instead of port 9880" - assert open_socket_count == 2, 'Node 0 is expected to have only two open sockets' + assert open_socket_count == 2, 'Node 0 is expected to have exactly two open sockets' connections = cluster.nodes[2].processUrllibRequest('net', 'connections') open_socket_count = 0 @@ -84,7 +84,7 @@ open_socket_count += 1 assert conn['last_handshake']['agent'] == 'node-00', f"Connected node identifed as '{conn['last_handshake']['agent']}' instead of node-00" assert conn['last_handshake']['p2p_address'].split()[0] == 'ext-ip0:20000', f"Connected node is advertising '{conn['last_handshake']['p2p_address'].split()[0]}' instead of ext-ip0:20000" - assert open_socket_count == 1, 'Node 2 is expected to have only one open socket' + assert open_socket_count == 1, 'Node 2 is expected to have exactly one open socket' connections = cluster.nodes[4].processUrllibRequest('net', 'connections') open_socket_count = 0 @@ -93,7 +93,7 @@ open_socket_count += 1 assert conn['last_handshake']['agent'] == 'node-00', f"Connected node identifed as '{conn['last_handshake']['agent']}' instead of node-00" assert conn['last_handshake']['p2p_address'].split()[0] == 'ext-ip1:20001', f"Connected node is advertising '{conn['last_handshake']['p2p_address'].split()[0]} 'instead of ext-ip1:20001" - assert open_socket_count == 1, 'Node 4 is expected to have only one open socket' + assert open_socket_count == 1, 'Node 4 is expected to have exactly one open socket' testSuccessful=True finally: diff --git a/tests/p2p_sync_throttle_test.py b/tests/p2p_sync_throttle_test.py new file mode 100755 index 0000000000..4b15b8f49c --- /dev/null +++ b/tests/p2p_sync_throttle_test.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 + +import math +import re +import signal +import sys +import time +import urllib + +from TestHarness import Cluster, TestHelper, Utils, WalletMgr, CORE_SYMBOL, createAccountKeys, ReturnType +from TestHarness.TestHelper import AppArgs + +############################################################### +# p2p_sync_throttle_test +# +# Test throttling of a peer during block syncing. +# +############################################################### + +Print=Utils.Print +errorExit=Utils.errorExit + +appArgs = AppArgs() +appArgs.add(flag='--plugin',action='append',type=str,help='Run nodes with additional plugins') +appArgs.add(flag='--connection-cleanup-period',type=int,help='Interval in whole seconds to run the connection reaper and metric collection') + +args=TestHelper.parse_args({"-p","-d","--keep-logs","--prod-count" + ,"--dump-error-details","-v","--leave-running" + ,"--unshared"}, + applicationSpecificArgs=appArgs) +pnodes=args.p +delay=args.d +debug=args.v +prod_count = args.prod_count +total_nodes=4 +dumpErrorDetails=args.dump_error_details + +Utils.Debug=debug +testSuccessful=False + +cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs) +walletMgr=WalletMgr(True) + +def extractPrometheusMetric(connID: str, metric: str, text: str): + searchStr = f'nodeos_p2p_connections{{connid_{connID}="{metric}"}} ' + begin = text.find(searchStr) + len(searchStr) + return int(text[begin:text.find('\n', begin)]) + +prometheusHostPortPattern = re.compile(r'^nodeos_p2p_connections.connid_([0-9])="localhost:([0-9]*)', re.MULTILINE) + +try: + TestHelper.printSystemInfo("BEGIN") + + cluster.setWalletMgr(walletMgr) + + Print(f'producing nodes: {pnodes}, delay between nodes launch: {delay} second{"s" if delay != 1 else ""}') + + Print("Stand up cluster") + extraNodeosArgs = '--plugin eosio::prometheus_plugin --connection-cleanup-period 3' + # Custom topology is a line of singlely connected nodes from highest node number in sequence to lowest, + # the reverse of the usual TestHarness line topology. + if cluster.launch(pnodes=pnodes, unstartedNodes=2, totalNodes=total_nodes, prodCount=prod_count, + topo='./tests/p2p_sync_throttle_test_shape.json', delay=delay, + extraNodeosArgs=extraNodeosArgs) is False: + errorExit("Failed to stand up eos cluster.") + + prodNode = cluster.getNode(0) + nonProdNode = cluster.getNode(1) + + accounts=createAccountKeys(2) + if accounts is None: + Utils.errorExit("FAILURE - create keys") + + accounts[0].name="tester111111" + accounts[1].name="tester222222" + + account1PrivKey = accounts[0].activePrivateKey + account2PrivKey = accounts[1].activePrivateKey + + testWalletName="test" + + Print("Creating wallet \"%s\"." % (testWalletName)) + testWallet=walletMgr.create(testWalletName, [cluster.eosioAccount,accounts[0],accounts[1]]) + + # create accounts via eosio as otherwise a bid is needed + for account in accounts: + Print("Create new account %s via %s" % (account.name, cluster.eosioAccount.name)) + trans=nonProdNode.createInitializeAccount(account, cluster.eosioAccount, stakedDeposit=0, waitForTransBlock=True, stakeNet=1000, stakeCPU=1000, buyRAM=1000, exitOnError=True) + transferAmount="100000000.0000 {0}".format(CORE_SYMBOL) + Print("Transfer funds %s from account %s to %s" % (transferAmount, cluster.eosioAccount.name, account.name)) + nonProdNode.transferFunds(cluster.eosioAccount, account, transferAmount, "test transfer", waitForTransBlock=True) + trans=nonProdNode.delegatebw(account, 20000000.0000, 20000000.0000, waitForTransBlock=True, exitOnError=True) + + beginLargeBlocksHeadBlock = nonProdNode.getHeadBlockNum() + + Print("Configure and launch txn generators") + targetTpsPerGenerator = 500 + testTrxGenDurationSec=60 + trxGeneratorCnt=1 + cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[accounts[0].name,accounts[1].name], + acctPrivKeysList=[account1PrivKey,account2PrivKey], nodeId=prodNode.nodeId, tpsPerGenerator=targetTpsPerGenerator, + numGenerators=trxGeneratorCnt, durationSec=testTrxGenDurationSec, waitToComplete=True) + + endLargeBlocksHeadBlock = nonProdNode.getHeadBlockNum() + + throttlingNode = cluster.unstartedNodes[0] + i = throttlingNode.cmd.index('--p2p-listen-endpoint') + throttleListenAddr = throttlingNode.cmd[i+1] + # Using 4000 bytes per second to allow syncing of ~250 transaction blocks resulting from + # the trx generators in a reasonable amount of time, while still being able to capture + # throttling state within the Prometheus update window (3 seconds in this test). + throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':4000B/s' + throttleListenIP, throttleListenPort = throttleListenAddr.split(':') + throttlingNode.cmd.append('--p2p-listen-endpoint') + throttlingNode.cmd.append(f'{throttleListenIP}:{int(throttleListenPort)+100}:1TB/s') + + cluster.biosNode.kill(signal.SIGTERM) + clusterStart = time.time() + cluster.launchUnstarted(2) + + errorLimit = 40 # Approximately 20 retries required + throttledNode = cluster.getNode(3) + while errorLimit > 0: + try: + response = throttlingNode.processUrllibRequest('prometheus', 'metrics', returnType=ReturnType.raw, printReturnLimit=16).decode() + except urllib.error.URLError: + # catch ConnectionRefusedEror waiting for node to finish startup and respond + errorLimit -= 1 + time.sleep(0.5) + continue + else: + if len(response) < 100: + # tolerate HTTPError as well (method returns only the exception code) + errorLimit -= 1 + continue + connPorts = prometheusHostPortPattern.findall(response) + if len(connPorts) < 3: + # wait for node to be connected + errorLimit -= 1 + time.sleep(0.5) + continue + Print('Throttling Node Start State') + throttlingNodePortMap = {port: id for id, port in connPorts} + startSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'], + 'block_sync_bytes_sent', + response) + startSyncThrottlingState = extractPrometheusMetric(throttlingNodePortMap['9879'], + 'block_sync_throttling', + response) + Print(f'Start sync throttling bytes sent: {startSyncThrottlingBytesSent}') + Print(f'Start sync throttling node throttling: {"True" if startSyncThrottlingState else "False"}') + if time.time() > clusterStart + 30: errorExit('Timed out') + break + else: + errorExit('Exceeded error retry limit waiting for throttling node') + + errorLimit = 40 # Few if any retries required but for consistency... + while errorLimit > 0: + try: + response = throttledNode.processUrllibRequest('prometheus', 'metrics', returnType=ReturnType.raw, printReturnLimit=16).decode() + except urllib.error.URLError: + # catch ConnectionRefusedError waiting for node to finish startup and respond + errorLimit -= 1 + time.sleep(0.5) + continue + else: + if len(response) < 100: + # tolerate HTTPError as well (method returns only the exception code) + errorLimit -= 1 + time.sleep(0.5) + continue + connPorts = prometheusHostPortPattern.findall(response) + if len(connPorts) < 2: + # wait for sending node to be connected + errorLimit -= 1 + continue + Print('Throttled Node Start State') + throttledNodePortMap = {port: id for id, port in connPorts} + startSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'], + 'block_sync_bytes_received', + response) + Print(f'Start sync throttled bytes received: {startSyncThrottledBytesReceived}') + break + else: + errorExit('Exceeded error retry limit waiting for throttled node') + + # Throttling node was offline during block generation and once online receives blocks as fast as possible while + # transmitting blocks to the next node in line at the above throttle setting. + assert throttlingNode.waitForBlock(endLargeBlocksHeadBlock), f'wait for block {endLargeBlocksHeadBlock} on throttled node timed out' + endThrottlingSync = time.time() + response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode() + Print('Throttling Node End State') + endSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'], + 'block_sync_bytes_sent', + response) + Print(f'End sync throttling bytes sent: {endSyncThrottlingBytesSent}') + # Throttled node is connecting to a listen port with a block sync throttle applied so it will receive + # blocks more slowly during syncing than an unthrottled node. + wasThrottled = False + while time.time() < endThrottlingSync + 30: + response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, + returnType=ReturnType.raw, printReturnLimit=16).decode() + throttledState = extractPrometheusMetric(throttlingNodePortMap['9879'], + 'block_sync_throttling', + response) + if throttledState: + wasThrottled = True + break + assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=30), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out' + endThrottledSync = time.time() + response = throttledNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode() + Print('Throttled Node End State') + endSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'], + 'block_sync_bytes_received', + response) + Print(f'End sync throttled bytes received: {endSyncThrottledBytesReceived}') + throttlingElapsed = endThrottlingSync - clusterStart + throttledElapsed = endThrottledSync - clusterStart + Print(f'Unthrottled sync time: {throttlingElapsed} seconds') + Print(f'Throttled sync time: {throttledElapsed} seconds') + assert wasThrottled, 'Throttling node never reported throttling its transmission rate' + + testSuccessful=True +finally: + TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails) + +exitCode = 0 if testSuccessful else 1 +exit(exitCode) diff --git a/tests/p2p_sync_throttle_test_shape.json b/tests/p2p_sync_throttle_test_shape.json new file mode 100644 index 0000000000..8cfb5ce9a5 --- /dev/null +++ b/tests/p2p_sync_throttle_test_shape.json @@ -0,0 +1,132 @@ +{ + "name": "testnet_", + "nodes": { + "bios": { + "index": -100, + "name": "bios", + "keys": [ + { + "pubkey": "EOS6MRyAjQq8ud7hVNYcfnVPJqcVpscN5So8BhtHuGYqET5GDW5CV", + "privkey": "5KQwrPbwdL6PhXujxW37FSSQZ1JiwsST4cqQzDeyXtP79zkvFD3" + } + ], + "peers": [ + "testnet_00" + ], + "producers": [ + "eosio" + ], + "dont_start": false, + "p2p_port": 9776, + "http_port": 8788, + "host_name": "localhost", + "public_name": "localhost", + "listen_addr": "0.0.0.0", + "_dot_label": "localhost:9776\nbios\nprod=eosio" + }, + "testnet_00": { + "index": 0, + "name": "testnet_00", + "keys": [ + { + "pubkey": "EOS7D6jfN6bbJD9cYheyhnBT4bmUWc3Qf4Yphf5GBeAAy58okcwHU", + "privkey": "5KkmnyunnpCQzgFoLMEtU3j7BRBa5aWmsBNru49ke7LdnZKFhmt" + } + ], + "peers": [], + "producers": [ + "defproducera", + "defproducerb", + "defproducerc", + "defproducerd", + "defproducere", + "defproducerf", + "defproducerg", + "defproducerh", + "defproduceri", + "defproducerj", + "defproducerk", + "defproducerl", + "defproducerm", + "defproducern", + "defproducero", + "defproducerp", + "defproducerq", + "defproducerr", + "defproducers", + "defproducert", + "defproduceru" + ], + "dont_start": false, + "p2p_port": 9876, + "http_port": 8888, + "host_name": "localhost", + "public_name": "localhost", + "listen_addr": "0.0.0.0", + "_dot_label": "localhost:9876\ntestnet_00\nprod=defproducera\ndefproducerb\ndefproducerc\ndefproducerd\ndefproducere\ndefproducerf\ndefproducerg\ndefproducerh\ndefproduceri\ndefproducerj\ndefproducerk\ndefproducerl\ndefproducerm\ndefproducern\ndefproducero\ndefproducerp\ndefproducerq\ndefproducerr\ndefproducers\ndefproducert\ndefproduceru" + }, + "testnet_01": { + "index": 1, + "name": "testnet_01", + "keys": [ + { + "pubkey": "EOS5tZqxLB8y9q2yHkgcXU4QFBEV6QKN3NQ54ygaFLWHJbjqYzFhw", + "privkey": "5KBs4qR7T8shJjCJUeFQXd77iKrok5TCtZiQhWJpCpc1VRxpNAs" + } + ], + "peers": [ + "testnet_00" + ], + "producers": [], + "dont_start": false, + "p2p_port": 9877, + "http_port": 8889, + "host_name": "localhost", + "public_name": "localhost", + "listen_addr": "0.0.0.0", + "_dot_label": "localhost:9877\ntestnet_01\nprod=" + }, + "testnet_02": { + "index": 2, + "name": "testnet_02", + "keys": [ + { + "pubkey": "EOS5FBPf5EN9bYEqmsKfPx9bxyUZ9grDiE24zqLFXtPa6UpVzMjE7", + "privkey": "5HtVDiAsD24seDm5sdswTcdZpx672XbBW9gBkyrzbsj2j9Y9JeC" + } + ], + "peers": [ + "testnet_01" + ], + "producers": [], + "dont_start": true, + "p2p_port": 9878, + "http_port": 8890, + "host_name": "localhost", + "public_name": "localhost", + "listen_addr": "0.0.0.0", + "_dot_label": "localhost:9878\ntestnet_02\nprod=" + }, + "testnet_03": { + "index": 3, + "name": "testnet_03", + "keys": [ + { + "pubkey": "EOS8XH2gKxsef9zxmMHm4vaSvxQUhg7W4GC3nK2KSRxyYrNG5gZFS", + "privkey": "5JcoRRhDcgm51dkBrRTmErceTqrYhrq22UnmUjTZToMpH91B9N1" + } + ], + "peers": [ + "testnet_02" + ], + "producers": [], + "dont_start": true, + "p2p_port": 9879, + "http_port": 8891, + "host_name": "localhost", + "public_name": "localhost", + "listen_addr": "0.0.0.0", + "_dot_label": "localhost:9879\ntestnet_03\nprod=" + } + } +} \ No newline at end of file diff --git a/tests/plugin_http_api_test.py b/tests/plugin_http_api_test.py index 0f49f458ee..c11a5cc21f 100755 --- a/tests/plugin_http_api_test.py +++ b/tests/plugin_http_api_test.py @@ -791,7 +791,12 @@ def test_NetApi(self) : ret_json = self.nodeos.processUrllibRequest(resource, command, self.empty_content_dict, endpoint=endpoint) self.assertEqual(ret_json["code"], 400) self.assertEqual(ret_json["error"]["code"], 3200006) + # connect with incomplete content parameter payload = "localhost" + ret_json = self.nodeos.processUrllibRequest(resource, command, payload, endpoint=endpoint) + self.assertEqual(ret_json["code"], 201) + self.assertEqual(ret_json["payload"], 'invalid peer address') + payload = "localhost:9877" ret_str = self.nodeos.processUrllibRequest(resource, command, payload, returnType=ReturnType.raw, endpoint=endpoint).decode('ascii') self.assertEqual("\"added connection\"", ret_str) diff --git a/tools/net-util.py b/tools/net-util.py index 7dff39a70f..ca318ee5ab 100755 --- a/tools/net-util.py +++ b/tools/net-util.py @@ -35,7 +35,7 @@ def humanReadableBytesPerSecond(bytes: int, telco:bool = False): while bytes > power: bytes /= power n += 1 - return f'{"~0" if bytes < 0.01 else format(bytes, ".2f")} {labels[n]}B/s' + return f'{"-" if bytes == 0.0 else "~0" if bytes < 0.01 else format(bytes, ".2f")} {labels[n]}B/s' class TextSimpleFocusListWalker(urwid.SimpleFocusListWalker): @@ -164,6 +164,7 @@ def __init__(self): ('\nRcv\nRate', 'receiveBandwidthLW'), ('Last\nRcv\nTime', 'lastBytesReceivedLW'), ('Last\nRcvd\nBlock', 'lastReceivedBlockLW'), + ('Blk\nSync\nRate', 'blockSyncBandwidthLW'), ('Unique\nFirst\nBlks', 'uniqueFirstBlockCountLW'), ('First\nAvail\nBlk', 'firstAvailableBlockLW'), ('Last\nAvail\nBlk', 'lastAvailableBlockLW'), @@ -297,6 +298,7 @@ class bandwidthStats(): def __init__(self, bytesReceived=0, bytesSent=0, connectionStarted=0): self.bytesReceived = 0 self.bytesSent = 0 + self.blockSyncBytesSent = 0 self.connectionStarted = 0 for family in text_string_to_metric_families(response.text): bandwidths = {} @@ -326,19 +328,20 @@ def __init__(self, bytesReceived=0, bytesSent=0, connectionStarted=0): host = f'{str(addr.ipv4_mapped) if addr.ipv4_mapped else str(addr)}' listwalker[startOffset:endOffset] = [AttrMap(Text(host), None, 'reversed')] elif fieldName == 'bytes_received': - bytesReceived = int(sample.value) stats = bandwidths.get(connID, bandwidthStats()) - stats.bytesReceived = bytesReceived + stats.bytesReceived = int(sample.value) bandwidths[connID] = stats elif fieldName == 'bytes_sent': - bytesSent = int(sample.value) stats = bandwidths.get(connID, bandwidthStats()) - stats.bytesSent = bytesSent + stats.bytesSent = int(sample.value) + bandwidths[connID] = stats + elif fieldName == 'block_sync_bytes_sent': + stats = bandwidths.get(connID, bandwidthStats()) + stats.blockSyncBytesSent = int(sample.value) bandwidths[connID] = stats elif fieldName == 'connection_start_time': - connectionStarted = int(sample.value) stats = bandwidths.get(connID, bandwidthStats()) - stats.connectionStarted = connectionStarted + stats.connectionStarted = int(sample.value) bandwidths[connID] = stats else: attrname = fieldName[:1] + fieldName.replace('_', ' ').title().replace(' ', '')[1:] + 'LW' @@ -362,17 +365,19 @@ def __init__(self, bytesReceived=0, bytesSent=0, connectionStarted=0): else: if sample.name == 'nodeos_p2p_connections': now = time.time_ns() + def updateBandwidth(connectedSeconds, listwalker, byteCount, startOffset, endOffset): + bps = byteCount/connectedSeconds + listwalker[startOffset:endOffset] = [AttrMap(Text(humanReadableBytesPerSecond(bps)), None, 'reversed')] connIDListwalker = getattr(self, 'connectionIDLW') for connID, stats in bandwidths.items(): startOffset = connIDListwalker.index(connID) endOffset = startOffset + 1 - connected_seconds = (now - stats.connectionStarted)/1000000000 - listwalker = getattr(self, 'receiveBandwidthLW') - bps = stats.bytesReceived/connected_seconds - listwalker[startOffset:endOffset] = [AttrMap(Text(humanReadableBytesPerSecond(bps)), None, 'reversed')] - listwalker = getattr(self, 'sendBandwidthLW') - bps = stats.bytesSent/connected_seconds - listwalker[startOffset:endOffset] = [AttrMap(Text(humanReadableBytesPerSecond(bps)), None, 'reversed')] + connectedSeconds = (now - stats.connectionStarted)/1000000000 + for listwalkerName, attrName in [('receiveBandwidthLW', 'bytesReceived'), + ('sendBandwidthLW', 'bytesSent'), + ('blockSyncBandwidthLW', 'blockSyncBytesSent')]: + listwalker = getattr(self, listwalkerName) + updateBandwidth(connectedSeconds, listwalker, getattr(stats, attrName), startOffset, endOffset) mainLoop.set_alarm_in(float(self.args.refresh_interval), self.update) def exitOnQ(key):