diff --git a/include/dpp/socketengine.h b/include/dpp/socketengine.h index 051aa2f801..9579bbd5a7 100644 --- a/include/dpp/socketengine.h +++ b/include/dpp/socketengine.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -74,6 +75,51 @@ using socket_write_event = std::function; +/** + * @brief Contains statistics about the IO loop + */ +struct DPP_EXPORT socket_stats { + /** + * @brief Number of reads since startup + */ + uint64_t reads{0}; + + /** + * @brief Number of writes since startup + */ + uint64_t writes{0}; + + /** + * @brief Number of errors since startup + */ + uint64_t errors{0}; + + /** + * @brief Number of updates to file descriptors + */ + uint64_t updates{0}; + + /** + * @brief Number of deletions of file descriptors + */ + uint64_t deletions{0}; + + /** + * @brief Number of loop iterations since startup + */ + uint64_t iterations{0}; + + /** + * @brief Number of currently active file descriptors + */ + uint64_t active_fds{0}; + + /** + * @brief Socket engine type + */ + std::string_view engine_type; +}; + /** * @brief Represents an active socket event set in the socket engine. * @@ -150,21 +196,6 @@ using socket_container = std::unordered_map(e)); + stats.active_fds++; return true; } if (e.fd != INVALID_SOCKET && i != fds.end()) { remove_socket(e.fd); fds.erase(i); fds.emplace(e.fd, std::make_unique(e)); + stats.updates++; return true; } return false; @@ -51,6 +53,7 @@ bool socket_engine_base::update_socket(const socket_events &e) { if (e.fd != INVALID_SOCKET && fds.find(e.fd) != fds.end()) { auto iter = fds.find(e.fd); *(iter->second) = e; + stats.updates++; return true; } return false; @@ -83,6 +86,25 @@ socket_events* socket_engine_base::get_fd(dpp::socket fd) { return iter->second.get(); } +void socket_engine_base::inplace_modify_fd(dpp::socket fd, uint8_t extra_flags) { + bool should_modify{false}; + socket_events s{}; + { + socket_events *new_se = nullptr; + std::lock_guard lk(fds_mutex); + auto i = fds.find(fd); + should_modify = i != fds.end() && (i->second->flags & extra_flags) != extra_flags; + if (should_modify) { + new_se = i->second.get(); + new_se->flags |= extra_flags; + s = *new_se; + } + } + if (should_modify) { + update_socket(s); + } +} + void socket_engine_base::prune() { if (time(nullptr) != last_time) { try { @@ -101,6 +123,7 @@ void socket_engine_base::prune() { last_time = time(nullptr); } + stats.iterations++; } bool socket_engine_base::delete_socket(dpp::socket fd) { @@ -110,6 +133,8 @@ bool socket_engine_base::delete_socket(dpp::socket fd) { return false; } iter->second->flags |= WANT_DELETION; + stats.deletions++; + stats.active_fds--; return true; } @@ -117,4 +142,8 @@ bool socket_engine_base::remove_socket(dpp::socket fd) { return true; } +const socket_stats& socket_engine_base::get_stats() const { + return stats; +} + } diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index ed601878a8..3519f1819d 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -54,7 +54,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { int epoll_handle{INVALID_SOCKET}; static constexpr size_t MAX_EVENTS = 65536; - std::array events; + std::array events{}; socket_engine_epoll(const socket_engine_epoll&) = delete; socket_engine_epoll(socket_engine_epoll&&) = delete; @@ -65,6 +65,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { if (epoll_handle == -1) { throw dpp::connection_exception("Failed to initialise epoll()"); } + stats.engine_type = "epoll"; } ~socket_engine_epoll() override { @@ -89,6 +90,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { if ((eh->flags & WANT_DELETION) == 0L) try { if ((ev.events & EPOLLHUP) != 0U) { + stats.errors++; if (eh->on_error) { eh->on_error(fd, *eh, EPIPE); } @@ -96,6 +98,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { } if ((ev.events & EPOLLERR) != 0U) { + stats.errors++; socklen_t codesize = sizeof(int); int errcode{}; if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) { @@ -111,18 +114,21 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { /* Should we have a flag to allow keeping WANT_WRITE? Maybe like WANT_WRITE_ONCE or GREEDY_WANT_WRITE, eh */ eh->flags = modify_event(epoll_handle, eh, eh->flags & ~WANT_WRITE); if (eh->on_write) { + stats.writes++; eh->on_write(fd, *eh); } } if ((ev.events & EPOLLIN) != 0U) { if (eh->on_read) { + stats.reads++; eh->on_read(fd, *eh); } } } catch (const std::exception& e) { owner->log(ll_trace, "Socket loop exception: " + std::string(e.what())); + stats.errors++; eh->on_error(fd, *eh, 0); } diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp index eef50da65e..a13e352a67 100644 --- a/src/dpp/socketengines/kqueue.cpp +++ b/src/dpp/socketengines/kqueue.cpp @@ -48,6 +48,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { if (kqueue_handle == -1) { throw dpp::connection_exception("Failed to initialise kqueue()"); } + stats.engine_type = "kqueue"; } ~socket_engine_kqueue() override { @@ -78,6 +79,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { if (kev.flags & EV_EOF || kev.flags & EV_ERROR) { if (eh->on_error) { eh->on_error(kev.ident, *eh, kev.fflags); + stats.errors++; } continue; } @@ -86,15 +88,18 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { eh->flags &= ~bits_to_clr; if (eh->on_write) { eh->on_write(kev.ident, *eh); + stats.writes++; } } else if (filter == EVFILT_READ) { if (eh->on_read) { eh->on_read(kev.ident, *eh); + stats.reads++; } } } catch (const std::exception& e) { + stats.errors++; owner->log(ll_trace, "Socket loop exception: " + std::string(e.what())); eh->on_error(kev.ident, *eh, 0); } diff --git a/src/dpp/socketengines/poll.cpp b/src/dpp/socketengines/poll.cpp index aca03bd2b3..7cb0400ca6 100644 --- a/src/dpp/socketengines/poll.cpp +++ b/src/dpp/socketengines/poll.cpp @@ -100,6 +100,7 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base { if ((revents & POLLHUP) != 0) { eh->on_error(fd, *eh, 0); + stats.errors++; continue; } @@ -109,21 +110,25 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base { if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *) &errcode, &codesize) < 0) { errcode = errno; } + stats.errors++; eh->on_error(fd, *eh, errcode); continue; } if ((revents & POLLIN) != 0) { + stats.reads++; eh->on_read(fd, *eh); } if ((revents & POLLOUT) != 0) { + stats.writes++; eh->flags &= ~WANT_WRITE; update_socket(*eh); eh->on_write(fd, *eh); } } catch (const std::exception &e) { + stats.errors++; eh->on_error(fd, *eh, 0); } @@ -181,7 +186,9 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base { return r; } - explicit socket_engine_poll(cluster* creator) : socket_engine_base(creator) { }; + explicit socket_engine_poll(cluster* creator) : socket_engine_base(creator) { + stats.engine_type = "poll"; + }; protected: diff --git a/src/dpp/wsclient.cpp b/src/dpp/wsclient.cpp index 2180ec5f5b..4d382f968e 100644 --- a/src/dpp/wsclient.cpp +++ b/src/dpp/wsclient.cpp @@ -132,22 +132,7 @@ void websocket_client::write(const std::string_view data, ws_opcode _opcode) ssl_client::socket_write(data); } - bool should_append_want_write = false; - socket_events *new_se = nullptr; - { - std::lock_guard lk(owner->socketengine->fds_mutex); - auto i = owner->socketengine->fds.find(sfd); - - should_append_want_write = i != owner->socketengine->fds.end() && (i->second->flags & WANT_WRITE) != WANT_WRITE; - if (should_append_want_write) { - new_se = i->second.get(); - new_se->flags |= WANT_WRITE; - } - } - - if (should_append_want_write) { - owner->socketengine->update_socket(*new_se); - } + owner->socketengine->inplace_modify_fd(sfd, WANT_WRITE); } bool websocket_client::handle_buffer(std::string& buffer)