Skip to content

Commit

Permalink
add more aclk worker jobs (netdata#19435)
Browse files Browse the repository at this point in the history
* add more aclk worker jobs

* garbage collect aclk buffer when receiving acknowledgements

* add msg callback worker job

* added latency histogram for aclk pub-ack

* fix compilation

* Add a few workers, handle context checkpoint async

* Properly register worker

---------

Co-authored-by: Stelios Fragkakis <[email protected]>
  • Loading branch information
ktsaou and stelfrag authored Jan 18, 2025
1 parent 635ae9b commit 6c41244
Show file tree
Hide file tree
Showing 12 changed files with 366 additions and 29 deletions.
11 changes: 11 additions & 0 deletions src/aclk/aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,17 @@ void *aclk_main(void *ptr)
worker_register_job_name(WORKER_ACLK_HANDLE_MQTT_INTERNAL, "mqtt internal");
worker_register_job_name(WORKER_ACLK_TX, "tx");
worker_register_job_name(WORKER_ACLK_TX_ERROR, "tx error");
worker_register_job_name(WORKER_ACLK_TRY_SEND_ALL, "try send all");
worker_register_job_name(WORKER_ACLK_HANDLE_INCOMING, "handle incoming");
worker_register_job_name(WORKER_ACLK_CPT_CONNACK, "cpt connack");
worker_register_job_name(WORKER_ACLK_CPT_PUBACK, "cpt puback");
worker_register_job_name(WORKER_ACLK_CPT_PINGRESP, "cpt pingresp");
worker_register_job_name(WORKER_ACLK_CPT_SUBACK, "cpt suback");
worker_register_job_name(WORKER_ACLK_CPT_PUBLISH, "cpt publish");
worker_register_job_name(WORKER_ACLK_CPT_DISCONNECT, "cpt disconnect");
worker_register_job_name(WORKER_ACLK_CPT_UNKNOWN, "cpt unknown");
worker_register_job_name(WORKER_ACLK_SEND_FRAGMENT, "send fragment");
worker_register_job_name(WORKER_ACLK_MSG_CALLBACK, "msg callback");

ACLK_PROXY_TYPE proxy_type;
aclk_get_proxy(&proxy_type);
Expand Down
4 changes: 2 additions & 2 deletions src/aclk/aclk_contexts_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

void aclk_send_contexts_snapshot(contexts_snapshot_t data)
{
aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
aclk_query_t query = aclk_query_new(CTX_SEND_SNAPSHOT);
query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT;
query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size);
query->data.bin_payload.msg_name = "ContextsSnapshot";
Expand All @@ -15,7 +15,7 @@ void aclk_send_contexts_snapshot(contexts_snapshot_t data)

void aclk_send_contexts_updated(contexts_updated_t data)
{
aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
aclk_query_t query = aclk_query_new(CTX_SEND_SNAPSHOT_UPD);
query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED;
query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size);
query->data.bin_payload.msg_name = "ContextsUpdated";
Expand Down
9 changes: 7 additions & 2 deletions src/aclk/aclk_query_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ typedef enum {
ALARM_PROVIDE_CFG,
ALARM_SNAPSHOT,
UPDATE_NODE_COLLECTORS,
PROTO_BIN_MESSAGE,
ACLK_QUERY_TYPE_COUNT // always keep this as last
CTX_SEND_SNAPSHOT, // Context snapshot to the cloud
CTX_SEND_SNAPSHOT_UPD, // Context incremental update to the cloud
CTX_CHECKPOINT, // Context checkpoint from the cloud
CTX_STOP_STREAMING, // Context stop streaming
CREATE_NODE_INSTANCE, // Create node instance on the agent
ACLK_QUERY_TYPE_COUNT // always keep this as last
} aclk_query_type_t;

struct aclk_query_http_api_v2 {
Expand Down Expand Up @@ -56,6 +60,7 @@ struct aclk_query {
union {
struct aclk_query_http_api_v2 http_api_v2;
struct aclk_bin_payload bin_payload;
void *payload;
} data;
};

Expand Down
17 changes: 7 additions & 10 deletions src/aclk/aclk_rx_msgs.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "aclk.h"
#include "aclk_capas.h"
#include "aclk_query.h"
#include "mqtt_websockets/aclk_mqtt_workers.h"

#include "schema-wrappers/proto_2_json.h"

Expand Down Expand Up @@ -417,11 +418,9 @@ int contexts_checkpoint(const char *msg, size_t msg_len)
if (!cmd)
return 1;

rrdcontext_hub_checkpoint_command(cmd);

freez(cmd->claim_id);
freez(cmd->node_id);
freez(cmd);
aclk_query_t query = aclk_query_new(CTX_CHECKPOINT);
query->data.payload = cmd;
aclk_execute_query(query);
return 0;
}

Expand All @@ -436,11 +435,9 @@ int stop_streaming_contexts(const char *msg, size_t msg_len)
if (!cmd)
return 1;

rrdcontext_hub_stop_streaming_command(cmd);

freez(cmd->claim_id);
freez(cmd->node_id);
freez(cmd);
aclk_query_t query = aclk_query_new(CTX_STOP_STREAMING);
query->data.payload = cmd;
aclk_execute_query(query);
return 0;
}

Expand Down
11 changes: 11 additions & 0 deletions src/aclk/mqtt_websockets/aclk_mqtt_workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,16 @@
#define WORKER_ACLK_HANDLE_MQTT_INTERNAL 20
#define WORKER_ACLK_TX 21
#define WORKER_ACLK_TX_ERROR 22
#define WORKER_ACLK_TRY_SEND_ALL 23
#define WORKER_ACLK_HANDLE_INCOMING 24
#define WORKER_ACLK_CPT_CONNACK 25
#define WORKER_ACLK_CPT_PUBACK 26
#define WORKER_ACLK_CPT_PINGRESP 27
#define WORKER_ACLK_CPT_SUBACK 28
#define WORKER_ACLK_CPT_PUBLISH 29
#define WORKER_ACLK_CPT_DISCONNECT 30
#define WORKER_ACLK_CPT_UNKNOWN 31
#define WORKER_ACLK_SEND_FRAGMENT 32
#define WORKER_ACLK_MSG_CALLBACK 33

#endif //NETDATA_ACLK_MQTT_WORKERS_H
48 changes: 42 additions & 6 deletions src/aclk/mqtt_websockets/mqtt_ng.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
#endif

#include "libnetdata/libnetdata.h"
void pulse_aclk_sent_message_acked(usec_t usec, size_t len);

#include "common_internal.h"
#include "mqtt_constants.h"
#include "mqtt_ng.h"
#include "aclk_mqtt_workers.h"

#define SMALL_STRING_DONT_FRAGMENT_LIMIT 128

Expand All @@ -31,14 +33,13 @@

typedef uint16_t buffer_frag_flag_t;
struct buffer_fragment {
size_t len;
size_t sent;
uint32_t len;
uint32_t sent;
buffer_frag_flag_t flags;
uint16_t packet_id;
void (*free_fnc)(void *ptr);
unsigned char *data;

uint16_t packet_id;

usec_t sent_monotonic_ut;
struct buffer_fragment *next;
};

Expand Down Expand Up @@ -1851,6 +1852,7 @@ static int mqtt_ng_next_to_send(struct mqtt_ng_client *client) {
if ( client->ping_pending && (!frag || (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD && frag->sent == 0)) ) {
client->ping_pending = 0;
ping_frag.sent = 0;
ping_frag.sent_monotonic_ut = 0;
client->main_buffer.sending_frag = &ping_frag;
return 0;
}
Expand All @@ -1865,6 +1867,8 @@ static int mqtt_ng_next_to_send(struct mqtt_ng_client *client) {
// nothing could be written anymore
// return 1 if last fragment of a message was fully sent
static int send_fragment(struct mqtt_ng_client *client) {
worker_is_busy(WORKER_ACLK_SEND_FRAGMENT);

struct buffer_fragment *frag = client->main_buffer.sending_frag;

// for readability
Expand All @@ -1878,6 +1882,7 @@ static int send_fragment(struct mqtt_ng_client *client) {
else
nd_log(NDLS_DAEMON, NDLP_WARNING, "This fragment was fully sent already. This should not happen!");

frag->sent_monotonic_ut = now_monotonic_usec();
frag->sent += processed;
if (frag->sent != frag->len)
return -1;
Expand Down Expand Up @@ -1925,6 +1930,7 @@ static void mark_message_for_gc(struct buffer_fragment *frag)

static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
{
size_t reclaimable = 0;
LOCK_HDR_BUFFER(&client->main_buffer);
struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer);
while (frag) {
Expand All @@ -1934,10 +1940,20 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
UNLOCK_HDR_BUFFER(&client->main_buffer);
return 1;
}
pulse_aclk_sent_message_acked(frag->sent_monotonic_ut, frag->len);
mark_message_for_gc(frag);

size_t used = BUFFER_BYTES_USED(&client->main_buffer.hdr_buffer);
if (reclaimable >= (used / 4))
transaction_buffer_garbage_collect(&client->main_buffer);

UNLOCK_HDR_BUFFER(&client->main_buffer);
return 0;
}

if(frag_is_marked_for_gc(frag))
reclaimable += FRAG_SIZE_IN_BUFFER(frag);

frag = frag->next;
}
nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") is unknown!", packet_id);
Expand All @@ -1963,6 +1979,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
uint8_t ctrl_packet_type = get_control_packet_type(client->parser.mqtt_control_packet_type);
switch (ctrl_packet_type) {
case MQTT_CPT_CONNACK:
worker_is_busy(WORKER_ACLK_CPT_CONNACK);

LOCK_HDR_BUFFER(&client->main_buffer);
mark_message_for_gc(client->connect_msg);
UNLOCK_HDR_BUFFER(&client->main_buffer);
Expand Down Expand Up @@ -1991,21 +2009,27 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR;

case MQTT_CPT_PUBACK:
worker_is_busy(WORKER_ACLK_CPT_PUBACK);

if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id))
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
if (client->puback_callback)
client->puback_callback(client->parser.mqtt_packet.puback.packet_id);
break;

case MQTT_CPT_PINGRESP:
worker_is_busy(WORKER_ACLK_CPT_PINGRESP);
pulse_aclk_sent_message_acked(ping_frag.sent_monotonic_ut, ping_frag.len);
break;

case MQTT_CPT_SUBACK:
worker_is_busy(WORKER_ACLK_CPT_SUBACK);
if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id))
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
break;

case MQTT_CPT_PUBLISH:
worker_is_busy(WORKER_ACLK_CPT_PUBLISH);
pub = &client->parser.mqtt_packet.publish;

if (pub->qos > 1) {
Expand Down Expand Up @@ -2038,8 +2062,11 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
}
}

if (client->msg_callback)
if (client->msg_callback) {
worker_is_busy(WORKER_ACLK_MSG_CALLBACK);
client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos);
}

// in case we have property topic alias and we have topic we take over the string
// and add pointer to it into topic alias list
if (prop == NULL)
Expand All @@ -2048,11 +2075,13 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
return MQTT_NG_CLIENT_WANT_WRITE;

case MQTT_CPT_DISCONNECT:
worker_is_busy(WORKER_ACLK_CPT_DISCONNECT);
nd_log(NDLS_DAEMON, NDLP_INFO, "Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code);
client->client_state = MQTT_STATE_DISCONNECTED;
break;

default:
worker_is_busy(WORKER_ACLK_CPT_UNKNOWN);
nd_log(NDLS_DAEMON, NDLP_INFO, "Got unknown control packet %u from server", ctrl_packet_type);
break;
}
Expand All @@ -2068,19 +2097,26 @@ int mqtt_ng_sync(struct mqtt_ng_client *client)
if (client->client_state == MQTT_STATE_ERROR)
return 1;

worker_is_busy(WORKER_ACLK_TRY_SEND_ALL);

LOCK_HDR_BUFFER(&client->main_buffer);
try_send_all(client);
UNLOCK_HDR_BUFFER(&client->main_buffer);

int rc;

worker_is_busy(WORKER_ACLK_HANDLE_INCOMING);
while ((rc = handle_incoming_traffic(client)) != MQTT_NG_CLIENT_NEED_MORE_BYTES) {
if (rc < 0)
break;
if (rc == MQTT_NG_CLIENT_WANT_WRITE) {
worker_is_busy(WORKER_ACLK_TRY_SEND_ALL);

LOCK_HDR_BUFFER(&client->main_buffer);
try_send_all(client);
UNLOCK_HDR_BUFFER(&client->main_buffer);

worker_is_busy(WORKER_ACLK_HANDLE_INCOMING);
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/daemon/libuv_workers.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ void register_libuv_worker_jobs() {
worker_register_job_name(UV_EVENT_ACLK_NODE_INFO, "aclk host node info");
worker_register_job_name(UV_EVENT_ACLK_ALERT_PUSH, "aclk alert push");
worker_register_job_name(UV_EVENT_ACLK_QUERY_EXECUTE, "aclk query execute");
// aclk
worker_register_job_name(UV_EVENT_CTX_STOP_STREAMING, "ctx stop streaming");
worker_register_job_name(UV_EVENT_CTX_CHECKPOINT, "ctx version check");
worker_register_job_name(UV_EVENT_ALARM_PROVIDE_CFG, "send alarm config");
worker_register_job_name(UV_EVENT_ALARM_SNAPSHOT, "alert snapshot");
worker_register_job_name(UV_EVENT_REGISTER_NODE, "register node");
worker_register_job_name(UV_EVENT_UPDATE_NODE_COLLECTORS, "update collectors");
worker_register_job_name(UV_EVENT_UPDATE_NODE_INFO, "send node info");
worker_register_job_name(UV_EVENT_CTX_SEND_SNAPSHOT, "ctx send snapshot");
worker_register_job_name(UV_EVENT_CTX_SEND_SNAPSHOT_UPD, "ctx send update");
worker_register_job_name(UV_EVENT_NODE_STATE_UPDATE, "node state update");

// netdatacli
worker_register_job_name(UV_EVENT_SCHEDULE_CMD, "schedule command");
Expand Down
12 changes: 12 additions & 0 deletions src/daemon/libuv_workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ enum event_loop_job {
UV_EVENT_ACLK_ALERT_PUSH,
UV_EVENT_ACLK_QUERY_EXECUTE,

// aclk
UV_EVENT_CTX_STOP_STREAMING,
UV_EVENT_CTX_CHECKPOINT,
UV_EVENT_ALARM_PROVIDE_CFG,
UV_EVENT_ALARM_SNAPSHOT,
UV_EVENT_REGISTER_NODE,
UV_EVENT_UPDATE_NODE_COLLECTORS,
UV_EVENT_UPDATE_NODE_INFO,
UV_EVENT_CTX_SEND_SNAPSHOT,
UV_EVENT_CTX_SEND_SNAPSHOT_UPD,
UV_EVENT_NODE_STATE_UPDATE,

// netdatacli
UV_EVENT_SCHEDULE_CMD,
};
Expand Down
Loading

0 comments on commit 6c41244

Please sign in to comment.