diff --git a/lib/src/datapath/mt_dp_socket.c b/lib/src/datapath/mt_dp_socket.c index 2691361c8..2730fa1aa 100644 --- a/lib/src/datapath/mt_dp_socket.c +++ b/lib/src/datapath/mt_dp_socket.c @@ -20,8 +20,8 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, struct mt_txq_flow* flow) { int ret; - if (!mt_pmd_is_kernel_socket(impl, port)) { - err("%s(%d), this pmd is not kernel socket\n", __func__, port); + if (!mt_drv_kernel_based(impl, port)) { + err("%s(%d), this pmd is not kernel based\n", __func__, port); return NULL; } @@ -119,8 +119,10 @@ uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf** fd, payload_len, send); goto done; } - stats->tx_packets++; - stats->tx_bytes += m->data_len; + if (stats) { + stats->tx_packets++; + stats->tx_bytes += m->data_len; + } } done: @@ -133,16 +135,16 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, struct mt_rxq_flow* flow) { int ret; - if (!mt_pmd_is_kernel_socket(impl, port)) { - err("%s(%d), this pmd is not kernel socket\n", __func__, port); + if (!mt_drv_kernel_based(impl, port)) { + err("%s(%d), this pmd is not kernel based\n", __func__, port); return NULL; } - if (flow->sys_queue) { + if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) { err("%s(%d), sys_queue not supported\n", __func__, port); return NULL; } - if (flow->no_port_flow) { + if (flow->flags & MT_RXQ_FLOW_F_NO_PORT) { err("%s(%d), no_port_flow not supported\n", __func__, port); return NULL; } @@ -298,8 +300,10 @@ uint16_t mt_rx_socket_burst(struct mt_rx_socket_entry* entry, struct rte_mbuf** ipv4->next_proto_id = IPPROTO_UDP; rx_pkts[rx] = pkt; - stats->rx_packets++; - stats->rx_bytes += pkt->data_len; + if (stats) { + stats->rx_packets++; + stats->rx_bytes += pkt->data_len; + } /* allocate a new pkt for next iteration */ entry->pkt = rte_pktmbuf_alloc(entry->pool); diff --git a/lib/src/datapath/mt_queue.c b/lib/src/datapath/mt_queue.c index 49c41c7d9..e0e0a3e95 100644 --- 
a/lib/src/datapath/mt_queue.c +++ b/lib/src/datapath/mt_queue.c @@ -46,7 +46,8 @@ struct mt_rxq_entry* mt_rxq_get(struct mtl_main_impl* impl, enum mtl_port port, } entry->parent = impl; - if (mt_pmd_is_kernel_socket(impl, port)) { + dbg("%s(%d), flags 0x%x\n", __func__, port, flow->flags); + if (mt_pmd_is_kernel_socket(impl, port) || (flow->flags & MT_RXQ_FLOW_F_FORCE_SOCKET)) { entry->rx_socket_q = mt_rx_socket_get(impl, port, flow); if (!entry->rx_socket_q) goto fail; entry->queue_id = mt_rx_socket_queue_id(entry->rx_socket_q); @@ -61,7 +62,7 @@ struct mt_rxq_entry* mt_rxq_get(struct mtl_main_impl* impl, enum mtl_port port, if (!entry->rsq) goto fail; entry->queue_id = mt_rsq_queue_id(entry->rsq); entry->burst = rx_rsq_burst; - } else if (flow->use_cni_queue) { + } else if (flow->flags & MT_RXQ_FLOW_F_FORCE_CNI) { entry->csq = mt_csq_get(impl, port, flow); if (!entry->csq) goto fail; entry->queue_id = mt_csq_queue_id(entry->csq); @@ -135,7 +136,8 @@ struct mt_txq_entry* mt_txq_get(struct mtl_main_impl* impl, enum mtl_port port, } entry->parent = impl; - if (mt_pmd_is_kernel_socket(impl, port)) { + dbg("%s(%d), flags 0x%x\n", __func__, port, flow->flags); + if (mt_pmd_is_kernel_socket(impl, port) || (flow->flags & MT_TXQ_FLOW_F_FORCE_SOCKET)) { entry->tx_socket_q = mt_tx_socket_get(impl, port, flow); if (!entry->tx_socket_q) goto fail; entry->queue_id = mt_tx_socket_queue_id(entry->tx_socket_q); @@ -265,7 +267,8 @@ int mt_dp_queue_init(struct mtl_main_impl* impl) { struct mt_txq_flow flow; memset(&flow, 0, sizeof(flow)); - flow.sys_queue = true; + flow.flags = MT_TXQ_FLOW_F_SYS_QUEUE; + if (mt_drv_kernel_based(impl, i)) flow.flags = MT_TXQ_FLOW_F_FORCE_SOCKET; dp->txq_sys_entry = mt_txq_get(impl, i, &flow); if (!dp->txq_sys_entry) { err("%s(%d), txq sys entry get fail\n", __func__, i); diff --git a/lib/src/datapath/mt_shared_queue.c b/lib/src/datapath/mt_shared_queue.c index d91389fa5..20df65920 100644 --- a/lib/src/datapath/mt_shared_queue.c +++ 
b/lib/src/datapath/mt_shared_queue.c @@ -138,7 +138,7 @@ static uint32_t rsq_flow_hash(struct mt_rxq_flow* flow) { struct rte_ipv4_tuple tuple; uint32_t len; - if (flow->sys_queue) return 0; + if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) return 0; len = RTE_THASH_V4_L4_LEN; tuple.src_addr = RTE_IPV4(flow->dip_addr[0], flow->dip_addr[1], flow->dip_addr[2], @@ -173,7 +173,7 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port, entry->parent = rsqm; rte_memcpy(&entry->flow, flow, sizeof(entry->flow)); - if (!flow->sys_queue) { + if (!(flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE)) { entry->flow_rsp = mt_rx_flow_create(impl, port, q, flow); if (!entry->flow_rsp) { err("%s(%u), create flow fail\n", __func__, q); @@ -197,7 +197,7 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port, MT_TAILQ_INSERT_HEAD(&rsq_queue->head, entry, next); rte_atomic32_inc(&rsq_queue->entry_cnt); rsq_queue->entry_idx++; - if (flow->sys_queue) rsq_queue->cni_entry = entry; + if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) rsq_queue->cni_entry = entry; rsq_unlock(rsq_queue); uint8_t* ip = flow->dip_addr; @@ -266,7 +266,7 @@ static int rsq_rx(struct mt_rsq_queue* rsq_queue) { MT_TAILQ_FOREACH(rsq_entry, &rsq_queue->head, next) { bool ip_matched; - if (rsq_entry->flow.no_ip_flow) { + if (rsq_entry->flow.flags & MT_RXQ_FLOW_F_NO_IP) { ip_matched = true; } else { ip_matched = mt_is_multicast_ip(rsq_entry->flow.dip_addr) @@ -274,7 +274,7 @@ static int rsq_rx(struct mt_rsq_queue* rsq_queue) { : (ipv4->src_addr == *(uint32_t*)rsq_entry->flow.dip_addr); } bool port_matched; - if (rsq_entry->flow.no_port_flow) { + if (rsq_entry->flow.flags & MT_RXQ_FLOW_F_NO_PORT) { port_matched = true; } else { port_matched = ntohs(udp->dst_port) == rsq_entry->flow.dst_port; @@ -454,7 +454,7 @@ static uint32_t tsq_flow_hash(struct mt_txq_flow* flow) { struct rte_ipv4_tuple tuple; uint32_t len; - if (flow->sys_queue) return 0; + if (flow->flags & MT_TXQ_FLOW_F_SYS_QUEUE) 
return 0; len = RTE_THASH_V4_L4_LEN; tuple.src_addr = RTE_IPV4(flow->dip_addr[0], flow->dip_addr[1], flow->dip_addr[2], @@ -475,7 +475,8 @@ struct mt_tsq_entry* mt_tsq_get(struct mtl_main_impl* impl, enum mtl_port port, struct mt_tsq_impl* tsqm = tsq_ctx_get(impl, port); uint32_t hash = tsq_flow_hash(flow); uint16_t q = 0; - if (!flow->sys_queue) { /* queue zero is reserved for system queue */ + /* queue zero is reserved for system queue */ + if (!(flow->flags & MT_TXQ_FLOW_F_SYS_QUEUE)) { q = (hash % RTE_ETH_RETA_GROUP_SIZE) % (tsqm->max_tsq_queues - 1) + 1; } struct mt_tsq_queue* tsq_queue = &tsqm->tsq_queues[q]; diff --git a/lib/src/datapath/mt_shared_rss.c b/lib/src/datapath/mt_shared_rss.c index 6bfc410e8..ef75383ac 100644 --- a/lib/src/datapath/mt_shared_rss.c +++ b/lib/src/datapath/mt_shared_rss.c @@ -91,7 +91,7 @@ static int srss_tasklet_handler(void* priv) { udp = &hdr->udp; MT_TAILQ_FOREACH(srss_entry, &srss->head, next) { bool ip_matched; - if (srss_entry->flow.no_ip_flow) { + if (srss_entry->flow.flags & MT_RXQ_FLOW_F_NO_IP) { ip_matched = true; } else { ip_matched = mt_is_multicast_ip(srss_entry->flow.dip_addr) @@ -99,7 +99,7 @@ static int srss_tasklet_handler(void* priv) { : (ipv4->src_addr == *(uint32_t*)srss_entry->flow.dip_addr); } bool port_matched; - if (srss_entry->flow.no_port_flow) { + if (srss_entry->flow.flags & MT_RXQ_FLOW_F_NO_PORT) { port_matched = true; } else { port_matched = ntohs(udp->dst_port) == srss_entry->flow.dst_port; @@ -252,7 +252,7 @@ struct mt_srss_entry* mt_srss_get(struct mtl_main_impl* impl, enum mtl_port port srss_lock(srss); MT_TAILQ_INSERT_TAIL(&srss->head, entry, next); - if (flow->sys_queue) srss->cni_entry = entry; + if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) srss->cni_entry = entry; srss->entry_idx++; srss_unlock(srss); diff --git a/lib/src/dev/mt_dev.c b/lib/src/dev/mt_dev.c index 6dea3ce96..c6863f6c0 100644 --- a/lib/src/dev/mt_dev.c +++ b/lib/src/dev/mt_dev.c @@ -73,7 +73,7 @@ static const struct 
mt_dev_driver_info dev_drvs[] = { .drv_type = MT_DRV_DPDK_AF_XDP, .flow_type = MT_FLOW_ALL, .flags = MT_DRV_F_NO_CNI | MT_DRV_F_USE_KERNEL_CTL | MT_DRV_F_RX_POOL_COMMON | - MT_DRV_F_NO_SYS_TX_QUEUE, + MT_DRV_F_KERNEL_BASED, }, { .name = "net_af_packet", @@ -81,7 +81,7 @@ static const struct mt_dev_driver_info dev_drvs[] = { .drv_type = MT_DRV_DPDK_AF_PKT, .flow_type = MT_FLOW_ALL, .flags = MT_DRV_F_USE_KERNEL_CTL | MT_DRV_F_RX_POOL_COMMON | MT_DRV_F_RX_NO_FLOW | - MT_DRV_F_NO_SYS_TX_QUEUE, + MT_DRV_F_KERNEL_BASED, }, { .name = "kernel_socket", @@ -89,7 +89,7 @@ static const struct mt_dev_driver_info dev_drvs[] = { .drv_type = MT_DRV_KERNEL_SOCKET, .flow_type = MT_FLOW_ALL, .flags = MT_DRV_F_NOT_DPDK_PMD | MT_DRV_F_NO_CNI | MT_DRV_F_USE_KERNEL_CTL | - MT_DRV_F_RX_NO_FLOW | MT_DRV_F_MCAST_IN_DP, + MT_DRV_F_RX_NO_FLOW | MT_DRV_F_MCAST_IN_DP | MT_DRV_F_KERNEL_BASED, }, { .name = "native_af_xdp", @@ -97,7 +97,7 @@ static const struct mt_dev_driver_info dev_drvs[] = { .drv_type = MT_DRV_NATIVE_AF_XDP, .flow_type = MT_FLOW_ALL, .flags = MT_DRV_F_NOT_DPDK_PMD | MT_DRV_F_NO_CNI | MT_DRV_F_USE_KERNEL_CTL | - MT_DRV_F_NO_SYS_TX_QUEUE | MT_DRV_F_NO_SYS_TX_QUEUE, + MT_DRV_F_KERNEL_BASED, }, }; @@ -1495,7 +1495,7 @@ struct mt_tx_queue* mt_dev_get_tx_queue(struct mtl_main_impl* impl, enum mtl_por * of queue 0 by hard coding static configuration. So, traffic requires * LaunchTime based pacing must be transmitted over queue 0. */ - if (flow->launch_time_enabled) { + if (flow->flags & MT_TXQ_FLOW_F_LAUNCH_TIME) { /* If require LaunchTime based pacing, queue 0 is the only choice. 
*/ if (q != 0) break; } else { @@ -1550,7 +1550,8 @@ struct mt_rx_queue* mt_dev_get_rx_queue(struct mtl_main_impl* impl, enum mtl_por for (uint16_t q = 0; q < inf->max_rx_queues; q++) { rx_queue = &inf->rx_queues[q]; if (rx_queue->active) continue; - if (flow && flow->hdr_split) { /* continue if not hdr split queue */ + if (flow && (flow->flags & MT_RXQ_FLOW_F_HDR_SPLIT)) { + /* continue if not hdr split queue */ if (!mt_if_hdr_split_pool(inf, q)) continue; #ifdef ST_HAS_DPDK_HDR_SPLIT if (flow->hdr_split_mbuf_cb) { @@ -1574,7 +1575,7 @@ struct mt_rx_queue* mt_dev_get_rx_queue(struct mtl_main_impl* impl, enum mtl_por } memset(&rx_queue->flow, 0, sizeof(rx_queue->flow)); - if (flow && !flow->sys_queue) { + if (flow && !(flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE)) { rx_queue->flow_rsp = mt_rx_flow_create(impl, port, q, flow); if (!rx_queue->flow_rsp) { err("%s(%d), create flow fail for queue %d\n", __func__, port, q); @@ -1611,8 +1612,7 @@ struct mt_rx_queue* mt_dev_get_rx_queue(struct mtl_main_impl* impl, enum mtl_por } mt_pthread_mutex_unlock(&inf->rx_queues_mutex); - err("%s(%d), fail to find free rx queue for %s\n", __func__, port, - flow && flow->hdr_split ? 
"hdr_split" : "normal"); + err("%s(%d), fail to find free rx queue\n", __func__, port); return NULL; } @@ -1740,7 +1740,7 @@ int mt_dev_put_rx_queue(struct mtl_main_impl* impl, struct mt_rx_queue* queue) { rx_queue->flow_rsp = NULL; } - if (rx_queue->flow.hdr_split) { + if (rx_queue->flow.flags & MT_RXQ_FLOW_F_HDR_SPLIT) { #ifdef ST_HAS_DPDK_HDR_SPLIT /* clear hdrs mbuf callback */ rte_eth_hdrs_set_mbuf_callback(inf->port_id, queue_id, NULL, NULL); diff --git a/lib/src/mt_cni.c b/lib/src/mt_cni.c index e8b075192..49e0a8e3f 100644 --- a/lib/src/mt_cni.c +++ b/lib/src/mt_cni.c @@ -175,7 +175,7 @@ static int cni_udp_handle(struct mt_cni_entry* cni, struct rte_mbuf* m) { csq_lock(cni); MT_TAILQ_FOREACH(csq, &cni->csq_queues, next) { bool ip_matched; - if (csq->flow.no_ip_flow) { + if (csq->flow.flags & MT_RXQ_FLOW_F_NO_IP) { ip_matched = true; } else { ip_matched = mt_is_multicast_ip(csq->flow.dip_addr) @@ -183,7 +183,7 @@ static int cni_udp_handle(struct mt_cni_entry* cni, struct rte_mbuf* m) { : (ipv4->src_addr == *(uint32_t*)csq->flow.dip_addr); } bool port_matched; - if (csq->flow.no_port_flow) { + if (csq->flow.flags & MT_RXQ_FLOW_F_NO_PORT) { port_matched = true; } else { port_matched = ntohs(udp->dst_port) == csq->flow.dst_port; @@ -415,7 +415,7 @@ static int cni_queues_init(struct mtl_main_impl* impl) { struct mt_rxq_flow flow; memset(&flow, 0, sizeof(flow)); - flow.sys_queue = true; + flow.flags = MT_RXQ_FLOW_F_SYS_QUEUE; cni->rxq = mt_rxq_get(impl, i, &flow); if (!cni->rxq) { err("%s(%d), rx queue get fail\n", __func__, i); @@ -430,6 +430,7 @@ static int cni_queues_init(struct mtl_main_impl* impl) { static bool cni_need_tasklet(struct mt_cni_impl* cni_impl) { struct mtl_main_impl* impl = cni_impl->parent; + if (!impl) return false; struct mt_cni_entry* cni; int num_ports = mt_num_ports(impl); @@ -607,7 +608,7 @@ struct mt_csq_entry* mt_csq_get(struct mtl_main_impl* impl, enum mtl_port port, struct mt_cni_entry* cni = cni_get_entry(impl, port); int idx = 
cni->csq_idx; - if (flow->sys_queue) { + if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) { err("%s(%d,%d), not support sys queue\n", __func__, port, idx); return NULL; } diff --git a/lib/src/mt_flow.c b/lib/src/mt_flow.c index 3f2d043ee..479161a33 100644 --- a/lib/src/mt_flow.c +++ b/lib/src/mt_flow.c @@ -96,9 +96,9 @@ static struct rte_flow* rte_rx_flow_create(struct mt_interface* inf, uint16_t q, /* drv not support ip flow */ if (inf->drv_info.flow_type == MT_FLOW_NO_IP) has_ip_flow = false; /* no ip flow requested */ - if (flow->no_ip_flow) has_ip_flow = false; + if (flow->flags & MT_RXQ_FLOW_F_NO_IP) has_ip_flow = false; /* no port flow requested */ - if (flow->no_port_flow) has_port_flow = false; + if (flow->flags & MT_RXQ_FLOW_F_NO_PORT) has_port_flow = false; /* only raw flow can be applied on the hdr split queue */ if (mt_if_hdr_split_pool(inf, q)) { diff --git a/lib/src/mt_main.h b/lib/src/mt_main.h index 009b2de93..d5a5961e8 100644 --- a/lib/src/mt_main.h +++ b/lib/src/mt_main.h @@ -187,9 +187,9 @@ struct mt_ptp_impl { * The flag indicates Qbv (IEEE 802.1Qbv) traffic shaper * enable. * - * The IEEE 802.1Qbv is designed to seperate traffics + * The IEEE 802.1Qbv is designed to separate traffics * transmission into different time slices to prevent - * traffics transmission interfereing. + * traffics transmission interfering. 
*/ bool qbv_enabled; int64_t no_timesync_delta; @@ -278,24 +278,30 @@ struct mt_ptp_impl { uint16_t stat_sync_keep; }; +/* used for cni sys queue */ +#define MT_RXQ_FLOW_F_SYS_QUEUE (MTL_BIT32(0)) +/* no ip flow, only use port flow, for udp transport */ +#define MT_RXQ_FLOW_F_NO_IP (MTL_BIT32(1)) +/* if apply destination port flow or not */ +#define MT_RXQ_FLOW_F_NO_PORT (MTL_BIT32(2)) +/* child of cni to save queue usage */ +#define MT_RXQ_FLOW_F_FORCE_CNI (MTL_BIT32(3)) +/* if request hdr split */ +#define MT_RXQ_FLOW_F_HDR_SPLIT (MTL_BIT32(4)) +/* force to use socket, only for MT_DRV_F_KERNEL_BASED */ +#define MT_RXQ_FLOW_F_FORCE_SOCKET (MTL_BIT32(5)) + /* request of rx queue flow */ struct mt_rxq_flow { - /* used for cni queue */ - bool sys_queue; - /* no ip flow, only use port flow, for udp transport */ - bool no_ip_flow; - /* if apply destination port flow or not */ - bool no_port_flow; - /* child of cni to save queue usage */ - bool use_cni_queue; /* mandatory if not no_ip_flow */ uint8_t dip_addr[MTL_IP_ADDR_LEN]; /* rx destination IP */ /* source ip is ignored if destination is a multicast address */ uint8_t sip_addr[MTL_IP_ADDR_LEN]; /* source IP */ uint16_t dst_port; /* udp destination port */ + /* value of MT_RXQ_FLOW_F_* */ + uint32_t flags; - /* optional */ - bool hdr_split; /* if request hdr split */ + /* optional for hdr split */ void* hdr_split_mbuf_cb_priv; #ifdef ST_HAS_DPDK_HDR_SPLIT /* rte_eth_hdrs_mbuf_callback_fn define with this marco */ rte_eth_hdrs_mbuf_callback_fn hdr_split_mbuf_cb; @@ -601,12 +607,14 @@ struct mt_tx_queue { #define MT_DRV_F_USE_KERNEL_CTL (MTL_BIT64(4)) /* no priv for the mbuf in the rx queue */ #define MT_DRV_F_RX_POOL_COMMON (MTL_BIT64(5)) -/* no rx flow, for MTL_PMD_DPDK_AF_PACKET and MTL_PMD_DPDK_AF_PACKET */ +/* no rx flow, for MTL_PMD_DPDK_AF_PACKET and MTL_PMD_KERNEL_SOCKET */ #define MT_DRV_F_RX_NO_FLOW (MTL_BIT64(6)) /* mcast control in data path, for MTL_PMD_KERNEL_SOCKET */ #define MT_DRV_F_MCAST_IN_DP 
(MTL_BIT64(7)) /* no sys tx queue support */ #define MT_DRV_F_NO_SYS_TX_QUEUE (MTL_BIT64(8)) +/* kernel based backend */ +#define MT_DRV_F_KERNEL_BASED (MTL_BIT64(9)) struct mt_dev_driver_info { char* name; @@ -856,14 +864,21 @@ struct mt_rsq_impl { struct mt_rsq_queue* rsq_queues; }; +/* used for sys queue */ +#define MT_TXQ_FLOW_F_SYS_QUEUE (MTL_BIT32(0)) +/* if launch time enabled */ +#define MT_TXQ_FLOW_F_LAUNCH_TIME (MTL_BIT32(1)) +/* force to use socket, only for MT_DRV_F_KERNEL_BASED */ +#define MT_TXQ_FLOW_F_FORCE_SOCKET (MTL_BIT32(2)) + /* request of tx queue flow */ struct mt_txq_flow { - bool sys_queue; uint64_t bytes_per_sec; /* rl rate in byte */ /* mandatory if not sys_queue */ uint8_t dip_addr[MTL_IP_ADDR_LEN]; /* tx destination IP */ uint16_t dst_port; /* udp destination port */ - bool launch_time_enabled; + /* value with MT_TXQ_FLOW_F_* */ + uint32_t flags; }; struct mt_tsq_impl; /* forward delcare */ @@ -1125,6 +1140,13 @@ static inline bool mt_drv_no_sys_txq(struct mtl_main_impl* impl, enum mtl_port p return false; } +static inline bool mt_drv_kernel_based(struct mtl_main_impl* impl, enum mtl_port port) { + if (mt_if(impl, port)->drv_info.flags & MT_DRV_F_KERNEL_BASED) + return true; + else + return false; +} + static inline bool mt_pmd_is_dpdk_af_xdp(struct mtl_main_impl* impl, enum mtl_port port) { if (MTL_PMD_DPDK_AF_XDP == mt_get_user_params(impl)->pmd[port]) return true; diff --git a/lib/src/mt_ptp.c b/lib/src/mt_ptp.c index 918c145ef..f0b16880e 100644 --- a/lib/src/mt_ptp.c +++ b/lib/src/mt_ptp.c @@ -1097,7 +1097,7 @@ static int ptp_init(struct mtl_main_impl* impl, struct mt_ptp_impl* ptp, memcpy(&id[3], &magic, 2); memcpy(&id[5], &mac.addr_bytes[3], 3); our_port_id->port_number = htons(port_id); // now always - // ptp_print_port_id(port_id, our_port_id); + ptp_print_port_id(port_id, our_port_id); rte_memcpy(ip, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); @@ -1146,12 +1146,21 @@ static int ptp_init(struct mtl_main_impl* impl, struct
mt_ptp_impl* ptp, inet_pton(AF_INET, "224.0.1.129", ptp->mcast_group_addr); - /* create rx queue if no CNI path */ - if (!mt_has_cni(impl, port)) { + if (mt_has_cni(impl, port) && !mt_drv_kernel_based(impl, port)) { + /* join mcast only if cni path, no cni use socket which has mcast in the data path */ + ret = mt_mcast_join(impl, mt_ip_to_u32(ptp->mcast_group_addr), port); + if (ret < 0) { + err("%s(%d), join ptp multicast group fail\n", __func__, port); + return ret; + } + mt_mcast_l2_join(impl, &ptp_l2_multicast_eaddr, port); + } else { + /* create rx socket queue if no CNI path */ struct mt_rxq_flow flow; memset(&flow, 0, sizeof(flow)); rte_memcpy(flow.dip_addr, ptp->mcast_group_addr, MTL_IP_ADDR_LEN); rte_memcpy(flow.sip_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); + flow.flags = MT_RXQ_FLOW_F_FORCE_SOCKET; flow.dst_port = MT_PTP_UDP_GEN_PORT; ptp->gen_rxq = mt_rxq_get(impl, port, &flow); @@ -1180,14 +1189,6 @@ static int ptp_init(struct mtl_main_impl* impl, struct mt_ptp_impl* ptp, } } - /* join mcast */ - ret = mt_mcast_join(impl, mt_ip_to_u32(ptp->mcast_group_addr), port); - if (ret < 0) { - err("%s(%d), join ptp multicast group fail\n", __func__, port); - return ret; - } - mt_mcast_l2_join(impl, &ptp_l2_multicast_eaddr, port); - ptp->active = true; if (!mt_if_has_timesync(impl, port)) { ptp->no_timesync = true; @@ -1212,8 +1213,10 @@ static int ptp_uinit(struct mtl_main_impl* impl, struct mt_ptp_impl* ptp) { if (!ptp->active) return 0; - mt_mcast_l2_leave(impl, &ptp_l2_multicast_eaddr, port); - mt_mcast_leave(impl, mt_ip_to_u32(ptp->mcast_group_addr), port); + if (mt_has_cni(impl, port) && !mt_drv_kernel_based(impl, port)) { + mt_mcast_l2_leave(impl, &ptp_l2_multicast_eaddr, port); + mt_mcast_leave(impl, mt_ip_to_u32(ptp->mcast_group_addr), port); + } if (ptp->rxq_tasklet) { mt_sch_unregister_tasklet(ptp->rxq_tasklet); diff --git a/lib/src/mt_socket.c b/lib/src/mt_socket.c index b6cb16b71..49c43d669 100644 --- a/lib/src/mt_socket.c +++ 
b/lib/src/mt_socket.c @@ -391,16 +391,16 @@ int mt_socket_add_flow(struct mtl_main_impl* impl, enum mtl_port port, uint16_t uint8_t start_queue = mt_afxdp_start_queue(impl, port); const char* if_name = mt_kernel_if_name(impl, port); - if (flow->sys_queue) { + if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) { err("%s(%d), sys_queue not supported\n", __func__, port); return -EIO; } - if (flow->no_port_flow) { + if (flow->flags & MT_RXQ_FLOW_F_NO_PORT) { err("%s(%d), no_port_flow not supported\n", __func__, port); return -EIO; } - if (flow->no_ip_flow) { + if (flow->flags & MT_RXQ_FLOW_F_NO_IP) { snprintf(cmd, sizeof(cmd), "ethtool -N %s flow-type udp4 dst-port %u action %u", if_name, flow->dst_port, queue_id + start_queue); } else if (mt_is_multicast_ip(flow->dip_addr)) { diff --git a/lib/src/st2110/st_rx_ancillary_session.c b/lib/src/st2110/st_rx_ancillary_session.c index b1c2ad228..4163389d3 100644 --- a/lib/src/st2110/st_rx_ancillary_session.c +++ b/lib/src/st2110/st_rx_ancillary_session.c @@ -223,7 +223,7 @@ static int rx_ancillary_session_init_hw(struct mtl_main_impl* impl, rte_memcpy(flow.dip_addr, s->ops.sip_addr[i], MTL_IP_ADDR_LEN); rte_memcpy(flow.sip_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); flow.dst_port = s->st40_dst_port[i]; - if (mt_has_cni_rx(impl, port)) flow.use_cni_queue = true; + if (mt_has_cni_rx(impl, port)) flow.flags |= MT_RXQ_FLOW_F_FORCE_CNI; /* no flow for data path only */ if (s->ops.flags & ST40_RX_FLAG_DATA_PATH_ONLY) { diff --git a/lib/src/st2110/st_rx_audio_session.c b/lib/src/st2110/st_rx_audio_session.c index 8a7ec093b..efd10b458 100644 --- a/lib/src/st2110/st_rx_audio_session.c +++ b/lib/src/st2110/st_rx_audio_session.c @@ -617,7 +617,7 @@ static int rx_audio_session_init_hw(struct mtl_main_impl* impl, rte_memcpy(flow.dip_addr, s->ops.sip_addr[i], MTL_IP_ADDR_LEN); rte_memcpy(flow.sip_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); flow.dst_port = s->st30_dst_port[i]; - if (mt_has_cni_rx(impl, port)) flow.use_cni_queue = true; + 
if (mt_has_cni_rx(impl, port)) flow.flags |= MT_RXQ_FLOW_F_FORCE_CNI; /* no flow for data path only */ if (s->ops.flags & ST30_RX_FLAG_DATA_PATH_ONLY) { diff --git a/lib/src/st2110/st_rx_video_session.c b/lib/src/st2110/st_rx_video_session.c index 8c9e7bd42..33342bc6b 100644 --- a/lib/src/st2110/st_rx_video_session.c +++ b/lib/src/st2110/st_rx_video_session.c @@ -2963,7 +2963,7 @@ static int rv_init_hw(struct mtl_main_impl* impl, struct st_rx_video_session_imp rte_memcpy(flow.sip_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); flow.dst_port = s->st20_dst_port[i]; if (rv_is_hdr_split(s)) { - flow.hdr_split = true; + flow.flags |= MT_RXQ_FLOW_F_HDR_SPLIT; #ifdef ST_HAS_DPDK_HDR_SPLIT flow.hdr_split_mbuf_cb_priv = s; flow.hdr_split_mbuf_cb = rv_hdrs_mbuf_callback_fn; @@ -2972,10 +2972,8 @@ static int rv_init_hw(struct mtl_main_impl* impl, struct st_rx_video_session_imp rv_uinit_hw(s); return -ENOTSUP; #endif - } else { - flow.hdr_split = false; } - if (mt_has_cni_rx(impl, port)) flow.use_cni_queue = true; + if (mt_has_cni_rx(impl, port)) flow.flags |= MT_RXQ_FLOW_F_FORCE_CNI; /* no flow for data path only */ if (ops->flags & ST20_RX_FLAG_DATA_PATH_ONLY) { diff --git a/lib/src/st2110/st_tx_video_session.c b/lib/src/st2110/st_tx_video_session.c index a80571e51..cd4dba1a2 100644 --- a/lib/src/st2110/st_tx_video_session.c +++ b/lib/src/st2110/st_tx_video_session.c @@ -872,9 +872,8 @@ static int tv_init_rtcp(struct mtl_main_impl* impl, struct st_tx_video_sessions_ } /* create flow to receive rtcp nack */ memset(&flow, 0, sizeof(flow)); - flow.no_ip_flow = true; + flow.flags = MT_RXQ_FLOW_F_NO_IP | MT_RXQ_FLOW_F_FORCE_CNI; flow.dst_port = s->st20_dst_port[i] + 1; - flow.use_cni_queue = true; s->rtcp_q[i] = mt_rxq_get(impl, port, &flow); if (!s->rtcp_q[i]) { err("%s(%d,%d), mt_rxq_get fail on port %d\n", __func__, mgr_idx, idx, i); @@ -2419,7 +2418,8 @@ static int tv_init_hw(struct mtl_main_impl* impl, struct st_tx_video_sessions_mg flow.bytes_per_sec = tv_rl_bps(s); 
mtl_memcpy(&flow.dip_addr, &s->ops.dip_addr[i], MTL_IP_ADDR_LEN); flow.dst_port = s->ops.udp_port[i]; - if (ST21_TX_PACING_WAY_TSN == s->pacing_way[i]) flow.launch_time_enabled = true; + if (ST21_TX_PACING_WAY_TSN == s->pacing_way[i]) + flow.flags |= MT_TXQ_FLOW_F_LAUNCH_TIME; s->queue[i] = mt_txq_get(impl, port, &flow); if (!s->queue[i]) { tv_uinit_hw(s); diff --git a/lib/src/udp/udp_rxq.c b/lib/src/udp/udp_rxq.c index aa130bdfc..d2505dce7 100644 --- a/lib/src/udp/udp_rxq.c +++ b/lib/src/udp/udp_rxq.c @@ -257,7 +257,7 @@ static struct mur_queue* urq_get(struct mudp_rxq_mgr* mgr, /* create flow */ struct mt_rxq_flow flow; memset(&flow, 0, sizeof(flow)); - flow.no_ip_flow = true; + flow.flags = MT_RXQ_FLOW_F_NO_IP; flow.dst_port = dst_port; q->rxq = mt_rxq_get(impl, port, &flow); if (!q->rxq) {