Skip to content

Commit

Permalink
feat: error handling and client auto reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
braindigitalis committed Nov 22, 2024
1 parent 2dbcb65 commit d22254b
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 37 deletions.
23 changes: 12 additions & 11 deletions include/dpp/discordclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ class DPP_EXPORT discord_client : public websocket_client
*/
void disconnect_voice_internal(snowflake guild_id, bool send_json = true);

/**
* @brief Start connecting the websocket
*
* Called from the constructor, or during reconnection
*/
void start_connecting();

private:

/**
Expand All @@ -191,17 +198,6 @@ class DPP_EXPORT discord_client : public websocket_client
*/
std::deque<std::string> message_queue;

/**
* @brief Thread this shard is executing on
*/
std::thread* runner;

/**
* @brief Run shard loop under a thread.
* Calls discord_client::run() from within a std::thread.
*/
void thread_run();

/**
* @brief If true, stream compression is enabled
*/
Expand Down Expand Up @@ -499,6 +495,11 @@ class DPP_EXPORT discord_client : public websocket_client
*/
void run();

/**
* @brief Called when the HTTP socket is closed
*/
virtual void on_disconnect();

/**
* @brief Connect to a voice channel
*
Expand Down
9 changes: 7 additions & 2 deletions include/dpp/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct DPP_EXPORT thread_pool_task {
};

struct DPP_EXPORT thread_pool_task_comparator {
bool operator()(const thread_pool_task &a, const thread_pool_task &b) {
bool operator()(const thread_pool_task &a, const thread_pool_task &b) const {
return a.priority < b.priority;
};
};
Expand All @@ -57,7 +57,12 @@ struct DPP_EXPORT thread_pool {
std::condition_variable cv;
bool stop{false};

explicit thread_pool(size_t num_threads = std::thread::hardware_concurrency());
/**
* @brief Create a new priority thread pool
* @param creator creating cluster (for logging)
* @param num_threads number of threads in the pool
*/
explicit thread_pool(class cluster* creator, size_t num_threads = std::thread::hardware_concurrency());
~thread_pool();
void enqueue(thread_pool_task task);
};
Expand Down
5 changes: 5 additions & 0 deletions include/dpp/wsclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ class DPP_EXPORT websocket_client : public ssl_client {
* This indicates graceful close.
*/
void send_close_packet();

/**
* @brief Called on HTTP socket closure
*/
virtual void on_disconnect();
};

}
2 changes: 1 addition & 1 deletion src/dpp/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ cluster::cluster(const std::string &_token, uint32_t _intents, uint32_t _shards,
numshards(_shards), cluster_id(_cluster_id), maxclusters(_maxclusters), rest_ping(0.0), cache_policy(policy), ws_mode(ws_json)
{
socketengine = create_socket_engine(this);
pool = std::make_unique<thread_pool>(request_threads);
pool = std::make_unique<thread_pool>(this, request_threads);
/* Instantiate REST request queues */
try {
rest = new request_queue(this, request_threads);
Expand Down
32 changes: 18 additions & 14 deletions src/dpp/discordclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ thread_local static std::string last_ping_message;

discord_client::discord_client(dpp::cluster* _cluster, uint32_t _shard_id, uint32_t _max_shards, const std::string &_token, uint32_t _intents, bool comp, websocket_protocol_t ws_proto)
: websocket_client(_cluster, _cluster->default_gateway, "443", comp ? (ws_proto == ws_json ? PATH_COMPRESSED_JSON : PATH_COMPRESSED_ETF) : (ws_proto == ws_json ? PATH_UNCOMPRESSED_JSON : PATH_UNCOMPRESSED_ETF)),
terminating(false),
runner(nullptr),
terminating(false),
compressed(comp),
decomp_buffer(nullptr),
zlib(nullptr),
Expand All @@ -90,6 +89,10 @@ discord_client::discord_client(dpp::cluster* _cluster, uint32_t _shard_id, uint3
protocol(ws_proto),
resume_gateway_url(_cluster->default_gateway)
{
start_connecting();
}

void discord_client::start_connecting() {
try {
zlib = new zlibcontext();
etf = new etf_parser();
Expand All @@ -112,10 +115,6 @@ discord_client::discord_client(dpp::cluster* _cluster, uint32_t _shard_id, uint3
void discord_client::cleanup()
{
terminating = true;
if (runner) {
runner->join();
delete runner;
}
delete etf;
delete zlib;
}
Expand All @@ -125,6 +124,19 @@ discord_client::~discord_client()
cleanup();
}

void discord_client::on_disconnect()
{
set_resume_hostname();
log(dpp::ll_debug, "Lost connection to websocket on shard " + std::to_string(shard_id) + ", reconnecting in 5 seconds...");
owner->start_timer([this](auto handle) {
owner->stop_timer(handle);
cleanup();
terminating = false;
start_connecting();
run();
}, 5);
}

uint64_t discord_client::get_decompressed_bytes_in()
{
return decompressed_total;
Expand Down Expand Up @@ -159,20 +171,12 @@ void discord_client::set_resume_hostname()
hostname = resume_gateway_url;
}

void discord_client::thread_run()
{
}

void discord_client::run()
{
// TODO: This only runs once. Replace the reconnect mechanics.
// To make this work, we will need to intercept errors.
setup_zlib();
ready = false;
message_queue.clear();
ssl_client::read_loop();
//ssl_client::close();
//end_zlib();
}

bool discord_client::handle_frame(const std::string &buffer, ws_opcode opcode)
Expand Down
15 changes: 12 additions & 3 deletions src/dpp/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
#include <dpp/utility.h>
#include <dpp/thread_pool.h>
#include <shared_mutex>
#include <dpp/cluster.h>

namespace dpp {

thread_pool::thread_pool(size_t num_threads) {
thread_pool::thread_pool(cluster* creator, size_t num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
threads.emplace_back([this, i]() {
threads.emplace_back([this, i, creator]() {
dpp::utility::set_thread_name("pool/exec/" + std::to_string(i));
while (true) {
thread_pool_task task;
Expand All @@ -47,7 +48,15 @@ thread_pool::thread_pool(size_t num_threads) {
tasks.pop();
}

task.function();
try {
task.function();
}
catch (const std::exception &e) {
creator->log(ll_warning, "Uncaught exception in thread pool: " + std::string(e.what()));
}
catch (...) {
creator->log(ll_warning, "Uncaught exception in thread pool, but not derived from std::exception!");
}
}
});
}
Expand Down
5 changes: 5 additions & 0 deletions src/dpp/wsclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,13 @@ void websocket_client::error(uint32_t errorcode)
{
}

void websocket_client::on_disconnect()
{
}

void websocket_client::close()
{
this->on_disconnect();
this->state = HTTP_HEADERS;
ssl_client::close();
}
Expand Down
6 changes: 0 additions & 6 deletions src/soaktest/soak.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ int main() {
});
soak_test.start(dpp::st_return);

dpp::https_client c2(&soak_test, "github.com", 80, "/", "GET", "", {}, true, 2, "1.1", [](dpp::https_client* c2) {
std::string hdr2 = c2->get_header("location");
std::string content2 = c2->get_content();
std::cout << "hdr2 == " << hdr2 << " ? https://github.com/ status = " << c2->get_status() << "\n";
});

while (true) {
std::this_thread::sleep_for(60s);
dpp::discord_client* dc = soak_test.get_shard(0);
Expand Down

0 comments on commit d22254b

Please sign in to comment.