From 2023d028c7de7eb42cefd71aa43d803a5034871e Mon Sep 17 00:00:00 2001 From: Frank Du Date: Wed, 25 Oct 2023 14:20:55 +0800 Subject: [PATCH 1/2] dp/socket: add gso for tx Signed-off-by: Frank Du --- lib/src/datapath/mt_dp_socket.c | 214 +++++++++++++++++++++------ lib/src/mt_main.h | 11 ++ lib/src/st2110/st_tx_video_session.c | 1 + 3 files changed, 181 insertions(+), 45 deletions(-) diff --git a/lib/src/datapath/mt_dp_socket.c b/lib/src/datapath/mt_dp_socket.c index a7e11df6f..1c220975e 100644 --- a/lib/src/datapath/mt_dp_socket.c +++ b/lib/src/datapath/mt_dp_socket.c @@ -16,25 +16,39 @@ #ifndef WINDOWSENV +static inline int tx_socket_verify_mbuf(struct rte_mbuf* m) { + if (m->nb_segs > 1) { + err("%s, only support one nb_segs %u\n", __func__, m->nb_segs); + return -ENOTSUP; + } + + struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*); + struct rte_ether_hdr* eth = &hdr->eth; + uint16_t ether_type = ntohs(eth->ether_type); + + if (ether_type != RTE_ETHER_TYPE_IPV4) { + err("%s, not ipv4, ether_type 0x%x\n", __func__, ether_type); + return -ENOTSUP; + } + + return 0; +} + static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m) { struct mt_tx_socket_entry* entry = t->parent; enum mtl_port port = entry->port; - int fd = t->fd; + int fd = t->fd, ret; struct sockaddr_in send_addr; struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw; /* check if suppoted */ - if (m->nb_segs > 1) { - err("%s(%d,%d), only support one nb_segs %u\n", __func__, port, fd, m->nb_segs); - return -ENOTSUP; + ret = tx_socket_verify_mbuf(m); + if (ret < 0) { + err("%s(%d,%d), unsupported mbuf %p ret %d\n", __func__, port, fd, m, ret); + return ret; } - struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*); - struct rte_ether_hdr* eth = &hdr->eth; - if (eth->ether_type != htons(RTE_ETHER_TYPE_IPV4)) { - err("%s(%d,%d), not ipv4\n", __func__, port, fd); - return -ENOTSUP; - } + struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*); // mt_mbuf_dump(port, 0, "socket_tx", m); void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr)); @@ -63,6 +77,94 @@ static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m return 0; } +static uint16_t tx_socket_send_mbuf_gso(struct mt_tx_socket_thread* t, + struct rte_mbuf** tx_pkts, uint16_t nb_pkts) { + uint16_t tx = 0; + struct mt_tx_socket_entry* entry = t->parent; + enum mtl_port port = entry->port; + int fd = t->fd, ret; + uint16_t gso_sz = entry->gso_sz; + struct iovec iovs[nb_pkts]; + int gso_cnt = 0; + struct msghdr* msg = &t->msg; + ssize_t write; + struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw; + + msg->msg_iov = iovs; + + for (uint16_t i = 0; i < nb_pkts; i++) { + struct rte_mbuf* m = tx_pkts[i]; + ret = tx_socket_verify_mbuf(m); + if (ret < 0) { + err("%s(%d,%d), unsupported mbuf %p ret %d\n", __func__, port, fd, m, ret); + return tx; + } + + uint16_t payload_len = m->data_len - sizeof(struct mt_udp_hdr); + void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr)); + dbg("%s(%d,%d), mbuf %u payload_len %u\n", __func__, port, fd, i, payload_len); + + if (payload_len == gso_sz) { + iovs[gso_cnt].iov_base = payload; + iovs[gso_cnt].iov_len = payload_len; + gso_cnt++; + } else { + if (gso_cnt) { + msg->msg_iovlen = gso_cnt; + write = sendmsg(fd, msg, MSG_DONTWAIT); + if (write != (gso_sz * gso_cnt)) { + dbg("%s(%d,%d), sendmsg 1 fail, len %u send %" PRId64 "\n", __func__, port, fd, + gso_sz * gso_cnt, write); + return tx; + } + tx += gso_cnt; + if (stats) { + stats->tx_packets += gso_cnt; + stats->tx_bytes += write; + } + t->stat_tx_pkt += gso_cnt; + t->stat_tx_gso++; + + gso_cnt = 0; + } + write = sendto(fd, payload, payload_len, MSG_DONTWAIT, &t->send_addr, + sizeof(t->send_addr)); + if (write != payload_len) { + dbg("%s(%d,%d), sendto fail, len %u send %" PRId64 "\n", __func__, port, fd, + payload_len, write); + return tx; + } + tx++; + if (stats) { + stats->tx_packets++; + stats->tx_bytes += write; + } + t->stat_tx_pkt++; + } + } + + if (gso_cnt) { + msg->msg_iovlen = gso_cnt; + write = sendmsg(fd, msg, MSG_DONTWAIT); + if (write != (gso_sz * gso_cnt)) { + dbg("%s(%d,%d), sendmsg fail, len %u send %" PRId64 "\n", __func__, port, fd, + gso_sz * gso_cnt, write); + return tx; + } + dbg("%s(%d,%d), sendmsg succ, len %u send %" PRId64 "\n", __func__, port, fd, + gso_sz * gso_cnt, write); + tx += gso_cnt; + if (stats) { + stats->tx_packets += gso_cnt; + stats->tx_bytes += write; + } + t->stat_tx_pkt += gso_cnt; + t->stat_tx_gso++; + } + + return tx; +} + static void* tx_socket_thread_loop(void* arg) { struct mt_tx_socket_thread* t = arg; struct mt_tx_socket_entry* entry = t->parent; @@ -84,14 +186,23 @@ static void* tx_socket_thread_loop(void* arg) { return NULL; } -static int tx_socket_init_fd(struct mt_tx_socket_entry* entry, int fd) { +static int tx_socket_init_thread_data(struct mt_tx_socket_thread* t) { int ret; + struct mt_tx_socket_entry* entry = t->parent; enum mtl_port port = entry->port; + int idx = t->idx; + int fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) { + err("%s(%d,%d), socket open fail %d\n", __func__, port, idx, fd); + return fd; + } + t->fd = fd; + info("%s(%d), fd %d for thread %d\n", __func__, idx, fd, idx); /* non-blocking */ ret = mt_fd_set_nonbolck(fd); if (ret < 0) { - err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, fd, ret); + err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, idx, ret); return ret; } @@ -99,29 +210,40 @@ static int tx_socket_init_fd(struct mt_tx_socket_entry* entry, int fd) { const char* if_name = mt_kernel_if_name(entry->parent, port); ret = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, if_name, strlen(if_name)); if (ret < 0) { - err("%s(%d,%d), SO_BINDTODEVICE to %s fail %d\n", __func__, port, fd, if_name, ret); + err("%s(%d,%d), SO_BINDTODEVICE to %s fail %d\n", __func__, port, idx, if_name, ret); return ret; } + if (entry->gso_sz) { + mudp_init_sockaddr(&t->send_addr, entry->flow.dip_addr, entry->flow.dst_port); + t->msg.msg_namelen = sizeof(t->send_addr); + t->msg.msg_name = &t->send_addr; + + /* gso size for sendmsg */ + t->msg.msg_control = t->msg_control; + t->msg.msg_controllen = sizeof(t->msg_control); + struct cmsghdr* cmsg; + cmsg = CMSG_FIRSTHDR(&t->msg); + cmsg->cmsg_level = SOL_UDP; + cmsg->cmsg_type = UDP_SEGMENT; + cmsg->cmsg_len = CMSG_LEN(sizeof(uint16_t)); + uint16_t* val_p; + val_p = (uint16_t*)CMSG_DATA(cmsg); + *val_p = entry->gso_sz; + } + return 0; } static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) { int idx = entry->threads_data[0].fd; - int fd, ret; + int ret; /* fds[0] already init */ for (int i = 1; i < entry->threads; i++) { - fd = socket(AF_INET, SOCK_DGRAM, 0); - if (fd < 0) { - err("%s(%d), socket open fail %d\n", __func__, idx, fd); - return fd; - } - entry->threads_data[i].fd = fd; - /* non-blocking */ - ret = tx_socket_init_fd(entry, fd); + struct mt_tx_socket_thread* t = &entry->threads_data[i]; + ret = tx_socket_init_thread_data(t); if (ret < 0) return ret; - info("%s(%d), fd %d for thread %d\n", __func__, idx, fd, i); } /* create the ring, multi producer single consumer */ @@ -140,7 +262,7 @@ static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) { } entry->ring = ring; - /* create the threads except the base fd */ + /* create the threads */ for (int i = 0; i < entry->threads; i++) { struct mt_tx_socket_thread* t = &entry->threads_data[i]; @@ -165,33 +287,27 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, return NULL; } - int fd = socket(AF_INET, SOCK_DGRAM, 0); - if (fd < 0) { - err("%s(%d), socket open fail %d\n", __func__, port, fd); - return NULL; - } - struct mt_tx_socket_entry* entry = mt_rte_zmalloc_socket(sizeof(*entry), mt_socket_id(impl, port)); if (!entry) { err("%s(%d), entry malloc fail\n", __func__, port); - close(fd); return NULL; } entry->parent = impl; entry->port = port; - /* 4g bit per second */ - entry->rate_limit_per_thread = (uint64_t)4 * 1000 * 1000 * 1000; + /* 5g bit per second */ + entry->rate_limit_per_thread = (uint64_t)6 * 1000 * 1000 * 1000; + entry->gso_sz = flow->gso_sz; rte_memcpy(&entry->flow, flow, sizeof(entry->flow)); for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { struct mt_tx_socket_thread* t = &entry->threads_data[i]; + t->idx = i; t->fd = -1; t->parent = entry; } - entry->threads_data[0].fd = fd; - ret = tx_socket_init_fd(entry, fd); + ret = tx_socket_init_thread_data(&entry->threads_data[0]); if (ret < 0) { mt_tx_socket_put(entry); return NULL; @@ -203,22 +319,22 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl, if (entry->threads > 1) { ret = tx_socket_init_threads(entry); if (ret < 0) { - err("%s(%d,%d), init %d threads fail %d\n", __func__, port, fd, entry->threads, - ret); + err("%s(%d), init %d threads fail %d\n", __func__, port, entry->threads, ret); mt_tx_socket_put(entry); return NULL; } } uint8_t* ip = flow->dip_addr; - info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u\n", __func__, port, + info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u gso_sz %u\n", __func__, port, entry->threads_data[0].fd, ip[0], ip[1], ip[2], ip[3], flow->dst_port, - entry->threads); + entry->threads, entry->gso_sz); return entry; } int mt_tx_socket_put(struct mt_tx_socket_entry* entry) { int idx = entry->threads_data[0].fd; + enum mtl_port port = entry->port; /* stop threads */ for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { @@ -244,10 +360,12 @@ int mt_tx_socket_put(struct mt_tx_socket_entry* entry) { if (t->fd >= 0) { close(t->fd); t->fd = -1; + info("%s(%d,%d), tx pkt %d gso %d on thread %d\n", __func__, port, idx, + t->stat_tx_pkt, t->stat_tx_gso, i); } } - info("%s(%d,%d), succ\n", __func__, entry->port, idx); + info("%s(%d,%d), succ\n", __func__, port, idx); mt_rte_free(entry); return 0; } @@ -264,10 +382,14 @@ uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf** return n; } - for (tx = 0; tx < nb_pkts; tx++) { - struct rte_mbuf* m = tx_pkts[tx]; - ret = tx_socket_send_mbuf(&entry->threads_data[0], m); - if (ret < 0) break; + if (entry->gso_sz) { + tx = tx_socket_send_mbuf_gso(&entry->threads_data[0], tx_pkts, nb_pkts); + } else { + for (tx = 0; tx < nb_pkts; tx++) { + struct rte_mbuf* m = tx_pkts[tx]; + ret = tx_socket_send_mbuf(&entry->threads_data[0], m); + if (ret < 0) break; + } } rte_pktmbuf_free_bulk(tx_pkts, tx); @@ -475,7 +597,7 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, entry->port = port; entry->pool_element_sz = 2048; rte_memcpy(&entry->flow, flow, sizeof(entry->flow)); - /* 4g bit per second */ + /* 5g bit per second */ entry->rate_limit_per_thread = (uint64_t)5 * 1000 * 1000 * 1000; for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) { @@ -526,6 +648,7 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl, int mt_rx_socket_put(struct mt_rx_socket_entry* entry) { int idx = entry->fd; + enum mtl_port port = entry->port; struct mt_rx_socket_thread* t; /* stop threads */ @@ -541,6 +664,7 @@ int mt_rx_socket_put(struct mt_rx_socket_entry* entry) { rte_pktmbuf_free(t->mbuf); t->mbuf = NULL; } + info("%s(%d,%d), rx pkt %d on thread %d\n", __func__, port, idx, t->stat_rx_pkt, i); } if (entry->ring) { diff --git a/lib/src/mt_main.h b/lib/src/mt_main.h index 1b5d639d9..442a8127c 100644 --- a/lib/src/mt_main.h +++ b/lib/src/mt_main.h @@ -881,6 +881,8 @@ struct mt_txq_flow { uint16_t dst_port; /* udp destination port */ /* value with MT_TXQ_FLOW_F_* */ uint32_t flags; + /* only for kernel socket */ + uint16_t gso_sz; }; struct mt_tsq_impl; /* forward delcare */ @@ -948,11 +950,19 @@ struct mt_srss_impl { struct mt_tx_socket_thread { struct mt_tx_socket_entry* parent; + int idx; int fd; pthread_t tid; rte_atomic32_t stop_thread; +#ifndef WINDOWSENV + struct sockaddr_in send_addr; + struct msghdr msg; + char msg_control[CMSG_SPACE(sizeof(uint16_t))]; +#endif + int stat_tx_pkt; + int stat_tx_gso; }; struct mt_tx_socket_entry { @@ -961,6 +971,7 @@ struct mt_tx_socket_entry { struct mt_txq_flow flow; uint64_t rate_limit_per_thread; + uint16_t gso_sz; int threads; struct rte_ring* ring; struct mt_tx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX]; diff --git a/lib/src/st2110/st_tx_video_session.c b/lib/src/st2110/st_tx_video_session.c index 03e74a724..e2f582208 100644 --- a/lib/src/st2110/st_tx_video_session.c +++ b/lib/src/st2110/st_tx_video_session.c @@ -2414,6 +2414,7 @@ static int tv_init_hw(struct mtl_main_impl* impl, struct st_tx_video_sessions_mg flow.dst_port = s->ops.udp_port[i]; if (ST21_TX_PACING_WAY_TSN == s->pacing_way[i]) flow.flags |= MT_TXQ_FLOW_F_LAUNCH_TIME; + flow.gso_sz = s->st20_pkt_size - sizeof(struct mt_udp_hdr); s->queue[i] = mt_txq_get(impl, port, &flow); if (!s->queue[i]) { tv_uinit_hw(s); From ce2d674e417312506d4e42f8b806f3fed8861464 Mon Sep 17 00:00:00 2001 From: Frank Du Date: Wed, 25 Oct 2023 15:36:32 +0800 Subject: [PATCH 2/2] fix centos build Signed-off-by: Frank Du --- lib/src/datapath/mt_dp_socket.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/src/datapath/mt_dp_socket.c b/lib/src/datapath/mt_dp_socket.c index 1c220975e..4eaed78b7 100644 --- a/lib/src/datapath/mt_dp_socket.c +++ b/lib/src/datapath/mt_dp_socket.c @@ -14,6 +14,11 @@ #define MT_RX_DP_SOCKET_PREFIX "SR_" #define MT_TX_DP_SOCKET_PREFIX "SR_" +#ifndef UDP_SEGMENT +/* fix for centos build */ +#define UDP_SEGMENT 103 /* Set GSO segmentation size */ +#endif + #ifndef WINDOWSENV static inline int tx_socket_verify_mbuf(struct rte_mbuf* m) {