Skip to content

Commit

Permalink
fix reconnection of shards by creating new shards on reconnect, with …
Browse files Browse the repository at this point in the history
…a management timer to handle reconnections to stop reconnect storms
  • Loading branch information
braindigitalis committed Dec 7, 2024
1 parent a8b006b commit e2771ad
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 46 deletions.
13 changes: 13 additions & 0 deletions include/dpp/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ class DPP_EXPORT cluster {
*/
shard_list shards;

/**
* @brief List of shards waiting for reconnection
*/
reconnect_list reconnections;

/**
* @brief Ephemeral list of deleted timer ids
*/
Expand Down Expand Up @@ -182,6 +187,14 @@ class DPP_EXPORT cluster {
*/
std::mutex timer_guard;

/**
* @brief Mark a shard as requiring reconnection.
* The old shard is destructed and a new one, which attempts to resume, is created
* on a later run of the cluster's 5-second reconnect timer (one shard per run).
*
* @param shard_id Shard ID
*/
void add_reconnect(uint32_t shard_id);

public:
/**
* @brief Current bot token for all shards on this cluster and all commands sent via HTTP
Expand Down
19 changes: 3 additions & 16 deletions include/dpp/discordclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,6 @@ class DPP_EXPORT discord_client : public websocket_client
*/
void start_connecting();

/**
* @brief Timer for use when reconnecting.
*
* The client will wait 5 seconds before retrying a connection, to comply
* with Discord rate limiting for websocket connections.
*/
timer reconnect_timer{0};

/**
* @brief Stores the most recent ping message on this shard, which we check
* for to monitor latency
Expand Down Expand Up @@ -410,11 +402,6 @@ class DPP_EXPORT discord_client : public websocket_client
*/
uint32_t max_shards;

/**
* @brief Thread ID
*/
std::thread::native_handle_type thread_id;

/**
* @brief Last sequence number received, for resumes and pings
*/
Expand Down Expand Up @@ -583,8 +570,10 @@ class DPP_EXPORT discord_client : public websocket_client
* other object, along with the seq number.
*
* @param old Previous connection to resume from
* @param sequence Sequence number of previous session
* @param session_id Session ID of previous session
*/
explicit discord_client(discord_client& old);
explicit discord_client(discord_client& old, uint64_t sequence, const std::string& session_id);

/**
* @brief Destroy the discord client object
Expand Down Expand Up @@ -613,8 +602,6 @@ class DPP_EXPORT discord_client : public websocket_client

/**
* @brief Start and monitor I/O loop.
* @note this is a blocking call and is usually executed within a
* thread by whatever creates the object.
*/
void run();

Expand Down
5 changes: 5 additions & 0 deletions include/dpp/restresults.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ namespace dpp {
*/
typedef std::map<uint32_t, class discord_client*> shard_list;

/**
* @brief List of shards awaiting reconnection, by id with earliest possible reconnect time
*/
typedef std::map<uint32_t, time_t> reconnect_list;

/**
* @brief Represents the various information from the 'get gateway bot' api call
*/
Expand Down
36 changes: 36 additions & 0 deletions src/dpp/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,48 @@ dpp::utility::uptime cluster::uptime()
return dpp::utility::uptime(time(nullptr) - start_time);
}

/* Queue a shard for reconnection. The current time is stored as the earliest
 * moment at which the shard may be reconnected. reconnect_list is a std::map,
 * so if the shard is already queued its existing entry is left untouched.
 */
void cluster::add_reconnect(uint32_t shard_id) {
	const time_t earliest_reconnect = time(nullptr);
	reconnections.emplace(shard_id, earliest_reconnect);
}

void cluster::start(start_type return_after) {

auto event_loop = [this]() -> void {
auto reconnect_monitor = start_timer([this](auto t) {
time_t now = time(nullptr);
for (auto reconnect = reconnections.begin(); reconnect != reconnections.end(); ++reconnect) {
auto shard_id = reconnect->first;
auto shard_reconnect_time = reconnect->second;
if (now >= shard_reconnect_time) {
/* This shard needs to be reconnected */
reconnections.erase(reconnect);
discord_client* old = shards[shard_id];
/* These values must be copied to the new connection
* to attempt to resume it
*/
auto seq_no = old->last_seq;
auto session_id = old->sessionid;
log(ll_info, "Reconnecting shard " + std::to_string(shard_id));
/* Make a new resumed connection based off the old one */
shards[shard_id] = new discord_client(*old, seq_no, session_id);
/* Delete the old one */
delete old;
/* Set up the new shard's IO events */
shards[shard_id]->run();
/* It is not possible to reconnect another shard within the same 5-second window,
* due to discords strict rate limiting on shard connections, so we bail out here
* and only try another reconnect in the next timer interval. Do not try and make
* this support multiple reconnects per loop iteration or Discord will smack us
* with the rate limiting clue-by-four.
*/
return;
}
}
}, 5);
while (!this->terminating && socketengine.get()) {
socketengine->process_events();
}
stop_timer(reconnect_monitor);
};

if (on_guild_member_add && !(intents & dpp::i_guild_members)) {
Expand Down
65 changes: 35 additions & 30 deletions src/dpp/discordclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,39 @@ class zlibcontext {
z_stream d_stream;
};


/**
 * @brief Resume constructor for the discord client (shard).
 *
 * Builds a new client from an existing one so the gateway session can be resumed.
 * The underlying websocket_client is pointed at the old client's resume gateway URL
 * on port 443, with the path chosen from the old client's compression setting and
 * protocol (JSON or ETF). Shard identity, token, intents, protocol and counters are
 * taken from the old client, while per-connection state (connect time, ping start,
 * heartbeat interval, ready flag, zlib and ETF parser) starts fresh.
 *
 * @param old Previous connection to take configuration and counters from
 * @param sequence Sequence number of the previous session, stored as last_seq
 * @param session_id Session ID of the previous session, stored as sessionid
 */
discord_client::discord_client(discord_client &old, uint64_t sequence, const std::string& session_id)
: websocket_client(old.owner, old.resume_gateway_url, "443", old.compressed ? (old.protocol == ws_json ? PATH_COMPRESSED_JSON : PATH_COMPRESSED_ETF) : (old.protocol == ws_json ? PATH_UNCOMPRESSED_JSON : PATH_UNCOMPRESSED_ETF)),
compressed(old.compressed),
/* No zlib state is carried over from the old connection */
zlib(nullptr),
decompressed_total(old.decompressed_total),
connect_time(0),
ping_start(0.0),
/* Placeholder; the parser is created in the constructor body */
etf(nullptr),
creator(old.owner),
heartbeat_interval(0),
/* Heartbeat timestamps start from the moment of construction */
last_heartbeat(time(nullptr)),
shard_id(old.shard_id),
max_shards(old.max_shards),
/* Resume state comes from the explicit parameters, not from reading `old` here */
last_seq(sequence),
token(old.token),
intents(old.intents),
sessionid(session_id),
resumes(old.resumes),
reconnects(old.reconnects),
websocket_ping(old.websocket_ping),
ready(false),
last_heartbeat_ack(time(nullptr)),
protocol(old.protocol),
resume_gateway_url(old.resume_gateway_url)
{
/* Give this connection its own ETF parser, then begin connecting */
etf = std::make_unique<etf_parser>(etf_parser());
start_connecting();
}

discord_client::discord_client(dpp::cluster* _cluster, uint32_t _shard_id, uint32_t _max_shards, const std::string &_token, uint32_t _intents, bool comp, websocket_protocol_t ws_proto)
: websocket_client(_cluster, _cluster->default_gateway, "443", comp ? (ws_proto == ws_json ? PATH_COMPRESSED_JSON : PATH_COMPRESSED_ETF) : (ws_proto == ws_json ? PATH_UNCOMPRESSED_JSON : PATH_UNCOMPRESSED_ETF)),
compressed(comp),
Expand Down Expand Up @@ -99,38 +132,10 @@ discord_client::~discord_client()

/**
 * @brief Handle loss of the websocket connection on this shard.
 *
 * Switches to the resume hostname, closes the connection, releases zlib state
 * and queues this shard with the owning cluster via add_reconnect(). The client
 * does not schedule its own reconnect timer: running one alongside the cluster's
 * reconnect handling would attempt two reconnects for a single disconnect.
 */
void discord_client::on_disconnect()
{
	set_resume_hostname();
	log(dpp::ll_debug, "Lost connection to websocket on shard " + std::to_string(shard_id) + ", reconnecting...");
	ssl_client::close();
	/* Prevent low level connect retries here, as reconnection is handled by the cluster */
	connect_retries = MAX_RETRIES + 1;
	end_zlib();
	owner->add_reconnect(this->shard_id);
}

uint64_t discord_client::get_decompressed_bytes_in()
Expand Down

0 comments on commit e2771ad

Please sign in to comment.