Skip to content

Commit

Permalink
dp/scoket: add stat dump (#549)
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Du <[email protected]>
  • Loading branch information
frankdjx authored Oct 26, 2023
1 parent d4f38eb commit 9fd7ea6
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
78 changes: 71 additions & 7 deletions lib/src/datapath/mt_dp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "mt_dp_socket.h"

#include "../mt_log.h"
#include "../mt_stat.h"
#include "../mt_util.h"
#ifndef WINDOWSENV
#include "mudp_api.h"
Expand Down Expand Up @@ -63,6 +64,7 @@ static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m
struct rte_udp_hdr* udp = &hdr->udp;
mudp_init_sockaddr(&send_addr, (uint8_t*)&ipv4->dst_addr, ntohs(udp->dst_port));

t->stat_tx_try++;
/* nonblocking */
ssize_t send = sendto(fd, payload, payload_len, MSG_DONTWAIT,
(const struct sockaddr*)&send_addr, sizeof(send_addr));
Expand Down Expand Up @@ -105,6 +107,7 @@ static uint16_t tx_socket_send_mbuf_gso(struct mt_tx_socket_thread* t,
return tx;
}

t->stat_tx_try++;
uint16_t payload_len = m->data_len - sizeof(struct mt_udp_hdr);
void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr));
dbg("%s(%d,%d), mbuf %u payload_len %u\n", __func__, port, fd, i, payload_len);
Expand Down Expand Up @@ -186,7 +189,7 @@ static void* tx_socket_thread_loop(void* arg) {
} while ((ret < 0) && (rte_atomic32_read(&t->stop_thread) == 0));
rte_pktmbuf_free(m);
}
info("%s(%d,%d), stop, tx pkt %d\n", __func__, port, t->fd, t->stat_tx_pkt);
info("%s(%d,%d), stop\n", __func__, port, t->fd);

return NULL;
}
Expand Down Expand Up @@ -282,6 +285,24 @@ static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) {
return 0;
}

static int tx_socket_stat_dump(void* priv) {
struct mt_tx_socket_entry* entry = priv;
enum mtl_port port = entry->port;

for (int i = 0; i < entry->threads; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];
int fd = t->fd;

info("%s(%d,%d), tx pkt %d gso %d try %d on thread %d\n", __func__, port, fd,
t->stat_tx_pkt, t->stat_tx_gso, t->stat_tx_try, i);
t->stat_tx_pkt = 0;
t->stat_tx_gso = 0;
t->stat_tx_try = 0;
}

return 0;
}

struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
enum mtl_port port,
struct mt_txq_flow* flow) {
Expand Down Expand Up @@ -330,6 +351,14 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
}
}

ret = mt_stat_register(impl, tx_socket_stat_dump, entry, "tx_socket");
if (ret < 0) {
err("%s(%d), stat register fail %d\n", __func__, port, ret);
mt_tx_socket_put(entry);
return NULL;
}
entry->stat_registered = true;

uint8_t* ip = flow->dip_addr;
info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u gso_sz %u\n", __func__, port,
entry->threads_data[0].fd, ip[0], ip[1], ip[2], ip[3], flow->dst_port,
Expand All @@ -341,6 +370,12 @@ int mt_tx_socket_put(struct mt_tx_socket_entry* entry) {
int idx = entry->threads_data[0].fd;
enum mtl_port port = entry->port;

if (entry->stat_registered) {
tx_socket_stat_dump(entry);
mt_stat_unregister(entry->parent, tx_socket_stat_dump, entry);
entry->stat_registered = false;
}

/* stop threads */
for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];
Expand All @@ -365,8 +400,6 @@ int mt_tx_socket_put(struct mt_tx_socket_entry* entry) {
if (t->fd >= 0) {
close(t->fd);
t->fd = -1;
info("%s(%d,%d), tx pkt %d gso %d on thread %d\n", __func__, port, idx,
t->stat_tx_pkt, t->stat_tx_gso, i);
}
}

Expand Down Expand Up @@ -485,6 +518,7 @@ static struct rte_mbuf* rx_socket_recv_mbuf(struct mt_rx_socket_thread* t) {
struct sockaddr_in addr_in;
socklen_t addr_in_len = sizeof(addr_in);

t->stat_rx_try++;
ssize_t len =
recvfrom(fd, payload, entry->pool_element_sz, MSG_DONTWAIT, &addr_in, &addr_in_len);
if (len <= 0) {
Expand Down Expand Up @@ -527,7 +561,7 @@ static void* rx_socket_thread_loop(void* arg) {
if (ret >= 0) break; /* succ */
}
}
info("%s(%d,%d), stop thread %d, rx pkt %d\n", __func__, port, fd, idx, t->stat_rx_pkt);
info("%s(%d,%d), stop thread %d\n", __func__, port, fd, idx);

return NULL;
}
Expand Down Expand Up @@ -566,6 +600,23 @@ static int rx_socket_init_threads(struct mt_rx_socket_entry* entry) {
return 0;
}

static int rx_socket_stat_dump(void* priv) {
struct mt_rx_socket_entry* entry = priv;
enum mtl_port port = entry->port;
int fd = entry->fd;

for (int i = 0; i < entry->threads; i++) {
struct mt_rx_socket_thread* t = &entry->threads_data[i];

info("%s(%d,%d), rx pkt %d try %d on thread %d\n", __func__, port, fd, t->stat_rx_pkt,
t->stat_rx_try, i);
t->stat_rx_pkt = 0;
t->stat_rx_try = 0;
}

return 0;
}

struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl,
enum mtl_port port,
struct mt_rxq_flow* flow) {
Expand Down Expand Up @@ -645,17 +696,31 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl,
}
}

ret = mt_stat_register(impl, rx_socket_stat_dump, entry, "rx_socket");
if (ret < 0) {
err("%s(%d), stat register fail %d\n", __func__, port, ret);
mt_rx_socket_put(entry);
return NULL;
}
entry->stat_registered = true;

uint8_t* ip = flow->dip_addr;
info("%s(%d), fd %d ip %u.%u.%u.%u port %u threads %d\n", __func__, port, fd, ip[0],
ip[1], ip[2], ip[3], flow->dst_port, entry->threads);
return entry;
}

int mt_rx_socket_put(struct mt_rx_socket_entry* entry) {
int idx = entry->fd;
int fd = entry->fd;
enum mtl_port port = entry->port;
struct mt_rx_socket_thread* t;

if (entry->stat_registered) {
rx_socket_stat_dump(entry);
mt_stat_unregister(entry->parent, rx_socket_stat_dump, entry);
entry->stat_registered = false;
}

/* stop threads */
for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
t = &entry->threads_data[i];
Expand All @@ -669,7 +734,6 @@ int mt_rx_socket_put(struct mt_rx_socket_entry* entry) {
rte_pktmbuf_free(t->mbuf);
t->mbuf = NULL;
}
info("%s(%d,%d), rx pkt %d on thread %d\n", __func__, port, idx, t->stat_rx_pkt, i);
}

if (entry->ring) {
Expand All @@ -687,7 +751,7 @@ int mt_rx_socket_put(struct mt_rx_socket_entry* entry) {
entry->pool = NULL;
}

info("%s(%d,%d), succ\n", __func__, entry->port, idx);
info("%s(%d,%d), succ\n", __func__, port, fd);
mt_rte_free(entry);
return 0;
}
Expand Down
4 changes: 4 additions & 0 deletions lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ struct mt_tx_socket_thread {
char msg_control[CMSG_SPACE(sizeof(uint16_t))];
#endif

int stat_tx_try;
int stat_tx_pkt;
int stat_tx_gso;
};
Expand All @@ -975,6 +976,7 @@ struct mt_tx_socket_entry {
int threads;
struct rte_ring* ring;
struct mt_tx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX];
bool stat_registered;
};

struct mt_rx_socket_thread {
Expand All @@ -984,6 +986,7 @@ struct mt_rx_socket_thread {
pthread_t tid;
rte_atomic32_t stop_thread;

int stat_rx_try;
int stat_rx_pkt;
};

Expand All @@ -1000,6 +1003,7 @@ struct mt_rx_socket_entry {
int threads;
struct rte_ring* ring;
struct mt_rx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX];
bool stat_registered;
};

struct mt_tx_xdp_entry {
Expand Down

0 comments on commit 9fd7ea6

Please sign in to comment.