Skip to content

Commit

Permalink
dp/socket: add multi thread for tx (#532)
Browse files Browse the repository at this point in the history
one fd is only capable to handle ~4g bit per second traffic.

Signed-off-by: Frank Du <[email protected]>
  • Loading branch information
frankdjx authored Oct 11, 2023
1 parent 4f01897 commit 9e86dad
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 62 deletions.
244 changes: 186 additions & 58 deletions lib/src/datapath/mt_dp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,127 @@
#include "mudp_api.h"
#endif

#define MT_RX_DP_SOCKET_MEMPOOL_PREFIX "SR_"
#define MT_RX_DP_SOCKET_PREFIX "SR_"

#ifndef WINDOWSENV

static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m) {
struct mt_tx_socket_entry* entry = t->parent;
enum mtl_port port = entry->port;
int fd = t->fd;
struct sockaddr_in send_addr;
struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw;

/* check if suppoted */
if (m->nb_segs > 1) {
err("%s(%d,%d), only support one nb_segs %u\n", __func__, port, fd, m->nb_segs);
return -ENOTSUP;
}
struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*);
struct rte_ether_hdr* eth = &hdr->eth;

if (eth->ether_type != htons(RTE_ETHER_TYPE_IPV4)) {
err("%s(%d,%d), not ipv4\n", __func__, port, fd);
return -ENOTSUP;
}
// mt_mbuf_dump(port, 0, "socket_tx", m);

void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr));
ssize_t payload_len = m->data_len - sizeof(struct mt_udp_hdr);

struct rte_ipv4_hdr* ipv4 = &hdr->ipv4;
struct rte_udp_hdr* udp = &hdr->udp;
mudp_init_sockaddr(&send_addr, (uint8_t*)&ipv4->dst_addr, ntohs(udp->dst_port));

/* nonblocking */
ssize_t send = sendto(fd, payload, payload_len, MSG_DONTWAIT,
(const struct sockaddr*)&send_addr, sizeof(send_addr));
dbg("%s(%d,%d), len %" PRId64 " send %" PRId64 "\n", __func__, port, fd, payload_len,
send);
if (send != payload_len) {
dbg("%s(%d,%d), sendto fail, len %" PRId64 " send %" PRId64 "\n", __func__, port, fd,
payload_len, send);
return -EBUSY;
}
if (stats) {
stats->tx_packets++;
stats->tx_bytes += m->data_len;
}

return 0;
}

static void* tx_socket_thread(void* arg) {
struct mt_tx_socket_thread* t = arg;
struct mt_tx_socket_entry* entry = t->parent;
enum mtl_port port = entry->port;
struct rte_mbuf* m = NULL;
int ret;

info("%s(%d), start, fd %d\n", __func__, port, t->fd);
while (rte_atomic32_read(&t->stop_thread) == 0) {
ret = rte_ring_mc_dequeue(entry->ring, (void**)&m);
if (ret < 0) continue;
do {
ret = tx_socket_send_mbuf(t, m);
} while ((ret < 0) && (rte_atomic32_read(&t->stop_thread) == 0));
rte_pktmbuf_free(m);
}
info("%s(%d), stop, fd %d\n", __func__, port, t->fd);

return NULL;
}

static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) {
int idx = entry->threads_data[0].fd;
int fd, ret;

/* fds[0] already init */
for (int i = 1; i < entry->threads; i++) {
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0) {
mt_tx_socket_put(entry);
err("%s(%d), socket open fail %d\n", __func__, idx, fd);
return fd;
}
entry->threads_data[i].fd = fd;
/* non-blocking */
ret = mt_fd_set_nonbolck(fd);
if (ret < 0) return ret;
info("%s(%d), fd %d for thread %d\n", __func__, idx, fd, i);
}

/* create the ring, one producer multi consumer */
char ring_name[64];
struct rte_ring* ring;
unsigned int flags, count;
snprintf(ring_name, sizeof(ring_name), "%sP%dFD%d", MT_RX_DP_SOCKET_PREFIX, entry->port,
idx);
flags = RING_F_SP_ENQ;
count = mt_if_nb_tx_desc(entry->parent, entry->port);
ring =
rte_ring_create(ring_name, count, mt_socket_id(entry->parent, entry->port), flags);
if (!ring) {
err("%s(%d), ring create fail\n", __func__, idx);
return -EIO;
}
entry->ring = ring;

/* create the threads except the base fd */
for (int i = 0; i < entry->threads; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];

rte_atomic32_set(&t->stop_thread, 0);
ret = pthread_create(&t->tid, NULL, tx_socket_thread, t);
if (ret < 0) {
err("%s(%d), thread create fail %d for thread %d\n", __func__, idx, ret, i);
return ret;
}
}

return 0;
}

struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
enum mtl_port port,
struct mt_txq_flow* flow) {
Expand All @@ -32,11 +149,9 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
}

/* non-blocking */
int flags = fcntl(fd, F_GETFL, 0);
flags |= O_NONBLOCK;
ret = fcntl(fd, F_SETFL, flags);
ret = mt_fd_set_nonbolck(fd);
if (ret < 0) {
err("%s(%d,%d), O_NONBLOCK fail %d\n", __func__, port, fd, ret);
err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, fd, ret);
close(fd);
return NULL;
}
Expand All @@ -57,75 +172,91 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
close(fd);
return NULL;
}
for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];
t->fd = -1;
t->parent = entry;
}
entry->parent = impl;
entry->port = port;
entry->fd = fd;
/* 4g bit per second */
entry->rate_limit_per_thread = (uint64_t)4 * 1000 * 1000 * 1000;
entry->threads_data[0].fd = fd;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

uint64_t required = flow->bytes_per_sec * 8;
entry->threads = required / entry->rate_limit_per_thread + 1;
entry->threads = RTE_MIN(entry->threads, MT_DP_SOCKET_THREADS_MAX);
if (entry->threads > 1) {
ret = tx_socket_init_threads(entry);
if (ret < 0) {
err("%s(%d,%d), init %d threads fail %d\n", __func__, port, fd, entry->threads,
ret);
mt_tx_socket_put(entry);
return NULL;
}
}

uint8_t* ip = flow->dip_addr;
info("%s(%d), fd %d ip %u.%u.%u.%u, port %u\n", __func__, port, fd, ip[0], ip[1], ip[2],
ip[3], flow->dst_port);
info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u\n", __func__, port,
entry->threads_data[0].fd, ip[0], ip[1], ip[2], ip[3], flow->dst_port,
entry->threads);
return entry;
}

int mt_tx_socket_put(struct mt_tx_socket_entry* entry) {
if (entry->fd > 0) {
close(entry->fd);
int idx = entry->threads_data[0].fd;

/* stop threads */
for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];

rte_atomic32_set(&t->stop_thread, 1);
if (t->tid) {
pthread_join(t->tid, NULL);
t->tid = 0;
}
}
info("%s(%d,%d), succ\n", __func__, entry->port, entry->fd);

if (entry->ring) {
mt_ring_dequeue_clean(entry->ring);
rte_ring_free(entry->ring);
entry->ring = NULL;
}

/* close fd */
for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];

if (t->fd >= 0) {
close(t->fd);
t->fd = -1;
}
}

info("%s(%d,%d), succ\n", __func__, entry->port, idx);
mt_rte_free(entry);
return 0;
}

uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf** tx_pkts,
uint16_t nb_pkts) {
uint16_t tx = 0;
enum mtl_port port = entry->port;
int fd = entry->fd;
struct sockaddr_in send_addr;
struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw;
int ret;

if (entry->ring) {
unsigned int n =
rte_ring_sp_enqueue_bulk(entry->ring, (void**)&tx_pkts[0], nb_pkts, NULL);
// tx_socket_dequeue(&entry->threads_data[0]);
return n;
}

for (tx = 0; tx < nb_pkts; tx++) {
struct rte_mbuf* m = tx_pkts[tx];

/* check if suppoted */
if (m->nb_segs > 1) {
err("%s(%d,%d), only support one nb_segs %u\n", __func__, port, fd, m->nb_segs);
goto done;
}
struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*);
struct rte_ether_hdr* eth = &hdr->eth;

if (eth->ether_type != htons(RTE_ETHER_TYPE_IPV4)) {
err("%s(%d,%d), not ipv4\n", __func__, port, fd);
goto done;
}
// mt_mbuf_dump(port, 0, "socket_tx", m);

void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr));
ssize_t payload_len = m->data_len - sizeof(struct mt_udp_hdr);

struct rte_ipv4_hdr* ipv4 = &hdr->ipv4;
struct rte_udp_hdr* udp = &hdr->udp;
mudp_init_sockaddr(&send_addr, (uint8_t*)&ipv4->dst_addr, ntohs(udp->dst_port));

/* nonblocking */
ssize_t send = sendto(entry->fd, payload, payload_len, MSG_DONTWAIT,
(const struct sockaddr*)&send_addr, sizeof(send_addr));
dbg("%s(%d,%d), len %" PRId64 " send %" PRId64 "\n", __func__, port, fd, payload_len,
send);
if (send != payload_len) {
dbg("%s(%d,%d), sendto fail, len %" PRId64 " send %" PRId64 "\n", __func__, port,
fd, payload_len, send);
goto done;
}
if (stats) {
stats->tx_packets++;
stats->tx_bytes += m->data_len;
}
ret = tx_socket_send_mbuf(&entry->threads_data[0], m);
if (ret < 0) break;
}

done:
rte_pktmbuf_free_bulk(tx_pkts, tx);
return tx;
}
Expand Down Expand Up @@ -155,11 +286,9 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl,
return NULL;
}
/* non-blocking */
int flags = fcntl(fd, F_GETFL, 0);
flags |= O_NONBLOCK;
ret = fcntl(fd, F_SETFL, flags);
ret = mt_fd_set_nonbolck(fd);
if (ret < 0) {
err("%s(%d,%d), O_NONBLOCK fail %d\n", __func__, port, fd, ret);
err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, fd, ret);
close(fd);
return NULL;
}
Expand Down Expand Up @@ -219,8 +348,7 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl,
/* Create mempool to hold the rx queue mbufs. */
unsigned int mbuf_elements = mt_if_nb_rx_desc(impl, port) + 1024;
char pool_name[ST_MAX_NAME_LEN];
snprintf(pool_name, ST_MAX_NAME_LEN, "%sP%dF%d_MBUF", MT_RX_DP_SOCKET_MEMPOOL_PREFIX,
port, fd);
snprintf(pool_name, ST_MAX_NAME_LEN, "%sP%dF%d_MBUF", MT_RX_DP_SOCKET_PREFIX, port, fd);
/* no priv */
entry->pool =
mt_mempool_create_by_ops(impl, port, pool_name, mbuf_elements, MT_MBUF_CACHE_SIZE,
Expand Down
2 changes: 1 addition & 1 deletion lib/src/datapath/mt_dp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
enum mtl_port port, struct mt_txq_flow* flow);
int mt_tx_socket_put(struct mt_tx_socket_entry* entry);
static inline uint16_t mt_tx_socket_queue_id(struct mt_tx_socket_entry* entry) {
return entry->fd;
return entry->threads_data[0].fd;
}
uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf** tx_pkts,
uint16_t nb_pkts);
Expand Down
14 changes: 13 additions & 1 deletion lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -942,12 +942,24 @@ struct mt_srss_impl {
int entry_idx;
};

#define MT_DP_SOCKET_THREADS_MAX (4)

struct mt_tx_socket_thread {
struct mt_tx_socket_entry* parent;
int fd;
pthread_t tid;
rte_atomic32_t stop_thread;
};

struct mt_tx_socket_entry {
struct mtl_main_impl* parent;
enum mtl_port port;
struct mt_txq_flow flow;

int fd;
uint64_t rate_limit_per_thread;
int threads;
struct rte_ring* ring;
struct mt_tx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX];
};

struct mt_rx_socket_entry {
Expand Down
13 changes: 13 additions & 0 deletions lib/src/mt_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,19 @@ static inline void mt_mbuf_refcnt_inc_bulk(struct rte_mbuf** mbufs, uint16_t nb)
}
}

#ifdef WINDOWSENV
static inline int mt_fd_set_nonbolck(int fd) {
MTL_MAY_UNUSED(fd);
return -ENOTSUP;
}
#else
static inline int mt_fd_set_nonbolck(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
flags |= O_NONBLOCK;
return fcntl(fd, F_SETFL, flags);
}
#endif

const char* mt_dpdk_afxdp_port2if(const char* port);
const char* mt_dpdk_afpkt_port2if(const char* port);
const char* mt_kernel_port2if(const char* port);
Expand Down
4 changes: 2 additions & 2 deletions plugins/st22_ffmpeg/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ ffmpeg -s 1920x1080 -pix_fmt yuv420p -i yuv420p8le.yuv -pix_fmt yuv422p test_pla
Tx run:

```bash
./build/app/TxSt22PipelineSample --st22_codec h264_cbr --st22_fmt YUV422PLANAR8 --tx_url test_planar8.yuv
./build/app/TxSt22PipelineSample --st22_codec h264_cbr --pipeline_fmt YUV422PLANAR8 --tx_url test_planar8.yuv
```

Rx run:

```bash
./build/app/RxSt22PipelineSample --st22_codec h264_cbr --st22_fmt YUV422PLANAR8 --rx_url out_planar8.yuv
./build/app/RxSt22PipelineSample --st22_codec h264_cbr --pipeline_fmt YUV422PLANAR8 --rx_url out_planar8.yuv
```

0 comments on commit 9e86dad

Please sign in to comment.