Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.0] Prometheus: Add stable identifier for P2P connections #1750

Merged
merged 5 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ namespace eosio {
size_t block_sync_bytes_sent{0};
bool block_sync_throttling{false};
std::chrono::nanoseconds connection_start_time{0};
std::string log_p2p_address;
std::string p2p_address;
std::string unique_conn_node_id;
};
explicit p2p_per_connection_metrics(size_t count) {
peers.reserve(count);
Expand Down
16 changes: 13 additions & 3 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,9 @@ namespace eosio {
block_id_type fork_head GUARDED_BY(conn_mtx);
uint32_t fork_head_num GUARDED_BY(conn_mtx) {0};
fc::time_point last_close GUARDED_BY(conn_mtx);
string remote_endpoint_ip GUARDED_BY(conn_mtx);
std::string p2p_address GUARDED_BY(conn_mtx);
std::string unique_conn_node_id GUARDED_BY(conn_mtx);
std::string remote_endpoint_ip GUARDED_BY(conn_mtx);
boost::asio::ip::address_v6::bytes_type remote_endpoint_ip_array GUARDED_BY(conn_mtx);

std::chrono::nanoseconds connection_start_time{0};
Expand Down Expand Up @@ -1254,7 +1256,8 @@ namespace eosio {
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
last_handshake_sent()
last_handshake_sent(),
p2p_address( endpoint )
{
my_impl->mark_bp_connection(this);
update_endpoints();
Expand Down Expand Up @@ -3256,6 +3259,10 @@ namespace eosio {
}

log_p2p_address = msg.p2p_address;
fc::unique_lock g_conn( conn_mtx );
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
p2p_address = msg.p2p_address;
unique_conn_node_id = msg.node_id.str().substr( 0, 7 );
g_conn.unlock();

my_impl->mark_bp_connection(this);
if (my_impl->exceeding_connection_limit(shared_from_this())) {
Expand Down Expand Up @@ -4741,6 +4748,8 @@ namespace eosio {
}
fc::unique_lock g_conn(c->conn_mtx);
boost::asio::ip::address_v6::bytes_type addr = c->remote_endpoint_ip_array;
std::string p2p_addr = c->p2p_address;
std::string conn_node_id = c->unique_conn_node_id;
g_conn.unlock();
per_connection.peers.emplace_back(
net_plugin::p2p_per_connection_metrics::connection_metric{
Expand All @@ -4761,7 +4770,8 @@ namespace eosio {
, .block_sync_bytes_sent = c->get_block_sync_bytes_sent()
, .block_sync_throttling = c->get_block_sync_throttling()
, .connection_start_time = c->connection_start_time
, .log_p2p_address = c->log_p2p_address
, .p2p_address = p2p_addr
, .unique_conn_node_id = conn_node_id
});
}
g.unlock();
Expand Down
113 changes: 76 additions & 37 deletions plugins/prometheus_plugin/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,37 @@ struct catalog_type {
// http plugin
prometheus::Family<Counter>& http_request_counts;

// net plugin p2p-connections
prometheus::Family<Gauge>& p2p_connections;

Gauge& num_peers;
Gauge& num_clients;

// net plugin failed p2p connection
Counter& failed_p2p_connections;

// net plugin dropped_trxs
Counter& dropped_trxs_total;

struct p2p_connection_metrics {
Gauge& num_peers;
Gauge& num_clients;

prometheus::Family<Gauge>& addr; // Empty gauge; ipv6 address can't be transmitted as a double
prometheus::Family<Gauge>& port;
prometheus::Family<Gauge>& connection_number;
prometheus::Family<Gauge>& accepting_blocks;
prometheus::Family<Gauge>& last_received_block;
prometheus::Family<Gauge>& first_available_block;
prometheus::Family<Gauge>& last_available_block;
prometheus::Family<Gauge>& unique_first_block_count;
prometheus::Family<Gauge>& latency;
prometheus::Family<Gauge>& bytes_received;
prometheus::Family<Gauge>& last_bytes_received;
prometheus::Family<Gauge>& bytes_sent;
prometheus::Family<Gauge>& last_bytes_sent;
prometheus::Family<Gauge>& block_sync_bytes_received;
prometheus::Family<Gauge>& block_sync_bytes_sent;
prometheus::Family<Gauge>& block_sync_throttling;
prometheus::Family<Gauge>& connection_start_time;
prometheus::Family<Gauge>& peer_addr; // Empty gauge; we only want the label
};
p2p_connection_metrics p2p_metrics;

// producer plugin
prometheus::Family<Counter>& cpu_usage_us;
prometheus::Family<Counter>& net_usage_us;
Expand Down Expand Up @@ -97,12 +116,30 @@ struct catalog_type {
catalog_type()
: info(family<prometheus::Info>("nodeos", "static information about the server"))
, http_request_counts(family<Counter>("nodeos_http_requests_total", "number of HTTP requests"))
, p2p_connections(family<Gauge>("nodeos_p2p_connections", "current number of connected p2p connections"))
, num_peers(p2p_connections.Add({{"direction", "out"}}))
, num_clients(p2p_connections.Add({{"direction", "in"}}))
, failed_p2p_connections(
build<Counter>("nodeos_failed_p2p_connections", "total number of failed out-going p2p connections"))
, dropped_trxs_total(build<Counter>("nodeos_dropped_trxs_total", "total number of dropped transactions by net plugin"))
, failed_p2p_connections(build<Counter>("nodeos_p2p_failed_connections", "total number of failed out-going p2p connections"))
, dropped_trxs_total(build<Counter>("nodeos_p2p_dropped_trxs_total", "total number of dropped transactions by net plugin"))
, p2p_metrics{
.num_peers{build<Gauge>("nodeos_p2p_peers", "current number of connected outgoing peers")}
, .num_clients{build<Gauge>("nodeos_p2p_clients", "current number of connected incoming clients")}
, .addr{family<Gauge>("nodeos_p2p_addr", "ipv6 address")}
, .port{family<Gauge>("nodeos_p2p_port", "port")}
, .connection_number{family<Gauge>("nodeos_p2p_connection_number", "monatomic increasing connection number")}
, .accepting_blocks{family<Gauge>("nodeos_p2p_accepting_blocks", "accepting blocks on connection")}
, .last_received_block{family<Gauge>("nodeos_p2p_last_received_block", "last received block on connection")}
, .first_available_block{family<Gauge>("nodeos_p2p_first_available_block", "first block available from connection")}
, .last_available_block{family<Gauge>("nodeos_p2p_last_available_block", "last block available from connection")}
, .unique_first_block_count{family<Gauge>("nodeos_p2p_unique_first_block_count", "number of blocks first received from any connection on this connection")}
, .latency{family<Gauge>("nodeos_p2p_latency", "last calculated latency with connection")}
, .bytes_received{family<Gauge>("nodeos_p2p_bytes_received", "total bytes received on connection")}
, .last_bytes_received{family<Gauge>("nodeos_p2p_last_bytes_received", "last time anything received from peer")}
, .bytes_sent{family<Gauge>("nodeos_p2p_bytes_sent", "total bytes sent to peer")}
, .last_bytes_sent{family<Gauge>("nodeos_p2p_last_bytes_sent", "last time anything sent to peer")}
, .block_sync_bytes_received{family<Gauge>("nodeos_p2p_block_sync_bytes_received", "bytes of blocks received during syncing")}
, .block_sync_bytes_sent{family<Gauge>("nodeos_p2p_block_sync_bytes_sent", "bytes of blocks sent during syncing")}
, .block_sync_throttling{family<Gauge>("nodeos_p2p_block_sync_throttling", "is block sync throttling currently active")}
, .connection_start_time{family<Gauge>("nodeos_p2p_connection_start_time", "time of last connection to peer")}
, .peer_addr{family<Gauge>("nodeos_p2p_peer_addr", "peer address")}
}
, cpu_usage_us(family<Counter>("nodeos_cpu_usage_us_total", "total cpu usage in microseconds for blocks"))
, net_usage_us(family<Counter>("nodeos_net_usage_us_total", "total net usage in microseconds for blocks"))
, last_irreversible(build<Gauge>("nodeos_last_irreversible", "last irreversible block number"))
Expand Down Expand Up @@ -164,34 +201,36 @@ struct catalog_type {
}

void update(const net_plugin::p2p_connections_metrics& metrics) {
num_peers.Set(metrics.num_peers);
num_clients.Set(metrics.num_clients);
p2p_metrics.num_peers.Set(metrics.num_peers);
p2p_metrics.num_clients.Set(metrics.num_clients);
for(size_t i = 0; i < metrics.stats.peers.size(); ++i) {
std::string label{"connid_" + to_string(metrics.stats.peers[i].connection_id)};
auto add_and_set_gauge = [&](const std::string& label_value,
const auto& value) {
auto& gauge = p2p_connections.Add({{label, label_value}});
const auto& peer = metrics.stats.peers[i];
const auto& conn_id = peer.unique_conn_node_id;

const auto addr = boost::asio::ip::make_address_v6(peer.address).to_string();
p2p_metrics.addr.Add({{"connid", conn_id},{"ipv6", addr},{"address", peer.p2p_address}});

auto add_and_set_gauge = [&](auto& fam, const auto& value) {
auto& gauge = fam.Add({{"connid", conn_id}});
gauge.Set(value);
};
auto& peer = metrics.stats.peers[i];
auto addr = std::string("addr_") + boost::asio::ip::make_address_v6(peer.address).to_string();
add_and_set_gauge(addr, 0); // Empty gauge; ipv6 address can't be transmitted as a double
add_and_set_gauge("port", peer.port);
add_and_set_gauge("accepting_blocks", peer.accepting_blocks);
add_and_set_gauge("last_received_block", peer.last_received_block);
add_and_set_gauge("first_available_block", peer.first_available_block);
add_and_set_gauge("last_available_block", peer.last_available_block);
add_and_set_gauge("unique_first_block_count", peer.unique_first_block_count);
add_and_set_gauge("latency", peer.latency);
add_and_set_gauge("bytes_received", peer.bytes_received);
add_and_set_gauge("last_bytes_received", peer.last_bytes_received.count());
add_and_set_gauge("bytes_sent", peer.bytes_sent);
add_and_set_gauge("last_bytes_sent", peer.last_bytes_sent.count());
add_and_set_gauge("block_sync_bytes_received", peer.block_sync_bytes_received);
add_and_set_gauge("block_sync_bytes_sent", peer.block_sync_bytes_sent);
add_and_set_gauge("block_sync_throttling", peer.block_sync_throttling);
add_and_set_gauge("connection_start_time", peer.connection_start_time.count());
add_and_set_gauge(peer.log_p2p_address, 0); // Empty gauge; we only want the label

add_and_set_gauge(p2p_metrics.connection_number, peer.connection_id);
add_and_set_gauge(p2p_metrics.port, peer.port);
add_and_set_gauge(p2p_metrics.accepting_blocks, peer.accepting_blocks);
add_and_set_gauge(p2p_metrics.last_received_block, peer.last_received_block);
add_and_set_gauge(p2p_metrics.first_available_block, peer.first_available_block);
add_and_set_gauge(p2p_metrics.last_available_block, peer.last_available_block);
add_and_set_gauge(p2p_metrics.unique_first_block_count, peer.unique_first_block_count);
add_and_set_gauge(p2p_metrics.latency, peer.latency);
add_and_set_gauge(p2p_metrics.bytes_received, peer.bytes_received);
add_and_set_gauge(p2p_metrics.last_bytes_received, peer.last_bytes_received.count());
add_and_set_gauge(p2p_metrics.bytes_sent, peer.bytes_sent);
add_and_set_gauge(p2p_metrics.last_bytes_sent, peer.last_bytes_sent.count());
add_and_set_gauge(p2p_metrics.block_sync_bytes_received, peer.block_sync_bytes_received);
add_and_set_gauge(p2p_metrics.block_sync_bytes_sent, peer.block_sync_bytes_sent);
add_and_set_gauge(p2p_metrics.block_sync_throttling, peer.block_sync_throttling);
add_and_set_gauge(p2p_metrics.connection_start_time, peer.connection_start_time.count());
}
}

Expand Down
3 changes: 2 additions & 1 deletion tests/nodeos_run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@

abs_path = os.path.abspath(os.getcwd() + '/unittests/contracts/eosio.token/eosio.token.abi')
traceNodeosArgs=" --http-max-response-time-ms 990000 --trace-rpc-abi eosio.token=" + abs_path
extraNodeosArgs=traceNodeosArgs + " --plugin eosio::prometheus_plugin --database-map-mode mapped_private "
specificNodeosInstances={0: "bin/nodeos"}
if cluster.launch(totalNodes=2, prodCount=prodCount, onlyBios=onlyBios, dontBootstrap=dontBootstrap, extraNodeosArgs=traceNodeosArgs, specificNodeosInstances=specificNodeosInstances) is False:
if cluster.launch(totalNodes=2, prodCount=prodCount, onlyBios=onlyBios, dontBootstrap=dontBootstrap, extraNodeosArgs=extraNodeosArgs, specificNodeosInstances=specificNodeosInstances) is False:
cmdError("launcher")
errorExit("Failed to stand up eos cluster.")
else:
Expand Down
27 changes: 17 additions & 10 deletions tests/p2p_sync_throttle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
walletMgr=WalletMgr(True)

def extractPrometheusMetric(connID: str, metric: str, text: str):
searchStr = f'nodeos_p2p_connections{{connid_{connID}="{metric}"}} '
searchStr = f'nodeos_p2p_{metric}{{connid="{connID}"}} '
begin = text.find(searchStr) + len(searchStr)
return int(text[begin:text.find('\n', begin)])

prometheusHostPortPattern = re.compile(r'^nodeos_p2p_connections.connid_([0-9])="localhost:([0-9]*)', re.MULTILINE)
prometheusHostPortPattern = re.compile(r'^nodeos_p2p_port.connid="([a-f0-9]*)". ([0-9]*)', re.MULTILINE)

try:
TestHelper.printSystemInfo("BEGIN")
Expand Down Expand Up @@ -120,6 +120,8 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):

errorLimit = 40 # Approximately 20 retries required
throttledNode = cluster.getNode(3)
throttledNodeConnId = None
throttlingNodeConnId = None
while errorLimit > 0:
try:
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', returnType=ReturnType.raw, printReturnLimit=16).decode()
Expand All @@ -134,17 +136,19 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
errorLimit -= 1
continue
connPorts = prometheusHostPortPattern.findall(response)
Print(connPorts)
if len(connPorts) < 3:
# wait for node to be connected
errorLimit -= 1
time.sleep(0.5)
continue
Print('Throttling Node Start State')
throttlingNodePortMap = {port: id for id, port in connPorts}
startSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'],
throttlingNodePortMap = {port: id for id, port in connPorts if id != '' and port != '9877'}
throttlingNodeConnId = next(iter(throttlingNodePortMap.values())) # 9879
startSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_bytes_sent',
response)
startSyncThrottlingState = extractPrometheusMetric(throttlingNodePortMap['9879'],
startSyncThrottlingState = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_throttling',
response)
Print(f'Start sync throttling bytes sent: {startSyncThrottlingBytesSent}')
Expand All @@ -170,13 +174,16 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
time.sleep(0.5)
continue
connPorts = prometheusHostPortPattern.findall(response)
Print(connPorts)
if len(connPorts) < 2:
# wait for sending node to be connected
errorLimit -= 1
continue
Print('Throttled Node Start State')
throttledNodePortMap = {port: id for id, port in connPorts}
startSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'],
throttledNodePortMap = {port: id for id, port in connPorts if id != ''}
throttledNodeConnId = next(iter(throttledNodePortMap.values())) # 9878
Print(throttledNodeConnId)
startSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodeConnId,
'block_sync_bytes_received',
response)
Print(f'Start sync throttled bytes received: {startSyncThrottledBytesReceived}')
Expand All @@ -190,7 +197,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
endThrottlingSync = time.time()
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttling Node End State')
endSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'],
endSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_bytes_sent',
response)
Print(f'End sync throttling bytes sent: {endSyncThrottlingBytesSent}')
Expand All @@ -200,7 +207,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
while time.time() < endThrottlingSync + 30:
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True,
returnType=ReturnType.raw, printReturnLimit=16).decode()
throttledState = extractPrometheusMetric(throttlingNodePortMap['9879'],
throttledState = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_throttling',
response)
if throttledState:
Expand All @@ -210,7 +217,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
endThrottledSync = time.time()
response = throttledNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttled Node End State')
endSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'],
endSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodeConnId,
'block_sync_bytes_received',
response)
Print(f'End sync throttled bytes received: {endSyncThrottledBytesReceived}')
Expand Down
Loading