diff --git a/lib/src/datapath/mt_dp_socket.c b/lib/src/datapath/mt_dp_socket.c index 2730fa1aa..41aa309dc 100644 --- a/lib/src/datapath/mt_dp_socket.c +++ b/lib/src/datapath/mt_dp_socket.c @@ -11,10 +11,127 @@ #include "mudp_api.h" #endif -#define MT_RX_DP_SOCKET_MEMPOOL_PREFIX "SR_" +#define MT_RX_DP_SOCKET_PREFIX "SR_" #ifndef WINDOWSENV +static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m) { + struct mt_tx_socket_entry* entry = t->parent; + enum mtl_port port = entry->port; + int fd = t->fd; + struct sockaddr_in send_addr; + struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw; + + /* check if supported */ + if (m->nb_segs > 1) { + err("%s(%d,%d), only support one nb_segs %u\n", __func__, port, fd, m->nb_segs); + return -ENOTSUP; + } + struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*); + struct rte_ether_hdr* eth = &hdr->eth; + + if (eth->ether_type != htons(RTE_ETHER_TYPE_IPV4)) { + err("%s(%d,%d), not ipv4\n", __func__, port, fd); + return -ENOTSUP; + } + // mt_mbuf_dump(port, 0, "socket_tx", m); + + void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr)); + ssize_t payload_len = m->data_len - sizeof(struct mt_udp_hdr); + + struct rte_ipv4_hdr* ipv4 = &hdr->ipv4; + struct rte_udp_hdr* udp = &hdr->udp; + mudp_init_sockaddr(&send_addr, (uint8_t*)&ipv4->dst_addr, ntohs(udp->dst_port)); + + /* nonblocking */ + ssize_t send = sendto(fd, payload, payload_len, MSG_DONTWAIT, + (const struct sockaddr*)&send_addr, sizeof(send_addr)); + dbg("%s(%d,%d), len %" PRId64 " send %" PRId64 "\n", __func__, port, fd, payload_len, + send); + if (send != payload_len) { + dbg("%s(%d,%d), sendto fail, len %" PRId64 " send %" PRId64 "\n", __func__, port, fd, + payload_len, send); + return -EBUSY; + } + if (stats) { + stats->tx_packets++; + stats->tx_bytes += m->data_len; + } + + return 0; +} + +static void* tx_socket_thread(void* arg) { + struct mt_tx_socket_thread* t = arg; + struct 
mt_tx_socket_entry* entry = t->parent; + enum mtl_port port = entry->port; + struct rte_mbuf* m = NULL; + int ret; + + info("%s(%d), start, fd %d\n", __func__, port, t->fd); + while (rte_atomic32_read(&t->stop_thread) == 0) { + ret = rte_ring_mc_dequeue(entry->ring, (void**)&m); + if (ret < 0) continue; + do { + ret = tx_socket_send_mbuf(t, m); + } while ((ret < 0) && (rte_atomic32_read(&t->stop_thread) == 0)); + rte_pktmbuf_free(m); + } + info("%s(%d), stop, fd %d\n", __func__, port, t->fd); + + return NULL; +} + +static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) { + int idx = entry->threads_data[0].fd; + int fd, ret; + + /* fds[0] already init */ + for (int i = 1; i < entry->threads; i++) { + fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) { + /* no put here: caller puts the entry on error, avoid double free */ + err("%s(%d), socket open fail %d\n", __func__, idx, fd); + return fd; + } + entry->threads_data[i].fd = fd; + /* non-blocking */ + ret = mt_fd_set_nonbolck(fd); + if (ret < 0) return ret; + info("%s(%d), fd %d for thread %d\n", __func__, idx, fd, i); + } + + /* create the ring, one producer multi consumer */ + char ring_name[64]; + struct rte_ring* ring; + unsigned int flags, count; + snprintf(ring_name, sizeof(ring_name), "%sP%dFD%d", MT_RX_DP_SOCKET_PREFIX, entry->port, + idx); + flags = RING_F_SP_ENQ; + count = mt_if_nb_tx_desc(entry->parent, entry->port); + ring = + rte_ring_create(ring_name, count, mt_socket_id(entry->parent, entry->port), flags); + if (!ring) { + err("%s(%d), ring create fail\n", __func__, idx); + return -EIO; + } + entry->ring = ring; + + /* create one thread per fd, including the base fd */ + for (int i = 0; i < entry->threads; i++) { + struct mt_tx_socket_thread* t = &entry->threads_data[i]; + + rte_atomic32_set(&t->stop_thread, 0); + ret = pthread_create(&t->tid, NULL, tx_socket_thread, t); + if (ret != 0) { /* pthread_create returns positive errno, not -1 */ + err("%s(%d), thread create fail %d for thread %d\n", __func__, idx, ret, i); + return -ret; + } + } + + return 0; +} + struct mt_tx_socket_entry* 
mt_tx_socket_get(struct mtl_main_impl* impl, enum mtl_port port, struct mt_txq_flow* flow) { @@ -32,11 +149,9 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, } /* non-blocking */ - int flags = fcntl(fd, F_GETFL, 0); - flags |= O_NONBLOCK; - ret = fcntl(fd, F_SETFL, flags); + ret = mt_fd_set_nonbolck(fd); if (ret < 0) { - err("%s(%d,%d), O_NONBLOCK fail %d\n", __func__, port, fd, ret); + err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, fd, ret); close(fd); return NULL; } @@ -57,22 +172,69 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, close(fd); return NULL; } + for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { + struct mt_tx_socket_thread* t = &entry->threads_data[i]; + t->fd = -1; + t->parent = entry; + } entry->parent = impl; entry->port = port; - entry->fd = fd; + /* 4g bit per second */ + entry->rate_limit_per_thread = (uint64_t)4 * 1000 * 1000 * 1000; + entry->threads_data[0].fd = fd; rte_memcpy(&entry->flow, flow, sizeof(entry->flow)); + uint64_t required = flow->bytes_per_sec * 8; + entry->threads = required / entry->rate_limit_per_thread + 1; + entry->threads = RTE_MIN(entry->threads, MT_DP_SOCKET_THREADS_MAX); + if (entry->threads > 1) { + ret = tx_socket_init_threads(entry); + if (ret < 0) { + err("%s(%d,%d), init %d threads fail %d\n", __func__, port, fd, entry->threads, + ret); + mt_tx_socket_put(entry); + return NULL; + } + } + uint8_t* ip = flow->dip_addr; - info("%s(%d), fd %d ip %u.%u.%u.%u, port %u\n", __func__, port, fd, ip[0], ip[1], ip[2], - ip[3], flow->dst_port); + info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u\n", __func__, port, + entry->threads_data[0].fd, ip[0], ip[1], ip[2], ip[3], flow->dst_port, + entry->threads); return entry; } int mt_tx_socket_put(struct mt_tx_socket_entry* entry) { - if (entry->fd > 0) { - close(entry->fd); + int idx = entry->threads_data[0].fd; + + /* stop threads */ + for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { + struct 
mt_tx_socket_thread* t = &entry->threads_data[i]; + + rte_atomic32_set(&t->stop_thread, 1); + if (t->tid) { + pthread_join(t->tid, NULL); + t->tid = 0; + } } - info("%s(%d,%d), succ\n", __func__, entry->port, entry->fd); + + if (entry->ring) { + mt_ring_dequeue_clean(entry->ring); + rte_ring_free(entry->ring); + entry->ring = NULL; + } + + /* close fd */ + for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { + struct mt_tx_socket_thread* t = &entry->threads_data[i]; + + if (t->fd >= 0) { + close(t->fd); + t->fd = -1; + } + } + + info("%s(%d,%d), succ\n", __func__, entry->port, idx); mt_rte_free(entry); return 0; } @@ -80,52 +242,21 @@ int mt_tx_socket_put(struct mt_tx_socket_entry* entry) { uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf** tx_pkts, uint16_t nb_pkts) { uint16_t tx = 0; - enum mtl_port port = entry->port; - int fd = entry->fd; - struct sockaddr_in send_addr; - struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw; + int ret; + + if (entry->ring) { + unsigned int n = + rte_ring_sp_enqueue_bulk(entry->ring, (void**)&tx_pkts[0], nb_pkts, NULL); + // tx_socket_dequeue(&entry->threads_data[0]); + return n; + } for (tx = 0; tx < nb_pkts; tx++) { struct rte_mbuf* m = tx_pkts[tx]; - - /* check if suppoted */ - if (m->nb_segs > 1) { - err("%s(%d,%d), only support one nb_segs %u\n", __func__, port, fd, m->nb_segs); - goto done; - } - struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*); - struct rte_ether_hdr* eth = &hdr->eth; - - if (eth->ether_type != htons(RTE_ETHER_TYPE_IPV4)) { - err("%s(%d,%d), not ipv4\n", __func__, port, fd); - goto done; - } - // mt_mbuf_dump(port, 0, "socket_tx", m); - - void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr)); - ssize_t payload_len = m->data_len - sizeof(struct mt_udp_hdr); - - struct rte_ipv4_hdr* ipv4 = &hdr->ipv4; - struct rte_udp_hdr* udp = &hdr->udp; - mudp_init_sockaddr(&send_addr, (uint8_t*)&ipv4->dst_addr, ntohs(udp->dst_port)); 
- - /* nonblocking */ - ssize_t send = sendto(entry->fd, payload, payload_len, MSG_DONTWAIT, - (const struct sockaddr*)&send_addr, sizeof(send_addr)); - dbg("%s(%d,%d), len %" PRId64 " send %" PRId64 "\n", __func__, port, fd, payload_len, - send); - if (send != payload_len) { - dbg("%s(%d,%d), sendto fail, len %" PRId64 " send %" PRId64 "\n", __func__, port, - fd, payload_len, send); - goto done; - } - if (stats) { - stats->tx_packets++; - stats->tx_bytes += m->data_len; - } + ret = tx_socket_send_mbuf(&entry->threads_data[0], m); + if (ret < 0) break; } -done: rte_pktmbuf_free_bulk(tx_pkts, tx); return tx; } @@ -155,11 +286,9 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, return NULL; } /* non-blocking */ - int flags = fcntl(fd, F_GETFL, 0); - flags |= O_NONBLOCK; - ret = fcntl(fd, F_SETFL, flags); + ret = mt_fd_set_nonbolck(fd); if (ret < 0) { - err("%s(%d,%d), O_NONBLOCK fail %d\n", __func__, port, fd, ret); + err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, fd, ret); close(fd); return NULL; } @@ -219,8 +348,7 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, /* Create mempool to hold the rx queue mbufs. 
*/ unsigned int mbuf_elements = mt_if_nb_rx_desc(impl, port) + 1024; char pool_name[ST_MAX_NAME_LEN]; - snprintf(pool_name, ST_MAX_NAME_LEN, "%sP%dF%d_MBUF", MT_RX_DP_SOCKET_MEMPOOL_PREFIX, - port, fd); + snprintf(pool_name, ST_MAX_NAME_LEN, "%sP%dF%d_MBUF", MT_RX_DP_SOCKET_PREFIX, port, fd); /* no priv */ entry->pool = mt_mempool_create_by_ops(impl, port, pool_name, mbuf_elements, MT_MBUF_CACHE_SIZE, diff --git a/lib/src/datapath/mt_dp_socket.h b/lib/src/datapath/mt_dp_socket.h index 9d2dcd006..17491e5e9 100644 --- a/lib/src/datapath/mt_dp_socket.h +++ b/lib/src/datapath/mt_dp_socket.h @@ -11,7 +11,7 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, enum mtl_port port, struct mt_txq_flow* flow); int mt_tx_socket_put(struct mt_tx_socket_entry* entry); static inline uint16_t mt_tx_socket_queue_id(struct mt_tx_socket_entry* entry) { - return entry->fd; + return entry->threads_data[0].fd; } uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf** tx_pkts, uint16_t nb_pkts); diff --git a/lib/src/mt_main.h b/lib/src/mt_main.h index d5a5961e8..24ef87911 100644 --- a/lib/src/mt_main.h +++ b/lib/src/mt_main.h @@ -942,12 +942,24 @@ struct mt_srss_impl { int entry_idx; }; +#define MT_DP_SOCKET_THREADS_MAX (4) + +struct mt_tx_socket_thread { + struct mt_tx_socket_entry* parent; + int fd; + pthread_t tid; + rte_atomic32_t stop_thread; +}; + struct mt_tx_socket_entry { struct mtl_main_impl* parent; enum mtl_port port; struct mt_txq_flow flow; - int fd; + uint64_t rate_limit_per_thread; + int threads; + struct rte_ring* ring; + struct mt_tx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX]; }; struct mt_rx_socket_entry { diff --git a/lib/src/mt_util.h b/lib/src/mt_util.h index 649939a00..e78167a0d 100644 --- a/lib/src/mt_util.h +++ b/lib/src/mt_util.h @@ -208,6 +208,19 @@ static inline void mt_mbuf_refcnt_inc_bulk(struct rte_mbuf** mbufs, uint16_t nb) } } +#ifdef WINDOWSENV +static inline int mt_fd_set_nonbolck(int fd) { + 
MTL_MAY_UNUSED(fd); + return -ENOTSUP; +} +#else +static inline int mt_fd_set_nonbolck(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + flags |= O_NONBLOCK; + return fcntl(fd, F_SETFL, flags); +} +#endif + const char* mt_dpdk_afxdp_port2if(const char* port); const char* mt_dpdk_afpkt_port2if(const char* port); const char* mt_kernel_port2if(const char* port); diff --git a/plugins/st22_ffmpeg/README.md b/plugins/st22_ffmpeg/README.md index f790ccba3..15798a15c 100644 --- a/plugins/st22_ffmpeg/README.md +++ b/plugins/st22_ffmpeg/README.md @@ -54,11 +54,11 @@ ffmpeg -s 1920x1080 -pix_fmt yuv420p -i yuv420p8le.yuv -pix_fmt yuv422p test_pla Tx run: ```bash -./build/app/TxSt22PipelineSample --st22_codec h264_cbr --st22_fmt YUV422PLANAR8 --tx_url test_planar8.yuv +./build/app/TxSt22PipelineSample --st22_codec h264_cbr --pipeline_fmt YUV422PLANAR8 --tx_url test_planar8.yuv ``` Rx run: ```bash -./build/app/RxSt22PipelineSample --st22_codec h264_cbr --st22_fmt YUV422PLANAR8 --rx_url out_planar8.yuv +./build/app/RxSt22PipelineSample --st22_codec h264_cbr --pipeline_fmt YUV422PLANAR8 --rx_url out_planar8.yuv ```