Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue: 2342345 Optimize wakeup mechanizm #942

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/vma/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ libvma_la_SOURCES := \
\
util/wakeup.cpp \
util/wakeup_pipe.cpp \
util/wakeup_eventfd.cpp \
util/match.cpp \
util/utils.cpp \
util/instrumentation.cpp \
Expand Down Expand Up @@ -303,6 +304,7 @@ libvma_la_SOURCES := \
util/vtypes.h \
util/wakeup.h \
util/wakeup_pipe.h \
util/wakeup_eventfd.h \
util/agent.h \
util/agent_def.h \
util/data_updater.h \
Expand Down
21 changes: 5 additions & 16 deletions src/vma/dev/ring_bond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,16 @@ void ring_bond::restart()
for (j = 0; j < m_rx_flows.size(); j++) {
sockinfo* si = static_cast<sockinfo*> (m_rx_flows[j].sink);
for (k = 0; k < num_ring_rx_fds; k++ ) {
epfd = si->get_rx_epfd();
if (epfd > 0) {
fd = ring_rx_fds_array[k];
rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
ring_logdbg("Remove fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno);
}
fd = ring_rx_fds_array[k];
si->delete_fd_from_poll_array_deferred(fd);

epfd = si->get_epoll_context_fd();
if (epfd > 0) {
fd = ring_rx_fds_array[k];
rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
ring_logdbg("Remove fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno);
}
}
si->do_wakeup();
}

p_ring_tap->m_active = true;
Expand Down Expand Up @@ -230,15 +227,7 @@ void ring_bond::restart()
sockinfo* si = static_cast<sockinfo*> (m_rx_flows[j].sink);
p_ring_bond_netvsc->m_vf_ring->attach_flow(m_rx_flows[j].flow, m_rx_flows[j].sink);
for (k = 0; k < num_ring_rx_fds; k++ ) {
epfd = si->get_rx_epfd();
if (epfd > 0) {
epoll_event ev = {0, {0}};
fd = ring_rx_fds_array[k];
ev.events = EPOLLIN;
ev.data.fd = fd;
rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
ring_logdbg("Add fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno);
}
si->add_fd_to_poll_array(ring_rx_fds_array[k]);
epfd = si->get_epoll_context_fd();
if (epfd > 0) {
#define CQ_FD_MARK 0xabcd /* see socket_fd_api */
Expand Down
2 changes: 1 addition & 1 deletion src/vma/event/event_handler_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ event_handler_manager::event_handler_manager() :
m_b_continue_running = true;
m_event_handler_tid = 0;

wakeup_set_epoll_fd(m_epfd);
wakeup_set_fd(m_epfd);
going_to_sleep();

return;
Expand Down
2 changes: 1 addition & 1 deletion src/vma/iomux/epfd_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ epfd_info::epfd_info(int epfd, int size) :
// Register this socket to read nonoffloaded data
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

wakeup_set_epoll_fd(m_epfd);
wakeup_set_fd(m_epfd);
}

epfd_info::~epfd_info()
Expand Down
44 changes: 16 additions & 28 deletions src/vma/sock/sockinfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#include "sockinfo.h"

#include <sys/epoll.h>
#include <netdb.h>
#include <linux/sockios.h>

Expand Down Expand Up @@ -93,11 +92,12 @@ sockinfo::sockinfo(int fd):

{
m_ring_alloc_logic = ring_allocation_logic_rx(get_fd(), m_ring_alloc_log_rx, this);
m_rx_epfd = orig_os_api.epoll_create(128);
if (unlikely(m_rx_epfd == -1)) {
throw_vma_exception("create internal epoll");
}
wakeup_set_epoll_fd(m_rx_epfd);
m_poll_fds_array_capacity = DEFAULT_FDS_ARR_SIZE;
m_poll_fds_array = (pollfd *)malloc(DEFAULT_FDS_ARR_SIZE * sizeof(pollfd));
m_poll_fds_delete_array = (int *)malloc(DEFAULT_FDS_ARR_SIZE * sizeof(int));
m_poll_fds_array_size = 0;
m_poll_fds_delete_array_size = 0;
add_fd_to_poll_array(wakeup_get_fd());

m_p_socket_stats = &m_socket_stats; // Save stats as local copy and allow state publisher to copy from this location
vma_stats_instance_create_socket_block(m_p_socket_stats);
Expand Down Expand Up @@ -133,12 +133,15 @@ sockinfo::~sockinfo()

// Change to non-blocking socket so calling threads can exit
m_b_blocking = false;
orig_os_api.close(m_rx_epfd); // this will wake up any blocked thread in rx() call to orig_os_api.epoll_wait()
if (m_p_rings_fds) {
delete[] m_p_rings_fds;
m_p_rings_fds = NULL;
}
vma_stats_instance_remove_socket_block(m_p_socket_stats);

free(m_poll_fds_array);
free(m_poll_fds_delete_array);

vma_stats_instance_remove_socket_block(m_p_socket_stats);
}

void sockinfo::set_blocking(bool is_blocked)
Expand Down Expand Up @@ -1146,22 +1149,11 @@ void sockinfo::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring,

notify_epoll = true;

// Add this new CQ channel fd to the rx epfd handle (no need to wake up any sleeping thread about this new fd)
epoll_event ev = {0, {0}};
ev.events = EPOLLIN;
size_t num_ring_rx_fds;
int *ring_rx_fds_array = p_ring->get_rx_channel_fds(num_ring_rx_fds);

for (size_t i = 0; i < num_ring_rx_fds; i++) {
int cq_ch_fd = ring_rx_fds_array[i];

ev.data.fd = cq_ch_fd;

BULLSEYE_EXCLUDE_BLOCK_START
if (unlikely( orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, cq_ch_fd, &ev))) {
si_logerr("failed to add cq channel fd to internal epfd errno=%d (%m)", errno);
}
BULLSEYE_EXCLUDE_BLOCK_END
for (int i = 0; i < (int)num_ring_rx_fds; i++) {
add_fd_to_poll_array(ring_rx_fds_array[i]);
}

do_wakeup(); // A ready wce can be pending due to the drain logic (cq channel will not wake up by itself)
Expand Down Expand Up @@ -1225,14 +1217,10 @@ void sockinfo::rx_del_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring,
size_t num_ring_rx_fds;
int *ring_rx_fds_array = base_ring->get_rx_channel_fds(num_ring_rx_fds);

for (size_t i = 0; i < num_ring_rx_fds; i++) {
int cq_ch_fd = ring_rx_fds_array[i];
BULLSEYE_EXCLUDE_BLOCK_START
if (unlikely( orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_DEL, cq_ch_fd, NULL))) {
si_logerr("failed to delete cq channel fd from internal epfd (errno=%d %m)", errno);
}
BULLSEYE_EXCLUDE_BLOCK_END
for (int i = 0; i < (int)num_ring_rx_fds; i++) {
delete_fd_from_poll_array_deferred(ring_rx_fds_array[i]);
}
do_wakeup();

notify_epoll = true;

Expand Down
77 changes: 65 additions & 12 deletions src/vma/sock/sockinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
#include "vma/util/sock_addr.h"
#include "vma/util/vma_stats.h"
#include "vma/util/sys_vars.h"
#include "vma/util/wakeup_pipe.h"
#include "vma/util/wakeup_eventfd.h"
#include "vma/proto/flow_tuple.h"
#include "vma/proto/mem_buf_desc.h"
#include "vma/proto/dst_entry.h"
Expand All @@ -57,10 +57,11 @@
#ifndef BASE_SOCKINFO_H
#define BASE_SOCKINFO_H

#define SI_RX_EPFD_EVENT_MAX 16
#define BYTE_TO_KB(byte_value) ((byte_value) / 125)
#define KB_TO_BYTE(kbit_value) ((kbit_value) * 125)

#define DEFAULT_FDS_ARR_SIZE 10

#if DEFINED_MISSING_NET_TSTAMP
enum {
SOF_TIMESTAMPING_TX_HARDWARE = (1<<0),
Expand Down Expand Up @@ -154,7 +155,7 @@ const uint8_t ip_tos2prio[16] = {
4, 4, 4, 4
};

class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_pipe
class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_eventfd
{
public:
sockinfo(int fd);
Expand All @@ -181,8 +182,56 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
return false;
}
inline bool flow_tag_enabled(void) { return m_flow_tag_enabled; }
inline int get_rx_epfd(void) { return m_rx_epfd; }

inline void add_fd_to_poll_array(int fd) {
if (m_poll_fds_array_size >= m_poll_fds_array_capacity) {
m_poll_fds_array_capacity *= 2;
pollfd* new_m_poll_fds_array = static_cast<pollfd*>(realloc(m_poll_fds_array, m_poll_fds_array_capacity * sizeof(pollfd)));
if (new_m_poll_fds_array == NULL) {
vlog_printf(VLOG_ERROR, "%s:%d: Realloc cannot find enough space\n", __func__, __LINE__);
return ;
} else {
m_poll_fds_array = new_m_poll_fds_array;
}
}
m_poll_fds_array[m_poll_fds_array_size].fd = fd;
m_poll_fds_array[m_poll_fds_array_size].events = POLLIN;
++m_poll_fds_array_size;
}
inline void delete_fds_from_poll_array() {
int j;
if (unlikely(m_poll_fds_array_size == 0)) {
return ;
}
if (unlikely(m_poll_fds_delete_array_size != 0)) {
for (int i = 0; i < m_poll_fds_delete_array_size; i++) {
for (j = 0; j < m_poll_fds_array_size && m_poll_fds_array[j].fd != m_poll_fds_delete_array[i]; j++) ;
//safe for overlapping memory blocks
if (j < m_poll_fds_array_size - 1) {
memmove(&m_poll_fds_array[j], &m_poll_fds_array[j + 1], (m_poll_fds_array_size - j - 1) * sizeof(m_poll_fds_array[0]));
}
if (j < m_poll_fds_array_size) {
m_poll_fds_array_size--;
}
}
memset(m_poll_fds_delete_array, 0, m_poll_fds_delete_array_size * sizeof(int));
m_poll_fds_delete_array_size = 0;
}
}
inline void delete_fd_from_poll_array_deferred(int fd) {
if (m_poll_fds_delete_array_size >= m_poll_fds_array_capacity) {
m_poll_fds_array_capacity *= 2;
int* new_m_poll_fds_delete_array = static_cast<int*>(realloc(m_poll_fds_delete_array, m_poll_fds_array_capacity * sizeof(int)));
if (new_m_poll_fds_delete_array == NULL) {
vlog_printf(VLOG_ERROR, "%s:%d: Realloc cannot find enough space\n", __func__, __LINE__);
return ;
} else {
m_poll_fds_delete_array = new_m_poll_fds_delete_array;
}

}
m_poll_fds_delete_array[m_poll_fds_delete_array_size] = fd;
++m_poll_fds_delete_array_size;
}
virtual bool flow_in_reuse(void) { return false;};
virtual int* get_rings_fds(int &res_length);
virtual int get_rings_num();
Expand All @@ -191,6 +240,11 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
virtual void statistics_print(vlog_levels_t log_level = VLOG_DEBUG);
uint32_t get_flow_tag_val() { return m_flow_tag_id; }
inline in_protocol_t get_protocol(void) { return m_protocol; }
virtual inline void do_wakeup() {
if (!is_socketxtreme()) {
wakeup_eventfd::do_wakeup();
}
}

private:
int fcntl_helper(int __cmd, unsigned long int __arg, bool& bexit);
Expand All @@ -217,7 +271,12 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
socket_stats_t m_socket_stats;
socket_stats_t* m_p_socket_stats;

int m_rx_epfd;
struct pollfd* m_poll_fds_array;
int m_poll_fds_array_size;
int m_poll_fds_array_capacity;
int* m_poll_fds_delete_array;
int m_poll_fds_delete_array_size;

cache_observer m_rx_nd_observer;
rx_net_device_map_t m_rx_nd_map;
rx_flow_map_t m_rx_flow_map;
Expand Down Expand Up @@ -333,12 +392,6 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou

virtual bool try_un_offloading(); // un-offload the socket if possible

virtual inline void do_wakeup() {
if (!is_socketxtreme()) {
wakeup_pipe::do_wakeup();
}
}

inline bool is_socketxtreme() {
return (m_p_rx_ring && m_p_rx_ring->is_socketxtreme());
}
Expand Down
64 changes: 25 additions & 39 deletions src/vma/sock/sockinfo_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2491,24 +2491,7 @@ int sockinfo_tcp::listen(int backlog)
BULLSEYE_EXCLUDE_BLOCK_END

// Add the user's orig fd to the rx epfd handle
epoll_event ev = {0, {0}};
ev.events = EPOLLIN;
ev.data.fd = m_fd;
int ret = orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
BULLSEYE_EXCLUDE_BLOCK_START
if (unlikely(ret)) {
if (errno == EEXIST) {
si_tcp_logdbg("failed to add user's fd to internal epfd errno=%d (%m)", errno);
} else {
si_tcp_logerr("failed to add user's fd to internal epfd errno=%d (%m)", errno);
si_tcp_logdbg("Fallback the connection to os");
destructor_helper();
setPassthrough();
unlock_tcp_con();
return 0;
}
}
BULLSEYE_EXCLUDE_BLOCK_END
add_fd_to_poll_array(m_fd);

if (m_sysvar_tcp_ctl_thread > CTL_THREAD_DISABLE)
m_timer_handle = g_p_event_handler_manager->register_timer_event(safe_mce_sys().timer_resolution_msec , this, PERIODIC_TIMER, 0, NULL);
Expand Down Expand Up @@ -4047,7 +4030,6 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking)
int n;
uint64_t poll_sn = 0;
rx_ring_map_t::iterator rx_ring_iter;
epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX];

// poll for completion
__log_info_func("");
Expand Down Expand Up @@ -4145,7 +4127,7 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking)
}

//sleep on different CQs and OS listen socket
ret = orig_os_api.epoll_wait(m_rx_epfd, rx_epfd_events, SI_RX_EPFD_EVENT_MAX, m_loops_timer.time_left_msec());
ret = orig_os_api.poll(m_poll_fds_array, m_poll_fds_array_size, m_loops_timer.time_left_msec());

lock_tcp_con();
return_from_sleep();
Expand All @@ -4158,28 +4140,32 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking)
if(m_n_rx_pkt_ready_list_count)
return 0;

for (int event_idx = 0; event_idx < ret; event_idx++)
for (int event_idx = 0; event_idx < m_poll_fds_array_size; event_idx++)
{
int fd = rx_epfd_events[event_idx].data.fd;
if (is_wakeup_fd(fd))
{ // wakeup event
lock_tcp_con();
remove_wakeup_fd();
unlock_tcp_con();
continue;
}
if (m_poll_fds_array[event_idx].revents & POLLIN) {
m_poll_fds_array[event_idx].revents = 0;
int fd = m_poll_fds_array[event_idx].fd;
if (is_wakeup_fd(fd))
{ // wakeup event
lock_tcp_con();
remove_wakeup_fd();
delete_fds_from_poll_array();
unlock_tcp_con();
continue;
}

// Check if OS fd is ready for reading
if (fd == m_fd) {
continue;
}
// Check if OS fd is ready for reading
if (fd == m_fd) {
continue;
}

// poll cq. fd == cq channel fd.
cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd);
if (p_cq_ch_info) {
ring* p_ring = p_cq_ch_info->get_ring();
if (p_ring) {
p_ring->wait_for_notification_and_process_element(fd, &poll_sn);
// poll cq. fd == cq channel fd.
cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd);
if (p_cq_ch_info) {
ring* p_ring = p_cq_ch_info->get_ring();
if (p_ring) {
p_ring->wait_for_notification_and_process_element(fd, &poll_sn);
}
}
}
}
Expand Down
Loading