diff --git a/lib/src/datapath/mt_dp_socket.c b/lib/src/datapath/mt_dp_socket.c index 4eaed78b7..32e123ac6 100644 --- a/lib/src/datapath/mt_dp_socket.c +++ b/lib/src/datapath/mt_dp_socket.c @@ -6,6 +6,7 @@ #include "mt_dp_socket.h" #include "../mt_log.h" +#include "../mt_stat.h" #include "../mt_util.h" #ifndef WINDOWSENV #include "mudp_api.h" @@ -63,6 +64,7 @@ static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m struct rte_udp_hdr* udp = &hdr->udp; mudp_init_sockaddr(&send_addr, (uint8_t*)&ipv4->dst_addr, ntohs(udp->dst_port)); + t->stat_tx_try++; /* nonblocking */ ssize_t send = sendto(fd, payload, payload_len, MSG_DONTWAIT, (const struct sockaddr*)&send_addr, sizeof(send_addr)); @@ -105,6 +107,7 @@ static uint16_t tx_socket_send_mbuf_gso(struct mt_tx_socket_thread* t, return tx; } + t->stat_tx_try++; uint16_t payload_len = m->data_len - sizeof(struct mt_udp_hdr); void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr)); dbg("%s(%d,%d), mbuf %u payload_len %u\n", __func__, port, fd, i, payload_len); @@ -186,7 +189,7 @@ static void* tx_socket_thread_loop(void* arg) { } while ((ret < 0) && (rte_atomic32_read(&t->stop_thread) == 0)); rte_pktmbuf_free(m); } - info("%s(%d,%d), stop, tx pkt %d\n", __func__, port, t->fd, t->stat_tx_pkt); + info("%s(%d,%d), stop\n", __func__, port, t->fd); return NULL; } @@ -282,6 +285,24 @@ static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) { return 0; } +static int tx_socket_stat_dump(void* priv) { + struct mt_tx_socket_entry* entry = priv; + enum mtl_port port = entry->port; + + for (int i = 0; i < entry->threads; i++) { + struct mt_tx_socket_thread* t = &entry->threads_data[i]; + int fd = t->fd; + + info("%s(%d,%d), tx pkt %d gso %d try %d on thread %d\n", __func__, port, fd, + t->stat_tx_pkt, t->stat_tx_gso, t->stat_tx_try, i); + t->stat_tx_pkt = 0; + t->stat_tx_gso = 0; + t->stat_tx_try = 0; + } + + return 0; +} + struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, enum mtl_port port, struct mt_txq_flow* flow) { @@ -330,6 +351,14 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, } } + ret = mt_stat_register(impl, tx_socket_stat_dump, entry, "tx_socket"); + if (ret < 0) { + err("%s(%d), stat register fail %d\n", __func__, port, ret); + mt_tx_socket_put(entry); + return NULL; + } + entry->stat_registered = true; + uint8_t* ip = flow->dip_addr; info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u gso_sz %u\n", __func__, port, entry->threads_data[0].fd, ip[0], ip[1], ip[2], ip[3], flow->dst_port, @@ -341,6 +370,12 @@ int mt_tx_socket_put(struct mt_tx_socket_entry* entry) { int idx = entry->threads_data[0].fd; enum mtl_port port = entry->port; + if (entry->stat_registered) { + tx_socket_stat_dump(entry); + mt_stat_unregister(entry->parent, tx_socket_stat_dump, entry); + entry->stat_registered = false; + } + /* stop threads */ for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { struct mt_tx_socket_thread* t = &entry->threads_data[i]; @@ -365,8 +400,6 @@ int mt_tx_socket_put(struct mt_tx_socket_entry* entry) { if (t->fd >= 0) { close(t->fd); t->fd = -1; - info("%s(%d,%d), tx pkt %d gso %d on thread %d\n", __func__, port, idx, - t->stat_tx_pkt, t->stat_tx_gso, i); } } @@ -485,6 +518,7 @@ static struct rte_mbuf* rx_socket_recv_mbuf(struct mt_rx_socket_thread* t) { struct sockaddr_in addr_in; socklen_t addr_in_len = sizeof(addr_in); + t->stat_rx_try++; ssize_t len = recvfrom(fd, payload, entry->pool_element_sz, MSG_DONTWAIT, &addr_in, &addr_in_len); if (len <= 0) { @@ -527,7 +561,7 @@ static void* rx_socket_thread_loop(void* arg) { if (ret >= 0) break; /* succ */ } } - info("%s(%d,%d), stop thread %d, rx pkt %d\n", __func__, port, fd, idx, t->stat_rx_pkt); + info("%s(%d,%d), stop thread %d\n", __func__, port, fd, idx); return NULL; } @@ -566,6 +600,23 @@ static int rx_socket_init_threads(struct mt_rx_socket_entry* entry) { return 0; } +static int rx_socket_stat_dump(void* priv) { + struct mt_rx_socket_entry* entry = priv; + enum mtl_port port = entry->port; + int fd = entry->fd; + + for (int i = 0; i < entry->threads; i++) { + struct mt_rx_socket_thread* t = &entry->threads_data[i]; + + info("%s(%d,%d), rx pkt %d try %d on thread %d\n", __func__, port, fd, t->stat_rx_pkt, + t->stat_rx_try, i); + t->stat_rx_pkt = 0; + t->stat_rx_try = 0; + } + + return 0; +} + struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, enum mtl_port port, struct mt_rxq_flow* flow) { @@ -645,6 +696,14 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, } } + ret = mt_stat_register(impl, rx_socket_stat_dump, entry, "rx_socket"); + if (ret < 0) { + err("%s(%d), stat register fail %d\n", __func__, port, ret); + mt_rx_socket_put(entry); + return NULL; + } + entry->stat_registered = true; + uint8_t* ip = flow->dip_addr; info("%s(%d), fd %d ip %u.%u.%u.%u port %u threads %d\n", __func__, port, fd, ip[0], ip[1], ip[2], ip[3], flow->dst_port, entry->threads); @@ -652,10 +711,16 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, } int mt_rx_socket_put(struct mt_rx_socket_entry* entry) { - int idx = entry->fd; + int fd = entry->fd; enum mtl_port port = entry->port; struct mt_rx_socket_thread* t; + if (entry->stat_registered) { + rx_socket_stat_dump(entry); + mt_stat_unregister(entry->parent, rx_socket_stat_dump, entry); + entry->stat_registered = false; + } + /* stop threads */ for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { t = &entry->threads_data[i]; @@ -669,7 +734,6 @@ int mt_rx_socket_put(struct mt_rx_socket_entry* entry) { rte_pktmbuf_free(t->mbuf); t->mbuf = NULL; } - info("%s(%d,%d), rx pkt %d on thread %d\n", __func__, port, idx, t->stat_rx_pkt, i); } if (entry->ring) { @@ -687,7 +751,7 @@ int mt_rx_socket_put(struct mt_rx_socket_entry* entry) { entry->pool = NULL; } - info("%s(%d,%d), succ\n", __func__, entry->port, idx); + info("%s(%d,%d), succ\n", __func__, port, fd); mt_rte_free(entry); return 0; } diff --git a/lib/src/mt_main.h b/lib/src/mt_main.h index 442a8127c..0978d2ef5 100644 --- a/lib/src/mt_main.h +++ b/lib/src/mt_main.h @@ -961,6 +961,7 @@ struct mt_tx_socket_thread { char msg_control[CMSG_SPACE(sizeof(uint16_t))]; #endif + int stat_tx_try; int stat_tx_pkt; int stat_tx_gso; }; @@ -975,6 +976,7 @@ struct mt_tx_socket_entry { int threads; struct rte_ring* ring; struct mt_tx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX]; + bool stat_registered; }; struct mt_rx_socket_thread { @@ -984,6 +986,7 @@ struct mt_rx_socket_thread { pthread_t tid; rte_atomic32_t stop_thread; + int stat_rx_try; int stat_rx_pkt; }; @@ -1000,6 +1003,7 @@ struct mt_rx_socket_entry { int threads; struct rte_ring* ring; struct mt_rx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX]; + bool stat_registered; }; struct mt_tx_xdp_entry {