Skip to content

Commit

Permalink
Streaming Improvements No 5 (netdata#19193)
Browse files Browse the repository at this point in the history
* rrdhost state id is now used to detect not available functions

* acquire release for rrdhost state

* initialize rddhost state for local hosts

* track send misses

* log for functions that return 503

* fix rrd_collector_finished() call from stream threads
  • Loading branch information
ktsaou authored Dec 12, 2024
1 parent 50429e8 commit bf4a3a5
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,8 @@ set(RRD_PLUGIN_FILES
src/database/rrdcollector-internals.h
src/database/rrd-database-mode.h
src/database/rrd-database-mode.c
src/database/rrdhost-state-id.c
src/database/rrdhost-state-id.h
)

if(ENABLE_DBENGINE)
Expand Down
1 change: 0 additions & 1 deletion src/collectors/ebpf.plugin/ebpf_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,6 @@ end_socket_loop: ; // the empty statement is here to allow code to be compiled b
else {
ebpf_release_pid_data(local_pid, fd, key.pid, EBPF_MODULE_SOCKET_IDX);
ebpf_socket_release_publish(curr);
local_pid->socket = NULL;
}
memset(values, 0, length);
memcpy(&key, &next_key, sizeof(key));
Expand Down
7 changes: 6 additions & 1 deletion src/database/rrd.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern "C" {
#include "rrd-database-mode.h"
#include "streaming/stream-traffic-types.h"
#include "streaming/stream-sender-commit.h"
#include "rrdhost-state-id.h"

// non-existing structs instead of voids
// to enable type checking at compile time
Expand Down Expand Up @@ -1161,6 +1162,11 @@ struct rrdhost {
STRING *program_name; // the program name that collects metrics for this host
STRING *program_version; // the program version that collects metrics for this host

REFCOUNT state_refcount;
RRDHOST_STATE state_id; // every time data collection (stream receiver) (dis)connects,
// this gets incremented - it is used to detect stale functions,
// stale backfilling requests, etc.

int32_t utc_offset; // the offset in seconds from utc

RRDHOST_OPTIONS options; // configuration option for this RRDHOST (no atomics on this)
Expand Down Expand Up @@ -1238,7 +1244,6 @@ struct rrdhost {

struct {
pid_t tid;
uint32_t state_id; // every time the receiver connects/disconnects, this is incremented

time_t last_connected; // the time the last sender was connected
time_t last_disconnected; // the time the last sender was disconnected
Expand Down
1 change: 1 addition & 0 deletions src/database/rrdfunctions-internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct rrd_host_function {
rrd_function_execute_cb_t execute_cb;
void *execute_cb_data;

RRDHOST_STATE rrdhost_state_id;
struct rrd_collector *collector;
};

Expand Down
33 changes: 30 additions & 3 deletions src/database/rrdfunctions.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// ----------------------------------------------------------------------------

static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) {
RRDHOST *host = rrdhost; (void)host;
RRDHOST *host = rrdhost;
struct rrd_host_function *rdcf = func;

rrd_collector_started();
rdcf->collector = rrd_collector_acquire_current_thread();
rdcf->rrdhost_state_id = rrdhost_state_id(host);

if(!rdcf->priority)
rdcf->priority = RRDFUNCTIONS_PRIORITY_DEFAULT;
Expand Down Expand Up @@ -57,6 +58,17 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_
changed = true;
}

if(rdcf->rrdhost_state_id != rrdhost_state_id(host)) {
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"FUNCTIONS: function '%s' of host '%s' changed state id from %u to %u",
dictionary_acquired_item_name(item), rrdhost_hostname(host),
rdcf->rrdhost_state_id,
rrdhost_state_id(host));

rdcf->rrdhost_state_id = rrdhost_state_id(host);
changed = true;
}

if(rdcf->execute_cb != new_rdcf->execute_cb) {
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"FUNCTIONS: function '%s' of host '%s' changed execute callback",
Expand Down Expand Up @@ -260,6 +272,8 @@ int rrd_functions_find_by_name(RRDHOST *host, BUFFER *wb, const char *name, size
strncpyz(buffer, name, sizeof(buffer) - 1);
char *s = NULL;

RRDHOST_STATE state_id = rrdhost_state_id(host);

bool found = false;
*item = NULL;
if(host->functions) {
Expand All @@ -268,10 +282,23 @@ int rrd_functions_find_by_name(RRDHOST *host, BUFFER *wb, const char *name, size
found = true;

struct rrd_host_function *rdcf = dictionary_acquired_item_value(*item);
if(rrd_collector_running(rdcf->collector)) {
if(rrd_collector_running(rdcf->collector) && rdcf->rrdhost_state_id == state_id) {
break;
}
else {

nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Function '%s' is not available. "
"host '%s', collector = { tid: %d, running: %s }, host tid { rcv: %d, snd: %d }, host state { id: %u, expected %u }, hops: %d",
name,
rrdhost_hostname(host),
rrd_collector_tid(rdcf->collector),
rrd_collector_running(rdcf->collector) ? "yes" : "no",
host->stream.rcv.status.tid, host->stream.snd.status.tid,
state_id, rdcf->rrdhost_state_id,
host->system_info->hops
);

dictionary_acquired_item_release(host->functions, *item);
*item = NULL;
}
Expand Down Expand Up @@ -314,7 +341,7 @@ bool rrd_function_available(RRDHOST *host, const char *function) {
const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(host->functions, function);
if(item) {
struct rrd_host_function *rdcf = dictionary_acquired_item_value(item);
if(rrd_collector_running(rdcf->collector))
if(rrd_collector_running(rdcf->collector) && rdcf->rrdhost_state_id == rrdhost_state_id(host))
ret = true;

dictionary_acquired_item_release(host->functions, item);
Expand Down
73 changes: 73 additions & 0 deletions src/database/rrdhost-state-id.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#include "rrdhost-state-id.h"
#include "rrd.h"

RRDHOST_STATE rrdhost_state_id(struct rrdhost *host) {
return __atomic_load_n(&host->state_id, __ATOMIC_RELAXED);
}

bool rrdhost_state_connected(RRDHOST *host) {
__atomic_add_fetch(&host->state_id, 1, __ATOMIC_RELAXED);

int32_t expected = __atomic_load_n(&host->state_refcount, __ATOMIC_RELAXED);
int32_t desired;

do {
if(expected >= 0) {
internal_fatal(true, "Cannot get the node connected");
return false;
}

desired = 0;

} while(!__atomic_compare_exchange_n(
&host->state_refcount, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));

return true;
}

bool rrdhost_state_disconnected(RRDHOST *host) {
__atomic_add_fetch(&host->state_id, 1, __ATOMIC_RELAXED);

int32_t expected = __atomic_load_n(&host->state_refcount, __ATOMIC_RELAXED);
int32_t desired;

do {
if(expected < 0) {
internal_fatal(true, "Cannot get the node disconnected");
return false;
}

desired = -1;

} while(!__atomic_compare_exchange_n(
&host->state_refcount, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));

return true;
}

bool rrdhost_state_acquire(RRDHOST *host, RRDHOST_STATE wanted_state_id) {
int32_t expected = __atomic_load_n(&host->state_refcount, __ATOMIC_RELAXED);
int32_t desired;

do {
if(expected < 0)
return false;

desired = expected + 1;

} while(!__atomic_compare_exchange_n(
&host->state_refcount, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));

if(rrdhost_state_id(host) != wanted_state_id) {
rrdhost_state_release(host);
return false;
}

return true;
}

void rrdhost_state_release(RRDHOST *host) {
__atomic_sub_fetch(&host->state_refcount, 1, __ATOMIC_RELAXED);
}
19 changes: 19 additions & 0 deletions src/database/rrdhost-state-id.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_RRDHOST_STATE_ID_H
#define NETDATA_RRDHOST_STATE_ID_H

#include "libnetdata/libnetdata.h"

typedef uint32_t RRDHOST_STATE;

struct rrdhost;
RRDHOST_STATE rrdhost_state_id(struct rrdhost *host);

bool rrdhost_state_connected(struct rrdhost *host);
bool rrdhost_state_disconnected(struct rrdhost *host);

bool rrdhost_state_acquire(struct rrdhost *host, RRDHOST_STATE wanted_state_id);
void rrdhost_state_release(struct rrdhost *host);

#endif //NETDATA_RRDHOST_STATE_ID_H
3 changes: 3 additions & 0 deletions src/database/rrdhost.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ static RRDHOST *rrdhost_create(
}

RRDHOST *host = callocz(1, sizeof(RRDHOST));
host->state_refcount = -1;

__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(RRDHOST), __ATOMIC_RELAXED);

strncpyz(host->machine_guid, guid, GUID_LEN + 1);
Expand Down Expand Up @@ -840,6 +842,7 @@ int rrd_init(const char *hostname, struct rrdhost_system_info *system_info, bool
return 1;

rrdhost_flag_set(localhost, RRDHOST_FLAG_COLLECTOR_ONLINE);
rrdhost_state_connected(localhost);

ml_host_start(localhost);
dyncfg_host_init(localhost);
Expand Down
4 changes: 4 additions & 0 deletions src/libnetdata/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ typedef uint32_t uid_t;

// --------------------------------------------------------------------------------------------------------------------

typedef int32_t REFCOUNT;

// --------------------------------------------------------------------------------------------------------------------

#if defined(OS_WINDOWS)
#include <windows.h>
#include <wctype.h>
Expand Down
27 changes: 15 additions & 12 deletions src/plugins.d/pluginsd_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, si

rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
rrdhost_flag_set(host, RRDHOST_FLAG_COLLECTOR_ONLINE);
rrdhost_state_connected(host);
ml_host_start(host);
dyncfg_host_init(host);

Expand Down Expand Up @@ -376,16 +377,19 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p
}

static void backfill_callback(size_t successful_dims __maybe_unused, size_t failed_dims __maybe_unused, struct backfill_request_data *brd) {
if (brd->rrdhost_receiver_state_id == __atomic_load_n(&brd->host->stream.rcv.status.state_id, __ATOMIC_RELAXED)) {
if (!replicate_chart_request(send_to_plugin, brd->parser, brd->host, brd->st,
brd->first_entry_child, brd->last_entry_child, brd->child_wall_clock_time,
0, 0)) {
netdata_log_error(
"PLUGINSD: 'host:%s' failed to initiate replication for 'chart:%s'",
rrdhost_hostname(brd->host),
rrdset_id(brd->st));
}
if(!rrdhost_state_acquire(brd->host, brd->rrdhost_receiver_state_id))
return;

if (!replicate_chart_request(send_to_plugin, brd->parser, brd->host, brd->st,
brd->first_entry_child, brd->last_entry_child, brd->child_wall_clock_time,
0, 0)) {
netdata_log_error(
"PLUGINSD: 'host:%s' failed to initiate replication for 'chart:%s'",
rrdhost_hostname(brd->host),
rrdset_id(brd->st));
}

rrdhost_state_release(brd->host);
}

static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
Expand Down Expand Up @@ -417,7 +421,7 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w
rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);

struct backfill_request_data brd = {
.rrdhost_receiver_state_id =__atomic_load_n(&host->stream.rcv.status.state_id, __ATOMIC_RELAXED),
.rrdhost_receiver_state_id = rrdhost_state_id(host),
.parser = parser,
.host = host,
.st = st,
Expand Down Expand Up @@ -1173,8 +1177,6 @@ void pluginsd_process_cleanup(PARSER *parser) {
pluginsd_cleanup_v2(parser);
pluginsd_host_define_cleanup(parser);

rrd_collector_finished();

#ifdef NETDATA_LOG_STREAM_RECEIVE
if(parser->user.stream_log_fp) {
fclose(parser->user.stream_log_fp);
Expand All @@ -1188,6 +1190,7 @@ void pluginsd_process_cleanup(PARSER *parser) {
void pluginsd_process_thread_cleanup(void *pptr) {
PARSER *parser = CLEANUP_FUNCTION_GET_PTR(pptr);
pluginsd_process_cleanup(parser);
rrd_collector_finished();
}

bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
Expand Down
2 changes: 1 addition & 1 deletion src/plugins.d/pluginsd_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include "daemon/common.h"

#define WORKER_PARSER_FIRST_JOB 35
#define WORKER_PARSER_FIRST_JOB 36

// this has to be in-sync with the same at stream-thread.c
#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION 25
Expand Down
9 changes: 4 additions & 5 deletions src/streaming/stream-receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ static void streaming_parser_init(struct receiver_state *rpt) {

pluginsd_keywords_init(parser, PARSER_INIT_STREAMING);

rrd_collector_started();

rpt->thread.compressed.start = 0;
rpt->thread.compressed.used = 0;
rpt->thread.compressed.enabled = stream_decompression_initialize(rpt);
Expand Down Expand Up @@ -446,6 +444,7 @@ void stream_receiver_move_to_running_unsafe(struct stream_thread *sth, struct re
sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);

stream_receive_log_database_gap(rpt);
rrdhost_state_connected(rpt->host);

// keep this last, since it sends commands back to the child
streaming_parser_init(rpt);
Expand Down Expand Up @@ -477,6 +476,8 @@ static void stream_receiver_remove(struct stream_thread *sth, struct receiver_st
, rpt->client_port ? rpt->client_port : "-"
, why ? why : "");

rrdhost_state_disconnected(rpt->host);

internal_fatal(META_GET(&sth->run.meta, (Word_t)&rpt->thread.meta) == NULL, "Receiver to be removed is not found in the list of receivers");
META_DEL(&sth->run.meta, (Word_t)&rpt->thread.meta);

Expand Down Expand Up @@ -806,8 +807,6 @@ bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
rrdhost_receiver_lock(host);

if (!host->receiver) {
__atomic_add_fetch(&host->stream.rcv.status.state_id, 1, __ATOMIC_RELAXED);

rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);

host->stream.rcv.status.connections++;
Expand Down Expand Up @@ -870,8 +869,8 @@ void rrdhost_clear_receiver(struct receiver_state *rpt) {
// Make sure that we detach this thread and don't kill a freshly arriving receiver

if (host->receiver == rpt) {
__atomic_add_fetch(&host->stream.rcv.status.state_id, 1, __ATOMIC_RELAXED);
rrdhost_flag_clear(host, RRDHOST_FLAG_COLLECTOR_ONLINE);

rrdhost_receiver_unlock(host);
{
// run all these without having the receiver lock
Expand Down
4 changes: 3 additions & 1 deletion src/streaming/stream-sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,10 @@ bool stream_sender_process_poll_events(struct stream_thread *sth, struct sender_
return false;
}
}
else
else {
sth->snd.send_misses++;
break;
}
}
}

Expand Down
Loading

0 comments on commit bf4a3a5

Please sign in to comment.