datapath/queue: add force socket option support (#529)
Use MT_TXQ_FLOW_F_FORCE_SOCKET/MT_RXQ_FLOW_F_FORCE_SOCKET to select the kernel
socket path for the queue.

Test with:
sudo ./build/app/RxTxApp --config_file tests/script/dpdk_af_xdp_json/rx.json --ptp
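
For reference, a minimal sketch of how an internal caller can request the forced
socket path (this uses the library-internal mt_rxq_get() API shown in this diff;
the port choice and the UDP destination port value are illustrative):

/* Hypothetical helper: get an RX queue that is forced onto the kernel socket
 * data path, regardless of the PMD backing the port. */
static struct mt_rxq_entry* get_socket_rxq(struct mtl_main_impl* impl) {
  struct mt_rxq_flow flow;
  memset(&flow, 0, sizeof(flow));             /* clear all match fields */
  flow.flags = MT_RXQ_FLOW_F_FORCE_SOCKET;    /* new flag added by this commit */
  flow.dst_port = 20000;                      /* example UDP destination port */
  return mt_rxq_get(impl, MTL_PORT_P, &flow); /* NULL if the socket path is unavailable */
}

The TX side mirrors this with MT_TXQ_FLOW_F_FORCE_SOCKET and mt_txq_get().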

Signed-off-by: Frank Du <[email protected]>
frankdjx authored Oct 10, 2023
1 parent facbe7d commit 4f01897
Showing 15 changed files with 113 additions and 81 deletions.
24 changes: 14 additions & 10 deletions lib/src/datapath/mt_dp_socket.c
@@ -20,8 +20,8 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
struct mt_txq_flow* flow) {
int ret;

if (!mt_pmd_is_kernel_socket(impl, port)) {
err("%s(%d), this pmd is not kernel socket\n", __func__, port);
if (!mt_drv_kernel_based(impl, port)) {
err("%s(%d), this pmd is not kernel based\n", __func__, port);
return NULL;
}

@@ -119,8 +119,10 @@ uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf**
fd, payload_len, send);
goto done;
}
stats->tx_packets++;
stats->tx_bytes += m->data_len;
if (stats) {
stats->tx_packets++;
stats->tx_bytes += m->data_len;
}
}

done:
@@ -133,16 +135,16 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl,
struct mt_rxq_flow* flow) {
int ret;

if (!mt_pmd_is_kernel_socket(impl, port)) {
err("%s(%d), this pmd is not kernel socket\n", __func__, port);
if (!mt_drv_kernel_based(impl, port)) {
err("%s(%d), this pmd is not kernel based\n", __func__, port);
return NULL;
}

if (flow->sys_queue) {
if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) {
err("%s(%d), sys_queue not supported\n", __func__, port);
return NULL;
}
if (flow->no_port_flow) {
if (flow->flags & MT_RXQ_FLOW_F_NO_PORT) {
err("%s(%d), no_port_flow not supported\n", __func__, port);
return NULL;
}
@@ -298,8 +300,10 @@ uint16_t mt_rx_socket_burst(struct mt_rx_socket_entry* entry, struct rte_mbuf**
ipv4->next_proto_id = IPPROTO_UDP;

rx_pkts[rx] = pkt;
stats->rx_packets++;
stats->rx_bytes += pkt->data_len;
if (stats) {
stats->rx_packets++;
stats->rx_bytes += pkt->data_len;
}

/* allocate a new pkt for next iteration */
entry->pkt = rte_pktmbuf_alloc(entry->pool);
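The boolean members removed above (sys_queue, no_port_flow, and friends) become
bits in a single flags word on mt_rxq_flow/mt_txq_flow. The flag definitions are
not part of this page; presumably they look roughly like the following, with the
bit positions here being purely illustrative:

/* Illustrative layout only; the real values live in the library's internal headers. */
#define MT_RXQ_FLOW_F_SYS_QUEUE    (1u << 0) /* system (CNI) RX queue */
#define MT_RXQ_FLOW_F_NO_IP        (1u << 1) /* skip the IP address match */
#define MT_RXQ_FLOW_F_NO_PORT      (1u << 2) /* skip the UDP dst port match */
#define MT_RXQ_FLOW_F_HDR_SPLIT    (1u << 3) /* header split RX queue */
#define MT_RXQ_FLOW_F_FORCE_CNI    (1u << 4) /* force the CNI queue path */
#define MT_RXQ_FLOW_F_FORCE_SOCKET (1u << 5) /* force the kernel socket path (new) */

#define MT_TXQ_FLOW_F_SYS_QUEUE    (1u << 0) /* system TX queue */
#define MT_TXQ_FLOW_F_LAUNCH_TIME  (1u << 1) /* LaunchTime based pacing */
#define MT_TXQ_FLOW_F_FORCE_SOCKET (1u << 2) /* force the kernel socket path (new) */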
11 changes: 7 additions & 4 deletions lib/src/datapath/mt_queue.c
@@ -46,7 +46,8 @@ struct mt_rxq_entry* mt_rxq_get(struct mtl_main_impl* impl, enum mtl_port port,
}
entry->parent = impl;

if (mt_pmd_is_kernel_socket(impl, port)) {
dbg("%s(%d), flags 0x%x\n", __func__, port, flow->flags);
if (mt_pmd_is_kernel_socket(impl, port) || (flow->flags & MT_RXQ_FLOW_F_FORCE_SOCKET)) {
entry->rx_socket_q = mt_rx_socket_get(impl, port, flow);
if (!entry->rx_socket_q) goto fail;
entry->queue_id = mt_rx_socket_queue_id(entry->rx_socket_q);
@@ -61,7 +62,7 @@
if (!entry->rsq) goto fail;
entry->queue_id = mt_rsq_queue_id(entry->rsq);
entry->burst = rx_rsq_burst;
} else if (flow->use_cni_queue) {
} else if (flow->flags & MT_RXQ_FLOW_F_FORCE_CNI) {
entry->csq = mt_csq_get(impl, port, flow);
if (!entry->csq) goto fail;
entry->queue_id = mt_csq_queue_id(entry->csq);
@@ -135,7 +136,8 @@ struct mt_txq_entry* mt_txq_get(struct mtl_main_impl* impl, enum mtl_port port,
}
entry->parent = impl;

if (mt_pmd_is_kernel_socket(impl, port)) {
dbg("%s(%d), flags 0x%x\n", __func__, port, flow->flags);
if (mt_pmd_is_kernel_socket(impl, port) || (flow->flags & MT_TXQ_FLOW_F_FORCE_SOCKET)) {
entry->tx_socket_q = mt_tx_socket_get(impl, port, flow);
if (!entry->tx_socket_q) goto fail;
entry->queue_id = mt_tx_socket_queue_id(entry->tx_socket_q);
@@ -265,7 +267,8 @@ int mt_dp_queue_init(struct mtl_main_impl* impl) {

struct mt_txq_flow flow;
memset(&flow, 0, sizeof(flow));
flow.sys_queue = true;
flow.flags = MT_TXQ_FLOW_F_SYS_QUEUE;
if (mt_drv_kernel_based(impl, i)) flow.flags = MT_TXQ_FLOW_F_FORCE_SOCKET;
dp->txq_sys_entry = mt_txq_get(impl, i, &flow);
if (!dp->txq_sys_entry) {
err("%s(%d), txq sys entry get fail\n", __func__, i);
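Condensed, the backend selection in mt_rxq_get() now reads roughly as below; the
shared-RSS and shared-queue branches, error cleanup and queue_id bookkeeping are
elided, and the dedicated-queue field name is assumed:

/* Simplified sketch of the RX backend pick after this change. */
if (mt_pmd_is_kernel_socket(impl, port) ||
    (flow->flags & MT_RXQ_FLOW_F_FORCE_SOCKET)) {
  entry->rx_socket_q = mt_rx_socket_get(impl, port, flow); /* kernel socket path */
} else if (flow->flags & MT_RXQ_FLOW_F_FORCE_CNI) {
  entry->csq = mt_csq_get(impl, port, flow);               /* route RX via the CNI queue */
} else {
  entry->rxq = mt_dev_get_rx_queue(impl, port, flow);      /* dedicated DPDK RX queue */
}

mt_txq_get() applies the same pattern on the TX side with MT_TXQ_FLOW_F_FORCE_SOCKET,
and mt_dp_queue_init() now forces the system TX queue onto the socket path whenever
the driver is kernel based.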
15 changes: 8 additions & 7 deletions lib/src/datapath/mt_shared_queue.c
@@ -138,7 +138,7 @@ static uint32_t rsq_flow_hash(struct mt_rxq_flow* flow) {
struct rte_ipv4_tuple tuple;
uint32_t len;

if (flow->sys_queue) return 0;
if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) return 0;

len = RTE_THASH_V4_L4_LEN;
tuple.src_addr = RTE_IPV4(flow->dip_addr[0], flow->dip_addr[1], flow->dip_addr[2],
@@ -173,7 +173,7 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port,
entry->parent = rsqm;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

if (!flow->sys_queue) {
if (!(flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE)) {
entry->flow_rsp = mt_rx_flow_create(impl, port, q, flow);
if (!entry->flow_rsp) {
err("%s(%u), create flow fail\n", __func__, q);
@@ -197,7 +197,7 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port,
MT_TAILQ_INSERT_HEAD(&rsq_queue->head, entry, next);
rte_atomic32_inc(&rsq_queue->entry_cnt);
rsq_queue->entry_idx++;
if (flow->sys_queue) rsq_queue->cni_entry = entry;
if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) rsq_queue->cni_entry = entry;
rsq_unlock(rsq_queue);

uint8_t* ip = flow->dip_addr;
@@ -266,15 +266,15 @@ static int rsq_rx(struct mt_rsq_queue* rsq_queue) {

MT_TAILQ_FOREACH(rsq_entry, &rsq_queue->head, next) {
bool ip_matched;
if (rsq_entry->flow.no_ip_flow) {
if (rsq_entry->flow.flags & MT_RXQ_FLOW_F_NO_IP) {
ip_matched = true;
} else {
ip_matched = mt_is_multicast_ip(rsq_entry->flow.dip_addr)
? (ipv4->dst_addr == *(uint32_t*)rsq_entry->flow.dip_addr)
: (ipv4->src_addr == *(uint32_t*)rsq_entry->flow.dip_addr);
}
bool port_matched;
if (rsq_entry->flow.no_port_flow) {
if (rsq_entry->flow.flags & MT_RXQ_FLOW_F_NO_PORT) {
port_matched = true;
} else {
port_matched = ntohs(udp->dst_port) == rsq_entry->flow.dst_port;
@@ -454,7 +454,7 @@ static uint32_t tsq_flow_hash(struct mt_txq_flow* flow) {
struct rte_ipv4_tuple tuple;
uint32_t len;

if (flow->sys_queue) return 0;
if (flow->flags & MT_TXQ_FLOW_F_SYS_QUEUE) return 0;

len = RTE_THASH_V4_L4_LEN;
tuple.src_addr = RTE_IPV4(flow->dip_addr[0], flow->dip_addr[1], flow->dip_addr[2],
@@ -475,7 +475,8 @@ struct mt_tsq_entry* mt_tsq_get(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_tsq_impl* tsqm = tsq_ctx_get(impl, port);
uint32_t hash = tsq_flow_hash(flow);
uint16_t q = 0;
if (!flow->sys_queue) { /* queue zero is reserved for system queue */
/* queue zero is reserved for system queue */
if (!(flow->flags & MT_TXQ_FLOW_F_SYS_QUEUE)) {
q = (hash % RTE_ETH_RETA_GROUP_SIZE) % (tsqm->max_tsq_queues - 1) + 1;
}
struct mt_tsq_queue* tsq_queue = &tsqm->tsq_queues[q];
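The per-packet matching that previously used the no_ip_flow/no_port_flow booleans
is identical in the shared-queue, shared-RSS and CNI receive paths; factored out,
the flag-based form amounts to the helper below (the helper itself is hypothetical,
in the code the logic stays inlined in each path):

/* Match one received UDP packet against a registered flow; MT_RXQ_FLOW_F_NO_IP
 * skips the IP check and MT_RXQ_FLOW_F_NO_PORT skips the UDP port check. */
static bool flow_pkt_match(struct mt_rxq_flow* f, struct rte_ipv4_hdr* ipv4,
                           struct rte_udp_hdr* udp) {
  bool ip_matched;
  if (f->flags & MT_RXQ_FLOW_F_NO_IP) {
    ip_matched = true;
  } else {
    ip_matched = mt_is_multicast_ip(f->dip_addr)
                     ? (ipv4->dst_addr == *(uint32_t*)f->dip_addr)
                     : (ipv4->src_addr == *(uint32_t*)f->dip_addr);
  }
  bool port_matched;
  if (f->flags & MT_RXQ_FLOW_F_NO_PORT) {
    port_matched = true;
  } else {
    port_matched = ntohs(udp->dst_port) == f->dst_port;
  }
  return ip_matched && port_matched;
}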
6 changes: 3 additions & 3 deletions lib/src/datapath/mt_shared_rss.c
@@ -91,15 +91,15 @@ static int srss_tasklet_handler(void* priv) {
udp = &hdr->udp;
MT_TAILQ_FOREACH(srss_entry, &srss->head, next) {
bool ip_matched;
if (srss_entry->flow.no_ip_flow) {
if (srss_entry->flow.flags & MT_RXQ_FLOW_F_NO_IP) {
ip_matched = true;
} else {
ip_matched = mt_is_multicast_ip(srss_entry->flow.dip_addr)
? (ipv4->dst_addr == *(uint32_t*)srss_entry->flow.dip_addr)
: (ipv4->src_addr == *(uint32_t*)srss_entry->flow.dip_addr);
}
bool port_matched;
if (srss_entry->flow.no_port_flow) {
if (srss_entry->flow.flags & MT_RXQ_FLOW_F_NO_PORT) {
port_matched = true;
} else {
port_matched = ntohs(udp->dst_port) == srss_entry->flow.dst_port;
@@ -252,7 +252,7 @@ struct mt_srss_entry* mt_srss_get(struct mtl_main_impl* impl, enum mtl_port port

srss_lock(srss);
MT_TAILQ_INSERT_TAIL(&srss->head, entry, next);
if (flow->sys_queue) srss->cni_entry = entry;
if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) srss->cni_entry = entry;
srss->entry_idx++;
srss_unlock(srss);

20 changes: 10 additions & 10 deletions lib/src/dev/mt_dev.c
@@ -73,31 +73,31 @@ static const struct mt_dev_driver_info dev_drvs[] = {
.drv_type = MT_DRV_DPDK_AF_XDP,
.flow_type = MT_FLOW_ALL,
.flags = MT_DRV_F_NO_CNI | MT_DRV_F_USE_KERNEL_CTL | MT_DRV_F_RX_POOL_COMMON |
MT_DRV_F_NO_SYS_TX_QUEUE,
MT_DRV_F_KERNEL_BASED,
},
{
.name = "net_af_packet",
.port_type = MT_PORT_DPDK_AF_PKT,
.drv_type = MT_DRV_DPDK_AF_PKT,
.flow_type = MT_FLOW_ALL,
.flags = MT_DRV_F_USE_KERNEL_CTL | MT_DRV_F_RX_POOL_COMMON | MT_DRV_F_RX_NO_FLOW |
MT_DRV_F_NO_SYS_TX_QUEUE,
MT_DRV_F_KERNEL_BASED,
},
{
.name = "kernel_socket",
.port_type = MT_PORT_KERNEL_SOCKET,
.drv_type = MT_DRV_KERNEL_SOCKET,
.flow_type = MT_FLOW_ALL,
.flags = MT_DRV_F_NOT_DPDK_PMD | MT_DRV_F_NO_CNI | MT_DRV_F_USE_KERNEL_CTL |
MT_DRV_F_RX_NO_FLOW | MT_DRV_F_MCAST_IN_DP,
MT_DRV_F_RX_NO_FLOW | MT_DRV_F_MCAST_IN_DP | MT_DRV_F_KERNEL_BASED,
},
{
.name = "native_af_xdp",
.port_type = MT_PORT_NATIVE_AF_XDP,
.drv_type = MT_DRV_NATIVE_AF_XDP,
.flow_type = MT_FLOW_ALL,
.flags = MT_DRV_F_NOT_DPDK_PMD | MT_DRV_F_NO_CNI | MT_DRV_F_USE_KERNEL_CTL |
MT_DRV_F_NO_SYS_TX_QUEUE | MT_DRV_F_NO_SYS_TX_QUEUE,
MT_DRV_F_KERNEL_BASED,
},
};
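
In these driver entries the MT_DRV_F_NO_SYS_TX_QUEUE flag gives way to a single
MT_DRV_F_KERNEL_BASED capability flag; the mt_drv_kernel_based() check used in the
datapath code above is presumably just a one-line test on it, along these lines:

/* Assumed shape of the new capability check; the real helper lives in the
 * library's internal headers. */
static inline bool mt_drv_kernel_based(struct mtl_main_impl* impl, enum mtl_port port) {
  return (mt_if(impl, port)->drv_info.flags & MT_DRV_F_KERNEL_BASED) != 0;
}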

@@ -1495,7 +1495,7 @@ struct mt_tx_queue* mt_dev_get_tx_queue(struct mtl_main_impl* impl, enum mtl_por
* of queue 0 by hard coding static configuration. So, traffic requires
* LaunchTime based pacing must be transmitted over queue 0.
*/
if (flow->launch_time_enabled) {
if (flow->flags & MT_TXQ_FLOW_F_LAUNCH_TIME) {
/* If require LaunchTime based pacing, queue 0 is the only choice. */
if (q != 0) break;
} else {
@@ -1550,7 +1550,8 @@ struct mt_rx_queue* mt_dev_get_rx_queue(struct mtl_main_impl* impl, enum mtl_por
for (uint16_t q = 0; q < inf->max_rx_queues; q++) {
rx_queue = &inf->rx_queues[q];
if (rx_queue->active) continue;
if (flow && flow->hdr_split) { /* continue if not hdr split queue */
if (flow && (flow->flags & MT_RXQ_FLOW_F_HDR_SPLIT)) {
/* continue if not hdr split queue */
if (!mt_if_hdr_split_pool(inf, q)) continue;
#ifdef ST_HAS_DPDK_HDR_SPLIT
if (flow->hdr_split_mbuf_cb) {
@@ -1574,7 +1575,7 @@ struct mt_rx_queue* mt_dev_get_rx_queue(struct mtl_main_impl* impl, enum mtl_por
}

memset(&rx_queue->flow, 0, sizeof(rx_queue->flow));
if (flow && !flow->sys_queue) {
if (flow && !(flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE)) {
rx_queue->flow_rsp = mt_rx_flow_create(impl, port, q, flow);
if (!rx_queue->flow_rsp) {
err("%s(%d), create flow fail for queue %d\n", __func__, port, q);
@@ -1611,8 +1612,7 @@ struct mt_rx_queue* mt_dev_get_rx_queue(struct mtl_main_impl* impl, enum mtl_por
}
mt_pthread_mutex_unlock(&inf->rx_queues_mutex);

err("%s(%d), fail to find free rx queue for %s\n", __func__, port,
flow && flow->hdr_split ? "hdr_split" : "normal");
err("%s(%d), fail to find free rx queue\n", __func__, port);
return NULL;
}

@@ -1740,7 +1740,7 @@ int mt_dev_put_rx_queue(struct mtl_main_impl* impl, struct mt_rx_queue* queue) {
rx_queue->flow_rsp = NULL;
}

if (rx_queue->flow.hdr_split) {
if (rx_queue->flow.flags & MT_RXQ_FLOW_F_HDR_SPLIT) {
#ifdef ST_HAS_DPDK_HDR_SPLIT
/* clear hdrs mbuf callback */
rte_eth_hdrs_set_mbuf_callback(inf->port_id, queue_id, NULL, NULL);
9 changes: 5 additions & 4 deletions lib/src/mt_cni.c
@@ -175,15 +175,15 @@ static int cni_udp_handle(struct mt_cni_entry* cni, struct rte_mbuf* m) {
csq_lock(cni);
MT_TAILQ_FOREACH(csq, &cni->csq_queues, next) {
bool ip_matched;
if (csq->flow.no_ip_flow) {
if (csq->flow.flags & MT_RXQ_FLOW_F_NO_IP) {
ip_matched = true;
} else {
ip_matched = mt_is_multicast_ip(csq->flow.dip_addr)
? (ipv4->dst_addr == *(uint32_t*)csq->flow.dip_addr)
: (ipv4->src_addr == *(uint32_t*)csq->flow.dip_addr);
}
bool port_matched;
if (csq->flow.no_port_flow) {
if (csq->flow.flags & MT_RXQ_FLOW_F_NO_PORT) {
port_matched = true;
} else {
port_matched = ntohs(udp->dst_port) == csq->flow.dst_port;
@@ -415,7 +415,7 @@ static int cni_queues_init(struct mtl_main_impl* impl) {

struct mt_rxq_flow flow;
memset(&flow, 0, sizeof(flow));
flow.sys_queue = true;
flow.flags = MT_RXQ_FLOW_F_SYS_QUEUE;
cni->rxq = mt_rxq_get(impl, i, &flow);
if (!cni->rxq) {
err("%s(%d), rx queue get fail\n", __func__, i);
@@ -430,6 +430,7 @@

static bool cni_need_tasklet(struct mt_cni_impl* cni_impl) {
struct mtl_main_impl* impl = cni_impl->parent;
if (!impl) return false;
struct mt_cni_entry* cni;
int num_ports = mt_num_ports(impl);

@@ -607,7 +608,7 @@ struct mt_csq_entry* mt_csq_get(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_cni_entry* cni = cni_get_entry(impl, port);
int idx = cni->csq_idx;

if (flow->sys_queue) {
if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) {
err("%s(%d,%d), not support sys queue\n", __func__, port, idx);
return NULL;
}
4 changes: 2 additions & 2 deletions lib/src/mt_flow.c
@@ -96,9 +96,9 @@ static struct rte_flow* rte_rx_flow_create(struct mt_interface* inf, uint16_t q,
/* drv not support ip flow */
if (inf->drv_info.flow_type == MT_FLOW_NO_IP) has_ip_flow = false;
/* no ip flow requested */
if (flow->no_ip_flow) has_ip_flow = false;
if (flow->flags & MT_RXQ_FLOW_F_NO_IP) has_ip_flow = false;
/* no port flow requested */
if (flow->no_port_flow) has_port_flow = false;
if (flow->flags & MT_RXQ_FLOW_F_NO_PORT) has_port_flow = false;

/* only raw flow can be applied on the hdr split queue */
if (mt_if_hdr_split_pool(inf, q)) {
Expand Down