From 33633ba17652ae3a6ce0162d0cb0d99dc3ebe915 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Thu, 12 Dec 2024 13:16:50 +0200 Subject: [PATCH] Streaming improvements No 4 (#19186) * ml logging about dimensions acquiring with rate limit * unify logs * backfilling thread for quickly backfilling charts before initiating replication * multiple backfill threads * fix for the multi-threaded backfill * another fix for the multi-threaded backfill * each backfilling thread is working on a dimension level * allocate memory without having the backfill lock * workers in backfill threads * use aral for backfilling threads; limit the number of backfilling threads to 16 * enable backfilling threads only on parents * leftover freez() * cleanup netdata startup * streaming receivers waiting list * use also replication to control the waiting list * when it is stable check on every iteration * make sure the right data are always set before adding the request to the queue * accept new nodes every 5 seconds --- CMakeLists.txt | 4 + src/daemon/analytics.c | 2 +- .../netdata-conf-backwards-compatibility.c | 4 - src/daemon/config/netdata-conf-global.c | 43 +++- src/daemon/config/netdata-conf-logs.c | 22 ++ src/daemon/config/netdata-conf.c | 5 + src/daemon/main.c | 152 ++++-------- src/daemon/pulse/pulse-workers.c | 1 + src/daemon/static_threads.c | 12 + src/database/rrd.h | 5 +- src/database/rrdhost.c | 2 - src/libnetdata/locks/spinlock.h | 4 + src/libnetdata/log/nd_log.h | 2 +- src/libnetdata/threads/threads.c | 24 +- src/libnetdata/threads/threads.h | 3 + src/ml/ml.cc | 18 +- src/plugins.d/pluginsd_parser.c | 34 ++- src/plugins.d/pluginsd_parser.h | 4 +- src/plugins.d/pluginsd_replication.c | 3 + .../protocol/command-begin-set-end.c | 2 +- src/streaming/protocol/command-nodeid.c | 12 +- src/streaming/protocol/commands.c | 4 +- src/streaming/replication.c | 42 ++-- src/streaming/stream-capabilities.c | 4 +- src/streaming/stream-conf.c | 14 +- src/streaming/stream-conf.h | 2 +- 
src/streaming/stream-connector.c | 2 +- src/streaming/stream-receiver-connection.c | 30 +-- src/streaming/stream-receiver.c | 116 +++++---- src/streaming/stream-sender-commit.c | 18 +- src/streaming/stream-sender-execute.c | 10 +- src/streaming/stream-sender.c | 38 +-- src/streaming/stream-thread.c | 44 ++-- src/streaming/stream-thread.h | 92 ++++--- src/streaming/stream-waiting-list.c | 48 ++++ src/streaming/stream-waiting-list.h | 14 ++ src/web/api/queries/backfill.c | 231 ++++++++++++++++++ src/web/api/queries/backfill.h | 26 ++ src/web/api/queries/query.c | 16 +- src/web/server/h2o/http_server.c | 2 +- src/web/server/h2o/http_server.h | 2 +- 41 files changed, 768 insertions(+), 345 deletions(-) create mode 100644 src/streaming/stream-waiting-list.c create mode 100644 src/streaming/stream-waiting-list.h create mode 100644 src/web/api/queries/backfill.c create mode 100644 src/web/api/queries/backfill.h diff --git a/CMakeLists.txt b/CMakeLists.txt index bd57bbfa84bae8..3f1d42ad4faa0f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1551,6 +1551,8 @@ set(STREAMING_PLUGIN_FILES src/streaming/stream-circular-buffer.h src/streaming/stream-control.c src/streaming/stream-control.h + src/streaming/stream-waiting-list.c + src/streaming/stream-waiting-list.h ) set(WEB_PLUGIN_FILES @@ -1564,6 +1566,8 @@ set(WEB_PLUGIN_FILES src/web/server/web_client_cache.h src/web/api/v3/api_v3_stream_info.c src/web/api/v3/api_v3_stream_path.c + src/web/api/queries/backfill.c + src/web/api/queries/backfill.h ) set(CLAIM_PLUGIN_FILES diff --git a/src/daemon/analytics.c b/src/daemon/analytics.c index 75e52a592a3757..2835041eefb983 100644 --- a/src/daemon/analytics.c +++ b/src/daemon/analytics.c @@ -911,7 +911,7 @@ void analytics_statistic_send(const analytics_statistic_t *statistic) { } void analytics_reset(void) { - analytics_data.data_length = 0; + analytics_data.data_length = 0; analytics_set_data(&analytics_data.netdata_config_stream_enabled, "null"); 
analytics_set_data(&analytics_data.netdata_config_memory_mode, "null"); analytics_set_data(&analytics_data.netdata_config_exporting_enabled, "null"); diff --git a/src/daemon/config/netdata-conf-backwards-compatibility.c b/src/daemon/config/netdata-conf-backwards-compatibility.c index ce7698f26c33c3..cd8ba029dd0313 100644 --- a/src/daemon/config/netdata-conf-backwards-compatibility.c +++ b/src/daemon/config/netdata-conf-backwards-compatibility.c @@ -4,10 +4,6 @@ #include "database/engine/rrdengineapi.h" void netdata_conf_backwards_compatibility(void) { - static bool run = false; - if(run) return; - run = true; - // move [global] options to the [web] section config_move(CONFIG_SECTION_GLOBAL, "http port listen backlog", diff --git a/src/daemon/config/netdata-conf-global.c b/src/daemon/config/netdata-conf-global.c index 2e60708b36cd14..9877c97d847d61 100644 --- a/src/daemon/config/netdata-conf-global.c +++ b/src/daemon/config/netdata-conf-global.c @@ -18,9 +18,45 @@ static int get_hostname(char *buf, size_t buf_size) { return rc; } -void netdata_conf_section_global(void) { - netdata_conf_backwards_compatibility(); +static void glibc_initialize(void) { + const char *pmax = config_get(CONFIG_SECTION_GLOBAL, "glibc malloc arena max for plugins", "1"); + if(pmax && *pmax) + setenv("MALLOC_ARENA_MAX", pmax, 1); + +#if defined(HAVE_C_MALLOPT) + int i = (int)config_get_number(CONFIG_SECTION_GLOBAL, "glibc malloc arena max for netdata", 1); + if(i > 0) + mallopt(M_ARENA_MAX, 1); + +#ifdef NETDATA_INTERNAL_CHECKS + mallopt(M_PERTURB, 0x5A); + // mallopt(M_MXFAST, 0); +#endif +#endif +} + +static void libuv_initialize(void) { + libuv_worker_threads = (int)get_netdata_cpus() * 6; + + if(libuv_worker_threads < MIN_LIBUV_WORKER_THREADS) + libuv_worker_threads = MIN_LIBUV_WORKER_THREADS; + + if(libuv_worker_threads > MAX_LIBUV_WORKER_THREADS) + libuv_worker_threads = MAX_LIBUV_WORKER_THREADS; + + libuv_worker_threads = config_get_number(CONFIG_SECTION_GLOBAL, "libuv worker 
threads", libuv_worker_threads); + if(libuv_worker_threads < MIN_LIBUV_WORKER_THREADS) { + libuv_worker_threads = MIN_LIBUV_WORKER_THREADS; + config_set_number(CONFIG_SECTION_GLOBAL, "libuv worker threads", libuv_worker_threads); + } + + char buf[20 + 1]; + snprintfz(buf, sizeof(buf) - 1, "%d", libuv_worker_threads); + setenv("UV_THREADPOOL_SIZE", buf, 1); +} + +void netdata_conf_section_global(void) { // ------------------------------------------------------------------------ // get the hostname @@ -42,6 +78,9 @@ void netdata_conf_section_global(void) { os_get_system_cpus_uncached(); os_get_system_pid_max(); + + glibc_initialize(); + libuv_initialize(); } void netdata_conf_section_global_run_as_user(const char **user) { diff --git a/src/daemon/config/netdata-conf-logs.c b/src/daemon/config/netdata-conf-logs.c index 989bfbfd86a034..0b3d31163389ca 100644 --- a/src/daemon/config/netdata-conf-logs.c +++ b/src/daemon/config/netdata-conf-logs.c @@ -2,6 +2,27 @@ #include "netdata-conf-logs.h" +static void debug_flags_initialize(void) { + // -------------------------------------------------------------------- + // get the debugging flags from the configuration file + + const char *flags = config_get(CONFIG_SECTION_LOGS, "debug flags", "0x0000000000000000"); + nd_setenv("NETDATA_DEBUG_FLAGS", flags, 1); + + debug_flags = strtoull(flags, NULL, 0); + netdata_log_debug(D_OPTIONS, "Debug flags set to '0x%" PRIX64 "'.", debug_flags); + + if(debug_flags != 0) { + struct rlimit rl = { RLIM_INFINITY, RLIM_INFINITY }; + if(setrlimit(RLIMIT_CORE, &rl) != 0) + netdata_log_error("Cannot request unlimited core dumps for debugging... 
Proceeding anyway..."); + +#ifdef HAVE_SYS_PRCTL_H + prctl(PR_SET_DUMPABLE, 1, 0, 0, 0); +#endif + } +} + void netdata_conf_section_logs(void) { static bool run = false; if(run) return; @@ -78,5 +99,6 @@ void netdata_conf_section_logs(void) { nd_log_set_user_settings(NDLS_ACLK, config_get(CONFIG_SECTION_CLOUD, "conversation log file", filename)); } + debug_flags_initialize(); aclk_config_get_query_scope(); } diff --git a/src/daemon/config/netdata-conf.c b/src/daemon/config/netdata-conf.c index d5d6105ff19996..9e81f5c35bf157 100644 --- a/src/daemon/config/netdata-conf.c +++ b/src/daemon/config/netdata-conf.c @@ -3,6 +3,10 @@ #include "netdata-conf.h" bool netdata_conf_load(char *filename, char overwrite_used, const char **user) { + static bool run = false; + if(run) return false; + run = true; + errno_clear(); int ret = 0; @@ -29,6 +33,7 @@ bool netdata_conf_load(char *filename, char overwrite_used, const char **user) { freez(filename); } + netdata_conf_backwards_compatibility(); netdata_conf_section_global_run_as_user(user); return ret; } diff --git a/src/daemon/main.c b/src/daemon/main.c index c2c27219364981..25fef600041a55 100644 --- a/src/daemon/main.c +++ b/src/daemon/main.c @@ -4,6 +4,7 @@ #include "buildinfo.h" #include "daemon/watcher.h" #include "static_threads.h" +#include "web/api/queries/backfill.h" #include "database/engine/page_test.h" #include @@ -1326,108 +1327,68 @@ int netdata_main(int argc, char **argv) { cloud_conf_load(0); } - // ------------------------------------------------------------------------ - // initialize netdata - { - const char *pmax = config_get(CONFIG_SECTION_GLOBAL, "glibc malloc arena max for plugins", "1"); - if(pmax && *pmax) - setenv("MALLOC_ARENA_MAX", pmax, 1); - -#if defined(HAVE_C_MALLOPT) - i = (int)config_get_number(CONFIG_SECTION_GLOBAL, "glibc malloc arena max for netdata", 1); - if(i > 0) - mallopt(M_ARENA_MAX, 1); + // 
---------------------------------------------------------------------------------------------------------------- + // initialize the logging system + // IMPORTANT: KEEP THIS FIRST SO THAT THE REST OF NETDATA WILL LOG PROPERLY + netdata_conf_section_logs(); + nd_log_limits_unlimited(); -#ifdef NETDATA_INTERNAL_CHECKS - mallopt(M_PERTURB, 0x5A); - // mallopt(M_MXFAST, 0); -#endif -#endif - - // set libuv worker threads - libuv_worker_threads = (int)get_netdata_cpus() * 6; - - if(libuv_worker_threads < MIN_LIBUV_WORKER_THREADS) - libuv_worker_threads = MIN_LIBUV_WORKER_THREADS; - - if(libuv_worker_threads > MAX_LIBUV_WORKER_THREADS) - libuv_worker_threads = MAX_LIBUV_WORKER_THREADS; + // initialize the log files + nd_log_initialize(); + netdata_log_info("Netdata agent version '%s' is starting", NETDATA_VERSION); + // ---------------------------------------------------------------------------------------------------------------- + // global configuration - libuv_worker_threads = config_get_number(CONFIG_SECTION_GLOBAL, "libuv worker threads", libuv_worker_threads); - if(libuv_worker_threads < MIN_LIBUV_WORKER_THREADS) { - libuv_worker_threads = MIN_LIBUV_WORKER_THREADS; - config_set_number(CONFIG_SECTION_GLOBAL, "libuv worker threads", libuv_worker_threads); - } + netdata_conf_section_global(); - { - char buf[20 + 1]; - snprintfz(buf, sizeof(buf) - 1, "%d", libuv_worker_threads); - setenv("UV_THREADPOOL_SIZE", buf, 1); - } + // Get execution path before switching user to avoid permission issues + get_netdata_execution_path(); - // prepare configuration environment variables for the plugins - netdata_conf_section_global(); - set_environment_for_plugins_and_scripts(); - analytics_reset(); + // ---------------------------------------------------------------------------------------------------------------- + // analytics - // work while we are cd into config_dir - // to allow the plugins refer to their config - // files using relative filenames - 
if(chdir(netdata_configured_user_config_dir) == -1) - fatal("Cannot cd to '%s'", netdata_configured_user_config_dir); + analytics_reset(); + get_system_timezone(); - // Get execution path before switching user to avoid permission issues - get_netdata_execution_path(); - } - - { - // -------------------------------------------------------------------- - // get the debugging flags from the configuration file + // ---------------------------------------------------------------------------------------------------------------- + // data collection plugins - const char *flags = config_get(CONFIG_SECTION_LOGS, "debug flags", "0x0000000000000000"); - nd_setenv("NETDATA_DEBUG_FLAGS", flags, 1); + // prepare configuration environment variables for the plugins + set_environment_for_plugins_and_scripts(); - debug_flags = strtoull(flags, NULL, 0); - netdata_log_debug(D_OPTIONS, "Debug flags set to '0x%" PRIX64 "'.", debug_flags); + // cd into config_dir to allow the plugins refer to their config files using relative filenames + if(chdir(netdata_configured_user_config_dir) == -1) + fatal("Cannot cd to '%s'", netdata_configured_user_config_dir); - if(debug_flags != 0) { - struct rlimit rl = { RLIM_INFINITY, RLIM_INFINITY }; - if(setrlimit(RLIMIT_CORE, &rl) != 0) - netdata_log_error("Cannot request unlimited core dumps for debugging... 
Proceeding anyway..."); + // ---------------------------------------------------------------------------------------------------------------- + // pulse (internal netdata instrumentation) -#ifdef HAVE_SYS_PRCTL_H - prctl(PR_SET_DUMPABLE, 1, 0, 0, 0); +#ifdef NETDATA_INTERNAL_CHECKS + pulse_enabled = true; + pulse_extended_enabled = true; #endif - } - - // -------------------------------------------------------------------- - // get log filenames and settings - - netdata_conf_section_logs(); - nd_log_limits_unlimited(); - - // initialize the log files - nd_log_initialize(); - netdata_log_info("Netdata agent version '%s' is starting", NETDATA_VERSION); + pulse_extended_enabled = + config_get_boolean(CONFIG_SECTION_PULSE, "extended", pulse_extended_enabled); - check_local_streaming_capabilities(); + if(pulse_extended_enabled) + // this has to run before starting any other threads that use workers + workers_utilization_enable(); - get_system_timezone(); + // ---------------------------------------------------------------------------------------------------------------- + // streaming, replication, backfilling - replication_initialize(); + stream_conf_load(); + check_local_streaming_capabilities(); + replication_initialize(); - rrd_functions_inflight_init(); - - // -------------------------------------------------------------------- - // get the certificate and start security - - netdata_conf_web_security_init(); + rrd_functions_inflight_init(); + { // -------------------------------------------------------------------- - // This is the safest place to start the SILENCERS structure + // alerts SILENCERS health_set_silencers_filename(); health_initialize_global_silencers(); @@ -1452,21 +1413,12 @@ int netdata_main(int argc, char **argv) { if (default_stacksize < 1 * 1024 * 1024) default_stacksize = 1 * 1024 * 1024; -#ifdef NETDATA_INTERNAL_CHECKS - pulse_enabled = true; - pulse_extended_enabled = true; -#endif - - pulse_extended_enabled = - 
config_get_boolean(CONFIG_SECTION_PULSE, "extended", pulse_extended_enabled); - - if(pulse_extended_enabled) - // this has to run before starting any other threads that use workers - workers_utilization_enable(); - for (i = 0; static_threads[i].name != NULL ; i++) { struct netdata_static_thread *st = &static_threads[i]; + if(st->enable_routine) + st->enabled = st->enable_routine(); + if(st->config_name) st->enabled = config_get_boolean(st->config_section, st->config_name, st->enabled); @@ -1485,6 +1437,9 @@ int netdata_main(int argc, char **argv) { delta_startup_time("initialize web server"); + // get the certificate and start security + netdata_conf_web_security_init(); + nd_web_api_init(); web_server_threading_selection(); @@ -1502,14 +1457,6 @@ int netdata_main(int argc, char **argv) { delta_startup_time("initialize ML"); ml_init(); - -#ifdef ENABLE_H2O - delta_startup_time("initialize h2o server"); - for (int t = 0; static_threads[t].name; t++) { - if (static_threads[t].start_routine == h2o_main) - static_threads[t].enabled = httpd_is_enabled(); - } -#endif } delta_startup_time("set resource limits"); @@ -1632,6 +1579,7 @@ int netdata_main(int argc, char **argv) { delta_startup_time("start the static threads"); netdata_conf_section_web(); + backfill_threads_detect_from_stream_conf(); set_late_analytics_variables(system_info); for (i = 0; static_threads[i].name != NULL ; i++) { diff --git a/src/daemon/pulse/pulse-workers.c b/src/daemon/pulse/pulse-workers.c index e2e00c80e2bb8e..fbe831534a0966 100644 --- a/src/daemon/pulse/pulse-workers.c +++ b/src/daemon/pulse/pulse-workers.c @@ -142,6 +142,7 @@ static struct worker_utilization all_workers_utilization[] = { { .name = "SERVICE", .family = "workers service", .priority = 1000000 }, { .name = "PROFILER", .family = "workers profile", .priority = 1000000 }, { .name = "PGCEVICT", .family = "workers dbengine eviction", .priority = 1000000 }, + { .name = "BACKFILL", .family = "workers backfill", .priority = 1000000 }, 
// has to be terminated with a NULL { .name = NULL, .family = NULL } diff --git a/src/daemon/static_threads.c b/src/daemon/static_threads.c index fa4de32e183e49..18f0e40644cbe5 100644 --- a/src/daemon/static_threads.c +++ b/src/daemon/static_threads.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "common.h" +#include "web/api/queries/backfill.h" void *aclk_main(void *ptr); void *analytics_main(void *ptr); @@ -123,6 +124,7 @@ const struct netdata_static_thread static_threads_common[] = { .name = "h2o", .config_section = NULL, .config_name = NULL, + .enable_routine = httpd_is_enabled, .enabled = 0, .thread = NULL, .init_routine = NULL, @@ -168,6 +170,16 @@ const struct netdata_static_thread static_threads_common[] = { .init_routine = NULL, .start_routine = profile_main }, + { + .name = "BACKFILL", + .config_section = NULL, + .config_name = NULL, + .enable_routine = backfill_threads_detect_from_stream_conf, + .enabled = 0, + .thread = NULL, + .init_routine = NULL, + .start_routine = backfill_thread + }, // terminator { diff --git a/src/database/rrd.h b/src/database/rrd.h index 65ff69ee593438..319a89dcf6ab3f 100644 --- a/src/database/rrd.h +++ b/src/database/rrd.h @@ -278,7 +278,7 @@ struct rrddim_tier { STORAGE_COLLECT_HANDLE *sch; // the data collection handle }; -void backfill_tier_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s); +bool backfill_tier_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s); // ---------------------------------------------------------------------------- // RRD DIMENSION - this is a metric @@ -360,7 +360,7 @@ struct rrddim { // ------------------------------------------------------------------------ - struct rrddim_tier tiers[]; // our tiers of databases + struct rrddim_tier tiers[]; // our tiers of databases }; size_t rrddim_size(void); @@ -1238,6 +1238,7 @@ struct rrdhost { struct { pid_t tid; + uint32_t state_id; // every time the receiver connects/disconnects, this is incremented time_t 
last_connected; // the time the last sender was connected time_t last_disconnected; // the time the last sender was disconnected diff --git a/src/database/rrdhost.c b/src/database/rrdhost.c index 09b64c7c543ce9..e7c196a4e9dc36 100644 --- a/src/database/rrdhost.c +++ b/src/database/rrdhost.c @@ -779,8 +779,6 @@ int rrd_init(const char *hostname, struct rrdhost_system_info *system_info, bool dbengine_enabled = true; } else { - stream_conf_init(); - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE || stream_conf_receiver_needs_dbengine()) { nd_log(NDLS_DAEMON, NDLP_DEBUG, "DBENGINE: Initializing ..."); diff --git a/src/libnetdata/locks/spinlock.h b/src/libnetdata/locks/spinlock.h index f541e264631a94..0508100b6b18ec 100644 --- a/src/libnetdata/locks/spinlock.h +++ b/src/libnetdata/locks/spinlock.h @@ -29,7 +29,11 @@ typedef struct netdata_spinlock #define spinlock_trylock(spinlock) (netdata_mutex_trylock(&((spinlock)->inner)) == 0) #define spinlock_init(spinlock) netdata_mutex_init(&((spinlock)->inner) #else +#ifdef NETDATA_INTERNAL_CHECKS +#define SPINLOCK_INITIALIZER { .locked = false, .locker_pid = 0, .spins = 0 } +#else #define SPINLOCK_INITIALIZER { .locked = false } +#endif void spinlock_init_with_trace(SPINLOCK *spinlock, const char *func); #define spinlock_init(spinlock) spinlock_init_with_trace(spinlock, __FUNCTION__) diff --git a/src/libnetdata/log/nd_log.h b/src/libnetdata/log/nd_log.h index 5d8a3cd3f89fe2..a892a38262bad8 100644 --- a/src/libnetdata/log/nd_log.h +++ b/src/libnetdata/log/nd_log.h @@ -163,7 +163,7 @@ typedef struct error_with_limit { usec_t sleep_ut; } ERROR_LIMIT; -#define nd_log_limit_static_global_var(var, log_every_secs, sleep_usecs) static ERROR_LIMIT var = { .last_logged = 0, .count = 0, .log_every = (log_every_secs), .sleep_ut = (sleep_usecs) } +#define nd_log_limit_static_global_var(var, log_every_secs, sleep_usecs) static ERROR_LIMIT var = { .spinlock = SPINLOCK_INITIALIZER, .log_every = (log_every_secs), .count = 0, 
.last_logged = 0, .sleep_ut = (sleep_usecs) } #define nd_log_limit_static_thread_var(var, log_every_secs, sleep_usecs) static __thread ERROR_LIMIT var = { .last_logged = 0, .count = 0, .log_every = (log_every_secs), .sleep_ut = (sleep_usecs) } void netdata_logger_with_limit(ERROR_LIMIT *erl, ND_LOG_SOURCES source, ND_LOG_FIELD_PRIORITY priority, const char *file, const char *function, unsigned long line, const char *fmt, ... ) PRINTFLIKE(7, 8); #define nd_log_limit(erl, NDLS, NDLP, args...) netdata_logger_with_limit(erl, NDLS, NDLP, __FILE__, __FUNCTION__, __LINE__, ##args) diff --git a/src/libnetdata/threads/threads.c b/src/libnetdata/threads/threads.c index 9f47343af34c2f..5c0562071f2c21 100644 --- a/src/libnetdata/threads/threads.c +++ b/src/libnetdata/threads/threads.c @@ -50,7 +50,7 @@ static struct { ND_THREAD *list; } running; - pthread_attr_t *attr; + pthread_attr_t attr; } threads_globals = { .exited = { .spinlock = SPINLOCK_INITIALIZER, @@ -60,7 +60,6 @@ static struct { .spinlock = SPINLOCK_INITIALIZER, .list = NULL, }, - .attr = NULL, }; static __thread ND_THREAD *_nd_thread_info = NULL; @@ -186,20 +185,15 @@ void nd_thread_rwspinlock_write_unlocked(void) { if(_nd_thread_info) _nd_thread_ // early initialization size_t netdata_threads_init(void) { - int i; + memset(&threads_globals.attr, 0, sizeof(threads_globals.attr)); - if(!threads_globals.attr) { - threads_globals.attr = callocz(1, sizeof(pthread_attr_t)); - i = pthread_attr_init(threads_globals.attr); - if (i != 0) - fatal("pthread_attr_init() failed with code %d.", i); - } + if(pthread_attr_init(&threads_globals.attr) != 0) + fatal("pthread_attr_init() failed."); // get the required stack size of the threads of netdata size_t stacksize = 0; - i = pthread_attr_getstacksize(threads_globals.attr, &stacksize); - if(i != 0) - fatal("pthread_attr_getstacksize() failed with code %d.", i); + if(pthread_attr_getstacksize(&threads_globals.attr, &stacksize) != 0) + fatal("pthread_attr_getstacksize() failed 
with code."); return stacksize; } @@ -211,8 +205,8 @@ void netdata_threads_init_after_fork(size_t stacksize) { int i; // set pthread stack size - if(threads_globals.attr && stacksize > (size_t)PTHREAD_STACK_MIN) { - i = pthread_attr_setstacksize(threads_globals.attr, stacksize); + if(stacksize > (size_t)PTHREAD_STACK_MIN) { + i = pthread_attr_setstacksize(&threads_globals.attr, stacksize); if(i != 0) nd_log(NDLS_DAEMON, NDLP_WARNING, "pthread_attr_setstacksize() to %zu bytes, failed with code %d.", stacksize, i); else @@ -371,7 +365,7 @@ ND_THREAD *nd_thread_create(const char *tag, NETDATA_THREAD_OPTIONS options, voi DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next); spinlock_unlock(&threads_globals.running.spinlock); - int ret = pthread_create(&nti->thread, threads_globals.attr, nd_thread_starting_point, nti); + int ret = pthread_create(&nti->thread, &threads_globals.attr, nd_thread_starting_point, nti); if(ret != 0) { nd_log(NDLS_DAEMON, NDLP_ERR, "failed to create new thread for %s. 
pthread_create() failed with code %d", diff --git a/src/libnetdata/threads/threads.h b/src/libnetdata/threads/threads.h index 0b54a5fc0b634c..bce4bf457671f1 100644 --- a/src/libnetdata/threads/threads.h +++ b/src/libnetdata/threads/threads.h @@ -41,6 +41,9 @@ struct netdata_static_thread { // internal use, to maintain a pointer to the created thread ND_THREAD *thread; + // a function to call to check it should be enabled or not + bool (*enable_routine) (void); + // an initialization function to run before spawning the thread void (*init_routine) (void); diff --git a/src/ml/ml.cc b/src/ml/ml.cc index 8e37253c668816..f828fad027b28b 100644 --- a/src/ml/ml.cc +++ b/src/ml/ml.cc @@ -532,8 +532,10 @@ ml_dimension_deserialize_kmeans(const char *json_str) AcquiredDimension AcqDim(DLI); if (!AcqDim.acquired()) { - netdata_log_error("Failed to deserialize kmeans: could not acquire dimension (machine-guid: %s, dimension: '%s.%s', reason: %s)", - DLI.machineGuid(), DLI.chartId(), DLI.dimensionId(), AcqDim.acquire_failure()); + nd_log_limit_static_global_var(erl, 10, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "ML: Failed to deserialize kmeans: could not acquire dimension (machine-guid: %s, dimension: '%s.%s', reason: %s)", + DLI.machineGuid(), DLI.chartId(), DLI.dimensionId(), AcqDim.acquire_failure()); json_object_put(root); return false; } @@ -1039,8 +1041,10 @@ static enum ml_worker_result ml_worker_create_new_model(ml_worker_t *worker, ml_ AcquiredDimension AcqDim(req.DLI); if (!AcqDim.acquired()) { - netdata_log_error("Failed to create new model: could not acquire dimension (machine-guid: %s, dimension: '%s.%s', reason: %s)", - req.DLI.machineGuid(), req.DLI.chartId(), req.DLI.dimensionId(), AcqDim.acquire_failure()); + nd_log_limit_static_global_var(erl, 10, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "ML: Failed to create new model: could not acquire dimension (machine-guid: %s, dimension: '%s.%s', reason: %s)", + req.DLI.machineGuid(), 
req.DLI.chartId(), req.DLI.dimensionId(), AcqDim.acquire_failure()); return ML_WORKER_RESULT_NULL_ACQUIRED_DIMENSION; } @@ -1055,8 +1059,10 @@ static enum ml_worker_result ml_worker_add_existing_model(ml_worker_t *worker, m AcquiredDimension AcqDim(req.DLI); if (!AcqDim.acquired()) { - netdata_log_error("Failed to add existing model: could not acquire dimension (machine-guid: %s, dimension: '%s.%s', reason: %s)", - req.DLI.machineGuid(), req.DLI.chartId(), req.DLI.dimensionId(), AcqDim.acquire_failure()); + nd_log_limit_static_global_var(erl, 10, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "ML: Failed to add existing model: could not acquire dimension (machine-guid: %s, dimension: '%s.%s', reason: %s)", + req.DLI.machineGuid(), req.DLI.chartId(), req.DLI.dimensionId(), AcqDim.acquire_failure()); return ML_WORKER_RESULT_NULL_ACQUIRED_DIMENSION; } diff --git a/src/plugins.d/pluginsd_parser.c b/src/plugins.d/pluginsd_parser.c index f5f71e3079ac98..78f97bb5ade0a1 100644 --- a/src/plugins.d/pluginsd_parser.c +++ b/src/plugins.d/pluginsd_parser.c @@ -2,6 +2,8 @@ #include "pluginsd_internals.h" #include "streaming/replication.h" +#include "streaming/stream-waiting-list.h" +#include "web/api/queries/backfill.h" static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) { int idx = 1; @@ -373,6 +375,19 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p return PARSER_RC_OK; } +static void backfill_callback(size_t successful_dims __maybe_unused, size_t failed_dims __maybe_unused, struct backfill_request_data *brd) { + if (brd->rrdhost_receiver_state_id == __atomic_load_n(&brd->host->stream.rcv.status.state_id, __ATOMIC_RELAXED)) { + if (!replicate_chart_request(send_to_plugin, brd->parser, brd->host, brd->st, + brd->first_entry_child, brd->last_entry_child, brd->child_wall_clock_time, + 0, 0)) { + netdata_log_error( + "PLUGINSD: 'host:%s' failed to initiate replication for 'chart:%s'", + 
rrdhost_hostname(brd->host), + rrdset_id(brd->st)); + } + } +} + static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) { const char *first_entry_txt = get_word(words, num_words, 1); const char *last_entry_txt = get_word(words, num_words, 2); @@ -401,9 +416,20 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); - ok = replicate_chart_request(send_to_plugin, parser, host, st, - first_entry_child, last_entry_child, child_wall_clock_time, - 0, 0); + struct backfill_request_data brd = { + .rrdhost_receiver_state_id =__atomic_load_n(&host->stream.rcv.status.state_id, __ATOMIC_RELAXED), + .parser = parser, + .host = host, + .st = st, + .first_entry_child = first_entry_child, + .last_entry_child = last_entry_child, + .child_wall_clock_time = child_wall_clock_time, + }; + + ok = backfill_request_add(st, backfill_callback, &brd); + if(!ok) + ok = replicate_chart_request( + send_to_plugin, parser, host, st, first_entry_child, last_entry_child, child_wall_clock_time, 0, 0); } #ifdef NETDATA_LOG_REPLICATION_REQUESTS else { @@ -412,6 +438,8 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w } #endif + stream_thread_received_metadata(); + return ok ? 
PARSER_RC_OK : PARSER_RC_ERROR; } diff --git a/src/plugins.d/pluginsd_parser.h b/src/plugins.d/pluginsd_parser.h index 08804245782944..eb219494a67bc6 100644 --- a/src/plugins.d/pluginsd_parser.h +++ b/src/plugins.d/pluginsd_parser.h @@ -5,10 +5,10 @@ #include "daemon/common.h" -#define WORKER_PARSER_FIRST_JOB 34 +#define WORKER_PARSER_FIRST_JOB 35 // this has to be in-sync with the same at stream-thread.c -#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 9) +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION 25 // this controls the max response size of a function #define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024) diff --git a/src/plugins.d/pluginsd_replication.c b/src/plugins.d/pluginsd_replication.c index aab06576555f0b..cb234ccb77fad5 100644 --- a/src/plugins.d/pluginsd_replication.c +++ b/src/plugins.d/pluginsd_replication.c @@ -3,6 +3,7 @@ #include "pluginsd_replication.h" #include "streaming/stream-receiver-internals.h" #include "streaming/replication.h" +#include "streaming/stream-waiting-list.h" PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) { int idx = 1; @@ -359,6 +360,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) { host->stream.rcv.status.replication.percent = 100.0; worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->stream.rcv.status.replication.percent); + stream_thread_received_replication(); + return PARSER_RC_OK; } diff --git a/src/streaming/protocol/command-begin-set-end.c b/src/streaming/protocol/command-begin-set-end.c index 9af7613e78eff4..0433cdc31fab81 100644 --- a/src/streaming/protocol/command-begin-set-end.c +++ b/src/streaming/protocol/command-begin-set-end.c @@ -30,7 +30,7 @@ stream_send_rrdset_metrics_v1_internal(BUFFER *wb, RRDSET *st, struct sender_sta buffer_fast_strcat(wb, "\n", 1); } else { - internal_error(true, "STREAM SEND '%s': 'chart:%s/dim:%s' flag 'exposed' is updated but not exposed", + internal_error(true, 
"STREAM SND '%s': 'chart:%s/dim:%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); // we will include it in the next iteration rrddim_metadata_updated(rd); diff --git a/src/streaming/protocol/command-nodeid.c b/src/streaming/protocol/command-nodeid.c index b82195936358de..be46e74b24a581 100644 --- a/src/streaming/protocol/command-nodeid.c +++ b/src/streaming/protocol/command-nodeid.c @@ -51,7 +51,7 @@ void stream_sender_get_node_and_claim_id_from_parent(struct sender_state *s) { ND_UUID claim_id; if (uuid_parse(claim_id_str ? claim_id_str : "", claim_id.uuid) != 0) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s] received invalid claim id '%s'", + "STREAM SND '%s' [to %s] received invalid claim id '%s'", rrdhost_hostname(s->host), s->connected_to, claim_id_str ? claim_id_str : "(unset)"); return; @@ -60,7 +60,7 @@ void stream_sender_get_node_and_claim_id_from_parent(struct sender_state *s) { ND_UUID node_id; if(uuid_parse(node_id_str ? node_id_str : "", node_id.uuid) != 0) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s] received an invalid node id '%s'", + "STREAM SND '%s' [to %s] received an invalid node id '%s'", rrdhost_hostname(s->host), s->connected_to, node_id_str ? node_id_str : "(unset)"); return; @@ -68,14 +68,14 @@ void stream_sender_get_node_and_claim_id_from_parent(struct sender_state *s) { if (!UUIDiszero(s->host->aclk.claim_id_of_parent) && !UUIDeq(s->host->aclk.claim_id_of_parent, claim_id)) nd_log(NDLS_DAEMON, NDLP_INFO, - "STREAM SEND '%s' [to %s] changed parent's claim id to %s", + "STREAM SND '%s' [to %s] changed parent's claim id to %s", rrdhost_hostname(s->host), s->connected_to, claim_id_str ? claim_id_str : "(unset)"); if(!UUIDiszero(s->host->node_id) && !UUIDeq(s->host->node_id, node_id)) { if(claimed) { nd_log(NDLS_DAEMON, NDLP_WARNING, - "STREAM SEND '%s' [to %s] parent reports different node id '%s', but we are claimed. 
Ignoring it.", + "STREAM SND '%s' [to %s] parent reports different node id '%s', but we are claimed. Ignoring it.", rrdhost_hostname(s->host), s->connected_to, node_id_str ? node_id_str : "(unset)"); return; @@ -83,7 +83,7 @@ void stream_sender_get_node_and_claim_id_from_parent(struct sender_state *s) { else { update_node_id = true; nd_log(NDLS_DAEMON, NDLP_WARNING, - "STREAM SEND '%s' [to %s] changed node id to %s", + "STREAM SND '%s' [to %s] changed node id to %s", rrdhost_hostname(s->host), s->connected_to, node_id_str ? node_id_str : "(unset)"); } @@ -91,7 +91,7 @@ void stream_sender_get_node_and_claim_id_from_parent(struct sender_state *s) { if(!url || !*url) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s] received an invalid cloud URL '%s'", + "STREAM SND '%s' [to %s] received an invalid cloud URL '%s'", rrdhost_hostname(s->host), s->connected_to, url ? url : "(unset)"); return; diff --git a/src/streaming/protocol/commands.c b/src/streaming/protocol/commands.c index b73b47d8909f90..ae587d88c6e1cc 100644 --- a/src/streaming/protocol/commands.c +++ b/src/streaming/protocol/commands.c @@ -31,7 +31,7 @@ RRDSET_STREAM_BUFFER stream_send_metrics_init(RRDSET *st, time_t wall_clock_time // - the parent just disconnected, so local data are not streamed to parent nd_log(NDLS_DAEMON, NDLP_INFO, - "STREAM SEND '%s': streaming is not ready, not sending data to a parent...", + "STREAM SND '%s': streaming is not ready, not sending data to a parent...", rrdhost_hostname(host)); } @@ -39,7 +39,7 @@ RRDSET_STREAM_BUFFER stream_send_metrics_init(RRDSET *st, time_t wall_clock_time } else if(unlikely(host_flags & RRDHOST_FLAG_STREAM_SENDER_LOGGED_STATUS)) { nd_log(NDLS_DAEMON, NDLP_INFO, - "STREAM SEND '%s': streaming is ready, sending metrics to parent...", + "STREAM SND '%s': streaming is ready, sending metrics to parent...", rrdhost_hostname(host)); rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_SENDER_LOGGED_STATUS); } diff --git a/src/streaming/replication.c 
b/src/streaming/replication.c index 755e6c4916807f..1d970ed73d7b4b 100644 --- a/src/streaming/replication.c +++ b/src/streaming/replication.c @@ -155,7 +155,7 @@ static struct replication_query *replication_query_prepare( if (st->last_updated.tv_sec > q->query.before) { #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, - "STREAM SEND REPLAY: 'host:%s/chart:%s' " + "STREAM SND REPLAY: 'host:%s/chart:%s' " "has start_streaming = true, " "adjusting replication before timestamp from %llu to %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), @@ -178,7 +178,7 @@ static struct replication_query *replication_query_prepare( if (unlikely(rd_dfe.counter >= q->dimensions)) { internal_error(true, - "STREAM SEND REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", + "STREAM SND REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", rrdhost_hostname(st->rrdhost), rrdset_id(st)); break; } @@ -364,7 +364,7 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, - "STREAM SEND REPLAY: 'host:%s/chart:%s/dim:%s': db does not advance the query " + "STREAM SND REPLAY: 'host:%s/chart:%s/dim:%s': db does not advance the query " "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), (unsigned long long) now); @@ -414,7 +414,7 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s #ifdef NETDATA_INTERNAL_CHECKS nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, - "STREAM SEND REPLAY WARNING: 'host:%s/chart:%s' misaligned dimensions, " + "STREAM SND REPLAY WARNING: 'host:%s/chart:%s' misaligned dimensions, " "update every (min: %ld, max: %ld), " "start time (min: %ld, max: %ld), " "end time (min %ld, max %ld), " @@ -450,7 +450,7 @@ 
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s q->query.enable_streaming = false; internal_error(true, - "STREAM SEND REPLAY: current buffer size %zu is more than the " + "STREAM SND REPLAY: current buffer size %zu is more than the " "max message size %zu for chart '%s' of host '%s'. " "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.", buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), @@ -530,14 +530,14 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); internal_error(true, - "STREAM SEND REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + "STREAM SND REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); } else internal_error(true, - "STREAM SEND REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", + "STREAM SND REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), (unsigned long long)after, (unsigned long long)before); #endif // NETDATA_LOG_REPLICATION_REQUESTS @@ -708,13 +708,13 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size st->stream.snd.resync_time_s = 0; #ifdef NETDATA_LOG_REPLICATION_REQUESTS - internal_error(true, "STREAM SEND REPLAY: 'host:%s/chart:%s' streaming starts", + internal_error(true, "STREAM SND REPLAY: 'host:%s/chart:%s' streaming starts", rrdhost_hostname(st->rrdhost), 
rrdset_id(st)); #endif } else internal_error(true, - "STREAM SEND REPLAY ERROR: 'host:%s/chart:%s' " + "STREAM SND REPLAY ERROR: 'host:%s/chart:%s' " "received start streaming command, but the chart is not in progress replicating", rrdhost_hostname(st->rrdhost), rrdset_id(st)); } @@ -775,7 +775,7 @@ static void replicate_log_request(struct replication_request_details *r, const c nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_NOTICE, #endif - "STREAM SEND REPLAY ERROR: 'host:%s/chart:%s' child sent: " + "STREAM SND REPLAY ERROR: 'host:%s/chart:%s' child sent: " "db from %ld to %ld%s, wall clock time %ld, " "last request from %ld to %ld, " "issue: %s - " @@ -813,7 +813,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before); internal_error(true, - "STREAM SEND REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " + "STREAM SND REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s" , rrdhost_hostname(r->host), rrdset_id(r->st) , r->wanted.after, wanted_after_buf @@ -842,7 +842,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c ssize_t ret = r->caller.callback(buffer, r->caller.parser, STREAM_TRAFFIC_TYPE_REPLICATION); if (ret < 0) { - netdata_log_error("STREAM SEND REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)", + netdata_log_error("STREAM SND REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)", rrdhost_hostname(r->host), rrdset_id(r->st), ret); return false; } @@ -1281,7 +1281,7 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff } if (!rse_to_delete) - fatal("STREAM SEND REPLAY: 'host:%s/chart:%s' Cannot 
find sort entry to delete for time %ld.", + fatal("STREAM SND REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.", rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after); } @@ -1384,7 +1384,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ // we can replace this command internal_error( true, - "STREAM SEND '%s' [to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + "STREAM SND '%s' [to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); @@ -1397,7 +1397,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ replication_sort_entry_add(rq); internal_error( true, - "STREAM SEND '%s' [to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + "STREAM SND '%s' [to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? 
"true" : "false"); @@ -1405,7 +1405,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ else { internal_error( true, - "STREAM SEND '%s' [to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + "STREAM SND '%s' [to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false", @@ -1449,7 +1449,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor } if(!rq->st) { - internal_error(true, "STREAM SEND REPLAY ERROR: 'host:%s/chart:%s' not found", + internal_error(true, "STREAM SND REPLAY ERROR: 'host:%s/chart:%s' not found", rrdhost_hostname(rq->sender->host), string2str(rq->chart_id)); goto cleanup; @@ -1577,7 +1577,7 @@ static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { host->sender && !stream_sender_pending_replication_requests(host->sender) && dictionary_entries(host->sender->replication.requests) != 0, - "STREAM SEND REPLAY SUMMARY: 'host:%s' reports %zu pending replication requests, " + "STREAM SND REPLAY SUMMARY: 'host:%s' reports %zu pending replication requests, " "but its chart replication index says there are %zu charts pending replication", rrdhost_hostname(host), stream_sender_pending_replication_requests(host->sender), @@ -1596,7 +1596,7 @@ static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { if(!flags) { internal_error( true, - "STREAM SEND REPLAY SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED", + "STREAM SND REPLAY SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED", rrdhost_hostname(host), rrdset_id(st) ); is_error = true; @@ -1605,7 +1605,7 @@ 
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { internal_error( true, - "STREAM SEND REPLAY SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished", + "STREAM SND REPLAY SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished", rrdhost_hostname(host), rrdset_id(st) ); is_error = true; @@ -1619,7 +1619,7 @@ static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { rrdset_foreach_done(st); internal_error(errors, - "STREAM SEND REPLAY SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished", + "STREAM SND REPLAY SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished", rrdhost_hostname(host), ok, errors); return errors; diff --git a/src/streaming/stream-capabilities.c b/src/streaming/stream-capabilities.c index 83671dc7002e69..4d88ceefee9589 100644 --- a/src/streaming/stream-capabilities.c +++ b/src/streaming/stream-capabilities.c @@ -80,7 +80,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, rpt->capabilities); - nd_log_daemon(NDLP_INFO, "STREAM RECEIVE '%s' [from [%s]:%s]: established link with negotiated capabilities: %s", + nd_log_daemon(NDLP_INFO, "STREAM RCV '%s' [from [%s]:%s]: established link with negotiated capabilities: %s", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); buffer_free(wb); @@ -90,7 +90,7 @@ void log_sender_capabilities(struct sender_state *s) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, s->capabilities); - nd_log_daemon(NDLP_INFO, "STREAM SEND '%s' [to %s]: established link with negotiated capabilities: %s", + nd_log_daemon(NDLP_INFO, "STREAM SND '%s' [to %s]: established link with 
negotiated capabilities: %s", rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); buffer_free(wb); diff --git a/src/streaming/stream-conf.c b/src/streaming/stream-conf.c index 1b14e925afa318..71165a683abd2b 100644 --- a/src/streaming/stream-conf.c +++ b/src/streaming/stream-conf.c @@ -44,7 +44,7 @@ struct _stream_receive stream_receive = { } }; -static void stream_conf_load() { +static void stream_conf_load_internal() { errno_clear(); char *filename = filename_from_path_entry_strdupz(netdata_configured_user_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) { @@ -88,8 +88,12 @@ bool stream_conf_receiver_needs_dbengine(void) { return stream_conf_needs_dbengine(&stream_config); } -bool stream_conf_init() { - stream_conf_load(); +void stream_conf_load() { + static bool loaded = false; + if(loaded) return; + loaded = true; + + stream_conf_load_internal(); stream_send.enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", stream_send.enabled); @@ -179,8 +183,6 @@ bool stream_conf_init() { nd_log_daemon(NDLP_ERR, "STREAM [send]: cannot enable sending thread - information is missing."); stream_send.enabled = false; } - - return stream_send.enabled; } bool stream_conf_configured_as_parent() { @@ -194,7 +196,7 @@ void stream_conf_receiver_config(struct receiver_state *rpt, struct stream_recei rrd_memory_mode_name(default_rrd_memory_mode)))); if (unlikely(config->mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { - netdata_log_error("STREAM RECEIVE '%s' [from [%s]:%s]: " + netdata_log_error("STREAM RCV '%s' [from [%s]:%s]: " "dbengine is not enabled, falling back to default." 
, rpt->hostname , rpt->client_ip, rpt->client_port diff --git a/src/streaming/stream-conf.h b/src/streaming/stream-conf.h index 18ea383fb787de..ab1b2712115815 100644 --- a/src/streaming/stream-conf.h +++ b/src/streaming/stream-conf.h @@ -83,7 +83,7 @@ struct stream_receiver_config { void stream_conf_receiver_config(struct receiver_state *rpt, struct stream_receiver_config *config, const char *api_key, const char *machine_guid); -bool stream_conf_init(); +void stream_conf_load(); bool stream_conf_receiver_needs_dbengine(); bool stream_conf_configured_as_parent(); diff --git a/src/streaming/stream-connector.c b/src/streaming/stream-connector.c index 0befff248839a9..2145039339b771 100644 --- a/src/streaming/stream-connector.c +++ b/src/streaming/stream-connector.c @@ -204,7 +204,7 @@ static int stream_connect_upgrade_prelude(RRDHOST *host __maybe_unused, struct s goto err_cleanup; } - netdata_log_debug(D_STREAM, "Stream sender upgrade to \"" NETDATA_STREAM_PROTO_NAME "\" successful"); + netdata_log_debug(D_STREAM, "STREAM SNDer upgrade to \"" NETDATA_STREAM_PROTO_NAME "\" successful"); rbuf_free(buf); http_parse_ctx_destroy(&ctx); return 0; diff --git a/src/streaming/stream-receiver-connection.c b/src/streaming/stream-receiver-connection.c index 09641f17770389..40e4cbb5a853f3 100644 --- a/src/streaming/stream-receiver-connection.c +++ b/src/streaming/stream-receiver-connection.c @@ -26,7 +26,7 @@ void stream_receiver_log_status(struct receiver_state *rpt, const char *msg, con , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "" , msg); - nd_log(NDLS_DAEMON, priority, "STREAM RECEIVE '%s' [from [%s]:%s]: %s %s%s%s" + nd_log(NDLS_DAEMON, priority, "STREAM RCV '%s' [from [%s]:%s]: %s %s%s%s" , (rpt->hostname && *rpt->hostname) ? 
rpt->hostname : "" , rpt->client_ip, rpt->client_port , msg @@ -165,15 +165,15 @@ static bool stream_receiver_send_first_response(struct receiver_state *rpt) { return false; } - if (unlikely(!stream_control_children_should_be_accepted())) { - stream_receiver_log_status( - rpt, - "rejecting streaming connection; the system is backfilling higher tiers with high-resolution data, retry later", - STREAM_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE); - - stream_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION); - return false; - } +// if (unlikely(!stream_control_children_should_be_accepted())) { +// stream_receiver_log_status( +// rpt, +// "rejecting streaming connection; the system is backfilling higher tiers with high-resolution data, retry later", +// STREAM_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE); +// +// stream_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION); +// return false; +// } if(!rrdhost_set_receiver(host, rpt)) { stream_receiver_log_status( @@ -187,7 +187,7 @@ static bool stream_receiver_send_first_response(struct receiver_state *rpt) { } #ifdef NETDATA_INTERNAL_CHECKS - netdata_log_info("STREAM RECEIVE '%s' [from [%s]:%s]: " + netdata_log_info("STREAM RCV '%s' [from [%s]:%s]: " "client willing to stream metrics for host '%s' with machine_guid '%s': " "update every = %d, history = %d, memory mode = %s, health %s,%s" , rpt->hostname @@ -235,7 +235,7 @@ static bool stream_receiver_send_first_response(struct receiver_state *rpt) { // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->sock.fd) < 0) nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE '%s' [from [%s]:%s]: cannot remove the non-blocking flag from socket %d", + "STREAM RCV '%s' [from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->sock.fd); struct timeval timeout; @@ -243,7 +243,7 @@ static bool 
stream_receiver_send_first_response(struct receiver_state *rpt) { timeout.tv_usec = 0; if (unlikely(setsockopt(rpt->sock.fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE '%s' [from [%s]:%s]: cannot set timeout for socket %d", + "STREAM RCV '%s' [from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->sock.fd); } @@ -378,7 +378,7 @@ int stream_receiver_accept_connection(struct web_client *w, char *decoded_query_ rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false); if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { - nd_log_daemon(NDLP_NOTICE, "STREAM RECEIVE '%s' [from [%s]:%s]: " + nd_log_daemon(NDLP_NOTICE, "STREAM RCV '%s' [from [%s]:%s]: " "request has parameter '%s' = '%s', which is not used." , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" , rpt->client_ip, rpt->client_port @@ -540,7 +540,7 @@ int stream_receiver_accept_connection(struct web_client *w, char *decoded_query_ if(nd_sock_send_timeout(&rpt->sock, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { - nd_log_daemon(NDLP_ERR, "STREAM RECEIVE '%s' [from [%s]:%s]: failed to reply.", + nd_log_daemon(NDLP_ERR, "STREAM RCV '%s' [from [%s]:%s]: failed to reply.", rpt->hostname, rpt->client_ip, rpt->client_port ); } diff --git a/src/streaming/stream-receiver.c b/src/streaming/stream-receiver.c index 06b17157521a1f..d4e20213eaeccb 100644 --- a/src/streaming/stream-receiver.c +++ b/src/streaming/stream-receiver.c @@ -148,14 +148,14 @@ static inline decompressor_status_t receiver_feed_decompressor(struct receiver_s if (unlikely(!compressed_message_size)) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[x] '%s' [from [%s]:%s]: multiplexed uncompressed data in compressed stream!", + "STREAM RCV[x] '%s' [from [%s]:%s]: multiplexed uncompressed data in compressed stream!", 
rrdhost_hostname(r->host), r->client_ip, r->client_port); return DECOMPRESS_FAILED; } if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[x] '%s' [from [%s]:%s]: received a compressed message of %zu bytes, " + "STREAM RCV[x] '%s' [from [%s]:%s]: received a compressed message of %zu bytes, " "which is bigger than the max compressed message " "size supported of %zu. Ignoring message.", rrdhost_hostname(r->host), r->client_ip, r->client_port, @@ -174,7 +174,7 @@ static inline decompressor_status_t receiver_feed_decompressor(struct receiver_s if (unlikely(!bytes_to_parse)) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[x] '%s' [from [%s]:%s]: no bytes to decompress.", + "STREAM RCV[x] '%s' [from [%s]:%s]: no bytes to decompress.", rrdhost_hostname(r->host), r->client_ip, r->client_port); return DECOMPRESS_FAILED; } @@ -265,7 +265,7 @@ void stream_receiver_handle_op(struct stream_thread *sth, struct receiver_state STREAM_CIRCULAR_BUFFER_STATS stats = *stream_circular_buffer_stats_unsafe(rpt->thread.send_to_child.scb); spinlock_unlock(&rpt->thread.send_to_child.spinlock); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: send buffer is full (buffer size %u, max %u, used %u, available %u). " + "STREAM RCV[%zu] '%s' [from [%s]:%s]: send buffer is full (buffer size %u, max %u, used %u, available %u). 
" "Restarting connection.", sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, stats.bytes_size, stats.bytes_max_size, stats.bytes_outstanding, stats.bytes_available); @@ -275,7 +275,7 @@ void stream_receiver_handle_op(struct stream_thread *sth, struct receiver_state } nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu]: invalid msg id %u", sth->id, (unsigned)msg->opcode); + "STREAM RCV[%zu]: invalid msg id %u", sth->id, (unsigned)msg->opcode); } static ssize_t send_to_child(const char *txt, void *data, STREAM_TRAFFIC_TYPE type) { @@ -400,62 +400,67 @@ static void stream_receive_log_database_gap(struct receiver_state *rpt) { char buf[128]; duration_snprintf(buf, sizeof(buf), now - last_db_entry, "s", true); nd_log(NDLS_DAEMON, NDLP_NOTICE, - "STREAM RECEIVE '%s' [from [%s]:%s]: node connected; last sample in the database %s ago", + "STREAM RCV '%s' [from [%s]:%s]: node connected; last sample in the database %s ago", rrdhost_hostname(host), rpt->client_ip, rpt->client_port, buf); } -void stream_receiver_move_queue_to_running_unsafe(struct stream_thread *sth) { +void stream_receiver_move_to_running_unsafe(struct stream_thread *sth, struct receiver_state *rpt) { internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ ); - // process the queue - Word_t idx = 0; - for(struct receiver_state *rpt = RECEIVERS_FIRST(&sth->queue.receivers, &idx); - rpt; - rpt = RECEIVERS_NEXT(&sth->queue.receivers, &idx)) { - worker_is_busy(WORKER_STREAM_JOB_DEQUEUE); + worker_is_busy(WORKER_STREAM_JOB_DEQUEUE); - RECEIVERS_DEL(&sth->queue.receivers, (Word_t)rpt); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_STR(NDF_NIDL_NODE, rpt->host->hostname), + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - ND_LOG_STACK lgs[] = { - ND_LOG_FIELD_STR(NDF_NIDL_NODE, rpt->host->hostname), - ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid), - 
ND_LOG_FIELD_END(), - }; - ND_LOG_STACK_PUSH(lgs); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM RCV[%zu] '%s' [from [%s]:%s]: moving host from receiver queue to receiver running...", + sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - nd_log(NDLS_DAEMON, NDLP_DEBUG, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: moving host from receiver queue to receiver running...", - sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + rpt->host->stream.rcv.status.tid = gettid_cached(); + rpt->thread.meta.type = POLLFD_TYPE_RECEIVER; + rpt->thread.meta.rpt = rpt; - rpt->host->stream.rcv.status.tid = gettid_cached(); - rpt->thread.meta.type = POLLFD_TYPE_RECEIVER; - rpt->thread.meta.rpt = rpt; + spinlock_lock(&rpt->thread.send_to_child.spinlock); + rpt->thread.send_to_child.scb = stream_circular_buffer_create(); + rpt->thread.send_to_child.msg.thread_slot = (int32_t)sth->id; + rpt->thread.send_to_child.msg.session = os_random32(); + rpt->thread.send_to_child.msg.meta = &rpt->thread.meta; + spinlock_unlock(&rpt->thread.send_to_child.spinlock); - spinlock_lock(&rpt->thread.send_to_child.spinlock); - rpt->thread.send_to_child.scb = stream_circular_buffer_create(); - rpt->thread.send_to_child.msg.thread_slot = (int32_t)sth->id; - rpt->thread.send_to_child.msg.session = os_random32(); - rpt->thread.send_to_child.msg.meta = &rpt->thread.meta; - spinlock_unlock(&rpt->thread.send_to_child.spinlock); + internal_fatal(META_GET(&sth->run.meta, (Word_t)&rpt->thread.meta) != NULL, "Receiver to be added is already in the list of receivers"); + META_SET(&sth->run.meta, (Word_t)&rpt->thread.meta, &rpt->thread.meta); - internal_fatal(META_GET(&sth->run.meta, (Word_t)&rpt->thread.meta) != NULL, "Receiver to be added is already in the list of receivers"); - META_SET(&sth->run.meta, (Word_t)&rpt->thread.meta, &rpt->thread.meta); + if(sock_setnonblock(rpt->sock.fd) < 0) + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM RCV '%s' [from [%s]:%s]: cannot set the 
non-blocking flag from socket %d", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->sock.fd); - if(sock_setnonblock(rpt->sock.fd) < 0) - nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE '%s' [from [%s]:%s]: cannot set the non-blocking flag from socket %d", - rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->sock.fd); + if(!nd_poll_add(sth->run.ndpl, rpt->sock.fd, ND_POLL_READ, &rpt->thread.meta)) + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM RCV[%zu] '%s' [from [%s]:%s]:" + "Failed to add receiver socket to nd_poll()", + sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - if(!nd_poll_add(sth->run.ndpl, rpt->sock.fd, ND_POLL_READ, &rpt->thread.meta)) - nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]:" - "Failed to add receiver socket to nd_poll()", - sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + stream_receive_log_database_gap(rpt); - stream_receive_log_database_gap(rpt); + // keep this last, since it sends commands back to the child + streaming_parser_init(rpt); +} - // keep this last, since it sends commands back to the child - streaming_parser_init(rpt); +void stream_receiver_move_entire_queue_to_running_unsafe(struct stream_thread *sth) { + internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ ); + + // process the queue + Word_t idx = 0; + for(struct receiver_state *rpt = RECEIVERS_FIRST(&sth->queue.receivers, &idx); + rpt; + rpt = RECEIVERS_NEXT(&sth->queue.receivers, &idx)) { + RECEIVERS_DEL(&sth->queue.receivers, idx); + stream_receiver_move_to_running_unsafe(sth, rpt); } } @@ -464,7 +469,7 @@ static void stream_receiver_remove(struct stream_thread *sth, struct receiver_st errno_clear(); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: " + "STREAM RCV[%zu] '%s' [from [%s]:%s]: " "receiver disconnected: %s" , sth->id , rpt->hostname ? 
rpt->hostname : "-" @@ -656,7 +661,7 @@ bool stream_receive_process_poll_events(struct stream_thread *sth, struct receiv worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: %s - closing connection", + "STREAM RCV[%zu] '%s' [from [%s]:%s]: %s - closing connection", sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, error); receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_ERROR, false); @@ -683,7 +688,7 @@ bool stream_receive_process_poll_events(struct stream_thread *sth, struct receiv if (!stats->bytes_outstanding) { if (!nd_poll_upd(sth->run.ndpl, rpt->sock.fd, ND_POLL_READ, &rpt->thread.meta)) nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: cannot update nd_poll()", + "STREAM RCV[%zu] '%s' [from [%s]:%s]: cannot update nd_poll()", sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); // recreate the circular buffer if we have to @@ -711,7 +716,7 @@ bool stream_receive_process_poll_events(struct stream_thread *sth, struct receiv if (disconnect_reason) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: %s (%zd, on fd %d) - closing connection - " + "STREAM RCV[%zu] '%s' [from [%s]:%s]: %s (%zd, on fd %d) - closing connection - " "we have sent %zu bytes in %zu operations.", sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, disconnect_reason, rc, rpt->sock.fd, stats->bytes_sent, stats->sends); @@ -745,7 +750,7 @@ bool stream_receive_process_poll_events(struct stream_thread *sth, struct receiv else if (rc == 0 || errno == ECONNRESET) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_REMOTE_CLOSED); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: socket %d reports EOF (closed by child).", + "STREAM RCV[%zu] '%s' [from [%s]:%s]: socket %d reports EOF (closed by child).", sth->id, 
rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->sock.fd); receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_CLOSED_BY_REMOTE_END, false); stream_receiver_remove(sth, rpt, "socket reports EOF (closed by child)"); @@ -761,7 +766,7 @@ bool stream_receive_process_poll_events(struct stream_thread *sth, struct receiv else { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: error during receive (%zd, on fd %d) - closing connection.", + "STREAM RCV[%zu] '%s' [from [%s]:%s]: error during receive (%zd, on fd %d) - closing connection.", sth->id, rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rc, rpt->sock.fd); receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, false); stream_receiver_remove(sth, rpt, "error during receive"); @@ -801,6 +806,8 @@ bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { rrdhost_receiver_lock(host); if (!host->receiver) { + __atomic_add_fetch(&host->stream.rcv.status.state_id, 1, __ATOMIC_RELAXED); + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); host->stream.rcv.status.connections++; @@ -819,7 +826,7 @@ bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { if (rpt->config.health.delay > 0) { host->health.delay_up_to = now_realtime_sec() + rpt->config.health.delay; nd_log(NDLS_DAEMON, NDLP_DEBUG, - "STREAM RECEIVE '%s' [from [%s]:%s]: " + "STREAM RCV '%s' [from [%s]:%s]: " "Postponing health checks for %" PRId64 " seconds, because it was just connected.", rrdhost_hostname(host), rpt->client_ip, rpt->client_port, (int64_t) rpt->config.health.delay); @@ -863,6 +870,7 @@ void rrdhost_clear_receiver(struct receiver_state *rpt) { // Make sure that we detach this thread and don't kill a freshly arriving receiver if (host->receiver == rpt) { + __atomic_add_fetch(&host->stream.rcv.status.state_id, 1, __ATOMIC_RELAXED); rrdhost_flag_clear(host, 
RRDHOST_FLAG_COLLECTOR_ONLINE); rrdhost_receiver_unlock(host); { @@ -928,7 +936,7 @@ bool stream_receiver_signal_to_stop_and_wait(RRDHOST *host, STREAM_HANDSHAKE rea } if(host->receiver) - netdata_log_error("STREAM RECEIVE[x] '%s' [from [%s]:%s]: " + netdata_log_error("STREAM RCV[x] '%s' [from [%s]:%s]: " "streaming thread takes too long to stop, giving up..." , rrdhost_hostname(host) , host->receiver->client_ip, host->receiver->client_port); diff --git a/src/streaming/stream-sender-commit.c b/src/streaming/stream-sender-commit.c index 1ebb2df030ddc4..0aea46cb3dccd4 100644 --- a/src/streaming/stream-sender-commit.c +++ b/src/streaming/stream-sender-commit.c @@ -21,7 +21,7 @@ void sender_commit_thread_buffer_free(void) { // Collector thread starting a transmission BUFFER *sender_commit_start_with_trace(struct sender_state *s __maybe_unused, struct sender_buffer *commit, const char *func) { if(unlikely(commit->used)) - fatal("STREAM SEND '%s' [to %s]: thread buffer is used multiple times concurrently (%u). " + fatal("STREAM SND '%s' [to %s]: thread buffer is used multiple times concurrently (%u). " "It is already being used by '%s()', and now is called by '%s()'", rrdhost_hostname(s->host), s->connected_to, (unsigned)commit->used, @@ -29,7 +29,7 @@ BUFFER *sender_commit_start_with_trace(struct sender_state *s __maybe_unused, st func ? func : "(null)"); if(unlikely(commit->receiver_tid && commit->receiver_tid != gettid_cached())) - fatal("STREAM SEND '%s' [to %s]: thread buffer is reserved for tid %d, but it used by thread %d function '%s()'.", + fatal("STREAM SND '%s' [to %s]: thread buffer is reserved for tid %d, but it used by thread %d function '%s()'.", rrdhost_hostname(s->host), s->connected_to, commit->receiver_tid, gettid_cached(), func ? 
func : "(null)"); @@ -87,7 +87,7 @@ void sender_buffer_commit(struct sender_state *s, BUFFER *wb, struct sender_buff s->scb, src_len * STREAM_CIRCULAR_BUFFER_ADAPT_TO_TIMES_MAX_SIZE, false))) { // adaptive sizing of the circular buffer nd_log(NDLS_DAEMON, NDLP_NOTICE, - "STREAM SEND '%s' [to %s]: Increased max buffer size to %u (message size %zu).", + "STREAM SND '%s' [to %s]: Increased max buffer size to %u (message size %zu).", rrdhost_hostname(s->host), s->connected_to, stats->bytes_max_size, src_len + 1); } @@ -126,7 +126,7 @@ void sender_buffer_commit(struct sender_state *s, BUFFER *wb, struct sender_buff size_t dst_len = stream_compress(&s->compressor, src, size_to_compress, &dst); if (!dst_len) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s]: COMPRESSION failed. Resetting compressor and re-trying", + "STREAM SND '%s' [to %s]: COMPRESSION failed. Resetting compressor and re-trying", rrdhost_hostname(s->host), s->connected_to); stream_compression_initialize(s); @@ -142,7 +142,7 @@ void sender_buffer_commit(struct sender_state *s, BUFFER *wb, struct sender_buff size_t decoded_dst_len = stream_decompress_decode_signature((const char *)&signature, sizeof(signature)); if (decoded_dst_len != dst_len) fatal( - "STREAM SEND '%s' [to %s]: invalid signature, original payload %zu bytes, " + "STREAM SND '%s' [to %s]: invalid signature, original payload %zu bytes, " "compressed payload length %zu bytes, but signature says payload is %zu bytes", rrdhost_hostname(s->host), s->connected_to, size_to_compress, dst_len, decoded_dst_len); @@ -188,7 +188,7 @@ overflow_with_lock: { stream_sender_send_opcode(s, msg); nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s]: buffer overflow (buffer size %u, max size %u, used %u, available %u). " + "STREAM SND '%s' [to %s]: buffer overflow (buffer size %u, max size %u, used %u, available %u). 
" "Restarting connection.", rrdhost_hostname(s->host), s->connected_to, stats->bytes_size, stats->bytes_max_size, stats->bytes_outstanding, stats->bytes_available); @@ -203,7 +203,7 @@ compression_failed_with_lock: { stream_sender_send_opcode(s, msg); nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s]: COMPRESSION failed (twice). " + "STREAM SND '%s' [to %s]: COMPRESSION failed (twice). " "Deactivating compression and restarting connection.", rrdhost_hostname(s->host), s->connected_to); } @@ -213,11 +213,11 @@ void sender_thread_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYP struct sender_buffer *commit = (wb == commit___thread.wb) ? & commit___thread : &s->host->stream.snd.commit; if (unlikely(wb != commit->wb)) - fatal("STREAM SEND '%s' [to %s]: function '%s()' is trying to commit an unknown commit buffer.", + fatal("STREAM SND '%s' [to %s]: function '%s()' is trying to commit an unknown commit buffer.", rrdhost_hostname(s->host), s->connected_to, func); if (unlikely(!commit->used)) - fatal("STREAM SEND '%s' [to %s]: function '%s()' is committing a sender buffer twice.", + fatal("STREAM SND '%s' [to %s]: function '%s()' is committing a sender buffer twice.", rrdhost_hostname(s->host), s->connected_to, func); commit->used = false; diff --git a/src/streaming/stream-sender-execute.c b/src/streaming/stream-sender-execute.c index 0d8b7cf3a4f3cb..022a352d89b8c7 100644 --- a/src/streaming/stream-sender-execute.c +++ b/src/streaming/stream-sender-execute.c @@ -26,7 +26,7 @@ static void stream_execute_function_callback(BUFFER *func_wb, int code, void *da sender_commit_clean_buffer(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); - internal_error(true, "STREAM SEND '%s' [to %s]: FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).", + internal_error(true, "STREAM SND '%s' [to %s]: FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).", rrdhost_hostname(s->host), 
s->connected_to, string2str(tmp->transaction), buffer_strlen(func_wb), @@ -57,7 +57,7 @@ static void execute_commands_function(struct sender_state *s, const char *comman nd_log(NDLS_ACCESS, NDLP_INFO, NULL); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("STREAM SEND '%s' [to %s]: %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + netdata_log_error("STREAM SND '%s' [to %s]: %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, command, transaction?transaction:"(unset)", @@ -111,7 +111,7 @@ static void execute_deferred_json(struct sender_state *s, void *data) { stream_path_set_from_json(s->host, buffer_tostring(s->defer.payload), true); else nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s]: unknown JSON keyword '%s' with payload: %s", + "STREAM SND '%s' [to %s]: unknown JSON keyword '%s' with payload: %s", rrdhost_hostname(s->host), s->connected_to, keyword, buffer_tostring(s->defer.payload)); } @@ -277,7 +277,7 @@ void stream_sender_execute_commands(struct sender_state *s) { const char *before = get_word(s->rbuf.line.words, s->rbuf.line.num_words, 4); if (!chart_id || !start_streaming || !after || !before) { - netdata_log_error("STREAM SEND '%s' [to %s] %s command is incomplete" + netdata_log_error("STREAM SND '%s' [to %s] %s command is incomplete" " (chart=%s, start_streaming=%s, after=%s, before=%s)", rrdhost_hostname(s->host), s->connected_to, command, @@ -313,7 +313,7 @@ void stream_sender_execute_commands(struct sender_state *s) { s->defer.action_data = strdupz(keyword); } else { - netdata_log_error("STREAM SEND '%s' [to %s] received unknown command over connection: %s", + netdata_log_error("STREAM SND '%s' [to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, 
s->rbuf.line.words[0]?s->rbuf.line.words[0]:"(unset)"); } diff --git a/src/streaming/stream-sender.c b/src/streaming/stream-sender.c index 2c1ab048d24e5f..1c6060349393aa 100644 --- a/src/streaming/stream-sender.c +++ b/src/streaming/stream-sender.c @@ -72,7 +72,7 @@ static void stream_sender_charts_and_replication_reset(struct sender_state *s) { void stream_sender_on_connect(struct sender_state *s) { nd_log(NDLS_DAEMON, NDLP_DEBUG, - "STREAM SEND [%s]: running on-connect hooks...", + "STREAM SND [%s]: running on-connect hooks...", rrdhost_hostname(s->host)); rrdhost_flag_set(s->host, RRDHOST_FLAG_STREAM_SENDER_CONNECTED); @@ -89,7 +89,7 @@ void stream_sender_on_connect(struct sender_state *s) { static void stream_sender_on_ready_to_dispatch(struct sender_state *s) { nd_log(NDLS_DAEMON, NDLP_DEBUG, - "STREAM SEND '%s': running ready-to-dispatch hooks...", + "STREAM SND '%s': running ready-to-dispatch hooks...", rrdhost_hostname(s->host)); // set this flag before sending any data, or the data will not be sent @@ -105,7 +105,7 @@ static void stream_sender_on_ready_to_dispatch(struct sender_state *s) { static void stream_sender_on_disconnect(struct sender_state *s) { nd_log(NDLS_DAEMON, NDLP_DEBUG, - "STREAM SEND '%s': running on-disconnect hooks...", + "STREAM SND '%s': running on-disconnect hooks...", rrdhost_hostname(s->host)); stream_sender_lock(s); @@ -182,7 +182,7 @@ void stream_sender_handle_op(struct stream_thread *sth, struct sender_state *s, STREAM_CIRCULAR_BUFFER_STATS stats = *stream_circular_buffer_stats_unsafe(s->scb); stream_sender_unlock(s); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: send buffer is full (buffer size %u, max %u, used %u, available %u). " + "STREAM SND[%zu] '%s' [to %s]: send buffer is full (buffer size %u, max %u, used %u, available %u). 
" "Restarting connection.", sth->id, rrdhost_hostname(s->host), s->connected_to, stats.bytes_size, stats.bytes_max_size, stats.bytes_outstanding, stats.bytes_available); @@ -203,7 +203,7 @@ void stream_sender_handle_op(struct stream_thread *sth, struct sender_state *s, worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_COMPRESSION_ERROR); errno_clear(); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: restarting connection without compression.", + "STREAM SND[%zu] '%s' [to %s]: restarting connection without compression.", sth->id, rrdhost_hostname(s->host), s->connected_to); stream_sender_move_running_to_connector_or_remove( @@ -219,7 +219,7 @@ void stream_sender_handle_op(struct stream_thread *sth, struct sender_state *s, } nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu]: invalid msg id %u", sth->id, (unsigned)msg->opcode); + "STREAM SND[%zu]: invalid msg id %u", sth->id, (unsigned)msg->opcode); } @@ -235,7 +235,7 @@ void stream_sender_move_queue_to_running_unsafe(struct stream_thread *sth) { s = SENDERS_NEXT(&sth->queue.senders, &idx)) { worker_is_busy(WORKER_STREAM_JOB_DEQUEUE); - SENDERS_DEL(&sth->queue.senders, (Word_t)s); + SENDERS_DEL(&sth->queue.senders, idx); ND_LOG_STACK lgs[] = { ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname), @@ -245,7 +245,7 @@ void stream_sender_move_queue_to_running_unsafe(struct stream_thread *sth) { ND_LOG_STACK_PUSH(lgs); nd_log(NDLS_DAEMON, NDLP_DEBUG, - "STREAM SEND[%zu] '%s' [to %s]: moving host from dispatcher queue to dispatcher running...", + "STREAM SND[%zu] '%s' [to %s]: moving host from dispatcher queue to dispatcher running...", sth->id, rrdhost_hostname(s->host), s->connected_to); stream_sender_lock(s); @@ -269,7 +269,7 @@ void stream_sender_move_queue_to_running_unsafe(struct stream_thread *sth) { if(!nd_poll_add(sth->run.ndpl, s->sock.fd, ND_POLL_READ, &s->thread.meta)) nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: failed to add sender socket to nd_poll()", + "STREAM SND[%zu] '%s' [to 
%s]: failed to add sender socket to nd_poll()", sth->id, rrdhost_hostname(s->host), s->connected_to); stream_sender_on_ready_to_dispatch(s); @@ -281,7 +281,7 @@ void stream_sender_remove(struct sender_state *s) { // when it gives up on a certain node nd_log(NDLS_DAEMON, NDLP_NOTICE, - "STREAM SEND '%s' [to %s]: streaming sender removed host: %s", + "STREAM SND '%s' [to %s]: streaming sender removed host: %s", rrdhost_hostname(s->host), s->connected_to, stream_handshake_error_to_string(s->exit.reason)); stream_sender_lock(s); @@ -319,7 +319,7 @@ static void stream_sender_move_running_to_connector_or_remove(struct stream_thre if(!nd_poll_del(sth->run.ndpl, s->sock.fd)) nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: failed to delete sender socket from nd_poll()", + "STREAM SND[%zu] '%s' [to %s]: failed to delete sender socket from nd_poll()", sth->id, rrdhost_hostname(s->host), s->connected_to); // clear this flag asap, to stop other threads from pushing metrics for this node @@ -335,7 +335,7 @@ static void stream_sender_move_running_to_connector_or_remove(struct stream_thre stream_sender_unlock(s); nd_log(NDLS_DAEMON, NDLP_NOTICE, - "STREAM SEND[%zu] '%s' [to %s]: sender disconnected from parent, reason: %s", + "STREAM SND[%zu] '%s' [to %s]: sender disconnected from parent, reason: %s", sth->id, rrdhost_hostname(s->host), s->connected_to, stream_handshake_error_to_string(reason)); nd_sock_close(&s->sock); @@ -402,7 +402,7 @@ void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t n size_snprintf(pending, sizeof(pending), stats.bytes_outstanding, "B", false); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: could not send data for %ld seconds - closing connection - " + "STREAM SND[%zu] '%s' [to %s]: could not send data for %ld seconds - closing connection - " "we have sent %zu bytes in %zu operations, it is idle for %s, and we have %s pending to send " "(buffer is used %.2f%%).", sth->id, 
rrdhost_hostname(s->host), s->connected_to, stream_send.parents.timeout_s, @@ -418,7 +418,7 @@ void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t n if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ | (stats.bytes_outstanding ? ND_POLL_WRITE : 0), &s->thread.meta)) nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: failed to update nd_poll().", + "STREAM SND[%zu] '%s' [to %s]: failed to update nd_poll().", sth->id, rrdhost_hostname(s->host), s->connected_to); } @@ -470,7 +470,7 @@ bool stream_sender_process_poll_events(struct stream_thread *sth, struct sender_ stream_sender_unlock(s); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: %s restarting connection - %zu bytes transmitted in %zu operations.", + "STREAM SND[%zu] '%s' [to %s]: %s restarting connection - %zu bytes transmitted in %zu operations.", sth->id, rrdhost_hostname(s->host), s->connected_to, error, stats.bytes_sent, stats.sends); stream_sender_move_running_to_connector_or_remove(sth, s, STREAM_HANDSHAKE_DISCONNECT_SOCKET_ERROR, true); @@ -502,7 +502,7 @@ bool stream_sender_process_poll_events(struct stream_thread *sth, struct sender_ // we sent them all - remove ND_POLL_WRITE if (!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ, &s->thread.meta)) nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: failed to update nd_poll().", + "STREAM SND[%zu] '%s' [to %s]: failed to update nd_poll().", sth->id, rrdhost_hostname(s->host), s->connected_to); // recreate the circular buffer if we have to @@ -531,7 +531,7 @@ bool stream_sender_process_poll_events(struct stream_thread *sth, struct sender_ if (disconnect_reason) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: %s (%zd, on fd %d) - restarting connection - " + "STREAM SND[%zu] '%s' [to %s]: %s (%zd, on fd %d) - restarting connection - " "we have sent %zu bytes in %zu operations.", sth->id, 
rrdhost_hostname(s->host), s->connected_to, disconnect_reason, rc, s->sock.fd, stats->bytes_sent, stats->sends); @@ -572,7 +572,7 @@ bool stream_sender_process_poll_events(struct stream_thread *sth, struct sender_ else if (rc == 0 || errno == ECONNRESET) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_REMOTE_CLOSED); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: socket %d reports EOF (closed by parent).", + "STREAM SND[%zu] '%s' [to %s]: socket %d reports EOF (closed by parent).", sth->id, rrdhost_hostname(s->host), s->connected_to, s->sock.fd); stream_sender_move_running_to_connector_or_remove( sth, s, STREAM_HANDSHAKE_DISCONNECT_SOCKET_CLOSED_BY_REMOTE_END, true); @@ -585,7 +585,7 @@ bool stream_sender_process_poll_events(struct stream_thread *sth, struct sender_ else { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR); nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] '%s' [to %s]: error during receive (%zd, on fd %d) - restarting connection.", + "STREAM SND[%zu] '%s' [to %s]: error during receive (%zd, on fd %d) - restarting connection.", sth->id, rrdhost_hostname(s->host), s->connected_to, rc, s->sock.fd); stream_sender_move_running_to_connector_or_remove( sth, s, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, true); diff --git a/src/streaming/stream-thread.c b/src/streaming/stream-thread.c index 25a532d1136a5f..a633d51a9e3929 100644 --- a/src/streaming/stream-thread.c +++ b/src/streaming/stream-thread.c @@ -1,6 +1,8 @@ // SPDX-License-Identifier: GPL-3.0-or-later +#define STREAM_INTERNALS #include "stream-thread.h" +#include "stream-waiting-list.h" struct stream_thread_globals stream_thread_globals = { .assign = { @@ -30,7 +32,7 @@ static void stream_thread_handle_op(struct stream_thread *sth, struct stream_opc if(!nd_poll_upd(sth->run.ndpl, m->s->sock.fd, ND_POLL_READ|ND_POLL_WRITE, m)) { nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_DEBUG, - "STREAM SEND[%zu] '%s' [to %s]: cannot enable output on 
sender socket %d.", + "STREAM SND[%zu] '%s' [to %s]: cannot enable output on sender socket %d.", sth->id, rrdhost_hostname(m->s->host), m->s->connected_to, m->s->sock.fd); } msg->opcode &= ~(STREAM_OPCODE_SENDER_POLLOUT); @@ -44,7 +46,7 @@ static void stream_thread_handle_op(struct stream_thread *sth, struct stream_opc if (!nd_poll_upd(sth->run.ndpl, m->rpt->sock.fd, ND_POLL_READ | ND_POLL_WRITE, m)) { nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_DEBUG, - "STREAM RECEIVE[%zu] '%s' [from [%s]:%s]: cannot enable output on receiver socket %d.", + "STREAM RCV[%zu] '%s' [from [%s]:%s]: cannot enable output on receiver socket %d.", sth->id, rrdhost_hostname(m->rpt->host), m->rpt->client_ip, m->rpt->client_port, m->rpt->sock.fd); } msg->opcode &= ~(STREAM_OPCODE_RECEIVER_POLLOUT); @@ -81,14 +83,14 @@ void stream_receiver_send_opcode(struct receiver_state *rpt, struct stream_opcod if(msg.meta != &rpt->thread.meta) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE '%s' [from [%s]:%s]: the receiver in the opcode the message does not match this receiver. " + "STREAM RCV '%s' [from [%s]:%s]: the receiver in the opcode the message does not match this receiver. " "Ignoring opcode.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); return; } struct stream_thread *sth = stream_thread_by_slot_id(msg.thread_slot); if(!sth) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM RECEIVE '%s' [from [%s]:%s]: the opcode (%u) message cannot be verified. Ignoring it.", + "STREAM RCV '%s' [from [%s]:%s]: the opcode (%u) message cannot be verified. 
Ignoring it.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, msg.opcode); return; } @@ -135,7 +137,7 @@ void stream_receiver_send_opcode(struct receiver_state *rpt, struct stream_opcod } #endif - fatal("STREAM RECEIVE '%s' [from [%s]:%s]: The streaming opcode queue is full, but this should never happen...", + fatal("STREAM RCV '%s' [from [%s]:%s]: The streaming opcode queue is full, but this should never happen...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); } @@ -161,7 +163,7 @@ void stream_sender_send_opcode(struct sender_state *s, struct stream_opcode msg) if(msg.meta != &s->thread.meta) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND '%s' [to %s]: the opcode message does not match this sender. " + "STREAM SND '%s' [to %s]: the opcode message does not match this sender. " "Ignoring opcode.", rrdhost_hostname(s->host), s->connected_to); return; } @@ -169,7 +171,7 @@ void stream_sender_send_opcode(struct sender_state *s, struct stream_opcode msg) struct stream_thread *sth = stream_thread_by_slot_id(msg.thread_slot); if(!sth) { nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[x] '%s' [to %s] the opcode (%u) message cannot be verified. Ignoring it.", + "STREAM SND[x] '%s' [to %s] the opcode (%u) message cannot be verified. 
Ignoring it.", rrdhost_hostname(s->host), s->connected_to, msg.opcode); return; } @@ -216,7 +218,7 @@ void stream_sender_send_opcode(struct sender_state *s, struct stream_opcode msg) } #endif - fatal("STREAM SEND '%s' [to %s]: The streaming opcode queue is full, but this should never happen...", + fatal("STREAM SND '%s' [to %s]: The streaming opcode queue is full, but this should never happen...", rrdhost_hostname(s->host), s->connected_to); } @@ -448,6 +450,10 @@ void *stream_thread(void *ptr) { "ops processed", "messages", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_SENDER_JOB_RECEIVERS_WAITING_LIST_SIZE, + "receivers waiting to be added", "nodes", + WORKER_METRIC_ABSOLUTE); + if(pipe(sth->pipe.fds) != 0) { nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM THREAD[%zu]: cannot create required pipe.", sth->id); sth->pipe.fds[PIPE_READ] = -1; @@ -477,6 +483,7 @@ void *stream_thread(void *ptr) { bool exit_thread = false; size_t replay_entries = 0; + size_t receivers_waiting = 0; sth->snd.bytes_received = 0; sth->snd.bytes_sent = 0; @@ -488,9 +495,15 @@ void *stream_thread(void *ptr) { // move any pending hosts in the inbound queue, to the running list spinlock_lock(&sth->queue.spinlock); + stream_thread_messages_resize_unsafe(sth); - stream_receiver_move_queue_to_running_unsafe(sth); + + stream_thread_process_waiting_list_unsafe(sth, now_ut); + // stream_receiver_move_entire_queue_to_running_unsafe(sth); + stream_sender_move_queue_to_running_unsafe(sth); + + receivers_waiting = sth->queue.receivers_waiting; spinlock_unlock(&sth->queue.spinlock); last_dequeue_ut = now_ut; } @@ -507,6 +520,8 @@ void *stream_thread(void *ptr) { worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)sth->snd.bytes_received); worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)sth->snd.bytes_sent); worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE)replay_entries); + + 
worker_set_metric(WORKER_SENDER_JOB_RECEIVERS_WAITING_LIST_SIZE, (NETDATA_DOUBLE)receivers_waiting); replay_entries = 0; sth->snd.bytes_received = 0; sth->snd.bytes_sent = 0; @@ -549,7 +564,7 @@ void *stream_thread(void *ptr) { // dequeue spinlock_lock(&sth->queue.spinlock); stream_sender_move_queue_to_running_unsafe(sth); - stream_receiver_move_queue_to_running_unsafe(sth); + stream_receiver_move_entire_queue_to_running_unsafe(sth); spinlock_unlock(&sth->queue.spinlock); // cleanup receiver and dispatcher @@ -685,12 +700,12 @@ void stream_receiver_add_to_queue(struct receiver_state *rpt) { stream_thread_node_queued(rpt->host); nd_log(NDLS_DAEMON, NDLP_DEBUG, - "STREAM RECEIVE[%zu] '%s': moving host to receiver queue...", + "STREAM RCV[%zu] '%s': moving host to receiver queue...", sth->id, rrdhost_hostname(rpt->host)); spinlock_lock(&sth->queue.spinlock); - internal_fatal(RECEIVERS_GET(&sth->queue.receivers, (Word_t)rpt) != NULL, "Receiver is already in the receivers queue"); - RECEIVERS_SET(&sth->queue.receivers, (Word_t)rpt, rpt); + RECEIVERS_SET(&sth->queue.receivers, ++sth->queue.id, rpt); + sth->queue.receivers_waiting++; spinlock_unlock(&sth->queue.spinlock); } @@ -704,8 +719,7 @@ void stream_sender_add_to_queue(struct sender_state *s) { sth->id, rrdhost_hostname(s->host)); spinlock_lock(&sth->queue.spinlock); - internal_fatal(SENDERS_GET(&sth->queue.senders, (Word_t)s) != NULL, "Sender is already in the senders queue"); - SENDERS_SET(&sth->queue.senders, (Word_t)s, s); + SENDERS_SET(&sth->queue.senders, ++sth->queue.id, s); spinlock_unlock(&sth->queue.spinlock); } diff --git a/src/streaming/stream-thread.h b/src/streaming/stream-thread.h index 5a8878a9803e64..14f04e05cc0661 100644 --- a/src/streaming/stream-thread.h +++ b/src/streaming/stream-thread.h @@ -36,55 +36,58 @@ struct stream_opcode { // IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly // stream thread events -#define WORKER_STREAM_JOB_LIST (WORKER_PARSER_FIRST_JOB - 
34) -#define WORKER_STREAM_JOB_DEQUEUE (WORKER_PARSER_FIRST_JOB - 33) -#define WORKER_STREAM_JOB_PREP (WORKER_PARSER_FIRST_JOB - 32) -#define WORKER_STREAM_JOB_POLL_ERROR (WORKER_PARSER_FIRST_JOB - 31) -#define WORKER_SENDER_JOB_PIPE_READ (WORKER_PARSER_FIRST_JOB - 30) +#define WORKER_STREAM_JOB_LIST 0 +#define WORKER_STREAM_JOB_DEQUEUE 1 +#define WORKER_STREAM_JOB_PREP 2 +#define WORKER_STREAM_JOB_POLL_ERROR 3 +#define WORKER_SENDER_JOB_PIPE_READ 4 // socket operations -#define WORKER_STREAM_JOB_SOCKET_RECEIVE (WORKER_PARSER_FIRST_JOB - 29) -#define WORKER_STREAM_JOB_SOCKET_SEND (WORKER_PARSER_FIRST_JOB - 28) -#define WORKER_STREAM_JOB_SOCKET_ERROR (WORKER_PARSER_FIRST_JOB - 27) +#define WORKER_STREAM_JOB_SOCKET_RECEIVE 5 +#define WORKER_STREAM_JOB_SOCKET_SEND 6 +#define WORKER_STREAM_JOB_SOCKET_ERROR 7 // compression -#define WORKER_STREAM_JOB_COMPRESS (WORKER_PARSER_FIRST_JOB - 26) -#define WORKER_STREAM_JOB_DECOMPRESS (WORKER_PARSER_FIRST_JOB - 25) +#define WORKER_STREAM_JOB_COMPRESS 8 +#define WORKER_STREAM_JOB_DECOMPRESS 9 // receiver events -#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 24) -#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 23) +#define WORKER_RECEIVER_JOB_BYTES_READ 10 +#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED 11 // sender received commands -#define WORKER_SENDER_JOB_EXECUTE (WORKER_PARSER_FIRST_JOB - 22) -#define WORKER_SENDER_JOB_EXECUTE_REPLAY (WORKER_PARSER_FIRST_JOB - 21) -#define WORKER_SENDER_JOB_EXECUTE_FUNCTION (WORKER_PARSER_FIRST_JOB - 20) -#define WORKER_SENDER_JOB_EXECUTE_META (WORKER_PARSER_FIRST_JOB - 19) - -#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW (WORKER_PARSER_FIRST_JOB - 18) -#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT (WORKER_PARSER_FIRST_JOB - 17) -#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR (WORKER_PARSER_FIRST_JOB - 16) -#define WORKER_SENDER_JOB_DISCONNECT_REMOTE_CLOSED (WORKER_PARSER_FIRST_JOB - 15) -#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 
(WORKER_PARSER_FIRST_JOB - 14) -#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR (WORKER_PARSER_FIRST_JOB - 13) -#define WORKER_SENDER_JOB_DISCONNECT_COMPRESSION_ERROR (WORKER_PARSER_FIRST_JOB - 12) -#define WORKER_SENDER_JOB_DISCONNECT_RECEIVER_LEFT (WORKER_PARSER_FIRST_JOB - 11) -#define WORKER_SENDER_JOB_DISCONNECT_HOST_CLEANUP (WORKER_PARSER_FIRST_JOB - 10) +#define WORKER_SENDER_JOB_EXECUTE 12 +#define WORKER_SENDER_JOB_EXECUTE_REPLAY 13 +#define WORKER_SENDER_JOB_EXECUTE_FUNCTION 14 +#define WORKER_SENDER_JOB_EXECUTE_META 15 + +#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 16 +#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 17 +#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 18 +#define WORKER_SENDER_JOB_DISCONNECT_REMOTE_CLOSED 19 +#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 20 +#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 21 +#define WORKER_SENDER_JOB_DISCONNECT_COMPRESSION_ERROR 22 +#define WORKER_SENDER_JOB_DISCONNECT_RECEIVER_LEFT 23 +#define WORKER_SENDER_JOB_DISCONNECT_HOST_CLEANUP 24 // dispatcher metrics // this has to be the same at pluginsd_parser.h -#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 9) -#define WORKER_STREAM_METRIC_NODES (WORKER_PARSER_FIRST_JOB - 8) -#define WORKER_SENDER_JOB_BUFFER_RATIO (WORKER_PARSER_FIRST_JOB - 7) -#define WORKER_SENDER_JOB_BYTES_RECEIVED (WORKER_PARSER_FIRST_JOB - 6) -#define WORKER_SENDER_JOB_BYTES_SENT (WORKER_PARSER_FIRST_JOB - 5) -#define WORKER_SENDER_JOB_BYTES_COMPRESSED (WORKER_PARSER_FIRST_JOB - 4) -#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 3) -#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO (WORKER_PARSER_FIRST_JOB - 2) -#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE (WORKER_PARSER_FIRST_JOB - 1) -#define WORKER_SENDER_JOB_MESSAGES (WORKER_PARSER_FIRST_JOB - 0) - -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 35 +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION 25 +#define WORKER_STREAM_METRIC_NODES 26 +#define WORKER_SENDER_JOB_BUFFER_RATIO 
27 +#define WORKER_SENDER_JOB_BYTES_RECEIVED 28 +#define WORKER_SENDER_JOB_BYTES_SENT 29 +#define WORKER_SENDER_JOB_BYTES_COMPRESSED 30 +#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 31 +#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 32 +#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 33 +#define WORKER_SENDER_JOB_MESSAGES 34 +#define WORKER_SENDER_JOB_RECEIVERS_WAITING_LIST_SIZE 35 + +// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 36 #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 34 #endif @@ -141,10 +144,19 @@ struct stream_thread { // the incoming queue of the dispatcher thread // the connector thread leaves the connected senders in this list, for the dispatcher to pick them up SPINLOCK spinlock; + Word_t id; SENDERS_JudyLSet senders; RECEIVERS_JudyLSet receivers; + + size_t receivers_waiting; } queue; + struct { + usec_t last_accepted_ut; + size_t metadata; + size_t replication; + } waiting_list; + struct { SPINLOCK spinlock; size_t added; @@ -177,7 +189,7 @@ struct rrdhost; extern struct stream_thread_globals stream_thread_globals; void stream_sender_move_queue_to_running_unsafe(struct stream_thread *sth); -void stream_receiver_move_queue_to_running_unsafe(struct stream_thread *sth); +void stream_receiver_move_entire_queue_to_running_unsafe(struct stream_thread *sth); void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t now_ut); void stream_receiver_add_to_queue(struct receiver_state *rpt); @@ -198,6 +210,8 @@ void stream_thread_node_removed(struct rrdhost *host); // returns true if my_meta has received a message bool stream_thread_process_opcodes(struct stream_thread *sth, struct pollfd_meta *my_meta); +void stream_receiver_move_to_running_unsafe(struct stream_thread *sth, struct receiver_state *rpt); + #include "stream-sender-internals.h" #include "stream-receiver-internals.h" #include "plugins.d/pluginsd_parser.h" diff --git 
a/src/streaming/stream-waiting-list.c b/src/streaming/stream-waiting-list.c new file mode 100644 index 00000000000000..2562415bf67997 --- /dev/null +++ b/src/streaming/stream-waiting-list.c @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define STREAM_INTERNALS +#include "stream-waiting-list.h" + +#define ACCEPT_NODES_EVERY_UT (5 * USEC_PER_SEC) + +static __thread struct { + size_t metadata; + size_t replication; +} throttle = { 0 }; + +void stream_thread_received_metadata(void) { + throttle.metadata++; +} +void stream_thread_received_replication(void) { + throttle.replication++; +} + +static inline size_t normalize_value(size_t v) { + return (v / 100) * 100; +} + +void stream_thread_process_waiting_list_unsafe(struct stream_thread *sth, usec_t now_ut) { + internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ ); + + Word_t idx = 0; + struct receiver_state *rpt = RECEIVERS_FIRST(&sth->queue.receivers, &idx); + if(!rpt) return; + + if(sth->waiting_list.last_accepted_ut + ACCEPT_NODES_EVERY_UT > now_ut || + !stream_control_children_should_be_accepted()) + return; + + size_t n_metadata = normalize_value(throttle.metadata); + size_t n_replication = normalize_value(throttle.replication); + + if(sth->waiting_list.metadata != n_metadata || + sth->waiting_list.replication != n_replication) { + sth->waiting_list.metadata = n_metadata; + sth->waiting_list.replication = n_replication; + return; + } + + RECEIVERS_DEL(&sth->queue.receivers, idx); + stream_receiver_move_to_running_unsafe(sth, rpt); + sth->queue.receivers_waiting--; +} diff --git a/src/streaming/stream-waiting-list.h b/src/streaming/stream-waiting-list.h new file mode 100644 index 00000000000000..c80be6bccc114b --- /dev/null +++ b/src/streaming/stream-waiting-list.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_STREAM_WAITING_LIST_H +#define NETDATA_STREAM_WAITING_LIST_H + +void 
stream_thread_received_metadata(void); +void stream_thread_received_replication(void); + +#ifdef STREAM_INTERNALS +#include "stream-thread.h" +void stream_thread_process_waiting_list_unsafe(struct stream_thread *sth, usec_t now_ut); +#endif + +#endif //NETDATA_STREAM_WAITING_LIST_H diff --git a/src/web/api/queries/backfill.c b/src/web/api/queries/backfill.c new file mode 100644 index 00000000000000..f24ec47d313058 --- /dev/null +++ b/src/web/api/queries/backfill.c @@ -0,0 +1,231 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "backfill.h" + +struct backfill_request { + size_t rrdhost_receiver_state_id; + RRDSET_ACQUIRED *rsa; + uint32_t works; + uint32_t successful; + uint32_t failed; + backfill_callback_t cb; + struct backfill_request_data data; +}; + +struct backfill_dim_work { + RRDDIM_ACQUIRED *rda; + struct backfill_request *br; +}; + +DEFINE_JUDYL_TYPED(BACKFILL, struct backfill_dim_work *); + +static struct { + struct completion completion; + + SPINLOCK spinlock; + bool running; + Word_t id; + size_t queue_size; + BACKFILL_JudyLSet queue; + + ARAL *ar_br; + ARAL *ar_bdm; + +} backfill_globals = { + .spinlock = SPINLOCK_INITIALIZER, + .queue = { 0 }, +}; + +bool backfill_request_add(RRDSET *st, backfill_callback_t cb, struct backfill_request_data *data) { + bool rc = false; + size_t dimensions = dictionary_entries(st->rrddim_root_index); + if(!dimensions || dimensions > 200) + return rc; + + size_t added = 0; + struct backfill_dim_work *array[dimensions]; + + if(backfill_globals.running) { + struct backfill_request *br = aral_mallocz(backfill_globals.ar_br); + br->data = *data; + br->rrdhost_receiver_state_id =__atomic_load_n(&st->rrdhost->stream.rcv.status.state_id, __ATOMIC_RELAXED); + br->rsa = rrdset_find_and_acquire(st->rrdhost, string2str(st->id)); + if(br->rsa) { + br->cb = cb; + + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(added >= dimensions) + break; + + if (!rrddim_option_check(rd, RRDDIM_OPTION_BACKFILLED_HIGH_TIERS)) { + struct 
backfill_dim_work *bdm = aral_mallocz(backfill_globals.ar_bdm);
+                    bdm->rda = (RRDDIM_ACQUIRED *)dictionary_acquired_item_dup(st->rrddim_root_index, rd_dfe.item);
+                    bdm->br = br;
+                    br->works++;
+                    array[added++] = bdm;
+                }
+            }
+            rrddim_foreach_done(rd);
+        }
+
+        if(added) {
+            spinlock_lock(&backfill_globals.spinlock);
+
+            for(size_t i = 0; i < added ;i++) {
+                backfill_globals.queue_size++;
+                BACKFILL_SET(&backfill_globals.queue, backfill_globals.id++, array[i]);
+            }
+
+            spinlock_unlock(&backfill_globals.spinlock);
+            completion_mark_complete_a_job(&backfill_globals.completion);
+
+            rc = true;
+        }
+        else {
+            // no dimensions queued - rsa is NULL when rrdset_find_and_acquire() failed above
+            if(br->rsa) rrdset_acquired_release(br->rsa); // guard: releasing a NULL acquired item is invalid
+            aral_freez(backfill_globals.ar_br, br);
+        }
+    }
+
+    return rc;
+}
+
+// returns true when at least one higher tier was backfilled for this dimension
+bool backfill_execute(struct backfill_dim_work *bdm) {
+    RRDSET *st = rrdset_acquired_to_rrdset(bdm->br->rsa);
+
+    // if the receiver was restarted since this request was queued, the work is stale
+    if(bdm->br->rrdhost_receiver_state_id != __atomic_load_n(&st->rrdhost->stream.rcv.status.state_id, __ATOMIC_RELAXED))
+        return false;
+
+    RRDDIM *rd = rrddim_acquired_to_rrddim(bdm->rda);
+
+    size_t success = 0;
+    for (size_t tier = 1; tier < storage_tiers; tier++)
+        if(backfill_tier_from_smaller_tiers(rd, tier, now_realtime_sec()))
+            success++;
+
+    if(success > 0)
+        rrddim_option_set(rd, RRDDIM_OPTION_BACKFILLED_HIGH_TIERS);
+
+    return success > 0;
+}
+
+// releases one unit of work; when the last unit finishes, fires the callback and frees the request
+static void backfill_dim_work_free(bool successful, struct backfill_dim_work *bdm) {
+    struct backfill_request *br = bdm->br;
+
+    if(successful)
+        __atomic_add_fetch(&br->successful, 1, __ATOMIC_RELAXED);
+    else
+        __atomic_add_fetch(&br->failed, 1, __ATOMIC_RELAXED);
+
+    uint32_t works = __atomic_sub_fetch(&br->works, 1, __ATOMIC_RELAXED);
+    if(!works) {
+        if(br->cb)
+            br->cb(__atomic_load_n(&br->successful, __ATOMIC_RELAXED),
+                   __atomic_load_n(&br->failed, __ATOMIC_RELAXED),
+                   &br->data);
+
+        rrdset_acquired_release(br->rsa);
+        aral_freez(backfill_globals.ar_br, br);
+    }
+
+    rrddim_acquired_release(bdm->rda);
+    aral_freez(backfill_globals.ar_bdm, bdm);
+}
+
+void 
*backfill_worker_thread(void *ptr __maybe_unused) { + worker_register("BACKFILL"); + + worker_register_job_name(0, "get"); + worker_register_job_name(1, "backfill"); + worker_register_job_custom_metric(2, "backfill queue size", "dimensions", WORKER_METRIC_ABSOLUTE); + + size_t job_id = 0, queue_size = 0; + while(!nd_thread_signaled_to_cancel() && service_running(SERVICE_COLLECTORS|SERVICE_STREAMING)) { + worker_is_busy(0); + spinlock_lock(&backfill_globals.spinlock); + Word_t idx = 0; + struct backfill_dim_work *bdm = BACKFILL_FIRST(&backfill_globals.queue, &idx); + if(bdm) { + backfill_globals.queue_size--; + BACKFILL_DEL(&backfill_globals.queue, idx); + } + queue_size = backfill_globals.queue_size; + spinlock_unlock(&backfill_globals.spinlock); + + if(bdm) { + worker_is_busy(1); + bool success = backfill_execute(bdm); + backfill_dim_work_free(success, bdm); + continue; + } + + worker_set_metric(2, (NETDATA_DOUBLE)queue_size); + + worker_is_idle(); + job_id = completion_wait_for_a_job_with_timeout(&backfill_globals.completion, job_id, 1000); + } + + worker_unregister(); + + return NULL; +} + +void *backfill_thread(void *ptr) { + struct netdata_static_thread *static_thread = ptr; + if(!static_thread) return NULL; + + nd_thread_tag_set("BACKFILL[0]"); + + completion_init(&backfill_globals.completion); + BACKFILL_INIT(&backfill_globals.queue); + backfill_globals.ar_br = aral_by_size_acquire(sizeof(struct backfill_request)); + backfill_globals.ar_bdm = aral_by_size_acquire(sizeof(struct backfill_dim_work)); + + spinlock_lock(&backfill_globals.spinlock); + backfill_globals.running = true; + spinlock_unlock(&backfill_globals.spinlock); + + size_t threads = get_netdata_cpus() / 2; + if(threads < 2) threads = 2; + if(threads > 16) threads = 16; + ND_THREAD *th[threads - 1]; + + for(size_t t = 0; t < threads - 1 ;t++) { + char tag[15]; + snprintfz(tag, sizeof(tag), "BACKFILL[%zu]", t + 1); + th[t] = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, 
backfill_worker_thread, NULL); + } + + backfill_worker_thread(NULL); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + for(size_t t = 0; t < threads - 1 ;t++) { + nd_thread_signal_cancel(th[t]); + nd_thread_join(th[t]); + } + + // cleanup + spinlock_lock(&backfill_globals.spinlock); + backfill_globals.running = false; + Word_t idx = 0; + for(struct backfill_dim_work *bdm = BACKFILL_FIRST(&backfill_globals.queue, &idx); + bdm; + bdm = BACKFILL_NEXT(&backfill_globals.queue, &idx)) { + backfill_dim_work_free(false, bdm); + } + spinlock_unlock(&backfill_globals.spinlock); + + aral_by_size_release(backfill_globals.ar_br); + aral_by_size_release(backfill_globals.ar_bdm); + completion_destroy(&backfill_globals.completion); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + + return NULL; +} + +bool backfill_threads_detect_from_stream_conf(void) { + return stream_conf_configured_as_parent(); +} diff --git a/src/web/api/queries/backfill.h b/src/web/api/queries/backfill.h new file mode 100644 index 00000000000000..5cce069d1cc173 --- /dev/null +++ b/src/web/api/queries/backfill.h @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_BACKFILL_H +#define NETDATA_BACKFILL_H + +#include "database/rrd.h" + +struct parser; +struct backfill_request_data { + size_t rrdhost_receiver_state_id; + struct parser *parser; + RRDHOST *host; + RRDSET *st; + time_t first_entry_child; + time_t last_entry_child; + time_t child_wall_clock_time; +}; + +typedef void (*backfill_callback_t)(size_t successful_dims, size_t failed_dims, struct backfill_request_data *brd); + +void *backfill_thread(void *ptr); +bool backfill_request_add(RRDSET *st, backfill_callback_t cb, struct backfill_request_data *data); + +bool backfill_threads_detect_from_stream_conf(void); + +#endif //NETDATA_BACKFILL_H diff --git a/src/web/api/queries/query.c b/src/web/api/queries/query.c index 1e06c739327ec3..86630ce506754b 100644 --- a/src/web/api/queries/query.c +++ 
b/src/web/api/queries/query.c
@@ -1964,16 +1964,16 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
 void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut);
 
-void backfill_tier_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s) {
-    if(unlikely(tier >= storage_tiers)) return;
+bool backfill_tier_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s) {
+    if(unlikely(tier >= storage_tiers)) return false;
 
 #ifdef ENABLE_DBENGINE
-    if(default_backfill == RRD_BACKFILL_NONE) return;
+    if(default_backfill == RRD_BACKFILL_NONE) return false;
 #else
-    return;
+    return false;
 #endif
 
     struct rrddim_tier *t = &rd->tiers[tier];
-    if(unlikely(!t)) return;
+    if(unlikely(!t)) return false;
 
     time_t latest_time_s = storage_engine_latest_time_s(t->seb, t->smh);
     time_t granularity = (time_t)t->tier_grouping * (time_t)rd->rrdset->update_every;
@@ -1981,13 +1981,13 @@ void backfill_tier_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s) {
 
     // if the user wants only NEW backfilling, and we don't have any data
 #ifdef ENABLE_DBENGINE
-    if(default_backfill == RRD_BACKFILL_NEW && latest_time_s <= 0) return;
+    if(default_backfill == RRD_BACKFILL_NEW && latest_time_s <= 0) return false;
 #else
-    return;
+    return false;
 #endif
 
     // there is really nothing we can do
-    if(now_s <= latest_time_s || time_diff < granularity) return;
+    if(now_s <= latest_time_s || time_diff < granularity) return false;
 
     stream_control_backfill_query_started();
 
@@ -2026,6 +2026,8 @@ void backfill_tier_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s) {
     }
 
     stream_control_backfill_query_finished();
+
+    return true;
 }
 
 // ----------------------------------------------------------------------------
diff --git a/src/web/server/h2o/http_server.c b/src/web/server/h2o/http_server.c
index 0fc65b35096551..d729b4e259a2a4 100644
--- a/src/web/server/h2o/http_server.c
+++ b/src/web/server/h2o/http_server.c
@@ -423,6 +423,6 @@ void *h2o_main(void *ptr) 
{ return NULL; } -int httpd_is_enabled() { +bool httpd_is_enabled() { return config_get_boolean(HTTPD_CONFIG_SECTION, "enabled", HTTPD_ENABLED_DEFAULT); } diff --git a/src/web/server/h2o/http_server.h b/src/web/server/h2o/http_server.h index 28d1c560a8e7be..3fbaa1de994cea 100644 --- a/src/web/server/h2o/http_server.h +++ b/src/web/server/h2o/http_server.h @@ -10,6 +10,6 @@ void *h2o_main(void * ptr); int h2o_stream_write(void *ctx, const char *data, size_t data_len); size_t h2o_stream_read(void *ctx, char *buf, size_t read_bytes); -int httpd_is_enabled(); +bool httpd_is_enabled(); #endif /* HTTP_SERVER_H */