From e9ef7a7961336bb671e7bdfcde1672b2bcf11fe5 Mon Sep 17 00:00:00 2001 From: Craig Edwards Date: Thu, 21 Nov 2024 01:18:28 +0000 Subject: [PATCH] improve threading --- doxygen-awesome-css | 2 +- src/dpp/socketengine.cpp | 1 - src/dpp/socketengines/epoll.cpp | 22 ++++++++++++---------- src/dpp/socketengines/kqueue.cpp | 24 +++++++++++++----------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/doxygen-awesome-css b/doxygen-awesome-css index c6568ebc70..af1d9030b3 160000 --- a/doxygen-awesome-css +++ b/doxygen-awesome-css @@ -1 +1 @@ -Subproject commit c6568ebc70adf9fb0fb6c1745737ae6945576813 +Subproject commit af1d9030b3ffa7b483fa9997a7272fb12af6af4c diff --git a/src/dpp/socketengine.cpp b/src/dpp/socketengine.cpp index 89bc373fbd..38f738aeaf 100644 --- a/src/dpp/socketengine.cpp +++ b/src/dpp/socketengine.cpp @@ -63,7 +63,6 @@ socket_engine_base::socket_engine_base(cluster* creator) : owner(creator) { throw dpp::connection_exception(err_connect_failure, "WSAStartup failure"); } #endif - //pool = std::make_unique(); } time_t last_time = time(nullptr); diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index 8852812b5f..8bcf3671ec 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -52,16 +52,15 @@ int modify_event(int epoll_handle, socket_events* eh, int new_events) { struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { int epoll_handle{INVALID_SOCKET}; - static const int epoll_hint = 128; - std::vector events; + static constexpr size_t MAX_EVENTS = 65536; + std::array events; socket_engine_epoll(const socket_engine_epoll&) = delete; socket_engine_epoll(socket_engine_epoll&&) = delete; socket_engine_epoll& operator=(const socket_engine_epoll&) = delete; socket_engine_epoll& operator=(socket_engine_epoll&&) = delete; - explicit socket_engine_epoll(cluster* creator) : socket_engine_base(creator), epoll_handle(epoll_create(socket_engine_epoll::epoll_hint)) { - events.resize(socket_engine_epoll::epoll_hint); + explicit socket_engine_epoll(cluster* creator) : socket_engine_base(creator), epoll_handle(epoll_create(MAX_EVENTS)) { if (epoll_handle == -1) { throw dpp::connection_exception("Failed to initialise epoll()"); } @@ -75,7 +74,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { void process_events() final { const int sleep_length = 1000; - int i = epoll_wait(epoll_handle, events.data(), static_cast(events.size()), sleep_length); + int i = epoll_wait(epoll_handle, events.data(), MAX_EVENTS, sleep_length); for (int j = 0; j < i; j++) { epoll_event ev = events[j]; @@ -145,14 +144,14 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { if ((e.flags & WANT_ERROR) != 0) { ev.events |= EPOLLERR; } - ev.data.ptr = fds.find(e.fd)->second.get(); + { + std::unique_lock lock(fds_mutex); + ev.data.ptr = fds.find(e.fd)->second.get(); + } int i = epoll_ctl(epoll_handle, EPOLL_CTL_ADD, e.fd, &ev); if (i < 0) { throw dpp::connection_exception("Failed to register socket to epoll_ctl()"); } - if (fds.size() * 2 > events.size()) { - events.resize(fds.size() * 2); - } } return r; } @@ -171,7 +170,10 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { if ((e.flags & WANT_ERROR) != 0) { ev.events |= EPOLLERR; } - ev.data.ptr = fds.find(e.fd)->second.get(); + { + std::unique_lock lock(fds_mutex); + ev.data.ptr = fds.find(e.fd)->second.get(); + } int i = epoll_ctl(epoll_handle, EPOLL_CTL_MOD, e.fd, &ev); if (i < 0) { throw dpp::connection_exception("Failed to modify socket with epoll_ctl()"); diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp index bc93272714..4ba0c64e33 100644 --- a/src/dpp/socketengines/kqueue.cpp +++ b/src/dpp/socketengines/kqueue.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -33,8 +34,10 @@ namespace dpp { struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { + static constexpr size_t MAX_SOCKET_VALUE = 65536; + int kqueue_handle{INVALID_SOCKET}; - std::array ke_list; + std::array ke_list; socket_engine_kqueue(const socket_engine_kqueue&) = delete; socket_engine_kqueue(socket_engine_kqueue&&) = delete; @@ -57,7 +60,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { struct timespec ts{}; ts.tv_sec = 1; - int i = kevent(kqueue_handle, NULL, 0, ke_list.data(), static_cast(ke_list.size()), &ts); + int i = kevent(kqueue_handle, nullptr, 0, ke_list.data(), static_cast(ke_list.size()), &ts); if (i < 0) { return; } @@ -101,15 +104,15 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { bool register_socket(const socket_events& e) final { bool r = socket_engine_base::register_socket(e); if (r) { - struct kevent ke; + struct kevent ke{}; socket_events* se = fds.find(e.fd)->second.get(); if ((se->flags & WANT_READ) != 0) { EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); } if ((se->flags & WANT_WRITE) != 0) { EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); } } return r; @@ -118,15 +121,14 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { bool update_socket(const socket_events& e) final { bool r = socket_engine_base::update_socket(e); if (r) { - socket_events* se = fds.find(e.fd)->second.get(); - struct kevent ke; + struct kevent ke{}; if ((e.flags & WANT_READ) != 0) { EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); } if ((e.flags & WANT_WRITE) != 0) { EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); } } return r; @@ -139,9 +141,9 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { if (r) { struct kevent ke; EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); - kevent(kqueue_handle, &ke, 1, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(kqueue_handle, &ke, 1, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); } return r; }