diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c index 41f26ded5904b9..2f8e8b703b6a8f 100644 --- a/src/aclk/aclk.c +++ b/src/aclk/aclk.c @@ -23,7 +23,6 @@ int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. int aclk_rcvd_cloud_msgs = 0; int aclk_connection_counter = 0; -int disconnect_req = 0; static bool aclk_connected = false; static inline void aclk_set_connected(void) { @@ -51,7 +50,8 @@ bool aclk_online_for_nodes(void) { int aclk_ctx_based = 0; int aclk_disable_runtime = 0; -int aclk_kill_link = 0; + +ACLK_DISCONNECT_ACTION disconnect_req = ACLK_NO_DISCONNECT; usec_t aclk_session_us = 0; time_t aclk_session_sec = 0; @@ -301,14 +301,26 @@ static int handle_connection(mqtt_wss_client client) return 1; } - if (disconnect_req || aclk_kill_link) { - nd_log(NDLS_DAEMON, NDLP_NOTICE, - "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", - disconnect_req ? "true" : "false", - aclk_kill_link ? "true" : "false"); + if (disconnect_req != ACLK_NO_DISCONNECT) { + const char *reason; + switch (disconnect_req) { + case ACLK_CLOUD_DISCONNECT: + reason = "cloud request"; + break; + case ACLK_PING_TIMEOUT: + reason = "ping timeout"; + break; + case ACLK_RELOAD_CONF: + reason = "reclaim"; + break; + default: + reason = "unknown"; + break; + } + + nd_log(NDLS_DAEMON, NDLP_NOTICE, "Going to restart connection due to \"%s\"", reason); - disconnect_req = 0; - aclk_kill_link = 0; + disconnect_req = ACLK_NO_DISCONNECT; aclk_graceful_disconnect(client); aclk_shared_state.mqtt_shutdown_msg_id = -1; aclk_shared_state.mqtt_shutdown_msg_rcvd = 0; diff --git a/src/aclk/aclk.h b/src/aclk/aclk.h index b8e719bc87fcc9..45a2eac85494f7 100644 --- a/src/aclk/aclk.h +++ b/src/aclk/aclk.h @@ -11,6 +11,13 @@ // stable for the purposes of TBEB (truncated binary exponential backoff) #define ACLK_PUBACKS_CONN_STABLE 3 +typedef enum { + ACLK_NO_DISCONNECT = 0, + ACLK_CLOUD_DISCONNECT = 1, + ACLK_RELOAD_CONF = 2, + ACLK_PING_TIMEOUT = 3 +} ACLK_DISCONNECT_ACTION; + typedef enum __attribute__((packed)) { ACLK_STATUS_CONNECTED = 0, ACLK_STATUS_NONE, @@ -62,7 +69,7 @@ extern time_t aclk_session_sec; extern time_t aclk_block_until; extern int aclk_connection_counter; -extern int disconnect_req; +extern ACLK_DISCONNECT_ACTION disconnect_req; void *aclk_main(void *ptr); diff --git a/src/aclk/aclk_rx_msgs.c b/src/aclk/aclk_rx_msgs.c index ce517048c2c93a..36bd3599d622fe 100644 --- a/src/aclk/aclk_rx_msgs.c +++ b/src/aclk/aclk_rx_msgs.c @@ -407,7 +407,7 @@ int handle_disconnect_req(const char *msg, size_t msg_len) "Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s); } - disconnect_req = 1; + disconnect_req = ACLK_CLOUD_DISCONNECT; freez(cmd->error_description); freez(cmd); return 0; diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c index daf7931151bdec..6026a1e55ef67c 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.c +++ b/src/aclk/mqtt_websockets/mqtt_ng.c @@ -1804,6 +1804,7 @@ static int parse_data(struct mqtt_ng_client *client) return MQTT_NG_CLIENT_PROTOCOL_ERROR; } parser->state = MQTT_PARSE_MQTT_PACKET_DONE; + ping_timeout = 0; break; case MQTT_CPT_DISCONNECT: rc = parse_disconnect_varhdr(client); diff --git a/src/aclk/mqtt_websockets/mqtt_ng.h b/src/aclk/mqtt_websockets/mqtt_ng.h index 1661f540e2c795..c5f6d94ccf9dbe 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.h +++ b/src/aclk/mqtt_websockets/mqtt_ng.h @@ -10,7 +10,7 @@ #define MQTT_NG_MSGGEN_MSG_TOO_BIG 3 struct mqtt_ng_client; - +extern time_t ping_timeout; /* Converts integer to MQTT Variable Byte Integer as per 1.5.5 of MQTT 5 specs * @param input value to be converted * @param output pointer to memory where output will be written to. Must allow up to 4 bytes to be written. diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.c b/src/aclk/mqtt_websockets/mqtt_wss_client.c index 2b2c972bb7fe5d..92c489905a4583 100644 --- a/src/aclk/mqtt_websockets/mqtt_wss_client.c +++ b/src/aclk/mqtt_websockets/mqtt_wss_client.c @@ -9,12 +9,16 @@ #include "mqtt_ng.h" #include "ws_client.h" #include "common_internal.h" +#include "../aclk.h" #define PIPE_READ_END 0 #define PIPE_WRITE_END 1 #define POLLFD_SOCKET 0 #define POLLFD_PIPE 1 +#define PING_TIMEOUT (60) //Expect a ping response within this time (seconds) +time_t ping_timeout = 0; + #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) && (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097) #include #endif @@ -744,13 +748,11 @@ static int handle_mqtt_internal(mqtt_wss_client client) return 0; } -#define SEC_TO_MSEC 1000 -static long long int t_till_next_keepalive_ms(mqtt_wss_client client) +static int t_till_next_keepalive_ms(mqtt_wss_client client) { time_t last_send = mqtt_ng_last_send_time(client->mqtt); - long long int next_mqtt_keep_alive = (last_send * SEC_TO_MSEC) - + (client->mqtt_keepalive * (SEC_TO_MSEC * 0.75 /* SEND IN ADVANCE */)); - return(next_mqtt_keep_alive - (time(NULL) * SEC_TO_MSEC)); + time_t next_mqtt_keep_alive = last_send + client->mqtt_keepalive * 0.75; + return ((next_mqtt_keep_alive - now_realtime_sec()) * MSEC_PER_SEC); } #ifdef MQTT_WSS_CPUSTATS @@ -777,10 +779,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) #endif // Check user requested TO doesn't interfere with MQTT keep alives - long long int till_next_keep_alive = t_till_next_keepalive_ms(client); - if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) { - timeout_ms = till_next_keep_alive; - send_keepalive = 1; + if (!ping_timeout) { + int till_next_keep_alive = t_till_next_keepalive_ms(client); + if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) { + timeout_ms = till_next_keep_alive; + send_keepalive = 1; + } } #ifdef MQTT_WSS_CPUSTATS @@ -802,11 +806,17 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) #endif if (ret == 0) { + time_t now = now_realtime_sec(); if (send_keepalive) { // otherwise we shortened the timeout ourselves to take care of // MQTT keep alives mqtt_ng_ping(client->mqtt); + ping_timeout = now + PING_TIMEOUT; } else { + if (ping_timeout && ping_timeout < now) { + disconnect_req = ACLK_PING_TIMEOUT; + ping_timeout = 0; + } // if poll timed out and user requested timeout was being used // return here let user do his work and he will call us back soon return 0; diff --git a/src/claim/claim.c b/src/claim/claim.c index 03fb18c388fefa..da64a1367c6531 100644 --- a/src/claim/claim.c +++ b/src/claim/claim.c @@ -148,7 +148,7 @@ bool load_claiming_state(void) { if (aclk_online()) { nd_log(NDLS_DAEMON, NDLP_ERR, "CLAIM: agent was already connected to NC - forcing reconnection under new credentials"); - aclk_kill_link = 1; + disconnect_req = ACLK_RELOAD_CONF; } aclk_disable_runtime = 0;