Skip to content

Commit

Permalink
shared_queue: add both dpdk and xdp support
Browse files Browse the repository at this point in the history
test with:
./build/app/RxTxApp --config_file tests/script/native_af_xdp_json/loop_shared.json --video_sha_check

Signed-off-by: Frank Du <[email protected]>
  • Loading branch information
frankdjx committed Nov 2, 2023
1 parent 6f71b56 commit 2687c05
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 75 deletions.
20 changes: 10 additions & 10 deletions lib/src/datapath/mt_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ struct mt_rxq_entry* mt_rxq_get(struct mtl_main_impl* impl, enum mtl_port port,
if (!entry->rx_socket_q) goto fail;
entry->queue_id = mt_rx_socket_queue_id(entry->rx_socket_q);
entry->burst = rx_socket_burst;
} else if (mt_pmd_is_native_af_xdp(impl, port)) {
entry->rx_xdp_q = mt_rx_xdp_get(impl, port, flow);
if (!entry->rx_xdp_q) goto fail;
entry->queue_id = mt_rx_xdp_queue_id(entry->rx_xdp_q);
entry->burst = rx_xdp_burst;
} else if (mt_has_srss(impl, port)) {
entry->srss = mt_srss_get(impl, port, flow);
if (!entry->srss) goto fail;
Expand All @@ -73,6 +68,11 @@ struct mt_rxq_entry* mt_rxq_get(struct mtl_main_impl* impl, enum mtl_port port,
if (!entry->rsq) goto fail;
entry->queue_id = mt_rsq_queue_id(entry->rsq);
entry->burst = rx_rsq_burst;
} else if (mt_pmd_is_native_af_xdp(impl, port)) {
entry->rx_xdp_q = mt_rx_xdp_get(impl, port, flow, NULL);
if (!entry->rx_xdp_q) goto fail;
entry->queue_id = mt_rx_xdp_queue_id(entry->rx_xdp_q);
entry->burst = rx_xdp_burst;
} else if (flow->flags & MT_RXQ_FLOW_F_FORCE_CNI) {
entry->csq = mt_csq_get(impl, port, flow);
if (!entry->csq) goto fail;
Expand Down Expand Up @@ -162,16 +162,16 @@ struct mt_txq_entry* mt_txq_get(struct mtl_main_impl* impl, enum mtl_port port,
if (!entry->tx_socket_q) goto fail;
entry->queue_id = mt_tx_socket_queue_id(entry->tx_socket_q);
entry->burst = tx_socket_burst;
} else if (mt_pmd_is_native_af_xdp(impl, port)) {
entry->tx_xdp_q = mt_tx_xdp_get(impl, port, flow);
if (!entry->tx_xdp_q) goto fail;
entry->queue_id = mt_tx_xdp_queue_id(entry->tx_xdp_q);
entry->burst = tx_xdp_burst;
} else if (mt_shared_tx_queue(impl, port)) {
entry->tsq = mt_tsq_get(impl, port, flow);
if (!entry->tsq) goto fail;
entry->queue_id = mt_tsq_queue_id(entry->tsq);
entry->burst = tx_tsq_burst;
} else if (mt_pmd_is_native_af_xdp(impl, port)) {
entry->tx_xdp_q = mt_tx_xdp_get(impl, port, flow, NULL);
if (!entry->tx_xdp_q) goto fail;
entry->queue_id = mt_tx_xdp_queue_id(entry->tx_xdp_q);
entry->burst = tx_xdp_burst;
} else {
entry->txq = mt_dev_get_tx_queue(impl, port, flow);
if (!entry->txq) goto fail;
Expand Down
67 changes: 62 additions & 5 deletions lib/src/datapath/mt_shared_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "mt_shared_queue.h"

#include "../dev/mt_af_xdp.h"
#include "../dev/mt_dev.h"
#include "../mt_flow.h"
#include "../mt_log.h"
Expand Down Expand Up @@ -94,6 +95,11 @@ static int rsq_uinit(struct mt_rsq_impl* rsq) {
MT_TAILQ_REMOVE(&rsq_queue->head, entry, next);
rsq_entry_free(entry);
}

if (rsq_queue->xdp) {
mt_rx_xdp_put(rsq_queue->xdp);
rsq_queue->xdp = NULL;
}
}
mt_rte_free(rsq->rsq_queues);
rsq->rsq_queues = NULL;
Expand Down Expand Up @@ -173,10 +179,32 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port,
entry->parent = rsqm;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

if (rsqm->queue_mode == MT_SQ_MODE_XDP) {
rsq_lock(rsq_queue);
if (!rsq_queue->xdp) {
/* get a 1:1 mapped queue */
struct mt_rx_xdp_get_args args;
memset(&args, 0, sizeof(args));
args.queue_match = true;
args.queue_id = q;
args.skip_flow = true;
args.skip_udp_port_check = true;
rsq_queue->xdp = mt_rx_xdp_get(impl, port, flow, &args);
if (!rsq_queue->xdp) {
err("%s(%d:%u), xdp queue get fail\n", __func__, port, q);
rsq_unlock(rsq_queue);
mt_rte_free(entry);
return NULL;
}
}
rsq_unlock(rsq_queue);
}

if (!(flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE)) {
entry->flow_rsp = mt_rx_flow_create(impl, port, q, flow);
if (!entry->flow_rsp) {
err("%s(%u), create flow fail\n", __func__, q);
rsq_entry_free(entry);
return NULL;
}
}
Expand All @@ -188,8 +216,7 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port,
RING_F_SP_ENQ | RING_F_SC_DEQ);
if (!entry->ring) {
err("%s(%d,%d), ring %s create fail\n", __func__, port, idx, ring_name);
if (entry->flow_rsp) mt_rx_flow_free(impl, port, entry->flow_rsp);
mt_rte_free(entry);
rsq_entry_free(entry);
return NULL;
}

Expand Down Expand Up @@ -251,7 +278,10 @@ static int rsq_rx(struct mt_rsq_queue* rsq_queue) {
struct rte_ipv4_hdr* ipv4;
struct rte_udp_hdr* udp;

rx = rte_eth_rx_burst(rsq_queue->port_id, q, pkts, MT_SQ_BURST_SIZE);
if (rsq_queue->xdp)
rx = mt_rx_xdp_burst(rsq_queue->xdp, pkts, MT_SQ_BURST_SIZE);
else
rx = rte_eth_rx_burst(rsq_queue->port_id, q, pkts, MT_SQ_BURST_SIZE);
if (rx) dbg("%s(%u), rx pkts %u\n", __func__, q, rx);
rsq_queue->stat_pkts_recv += rx;

Expand Down Expand Up @@ -326,6 +356,8 @@ int mt_rsq_init(struct mtl_main_impl* impl) {
impl->rsq[i]->parent = impl;
impl->rsq[i]->port = i;
impl->rsq[i]->nb_rsq_queues = mt_if(impl, i)->nb_rx_q;
impl->rsq[i]->queue_mode =
mt_pmd_is_native_af_xdp(impl, i) ? MT_SQ_MODE_XDP : MT_SQ_MODE_DPDK;
ret = rsq_init(impl, impl->rsq[i]);
if (ret < 0) {
err("%s(%d), rsq init fail\n", __func__, i);
Expand Down Expand Up @@ -408,8 +440,11 @@ static int tsq_uinit(struct mt_tsq_impl* tsq) {
mt_mempool_free(tsq_queue->tx_pool);
tsq_queue->tx_pool = NULL;
}
if (tsq_queue->xdp) {
mt_tx_xdp_put(tsq_queue->xdp);
tsq_queue->xdp = NULL;
}
mt_pthread_mutex_destroy(&tsq_queue->mutex);
rte_spinlock_init(&tsq_queue->tx_mutex);
}
mt_rte_free(tsq->tsq_queues);
tsq->tsq_queues = NULL;
Expand Down Expand Up @@ -527,6 +562,23 @@ struct mt_tsq_entry* mt_tsq_get(struct mtl_main_impl* impl, enum mtl_port port,
}
tsq_queue->tx_pool = pool;
}
if (tsqm->queue_mode == MT_SQ_MODE_XDP) {
if (!tsq_queue->xdp) {
/* get a 1:1 mapped queue */
struct mt_tx_xdp_get_args args;
memset(&args, 0, sizeof(args));
args.queue_match = true;
args.queue_id = q;
tsq_queue->xdp = mt_tx_xdp_get(impl, port, flow, &args);
if (!tsq_queue->xdp) {
err("%s(%d:%u), xdp queue get fail\n", __func__, port, q);
tsq_unlock(tsq_queue);
mt_rte_free(entry);
return NULL;
}
}
}

MT_TAILQ_INSERT_HEAD(&tsq_queue->head, entry, next);
rte_atomic32_inc(&tsq_queue->entry_cnt);
tsq_unlock(tsq_queue);
Expand Down Expand Up @@ -582,7 +634,10 @@ uint16_t mt_tsq_burst(struct mt_tsq_entry* entry, struct rte_mbuf** tx_pkts,
uint16_t tx;

rte_spinlock_lock(&tsq_queue->tx_mutex);
tx = rte_eth_tx_burst(tsq_queue->port_id, tsq_queue->queue_id, tx_pkts, nb_pkts);
if (tsq_queue->xdp)
tx = mt_tx_xdp_burst(tsq_queue->xdp, tx_pkts, nb_pkts);
else
tx = rte_eth_tx_burst(tsq_queue->port_id, tsq_queue->queue_id, tx_pkts, nb_pkts);
tsq_queue->stat_pkts_send += tx;
rte_spinlock_unlock(&tsq_queue->tx_mutex);

Expand Down Expand Up @@ -645,6 +700,8 @@ int mt_tsq_init(struct mtl_main_impl* impl) {
impl->tsq[i]->parent = impl;
impl->tsq[i]->port = i;
impl->tsq[i]->nb_tsq_queues = mt_if(impl, i)->nb_tx_q;
impl->tsq[i]->queue_mode =
mt_pmd_is_native_af_xdp(impl, i) ? MT_SQ_MODE_XDP : MT_SQ_MODE_DPDK;
ret = tsq_init(impl, impl->tsq[i]);
if (ret < 0) {
err("%s(%d), tsq init fail\n", __func__, i);
Expand Down
121 changes: 81 additions & 40 deletions lib/src/dev/mt_af_xdp.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,12 +522,14 @@ static bool xdp_rx_check_pkt(struct mt_rx_xdp_entry* entry, struct rte_mbuf* pkt
return false;
}

uint16_t dst_port = ntohs(udp->dst_port);
if (dst_port != entry->flow.dst_port) {
xq->stat_rx_pkt_err_udp_port++;
dbg("%s(%d, %u), wrong dst_port %u expect %u\n", __func__, port, q, dst_port,
entry->flow.dst_port);
return false;
if (!entry->skip_udp_port_check) {
uint16_t dst_port = ntohs(udp->dst_port);
if (dst_port != entry->flow.dst_port) {
xq->stat_rx_pkt_err_udp_port++;
dbg("%s(%d, %u), wrong dst_port %u expect %u\n", __func__, port, q, dst_port,
entry->flow.dst_port);
return false;
}
}

return true;
Expand Down Expand Up @@ -671,6 +673,7 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
return ret;
}

inf->port_id = inf->port;
inf->xdp = xdp;
inf->feature |= MT_IF_FEATURE_TX_MULTI_SEGS;
info("%s(%d), start queue %u cnt %u\n", __func__, port, xdp->start_queue,
Expand All @@ -692,7 +695,8 @@ int mt_dev_xdp_uinit(struct mt_interface* inf) {
}

struct mt_tx_xdp_entry* mt_tx_xdp_get(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_txq_flow* flow) {
struct mt_txq_flow* flow,
struct mt_tx_xdp_get_args* args) {
if (!mt_pmd_is_native_af_xdp(impl, port)) {
err("%s(%d), this pmd is not native xdp\n", __func__, port);
return NULL;
Expand All @@ -710,21 +714,36 @@ struct mt_tx_xdp_entry* mt_tx_xdp_get(struct mtl_main_impl* impl, enum mtl_port

struct mt_xdp_priv* xdp = mt_if(impl, port)->xdp;
struct mt_xdp_queue* xq = NULL;
/* find a null slot */
mt_pthread_mutex_lock(&xdp->queues_lock);
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
if (!xdp->queues_info[i].tx_entry) {
xq = &xdp->queues_info[i];
xq->tx_entry = entry;
break;

if (args && args->queue_match) {
mt_pthread_mutex_lock(&xdp->queues_lock);
xq = &xdp->queues_info[args->queue_id];
if (xq->tx_entry) {
err("%s(%d), q %u is already used\n", __func__, port, args->queue_id);
mt_pthread_mutex_unlock(&xdp->queues_lock);
mt_tx_xdp_put(entry);
return NULL;
}
xq->tx_entry = entry;
mt_pthread_mutex_unlock(&xdp->queues_lock);
} else {
/* find a null slot */
mt_pthread_mutex_lock(&xdp->queues_lock);
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
if (!xdp->queues_info[i].tx_entry) {
xq = &xdp->queues_info[i];
xq->tx_entry = entry;
break;
}
}
mt_pthread_mutex_unlock(&xdp->queues_lock);
if (!xq) {
err("%s(%d), no free tx queue\n", __func__, port);
mt_tx_xdp_put(entry);
return NULL;
}
}
mt_pthread_mutex_unlock(&xdp->queues_lock);
if (!xq) {
err("%s(%d), no free tx queue\n", __func__, port);
mt_tx_xdp_put(entry);
return NULL;
}

entry->xq = xq;
entry->queue_id = xq->q;

Expand Down Expand Up @@ -760,12 +779,15 @@ uint16_t mt_tx_xdp_burst(struct mt_tx_xdp_entry* entry, struct rte_mbuf** tx_pkt
}

struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow) {
struct mt_rxq_flow* flow,
struct mt_rx_xdp_get_args* args) {
if (!mt_pmd_is_native_af_xdp(impl, port)) {
err("%s(%d), this pmd is not native xdp\n", __func__, port);
return NULL;
}

MTL_MAY_UNUSED(args);

struct mt_rx_xdp_entry* entry =
mt_rte_zmalloc_socket(sizeof(*entry), mt_socket_id(impl, port));
if (!entry) {
Expand All @@ -778,31 +800,50 @@ struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port

struct mt_xdp_priv* xdp = mt_if(impl, port)->xdp;
struct mt_xdp_queue* xq = NULL;
/* find a null slot */
mt_pthread_mutex_lock(&xdp->queues_lock);
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
if (!xdp->queues_info[i].rx_entry) {
xq = &xdp->queues_info[i];
xq->rx_entry = entry;
break;

if (args && args->queue_match) {
mt_pthread_mutex_lock(&xdp->queues_lock);
xq = &xdp->queues_info[args->queue_id];
if (xq->rx_entry) {
err("%s(%d), q %u is already used\n", __func__, port, args->queue_id);
mt_pthread_mutex_unlock(&xdp->queues_lock);
mt_rx_xdp_put(entry);
return NULL;
}
xq->rx_entry = entry;
mt_pthread_mutex_unlock(&xdp->queues_lock);
} else {
/* find a null slot */
mt_pthread_mutex_lock(&xdp->queues_lock);
for (uint16_t i = 0; i < xdp->queues_cnt; i++) {
if (!xdp->queues_info[i].rx_entry) {
xq = &xdp->queues_info[i];
xq->rx_entry = entry;
break;
}
}
mt_pthread_mutex_unlock(&xdp->queues_lock);
if (!xq) {
err("%s(%d), no free rx queue\n", __func__, port);
mt_rx_xdp_put(entry);
return NULL;
}
}
mt_pthread_mutex_unlock(&xdp->queues_lock);
if (!xq) {
err("%s(%d), no free tx queue\n", __func__, port);
mt_rx_xdp_put(entry);
return NULL;
}

entry->xq = xq;
entry->queue_id = xq->q;
entry->skip_udp_port_check = args ? args->skip_udp_port_check : false;

uint16_t q = entry->queue_id;
/* create flow */
entry->flow_rsp = mt_rx_flow_create(impl, port, q - xdp->start_queue, flow);
if (!entry->flow_rsp) {
err("%s(%d,%u), create flow fail\n", __func__, port, q);
mt_rx_xdp_put(entry);
return NULL;

if (!args || !args->skip_flow) {
/* create flow */
entry->flow_rsp = mt_rx_flow_create(impl, port, q - xdp->start_queue, flow);
if (!entry->flow_rsp) {
err("%s(%d,%u), create flow fail\n", __func__, port, q);
mt_rx_xdp_put(entry);
return NULL;
}
}

uint8_t* ip = flow->dip_addr;
Expand Down
Loading

0 comments on commit 2687c05

Please sign in to comment.