Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dp/socket: add gso for tx #547

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 174 additions & 45 deletions lib/src/datapath/mt_dp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,46 @@
#define MT_RX_DP_SOCKET_PREFIX "SR_"
#define MT_TX_DP_SOCKET_PREFIX "SR_"

#ifndef UDP_SEGMENT
/* fix for centos build */
#define UDP_SEGMENT 103 /* Set GSO segmentation size */
#endif

#ifndef WINDOWSENV

static inline int tx_socket_verify_mbuf(struct rte_mbuf* m) {
if (m->nb_segs > 1) {
err("%s, only support one nb_segs %u\n", __func__, m->nb_segs);
return -ENOTSUP;
}

struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*);
struct rte_ether_hdr* eth = &hdr->eth;
uint16_t ether_type = ntohs(eth->ether_type);

if (ether_type != RTE_ETHER_TYPE_IPV4) {
err("%s, not ipv4, ether_type 0x%x\n", __func__, ether_type);
return -ENOTSUP;
}

return 0;
}

static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m) {
struct mt_tx_socket_entry* entry = t->parent;
enum mtl_port port = entry->port;
int fd = t->fd;
int fd = t->fd, ret;
struct sockaddr_in send_addr;
struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw;

/* check if supported */
if (m->nb_segs > 1) {
err("%s(%d,%d), only support one nb_segs %u\n", __func__, port, fd, m->nb_segs);
return -ENOTSUP;
ret = tx_socket_verify_mbuf(m);
if (ret < 0) {
err("%s(%d,%d), unsupported mbuf %p ret %d\n", __func__, port, fd, m, ret);
return ret;
}
struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*);
struct rte_ether_hdr* eth = &hdr->eth;

if (eth->ether_type != htons(RTE_ETHER_TYPE_IPV4)) {
err("%s(%d,%d), not ipv4\n", __func__, port, fd);
return -ENOTSUP;
}
struct mt_udp_hdr* hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*);
// mt_mbuf_dump(port, 0, "socket_tx", m);

void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr));
Expand Down Expand Up @@ -63,6 +82,94 @@ static int tx_socket_send_mbuf(struct mt_tx_socket_thread* t, struct rte_mbuf* m
return 0;
}

#ifndef MT_TX_SOCKET_GSO_MAX_SEGS
/* linux refuses a UDP_SEGMENT send that would be split into more than
 * UDP_MAX_SEGMENTS (64) datagrams */
#define MT_TX_SOCKET_GSO_MAX_SEGS (64)
#endif

#ifndef MT_TX_SOCKET_GSO_MAX_BYTES
/* largest udp payload one ipv4 send can carry: 65535 - ip hdr(20) - udp hdr(8) */
#define MT_TX_SOCKET_GSO_MAX_BYTES (65507)
#endif

/**
 * Send the pending GSO batch (t->msg.msg_iov[0..gso_cnt-1]) with one sendmsg.
 *
 * @return
 *   The number of packets sent (gso_cnt, 0 if nothing was pending), or -EIO if
 *   sendmsg failed or did not accept the whole batch.
 */
static int tx_socket_gso_flush(struct mt_tx_socket_thread* t, int gso_cnt,
                               struct mtl_port_status* stats) {
  if (gso_cnt <= 0) return 0;

  struct mt_tx_socket_entry* entry = t->parent;
  size_t expect = (size_t)entry->gso_sz * gso_cnt;

  t->msg.msg_iovlen = gso_cnt;
  ssize_t sent = sendmsg(t->fd, &t->msg, MSG_DONTWAIT);
  if (sent < 0 || (size_t)sent != expect) {
    dbg("%s(%d,%d), sendmsg fail, len %zu send %zd\n", __func__, entry->port, t->fd,
        expect, sent);
    return -EIO;
  }
  dbg("%s(%d,%d), sendmsg succ, len %zu send %zd\n", __func__, entry->port, t->fd,
      expect, sent);

  if (stats) {
    stats->tx_packets += gso_cnt;
    stats->tx_bytes += sent;
  }
  t->stat_tx_pkt += gso_cnt;
  t->stat_tx_gso++;
  return gso_cnt;
}

/**
 * Send a burst of mbufs on the thread's socket, using UDP GSO where possible.
 *
 * Consecutive packets whose payload length equals entry->gso_sz are gathered
 * into one sendmsg (the UDP_SEGMENT control message is carried by t->msg);
 * any other packet is sent on its own with sendto. Packet order is kept: a
 * pending batch is always flushed before a non-GSO packet goes out.
 *
 * @param t
 *   The tx socket thread context (fd, msg and send_addr are used).
 * @param tx_pkts
 *   The mbufs to send. They are not freed here.
 * @param nb_pkts
 *   Number of mbufs in tx_pkts, may be 0.
 * @return
 *   The number of leading mbufs in tx_pkts that were sent. Sending stops at the
 *   first unsupported mbuf or the first failed/short send.
 */
static uint16_t tx_socket_send_mbuf_gso(struct mt_tx_socket_thread* t,
                                        struct rte_mbuf** tx_pkts, uint16_t nb_pkts) {
  uint16_t tx = 0;
  struct mt_tx_socket_entry* entry = t->parent;
  enum mtl_port port = entry->port;
  int fd = t->fd, ret;
  uint16_t gso_sz = entry->gso_sz;
  /* fixed size: a batch never exceeds the kernel segment limit, and this avoids
   * a zero-length or very large variable length array */
  struct iovec iovs[MT_TX_SOCKET_GSO_MAX_SEGS];
  int gso_cnt = 0;
  struct mtl_port_status* stats = mt_if(entry->parent, port)->dev_stats_sw;

  t->msg.msg_iov = iovs;

  for (uint16_t i = 0; i < nb_pkts; i++) {
    struct rte_mbuf* m = tx_pkts[i];
    ret = tx_socket_verify_mbuf(m);
    if (ret < 0) {
      err("%s(%d,%d), unsupported mbuf %p ret %d\n", __func__, port, fd, m, ret);
      break; /* still flush what was already gathered */
    }
    if (m->data_len < sizeof(struct mt_udp_hdr)) {
      /* payload_len below would wrap around */
      err("%s(%d,%d), mbuf %p data_len %u too short\n", __func__, port, fd, m,
          m->data_len);
      break;
    }

    uint16_t payload_len = m->data_len - sizeof(struct mt_udp_hdr);
    void* payload = rte_pktmbuf_mtod_offset(m, void*, sizeof(struct mt_udp_hdr));
    dbg("%s(%d,%d), mbuf %u payload_len %u\n", __func__, port, fd, i, payload_len);

    if (payload_len == gso_sz) {
      /* one more segment would exceed what a single GSO send can carry */
      if (gso_cnt >= MT_TX_SOCKET_GSO_MAX_SEGS ||
          (size_t)(gso_cnt + 1) * gso_sz > MT_TX_SOCKET_GSO_MAX_BYTES) {
        ret = tx_socket_gso_flush(t, gso_cnt, stats);
        if (ret < 0) return tx;
        tx += ret;
        gso_cnt = 0;
      }
      iovs[gso_cnt].iov_base = payload;
      iovs[gso_cnt].iov_len = payload_len;
      gso_cnt++;
      continue;
    }

    /* odd sized packet: send the pending batch first to keep the order */
    ret = tx_socket_gso_flush(t, gso_cnt, stats);
    if (ret < 0) return tx;
    tx += ret;
    gso_cnt = 0;

    ssize_t sent = sendto(fd, payload, payload_len, MSG_DONTWAIT,
                          (const struct sockaddr*)&t->send_addr, sizeof(t->send_addr));
    if (sent != payload_len) {
      dbg("%s(%d,%d), sendto fail, len %u send %zd\n", __func__, port, fd, payload_len,
          sent);
      return tx;
    }
    tx++;
    if (stats) {
      stats->tx_packets++;
      stats->tx_bytes += sent;
    }
    t->stat_tx_pkt++;
  }

  /* whatever is still gathered at the end of the burst */
  ret = tx_socket_gso_flush(t, gso_cnt, stats);
  if (ret > 0) tx += ret;

  return tx;
}

static void* tx_socket_thread_loop(void* arg) {
struct mt_tx_socket_thread* t = arg;
struct mt_tx_socket_entry* entry = t->parent;
Expand All @@ -84,44 +191,64 @@ static void* tx_socket_thread_loop(void* arg) {
return NULL;
}

static int tx_socket_init_fd(struct mt_tx_socket_entry* entry, int fd) {
static int tx_socket_init_thread_data(struct mt_tx_socket_thread* t) {
int ret;
struct mt_tx_socket_entry* entry = t->parent;
enum mtl_port port = entry->port;
int idx = t->idx;
int fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0) {
err("%s(%d,%d), socket open fail %d\n", __func__, port, idx, fd);
return fd;
}
t->fd = fd;
info("%s(%d), fd %d for thread %d\n", __func__, idx, fd, idx);

/* non-blocking */
ret = mt_fd_set_nonbolck(fd);
if (ret < 0) {
err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, fd, ret);
err("%s(%d,%d), set nonbolck fail %d\n", __func__, port, idx, ret);
return ret;
}

/* bind to device */
const char* if_name = mt_kernel_if_name(entry->parent, port);
ret = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, if_name, strlen(if_name));
if (ret < 0) {
err("%s(%d,%d), SO_BINDTODEVICE to %s fail %d\n", __func__, port, fd, if_name, ret);
err("%s(%d,%d), SO_BINDTODEVICE to %s fail %d\n", __func__, port, idx, if_name, ret);
return ret;
}

if (entry->gso_sz) {
mudp_init_sockaddr(&t->send_addr, entry->flow.dip_addr, entry->flow.dst_port);
t->msg.msg_namelen = sizeof(t->send_addr);
t->msg.msg_name = &t->send_addr;

/* gso size for sendmsg */
t->msg.msg_control = t->msg_control;
t->msg.msg_controllen = sizeof(t->msg_control);
struct cmsghdr* cmsg;
cmsg = CMSG_FIRSTHDR(&t->msg);
cmsg->cmsg_level = SOL_UDP;
cmsg->cmsg_type = UDP_SEGMENT;
cmsg->cmsg_len = CMSG_LEN(sizeof(uint16_t));
uint16_t* val_p;
val_p = (uint16_t*)CMSG_DATA(cmsg);
*val_p = entry->gso_sz;
}

return 0;
}

static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) {
int idx = entry->threads_data[0].fd;
int fd, ret;
int ret;

/* fds[0] already init */
for (int i = 1; i < entry->threads; i++) {
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0) {
err("%s(%d), socket open fail %d\n", __func__, idx, fd);
return fd;
}
entry->threads_data[i].fd = fd;
/* non-blocking */
ret = tx_socket_init_fd(entry, fd);
struct mt_tx_socket_thread* t = &entry->threads_data[i];
ret = tx_socket_init_thread_data(t);
if (ret < 0) return ret;
info("%s(%d), fd %d for thread %d\n", __func__, idx, fd, i);
}

/* create the ring, multi producer single consumer */
Expand All @@ -140,7 +267,7 @@ static int tx_socket_init_threads(struct mt_tx_socket_entry* entry) {
}
entry->ring = ring;

/* create the threads except the base fd */
/* create the threads */
for (int i = 0; i < entry->threads; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];

Expand All @@ -165,33 +292,27 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
return NULL;
}

int fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0) {
err("%s(%d), socket open fail %d\n", __func__, port, fd);
return NULL;
}

struct mt_tx_socket_entry* entry =
mt_rte_zmalloc_socket(sizeof(*entry), mt_socket_id(impl, port));
if (!entry) {
err("%s(%d), entry malloc fail\n", __func__, port);
close(fd);
return NULL;
}
entry->parent = impl;
entry->port = port;
/* 4g bit per second */
entry->rate_limit_per_thread = (uint64_t)4 * 1000 * 1000 * 1000;
/* 6g bit per second */
entry->rate_limit_per_thread = (uint64_t)6 * 1000 * 1000 * 1000;
entry->gso_sz = flow->gso_sz;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
struct mt_tx_socket_thread* t = &entry->threads_data[i];
t->idx = i;
t->fd = -1;
t->parent = entry;
}
entry->threads_data[0].fd = fd;

ret = tx_socket_init_fd(entry, fd);
ret = tx_socket_init_thread_data(&entry->threads_data[0]);
if (ret < 0) {
mt_tx_socket_put(entry);
return NULL;
Expand All @@ -203,22 +324,22 @@ struct mt_tx_socket_entry* mt_tx_socket_get(struct mtl_main_impl* impl,
if (entry->threads > 1) {
ret = tx_socket_init_threads(entry);
if (ret < 0) {
err("%s(%d,%d), init %d threads fail %d\n", __func__, port, fd, entry->threads,
ret);
err("%s(%d), init %d threads fail %d\n", __func__, port, entry->threads, ret);
mt_tx_socket_put(entry);
return NULL;
}
}

uint8_t* ip = flow->dip_addr;
info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u\n", __func__, port,
info("%s(%d), fd %d ip %u.%u.%u.%u, port %u, threads %u gso_sz %u\n", __func__, port,
entry->threads_data[0].fd, ip[0], ip[1], ip[2], ip[3], flow->dst_port,
entry->threads);
entry->threads, entry->gso_sz);
return entry;
}

int mt_tx_socket_put(struct mt_tx_socket_entry* entry) {
int idx = entry->threads_data[0].fd;
enum mtl_port port = entry->port;

/* stop threads */
for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
Expand All @@ -244,10 +365,12 @@ int mt_tx_socket_put(struct mt_tx_socket_entry* entry) {
if (t->fd >= 0) {
close(t->fd);
t->fd = -1;
info("%s(%d,%d), tx pkt %d gso %d on thread %d\n", __func__, port, idx,
t->stat_tx_pkt, t->stat_tx_gso, i);
}
}

info("%s(%d,%d), succ\n", __func__, entry->port, idx);
info("%s(%d,%d), succ\n", __func__, port, idx);
mt_rte_free(entry);
return 0;
}
Expand All @@ -264,10 +387,14 @@ uint16_t mt_tx_socket_burst(struct mt_tx_socket_entry* entry, struct rte_mbuf**
return n;
}

for (tx = 0; tx < nb_pkts; tx++) {
struct rte_mbuf* m = tx_pkts[tx];
ret = tx_socket_send_mbuf(&entry->threads_data[0], m);
if (ret < 0) break;
if (entry->gso_sz) {
tx = tx_socket_send_mbuf_gso(&entry->threads_data[0], tx_pkts, nb_pkts);
} else {
for (tx = 0; tx < nb_pkts; tx++) {
struct rte_mbuf* m = tx_pkts[tx];
ret = tx_socket_send_mbuf(&entry->threads_data[0], m);
if (ret < 0) break;
}
}

rte_pktmbuf_free_bulk(tx_pkts, tx);
Expand Down Expand Up @@ -475,7 +602,7 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl,
entry->port = port;
entry->pool_element_sz = 2048;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));
/* 4g bit per second */
/* 5g bit per second */
entry->rate_limit_per_thread = (uint64_t)5 * 1000 * 1000 * 1000;

for (int i = 0; i < MT_DP_SOCKET_THREADS_MAX; i++) {
Expand Down Expand Up @@ -526,6 +653,7 @@ struct mt_rx_socket_entry* mt_rx_socket_get(struct mtl_main_impl* impl,

int mt_rx_socket_put(struct mt_rx_socket_entry* entry) {
int idx = entry->fd;
enum mtl_port port = entry->port;
struct mt_rx_socket_thread* t;

/* stop threads */
Expand All @@ -541,6 +669,7 @@ int mt_rx_socket_put(struct mt_rx_socket_entry* entry) {
rte_pktmbuf_free(t->mbuf);
t->mbuf = NULL;
}
info("%s(%d,%d), rx pkt %d on thread %d\n", __func__, port, idx, t->stat_rx_pkt, i);
}

if (entry->ring) {
Expand Down
11 changes: 11 additions & 0 deletions lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,8 @@ struct mt_txq_flow {
uint16_t dst_port; /* udp destination port */
/* value with MT_TXQ_FLOW_F_* */
uint32_t flags;
/* only for kernel socket */
uint16_t gso_sz;
};

struct mt_tsq_impl; /* forward declare */
Expand Down Expand Up @@ -948,11 +950,19 @@ struct mt_srss_impl {

/* per-thread context of the kernel socket tx path, one slot of
 * mt_tx_socket_entry.threads_data */
struct mt_tx_socket_thread {
/* the entry this slot belongs to */
struct mt_tx_socket_entry* parent;
/* index of this slot in parent->threads_data */
int idx;
/* AF_INET/SOCK_DGRAM socket of this thread, -1 if not opened */
int fd;
/* NOTE(review): presumably the handle of the tx thread and its stop request
 * flag; the code using them is not visible here - confirm */
pthread_t tid;
rte_atomic32_t stop_thread;

#ifndef WINDOWSENV
/* destination address, filled from the flow dip_addr/dst_port when gso_sz is set */
struct sockaddr_in send_addr;
/* message header reused for every GSO sendmsg: msg_name points to send_addr,
 * msg_control to msg_control below */
struct msghdr msg;
/* room for one SOL_UDP/UDP_SEGMENT control message carrying the uint16_t gso size */
char msg_control[CMSG_SPACE(sizeof(uint16_t))];
#endif

/* packets sent by this thread, reported when the entry is put */
int stat_tx_pkt;
/* number of GSO sendmsg calls done by this thread */
int stat_tx_gso;
};

struct mt_tx_socket_entry {
Expand All @@ -961,6 +971,7 @@ struct mt_tx_socket_entry {
struct mt_txq_flow flow;

uint64_t rate_limit_per_thread;
uint16_t gso_sz;
int threads;
struct rte_ring* ring;
struct mt_tx_socket_thread threads_data[MT_DP_SOCKET_THREADS_MAX];
Expand Down
Loading