From 9566009e68002c0aeb7e80e21da235e751526a60 Mon Sep 17 00:00:00 2001 From: Craig Edwards Date: Thu, 4 Apr 2024 00:17:43 +0000 Subject: [PATCH] epoll and kqueue stuff --- cmake/epoll.cmake | 17 ++++ cmake/kqueue.cmake | 17 ++++ include/dpp/socketengine.h | 13 ++- library-vcpkg/CMakeLists.txt | 16 +++ library/CMakeLists.txt | 15 +++ src/dpp/socketengine.cpp | 3 +- src/dpp/socketengines/epoll.cpp | 72 ++++++++++++- src/dpp/socketengines/kqueue.cpp | 167 +++++++++++++++++++++++++++++++ src/dpp/socketengines/poll.cpp | 2 +- 9 files changed, 310 insertions(+), 12 deletions(-) create mode 100644 cmake/epoll.cmake create mode 100644 cmake/kqueue.cmake create mode 100644 src/dpp/socketengines/kqueue.cpp diff --git a/cmake/epoll.cmake b/cmake/epoll.cmake new file mode 100644 index 0000000000..63070933d7 --- /dev/null +++ b/cmake/epoll.cmake @@ -0,0 +1,17 @@ +macro(CHECK_EPOLL VARIABLE) + if(UNIX) + if("${VARIABLE}" MATCHES "^${VARIABLE}$") + message(STATUS "Check if the system supports epoll") + include(CheckSymbolExists) + check_symbol_exists(epoll_create "sys/epoll.h" EPOLL_PROTOTYPE_EXISTS) + + if(EPOLL_PROTOTYPE_EXISTS) + message(STATUS "Check if the system supports epoll - yes") + set(${VARIABLE} 1 CACHE INTERNAL "Result of CHECK_EPOLL" FORCE) + else(EPOLL_PROTOTYPE_EXISTS) + message(STATUS "Check if the system supports epoll - no") + set(${VARIABLE} "" CACHE INTERNAL "Result of CHECK_EPOLL" FORCE) + endif(EPOLL_PROTOTYPE_EXISTS) + endif("${VARIABLE}" MATCHES "^${VARIABLE}$") + endif(UNIX) +endmacro(CHECK_EPOLL) \ No newline at end of file diff --git a/cmake/kqueue.cmake b/cmake/kqueue.cmake new file mode 100644 index 0000000000..955e356d71 --- /dev/null +++ b/cmake/kqueue.cmake @@ -0,0 +1,17 @@ +macro(CHECK_KQUEUE VARIABLE) + if(UNIX) + if("${VARIABLE}" MATCHES "^${VARIABLE}$") + message(STATUS "Check if the system supports kqueue") + include(CheckSymbolExists) + check_symbol_exists(kqueue "sys/event.h" KQUEUE_PROTOTYPE_EXISTS) + + if(KQUEUE_PROTOTYPE_EXISTS) + message(STATUS "Check if the system supports kqueue - yes") + set(${VARIABLE} 1 CACHE INTERNAL "Result of CHECK_KQUEUE" FORCE) + else(KQUEUE_PROTOTYPE_EXISTS) + message(STATUS "Check if the system supports kqueue - no") + set(${VARIABLE} "" CACHE INTERNAL "Result of CHECK_KQUEUE" FORCE) + endif(KQUEUE_PROTOTYPE_EXISTS) + endif("${VARIABLE}" MATCHES "^${VARIABLE}$") + endif(UNIX) +endmacro(CHECK_KQUEUE) \ No newline at end of file diff --git a/include/dpp/socketengine.h b/include/dpp/socketengine.h index 12ba096743..c8871c508a 100644 --- a/include/dpp/socketengine.h +++ b/include/dpp/socketengine.h @@ -31,9 +31,9 @@ enum socket_event_flags : uint8_t { WANT_ERROR = 4, }; -using socket_read_event = auto (*)(const struct socket_event&) -> void; -using socket_write_event = auto (*)(const struct socket_event&) -> void; -using socket_error_event = auto (*)(const struct socket_event&, int error_code) -> void; +using socket_read_event = auto (*)(dpp::socket fd, const struct socket_events&) -> void; +using socket_write_event = auto (*)(dpp::socket fd, const struct socket_events&) -> void; +using socket_error_event = auto (*)(dpp::socket fd, const struct socket_events&, int error_code) -> void; struct socket_events { uint8_t flags{0}; @@ -48,9 +48,14 @@ struct socket_engine_base { socket_container fds; socket_engine_base(); + socket_engine_base(const socket_engine_base&) = default; + socket_engine_base(socket_engine_base&&) = default; + socket_engine_base& operator=(const socket_engine_base&) = default; + socket_engine_base& operator=(socket_engine_base&&) = default; + virtual ~socket_engine_base() = default; - virtual void run() = 0; + virtual void process_events() = 0; virtual bool register_socket(dpp::socket fd, const socket_events& e); virtual bool update_socket(dpp::socket fd, const socket_events& e); virtual bool remove_socket(dpp::socket fd); diff --git a/library-vcpkg/CMakeLists.txt b/library-vcpkg/CMakeLists.txt index dded1b52a5..ea382ea835 100644 --- a/library-vcpkg/CMakeLists.txt +++ b/library-vcpkg/CMakeLists.txt @@ -9,6 +9,22 @@ else() find_package(Threads REQUIRED) endif() +include("${CMAKE_CURRENT_SOURCE_DIR}/../cmake/epoll.cmake") +include("${CMAKE_CURRENT_SOURCE_DIR}/../cmake/kqueue.cmake") +check_epoll(HAS_EPOLL) +check_kqueue(HAS_KQUEUE) +if (HAS_EPOLL) + message("-- Building with ${Green}epoll socket engine${ColourReset} -- good!") + target_sources("${LIB_NAME}" PRIVATE "${DPP_ROOT_PATH}/src/dpp/socketengines/epoll.cpp") +elseif (HAS_KQUEUE) + message("-- Building with ${Green}kqueue socket engine${ColourReset} -- good!") + target_sources("${LIB_NAME}" PRIVATE "${DPP_ROOT_PATH}/src/dpp/socketengines/kqueue.cpp") +else() + message("-- Building with ${Green}poll socket engine${ColourReset} -- meh!") + target_sources("${LIB_NAME}" PRIVATE "${DPP_ROOT_PATH}/src/dpp/socketengines/poll.cpp") +endif() + + add_library("${PROJECT_NAME}::${LIB_NAME}" ALIAS "${LIB_NAME}") if(${AVX_TYPE} STREQUAL "OFF") diff --git a/library/CMakeLists.txt b/library/CMakeLists.txt index ba47b77cf7..8ce90495ad 100644 --- a/library/CMakeLists.txt +++ b/library/CMakeLists.txt @@ -271,6 +271,21 @@ foreach (fullmodname ${subdirlist}) endif() endforeach() +include("${CMAKE_CURRENT_SOURCE_DIR}/../cmake/epoll.cmake") +include("${CMAKE_CURRENT_SOURCE_DIR}/../cmake/kqueue.cmake") +check_epoll(HAS_EPOLL) +check_kqueue(HAS_KQUEUE) +if (HAS_EPOLL) + message("-- Building with ${Green}epoll socket engine${ColourReset} -- good!") + target_sources("dpp" PRIVATE "${modules_dir}/dpp/socketengines/epoll.cpp") +elseif (HAS_KQUEUE) + message("-- Building with ${Green}kqueue socket engine${ColourReset} -- good!") + target_sources("dpp" PRIVATE "${modules_dir}/dpp/socketengines/kqueue.cpp") +else() + message("-- Building with ${Green}poll socket engine${ColourReset} -- meh!") + target_sources("dpp" PRIVATE "${modules_dir}/dpp/socketengines/poll.cpp") +endif() + target_compile_features(dpp PUBLIC cxx_std_17) target_compile_features(dpp PRIVATE cxx_constexpr) target_compile_features(dpp PRIVATE cxx_auto_type) diff --git a/src/dpp/socketengine.cpp b/src/dpp/socketengine.cpp index 6d16ae2328..f48611deb3 100644 --- a/src/dpp/socketengine.cpp +++ b/src/dpp/socketengine.cpp @@ -25,8 +25,7 @@ bool socket_engine_base::register_socket(dpp::socket fd, const socket_events &e) { if (fd > INVALID_SOCKET && fds.find(fd) == fds.end()) { - auto se = std::make_unique(); - *se = e; + fds.emplace(fd, std::make_unique(e)); return true; } return false; diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index e81026cb66..c2ab498d8d 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -23,27 +23,80 @@ #include #include #include +#include +#include +#include struct socket_engine_epoll : public socket_engine_base { int epoll_handle{INVALID_SOCKET}; - const int epoll_hint = 128; + static const int epoll_hint = 128; + std::vector events; - socket_engine_epoll() : epoll_handle(epoll_create(epoll_hint)) { + socket_engine_epoll(const socket_engine_epoll&) = default; + socket_engine_epoll(socket_engine_epoll&&) = default; + socket_engine_epoll& operator=(const socket_engine_epoll&) = default; + socket_engine_epoll& operator=(socket_engine_epoll&&) = default; + socket_engine_epoll() : epoll_handle(epoll_create(socket_engine_epoll::epoll_hint)) { + events.resize(16); if (epoll_handle == -1) { throw dpp::connection_exception("Failed to initialise epoll()"); } } - ~socket_engine_epoll() { + ~socket_engine_epoll() override { if (epoll_handle != INVALID_SOCKET) { close(epoll_handle); } } - void run() override { - // TODO: event routing loop for epoll() goes here + void process_events() final { + const int sleep_length = 1000; + int i = epoll_wait(epoll_handle, events.data(), static_cast(events.size()), sleep_length); + + for (int j = 0; j < i; j++) { + epoll_event ev = events[j]; + + auto* const eh = static_cast(ev.data.ptr); + const int fd = ev.data.fd; + if (fd == INVALID_SOCKET) { + continue; + } + + if ((ev.events & EPOLLHUP) != 0U) { + eh->on_error(fd, *eh, 0); + continue; + } + + if ((ev.events & EPOLLERR) != 0U) { + /* Get error number */ + socklen_t codesize = sizeof(int); + int errcode{}; + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) { + errcode = errno; + } + eh->on_error(fd, *eh, errcode); + continue; + } + + if ((ev.events & EPOLLOUT) != 0U) { + int new_events = eh->flags & ~WANT_WRITE; + if (new_events != eh->flags) { + ev.events = new_events; + ev.data.ptr = static_cast(eh); + epoll_ctl(epoll_handle, EPOLL_CTL_MOD, fd, &ev); + } + eh->flags = new_events; + } + if (ev.events & EPOLLIN) { + eh->on_read(fd, *eh); + } + if (ev.events & EPOLLOUT) { + eh->on_write(fd, *eh); + } + } + } bool register_socket(dpp::socket fd, const socket_events& e) final { @@ -56,11 +109,17 @@ struct socket_engine_epoll : public socket_engine_base { if ((e.flags & WANT_WRITE) != 0) { ev.events |= EPOLLOUT; } + if ((e.flags & WANT_ERROR) != 0) { + ev.events |= EPOLLERR; + } ev.data.ptr = fds.find(fd)->second.get(); int i = epoll_ctl(epoll_handle, EPOLL_CTL_ADD, fd, &ev); if (i < 0) { throw dpp::connection_exception("Failed to register socket to epoll_ctl()"); } + if (fds.size() * 2 > events.size()) { + events.resize(fds.size() * 2); + } } return r; } @@ -75,6 +134,9 @@ struct socket_engine_epoll : public socket_engine_base { if ((e.flags & WANT_WRITE) != 0) { ev.events |= EPOLLOUT; } + if ((e.flags & WANT_ERROR) != 0) { + ev.events |= EPOLLERR; + } ev.data.ptr = fds.find(fd)->second.get(); int i = epoll_ctl(epoll_handle, EPOLL_CTL_MOD, fd, &ev); if (i < 0) { diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp new file mode 100644 index 0000000000..cbf3350e2b --- /dev/null +++ b/src/dpp/socketengines/kqueue.cpp @@ -0,0 +1,167 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include + +#if defined __NetBSD__ && __NetBSD_Version__ <= 999001400 + #define CAST_TYPE intptr_t +#else + #define CAST_TYPE void* +#endif + + +struct socket_engine_kqueue : public socket_engine_base { + + int kqueue_handle{INVALID_SOCKET}; + unsigned int change_pos = 0; + std::vector change_list; + std::vector ke_list; + + socket_engine_kqueue(const socket_engine_kqueue&) = default; + socket_engine_kqueue(socket_engine_kqueue&&) = default; + socket_engine_kqueue& operator=(const socket_engine_kqueue&) = default; + socket_engine_kqueue& operator=(socket_engine_kqueue&&) = default; + + socket_engine_kqueue() : kqueue_handle(kqueue()) { + change_list.resize(8); + ke_list.resize(16); + if (kqueue_handle == -1) { + throw dpp::connection_exception("Failed to initialise kqueue()"); + } + } + + ~socket_engine_kqueue() override { + if (kqueue_handle != INVALID_SOCKET) { + close(kqueue_handle); + } + } + + struct kevent* get_change_kevent() + { + if (change_pos >= change_list.size()) { + change_list.resize(change_list.size() * 2); + } + return &change_list[change_pos++]; + } + + void process_events() final { + struct timespec ts{}; + ts.tv_sec = 1; + + int i = kevent(kqueue_handle, &change_list.front(), change_pos, &ke_list.front(), static_cast(ke_list.size()), &ts); + change_pos = 0; + + if (i < 0) { + return; + } + + for (int j = 0; j < i; j++) { + const struct kevent& kev = ke_list[j]; + auto* eh = reinterpret_cast(kev.udata); + if (eh == nullptr) { + continue; + } + + const short filter = kev.filter; + if (kev.flags & EV_EOF) { + eh->on_error(kev.ident, kev.fflags); + continue; + } + if (filter == EVFILT_WRITE) { + const int bits_to_clr = FD_WANT_WRITE; + eh->flags &= ~bits_to_clr; + eh->on_write(kev.ident, eh); + } + else if (filter == EVFILT_READ) { + eh->on_read(kev.ident, eh); + } + } + } + + void set_event_write_flags(dpp::socket fd, socket_events* eh, uint8_t old_mask, uint8_t new_mask) + { + if (((new_mask & WANT_WRITE) != 0) && ((old_mask & WANT_WRITE) == 0)) + { + struct kevent* ke = get_change_kevent(); + EV_SET(ke, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast(eh)); + } + else if (((old_mask & WANT_WRITE) != 0) && ((new_mask & WANT_WRITE) == 0)) + { + struct kevent* ke = get_change_kevent(); + EV_SET(ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); + } + } + + bool register_socket(dpp::socket fd, const socket_events& e) final { + bool r = socket_engine_base::register_socket(fd, e); + if (r) { + struct kevent* ke = get_change_kevent(); + socket_events* se = fds.find(fd)->second.get(); + if ((se->flags & WANT_READ) != 0) { + EV_SET(ke, fd, EVFILT_READ, EV_ADD, 0, 0, static_cast(se)); + } + set_event_write_flags(fd, se, 0, e.flags); + if (fds.size() * 2 > ke_list.size()) { + ke_list.resize(fds.size() * 2); + } + } + return r; + } + + bool update_socket(dpp::socket fd, const socket_events& e) final { + bool r = socket_engine_base::update_socket(fd, e); + if (r) { + struct kevent* ke = get_change_kevent(); + socket_events* se = fds.find(fd)->second.get(); + if ((se->flags & WANT_READ) != 0) { + EV_SET(ke, fd, EVFILT_READ, EV_ADD, 0, 0, static_cast(se)); + } + set_event_write_flags(fd, se, 0, e.flags); + if (fds.size() * 2 > ke_list.size()) { + ke_list.resize(fds.size() * 2); + } + } + return r; + } + + bool remove_socket(dpp::socket fd) final { + bool r = socket_engine_base::remove_socket(fd); + if (r) { + struct kevent* ke = get_change_kevent(); + EV_SET(ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); + + // Then remove the read filter. + ke = get_change_kevent(); + EV_SET(ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + } + return r; + } +}; + +std::unique_ptr create_socket_engine() { + return std::make_unique(); +} \ No newline at end of file diff --git a/src/dpp/socketengines/poll.cpp b/src/dpp/socketengines/poll.cpp index 7a23e49d17..a89a83c741 100644 --- a/src/dpp/socketengines/poll.cpp +++ b/src/dpp/socketengines/poll.cpp @@ -50,7 +50,7 @@ struct socket_engine_poll : public socket_engine_base { */ std::vector poll_set; - void run() override { + void process_events() final { // TODO: event routing loop for poll() goes here }