Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dp/xdp: add palceholder for xdp queue #535

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions lib/src/datapath/mt_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "mt_queue.h"

#include "../dev/mt_af_xdp.h"
#include "../dev/mt_dev.h"
#include "../mt_cni.h"
#include "../mt_log.h"
Expand All @@ -16,6 +17,11 @@ static uint16_t rx_socket_burst(struct mt_rxq_entry* entry, struct rte_mbuf** rx
return mt_rx_socket_burst(entry->rx_socket_q, rx_pkts, nb_pkts);
}

static uint16_t rx_xdp_burst(struct mt_rxq_entry* entry, struct rte_mbuf** rx_pkts,
const uint16_t nb_pkts) {
return mt_rx_xdp_burst(entry->rx_xdp_q, rx_pkts, nb_pkts);
}

static uint16_t rx_srss_burst(struct mt_rxq_entry* entry, struct rte_mbuf** rx_pkts,
const uint16_t nb_pkts) {
return mt_srss_burst(entry->srss, rx_pkts, nb_pkts);
Expand Down Expand Up @@ -52,6 +58,11 @@ struct mt_rxq_entry* mt_rxq_get(struct mtl_main_impl* impl, enum mtl_port port,
if (!entry->rx_socket_q) goto fail;
entry->queue_id = mt_rx_socket_queue_id(entry->rx_socket_q);
entry->burst = rx_socket_burst;
} else if (mt_pmd_is_native_af_xdp(impl, port)) {
entry->rx_xdp_q = mt_rx_xdp_get(impl, port, flow);
if (!entry->rx_xdp_q) goto fail;
entry->queue_id = mt_rx_xdp_queue_id(entry->rx_xdp_q);
entry->burst = rx_xdp_burst;
} else if (mt_has_srss(impl, port)) {
entry->srss = mt_srss_get(impl, port, flow);
if (!entry->srss) goto fail;
Expand Down Expand Up @@ -102,6 +113,10 @@ int mt_rxq_put(struct mt_rxq_entry* entry) {
mt_rx_socket_put(entry->rx_socket_q);
entry->rx_socket_q = NULL;
}
if (entry->rx_xdp_q) {
mt_rx_xdp_put(entry->rx_xdp_q);
entry->rx_xdp_q = NULL;
}
mt_rte_free(entry);
return 0;
}
Expand All @@ -116,6 +131,11 @@ static uint16_t tx_socket_burst(struct mt_txq_entry* entry, struct rte_mbuf** tx
return mt_tx_socket_burst(entry->tx_socket_q, tx_pkts, nb_pkts);
}

static uint16_t tx_xdp_burst(struct mt_txq_entry* entry, struct rte_mbuf** tx_pkts,
uint16_t nb_pkts) {
return mt_tx_xdp_burst(entry->tx_xdp_q, tx_pkts, nb_pkts);
}

static uint16_t tx_tsq_burst(struct mt_txq_entry* entry, struct rte_mbuf** tx_pkts,
uint16_t nb_pkts) {
return mt_tsq_burst(entry->tsq, tx_pkts, nb_pkts);
Expand All @@ -142,6 +162,11 @@ struct mt_txq_entry* mt_txq_get(struct mtl_main_impl* impl, enum mtl_port port,
if (!entry->tx_socket_q) goto fail;
entry->queue_id = mt_tx_socket_queue_id(entry->tx_socket_q);
entry->burst = tx_socket_burst;
} else if (mt_pmd_is_native_af_xdp(impl, port)) {
entry->tx_xdp_q = mt_tx_xdp_get(impl, port, flow);
if (!entry->tx_xdp_q) goto fail;
entry->queue_id = mt_tx_xdp_queue_id(entry->tx_xdp_q);
entry->burst = tx_xdp_burst;
} else if (mt_shared_tx_queue(impl, port)) {
entry->tsq = mt_tsq_get(impl, port, flow);
if (!entry->tsq) goto fail;
Expand Down Expand Up @@ -174,6 +199,10 @@ int mt_txq_put(struct mt_txq_entry* entry) {
mt_tx_socket_put(entry->tx_socket_q);
entry->tx_socket_q = NULL;
}
if (entry->tx_xdp_q) {
mt_tx_xdp_put(entry->tx_xdp_q);
entry->tx_xdp_q = NULL;
}
mt_rte_free(entry);
return 0;
}
Expand Down
2 changes: 2 additions & 0 deletions lib/src/datapath/mt_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ struct mt_rxq_entry {
struct mt_srss_entry* srss;
struct mt_csq_entry* csq;
struct mt_rx_socket_entry* rx_socket_q;
struct mt_rx_xdp_entry* rx_xdp_q;

uint16_t (*burst)(struct mt_rxq_entry* entry, struct rte_mbuf** rx_pkts,
const uint16_t nb_pkts);
Expand All @@ -38,6 +39,7 @@ struct mt_txq_entry {
struct mt_tx_queue* txq;
struct mt_tsq_entry* tsq;
struct mt_tx_socket_entry* tx_socket_q;
struct mt_tx_xdp_entry* tx_xdp_q;

uint16_t (*burst)(struct mt_txq_entry* entry, struct rte_mbuf** tx_pkts,
uint16_t nb_pkts);
Expand Down
239 changes: 233 additions & 6 deletions lib/src/dev/mt_af_xdp.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,133 @@

#include "mt_af_xdp.h"

#include <linux/ethtool.h>
#include <linux/if_xdp.h>
#include <linux/sockios.h>
#include <xdp/xsk.h>

#include "../mt_log.h"

#ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
#error "Please use XDP lib version with XDP_UMEM_UNALIGNED_CHUNK_FLAG"
#endif

struct mt_xdp_queue {
struct rte_mempool* mbuf_pool;
struct xsk_umem* umem;
struct xsk_ring_prod pq;
struct xsk_ring_cons cq;
void* umem_buffer;

struct xsk_socket* xsk;
};

struct mt_xdp_priv {
struct mtl_main_impl* parent;
enum mtl_port port;
uint8_t start_queue;
uint16_t queues_cnt;
uint32_t max_combined;
uint32_t combined_count;

struct mt_xdp_queue* queues_info;
};

static int xdp_free(struct mt_xdp_priv* xdp) {
if (xdp->queues_info) {
for (uint16_t q = 0; q < xdp->queues_cnt; q++) {
struct mt_xdp_queue* xq = &xdp->queues_info[q];

if (xq->umem) {
xsk_umem__delete(xq->umem);
xq->umem = NULL;
}
}
mt_rte_free(xdp->queues_info);
xdp->queues_info = NULL;
}

mt_rte_free(xdp);
return 0;
}

static int xdp_parse_combined_info(struct mt_xdp_priv* xdp) {
struct mtl_main_impl* impl = xdp->parent;
enum mtl_port port = xdp->port;
const char* if_name = mt_kernel_if_name(impl, port);
struct ethtool_channels channels;
struct ifreq ifr;
int fd, ret;

fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0) return -1;

channels.cmd = ETHTOOL_GCHANNELS;
ifr.ifr_data = (void*)&channels;
strlcpy(ifr.ifr_name, if_name, IFNAMSIZ);
ret = ioctl(fd, SIOCETHTOOL, &ifr);
if (ret < 0) {
warn("%s(%d), SIOCETHTOOL fail %d\n", __func__, port, ret);
return ret;
}

xdp->max_combined = channels.max_combined;
xdp->combined_count = channels.combined_count;
info("%s(%d), combined max %u cnt %u\n", __func__, port, xdp->max_combined,
xdp->combined_count);
return 0;
}

static inline uintptr_t xdp_mp_base_addr(struct rte_mempool* mp, uint64_t* align) {
struct rte_mempool_memhdr* hdr;
uintptr_t hdr_addr, aligned_addr;

hdr = STAILQ_FIRST(&mp->mem_list);
hdr_addr = (uintptr_t)hdr->addr;
aligned_addr = hdr_addr & ~(getpagesize() - 1);
*align = hdr_addr - aligned_addr;

return aligned_addr;
}

static int xdp_umem_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
enum mtl_port port = xdp->port;
int ret;
struct xsk_umem_config cfg;
void* base_addr = NULL;
struct rte_mempool* pool = xq->mbuf_pool;
uint64_t umem_size, align = 0;

memset(&cfg, 0, sizeof(cfg));
cfg.fill_size = XSK_RING_CONS__DEFAULT_NUM_DESCS * 2;
cfg.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
cfg.flags = XDP_UMEM_UNALIGNED_CHUNK_FLAG;

cfg.frame_size = rte_mempool_calc_obj_size(pool->elt_size, pool->flags, NULL);
cfg.frame_headroom = pool->header_size + sizeof(struct rte_mbuf) +
rte_pktmbuf_priv_size(pool) + RTE_PKTMBUF_HEADROOM;

base_addr = (void*)xdp_mp_base_addr(pool, &align);
umem_size = (uint64_t)pool->populated_size * (uint64_t)cfg.frame_size + align;
dbg("%s(%d), base_addr %p umem_size %" PRIu64 "\n", __func__, port, base_addr,
umem_size);
ret = xsk_umem__create(&xq->umem, base_addr, umem_size, &xq->pq, &xq->cq, &cfg);
if (ret < 0) {
err("%s(%d), umem create fail %d %s\n", __func__, port, ret, strerror(errno));
return ret;
}
xq->umem_buffer = base_addr;

info("%s(%d), umem %p buffer %p size %" PRIu64 "\n", __func__, port, xq->umem,
xq->umem_buffer, umem_size);
return 0;
}

int mt_dev_xdp_init(struct mt_interface* inf) {
struct mtl_main_impl* impl = inf->parent;
enum mtl_port port = inf->port;
struct mtl_init_params* p = mt_get_user_params(impl);
int ret;

if (!mt_pmd_is_native_af_xdp(impl, port)) {
err("%s(%d), not native af_xdp\n", __func__, port);
Expand All @@ -25,20 +143,129 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
err("%s(%d), xdp malloc fail\n", __func__, port);
return -ENOMEM;
}
xdp->parent = impl;
xdp->port = port;
xdp->max_combined = 1;
xdp->combined_count = 1;
xdp->start_queue = p->xdp_info[port].start_queue;
xdp->queues_cnt = RTE_MAX(inf->max_tx_queues, inf->max_rx_queues);

xdp_parse_combined_info(xdp);
if ((xdp->start_queue + xdp->queues_cnt) > xdp->combined_count) {
err("%s(%d), too many queues requested, start_queue %u queues_cnt %u combined_count "
"%u\n",
__func__, port, xdp->start_queue, xdp->queues_cnt, xdp->combined_count);
xdp_free(xdp);
return -ENOTSUP;
}

xdp->queues_info = mt_rte_zmalloc_socket(sizeof(*xdp->queues_info) * xdp->queues_cnt,
mt_socket_id(impl, port));
if (!xdp->queues_info) {
err("%s(%d), xdp queues_info malloc fail\n", __func__, port);
xdp_free(xdp);
return -ENOMEM;
}
for (uint16_t q = 0; q < xdp->queues_cnt; q++) {
struct mt_xdp_queue* xq = &xdp->queues_info[q];

xq->mbuf_pool = inf->rx_queues[q].mbuf_pool;
if (!xq->mbuf_pool) {
err("%s(%d), no mbuf_pool for q %u\n", __func__, port, q);
xdp_free(xdp);
return -EIO;
}
ret = xdp_umem_init(xdp, xq);
if (ret < 0) {
err("%s(%d), umem init fail for q %u\n", __func__, port, q);
xdp_free(xdp);
return -EIO;
}
}

inf->xdp = xdp;
info("%s(%d), succ\n", __func__, port);
info("%s(%d), start queue %u cnt %u\n", __func__, port, xdp->start_queue,
xdp->queues_cnt);
return 0;
}

int mt_dev_xdp_uinit(struct mt_interface* inf) {
enum mtl_port port = inf->port;
struct mt_xdp_priv* xdp = inf->xdp;
if (!xdp) return 0;

if (inf->xdp) {
mt_rte_free(inf->xdp);
inf->xdp = NULL;
xdp_free(xdp);
inf->xdp = NULL;
dbg("%s(%d), succ\n", __func__, inf->port);
return 0;
}

struct mt_tx_xdp_entry* mt_tx_xdp_get(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_txq_flow* flow) {
if (!mt_pmd_is_native_af_xdp(impl, port)) {
err("%s(%d), this pmd is not native xdp\n", __func__, port);
return NULL;
}

info("%s(%d), succ\n", __func__, port);
struct mt_tx_xdp_entry* entry =
mt_rte_zmalloc_socket(sizeof(*entry), mt_socket_id(impl, port));
if (!entry) {
err("%s(%d), entry malloc fail\n", __func__, port);
return NULL;
}
entry->parent = impl;
entry->port = port;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

uint8_t* ip = flow->dip_addr;
info("%s(%d), ip %u.%u.%u.%u, port %u, queue %u\n", __func__, port, ip[0], ip[1], ip[2],
ip[3], flow->dst_port, entry->queue_id);
return entry;
}

int mt_tx_xdp_put(struct mt_tx_xdp_entry* entry) {
mt_rte_free(entry);
return 0;
}

uint16_t mt_tx_xdp_burst(struct mt_tx_xdp_entry* entry, struct rte_mbuf** tx_pkts,
uint16_t nb_pkts) {
MTL_MAY_UNUSED(entry);
rte_pktmbuf_free_bulk(tx_pkts, nb_pkts);
return nb_pkts;
}

struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow) {
if (!mt_pmd_is_native_af_xdp(impl, port)) {
err("%s(%d), this pmd is not native xdp\n", __func__, port);
return NULL;
}

struct mt_rx_xdp_entry* entry =
mt_rte_zmalloc_socket(sizeof(*entry), mt_socket_id(impl, port));
if (!entry) {
err("%s(%d), entry malloc fail\n", __func__, port);
return NULL;
}
entry->parent = impl;
entry->port = port;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

uint8_t* ip = flow->dip_addr;
info("%s(%d), ip %u.%u.%u.%u port %u queue %d\n", __func__, port, ip[0], ip[1], ip[2],
ip[3], flow->dst_port, entry->queue_id);
return entry;
}

int mt_rx_xdp_put(struct mt_rx_xdp_entry* entry) {
mt_rte_free(entry);
return 0;
}

uint16_t mt_rx_xdp_burst(struct mt_rx_xdp_entry* entry, struct rte_mbuf** rx_pkts,
const uint16_t nb_pkts) {
MTL_MAY_UNUSED(entry);
MTL_MAY_UNUSED(rx_pkts);
MTL_MAY_UNUSED(nb_pkts);
return 0;
}
Loading