feat: improved timer system, fix for runaway cpu usage due to connect() retry, fix fd and memory leak in httpsclient
braindigitalis committed Dec 4, 2024
1 parent 8b83af2 commit e6f0d50
Showing 11 changed files with 105 additions and 90 deletions.
8 changes: 4 additions & 4 deletions include/dpp/cluster.h
@@ -126,12 +126,12 @@ class DPP_EXPORT cluster {
 	shard_list shards;
 
 	/**
-	 * @brief List of all active registered timers
+	 * @brief Ephemeral list of deleted timer ids
 	 */
-	timer_reg_t timer_list;
+	timers_deleted_t deleted_timers;
 
 	/**
-	 * @brief List of timers by time
+	 * @brief Priority queue of timers by time
 	 */
 	timer_next_t next_timer;

@@ -172,7 +172,7 @@ class DPP_EXPORT cluster {
 	 *
 	 * @param t Timer to reschedule
 	 */
-	void timer_reschedule(timer_t t);
+	void timer_reschedule(timer_t& t);
 
 	/**
 	 * @brief Thread pool
17 changes: 15 additions & 2 deletions include/dpp/sslclient.h
@@ -66,15 +66,25 @@ DPP_EXPORT bool close_socket(dpp::socket sfd);
  */
 DPP_EXPORT bool set_nonblocking(dpp::socket sockfd, bool non_blocking);
 
-/* You'd think that we would get better performance with a bigger buffer, but SSL frames are 16k each.
+/**
+ * @brief SSL_read buffer size
+ *
+ * You'd think that we would get better performance with a bigger buffer, but SSL frames are 16k each.
  * SSL_read in non-blocking mode will only read 16k at a time. There's no point in a bigger buffer as
  * it'd go unused.
  */
 constexpr uint16_t DPP_BUFSIZE{16 * 1024};
 
-/* Represents a failed socket system call, e.g. connect() failure */
+/**
+ * @brief Represents a failed socket system call, e.g. connect() failure
+ */
 constexpr int ERROR_STATUS{-1};
 
+/**
+ * @brief Maximum number of internal connect() retries on TCP connections
+ */
+constexpr int MAX_RETRIES{4};
+
 
 /**
  * @brief Implements a simple non-blocking SSL stream client.
@@ -91,6 +101,9 @@ class DPP_EXPORT ssl_client
 	 */
 	void cleanup();
 
+	/**
+	 * @brief Mutex for creation of internal SSL pointers by openssl
+	 */
 	std::mutex ssl_mutex;
 
 	/**
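The 16 KiB figure matches the maximum plaintext payload of a single TLS record, which is why a larger buffer would simply sit unused. As a rough illustration only (not the library's read_loop; the SSL* setup, polling and error handling are assumed to live elsewhere), a non-blocking drain loop sized to DPP_BUFSIZE looks like this:

#include <openssl/ssl.h>
#include <string>

/* Sketch: drain whatever is currently decryptable on a non-blocking SSL*.
 * Each SSL_read() returns at most one TLS record's worth of plaintext (16 KiB),
 * so a buffer of DPP_BUFSIZE is never the limiting factor. */
long drain_ssl(SSL *ssl, std::string &out) {
	char buffer[16 * 1024]; /* same size as DPP_BUFSIZE */
	long total = 0;
	while (true) {
		int r = SSL_read(ssl, buffer, sizeof(buffer));
		if (r > 0) {
			out.append(buffer, static_cast<size_t>(r));
			total += r;
			continue;
		}
		int err = SSL_get_error(ssl, r);
		if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
			return total; /* nothing more right now; wait for the next poll event */
		}
		return -1; /* clean shutdown or hard error; caller should close */
	}
}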
2 changes: 1 addition & 1 deletion include/dpp/thread_pool.h
@@ -42,7 +42,7 @@ struct DPP_EXPORT thread_pool_task {

 struct DPP_EXPORT thread_pool_task_comparator {
 	bool operator()(const thread_pool_task &a, const thread_pool_task &b) const {
-		return a.priority < b.priority;
+		return a.priority > b.priority;
 	};
 };

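For context, std::priority_queue is a max-heap by default, so flipping the comparison to a.priority > b.priority turns it into a min-heap: top() now yields the task with the lowest numeric priority, which (given the queue_work(0, ...) call elsewhere in this commit) means the most urgent work runs first. A standalone sketch of the same ordering rule, using a hypothetical task type rather than thread_pool_task:

#include <cstdint>
#include <iostream>
#include <queue>
#include <vector>

struct task {
	uint64_t priority; /* lower value = more urgent, matching queue_work(0, ...) */
};

struct task_comparator {
	/* Returning a.priority > b.priority inverts the default max-heap ordering,
	 * so std::priority_queue::top() is the task with the LOWEST priority value. */
	bool operator()(const task &a, const task &b) const {
		return a.priority > b.priority;
	}
};

int main() {
	std::priority_queue<task, std::vector<task>, task_comparator> q;
	q.push({5});
	q.push({0});
	q.push({2});
	std::cout << q.top().priority << "\n"; /* prints 0: the most urgent task is popped first */
}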
27 changes: 18 additions & 9 deletions include/dpp/timer.h
@@ -27,6 +27,8 @@
 #include <unordered_map>
 #include <cstddef>
 #include <ctime>
+#include <set>
+#include <queue>
 #include <functional>
 
 namespace dpp {
@@ -50,39 +52,46 @@ struct DPP_EXPORT timer_t {
 	/**
 	 * @brief Timer handle
 	 */
-	timer handle;
+	timer handle{0};
 
 	/**
 	 * @brief Next timer tick as unix epoch
 	 */
-	time_t next_tick;
+	time_t next_tick{0};
 
 	/**
 	 * @brief Frequency between ticks
 	 */
-	uint64_t frequency;
+	uint64_t frequency{0};
 
 	/**
 	 * @brief Lambda to call on tick
 	 */
-	timer_callback_t on_tick;
+	timer_callback_t on_tick{};
 
 	/**
 	 * @brief Lambda to call on stop (optional)
 	 */
-	timer_callback_t on_stop;
+	timer_callback_t on_stop{};
 };
 
+struct DPP_EXPORT timer_comparator {
+	bool operator()(const timer_t &a, const timer_t &b) const {
+		return a.next_tick > b.next_tick;
+	};
+};
+
+
 /**
- * @brief A map of timers, ordered by earliest first so that map::begin() is always the
+ * @brief A priority queue of timers, ordered by earliest first so that the head is always the
  * soonest to be due.
  */
-typedef std::multimap<time_t, timer_t> timer_next_t;
+typedef std::priority_queue<timer_t, std::vector<timer_t>, timer_comparator> timer_next_t;
 
 /**
- * @brief A map of timers stored by handle
+ * @brief A set of deleted timer handles
 */
-typedef std::unordered_map<timer, timer_t> timer_reg_t;
+typedef std::set<timer> timers_deleted_t;
 
 /**
  * @brief Trigger a timed event once.
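The same inversion drives the timer queue: timer_comparator orders on next_tick with >, so the priority queue behaves as a min-heap and top() is always the timer due soonest, which is what the old multimap's begin() used to provide. A cut-down sketch (toy types, not the real dpp::timer_t) showing that popping yields timers in due order:

#include <cstdint>
#include <ctime>
#include <iostream>
#include <queue>
#include <vector>

/* Cut-down stand-in for dpp::timer_t: only the fields the ordering needs. */
struct toy_timer {
	uint64_t handle{0};
	time_t next_tick{0};
};

struct toy_comparator {
	bool operator()(const toy_timer &a, const toy_timer &b) const {
		return a.next_tick > b.next_tick; /* earliest next_tick ends up on top() */
	}
};

int main() {
	/* The old code iterated a std::multimap<time_t, timer_t> from begin();
	 * with this comparator, top() gives the same "soonest due" element. */
	std::priority_queue<toy_timer, std::vector<toy_timer>, toy_comparator> next_timer;
	time_t now = time(nullptr);
	next_timer.push({1, now + 60});
	next_timer.push({2, now + 5});
	next_timer.push({3, now + 30});
	while (!next_timer.empty()) {
		std::cout << "timer " << next_timer.top().handle << " due in "
		          << (next_timer.top().next_tick - now) << "s\n";
		next_timer.pop(); /* prints 2, 3, 1: earliest first */
	}
}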
3 changes: 1 addition & 2 deletions src/dpp/cluster.cpp
@@ -338,8 +338,7 @@ void cluster::shutdown() {

 	{
 		std::lock_guard<std::mutex> l(timer_guard);
-		timer_list.clear();
-		next_timer.clear();
+		next_timer = {};
 	}
 
 	/* Terminate shards */
118 changes: 50 additions & 68 deletions src/dpp/cluster/timer.cpp
@@ -35,89 +35,71 @@ timer cluster::start_timer(timer_callback_t on_tick, uint64_t frequency, timer_c
 	newtimer.on_tick = on_tick;
 	newtimer.on_stop = on_stop;
 	newtimer.frequency = frequency;
-	timer_list[newtimer.handle] = newtimer;
-	next_timer.emplace(newtimer.next_tick, newtimer);
+
+	next_timer.emplace(newtimer);
+
 	return newtimer.handle;
 }
 
 bool cluster::stop_timer(timer t) {
+	/*
+	 * Because removing an arbitrary element from a priority queue is expensive, we don't walk the
+	 * queue looking for the timer to remove. Instead, we just insert the timer handle into a std::set.
+	 * When tick_timers() later sees a handle in this set, it calls that timer's on_stop() and does
+	 * not reschedule it.
+	 */
 	std::lock_guard<std::mutex> l(timer_guard);
 
-	auto i = timer_list.find(t);
-	if (i != timer_list.end()) {
-		timer_t timer_current = i->second;
-		if (timer_current.on_stop) {
-			/* If there is an on_stop event, call it */
-			timer_current.on_stop(t);
-		}
-		timer_list.erase(i);
-		bool again;
-		do {
-			again = false;
-			for (auto this_timer = next_timer.begin(); this_timer != next_timer.end(); ++this_timer) {
-				if (this_timer->second.handle == t) {
-					next_timer.erase(this_timer);
-					again = true;
-					break;
-				}
-			}
-		} while(again);
-		return true;
-	}
-	return false;
+	deleted_timers.emplace(t);
+	return true;
 }
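The pattern the comment above describes, boiled down to its essentials with hypothetical names: stopping a timer is just an O(log n) insert into a tombstone set, and the priority queue is only cleaned up when the dead entry surfaces at the top during a tick (locking is omitted here; the cluster guards its containers with timer_guard):

#include <cstdint>
#include <ctime>
#include <queue>
#include <set>
#include <vector>

/* Hypothetical names; mirrors the pattern used by stop_timer()/tick_timers(). */
struct lazy_timer_queue {
	struct entry {
		uint64_t handle{0};
		time_t next_tick{0};
		uint64_t frequency{0};
	};
	struct cmp {
		bool operator()(const entry &a, const entry &b) const { return a.next_tick > b.next_tick; }
	};

	std::priority_queue<entry, std::vector<entry>, cmp> due;
	std::set<uint64_t> tombstones;

	/* Stopping is an O(log n) set insert: we never search the heap, we just record the handle. */
	void stop(uint64_t handle) {
		tombstones.insert(handle);
	}

	/* Dead entries are discarded when they reach the top instead of being rescheduled. */
	void tick(time_t now) {
		while (!due.empty() && due.top().next_tick <= now) {
			entry e = due.top();
			due.pop();
			if (tombstones.erase(e.handle) > 0) {
				continue; /* stopped: drop it */
			}
			/* ... invoke the timer's callback here ... */
			e.next_tick = now + e.frequency;
			due.push(e);
		}
	}
};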

-void cluster::timer_reschedule(timer_t t) {
-	std::lock_guard<std::mutex> l(timer_guard);
-	for (auto i = next_timer.begin(); i != next_timer.end(); ++i) {
-		/* Rescheduling the timer means finding it in the next tick map.
-		 * It should be pretty much near the start of the map so this loop
-		 * should only be at most a handful of iterations.
-		 */
-		if (i->second.handle == t.handle) {
-			next_timer.erase(i);
-			t.next_tick = time(nullptr) + t.frequency;
-			next_timer.emplace(t.next_tick, t);
-			break;
-		}
-	}
+void cluster::timer_reschedule(timer_t& t) {
+
 }

 void cluster::tick_timers() {
-	std::vector<timer_t> scheduled;
-	{
-		time_t now = time(nullptr);
-		std::lock_guard<std::mutex> l(timer_guard);
-		for (auto & i : next_timer) {
-			if (now >= i.second.next_tick) {
-				scheduled.push_back(i.second);
-			} else {
-				/* The first time we encounter an entry which is not due,
-				 * we can bail out, because std::map is ordered storage, so
-				 * we know at this point no more will match either.
-				 */
+	time_t now = time(nullptr);
+	time_t time_frame{};
+	if (next_timer.empty()) {
+		return;
+	}
+	do {
+		timer_t cur_timer;
+		{
+			std::lock_guard<std::mutex> l(timer_guard);
+			cur_timer = next_timer.top();
+			if (cur_timer.next_tick > now) {
+				/* Nothing to do */
 				break;
 			}
+			next_timer.pop();
 		}
-	}
-	for (auto & t : scheduled) {
-		timer handle = t.handle;
-		/* Call handler */
-		t.on_tick(handle);
-		/* Reschedule if it wasn't deleted.
-		 * Note: We wrap the .contains() check in a lambda as it needs locking
-		 * for thread safety, but timer_rescheudle also locks the container, so this
-		 * is the cleanest way to do it.
-		 */
-		bool not_deleted = ([handle, this]() -> bool {
-			std::lock_guard<std::mutex> l(timer_guard);
-			return timer_list.find(handle) != timer_list.end();
-		}());
-		if (not_deleted) {
-			timer_reschedule(t);
+		timers_deleted_t::iterator deleted_iter{};
+		bool deleted{};
+		{
+			deleted_iter = deleted_timers.find(cur_timer.handle);
+			deleted = deleted_iter != deleted_timers.end();
+		}
-		}
-	}
+
+		if (!deleted) {
+			cur_timer.on_tick(cur_timer.handle);
+			cur_timer.next_tick += cur_timer.frequency;
+			{
+				std::lock_guard<std::mutex> l(timer_guard);
+				next_timer.emplace(cur_timer);
+			}
+		} else {
+			/* Deleted timers are not reinserted into the priority queue and their on_stop is called */
+			if (cur_timer.on_stop) {
+				cur_timer.on_stop(cur_timer.handle);
+			}
+			{
+				std::lock_guard<std::mutex> l(timer_guard);
+				deleted_timers.erase(deleted_iter);
+			}
+		}
+
+	} while (true);
 }
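From the caller's point of view the API is unchanged: start_timer() still returns a handle and stop_timer() still cancels it, the cancellation is simply deferred until the stopped timer next reaches the head of the queue. A minimal usage sketch, assuming an already-constructed dpp::cluster named bot:

/* Ticks every 60 seconds until stopped. */
dpp::timer reminder = bot.start_timer([&bot](dpp::timer handle) {
	bot.log(dpp::ll_info, "timer ticked");
}, 60, [&bot](dpp::timer handle) {
	/* Optional on_stop callback, invoked once when the timer is removed. */
	bot.log(dpp::ll_info, "timer stopped");
});

/* Later: an O(log n) tombstone insert; on_stop fires the next time the timer would have ticked. */
bot.stop_timer(reminder);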

#ifdef DPP_CORO
2 changes: 2 additions & 0 deletions src/dpp/discordclient.cpp
@@ -113,6 +113,8 @@ void discord_client::on_disconnect()
 	set_resume_hostname();
 	log(dpp::ll_debug, "Lost connection to websocket on shard " + std::to_string(shard_id) + ", reconnecting in 5 seconds...");
 	ssl_client::close();
+	/* Prevent low level connect retries here, as we are handling it ourselves */
+	connect_retries = MAX_RETRIES + 1;
 	end_zlib();
 	/* Stop the timer first if its already ticking, to prevent concurrent reconnects */
 	reconnect_timer = owner->start_timer([this](auto handle) {
4 changes: 2 additions & 2 deletions src/dpp/httpsclient.cpp
@@ -342,9 +342,9 @@ void https_client::close() {
 			completed(this);
 			completed = {};
 		}
-		state = HTTPS_DONE;
-		ssl_client::close();
 	}
+	state = HTTPS_DONE;
+	ssl_client::close();
 }
 
 https_client::~https_client() {
5 changes: 5 additions & 0 deletions src/dpp/queues.cpp
@@ -258,6 +258,11 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor

 	/* Transfer it to completed requests */
 	owner->queue_work(0, [this, result]() {
+		/* Manually release this unique_ptr now, to keep memory consumption and file descriptor consumption low.
+		 * Note we do this BEFORE we call complete(), because if user code throws an exception we need to be sure
+		 * we freed this first to avoid dangling pointer leaks.
+		 */
+		this->cli.reset();
 		complete(result);
 	});
 }
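The early reset() is the heart of this fix: the https_client (its socket and buffers) is freed before user code runs, so even a throwing completion handler cannot keep the connection alive. A generic sketch of the idiom with hypothetical types:

#include <functional>
#include <memory>

/* Hypothetical stand-ins for https_client and http_request. */
struct connection {
	~connection() { /* closes the file descriptor, frees response buffers */ }
};

struct request {
	std::unique_ptr<connection> cli;
	std::function<void()> complete;

	void finish() {
		/* Release the connection BEFORE invoking user code: if complete() throws,
		 * the socket is already closed instead of lingering with the request object. */
		cli.reset();
		complete();
	}
};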
7 changes: 6 additions & 1 deletion src/dpp/socketengine.cpp
@@ -32,7 +32,12 @@ namespace dpp {

 bool socket_engine_base::register_socket(const socket_events &e) {
 	std::unique_lock lock(fds_mutex);
-	if (e.fd != INVALID_SOCKET && fds.find(e.fd) == fds.end()) {
+	auto i = fds.find(e.fd);
+	if (e.fd != INVALID_SOCKET && i == fds.end()) {
 		fds.emplace(e.fd, std::make_unique<socket_events>(e));
 		return true;
+	} else if (e.fd != INVALID_SOCKET && i != fds.end()) {
+		this->remove_socket(e.fd);
+		fds.emplace(e.fd, std::make_unique<socket_events>(e));
+		return true;
 	}
2 changes: 1 addition & 1 deletion src/dpp/sslclient.cpp
@@ -503,7 +503,7 @@ void ssl_client::read_loop()
 	if (!timer_handle) {
 		timer_handle = owner->start_timer([this, setup_events](auto handle) {
 			one_second_timer();
-			if (!tcp_connect_done && time(nullptr) > start + 2 && connect_retries < 3) {
+			if (!tcp_connect_done && time(nullptr) > start + 2 && connect_retries < MAX_RETRIES) {
 				/* Retry failed connect(). This can happen even in the best situation with bullet-proof hosting.
 				 * Previously with blocking connect() there was some leniency in this, but now we have to do this
 				 * ourselves.
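MAX_RETRIES caps how often this one-second timer re-issues connect(), which is what stops the runaway CPU usage named in the commit title, and on_disconnect() (above) pushes connect_retries past the cap so the shard's own 5-second reconnect isn't raced by these low-level retries. A generic sketch of a capped non-blocking connect retry on POSIX sockets (hypothetical helper, not the library's code):

#include <cerrno>
#include <sys/socket.h>

constexpr int max_connect_retries{4}; /* mirrors MAX_RETRIES */

/* Called once per second by a timer until the TCP handshake completes.
 * Returns false once the retry budget is spent, so a higher layer can tear
 * down the connection and start a fresh reconnect instead of spinning. */
bool retry_connect(int fd, const sockaddr *addr, socklen_t len, int &retries) {
	if (retries >= max_connect_retries) {
		return false;
	}
	++retries;
	int rc = ::connect(fd, addr, len);
	if (rc == 0 || errno == EINPROGRESS || errno == EALREADY || errno == EISCONN) {
		return true; /* connected, or still connecting in non-blocking mode */
	}
	return true; /* this attempt failed outright; the next tick will try again */
}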
