Skip to content

Commit

Permalink
Handle mqtt ping timeouts (netdata#18653)
Browse files Browse the repository at this point in the history
* Handle mqtt ping timeouts

* Increase ping timeout

* Reset ping_timeout when disconnection is requested
  • Loading branch information
stelfrag authored Oct 2, 2024
1 parent 587e836 commit 934fa2e
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 22 deletions.
30 changes: 21 additions & 9 deletions src/aclk/aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
int aclk_rcvd_cloud_msgs = 0;
int aclk_connection_counter = 0;
int disconnect_req = 0;

static bool aclk_connected = false;
static inline void aclk_set_connected(void) {
Expand Down Expand Up @@ -51,7 +50,8 @@ bool aclk_online_for_nodes(void) {

int aclk_ctx_based = 0;
int aclk_disable_runtime = 0;
int aclk_kill_link = 0;

ACLK_DISCONNECT_ACTION disconnect_req = ACLK_NO_DISCONNECT;

usec_t aclk_session_us = 0;
time_t aclk_session_sec = 0;
Expand Down Expand Up @@ -301,14 +301,26 @@ static int handle_connection(mqtt_wss_client client)
return 1;
}

if (disconnect_req || aclk_kill_link) {
nd_log(NDLS_DAEMON, NDLP_NOTICE,
"Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
disconnect_req ? "true" : "false",
aclk_kill_link ? "true" : "false");
if (disconnect_req != ACLK_NO_DISCONNECT) {
const char *reason;
switch (disconnect_req) {
case ACLK_CLOUD_DISCONNECT:
reason = "cloud request";
break;
case ACLK_PING_TIMEOUT:
reason = "ping timeout";
break;
case ACLK_RELOAD_CONF:
reason = "reclaim";
break;
default:
reason = "unknown";
break;
}

nd_log(NDLS_DAEMON, NDLP_NOTICE, "Going to restart connection due to \"%s\"", reason);

disconnect_req = 0;
aclk_kill_link = 0;
disconnect_req = ACLK_NO_DISCONNECT;
aclk_graceful_disconnect(client);
aclk_shared_state.mqtt_shutdown_msg_id = -1;
aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
Expand Down
9 changes: 8 additions & 1 deletion src/aclk/aclk.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
// stable for the purposes of TBEB (truncated binary exponential backoff)
#define ACLK_PUBACKS_CONN_STABLE 3

typedef enum {
ACLK_NO_DISCONNECT = 0,
ACLK_CLOUD_DISCONNECT = 1,
ACLK_RELOAD_CONF = 2,
ACLK_PING_TIMEOUT = 3
} ACLK_DISCONNECT_ACTION;

typedef enum __attribute__((packed)) {
ACLK_STATUS_CONNECTED = 0,
ACLK_STATUS_NONE,
Expand Down Expand Up @@ -62,7 +69,7 @@ extern time_t aclk_session_sec;
extern time_t aclk_block_until;

extern int aclk_connection_counter;
extern int disconnect_req;
extern ACLK_DISCONNECT_ACTION disconnect_req;

void *aclk_main(void *ptr);

Expand Down
2 changes: 1 addition & 1 deletion src/aclk/aclk_rx_msgs.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ int handle_disconnect_req(const char *msg, size_t msg_len)
"Cloud asks not to reconnect for %u seconds. We shall honor that request",
(unsigned int)cmd->reconnect_after_s);
}
disconnect_req = 1;
disconnect_req = ACLK_CLOUD_DISCONNECT;
freez(cmd->error_description);
freez(cmd);
return 0;
Expand Down
1 change: 1 addition & 0 deletions src/aclk/mqtt_websockets/mqtt_ng.c
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,7 @@ static int parse_data(struct mqtt_ng_client *client)
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
ping_timeout = 0;
break;
case MQTT_CPT_DISCONNECT:
rc = parse_disconnect_varhdr(client);
Expand Down
2 changes: 1 addition & 1 deletion src/aclk/mqtt_websockets/mqtt_ng.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#define MQTT_NG_MSGGEN_MSG_TOO_BIG 3

struct mqtt_ng_client;

extern time_t ping_timeout;
/* Converts integer to MQTT Variable Byte Integer as per 1.5.5 of MQTT 5 specs
* @param input value to be converted
* @param output pointer to memory where output will be written to. Must allow up to 4 bytes to be written.
Expand Down
28 changes: 19 additions & 9 deletions src/aclk/mqtt_websockets/mqtt_wss_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
#include "mqtt_ng.h"
#include "ws_client.h"
#include "common_internal.h"
#include "../aclk.h"

#define PIPE_READ_END 0
#define PIPE_WRITE_END 1
#define POLLFD_SOCKET 0
#define POLLFD_PIPE 1

#define PING_TIMEOUT (60) //Expect a ping response within this time (seconds)
time_t ping_timeout = 0;

#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) && (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097)
#include <openssl/conf.h>
#endif
Expand Down Expand Up @@ -744,13 +748,11 @@ static int handle_mqtt_internal(mqtt_wss_client client)
return 0;
}

#define SEC_TO_MSEC 1000
static long long int t_till_next_keepalive_ms(mqtt_wss_client client)
static int t_till_next_keepalive_ms(mqtt_wss_client client)
{
time_t last_send = mqtt_ng_last_send_time(client->mqtt);
long long int next_mqtt_keep_alive = (last_send * SEC_TO_MSEC)
+ (client->mqtt_keepalive * (SEC_TO_MSEC * 0.75 /* SEND IN ADVANCE */));
return(next_mqtt_keep_alive - (time(NULL) * SEC_TO_MSEC));
time_t next_mqtt_keep_alive = last_send + client->mqtt_keepalive * 0.75;
return ((next_mqtt_keep_alive - now_realtime_sec()) * MSEC_PER_SEC);
}

#ifdef MQTT_WSS_CPUSTATS
Expand All @@ -777,10 +779,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
#endif

// Check user requested TO doesn't interfere with MQTT keep alives
long long int till_next_keep_alive = t_till_next_keepalive_ms(client);
if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) {
timeout_ms = till_next_keep_alive;
send_keepalive = 1;
if (!ping_timeout) {
int till_next_keep_alive = t_till_next_keepalive_ms(client);
if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) {
timeout_ms = till_next_keep_alive;
send_keepalive = 1;
}
}

#ifdef MQTT_WSS_CPUSTATS
Expand All @@ -802,11 +806,17 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
#endif

if (ret == 0) {
time_t now = now_realtime_sec();
if (send_keepalive) {
// otherwise we shortened the timeout ourselves to take care of
// MQTT keep alives
mqtt_ng_ping(client->mqtt);
ping_timeout = now + PING_TIMEOUT;
} else {
if (ping_timeout && ping_timeout < now) {
disconnect_req = ACLK_PING_TIMEOUT;
ping_timeout = 0;
}
// if poll timed out and user requested timeout was being used
// return here let user do his work and he will call us back soon
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/claim/claim.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ bool load_claiming_state(void) {
if (aclk_online()) {
nd_log(NDLS_DAEMON, NDLP_ERR,
"CLAIM: agent was already connected to NC - forcing reconnection under new credentials");
aclk_kill_link = 1;
disconnect_req = ACLK_RELOAD_CONF;
}
aclk_disable_runtime = 0;

Expand Down

0 comments on commit 934fa2e

Please sign in to comment.