Skip to content

Commit

Permalink
rss: dispatch entries to different lists based on the UDP port number
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Du <[email protected]>
  • Loading branch information
frankdjx committed Nov 6, 2023
1 parent c5d992c commit 4208546
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 98 deletions.
23 changes: 3 additions & 20 deletions lib/src/datapath/mt_shared_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,6 @@ static int rsq_rx(struct mt_rsq_queue* rsq_queue) {
struct mt_rsq_entry* last_rsq_entry = NULL;
uint16_t matched_pkts_nb = 0;
struct mt_udp_hdr* hdr;
struct rte_ipv4_hdr* ipv4;
struct rte_udp_hdr* udp;

if (rsq_queue->xdp)
rx = mt_rx_xdp_burst(rsq_queue->xdp, pkts, MT_SQ_BURST_SIZE);
Expand All @@ -289,27 +287,12 @@ static int rsq_rx(struct mt_rsq_queue* rsq_queue) {
rsq_entry = NULL;

hdr = rte_pktmbuf_mtod(pkts[i], struct mt_udp_hdr*);
ipv4 = &hdr->ipv4;
udp = &hdr->udp;
dbg("%s(%u), pkt %u ip %u.%u.%u.%u, port dst %u src %u\n", __func__, q, i,
ntohs(udp->dst_port), ntohs(udp->src_port));
ntohs(hdr->udp.dst_port), ntohs(hdr->udp.src_port));

MT_TAILQ_FOREACH(rsq_entry, &rsq_queue->head, next) {
bool ip_matched;
if (rsq_entry->flow.flags & MT_RXQ_FLOW_F_NO_IP) {
ip_matched = true;
} else {
ip_matched = mt_is_multicast_ip(rsq_entry->flow.dip_addr)
? (ipv4->dst_addr == *(uint32_t*)rsq_entry->flow.dip_addr)
: (ipv4->src_addr == *(uint32_t*)rsq_entry->flow.dip_addr);
}
bool port_matched;
if (rsq_entry->flow.flags & MT_RXQ_FLOW_F_NO_PORT) {
port_matched = true;
} else {
port_matched = ntohs(udp->dst_port) == rsq_entry->flow.dst_port;
}
if (ip_matched && port_matched) { /* match dst ip:port */
bool matched = mt_udp_matched(&rsq_entry->flow, hdr);
if (matched) {
if (rsq_entry != last_rsq_entry) UPDATE_ENTRY();
matched_pkts[matched_pkts_nb++] = pkts[i];
break;
Expand Down
164 changes: 108 additions & 56 deletions lib/src/datapath/mt_shared_rss.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@
#define MT_SRSS_BURST_SIZE (128)
#define MT_SRSS_RING_PREFIX "SR_"

static inline void srss_lock(struct mt_srss_impl* srss) {
rte_spinlock_lock(&srss->mutex);
/*
 * Pick the entry list responsible for a UDP port.
 *
 * The port is reduced modulo srss->lists_sz and the result is used as the
 * index into srss->lists, so a given port number always maps to the same list.
 * Returns a pointer to that list inside the srss->lists array.
 */
static inline struct mt_srss_list* srss_list_by_udp_port(struct mt_srss_impl* srss,
                                                         uint16_t port) {
  return &srss->lists[port % srss->lists_sz];
}

/* Acquire list->mutex, the spinlock that protects this list's entrys_list. */
static inline void srss_list_lock(struct mt_srss_list* list) {
rte_spinlock_lock(&list->mutex);
}

/* return true if the try-lock succeeded */
static inline bool srss_try_lock(struct mt_srss_impl* srss) {
int ret = rte_spinlock_trylock(&srss->mutex);
static inline bool srss_list_try_lock(struct mt_srss_list* list) {
int ret = rte_spinlock_trylock(&list->mutex);
return ret ? true : false;
}

static inline void srss_unlock(struct mt_srss_impl* srss) {
rte_spinlock_unlock(&srss->mutex);
static inline void srss_list_unlock(struct mt_srss_list* list) {
rte_spinlock_unlock(&list->mutex);
}

static inline void srss_entry_pkts_enqueue(struct mt_srss_entry* entry,
Expand Down Expand Up @@ -63,7 +69,6 @@ static int srss_tasklet_handler(void* priv) {
struct mt_srss_entry *srss_entry, *last_srss_entry;
struct mt_udp_hdr* hdr;
struct rte_ipv4_hdr* ipv4;
struct rte_udp_hdr* udp;

for (uint16_t queue = 0; queue < inf->nb_rx_q; queue++) {
uint16_t matched_pkts_nb = 0;
Expand All @@ -77,7 +82,6 @@ static int srss_tasklet_handler(void* priv) {
}
if (!rx) continue;

srss_lock(srss);
last_srss_entry = NULL;
for (uint16_t i = 0; i < rx; i++) {
srss_entry = NULL;
Expand All @@ -94,36 +98,27 @@ static int srss_tasklet_handler(void* priv) {
CNI_ENQUEUE();
continue;
}
udp = &hdr->udp;
MT_TAILQ_FOREACH(srss_entry, &srss->head, next) {
bool ip_matched;
if (srss_entry->flow.flags & MT_RXQ_FLOW_F_NO_IP) {
ip_matched = true;
} else {
ip_matched = mt_is_multicast_ip(srss_entry->flow.dip_addr)
? (ipv4->dst_addr == *(uint32_t*)srss_entry->flow.dip_addr)
: (ipv4->src_addr == *(uint32_t*)srss_entry->flow.dip_addr);
}
bool port_matched;
if (srss_entry->flow.flags & MT_RXQ_FLOW_F_NO_PORT) {
port_matched = true;
} else {
port_matched = ntohs(udp->dst_port) == srss_entry->flow.dst_port;
}
if (ip_matched && port_matched) { /* match dst ip:port */
/* check if match any entry */
struct mt_srss_list* list = srss_list_by_udp_port(srss, ntohs(hdr->udp.dst_port));
srss_list_lock(list);
struct mt_srss_entrys_list* head = &list->entrys_list;
MT_TAILQ_FOREACH(srss_entry, head, next) {
bool matched = mt_udp_matched(&srss_entry->flow, hdr);
if (matched) {
if (srss_entry != last_srss_entry) UPDATE_ENTRY();
matched_pkts[matched_pkts_nb++] = pkts[i];
break;
}
}
srss_list_unlock(list);

if (!srss_entry) { /* no match, redirect to cni */
UPDATE_ENTRY();
CNI_ENQUEUE();
}
}
if (matched_pkts_nb)
srss_entry_pkts_enqueue(last_srss_entry, &matched_pkts[0], matched_pkts_nb);
srss_unlock(srss);
}

return 0;
Expand Down Expand Up @@ -193,23 +188,27 @@ static int srss_stat(void* priv) {
struct mt_srss_entry* entry;
int idx;

if (!srss_try_lock(srss)) {
notice("%s(%d), get lock fail\n", __func__, port);
return 0;
}
MT_TAILQ_FOREACH(entry, &srss->head, next) {
idx = entry->idx;
notice("%s(%d,%d), enqueue %u dequeue %u\n", __func__, port, idx,
entry->stat_enqueue_cnt, entry->stat_dequeue_cnt);
entry->stat_enqueue_cnt = 0;
entry->stat_dequeue_cnt = 0;
if (entry->stat_enqueue_fail_cnt) {
warn("%s(%d,%d), enqueue fail %u\n", __func__, port, idx,
entry->stat_enqueue_fail_cnt);
entry->stat_enqueue_fail_cnt = 0;
for (int l_idx = 0; l_idx < srss->lists_sz; l_idx++) {
struct mt_srss_list* list = &srss->lists[l_idx];
if (!srss_list_try_lock(list)) {
continue;
}

struct mt_srss_entrys_list* head = &list->entrys_list;
MT_TAILQ_FOREACH(entry, head, next) {
idx = entry->idx;
notice("%s(%d,%d,%d), enqueue %u dequeue %u\n", __func__, port, l_idx, idx,
entry->stat_enqueue_cnt, entry->stat_dequeue_cnt);
entry->stat_enqueue_cnt = 0;
entry->stat_dequeue_cnt = 0;
if (entry->stat_enqueue_fail_cnt) {
warn("%s(%d,%d,%d), enqueue fail %u\n", __func__, port, l_idx, idx,
entry->stat_enqueue_fail_cnt);
entry->stat_enqueue_fail_cnt = 0;
}
}
srss_list_unlock(list);
}
srss_unlock(srss);

return 0;
}
Expand Down Expand Up @@ -273,21 +272,30 @@ struct mt_srss_entry* mt_srss_get(struct mtl_main_impl* impl, enum mtl_port port
struct mt_srss_impl* srss = impl->srss[port];
int idx = srss->entry_idx;
struct mt_srss_entry* entry;
struct mt_srss_list* list;
struct mt_srss_entrys_list* head;

if (!mt_has_srss(impl, port)) {
err("%s(%d,%d), shared rss not enabled\n", __func__, port, idx);
return NULL;
}

MT_TAILQ_FOREACH(entry, &srss->head, next) {
list = srss_list_by_udp_port(srss, flow->dst_port);
head = &list->entrys_list;

srss_list_lock(list);
MT_TAILQ_FOREACH(entry, head, next) {
/* todo: check with flow flags */
if (entry->flow.dst_port == flow->dst_port &&
*(uint32_t*)entry->flow.dip_addr == *(uint32_t*)flow->dip_addr) {
err("%s(%d,%d), already has entry %u.%u.%u.%u:%u\n", __func__, port, idx,
flow->dip_addr[0], flow->dip_addr[1], flow->dip_addr[2], flow->dip_addr[3],
flow->dst_port);
srss_list_unlock(list);
return NULL;
}
}
srss_list_unlock(list);

entry = mt_rte_zmalloc_socket(sizeof(*entry), mt_socket_id(impl, port));
if (!entry) {
Expand All @@ -310,24 +318,43 @@ struct mt_srss_entry* mt_srss_get(struct mtl_main_impl* impl, enum mtl_port port
entry->srss = srss;
entry->idx = idx;

srss_lock(srss);
MT_TAILQ_INSERT_TAIL(&srss->head, entry, next);
srss_list_lock(list);
MT_TAILQ_INSERT_TAIL(head, entry, next);
if (flow->flags & MT_RXQ_FLOW_F_SYS_QUEUE) srss->cni_entry = entry;
srss->entry_idx++;
srss_unlock(srss);
srss_list_unlock(list);

info("%s(%d), entry %u.%u.%u.%u:(dst)%u on %d\n", __func__, port, flow->dip_addr[0],
flow->dip_addr[1], flow->dip_addr[2], flow->dip_addr[3], flow->dst_port, idx);
info("%s(%d), entry %u.%u.%u.%u:(dst)%u on %d of list %d\n", __func__, port,
flow->dip_addr[0], flow->dip_addr[1], flow->dip_addr[2], flow->dip_addr[3],
flow->dst_port, idx, list->idx);
return entry;
}

int mt_srss_put(struct mt_srss_entry* entry) {
struct mt_srss_impl* srss = entry->srss;
enum mtl_port port = srss->port;
struct mt_srss_list* list = srss_list_by_udp_port(srss, entry->flow.dst_port);
struct mt_srss_entrys_list* head = &list->entrys_list;

/* check if it's a known entry in the list */
struct mt_srss_entry* temp_entry;
bool found = false;
srss_list_lock(list);
MT_TAILQ_FOREACH(temp_entry, head, next) {
if (entry == temp_entry) {
found = true;
break;
}
}
srss_list_unlock(list);
if (!found) {
info("%s(%d), unknow entry %p on %d\n", __func__, port, entry, entry->idx);
return -EIO;
}

srss_lock(srss);
MT_TAILQ_REMOVE(&srss->head, entry, next);
srss_unlock(srss);
srss_list_lock(list);
MT_TAILQ_REMOVE(head, entry, next);
srss_list_unlock(list);

if (entry->ring) {
mt_ring_dequeue_clean(entry->ring);
Expand Down Expand Up @@ -367,7 +394,21 @@ int mt_srss_init(struct mtl_main_impl* impl) {
srss->parent = impl;
srss->queue_mode =
mt_pmd_is_native_af_xdp(impl, i) ? MT_QUEUE_MODE_XDP : MT_QUEUE_MODE_DPDK;
MT_TAILQ_INIT(&srss->head);
srss->lists_sz = 128;
srss->lists = mt_rte_zmalloc_socket(sizeof(*srss->lists) * srss->lists_sz,
mt_socket_id(impl, i));
if (!srss->lists) {
err("%s(%d), lists malloc fail\n", __func__, i);
mt_srss_uinit(impl);
return -ENOMEM;
}
for (int l_idx = 0; l_idx < srss->lists_sz; l_idx++) {
struct mt_srss_list* list = &srss->lists[l_idx];

list->idx = l_idx;
MT_TAILQ_INIT(&list->entrys_list);
rte_spinlock_init(&list->mutex);
}

if (srss->queue_mode == MT_QUEUE_MODE_XDP) {
ret = srss_init_xdp(srss);
Expand Down Expand Up @@ -425,11 +466,22 @@ int mt_srss_uinit(struct mtl_main_impl* impl) {
mt_sch_put(srss->sch, 0);
srss->sch = NULL;
}
struct mt_srss_entry* entry;
while ((entry = MT_TAILQ_FIRST(&srss->head))) {
warn("%s, still has entry %p\n", __func__, entry);
MT_TAILQ_REMOVE(&srss->head, entry, next);
mt_rte_free(entry);
if (srss->lists) {
for (int l_idx = 0; l_idx < srss->lists_sz; l_idx++) {
struct mt_srss_list* list = &srss->lists[l_idx];

struct mt_srss_entrys_list* head = &list->entrys_list;
struct mt_srss_entry* entry;
while ((entry = MT_TAILQ_FIRST(head))) {
warn("%s(%d), still has entry %p on list_heads %d\n", __func__, i, entry,
l_idx);
MT_TAILQ_REMOVE(head, entry, next);
mt_rte_free(entry);
}
}

mt_rte_free(srss->lists);
srss->lists = NULL;
}

srss_uinit_xdp(srss);
Expand Down
21 changes: 2 additions & 19 deletions lib/src/mt_cni.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,32 +163,15 @@ static int cni_burst_from_kernel(struct mt_cni_entry* cni) {

static int cni_udp_handle(struct mt_cni_entry* cni, struct rte_mbuf* m) {
struct mt_udp_hdr* hdr;
struct rte_ipv4_hdr* ipv4;
struct rte_udp_hdr* udp;
struct mt_csq_entry* csq = NULL;
int ret;

hdr = rte_pktmbuf_mtod(m, struct mt_udp_hdr*);
ipv4 = &hdr->ipv4;
udp = &hdr->udp;

csq_lock(cni);
MT_TAILQ_FOREACH(csq, &cni->csq_queues, next) {
bool ip_matched;
if (csq->flow.flags & MT_RXQ_FLOW_F_NO_IP) {
ip_matched = true;
} else {
ip_matched = mt_is_multicast_ip(csq->flow.dip_addr)
? (ipv4->dst_addr == *(uint32_t*)csq->flow.dip_addr)
: (ipv4->src_addr == *(uint32_t*)csq->flow.dip_addr);
}
bool port_matched;
if (csq->flow.flags & MT_RXQ_FLOW_F_NO_PORT) {
port_matched = true;
} else {
port_matched = ntohs(udp->dst_port) == csq->flow.dst_port;
}
if (ip_matched && port_matched) { /* match dst ip:port */
bool matched = mt_udp_matched(&csq->flow, hdr);
if (matched) {
ret = rte_ring_sp_enqueue(csq->ring, m);
if (ret < 0) {
csq->stat_enqueue_fail_cnt++;
Expand Down
13 changes: 11 additions & 2 deletions lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -946,13 +946,22 @@ struct mt_srss_entry {
};
MT_TAILQ_HEAD(mt_srss_entrys_list, mt_srss_entry);

/* One list of shared-RSS entries together with the spinlock that guards it. */
struct mt_srss_list {
/* tail queue head for the mt_srss_entry items placed on this list */
struct mt_srss_entrys_list entrys_list;
rte_spinlock_t mutex; /* protects entrys_list */
/* index of this list, as assigned when the list is initialized */
int idx;
};

struct mt_srss_impl {
struct mtl_main_impl* parent;
rte_spinlock_t mutex; /* protect struct mt_srss_entrys_list head */

enum mtl_port port;
enum mt_queue_mode queue_mode;

struct mt_srss_entrys_list head;
/* map entry to different heads as the UDP port number */
struct mt_srss_list* lists;
int lists_sz;

pthread_t tid;
rte_atomic32_t stop_thread;
struct mt_sch_tasklet_impl* tasklet;
Expand Down
Loading

0 comments on commit 4208546

Please sign in to comment.