Skip to content

Commit

Permalink
dp/xdp: initial tx burst support
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Du <[email protected]>
  • Loading branch information
frankdjx committed Oct 16, 2023
1 parent 0e38afd commit 9b0b0e8
Showing 1 changed file with 133 additions and 24 deletions.
157 changes: 133 additions & 24 deletions lib/src/dev/mt_af_xdp.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
#endif

struct mt_xdp_queue {
enum mtl_port port;
struct rte_mempool* mbuf_pool;
uint16_t q;
uint32_t umem_ring_size;
uint32_t tx_free_thresh;

struct xsk_umem* umem;
struct xsk_ring_prod pq;
Expand All @@ -28,6 +31,7 @@ struct mt_xdp_queue {
void* umem_buffer;

struct xsk_socket* socket;
int socket_fd;
struct xsk_ring_cons socket_rx;
struct xsk_ring_prod socket_tx;

Expand All @@ -43,30 +47,29 @@ struct mt_xdp_priv {
uint32_t max_combined;
uint32_t combined_count;

uint32_t umem_ring_size;
struct mt_xdp_queue* queues_info;
pthread_mutex_t tx_queues_lock;
pthread_mutex_t rx_queues_lock;
};

/* Release the fill-queue (pq) backing mbufs of one XDP queue.
 * Safe to call repeatedly: a NULL pq_mbufs array means nothing to do.
 * Note: the diff residue in this span (stale pre-commit signature taking
 * struct mt_xdp_priv*) is resolved to the committed per-queue form, which
 * reads the ring size from the queue itself. Always returns 0. */
static int xdp_pq_uinit(struct mt_xdp_queue* xq) {
  if (!xq->pq_mbufs) return 0;

  /* the mbufs were allocated in a single bulk of umem_ring_size entries;
   * a non-NULL first slot indicates the bulk allocation succeeded and the
   * whole batch must be returned to the pool */
  if (xq->pq_mbufs[0]) rte_pktmbuf_free_bulk(xq->pq_mbufs, xq->umem_ring_size);

  mt_rte_free(xq->pq_mbufs);
  xq->pq_mbufs = NULL; /* mark released to keep the call idempotent */

  return 0;
}

static int xdp_queue_uinit(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
static int xdp_queue_uinit(struct mt_xdp_queue* xq) {
if (xq->socket) {
xsk_socket__delete(xq->socket);
xq->socket = NULL;
}

xdp_pq_uinit(xdp, xq);
xdp_pq_uinit(xq);

if (xq->umem) {
xsk_umem__delete(xq->umem);
Expand All @@ -82,7 +85,7 @@ static int xdp_free(struct mt_xdp_priv* xdp) {
if (xdp->queues_info) {
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
struct mt_xdp_queue* xq = &xdp->queues_info[i];
xdp_queue_uinit(xdp, xq);
xdp_queue_uinit(xq);
if (xq->tx_entry) {
warn("%s(%d,%u), tx_entry still active\n", __func__, port, xq->q);
mt_tx_xdp_put(xq->tx_entry);
Expand Down Expand Up @@ -141,8 +144,8 @@ static inline uintptr_t xdp_mp_base_addr(struct rte_mempool* mp, uint64_t* align
return aligned_addr;
}

static int xdp_umem_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
enum mtl_port port = xdp->port;
static int xdp_umem_init(struct mt_xdp_queue* xq) {
enum mtl_port port = xq->port;
uint16_t q = xq->q;
int ret;
struct xsk_umem_config cfg;
Expand All @@ -151,8 +154,8 @@ static int xdp_umem_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
uint64_t umem_size, align = 0;

memset(&cfg, 0, sizeof(cfg));
cfg.fill_size = xdp->umem_ring_size * 2;
cfg.comp_size = xdp->umem_ring_size;
cfg.fill_size = xq->umem_ring_size * 2;
cfg.comp_size = xq->umem_ring_size;
cfg.flags = XDP_UMEM_UNALIGNED_CHUNK_FLAG;

cfg.frame_size = rte_mempool_calc_obj_size(pool->elt_size, pool->flags, NULL);
Expand All @@ -176,9 +179,9 @@ static int xdp_umem_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
}

static int xdp_pq_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
enum mtl_port port = xdp->port;
enum mtl_port port = xq->port;
uint16_t q = xq->q;
uint32_t ring_sz = xdp->umem_ring_size;
uint32_t ring_sz = xq->umem_ring_size;
struct xsk_ring_prod* pq = &xq->pq;
int ret;

Expand All @@ -190,15 +193,15 @@ static int xdp_pq_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
}
ret = rte_pktmbuf_alloc_bulk(xq->mbuf_pool, xq->pq_mbufs, ring_sz);
if (ret < 0) {
xdp_pq_uinit(xdp, xq);
xdp_pq_uinit(xq);
err("%s(%d,%u), mbuf alloc fail %d\n", __func__, port, q, ret);
return ret;
}

uint32_t idx = 0;
if (!xsk_ring_prod__reserve(pq, ring_sz, &idx)) {
err("%s(%d,%u), reserve fail\n", __func__, port, q);
xdp_pq_uinit(xdp, xq);
xdp_pq_uinit(xq);
return -EIO;
}

Expand All @@ -218,7 +221,7 @@ static int xdp_pq_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
}

static int xdp_socket_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
enum mtl_port port = xdp->port;
enum mtl_port port = xq->port;
uint16_t q = xq->q;
struct mtl_main_impl* impl = xdp->parent;
struct xsk_socket_config cfg;
Expand All @@ -238,38 +241,141 @@ static int xdp_socket_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
return ret;
}

xq->socket_fd = xsk_socket__fd(xq->socket);
return 0;
}

/* Initialize one XDP queue end-to-end: create the umem over the queue's
 * mempool, populate the fill queue (pq) with mbufs, then open the AF_XDP
 * socket. On any failure, all partially initialized resources are torn
 * down via xdp_queue_uinit before the error is propagated.
 * Note: stale pre-commit duplicate lines in this diff span (old signatures
 * passing xdp into the per-queue helpers) are resolved to the committed form.
 * Returns 0 on success or a negative errno-style code. */
static int xdp_queue_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
  enum mtl_port port = xq->port;
  uint16_t q = xq->q;
  int ret;

  ret = xdp_umem_init(xq);
  if (ret < 0) {
    err("%s(%d,%u), umem init fail %d\n", __func__, port, q, ret);
    xdp_queue_uinit(xq);
    return ret;
  }

  ret = xdp_pq_init(xdp, xq);
  if (ret < 0) {
    err("%s(%d,%u), pq init fail %d\n", __func__, port, q, ret);
    xdp_queue_uinit(xq);
    return ret;
  }

  ret = xdp_socket_init(xdp, xq);
  if (ret < 0) {
    err("%s(%d,%u), socket init fail %d\n", __func__, port, q, ret);
    xdp_queue_uinit(xq);
    return ret;
  }

  return 0;
}

/* Drain the TX completion ring (cq) and free the local umem-backed mbufs
 * whose transmission the kernel has confirmed.
 *
 * Fix: the dbg format printed the address as "0x%" PRIu64 — a decimal value
 * with a hex prefix; changed to PRIx64 so the logged address is actual hex. */
static void xdp_tx_pull(struct mt_xdp_queue* xq) {
  struct xsk_ring_cons* cq = &xq->cq;
  uint32_t idx = 0;
  uint32_t size = xq->umem_ring_size;
  /* peek up to one full ring worth of completion entries */
  uint32_t n = xsk_ring_cons__peek(cq, size, &idx);

  for (uint32_t i = 0; i < n; i++) {
    uint64_t addr = *xsk_ring_cons__comp_addr(cq, idx++);
    addr = xsk_umem__extract_addr(addr);
    /* recover the rte_mbuf pointer: xdp_tx stored addr as
     * (mbuf - umem_buffer - header_size), so adding header_size back yields
     * the mbuf's own address inside the umem area */
    struct rte_mbuf* m = (struct rte_mbuf*)xsk_umem__get_data(
        xq->umem_buffer, addr + xq->mbuf_pool->header_size);
    dbg("%s(%d, %u), free mbuf %p addr 0x%" PRIx64 "\n", __func__, xq->port, xq->q, m,
        addr);
    rte_pktmbuf_free(m);
  }

  /* hand the consumed entries back to the kernel; n == 0 is a no-op */
  xsk_ring_cons__release(cq, n);
}

/* Reclaim completed TX buffers, then — only if the kernel has flagged the
 * TX producer ring as needing a wakeup — poke the AF_XDP socket with a
 * zero-length non-blocking send so it starts draining the ring. */
static void xdp_tx_kick(struct mt_xdp_queue* xq) {
  enum mtl_port port = xq->port;
  uint16_t q = xq->q;

  xdp_tx_pull(xq);

  /* no wakeup requested: nothing more to do */
  if (!xsk_ring_prod__needs_wakeup(&xq->socket_tx)) return;

  int ret = send(xq->socket_fd, NULL, 0, MSG_DONTWAIT);
  dbg("%s(%d, %u), wake up %d\n", __func__, port, q, ret);
  if (ret < 0) {
    err("%s(%d, %u), wake up fail %d(%s)\n", __func__, port, q, ret, strerror(errno));
  }
}

static uint16_t xdp_tx(struct mtl_main_impl* impl, struct mt_xdp_queue* xq,
struct rte_mbuf** tx_pkts, uint16_t nb_pkts) {
struct xsk_ring_cons* cq = &xq->cq;
uint32_t free_thresh = xq->tx_free_thresh;
enum mtl_port port = xq->port;
uint16_t q = xq->q;
struct rte_mempool* mbuf_pool = xq->mbuf_pool;
uint16_t tx = 0;
struct xsk_ring_prod* socket_tx = &xq->socket_tx;
struct mtl_port_status* stats = mt_if(impl, port)->dev_stats_sw;
uint64_t tx_bytes = 0;

uint32_t cq_avail = xsk_cons_nb_avail(cq, free_thresh);
dbg("%s(%d, %u), cq_avail %u\n", __func__, port, q, cq_avail);
if (cq_avail >= free_thresh) {
xdp_tx_pull(xq);
}

for (uint16_t i = 0; i < nb_pkts; i++) {
struct rte_mbuf* m = tx_pkts[i];

if (m->pool == mbuf_pool) {
warn("%s(%d, %u), same mbuf_pool todo\n", __func__, port, q);
goto exit;
} else {
struct rte_mbuf* local = rte_pktmbuf_alloc(mbuf_pool);
if (!local) {
err("%s(%d, %u), local mbuf alloc fail\n", __func__, port, q);
goto exit;
}

uint32_t idx;
if (!xsk_ring_prod__reserve(socket_tx, 1, &idx)) {
err("%s(%d, %u), socket_tx reserve fail\n", __func__, port, q);
rte_pktmbuf_free(local);
goto exit;
}
struct xdp_desc* desc = xsk_ring_prod__tx_desc(socket_tx, idx);
desc->len = m->pkt_len;
uint64_t addr =
(uint64_t)local - (uint64_t)xq->umem_buffer - xq->mbuf_pool->header_size;
uint64_t offset = rte_pktmbuf_mtod(local, uint64_t) - (uint64_t)local +
xq->mbuf_pool->header_size;
void* pkt = xsk_umem__get_data(xq->umem_buffer, addr + offset);
offset = offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
desc->addr = addr | offset;
rte_memcpy(pkt, rte_pktmbuf_mtod(m, void*), desc->len);
tx_bytes += m->data_len;
rte_pktmbuf_free(m);
dbg("%s(%d, %u), tx local mbuf %p umem pkt %p addr 0x%" PRIu64 "\n", __func__, port,
q, local, pkt, addr);
tx++;
}
}

exit:
if (tx) {
dbg("%s(%d, %u), submit %u\n", __func__, port, q, tx);
xsk_ring_prod__submit(socket_tx, tx);
xdp_tx_kick(xq);
if (stats) {
stats->tx_packets += tx;
stats->tx_bytes += tx_bytes;
}
}
return tx;
}

int mt_dev_xdp_init(struct mt_interface* inf) {
struct mtl_main_impl* impl = inf->parent;
enum mtl_port port = inf->port;
Expand Down Expand Up @@ -311,12 +417,14 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
xdp_free(xdp);
return -ENOMEM;
}
xdp->umem_ring_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
struct mt_xdp_queue* xq = &xdp->queues_info[i];
uint16_t q = i + xdp->start_queue;

xq->port = port;
xq->q = q;
xq->umem_ring_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
xq->tx_free_thresh = xq->umem_ring_size / 2;
xq->mbuf_pool = inf->rx_queues[i].mbuf_pool;
if (!xq->mbuf_pool) {
err("%s(%d), no mbuf_pool for q %u\n", __func__, port, q);
Expand Down Expand Up @@ -397,6 +505,9 @@ int mt_tx_xdp_put(struct mt_tx_xdp_entry* entry) {
uint8_t* ip = flow->dip_addr;
struct mt_xdp_queue* xq = entry->xq;

/* pull all buf sent */
xdp_tx_pull(xq);

xq->tx_entry = NULL;
info("%s(%d), ip %u.%u.%u.%u, port %u, queue %u\n", __func__, port, ip[0], ip[1], ip[2],
ip[3], flow->dst_port, entry->queue_id);
Expand All @@ -406,9 +517,7 @@ int mt_tx_xdp_put(struct mt_tx_xdp_entry* entry) {

/* TX burst entry point for an XDP tx queue entry.
 * Fix: this diff span retained the stale pre-commit body (free-all stub)
 * ahead of the new return, leaving an unreachable duplicate return
 * statement; resolved to the committed forwarding implementation.
 * Returns the number of packets actually queued by xdp_tx. */
uint16_t mt_tx_xdp_burst(struct mt_tx_xdp_entry* entry, struct rte_mbuf** tx_pkts,
                         uint16_t nb_pkts) {
  return xdp_tx(entry->parent, entry->xq, tx_pkts, nb_pkts);
}

struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port port,
Expand Down

0 comments on commit 9b0b0e8

Please sign in to comment.