Commit 9386990: fix
Signed-off-by: Frank Du <[email protected]>
frankdjx committed Oct 17, 2023
1 parent 9b0b0e8 commit 9386990
Showing 3 changed files with 147 additions and 47 deletions.
2 changes: 2 additions & 0 deletions app/src/rxtx_app.c
@@ -72,6 +72,8 @@ static int app_dump_ptp_sync_stat(struct st_app_context* ctx) {
static void app_stat(void* priv) {
struct st_app_context* ctx = priv;

if (ctx->stop) return;

if (ctx->mtl_log_stream) {
app_dump_io_stat(ctx);
st_app_tx_videos_io_stat(ctx);
176 changes: 136 additions & 40 deletions lib/src/dev/mt_af_xdp.c
@@ -12,6 +12,7 @@
#include <xdp/xsk.h>

#include "../mt_log.h"
#include "../mt_stat.h"

#ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
#error "Please use XDP lib version with XDP_UMEM_UNALIGNED_CHUNK_FLAG"
@@ -22,21 +23,37 @@ struct mt_xdp_queue {
struct rte_mempool* mbuf_pool;
uint16_t q;
uint32_t umem_ring_size;
uint32_t tx_free_thresh;

struct xsk_umem* umem;
struct xsk_ring_prod pq;
struct rte_mbuf** pq_mbufs;
struct xsk_ring_cons cq;
void* umem_buffer;
struct rte_mbuf** pq_mbufs;

struct xsk_socket* socket;
int socket_fd;

struct xsk_ring_prod pq;
struct xsk_ring_cons socket_rx;
struct xsk_ring_prod socket_tx;

/* tx pkts completed on this consumer ring, filled by the kernel */
struct xsk_ring_cons tx_cons;
/* tx pkts to send are posted on this producer ring, filled from userspace on the TX data path */
struct xsk_ring_prod tx_prod;
uint32_t tx_free_thresh;

struct mt_tx_xdp_entry* tx_entry;
struct mt_rx_xdp_entry* rx_entry;

uint64_t stat_tx_pkts;
uint64_t stat_tx_bytes;
uint64_t stat_tx_free;
uint64_t stat_tx_submit;
uint64_t stat_tx_copy;
uint64_t stat_tx_wakeup;
uint64_t stat_tx_mbuf_alloc_fail;
uint64_t stat_tx_prod_reserve_fail;

uint64_t stat_rx_pkts;
uint64_t stat_rx_bytes;
};
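
For context on the renames above: an AF_XDP socket uses four rings. The fill ring (producer) hands RX buffers to the kernel, the completion ring (consumer) returns transmitted buffers, and the rx/tx rings carry the frames themselves. In this struct, pq is the fill ring, tx_cons the completion ring, socket_rx the RX ring and tx_prod the TX ring. A minimal, hedged setup sketch with libxdp follows; the NULL configs, "eth0" and queue id 0 are placeholder assumptions, not what this driver does.

#include <stdint.h>
#include <xdp/xsk.h>

/* Minimal sketch: how the four rings in the struct above get bound. */
static int xsk_setup_sketch(void* umem_area, uint64_t size,
                            struct xsk_umem** umem, struct xsk_socket** xsk,
                            struct xsk_ring_prod* fill, /* xq->pq */
                            struct xsk_ring_cons* comp, /* xq->tx_cons */
                            struct xsk_ring_cons* rx,   /* xq->socket_rx */
                            struct xsk_ring_prod* tx) { /* xq->tx_prod */
  /* umem create binds the fill and completion rings */
  int ret = xsk_umem__create(umem, umem_area, size, fill, comp, NULL);
  if (ret < 0) return ret;
  /* socket create binds the rx and tx rings */
  return xsk_socket__create(xsk, "eth0", 0, *umem, rx, tx, NULL);
}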

struct mt_xdp_priv {
@@ -48,10 +65,69 @@ struct mt_xdp_priv {
uint32_t combined_count;

struct mt_xdp_queue* queues_info;
pthread_mutex_t tx_queues_lock;
pthread_mutex_t rx_queues_lock;
pthread_mutex_t queues_lock;
};

static int xdp_queue_tx_stat(struct mt_xdp_queue* xq) {
enum mtl_port port = xq->port;
uint16_t q = xq->q;

notice("%s(%d,%u), pkts %" PRIu64 " bytes %" PRIu64 " submit %" PRIu64 " free %" PRIu64
" wakeup %" PRIu64 "\n",
__func__, port, q, xq->stat_tx_pkts, xq->stat_tx_bytes, xq->stat_tx_submit,
xq->stat_tx_free, xq->stat_tx_wakeup);
xq->stat_tx_pkts = 0;
xq->stat_tx_bytes = 0;
xq->stat_tx_submit = 0;
xq->stat_tx_free = 0;
xq->stat_tx_wakeup = 0;
if (xq->stat_tx_copy) {
notice("%s(%d,%u), pkts copy %" PRIu64 "\n", __func__, port, q, xq->stat_tx_copy);
xq->stat_tx_copy = 0;
}

uint32_t ring_sz = xq->umem_ring_size;
uint32_t cons_avail = xsk_cons_nb_avail(&xq->tx_cons, ring_sz);
uint32_t prod_free = xsk_prod_nb_free(&xq->tx_prod, ring_sz);
notice("%s(%d,%u), cons_avail %u prod_free %u\n", __func__, port, q, cons_avail,
prod_free);

if (xq->stat_tx_mbuf_alloc_fail) {
warn("%s(%d,%u), mbuf alloc fail %" PRIu64 "\n", __func__, port, q,
xq->stat_tx_mbuf_alloc_fail);
xq->stat_tx_mbuf_alloc_fail = 0;
}
if (xq->stat_tx_prod_reserve_fail) {
err("%s(%d,%u), prod reserve fail %" PRIu64 "\n", __func__, port, q,
xq->stat_tx_prod_reserve_fail);
xq->stat_tx_prod_reserve_fail = 0;
}
return 0;
}

static int xdp_queue_rx_stat(struct mt_xdp_queue* xq) {
enum mtl_port port = xq->port;
uint16_t q = xq->q;

notice("%s(%d,%u), pkts %" PRIu64 " bytes %" PRIu64 "\n", __func__, port, q,
xq->stat_rx_pkts, xq->stat_rx_bytes);
xq->stat_rx_pkts = 0;
xq->stat_rx_bytes = 0;
return 0;
}

static int xdp_stat_dump(void* priv) {
struct mt_xdp_priv* xdp = priv;

for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
struct mt_xdp_queue* xq = &xdp->queues_info[i];
if (xq->tx_entry) xdp_queue_tx_stat(xq);
if (xq->rx_entry) xdp_queue_rx_stat(xq);
}

return 0;
}
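
The two dump helpers above follow an accumulate-then-reset idiom: the datapath only bumps per-queue counters, and the periodic callback (hooked up later in this diff with mt_stat_register(impl, xdp_stat_dump, xdp, "xdp")) prints the delta since the last dump and zeroes it. A minimal sketch of that idiom with hypothetical names:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical reduced counter set; the real struct carries more fields. */
struct q_stats {
  uint64_t pkts;
  uint64_t bytes;
};

/* hot path: only accumulate, no locking (single writer per queue assumed) */
static inline void stat_on_tx(struct q_stats* s, uint64_t pkts, uint64_t bytes) {
  s->pkts += pkts;
  s->bytes += bytes;
}

/* periodic dump: print the delta since the last dump, then reset */
static void stat_dump(struct q_stats* s) {
  printf("pkts %" PRIu64 " bytes %" PRIu64 "\n", s->pkts, s->bytes);
  s->pkts = 0;
  s->bytes = 0;
}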

static int xdp_pq_uinit(struct mt_xdp_queue* xq) {
if (!xq->pq_mbufs) return 0;

@@ -99,8 +175,7 @@ static int xdp_free(struct mt_xdp_priv* xdp) {
xdp->queues_info = NULL;
}

mt_pthread_mutex_destroy(&xdp->tx_queues_lock);
mt_pthread_mutex_destroy(&xdp->rx_queues_lock);
mt_pthread_mutex_destroy(&xdp->queues_lock);
mt_rte_free(xdp);
return 0;
}
@@ -166,7 +241,7 @@ static int xdp_umem_init(struct mt_xdp_queue* xq) {
umem_size = (uint64_t)pool->populated_size * (uint64_t)cfg.frame_size + align;
dbg("%s(%d), base_addr %p umem_size %" PRIu64 "\n", __func__, port, base_addr,
umem_size);
ret = xsk_umem__create(&xq->umem, base_addr, umem_size, &xq->pq, &xq->cq, &cfg);
ret = xsk_umem__create(&xq->umem, base_addr, umem_size, &xq->pq, &xq->tx_cons, &cfg);
if (ret < 0) {
err("%s(%d,%u), umem create fail %d %s\n", __func__, port, q, ret, strerror(errno));
return ret;
@@ -231,11 +306,11 @@ static int xdp_socket_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
cfg.rx_size = mt_if_nb_rx_desc(impl, port);
cfg.tx_size = mt_if_nb_tx_desc(impl, port);
cfg.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST;
cfg.bind_flags = XDP_USE_NEED_WAKEUP;
// cfg.bind_flags = XDP_USE_NEED_WAKEUP;

const char* if_name = mt_kernel_if_name(impl, port);
ret = xsk_socket__create(&xq->socket, if_name, q, xq->umem, &xq->socket_rx,
&xq->socket_tx, &cfg);
&xq->tx_prod, &cfg);
if (ret < 0) {
err("%s(%d,%u), xsk create fail %d\n", __func__, port, q, ret);
return ret;
@@ -274,8 +349,8 @@ static int xdp_queue_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
return 0;
}

static void xdp_tx_pull(struct mt_xdp_queue* xq) {
struct xsk_ring_cons* cq = &xq->cq;
static void xdp_tx_poll_done(struct mt_xdp_queue* xq) {
struct xsk_ring_cons* cq = &xq->tx_cons;
uint32_t idx = 0;
uint32_t size = xq->umem_ring_size;
uint32_t n = xsk_ring_cons__peek(cq, size, &idx);
@@ -289,18 +364,27 @@ static void xdp_tx_pull(struct mt_xdp_queue* xq) {
addr);
rte_pktmbuf_free(m);
}
xq->stat_tx_free += n;

xsk_ring_cons__release(cq, n);
}
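
xdp_tx_poll_done is the standard AF_XDP completion-ring drain: peek however many completed entries the kernel has posted, free the backing mbufs, then release the entries. A self-contained sketch of the same loop; the addr-to-mbuf recovery is pool-layout specific and deliberately left abstract:

#include <stdint.h>
#include <xdp/xsk.h>

/* Sketch: drain up to ring_size completed TX addrs from the completion
 * ring. Each addr is an offset into the umem; mapping it back to the
 * owning rte_mbuf depends on how the mempool was laid into the umem,
 * so that step is left abstract here. */
static uint32_t drain_completions_sketch(struct xsk_ring_cons* cq,
                                         uint32_t ring_size) {
  uint32_t idx = 0;
  uint32_t n = xsk_ring_cons__peek(cq, ring_size, &idx);
  for (uint32_t i = 0; i < n; i++) {
    uint64_t addr = *xsk_ring_cons__comp_addr(cq, idx + i);
    (void)addr; /* recover and free the owning buffer here */
  }
  if (n) xsk_ring_cons__release(cq, n);
  return n; /* caller can add this to stat_tx_free */
}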

static void xdp_tx_kick(struct mt_xdp_queue* xq) {
struct xsk_ring_prod* socket_tx = &xq->socket_tx;
static inline void xdp_tx_check_free(struct mt_xdp_queue* xq) {
struct xsk_ring_cons* cq = &xq->tx_cons;
uint32_t cq_avail = xsk_cons_nb_avail(cq, xq->umem_ring_size);
dbg("%s(%d, %u), cq_avail %u\n", __func__, xq->port, xq->q, cq_avail);
if (cq_avail >= xq->tx_free_thresh) {
xdp_tx_poll_done(xq);
}
}

static void xdp_tx_wakeup(struct mt_xdp_queue* xq) {
enum mtl_port port = xq->port;
uint16_t q = xq->q;

xdp_tx_pull(xq);
if (xsk_ring_prod__needs_wakeup(socket_tx)) {
if (xsk_ring_prod__needs_wakeup(&xq->tx_prod)) {
int ret = send(xq->socket_fd, NULL, 0, MSG_DONTWAIT);
xq->stat_tx_wakeup++;
dbg("%s(%d, %u), wake up %d\n", __func__, port, q, ret);
if (ret < 0) {
err("%s(%d, %u), wake up fail %d(%s)\n", __func__, port, q, ret, strerror(errno));
@@ -310,21 +394,15 @@ static void xdp_tx_kick(struct mt_xdp_queue* xq) {

static uint16_t xdp_tx(struct mtl_main_impl* impl, struct mt_xdp_queue* xq,
struct rte_mbuf** tx_pkts, uint16_t nb_pkts) {
struct xsk_ring_cons* cq = &xq->cq;
uint32_t free_thresh = xq->tx_free_thresh;
enum mtl_port port = xq->port;
uint16_t q = xq->q;
struct rte_mempool* mbuf_pool = xq->mbuf_pool;
uint16_t tx = 0;
struct xsk_ring_prod* socket_tx = &xq->socket_tx;
struct xsk_ring_prod* pd = &xq->tx_prod;
struct mtl_port_status* stats = mt_if(impl, port)->dev_stats_sw;
uint64_t tx_bytes = 0;

uint32_t cq_avail = xsk_cons_nb_avail(cq, free_thresh);
dbg("%s(%d, %u), cq_avail %u\n", __func__, port, q, cq_avail);
if (cq_avail >= free_thresh) {
xdp_tx_pull(xq);
}
xdp_tx_check_free(xq); /* do we need to check the free threshold on every tx burst? */

for (uint16_t i = 0; i < nb_pkts; i++) {
struct rte_mbuf* m = tx_pkts[i];
@@ -335,17 +413,19 @@ static uint16_t xdp_tx(struct mtl_main_impl* impl, struct mt_xdp_queue* xq,
} else {
struct rte_mbuf* local = rte_pktmbuf_alloc(mbuf_pool);
if (!local) {
err("%s(%d, %u), local mbuf alloc fail\n", __func__, port, q);
dbg("%s(%d, %u), local mbuf alloc fail\n", __func__, port, q);
xq->stat_tx_mbuf_alloc_fail++;
goto exit;
}

uint32_t idx;
if (!xsk_ring_prod__reserve(socket_tx, 1, &idx)) {
if (!xsk_ring_prod__reserve(pd, 1, &idx)) {
err("%s(%d, %u), socket_tx reserve fail\n", __func__, port, q);
xq->stat_tx_prod_reserve_fail++;
rte_pktmbuf_free(local);
goto exit;
}
struct xdp_desc* desc = xsk_ring_prod__tx_desc(socket_tx, idx);
struct xdp_desc* desc = xsk_ring_prod__tx_desc(pd, idx);
desc->len = m->pkt_len;
uint64_t addr =
(uint64_t)local - (uint64_t)xq->umem_buffer - xq->mbuf_pool->header_size;
Expand All @@ -359,19 +439,25 @@ static uint16_t xdp_tx(struct mtl_main_impl* impl, struct mt_xdp_queue* xq,
rte_pktmbuf_free(m);
dbg("%s(%d, %u), tx local mbuf %p umem pkt %p addr 0x%" PRIu64 "\n", __func__, port,
q, local, pkt, addr);
xq->stat_tx_copy++;
tx++;
}
}

exit:
if (tx) {
dbg("%s(%d, %u), submit %u\n", __func__, port, q, tx);
xsk_ring_prod__submit(socket_tx, tx);
xdp_tx_kick(xq);
xsk_ring_prod__submit(pd, tx);
xdp_tx_wakeup(xq); /* do we need a wakeup for every submit? */
if (stats) {
stats->tx_packets += tx;
stats->tx_bytes += tx_bytes;
}
xq->stat_tx_submit++;
xq->stat_tx_pkts += tx;
xq->stat_tx_bytes += tx_bytes;
} else {
xdp_tx_poll_done(xq);
}
return tx;
}
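
For the copy path in xdp_tx, the per-packet sequence is: reserve one TX descriptor, point desc->addr at the umem offset that holds the copied frame, submit, then kick the kernel if the ring asks for it. A condensed sketch of that sequence, assuming the data copy and addr computation were already done as in the diff above:

#include <stdint.h>
#include <sys/socket.h>
#include <xdp/xsk.h>

/* Sketch: enqueue one frame already copied into the umem at `addr`.
 * Returns 0 on success, -1 if the TX ring is full. */
static int tx_one_sketch(struct xsk_ring_prod* tx_prod, int xsk_fd,
                         uint64_t addr, uint32_t len) {
  uint32_t idx;
  if (!xsk_ring_prod__reserve(tx_prod, 1, &idx)) return -1; /* ring full */

  struct xdp_desc* desc = xsk_ring_prod__tx_desc(tx_prod, idx);
  desc->addr = addr; /* offset into the umem, not a pointer */
  desc->len = len;

  xsk_ring_prod__submit(tx_prod, 1);
  /* kick the kernel when the ring requests it; whether a kick is needed
   * on every submit depends on the bind flags (see the commented-out
   * XDP_USE_NEED_WAKEUP above) */
  if (xsk_ring_prod__needs_wakeup(tx_prod))
    (void)send(xsk_fd, NULL, 0, MSG_DONTWAIT);
  return 0;
}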
@@ -398,8 +484,7 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
xdp->combined_count = 1;
xdp->start_queue = p->xdp_info[port].start_queue;
xdp->queues_cnt = RTE_MAX(inf->max_tx_queues, inf->max_rx_queues);
mt_pthread_mutex_init(&xdp->tx_queues_lock, NULL);
mt_pthread_mutex_init(&xdp->rx_queues_lock, NULL);
mt_pthread_mutex_init(&xdp->queues_lock, NULL);

xdp_parse_combined_info(xdp);
if ((xdp->start_queue + xdp->queues_cnt) > xdp->combined_count) {
@@ -424,7 +509,7 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
xq->port = port;
xq->q = q;
xq->umem_ring_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
xq->tx_free_thresh = xq->umem_ring_size / 2;
xq->tx_free_thresh = 0; /* default: check for free slots on every burst */
xq->mbuf_pool = inf->rx_queues[i].mbuf_pool;
if (!xq->mbuf_pool) {
err("%s(%d), no mbuf_pool for q %u\n", __func__, port, q);
Expand All @@ -440,6 +525,13 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
}
}

ret = mt_stat_register(impl, xdp_stat_dump, xdp, "xdp");
if (ret < 0) {
err("%s(%d), stat register fail %d\n", __func__, port, ret);
xdp_free(xdp);
return ret;
}

inf->xdp = xdp;
info("%s(%d), start queue %u cnt %u\n", __func__, port, xdp->start_queue,
xdp->queues_cnt);
Expand All @@ -449,6 +541,9 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
int mt_dev_xdp_uinit(struct mt_interface* inf) {
struct mt_xdp_priv* xdp = inf->xdp;
if (!xdp) return 0;
struct mtl_main_impl* impl = inf->parent;

mt_stat_unregister(impl, xdp_stat_dump, xdp);

xdp_free(xdp);
inf->xdp = NULL;
@@ -476,15 +571,15 @@ struct mt_tx_xdp_entry* mt_tx_xdp_get(struct mtl_main_impl* impl, enum mtl_port
struct mt_xdp_priv* xdp = mt_if(impl, port)->xdp;
struct mt_xdp_queue* xq = NULL;
/* find a null slot */
mt_pthread_mutex_lock(&xdp->tx_queues_lock);
mt_pthread_mutex_lock(&xdp->queues_lock);
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
if (!xdp->queues_info[i].tx_entry) {
xq = &xdp->queues_info[i];
xq->tx_entry = entry;
break;
}
}
mt_pthread_mutex_unlock(&xdp->tx_queues_lock);
mt_pthread_mutex_unlock(&xdp->queues_lock);
if (!xq) {
err("%s(%d), no free tx queue\n", __func__, port);
mt_tx_xdp_put(entry);
@@ -505,8 +600,9 @@ int mt_tx_xdp_put(struct mt_tx_xdp_entry* entry) {
uint8_t* ip = flow->dip_addr;
struct mt_xdp_queue* xq = entry->xq;

/* pull all buf sent */
xdp_tx_pull(xq);
/* poll all completed tx bufs */
xdp_tx_poll_done(xq);
xdp_queue_tx_stat(xq);

xq->tx_entry = NULL;
info("%s(%d), ip %u.%u.%u.%u, port %u, queue %u\n", __func__, port, ip[0], ip[1], ip[2],
@@ -540,15 +636,15 @@ struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port
struct mt_xdp_priv* xdp = mt_if(impl, port)->xdp;
struct mt_xdp_queue* xq = NULL;
/* find a null slot */
mt_pthread_mutex_lock(&xdp->rx_queues_lock);
mt_pthread_mutex_lock(&xdp->queues_lock);
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
if (!xdp->queues_info[i].rx_entry) {
xq = &xdp->queues_info[i];
xq->rx_entry = entry;
break;
}
}
mt_pthread_mutex_unlock(&xdp->rx_queues_lock);
mt_pthread_mutex_unlock(&xdp->queues_lock);
if (!xq) {
err("%s(%d), no free rx queue\n", __func__, port);
mt_rx_xdp_put(entry);