From 5a06ebc9fca25817f737a45b593443fe822e4dbd Mon Sep 17 00:00:00 2001 From: Craig Edwards Date: Thu, 12 Dec 2024 02:17:52 +0000 Subject: [PATCH] fix: immediately delete fds that want deletion after we are done firing their events rather than prune() loop fix: replace fds that already exist in the set. emplace would fail. --- src/dpp/cluster.cpp | 3 ++- src/dpp/discordclient.cpp | 1 + src/dpp/socketengine.cpp | 29 +++++++++-------------------- src/dpp/socketengines/epoll.cpp | 18 ++++++++++-------- src/dpp/socketengines/kqueue.cpp | 23 +++++++++++++---------- src/dpp/socketengines/poll.cpp | 28 +++++++++++++--------------- src/dpp/sslclient.cpp | 1 + src/dpp/wsclient.cpp | 1 + 8 files changed, 50 insertions(+), 54 deletions(-) diff --git a/src/dpp/cluster.cpp b/src/dpp/cluster.cpp index 3ad3553d6c..05665cf6db 100644 --- a/src/dpp/cluster.cpp +++ b/src/dpp/cluster.cpp @@ -199,7 +199,7 @@ dpp::utility::uptime cluster::uptime() } void cluster::add_reconnect(uint32_t shard_id) { - reconnections[shard_id] =time(nullptr) + 5; + reconnections[shard_id] = time(nullptr) + 5; log(ll_trace, "Reconnecting shard " + std::to_string(shard_id) + " in 5 seconds..."); } @@ -208,6 +208,7 @@ void cluster::start(start_type return_after) { auto event_loop = [this]() -> void { auto reconnect_monitor = start_timer([this](auto t) { time_t now = time(nullptr); + log(ll_trace, "Ticking reconnect monitor with " + std::to_string(reconnections.size()) + " queued reconnections"); for (auto reconnect = reconnections.begin(); reconnect != reconnections.end(); ++reconnect) { auto shard_id = reconnect->first; auto shard_reconnect_time = reconnect->second; diff --git a/src/dpp/discordclient.cpp b/src/dpp/discordclient.cpp index 8239af4b04..f45ed0de93 100644 --- a/src/dpp/discordclient.cpp +++ b/src/dpp/discordclient.cpp @@ -132,6 +132,7 @@ discord_client::~discord_client() void discord_client::on_disconnect() { + log(ll_trace, "discord_client::on_disconnect()"); set_resume_hostname(); if (sfd != INVALID_SOCKET) { log(dpp::ll_debug, "Lost connection to websocket on shard " + std::to_string(shard_id) + ", reconnecting..."); diff --git a/src/dpp/socketengine.cpp b/src/dpp/socketengine.cpp index 4b8730ee0a..762dbcf9e5 100644 --- a/src/dpp/socketengine.cpp +++ b/src/dpp/socketengine.cpp @@ -36,8 +36,10 @@ bool socket_engine_base::register_socket(const socket_events &e) { if (e.fd != INVALID_SOCKET && i == fds.end()) { fds.emplace(e.fd, std::make_unique(e)); return true; - } else if (e.fd != INVALID_SOCKET && i != fds.end()) { - this->remove_socket(e.fd); + } + if (e.fd != INVALID_SOCKET && i != fds.end()) { + remove_socket(e.fd); + fds.erase(i); fds.emplace(e.fd, std::make_unique(e)); return true; } @@ -75,37 +77,25 @@ time_t last_time = time(nullptr); socket_events* socket_engine_base::get_fd(dpp::socket fd) { std::unique_lock lock(fds_mutex); auto iter = fds.find(fd); - if (iter == fds.end() || ((iter->second->flags & WANT_DELETION) != 0L)) { + if (iter == fds.end()) { return nullptr; } return iter->second.get(); } void socket_engine_base::prune() { - if (to_delete_count > 0) { - std::unique_lock lock(fds_mutex); - for (auto it = fds.cbegin(); it != fds.cend();) { - if ((it->second->flags & WANT_DELETION) != 0L) { - remove_socket(it->second->fd); - it = fds.erase(it); - } else { - ++it; - } - } - to_delete_count = 0; - } if (time(nullptr) != last_time) { try { - /* Every minute, rehash all cache containers. - * We do this from the socket engine now, not from - * shard 0, so no need to run shards to have timers! - */ owner->tick_timers(); } catch (const std::exception& e) { owner->log(dpp::ll_error, "Uncaught exception in tick_timers: " + std::string(e.what())); } if ((time(nullptr) % 60) == 0) { + /* Every minute, rehash all cache containers. + * We do this from the socket engine now, not from + * shard 0, so no need to run shards to have timers! + */ dpp::garbage_collection(); } @@ -120,7 +110,6 @@ bool socket_engine_base::delete_socket(dpp::socket fd) { return false; } iter->second->flags |= WANT_DELETION; - to_delete_count++; return true; } diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index 648e28ed0d..ed601878a8 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -82,11 +82,11 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { const int fd = ev.data.fd; auto eh = get_fd(fd); - if (!eh || fd == INVALID_SOCKET) { + if (eh == nullptr || fd == INVALID_SOCKET) { continue; } - try { + if ((eh->flags & WANT_DELETION) == 0L) try { if ((ev.events & EPOLLHUP) != 0U) { if (eh->on_error) { @@ -125,6 +125,11 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { owner->log(ll_trace, "Socket loop exception: " + std::string(e.what())); eh->on_error(fd, *eh, 0); } + + if ((eh->flags & WANT_DELETION) != 0L) { + remove_socket(fd); + fds.erase(fd); + } } prune(); } @@ -172,12 +177,9 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { protected: bool remove_socket(dpp::socket fd) final { - bool r = socket_engine_base::remove_socket(fd); - if (r) { - struct epoll_event ev{}; - epoll_ctl(epoll_handle, EPOLL_CTL_DEL, fd, &ev); - } - return r; + struct epoll_event ev{}; + epoll_ctl(epoll_handle, EPOLL_CTL_DEL, fd, &ev); + return true; } }; diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp index 3c3acf16c5..77fd81d385 100644 --- a/src/dpp/socketengines/kqueue.cpp +++ b/src/dpp/socketengines/kqueue.cpp @@ -72,7 +72,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { continue; } - try { + if ((eh->flags & WANT_DELETION) == 0L) try { const short filter = kev.filter; if (kev.flags & EV_EOF || kev.flags & EV_ERROR) { @@ -98,6 +98,12 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { owner->log(ll_trace, "Socket loop exception: " + std::string(e.what())); eh->on_error(kev.ident, *eh, 0); } + + + if ((eh->flags & WANT_DELETION) != 0L) { + remove_socket(kev.ident); + fds.erase(kev.ident); + } } prune(); } @@ -131,15 +137,12 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { protected: bool remove_socket(dpp::socket fd) final { - bool r = socket_engine_base::remove_socket(fd); - if (r) { - struct kevent ke{}; - EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); - kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); - EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); - } - return r; + struct kevent ke{}; + EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); + EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); + return true; } }; diff --git a/src/dpp/socketengines/poll.cpp b/src/dpp/socketengines/poll.cpp index a336a20954..beb168ed9e 100644 --- a/src/dpp/socketengines/poll.cpp +++ b/src/dpp/socketengines/poll.cpp @@ -90,17 +90,12 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base { processed++; } - auto iter = fds.find(fd); - if (iter == fds.end()) { - continue; - } - socket_events *eh = iter->second.get(); - + socket_events *eh = get_fd(fd); if (eh == nullptr || eh->flags & WANT_DELETION) { continue; } - try { + if ((eh->flags & WANT_DELETION) == 0L) try { if ((revents & POLLHUP) != 0) { eh->on_error(fd, *eh, 0); @@ -130,6 +125,12 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base { } catch (const std::exception &e) { eh->on_error(fd, *eh, 0); } + + if ((eh->flags & WANT_DELETION) != 0L) { + remove_socket(fd); + std::unique_lock lock(fds_mutex); + fds.erase(fd); + } } } @@ -184,14 +185,11 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base { protected: bool remove_socket(dpp::socket fd) final { - bool r = socket_engine_base::remove_socket(fd); - if (r) { - std::unique_lock lock(poll_set_mutex); - for (auto i = poll_set.begin(); i != poll_set.end(); ++i) { - if (i->fd == fd) { - poll_set.erase(i); - return true; - } + std::unique_lock lock(poll_set_mutex); + for (auto i = poll_set.begin(); i != poll_set.end(); ++i) { + if (i->fd == fd) { + poll_set.erase(i); + return true; } } return false; diff --git a/src/dpp/sslclient.cpp b/src/dpp/sslclient.cpp index fcc31c05ab..c7c9a15c02 100644 --- a/src/dpp/sslclient.cpp +++ b/src/dpp/sslclient.cpp @@ -579,6 +579,7 @@ void ssl_client::close() last_tick = time(nullptr); bytes_in = bytes_out = 0; if (sfd != INVALID_SOCKET) { + log(ll_trace, "ssl_client::close() with sfd"); owner->socketengine->delete_socket(sfd); close_socket(sfd); sfd = INVALID_SOCKET; diff --git a/src/dpp/wsclient.cpp b/src/dpp/wsclient.cpp index a87239d011..d4e6d749fb 100644 --- a/src/dpp/wsclient.cpp +++ b/src/dpp/wsclient.cpp @@ -370,6 +370,7 @@ void websocket_client::on_disconnect() void websocket_client::close() { + log(ll_trace, "websocket_client::close()"); this->on_disconnect(); this->state = HTTP_HEADERS; ssl_client::close();