Skip to content

Commit

Permalink
Fix build error. Lambda captures by value are const.
Browse files Browse the repository at this point in the history
Update connections_manager::add method.
Clean up cruft and rename connection data structure back to
'connections'.
  • Loading branch information
jgiszczak committed Sep 11, 2023
1 parent e998cc6 commit 92e4022
Showing 1 changed file with 30 additions and 56 deletions.
86 changes: 30 additions & 56 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,7 @@ namespace eosio {
private:
alignas(hardware_destructive_interference_size)
mutable std::shared_mutex connections_mtx;
//chain::flat_set<connection_ptr> connections GUARDED_BY(connections_mtx);
connection_details_index peer_ips GUARDED_BY(connections_mtx);
connection_details_index connections GUARDED_BY(connections_mtx);
chain::flat_set<string> supplied_peers;

alignas(hardware_destructive_interference_size)
Expand Down Expand Up @@ -973,8 +972,6 @@ namespace eosio {

bool populate_handshake( handshake_message& hello ) const;

// bool connect();
//typedef boost:multi_index::index<connections_manager,
void connect( const tcp::resolver::results_type& endpoints,
connections_manager::connection_details_index& connections,
connections_manager::connection_details_index::const_iterator conn_details );
Expand Down Expand Up @@ -1186,14 +1183,14 @@ namespace eosio {
template<typename Function>
void connections_manager::for_each_connection( Function&& f ) const {
std::shared_lock g( connections_mtx );
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
std::for_each(index.begin(), index.end(), std::forward<Function>(f));
}

template<typename Function>
void connections_manager::for_each_block_connection( Function&& f ) const {
std::shared_lock g( connections_mtx );
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
for( const connection_ptr& c : index ) {
if (c->is_blocks_connection()) {
f(c);
Expand All @@ -1204,14 +1201,14 @@ namespace eosio {
template <typename UnaryPredicate>
bool connections_manager::any_of_connections(UnaryPredicate&& p) const {
std::shared_lock g(connections_mtx);
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
return std::any_of(index.cbegin(), index.cend(), std::forward<UnaryPredicate>(p));
}

template <typename UnaryPredicate>
bool connections_manager::any_of_block_connections(UnaryPredicate&& p) const {
std::shared_lock g( connections_mtx );
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
for( const connection_ptr& c : index ) {
if( c->is_blocks_connection() ) {
if (p(c))
Expand Down Expand Up @@ -2709,9 +2706,10 @@ namespace eosio {

//------------------------------------------------------------------------

// called from any thread
#if 0
bool connection::connect() {
// called from connection strand
void connection::connect( const tcp::resolver::results_type& endpoints,
connections_manager::connection_details_index& connections,
connections_manager::connection_details_index::const_iterator conn_details ) {
switch ( no_retry ) {
case no_reason:
case wrong_version:
Expand All @@ -2720,50 +2718,21 @@ namespace eosio {
break;
default:
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
return;
}

connection_ptr c = shared_from_this();

if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
return;
}
}

strand.post([c]() {
c->set_connection_type( c->peer_address() );

connection_wptr weak_conn = c;
resolver->async_resolve(host, port, boost::asio::bind_executor( c->strand,
[resolver, weak_conn, host = host, port = port]( const boost::system::error_code& err, const tcp::resolver::results_type& endpoints ) {
auto c = weak_conn.lock();
if( !c ) return;
if( !err ) {
c->connect( endpoints );
} else {
fc_elog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
c->set_state(connection_state::closed);
++c->consecutive_immediate_connection_close;
}
} ) );
} );
return true;
}
#endif
// called from connection strand
void connection::connect( const tcp::resolver::results_type& endpoints,
connections_manager::connection_details_index& connections,
connections_manager::connection_details_index::const_iterator conn_details ) {
set_state(connection_state::connecting);
pending_message_buffer.reset();
buffer_queue.clear_out_queue();
boost::asio::async_connect( *socket, endpoints,
boost::asio::bind_executor( strand,
[c = shared_from_this(), socket=socket, connections, conn_details]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
[c = shared_from_this(), socket=socket, &connections, conn_details]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
if( !err && socket->is_open() && socket == c->socket ) {
auto& index = connections.get<by_active_ip>();
index.modify_key(connections.project<by_active_ip>(conn_details), [endpoint](tcp::endpoint& e) {
Expand Down Expand Up @@ -4421,7 +4390,7 @@ namespace eosio {

size_t connections_manager::number_connections() const {
std::lock_guard g(connections_mtx);
return peer_ips.size();
return connections.size();
}

void connections_manager::add_supplied_peers(const vector<string>& peers ) {
Expand Down Expand Up @@ -4457,7 +4426,12 @@ namespace eosio {

void connections_manager::add( connection_ptr c ) {
std::lock_guard g( connections_mtx );
add_i( std::move(c) );
boost::system::error_code ec;
auto endpoint = c->socket->remote_endpoint(ec);
connections.insert( connection_details{
.host = c->peer_address(),
.c = std::move(c),
.active_ip = endpoint} );
}

// called by API
Expand All @@ -4483,18 +4457,18 @@ namespace eosio {
c->set_heartbeat_timeout( heartbeat_timeout );
vector<tcp::endpoint> eps{results.begin(), results.end()};
std::lock_guard g( connections_mtx );
auto [it, inserted] = peer_ips.insert( connection_details{
auto [it, inserted] = connections.insert( connection_details{
.host = peer_address,
.c = std::move(c),
.ips = std::move(eps)
});
if( !err ) {
c->connect( results, peer_ips, it );
it->c->connect( results, connections, it );
} else {
fc_elog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
c->set_state(connection::connection_state::closed);
++c->consecutive_immediate_connection_close;
it->c->set_state(connection::connection_state::closed);
++(it->c->consecutive_immediate_connection_close);
}
} );

Expand All @@ -4504,26 +4478,26 @@ namespace eosio {
// called by API
string connections_manager::disconnect( const string& host ) {
std::lock_guard g( connections_mtx );
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
if( auto i = index.find( host ); i != index.end() ) {
fc_ilog( logger, "disconnecting: ${cid}", ("cid", i->c->connection_id) );
i->c->close();
peer_ips.erase(i);
connections.erase(i);
supplied_peers.erase(host);
return "connection removed";
}
return "no known connection for host";
}

void connections_manager::close_all() {
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
fc_ilog( logger, "close all ${s} connections", ("s", index.size()) );
std::lock_guard g( connections_mtx );
for( const connection_ptr& c : index ) {
fc_dlog( logger, "close: ${cid}", ("cid", c->connection_id) );
c->close( false, true );
}
peer_ips.clear();
connections.clear();
}

std::optional<connection_status> connections_manager::status( const string& host )const {
Expand All @@ -4538,7 +4512,7 @@ namespace eosio {
vector<connection_status> connections_manager::connection_statuses()const {
vector<connection_status> result;
std::shared_lock g( connections_mtx );
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
result.reserve( index.size() );
for( const connection_ptr& c : index ) {
result.push_back( c->get_status() );
Expand All @@ -4548,7 +4522,7 @@ namespace eosio {

// call with connections_mtx
connection_ptr connections_manager::find_connection_i( const string& host )const {
auto& index = peer_ips.get<by_host>();
auto& index = connections.get<by_host>();
auto iter = index.find(host);
if(iter != index.end())
return iter->c;
Expand Down Expand Up @@ -4586,7 +4560,7 @@ namespace eosio {
auto max_time = fc::time_point::now().safe_add(max_cleanup_time);
auto from = from_connection.lock();
std::unique_lock g( connections_mtx );
auto& index = peer_ips.get<by_connection>();
auto& index = connections.get<by_connection>();
auto it = (from ? index.find(from) : index.begin());
if (it == index.end()) it = index.begin();
size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0;
Expand Down

0 comments on commit 92e4022

Please sign in to comment.