From 4179fa8ffd14fdeb1ea6d7bd7bae88a1792f60b7 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Sat, 14 Dec 2024 00:35:59 +0200 Subject: [PATCH] Streaming improvements No 6 (#19196) * pulse aral stats fixes * double is not needed there * control the number of pages evicted at once, to avoid gaps in data collection * lower the aggressiveness of evictions to prevent evicting significantly more pages than really needed * add aral judy statistics to pulse aral * metadata sync shuts down in the background * renumber * when just 1 host is still pending, also log its hostname * call mallocz_release_as_much_memory_to_the_system() at most once per second * improve ML logs and move metadata_sync_shutdown() to the proper place * stack size of external plugins; moved all function evloop data off the stack * fix for incorrect replication accounting * metadata cmds in aral * fix function payload parsing for external plugins * split main service control and netdata shutdown to separate files - no code changes * add more information to shutdown watcher * fix exit log messages * parallel dbengine exits for all tiers * log with limit * alerts not alarms * minor fixes in health * added logs to trace the root cause of delays when closing data files * added STORAGE_PRIORITY_SYNCHRONOUS_FIRST * Revert "added logs to trace the root cause of delays when closing data files" This reverts commit 75515c148421102ad1942dc340fe0391a4e012cc. * log datafile lock wait * print the number of writers * print the number of writers again * print the number of writers again again * fix watcher messages * single node agents use 1 replication and 1 ML thread; max threads for both is 256 * log the progress of flushers * spawn intense flushing on quiesce * cleanup logs added * print the status of the cache on exit * dbengine shutdown cleanup * cleanup logs * cleanup logs * more cleanup on logs * proper percentage calculation * sentry include added --- CMakeLists.txt | 8 +- src/collectors/apps.plugin/apps_plugin.c | 1 + src/collectors/cgroups.plugin/sys_fs_cgroup.c | 1 - src/collectors/cups.plugin/cups_plugin.c | 1 + .../debugfs.plugin/debugfs_plugin.c | 1 + .../diskspace.plugin/plugin_diskspace.c | 4 - src/collectors/ebpf.plugin/ebpf.c | 1 + .../freebsd.plugin/plugin_freebsd.c | 1 - .../idlejitter.plugin/plugin_idlejitter.c | 1 - src/collectors/macos.plugin/plugin_macos.c | 1 - .../network-viewer.plugin/network-viewer.c | 1 + src/collectors/nfacct.plugin/plugin_nfacct.c | 1 + src/collectors/perf.plugin/perf_plugin.c | 1 + src/collectors/proc.plugin/plugin_proc.c | 2 - src/collectors/proc.plugin/proc_net_dev.c | 2 - .../profile.plugin/plugin_profile.cc | 2 - src/collectors/slabinfo.plugin/slabinfo.c | 1 + src/collectors/statsd.plugin/statsd.c | 3 - .../systemd-journal.plugin/systemd-main.c | 1 + src/collectors/tc.plugin/plugin_tc.c | 2 - src/collectors/timex.plugin/plugin_timex.c | 1 - .../windows-events.plugin/windows-events.c | 1 + .../windows.plugin/windows_plugin.c | 2 - .../xenstat.plugin/xenstat_plugin.c | 1 + src/daemon/analytics.c | 1 - src/daemon/config/netdata-conf.c | 2 + src/daemon/config/netdata-conf.h | 1 + src/daemon/daemon-service.c | 284 +++++++++ src/daemon/daemon-service.h | 41 ++ .../{watcher.c => daemon-shutdown-watcher.c} | 93 +-- .../{watcher.h => daemon-shutdown-watcher.h} | 4 +- src/daemon/daemon-shutdown.c | 301 ++++++++++ src/daemon/daemon-shutdown.h | 10 + src/daemon/main.c | 558 +----------------- src/daemon/main.h | 36 +- src/daemon/pulse/pulse-aral.c | 14 +- src/daemon/pulse/pulse-aral.h | 1 +
src/daemon/pulse/pulse-workers.c | 2 +- src/daemon/pulse/pulse.c | 3 - src/daemon/service.c | 1 - src/database/engine/cache.c | 36 +- src/database/engine/dbengine-stresstest.c | 2 +- src/database/engine/dbengine-unittest.c | 2 +- src/database/engine/metric.c | 2 +- src/database/engine/page.c | 2 +- src/database/engine/pagecache.c | 4 +- src/database/engine/rrdengine.c | 11 +- src/database/engine/rrdengine.h | 2 +- src/database/engine/rrdengineapi.c | 9 +- src/database/engine/rrdengineapi.h | 3 +- src/database/rrd.h | 1 + src/database/sqlite/sqlite_aclk_node.c | 42 +- src/database/sqlite/sqlite_metadata.c | 33 +- src/database/sqlite/sqlite_metadata.h | 3 + src/exporting/exporting_engine.c | 2 - src/health/health_event_loop.c | 41 +- .../functions_evloop/functions_evloop.c | 238 ++++---- src/libnetdata/libjudy/judy-malloc.c | 4 + src/libnetdata/libjudy/judy-malloc.h | 1 + src/ml/ml.cc | 12 +- src/ml/ml_config.cc | 6 +- src/plugins.d/pluginsd_parser.c | 24 +- src/streaming/replication.c | 10 +- src/streaming/stream-connector.c | 4 - src/streaming/stream-receiver.c | 17 +- src/streaming/stream-sender.c | 2 + src/web/api/queries/backfill.h | 2 +- src/web/api/queries/query.c | 2 +- src/web/api/queries/weights.c | 10 +- 69 files changed, 1016 insertions(+), 904 deletions(-) create mode 100644 src/daemon/daemon-service.c create mode 100644 src/daemon/daemon-service.h rename src/daemon/{watcher.c => daemon-shutdown-watcher.c} (70%) rename src/daemon/{watcher.h => daemon-shutdown-watcher.h} (94%) create mode 100644 src/daemon/daemon-shutdown.c create mode 100644 src/daemon/daemon-shutdown.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 022bc7dfc9fad4..23726f52e20301 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1051,8 +1051,8 @@ set(DAEMON_FILES src/daemon/signals.c src/daemon/signals.h src/daemon/service.c - src/daemon/watcher.c - src/daemon/watcher.h + src/daemon/daemon-shutdown-watcher.c + src/daemon/daemon-shutdown-watcher.h src/daemon/static_threads.c src/daemon/static_threads.h src/daemon/commands.c @@ -1114,6 +1114,10 @@ set(DAEMON_FILES src/daemon/config/netdata-conf-global.c src/daemon/config/netdata-conf-global.h src/daemon/config/netdata-conf.c + src/daemon/daemon-shutdown.c + src/daemon/daemon-shutdown.h + src/daemon/daemon-service.c + src/daemon/daemon-service.h ) set(H2O_FILES diff --git a/src/collectors/apps.plugin/apps_plugin.c b/src/collectors/apps.plugin/apps_plugin.c index b8ea0e797a8417..1f9386a095ec79 100644 --- a/src/collectors/apps.plugin/apps_plugin.c +++ b/src/collectors/apps.plugin/apps_plugin.c @@ -666,6 +666,7 @@ static bool apps_plugin_exit = false; int main(int argc, char **argv) { nd_log_initialize_for_external_plugins("apps.plugin"); + netdata_threads_init_for_external_plugins(0); pagesize = (size_t)sysconf(_SC_PAGESIZE); diff --git a/src/collectors/cgroups.plugin/sys_fs_cgroup.c b/src/collectors/cgroups.plugin/sys_fs_cgroup.c index d41575fa6c576f..ff54780c7db61c 100644 --- a/src/collectors/cgroups.plugin/sys_fs_cgroup.c +++ b/src/collectors/cgroups.plugin/sys_fs_cgroup.c @@ -1327,7 +1327,6 @@ static void cgroup_main_cleanup(void *pptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); worker_unregister(); usec_t max = 2 * USEC_PER_SEC, step = 50000; diff --git a/src/collectors/cups.plugin/cups_plugin.c b/src/collectors/cups.plugin/cups_plugin.c index 8d9e46cb1eb596..c1b06c25d8005a 100644 --- a/src/collectors/cups.plugin/cups_plugin.c +++ b/src/collectors/cups.plugin/cups_plugin.c @@ -227,6 +227,7 @@ void 
reset_metrics() { int main(int argc, char **argv) { nd_log_initialize_for_external_plugins("cups.plugin"); + netdata_threads_init_for_external_plugins(0); parse_command_line(argc, argv); diff --git a/src/collectors/debugfs.plugin/debugfs_plugin.c b/src/collectors/debugfs.plugin/debugfs_plugin.c index 37b4c83d861990..42fc92ca3c6a79 100644 --- a/src/collectors/debugfs.plugin/debugfs_plugin.c +++ b/src/collectors/debugfs.plugin/debugfs_plugin.c @@ -160,6 +160,7 @@ static void debugfs_parse_args(int argc, char **argv) int main(int argc, char **argv) { nd_log_initialize_for_external_plugins("debugfs.plugin"); + netdata_threads_init_for_external_plugins(0); netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); if (verify_netdata_host_prefix(true) == -1) diff --git a/src/collectors/diskspace.plugin/plugin_diskspace.c b/src/collectors/diskspace.plugin/plugin_diskspace.c index c9f6fe5999e8d5..225e1a03037a7f 100644 --- a/src/collectors/diskspace.plugin/plugin_diskspace.c +++ b/src/collectors/diskspace.plugin/plugin_diskspace.c @@ -516,8 +516,6 @@ static void diskspace_slow_worker_cleanup(void *pptr) { struct slow_worker_data *data = CLEANUP_FUNCTION_GET_PTR(pptr); if(data) return; - collector_info("cleaning up..."); - worker_unregister(); } @@ -608,8 +606,6 @@ static void diskspace_main_cleanup(void *pptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); - rrd_collector_finished(); worker_unregister(); diff --git a/src/collectors/ebpf.plugin/ebpf.c b/src/collectors/ebpf.plugin/ebpf.c index 88f6439c4075de..ac50c1517a810e 100644 --- a/src/collectors/ebpf.plugin/ebpf.c +++ b/src/collectors/ebpf.plugin/ebpf.c @@ -4006,6 +4006,7 @@ static void ebpf_manage_pid(pid_t pid) int main(int argc, char **argv) { nd_log_initialize_for_external_plugins(NETDATA_EBPF_PLUGIN_NAME); + netdata_threads_init_for_external_plugins(0); ebpf_set_global_variables(); if (ebpf_can_plugin_load_code(running_on_kernel, NETDATA_EBPF_PLUGIN_NAME)) diff --git a/src/collectors/freebsd.plugin/plugin_freebsd.c b/src/collectors/freebsd.plugin/plugin_freebsd.c index 2255343738d4cd..0d6d759b7edac0 100644 --- a/src/collectors/freebsd.plugin/plugin_freebsd.c +++ b/src/collectors/freebsd.plugin/plugin_freebsd.c @@ -78,7 +78,6 @@ static void freebsd_main_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; diff --git a/src/collectors/idlejitter.plugin/plugin_idlejitter.c b/src/collectors/idlejitter.plugin/plugin_idlejitter.c index 2a212a669974c7..b9195fd9e260b1 100644 --- a/src/collectors/idlejitter.plugin/plugin_idlejitter.c +++ b/src/collectors/idlejitter.plugin/plugin_idlejitter.c @@ -10,7 +10,6 @@ static void cpuidlejitter_main_cleanup(void *pptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; diff --git a/src/collectors/macos.plugin/plugin_macos.c b/src/collectors/macos.plugin/plugin_macos.c index 6f5b892d8bbc9a..4bb3f634f52e81 100644 --- a/src/collectors/macos.plugin/plugin_macos.c +++ b/src/collectors/macos.plugin/plugin_macos.c @@ -32,7 +32,6 @@ static void macos_main_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; diff --git a/src/collectors/network-viewer.plugin/network-viewer.c 
b/src/collectors/network-viewer.plugin/network-viewer.c index c0ea8af5e576b8..9ad44a314a5cf6 100644 --- a/src/collectors/network-viewer.plugin/network-viewer.c +++ b/src/collectors/network-viewer.plugin/network-viewer.c @@ -960,6 +960,7 @@ void network_viewer_function(const char *transaction, char *function __maybe_unu int main(int argc __maybe_unused, char **argv __maybe_unused) { nd_thread_tag_set("NETWORK-VIEWER"); nd_log_initialize_for_external_plugins("network-viewer.plugin"); + netdata_threads_init_for_external_plugins(0); netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); if(verify_netdata_host_prefix(true) == -1) exit(1); diff --git a/src/collectors/nfacct.plugin/plugin_nfacct.c b/src/collectors/nfacct.plugin/plugin_nfacct.c index 6225ec4a68dd40..7cc44ff2c6c648 100644 --- a/src/collectors/nfacct.plugin/plugin_nfacct.c +++ b/src/collectors/nfacct.plugin/plugin_nfacct.c @@ -748,6 +748,7 @@ void nfacct_signals() int main(int argc, char **argv) { nd_log_initialize_for_external_plugins("nfacct.plugin"); + netdata_threads_init_for_external_plugins(0); // ------------------------------------------------------------------------ // parse command line parameters diff --git a/src/collectors/perf.plugin/perf_plugin.c b/src/collectors/perf.plugin/perf_plugin.c index ccc7016e296cd2..272b4b60b89fff 100644 --- a/src/collectors/perf.plugin/perf_plugin.c +++ b/src/collectors/perf.plugin/perf_plugin.c @@ -1288,6 +1288,7 @@ void parse_command_line(int argc, char **argv) { int main(int argc, char **argv) { nd_log_initialize_for_external_plugins("perf.plugin"); + netdata_threads_init_for_external_plugins(0); parse_command_line(argc, argv); diff --git a/src/collectors/proc.plugin/plugin_proc.c b/src/collectors/proc.plugin/plugin_proc.c index e890a0d141ddc6..f7ea7e61c6134a 100644 --- a/src/collectors/proc.plugin/plugin_proc.c +++ b/src/collectors/proc.plugin/plugin_proc.c @@ -94,8 +94,6 @@ static void proc_main_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); - nd_thread_join(netdev_thread); worker_unregister(); diff --git a/src/collectors/proc.plugin/proc_net_dev.c b/src/collectors/proc.plugin/proc_net_dev.c index 3af59aed1374c9..b78308f61138cd 100644 --- a/src/collectors/proc.plugin/proc_net_dev.c +++ b/src/collectors/proc.plugin/proc_net_dev.c @@ -1685,8 +1685,6 @@ static void netdev_main_cleanup(void *pptr) { if(CLEANUP_FUNCTION_GET_PTR(pptr) != (void *)0x01) return; - collector_info("cleaning up..."); - worker_unregister(); } diff --git a/src/collectors/profile.plugin/plugin_profile.cc b/src/collectors/profile.plugin/plugin_profile.cc index 14de55db13a0b0..60875568cd0ed5 100644 --- a/src/collectors/profile.plugin/plugin_profile.cc +++ b/src/collectors/profile.plugin/plugin_profile.cc @@ -186,8 +186,6 @@ static void profile_main_cleanup(void *pptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_info("cleaning up..."); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } diff --git a/src/collectors/slabinfo.plugin/slabinfo.c b/src/collectors/slabinfo.plugin/slabinfo.c index 98adc151318338..c71ef5352e3a2c 100644 --- a/src/collectors/slabinfo.plugin/slabinfo.c +++ b/src/collectors/slabinfo.plugin/slabinfo.c @@ -346,6 +346,7 @@ void usage(void) { int main(int argc, char **argv) { nd_log_initialize_for_external_plugins("slabinfo.plugin"); + netdata_threads_init_for_external_plugins(0); program_name = argv[0]; int update_every = 1, i, n, freq = 0; diff --git a/src/collectors/statsd.plugin/statsd.c 
b/src/collectors/statsd.plugin/statsd.c index cf344fe96bfff8..cc6a1ec9a9433f 100644 --- a/src/collectors/statsd.plugin/statsd.c +++ b/src/collectors/statsd.plugin/statsd.c @@ -1086,8 +1086,6 @@ void statsd_collector_thread_cleanup(void *pptr) { d->status->running = false; spinlock_unlock(&d->status->spinlock); - collector_info("cleaning up..."); - #ifdef HAVE_RECVMMSG size_t i; for (i = 0; i < d->size; i++) @@ -2399,7 +2397,6 @@ static void statsd_main_cleanup(void *pptr) { if(!static_thread) return; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); if (statsd.collection_threads_status) { int i; diff --git a/src/collectors/systemd-journal.plugin/systemd-main.c b/src/collectors/systemd-journal.plugin/systemd-main.c index e7d79c4137e04d..4488f1ee1d6cdb 100644 --- a/src/collectors/systemd-journal.plugin/systemd-main.c +++ b/src/collectors/systemd-journal.plugin/systemd-main.c @@ -20,6 +20,7 @@ static bool journal_data_directories_exist() { int main(int argc __maybe_unused, char **argv __maybe_unused) { nd_thread_tag_set("sd-jrnl.plugin"); nd_log_initialize_for_external_plugins("systemd-journal.plugin"); + netdata_threads_init_for_external_plugins(0); netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); if(verify_netdata_host_prefix(true) == -1) exit(1); diff --git a/src/collectors/tc.plugin/plugin_tc.c b/src/collectors/tc.plugin/plugin_tc.c index 5dc058a2086873..4d00e084cf3976 100644 --- a/src/collectors/tc.plugin/plugin_tc.c +++ b/src/collectors/tc.plugin/plugin_tc.c @@ -845,8 +845,6 @@ static void tc_main_cleanup(void *pptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); - if(tc_child_instance) { collector_info("TC: stopping the running tc-qos-helper script"); int code = spawn_popen_wait(tc_child_instance); (void)code; diff --git a/src/collectors/timex.plugin/plugin_timex.c b/src/collectors/timex.plugin/plugin_timex.c index 381079cf4693f0..2a8e96bfba8070 100644 --- a/src/collectors/timex.plugin/plugin_timex.c +++ b/src/collectors/timex.plugin/plugin_timex.c @@ -37,7 +37,6 @@ static void timex_main_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_info("cleaning up..."); worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; diff --git a/src/collectors/windows-events.plugin/windows-events.c b/src/collectors/windows-events.plugin/windows-events.c index 09ce558aebd2cc..e42ce549d364df 100644 --- a/src/collectors/windows-events.plugin/windows-events.c +++ b/src/collectors/windows-events.plugin/windows-events.c @@ -1292,6 +1292,7 @@ void function_windows_events(const char *transaction, char *function, usec_t *st int main(int argc __maybe_unused, char **argv __maybe_unused) { nd_thread_tag_set("wevt.plugin"); nd_log_initialize_for_external_plugins("windows-events.plugin"); + netdata_threads_init_for_external_plugins(0); // ------------------------------------------------------------------------ // initialization diff --git a/src/collectors/windows.plugin/windows_plugin.c b/src/collectors/windows.plugin/windows_plugin.c index 74b72e0ceec96e..c2f10f833f5ef1 100644 --- a/src/collectors/windows.plugin/windows_plugin.c +++ b/src/collectors/windows.plugin/windows_plugin.c @@ -48,8 +48,6 @@ static void windows_main_cleanup(void *pptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - collector_info("cleaning up..."); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; worker_unregister(); diff --git 
a/src/collectors/xenstat.plugin/xenstat_plugin.c b/src/collectors/xenstat.plugin/xenstat_plugin.c index 63592b6fd499ef..e71facde0e4be6 100644 --- a/src/collectors/xenstat.plugin/xenstat_plugin.c +++ b/src/collectors/xenstat.plugin/xenstat_plugin.c @@ -926,6 +926,7 @@ int main(int argc, char **argv) { program_name = PLUGIN_XENSTAT_NAME; nd_log_initialize_for_external_plugins(PLUGIN_XENSTAT_NAME); + netdata_threads_init_for_external_plugins(0); // ------------------------------------------------------------------------ // parse command line parameters diff --git a/src/daemon/analytics.c b/src/daemon/analytics.c index 2835041eefb983..d1e39922a7752d 100644 --- a/src/daemon/analytics.c +++ b/src/daemon/analytics.c @@ -549,7 +549,6 @@ void analytics_main_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_debug(D_ANALYTICS, "Cleaning up..."); analytics_free_data(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; diff --git a/src/daemon/config/netdata-conf.c b/src/daemon/config/netdata-conf.c index 9e81f5c35bf157..e5df4c8b93cbf0 100644 --- a/src/daemon/config/netdata-conf.c +++ b/src/daemon/config/netdata-conf.c @@ -2,6 +2,8 @@ #include "netdata-conf.h" +struct config netdata_config = APPCONFIG_INITIALIZER; + bool netdata_conf_load(char *filename, char overwrite_used, const char **user) { static bool run = false; if(run) return false; diff --git a/src/daemon/config/netdata-conf.h b/src/daemon/config/netdata-conf.h index d69f26ee14d5d4..a9469adba4ad1f 100644 --- a/src/daemon/config/netdata-conf.h +++ b/src/daemon/config/netdata-conf.h @@ -5,6 +5,7 @@ #include "libnetdata/libnetdata.h" +extern struct config netdata_config; bool netdata_conf_load(char *filename, char overwrite_used, const char **user); #include "netdata-conf-backwards-compatibility.h" diff --git a/src/daemon/daemon-service.c b/src/daemon/daemon-service.c new file mode 100644 index 00000000000000..3898ec7f145e2c --- /dev/null +++ b/src/daemon/daemon-service.c @@ -0,0 +1,284 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "daemon-service.h" + +typedef struct service_thread { + pid_t tid; + SERVICE_THREAD_TYPE type; + SERVICE_TYPE services; + char name[ND_THREAD_TAG_MAX + 1]; + bool stop_immediately; + bool cancelled; + + union { + ND_THREAD *netdata_thread; + uv_thread_t uv_thread; + }; + + force_quit_t force_quit_callback; + request_quit_t request_quit_callback; + void *data; +} SERVICE_THREAD; + +struct service_globals { + SPINLOCK lock; + Pvoid_t pid_judy; +} service_globals = { + .pid_judy = NULL, +}; + +SERVICE_THREAD *service_register(SERVICE_THREAD_TYPE thread_type, request_quit_t request_quit_callback, force_quit_t force_quit_callback, void *data, bool update __maybe_unused) { + SERVICE_THREAD *sth = NULL; + pid_t tid = gettid_cached(); + + spinlock_lock(&service_globals.lock); + Pvoid_t *PValue = JudyLIns(&service_globals.pid_judy, tid, PJE0); + if(!*PValue) { + sth = callocz(1, sizeof(SERVICE_THREAD)); + sth->tid = tid; + sth->type = thread_type; + sth->request_quit_callback = request_quit_callback; + sth->force_quit_callback = force_quit_callback; + sth->data = data; + *PValue = sth; + + switch(thread_type) { + default: + case SERVICE_THREAD_TYPE_NETDATA: + sth->netdata_thread = nd_thread_self(); + break; + + case SERVICE_THREAD_TYPE_EVENT_LOOP: + case SERVICE_THREAD_TYPE_LIBUV: + sth->uv_thread = uv_thread_self(); + break; + } + + const char *name = nd_thread_tag(); + if(!name) name = ""; + strncpyz(sth->name, name, sizeof(sth->name) - 1); + } + else { + sth = *PValue; 
+ } + spinlock_unlock(&service_globals.lock); + + return sth; +} + +void service_exits(void) { + pid_t tid = gettid_cached(); + + spinlock_lock(&service_globals.lock); + Pvoid_t *PValue = JudyLGet(service_globals.pid_judy, tid, PJE0); + if(PValue) { + freez(*PValue); + JudyLDel(&service_globals.pid_judy, tid, PJE0); + } + spinlock_unlock(&service_globals.lock); +} + +bool service_running(SERVICE_TYPE service) { + static __thread SERVICE_THREAD *sth = NULL; + + if(unlikely(!sth)) + sth = service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, false); + + sth->services |= service; + + bool cancelled = false; + if (sth->type == SERVICE_THREAD_TYPE_NETDATA) + cancelled = nd_thread_signaled_to_cancel(); + + return !sth->stop_immediately && !netdata_exit && !cancelled; +} + +void service_signal_exit(SERVICE_TYPE service) { + spinlock_lock(&service_globals.lock); + + Pvoid_t *PValue; + Word_t tid = 0; + bool first = true; + while((PValue = JudyLFirstThenNext(service_globals.pid_judy, &tid, &first))) { + SERVICE_THREAD *sth = *PValue; + + if((sth->services & service)) { + sth->stop_immediately = true; + + switch(sth->type) { + default: + case SERVICE_THREAD_TYPE_NETDATA: + nd_thread_signal_cancel(sth->netdata_thread); + break; + + case SERVICE_THREAD_TYPE_EVENT_LOOP: + case SERVICE_THREAD_TYPE_LIBUV: + break; + } + + if(sth->request_quit_callback) { + spinlock_unlock(&service_globals.lock); + sth->request_quit_callback(sth->data); + spinlock_lock(&service_globals.lock); + } + } + } + + spinlock_unlock(&service_globals.lock); +} + +static void service_to_buffer(BUFFER *wb, SERVICE_TYPE service) { + if(service & SERVICE_MAINTENANCE) + buffer_strcat(wb, "MAINTENANCE "); + if(service & SERVICE_COLLECTORS) + buffer_strcat(wb, "COLLECTORS "); + if(service & SERVICE_REPLICATION) + buffer_strcat(wb, "REPLICATION "); + if(service & ABILITY_DATA_QUERIES) + buffer_strcat(wb, "DATA_QUERIES "); + if(service & ABILITY_WEB_REQUESTS) + buffer_strcat(wb, "WEB_REQUESTS "); + if(service & SERVICE_WEB_SERVER) + buffer_strcat(wb, "WEB_SERVER "); + if(service & SERVICE_ACLK) + buffer_strcat(wb, "ACLK "); + if(service & SERVICE_HEALTH) + buffer_strcat(wb, "HEALTH "); + if(service & SERVICE_STREAMING) + buffer_strcat(wb, "STREAMING "); + if(service & ABILITY_STREAMING_CONNECTIONS) + buffer_strcat(wb, "STREAMING_CONNECTIONS "); + if(service & SERVICE_CONTEXT) + buffer_strcat(wb, "CONTEXT "); + if(service & SERVICE_ANALYTICS) + buffer_strcat(wb, "ANALYTICS "); + if(service & SERVICE_EXPORTERS) + buffer_strcat(wb, "EXPORTERS "); + if(service & SERVICE_HTTPD) + buffer_strcat(wb, "HTTPD "); +} + +bool service_wait_exit(SERVICE_TYPE service, usec_t timeout_ut) { + BUFFER *service_list = buffer_create(1024, NULL); + BUFFER *thread_list = buffer_create(1024, NULL); + usec_t started_ut = now_monotonic_usec(), ended_ut; + size_t running; + SERVICE_TYPE running_services = 0; + + // cancel the threads + running = 0; + running_services = 0; + { + buffer_flush(thread_list); + + spinlock_lock(&service_globals.lock); + + Pvoid_t *PValue; + Word_t tid = 0; + bool first = true; + while((PValue = JudyLFirstThenNext(service_globals.pid_judy, &tid, &first))) { + SERVICE_THREAD *sth = *PValue; + if(sth->services & service && sth->tid != gettid_cached() && !sth->cancelled) { + sth->cancelled = true; + + switch(sth->type) { + default: + case SERVICE_THREAD_TYPE_NETDATA: + nd_thread_signal_cancel(sth->netdata_thread); + break; + + case SERVICE_THREAD_TYPE_EVENT_LOOP: + case SERVICE_THREAD_TYPE_LIBUV: + break; + } + + if(running) + 
buffer_strcat(thread_list, ", "); + + buffer_sprintf(thread_list, "'%s' (%d)", sth->name, sth->tid); + + running++; + running_services |= sth->services & service; + + if(sth->force_quit_callback) { + spinlock_unlock(&service_globals.lock); + sth->force_quit_callback(sth->data); + spinlock_lock(&service_globals.lock); + continue; + } + } + } + + spinlock_unlock(&service_globals.lock); + } + + service_signal_exit(service); + + // signal them to stop + size_t last_running = 0; + size_t stale_time_ut = 0; + usec_t sleep_ut = 50 * USEC_PER_MS; + size_t log_countdown_ut = sleep_ut; + do { + if(running != last_running) + stale_time_ut = 0; + + last_running = running; + running = 0; + running_services = 0; + buffer_flush(thread_list); + + spinlock_lock(&service_globals.lock); + + Pvoid_t *PValue; + Word_t tid = 0; + bool first = true; + while((PValue = JudyLFirstThenNext(service_globals.pid_judy, &tid, &first))) { + SERVICE_THREAD *sth = *PValue; + if(sth->services & service && sth->tid != gettid_cached()) { + if(running) + buffer_strcat(thread_list, ", "); + + buffer_sprintf(thread_list, "'%s' (%d)", sth->name, sth->tid); + + running_services |= sth->services & service; + running++; + } + } + + spinlock_unlock(&service_globals.lock); + + if(running) { + log_countdown_ut -= (log_countdown_ut >= sleep_ut) ? sleep_ut : log_countdown_ut; + if(log_countdown_ut == 0 || running != last_running) { + log_countdown_ut = 20 * sleep_ut; + + buffer_flush(service_list); + service_to_buffer(service_list, running_services); + netdata_log_info("SERVICE CONTROL: waiting for the following %zu services [ %s] to exit: %s", + running, buffer_tostring(service_list), + running <= 10 ? buffer_tostring(thread_list) : ""); + } + + sleep_usec(sleep_ut); + stale_time_ut += sleep_ut; + } + + ended_ut = now_monotonic_usec(); + } while(running && (ended_ut - started_ut < timeout_ut || stale_time_ut < timeout_ut)); + + if(running) { + buffer_flush(service_list); + service_to_buffer(service_list, running_services); + netdata_log_info("SERVICE CONTROL: " + "the following %zu service(s) [ %s] take too long to exit: %s; " + "giving up on them...", + running, buffer_tostring(service_list), + buffer_tostring(thread_list)); + } + + buffer_free(thread_list); + buffer_free(service_list); + + return (running == 0); +} diff --git a/src/daemon/daemon-service.h b/src/daemon/daemon-service.h new file mode 100644 index 00000000000000..06ba521295e7cd --- /dev/null +++ b/src/daemon/daemon-service.h @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DAEMON_SERVICE_H +#define NETDATA_DAEMON_SERVICE_H + +#include "libnetdata/libnetdata.h" + +typedef enum { + ABILITY_DATA_QUERIES = (1 << 0), + ABILITY_WEB_REQUESTS = (1 << 1), + ABILITY_STREAMING_CONNECTIONS = (1 << 2), + SERVICE_MAINTENANCE = (1 << 3), + SERVICE_COLLECTORS = (1 << 4), + SERVICE_REPLICATION = (1 << 5), + SERVICE_WEB_SERVER = (1 << 6), + SERVICE_ACLK = (1 << 7), + SERVICE_HEALTH = (1 << 8), + SERVICE_STREAMING = (1 << 9), + SERVICE_CONTEXT = (1 << 10), + SERVICE_ANALYTICS = (1 << 11), + SERVICE_EXPORTERS = (1 << 12), + SERVICE_HTTPD = (1 << 13) +} SERVICE_TYPE; + +typedef enum { + SERVICE_THREAD_TYPE_NETDATA, + SERVICE_THREAD_TYPE_LIBUV, + SERVICE_THREAD_TYPE_EVENT_LOOP, +} SERVICE_THREAD_TYPE; + +typedef void (*force_quit_t)(void *data); +typedef void (*request_quit_t)(void *data); + +void service_exits(void); +bool service_running(SERVICE_TYPE service); +struct service_thread *service_register(SERVICE_THREAD_TYPE thread_type, request_quit_t 
request_quit_callback, force_quit_t force_quit_callback, void *data, bool update __maybe_unused); + +void service_signal_exit(SERVICE_TYPE service); +bool service_wait_exit(SERVICE_TYPE service, usec_t timeout_ut); + +#endif //NETDATA_DAEMON_SERVICE_H diff --git a/src/daemon/watcher.c b/src/daemon/daemon-shutdown-watcher.c similarity index 70% rename from src/daemon/watcher.c rename to src/daemon/daemon-shutdown-watcher.c index 4f53990bee8302..04603f85b4b53b 100644 --- a/src/daemon/watcher.c +++ b/src/daemon/daemon-shutdown-watcher.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "watcher.h" +#include "daemon-shutdown-watcher.h" watcher_step_t *watcher_steps; @@ -20,15 +20,28 @@ void watcher_step_complete(watcher_step_id_t step_id) { completion_mark_complete(&watcher_steps[step_id].p); } -static void watcher_wait_for_step(const watcher_step_id_t step_id) +static void watcher_wait_for_step(const watcher_step_id_t step_id, usec_t shutdown_start_time) { - unsigned timeout = 90; - usec_t step_start_time = now_monotonic_usec(); + usec_t step_start_duration = step_start_time - shutdown_start_time; + + char start_duration_txt[64]; + duration_snprintf( + start_duration_txt, sizeof(start_duration_txt), (int64_t)step_start_duration, "us", true); + + netdata_log_info("shutdown step: [%d/%d] - {at %s} started '%s'...", + (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt, + watcher_steps[step_id].msg); #ifdef ENABLE_SENTRY // Wait with a timeout - bool ok = completion_timedwait_for(&watcher_steps[step_id].p, timeout); + time_t timeout = 135; // systemd gives us 150, we timeout at 135 + + time_t remaining_seconds = timeout - (time_t)(step_start_duration / USEC_PER_SEC); + if(remaining_seconds < 0) + remaining_seconds = 0; + + bool ok = completion_timedwait_for(&watcher_steps[step_id].p, remaining_seconds); #else // Wait indefinitely bool ok = true; @@ -37,16 +50,20 @@ static void watcher_wait_for_step(const watcher_step_id_t step_id) usec_t step_duration = now_monotonic_usec() - step_start_time; + char step_duration_txt[64]; + duration_snprintf( + step_duration_txt, sizeof(step_duration_txt), (int64_t)(step_duration), "us", true); + if (ok) { - netdata_log_info("shutdown step: [%d/%d] - '%s' finished in %llu milliseconds", - (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, - watcher_steps[step_id].msg, step_duration / USEC_PER_MS); + netdata_log_info("shutdown step: [%d/%d] - {at %s} finished '%s' in %s", + (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt, + watcher_steps[step_id].msg, step_duration_txt); } else { // Do not call fatal() because it will try to execute the exit // sequence twice. - netdata_log_error("shutdown step: [%d/%d] - '%s' took more than %u seconds (ie. 
%llu milliseconds)", - (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, watcher_steps[step_id].msg, - timeout, step_duration / USEC_PER_MS); + netdata_log_error("shutdown step: [%d/%d] - {at %s} timeout '%s' takes too long (%s) - giving up...", + (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt, + watcher_steps[step_id].msg, step_duration_txt); abort(); } @@ -64,32 +81,30 @@ void *watcher_main(void *arg) usec_t shutdown_start_time = now_monotonic_usec(); - watcher_wait_for_step(WATCHER_STEP_ID_CREATE_SHUTDOWN_FILE); - watcher_wait_for_step(WATCHER_STEP_ID_DESTROY_MAIN_SPAWN_SERVER); - watcher_wait_for_step(WATCHER_STEP_ID_DBENGINE_EXIT_MODE); - watcher_wait_for_step(WATCHER_STEP_ID_CLOSE_WEBRTC_CONNECTIONS); - watcher_wait_for_step(WATCHER_STEP_ID_DISABLE_MAINTENANCE_NEW_QUERIES_NEW_WEB_REQUESTS_NEW_STREAMING_CONNECTIONS_AND_ACLK); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_MAINTENANCE_THREAD); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_EXPORTERS_HEALTH_AND_WEB_SERVERS_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_COLLECTORS_AND_STREAMING_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_REPLICATION_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_PREPARE_METASYNC_SHUTDOWN); - watcher_wait_for_step(WATCHER_STEP_ID_DISABLE_ML_DETECTION_AND_TRAINING_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_CONTEXT_THREAD); - watcher_wait_for_step(WATCHER_STEP_ID_CLEAR_WEB_CLIENT_CACHE); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_ACLK_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_ALL_REMAINING_WORKER_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_CANCEL_MAIN_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_FLUSH_DBENGINE_TIERS); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS); - watcher_wait_for_step(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); - watcher_wait_for_step(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_MAIN_CACHE_TO_FINISH_FLUSHING); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); - watcher_wait_for_step(WATCHER_STEP_ID_STOP_METASYNC_THREADS); - watcher_wait_for_step(WATCHER_STEP_ID_CLOSE_SQL_DATABASES); - watcher_wait_for_step(WATCHER_STEP_ID_REMOVE_PID_FILE); - watcher_wait_for_step(WATCHER_STEP_ID_FREE_OPENSSL_STRUCTURES); - watcher_wait_for_step(WATCHER_STEP_ID_REMOVE_INCOMPLETE_SHUTDOWN_FILE); + watcher_wait_for_step(WATCHER_STEP_ID_CREATE_SHUTDOWN_FILE, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_DESTROY_MAIN_SPAWN_SERVER, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_DBENGINE_EXIT_MODE, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_CLOSE_WEBRTC_CONNECTIONS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_DISABLE_MAINTENANCE_NEW_QUERIES_NEW_WEB_REQUESTS_NEW_STREAMING_CONNECTIONS_AND_ACLK, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_MAINTENANCE_THREAD, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_EXPORTERS_HEALTH_AND_WEB_SERVERS_THREADS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_COLLECTORS_AND_STREAMING_THREADS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_REPLICATION_THREADS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_DISABLE_ML_DETECTION_AND_TRAINING_THREADS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_CONTEXT_THREAD, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_CLEAR_WEB_CLIENT_CACHE, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_ACLK_THREADS, shutdown_start_time); + 
watcher_wait_for_step(WATCHER_STEP_ID_STOP_ALL_REMAINING_WORKER_THREADS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_CANCEL_MAIN_THREADS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_PREPARE_METASYNC_SHUTDOWN, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_DBENGINE_TIERS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_STOP_METASYNC_THREADS, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_CLOSE_SQL_DATABASES, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_REMOVE_PID_FILE, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_FREE_OPENSSL_STRUCTURES, shutdown_start_time); + watcher_wait_for_step(WATCHER_STEP_ID_REMOVE_INCOMPLETE_SHUTDOWN_FILE, shutdown_start_time); completion_wait_for(&shutdown_end_completion); usec_t shutdown_end_time = now_monotonic_usec(); @@ -136,14 +151,10 @@ void watcher_thread_start() { "stop all remaining worker threads"; watcher_steps[WATCHER_STEP_ID_CANCEL_MAIN_THREADS].msg = "cancel main threads"; - watcher_steps[WATCHER_STEP_ID_FLUSH_DBENGINE_TIERS].msg = - "flush dbengine tiers"; watcher_steps[WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS].msg = "stop collection for all hosts"; watcher_steps[WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH].msg = "wait for dbengine collectors to finish"; - watcher_steps[WATCHER_STEP_ID_WAIT_FOR_DBENGINE_MAIN_CACHE_TO_FINISH_FLUSHING].msg = - "wait for dbengine main cache to finish flushing"; watcher_steps[WATCHER_STEP_ID_STOP_DBENGINE_TIERS].msg = "stop dbengine tiers"; watcher_steps[WATCHER_STEP_ID_STOP_METASYNC_THREADS].msg = diff --git a/src/daemon/watcher.h b/src/daemon/daemon-shutdown-watcher.h similarity index 94% rename from src/daemon/watcher.h rename to src/daemon/daemon-shutdown-watcher.h index 6c15ca7bfe4917..f3d6e74ee43010 100644 --- a/src/daemon/watcher.h +++ b/src/daemon/daemon-shutdown-watcher.h @@ -15,17 +15,15 @@ typedef enum { WATCHER_STEP_ID_STOP_EXPORTERS_HEALTH_AND_WEB_SERVERS_THREADS, WATCHER_STEP_ID_STOP_COLLECTORS_AND_STREAMING_THREADS, WATCHER_STEP_ID_STOP_REPLICATION_THREADS, - WATCHER_STEP_ID_PREPARE_METASYNC_SHUTDOWN, WATCHER_STEP_ID_DISABLE_ML_DETECTION_AND_TRAINING_THREADS, WATCHER_STEP_ID_STOP_CONTEXT_THREAD, WATCHER_STEP_ID_CLEAR_WEB_CLIENT_CACHE, WATCHER_STEP_ID_STOP_ACLK_THREADS, WATCHER_STEP_ID_STOP_ALL_REMAINING_WORKER_THREADS, WATCHER_STEP_ID_CANCEL_MAIN_THREADS, - WATCHER_STEP_ID_FLUSH_DBENGINE_TIERS, + WATCHER_STEP_ID_PREPARE_METASYNC_SHUTDOWN, WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS, WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH, - WATCHER_STEP_ID_WAIT_FOR_DBENGINE_MAIN_CACHE_TO_FINISH_FLUSHING, WATCHER_STEP_ID_STOP_DBENGINE_TIERS, WATCHER_STEP_ID_STOP_METASYNC_THREADS, WATCHER_STEP_ID_CLOSE_SQL_DATABASES, diff --git a/src/daemon/daemon-shutdown.c b/src/daemon/daemon-shutdown.c new file mode 100644 index 00000000000000..8f1bb18a8db1a0 --- /dev/null +++ b/src/daemon/daemon-shutdown.c @@ -0,0 +1,301 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "daemon-shutdown.h" +#include "daemon-service.h" +#include "daemon/daemon-shutdown-watcher.h" +#include "static_threads.h" +#include "common.h" + +#include + +#ifdef ENABLE_SENTRY +#include "sentry-native/sentry-native.h" +#endif + +void web_client_cache_destroy(void); + +extern struct netdata_static_thread 
*static_threads; + +void cancel_main_threads(void) { + nd_log_limits_unlimited(); + + if (!static_threads) + return; + + int i, found = 0; + usec_t max = 5 * USEC_PER_SEC, step = 100000; + for (i = 0; static_threads[i].name != NULL ; i++) { + if (static_threads[i].enabled == NETDATA_MAIN_THREAD_RUNNING) { + if (static_threads[i].thread) { + netdata_log_info("EXIT: Stopping main thread: %s", static_threads[i].name); + nd_thread_signal_cancel(static_threads[i].thread); + } else { + netdata_log_info("EXIT: No thread running (marking as EXITED): %s", static_threads[i].name); + static_threads[i].enabled = NETDATA_MAIN_THREAD_EXITED; + } + found++; + } + } + + while(found && max > 0) { + max -= step; + netdata_log_info("Waiting %d threads to finish...", found); + sleep_usec(step); + found = 0; + for (i = 0; static_threads[i].name != NULL ; i++) { + if (static_threads[i].enabled == NETDATA_MAIN_THREAD_EXITED) + continue; + + // Don't wait ourselves. + if (nd_thread_is_me(static_threads[i].thread)) + continue; + + found++; + } + } + + if(found) { + for (i = 0; static_threads[i].name != NULL ; i++) { + if (static_threads[i].enabled != NETDATA_MAIN_THREAD_EXITED) + netdata_log_error("Main thread %s takes too long to exit. Giving up...", static_threads[i].name); + } + } + else + netdata_log_info("All threads finished."); + + freez(static_threads); + static_threads = NULL; +} + +static void *rrdeng_exit_background(void *ptr) { + struct rrdengine_instance *ctx = ptr; + rrdeng_exit(ctx); + return NULL; +} + +#ifdef ENABLE_DBENGINE +static void rrdeng_flush_everything_and_wait(bool wait_flush, bool wait_collectors) { + static size_t starting_size_to_flush = 0; + + if(!pgc_hot_and_dirty_entries(main_cache)) + return; + + nd_log(NDLS_DAEMON, NDLP_INFO, "Flushing DBENGINE dirty pages..."); + for (size_t tier = 0; tier < storage_tiers; tier++) + rrdeng_quiesce(multidb_ctx[tier]); + + struct pgc_statistics pgc_main_stats = pgc_get_statistics(main_cache); + size_t size_to_flush = pgc_main_stats.queues[PGC_QUEUE_HOT].size + pgc_main_stats.queues[PGC_QUEUE_DIRTY].size; + if(size_to_flush > starting_size_to_flush || !starting_size_to_flush) + starting_size_to_flush = size_to_flush; + + if(wait_collectors) { + size_t running = 1; + size_t count = 10; + while (running && count) { + running = 0; + for (size_t tier = 0; tier < storage_tiers; tier++) + running += rrdeng_collectors_running(multidb_ctx[tier]); + + if (running) { + nd_log_limit_static_thread_var(erl, 1, 100 * USEC_PER_MS); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_NOTICE, "waiting for %zu collectors to finish", running); + } + count--; + } + } + + if(!wait_flush) + return; + + for(size_t iterations = 0; true ;iterations++) { + pgc_main_stats = pgc_get_statistics(main_cache); + size_to_flush = pgc_main_stats.queues[PGC_QUEUE_HOT].size + pgc_main_stats.queues[PGC_QUEUE_DIRTY].size; + if(!starting_size_to_flush || size_to_flush > starting_size_to_flush) + starting_size_to_flush = size_to_flush; + + if(!size_to_flush) + break; + + size_t flushed = starting_size_to_flush - size_to_flush; + + if(iterations % 10 == 0) { + char hot[64], dirty[64]; + size_snprintf(hot, sizeof(hot), pgc_main_stats.queues[PGC_QUEUE_HOT].size, "B", false); + size_snprintf(dirty, sizeof(hot), pgc_main_stats.queues[PGC_QUEUE_DIRTY].size, "B", false); + + nd_log(NDLS_DAEMON, NDLP_INFO, "DBENGINE: flushing at %.2f%% { hot: %s, dirty: %s }...", + (double)flushed * 100.0 / (double)starting_size_to_flush, + hot, dirty); + } + sleep_usec(100 * USEC_PER_MS); + } + nd_log(NDLS_DAEMON, NDLP_INFO, 
"DBENGINE: flushing completed!"); +} +#endif + +void netdata_cleanup_and_exit(int ret, const char *action, const char *action_result, const char *action_data) { + netdata_exit = 1; + +#ifdef ENABLE_DBENGINE + if(!ret && dbengine_enabled) + // flush all dirty pages asap + rrdeng_flush_everything_and_wait(false, false); +#endif + + usec_t shutdown_start_time = now_monotonic_usec(); + watcher_shutdown_begin(); + + nd_log_limits_unlimited(); + netdata_log_info("NETDATA SHUTDOWN: initializing shutdown with code %d...", ret); + + // send the stat from our caller + analytics_statistic_t statistic = { action, action_result, action_data }; + analytics_statistic_send(&statistic); + + // notify we are exiting + statistic = (analytics_statistic_t) {"EXIT", ret?"ERROR":"OK","-"}; + analytics_statistic_send(&statistic); + + char agent_crash_file[FILENAME_MAX + 1]; + char agent_incomplete_shutdown_file[FILENAME_MAX + 1]; + snprintfz(agent_crash_file, FILENAME_MAX, "%s/.agent_crash", netdata_configured_varlib_dir); + snprintfz(agent_incomplete_shutdown_file, FILENAME_MAX, "%s/.agent_incomplete_shutdown", netdata_configured_varlib_dir); + (void) rename(agent_crash_file, agent_incomplete_shutdown_file); + watcher_step_complete(WATCHER_STEP_ID_CREATE_SHUTDOWN_FILE); + + netdata_main_spawn_server_cleanup(); + watcher_step_complete(WATCHER_STEP_ID_DESTROY_MAIN_SPAWN_SERVER); + + watcher_step_complete(WATCHER_STEP_ID_DBENGINE_EXIT_MODE); + + webrtc_close_all_connections(); + watcher_step_complete(WATCHER_STEP_ID_CLOSE_WEBRTC_CONNECTIONS); + + service_signal_exit(SERVICE_MAINTENANCE | ABILITY_DATA_QUERIES | ABILITY_WEB_REQUESTS | + ABILITY_STREAMING_CONNECTIONS | SERVICE_ACLK); + watcher_step_complete(WATCHER_STEP_ID_DISABLE_MAINTENANCE_NEW_QUERIES_NEW_WEB_REQUESTS_NEW_STREAMING_CONNECTIONS_AND_ACLK); + + service_wait_exit(SERVICE_MAINTENANCE, 3 * USEC_PER_SEC); + watcher_step_complete(WATCHER_STEP_ID_STOP_MAINTENANCE_THREAD); + + service_wait_exit(SERVICE_EXPORTERS | SERVICE_HEALTH | SERVICE_WEB_SERVER | SERVICE_HTTPD, 3 * USEC_PER_SEC); + watcher_step_complete(WATCHER_STEP_ID_STOP_EXPORTERS_HEALTH_AND_WEB_SERVERS_THREADS); + + stream_threads_cancel(); + service_wait_exit(SERVICE_COLLECTORS | SERVICE_STREAMING, 3 * USEC_PER_SEC); + watcher_step_complete(WATCHER_STEP_ID_STOP_COLLECTORS_AND_STREAMING_THREADS); + +#ifdef ENABLE_DBENGINE + if(!ret && dbengine_enabled) + // flush all dirty pages now that all collectors and streaming completed + rrdeng_flush_everything_and_wait(false, false); +#endif + + service_wait_exit(SERVICE_REPLICATION, 3 * USEC_PER_SEC); + watcher_step_complete(WATCHER_STEP_ID_STOP_REPLICATION_THREADS); + + ml_stop_threads(); + ml_fini(); + watcher_step_complete(WATCHER_STEP_ID_DISABLE_ML_DETECTION_AND_TRAINING_THREADS); + + service_wait_exit(SERVICE_CONTEXT, 3 * USEC_PER_SEC); + watcher_step_complete(WATCHER_STEP_ID_STOP_CONTEXT_THREAD); + + web_client_cache_destroy(); + watcher_step_complete(WATCHER_STEP_ID_CLEAR_WEB_CLIENT_CACHE); + + service_wait_exit(SERVICE_ACLK, 3 * USEC_PER_SEC); + watcher_step_complete(WATCHER_STEP_ID_STOP_ACLK_THREADS); + + service_wait_exit(~0, 10 * USEC_PER_SEC); + watcher_step_complete(WATCHER_STEP_ID_STOP_ALL_REMAINING_WORKER_THREADS); + + cancel_main_threads(); + watcher_step_complete(WATCHER_STEP_ID_CANCEL_MAIN_THREADS); + + metadata_sync_shutdown_background(); + watcher_step_complete(WATCHER_STEP_ID_PREPARE_METASYNC_SHUTDOWN); + + if (ret) + { + watcher_step_complete(WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS); + 
watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); + watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); + watcher_step_complete(WATCHER_STEP_ID_STOP_METASYNC_THREADS); + } + else + { + // exit cleanly + rrd_finalize_collection_for_all_hosts(); + watcher_step_complete(WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS); + +#ifdef ENABLE_DBENGINE + if(dbengine_enabled) { + // flush anything remaining and wait for collectors to finish + rrdeng_flush_everything_and_wait(true, true); + watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); + + ND_THREAD *th[storage_tiers]; + for (size_t tier = 0; tier < storage_tiers; tier++) + th[tier] = nd_thread_create("rrdeng-exit", NETDATA_THREAD_OPTION_JOINABLE, rrdeng_exit_background, multidb_ctx[tier]); + + // flush anything remaining again - just in case + rrdeng_flush_everything_and_wait(true, false); + + for (size_t tier = 0; tier < storage_tiers; tier++) + nd_thread_join(th[tier]); + + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); + } + else { + // Skip these steps + watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); + watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); + } +#else + // Skip these steps + watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); + watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); +#endif + + metadata_sync_shutdown_background_wait(); + watcher_step_complete(WATCHER_STEP_ID_STOP_METASYNC_THREADS); + } + + // Don't register a shutdown event if we crashed + if (!ret) + add_agent_event(EVENT_AGENT_SHUTDOWN_TIME, (int64_t)(now_monotonic_usec() - shutdown_start_time)); + sqlite_close_databases(); + watcher_step_complete(WATCHER_STEP_ID_CLOSE_SQL_DATABASES); + sqlite_library_shutdown(); + + + // unlink the pid + if(pidfile && *pidfile) { + if(unlink(pidfile) != 0) + netdata_log_error("EXIT: cannot unlink pidfile '%s'.", pidfile); + } + watcher_step_complete(WATCHER_STEP_ID_REMOVE_PID_FILE); + + netdata_ssl_cleanup(); + watcher_step_complete(WATCHER_STEP_ID_FREE_OPENSSL_STRUCTURES); + + (void) unlink(agent_incomplete_shutdown_file); + watcher_step_complete(WATCHER_STEP_ID_REMOVE_INCOMPLETE_SHUTDOWN_FILE); + + watcher_shutdown_end(); + watcher_thread_stop(); + curl_global_cleanup(); + +#ifdef OS_WINDOWS + return; +#endif + +#ifdef ENABLE_SENTRY + nd_sentry_fini(); +#endif + + exit(ret); +} diff --git a/src/daemon/daemon-shutdown.h b/src/daemon/daemon-shutdown.h new file mode 100644 index 00000000000000..1a26ba63adbfe5 --- /dev/null +++ b/src/daemon/daemon-shutdown.h @@ -0,0 +1,10 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DAEMON_SHUTDOWN_H +#define NETDATA_DAEMON_SHUTDOWN_H + +#include "libnetdata/libnetdata.h" + +void cancel_main_threads(void); + +#endif //NETDATA_DAEMON_SHUTDOWN_H diff --git a/src/daemon/main.c b/src/daemon/main.c index f322bf2a56d759..d777cb914bad59 100644 --- a/src/daemon/main.c +++ b/src/daemon/main.c @@ -2,7 +2,7 @@ #include "common.h" #include "buildinfo.h" -#include "daemon/watcher.h" +#include "daemon/daemon-shutdown-watcher.h" #include "static_threads.h" #include "web/api/queries/backfill.h" @@ -29,509 +29,6 @@ bool ieee754_doubles = false; time_t netdata_start_time = 0; struct netdata_static_thread *static_threads; -struct config netdata_config = APPCONFIG_INITIALIZER; - -typedef struct service_thread { - pid_t tid; - 
SERVICE_THREAD_TYPE type; - SERVICE_TYPE services; - char name[ND_THREAD_TAG_MAX + 1]; - bool stop_immediately; - bool cancelled; - - union { - ND_THREAD *netdata_thread; - uv_thread_t uv_thread; - }; - - force_quit_t force_quit_callback; - request_quit_t request_quit_callback; - void *data; -} SERVICE_THREAD; - -struct service_globals { - SPINLOCK lock; - Pvoid_t pid_judy; -} service_globals = { - .pid_judy = NULL, -}; - -SERVICE_THREAD *service_register(SERVICE_THREAD_TYPE thread_type, request_quit_t request_quit_callback, force_quit_t force_quit_callback, void *data, bool update __maybe_unused) { - SERVICE_THREAD *sth = NULL; - pid_t tid = gettid_cached(); - - spinlock_lock(&service_globals.lock); - Pvoid_t *PValue = JudyLIns(&service_globals.pid_judy, tid, PJE0); - if(!*PValue) { - sth = callocz(1, sizeof(SERVICE_THREAD)); - sth->tid = tid; - sth->type = thread_type; - sth->request_quit_callback = request_quit_callback; - sth->force_quit_callback = force_quit_callback; - sth->data = data; - *PValue = sth; - - switch(thread_type) { - default: - case SERVICE_THREAD_TYPE_NETDATA: - sth->netdata_thread = nd_thread_self(); - break; - - case SERVICE_THREAD_TYPE_EVENT_LOOP: - case SERVICE_THREAD_TYPE_LIBUV: - sth->uv_thread = uv_thread_self(); - break; - } - - const char *name = nd_thread_tag(); - if(!name) name = ""; - strncpyz(sth->name, name, sizeof(sth->name) - 1); - } - else { - sth = *PValue; - } - spinlock_unlock(&service_globals.lock); - - return sth; -} - -void service_exits(void) { - pid_t tid = gettid_cached(); - - spinlock_lock(&service_globals.lock); - Pvoid_t *PValue = JudyLGet(service_globals.pid_judy, tid, PJE0); - if(PValue) { - freez(*PValue); - JudyLDel(&service_globals.pid_judy, tid, PJE0); - } - spinlock_unlock(&service_globals.lock); -} - -bool service_running(SERVICE_TYPE service) { - static __thread SERVICE_THREAD *sth = NULL; - - if(unlikely(!sth)) - sth = service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, false); - - sth->services |= service; - - bool cancelled = false; - if (sth->type == SERVICE_THREAD_TYPE_NETDATA) - cancelled = nd_thread_signaled_to_cancel(); - - return !sth->stop_immediately && !netdata_exit && !cancelled; -} - -void service_signal_exit(SERVICE_TYPE service) { - spinlock_lock(&service_globals.lock); - - Pvoid_t *PValue; - Word_t tid = 0; - bool first = true; - while((PValue = JudyLFirstThenNext(service_globals.pid_judy, &tid, &first))) { - SERVICE_THREAD *sth = *PValue; - - if((sth->services & service)) { - sth->stop_immediately = true; - - switch(sth->type) { - default: - case SERVICE_THREAD_TYPE_NETDATA: - nd_thread_signal_cancel(sth->netdata_thread); - break; - - case SERVICE_THREAD_TYPE_EVENT_LOOP: - case SERVICE_THREAD_TYPE_LIBUV: - break; - } - - if(sth->request_quit_callback) { - spinlock_unlock(&service_globals.lock); - sth->request_quit_callback(sth->data); - spinlock_lock(&service_globals.lock); - } - } - } - - spinlock_unlock(&service_globals.lock); -} - -static void service_to_buffer(BUFFER *wb, SERVICE_TYPE service) { - if(service & SERVICE_MAINTENANCE) - buffer_strcat(wb, "MAINTENANCE "); - if(service & SERVICE_COLLECTORS) - buffer_strcat(wb, "COLLECTORS "); - if(service & SERVICE_REPLICATION) - buffer_strcat(wb, "REPLICATION "); - if(service & ABILITY_DATA_QUERIES) - buffer_strcat(wb, "DATA_QUERIES "); - if(service & ABILITY_WEB_REQUESTS) - buffer_strcat(wb, "WEB_REQUESTS "); - if(service & SERVICE_WEB_SERVER) - buffer_strcat(wb, "WEB_SERVER "); - if(service & SERVICE_ACLK) - buffer_strcat(wb, "ACLK "); - if(service & 
SERVICE_HEALTH) - buffer_strcat(wb, "HEALTH "); - if(service & SERVICE_STREAMING) - buffer_strcat(wb, "STREAMING "); - if(service & ABILITY_STREAMING_CONNECTIONS) - buffer_strcat(wb, "STREAMING_CONNECTIONS "); - if(service & SERVICE_CONTEXT) - buffer_strcat(wb, "CONTEXT "); - if(service & SERVICE_ANALYTICS) - buffer_strcat(wb, "ANALYTICS "); - if(service & SERVICE_EXPORTERS) - buffer_strcat(wb, "EXPORTERS "); - if(service & SERVICE_HTTPD) - buffer_strcat(wb, "HTTPD "); -} - -static bool service_wait_exit(SERVICE_TYPE service, usec_t timeout_ut) { - BUFFER *service_list = buffer_create(1024, NULL); - BUFFER *thread_list = buffer_create(1024, NULL); - usec_t started_ut = now_monotonic_usec(), ended_ut; - size_t running; - SERVICE_TYPE running_services = 0; - - // cancel the threads - running = 0; - running_services = 0; - { - buffer_flush(thread_list); - - spinlock_lock(&service_globals.lock); - - Pvoid_t *PValue; - Word_t tid = 0; - bool first = true; - while((PValue = JudyLFirstThenNext(service_globals.pid_judy, &tid, &first))) { - SERVICE_THREAD *sth = *PValue; - if(sth->services & service && sth->tid != gettid_cached() && !sth->cancelled) { - sth->cancelled = true; - - switch(sth->type) { - default: - case SERVICE_THREAD_TYPE_NETDATA: - nd_thread_signal_cancel(sth->netdata_thread); - break; - - case SERVICE_THREAD_TYPE_EVENT_LOOP: - case SERVICE_THREAD_TYPE_LIBUV: - break; - } - - if(running) - buffer_strcat(thread_list, ", "); - - buffer_sprintf(thread_list, "'%s' (%d)", sth->name, sth->tid); - - running++; - running_services |= sth->services & service; - - if(sth->force_quit_callback) { - spinlock_unlock(&service_globals.lock); - sth->force_quit_callback(sth->data); - spinlock_lock(&service_globals.lock); - continue; - } - } - } - - spinlock_unlock(&service_globals.lock); - } - - service_signal_exit(service); - - // signal them to stop - size_t last_running = 0; - size_t stale_time_ut = 0; - usec_t sleep_ut = 50 * USEC_PER_MS; - size_t log_countdown_ut = sleep_ut; - do { - if(running != last_running) - stale_time_ut = 0; - - last_running = running; - running = 0; - running_services = 0; - buffer_flush(thread_list); - - spinlock_lock(&service_globals.lock); - - Pvoid_t *PValue; - Word_t tid = 0; - bool first = true; - while((PValue = JudyLFirstThenNext(service_globals.pid_judy, &tid, &first))) { - SERVICE_THREAD *sth = *PValue; - if(sth->services & service && sth->tid != gettid_cached()) { - if(running) - buffer_strcat(thread_list, ", "); - - buffer_sprintf(thread_list, "'%s' (%d)", sth->name, sth->tid); - - running_services |= sth->services & service; - running++; - } - } - - spinlock_unlock(&service_globals.lock); - - if(running) { - log_countdown_ut -= (log_countdown_ut >= sleep_ut) ? sleep_ut : log_countdown_ut; - if(log_countdown_ut == 0 || running != last_running) { - log_countdown_ut = 20 * sleep_ut; - - buffer_flush(service_list); - service_to_buffer(service_list, running_services); - netdata_log_info("SERVICE CONTROL: waiting for the following %zu services [ %s] to exit: %s", - running, buffer_tostring(service_list), - running <= 10 ? 
buffer_tostring(thread_list) : ""); - } - - sleep_usec(sleep_ut); - stale_time_ut += sleep_ut; - } - - ended_ut = now_monotonic_usec(); - } while(running && (ended_ut - started_ut < timeout_ut || stale_time_ut < timeout_ut)); - - if(running) { - buffer_flush(service_list); - service_to_buffer(service_list, running_services); - netdata_log_info("SERVICE CONTROL: " - "the following %zu service(s) [ %s] take too long to exit: %s; " - "giving up on them...", - running, buffer_tostring(service_list), - buffer_tostring(thread_list)); - } - - buffer_free(thread_list); - buffer_free(service_list); - - return (running == 0); -} - -void web_client_cache_destroy(void); - -void netdata_cleanup_and_exit(int ret, const char *action, const char *action_result, const char *action_data) { - netdata_exit = 1; - - usec_t shutdown_start_time = now_monotonic_usec(); - watcher_shutdown_begin(); - - nd_log_limits_unlimited(); - netdata_log_info("NETDATA SHUTDOWN: initializing shutdown with code %d...", ret); - - // send the stat from our caller - analytics_statistic_t statistic = { action, action_result, action_data }; - analytics_statistic_send(&statistic); - - // notify we are exiting - statistic = (analytics_statistic_t) {"EXIT", ret?"ERROR":"OK","-"}; - analytics_statistic_send(&statistic); - - char agent_crash_file[FILENAME_MAX + 1]; - char agent_incomplete_shutdown_file[FILENAME_MAX + 1]; - snprintfz(agent_crash_file, FILENAME_MAX, "%s/.agent_crash", netdata_configured_varlib_dir); - snprintfz(agent_incomplete_shutdown_file, FILENAME_MAX, "%s/.agent_incomplete_shutdown", netdata_configured_varlib_dir); - (void) rename(agent_crash_file, agent_incomplete_shutdown_file); - watcher_step_complete(WATCHER_STEP_ID_CREATE_SHUTDOWN_FILE); - - netdata_main_spawn_server_cleanup(); - watcher_step_complete(WATCHER_STEP_ID_DESTROY_MAIN_SPAWN_SERVER); - -#ifdef ENABLE_DBENGINE - if(dbengine_enabled) { - for (size_t tier = 0; tier < storage_tiers; tier++) - rrdeng_exit_mode(multidb_ctx[tier]); - } -#endif - watcher_step_complete(WATCHER_STEP_ID_DBENGINE_EXIT_MODE); - - webrtc_close_all_connections(); - watcher_step_complete(WATCHER_STEP_ID_CLOSE_WEBRTC_CONNECTIONS); - - service_signal_exit(SERVICE_MAINTENANCE | ABILITY_DATA_QUERIES | ABILITY_WEB_REQUESTS | - ABILITY_STREAMING_CONNECTIONS | SERVICE_ACLK); - watcher_step_complete(WATCHER_STEP_ID_DISABLE_MAINTENANCE_NEW_QUERIES_NEW_WEB_REQUESTS_NEW_STREAMING_CONNECTIONS_AND_ACLK); - - service_wait_exit(SERVICE_MAINTENANCE, 3 * USEC_PER_SEC); - watcher_step_complete(WATCHER_STEP_ID_STOP_MAINTENANCE_THREAD); - - service_wait_exit(SERVICE_EXPORTERS | SERVICE_HEALTH | SERVICE_WEB_SERVER | SERVICE_HTTPD, 3 * USEC_PER_SEC); - watcher_step_complete(WATCHER_STEP_ID_STOP_EXPORTERS_HEALTH_AND_WEB_SERVERS_THREADS); - - stream_threads_cancel(); - service_wait_exit(SERVICE_COLLECTORS | SERVICE_STREAMING, 3 * USEC_PER_SEC); - watcher_step_complete(WATCHER_STEP_ID_STOP_COLLECTORS_AND_STREAMING_THREADS); - - service_wait_exit(SERVICE_REPLICATION, 3 * USEC_PER_SEC); - watcher_step_complete(WATCHER_STEP_ID_STOP_REPLICATION_THREADS); - - metadata_sync_shutdown_prepare(); - watcher_step_complete(WATCHER_STEP_ID_PREPARE_METASYNC_SHUTDOWN); - - ml_stop_threads(); - ml_fini(); - watcher_step_complete(WATCHER_STEP_ID_DISABLE_ML_DETECTION_AND_TRAINING_THREADS); - - service_wait_exit(SERVICE_CONTEXT, 3 * USEC_PER_SEC); - watcher_step_complete(WATCHER_STEP_ID_STOP_CONTEXT_THREAD); - - web_client_cache_destroy(); - watcher_step_complete(WATCHER_STEP_ID_CLEAR_WEB_CLIENT_CACHE); - - 
service_wait_exit(SERVICE_ACLK, 3 * USEC_PER_SEC); - watcher_step_complete(WATCHER_STEP_ID_STOP_ACLK_THREADS); - - service_wait_exit(~0, 10 * USEC_PER_SEC); - watcher_step_complete(WATCHER_STEP_ID_STOP_ALL_REMAINING_WORKER_THREADS); - - cancel_main_threads(); - watcher_step_complete(WATCHER_STEP_ID_CANCEL_MAIN_THREADS); - - if (ret) - { - watcher_step_complete(WATCHER_STEP_ID_FLUSH_DBENGINE_TIERS); - watcher_step_complete(WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS); - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_MAIN_CACHE_TO_FINISH_FLUSHING); - watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); - watcher_step_complete(WATCHER_STEP_ID_STOP_METASYNC_THREADS); - } - else - { - // exit cleanly - -#ifdef ENABLE_DBENGINE - if(dbengine_enabled) { - nd_log(NDLS_DAEMON, NDLP_INFO, "Preparing DBENGINE shutdown..."); - for (size_t tier = 0; tier < storage_tiers; tier++) - rrdeng_prepare_exit(multidb_ctx[tier]); - - struct pgc_statistics pgc_main_stats = pgc_get_statistics(main_cache); - nd_log(NDLS_DAEMON, NDLP_INFO, "Waiting for DBENGINE to commit unsaved data to disk (%zu pages, %zu bytes)...", - pgc_main_stats.queues[PGC_QUEUE_HOT].entries + pgc_main_stats.queues[PGC_QUEUE_DIRTY].entries, - pgc_main_stats.queues[PGC_QUEUE_HOT].size + pgc_main_stats.queues[PGC_QUEUE_DIRTY].size); - - bool finished_tiers[RRD_STORAGE_TIERS] = { 0 }; - size_t waiting_tiers, iterations = 0; - do { - waiting_tiers = 0; - iterations++; - - for (size_t tier = 0; tier < storage_tiers; tier++) { - if (!multidb_ctx[tier] || finished_tiers[tier]) - continue; - - waiting_tiers++; - if (completion_timedwait_for(&multidb_ctx[tier]->quiesce.completion, 1)) { - completion_destroy(&multidb_ctx[tier]->quiesce.completion); - finished_tiers[tier] = true; - waiting_tiers--; - nd_log(NDLS_DAEMON, NDLP_INFO, "DBENGINE tier %zu finished!", tier); - } - else if(iterations % 10 == 0) { - pgc_main_stats = pgc_get_statistics(main_cache); - nd_log(NDLS_DAEMON, NDLP_INFO, - "Still waiting for DBENGINE tier %zu to finish " - "(cache still has %zu pages, %zu bytes hot, for all tiers)...", - tier, - pgc_main_stats.queues[PGC_QUEUE_HOT].entries + pgc_main_stats.queues[PGC_QUEUE_DIRTY].entries, - pgc_main_stats.queues[PGC_QUEUE_HOT].size + pgc_main_stats.queues[PGC_QUEUE_DIRTY].size); - } - } - } while(waiting_tiers); - nd_log(NDLS_DAEMON, NDLP_INFO, "DBENGINE shutdown completed..."); - } -#endif - watcher_step_complete(WATCHER_STEP_ID_FLUSH_DBENGINE_TIERS); - - rrd_finalize_collection_for_all_hosts(); - watcher_step_complete(WATCHER_STEP_ID_STOP_COLLECTION_FOR_ALL_HOSTS); - -#ifdef ENABLE_DBENGINE - if(dbengine_enabled) { - size_t running = 1; - size_t count = 10; - while(running && count) { - running = 0; - for (size_t tier = 0; tier < storage_tiers; tier++) - running += rrdeng_collectors_running(multidb_ctx[tier]); - - if (running) { - nd_log_limit_static_thread_var(erl, 1, 100 * USEC_PER_MS); - nd_log_limit(&erl, NDLS_DAEMON, NDLP_NOTICE, "waiting for %zu collectors to finish", running); - } - count--; - } - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); - - while (pgc_hot_and_dirty_entries(main_cache)) { - pgc_flush_all_hot_and_dirty_pages(main_cache, PGC_SECTION_ALL); - sleep_usec(100 * USEC_PER_MS); - } - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_MAIN_CACHE_TO_FINISH_FLUSHING); - - for (size_t tier = 0; tier < storage_tiers; tier++) - rrdeng_exit(multidb_ctx[tier]); - rrdeng_enq_cmd(NULL, 
RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); - watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); - } - else { - // Skip these steps - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_MAIN_CACHE_TO_FINISH_FLUSHING); - watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); - } -#else - // Skip these steps - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_COLLECTORS_TO_FINISH); - watcher_step_complete(WATCHER_STEP_ID_WAIT_FOR_DBENGINE_MAIN_CACHE_TO_FINISH_FLUSHING); - watcher_step_complete(WATCHER_STEP_ID_STOP_DBENGINE_TIERS); -#endif - - metadata_sync_shutdown(); - watcher_step_complete(WATCHER_STEP_ID_STOP_METASYNC_THREADS); - } - - // Don't register a shutdown event if we crashed - if (!ret) - add_agent_event(EVENT_AGENT_SHUTDOWN_TIME, (int64_t)(now_monotonic_usec() - shutdown_start_time)); - sqlite_close_databases(); - watcher_step_complete(WATCHER_STEP_ID_CLOSE_SQL_DATABASES); - sqlite_library_shutdown(); - - - // unlink the pid - if(pidfile && *pidfile) { - if(unlink(pidfile) != 0) - netdata_log_error("EXIT: cannot unlink pidfile '%s'.", pidfile); - } - watcher_step_complete(WATCHER_STEP_ID_REMOVE_PID_FILE); - - netdata_ssl_cleanup(); - watcher_step_complete(WATCHER_STEP_ID_FREE_OPENSSL_STRUCTURES); - - (void) unlink(agent_incomplete_shutdown_file); - watcher_step_complete(WATCHER_STEP_ID_REMOVE_INCOMPLETE_SHUTDOWN_FILE); - - watcher_shutdown_end(); - watcher_thread_stop(); - curl_global_cleanup(); - -#ifdef OS_WINDOWS - return; -#endif - -#ifdef ENABLE_SENTRY - nd_sentry_fini(); -#endif - - exit(ret); -} - static void set_nofile_limit(struct rlimit *rl) { // get the num files allowed if(getrlimit(RLIMIT_NOFILE, rl) != 0) { @@ -540,7 +37,7 @@ static void set_nofile_limit(struct rlimit *rl) { } netdata_log_info("resources control: allowed file descriptors: soft = %zu, max = %zu", - (size_t) rl->rlim_cur, (size_t) rl->rlim_max); + (size_t) rl->rlim_cur, (size_t) rl->rlim_max); // make the soft/hard limits equal rl->rlim_cur = rl->rlim_max; @@ -558,57 +55,6 @@ static void set_nofile_limit(struct rlimit *rl) { netdata_log_error("Number of open file descriptors allowed for this process is too low (RLIMIT_NOFILE=%zu)", (size_t)rl->rlim_cur); } -void cancel_main_threads() { - nd_log_limits_unlimited(); - - if (!static_threads) - return; - - int i, found = 0; - usec_t max = 5 * USEC_PER_SEC, step = 100000; - for (i = 0; static_threads[i].name != NULL ; i++) { - if (static_threads[i].enabled == NETDATA_MAIN_THREAD_RUNNING) { - if (static_threads[i].thread) { - netdata_log_info("EXIT: Stopping main thread: %s", static_threads[i].name); - nd_thread_signal_cancel(static_threads[i].thread); - } else { - netdata_log_info("EXIT: No thread running (marking as EXITED): %s", static_threads[i].name); - static_threads[i].enabled = NETDATA_MAIN_THREAD_EXITED; - } - found++; - } - } - - while(found && max > 0) { - max -= step; - netdata_log_info("Waiting %d threads to finish...", found); - sleep_usec(step); - found = 0; - for (i = 0; static_threads[i].name != NULL ; i++) { - if (static_threads[i].enabled == NETDATA_MAIN_THREAD_EXITED) - continue; - - // Don't wait ourselves. - if (nd_thread_is_me(static_threads[i].thread)) - continue; - - found++; - } - } - - if(found) { - for (i = 0; static_threads[i].name != NULL ; i++) { - if (static_threads[i].enabled != NETDATA_MAIN_THREAD_EXITED) - netdata_log_error("Main thread %s takes too long to exit. 
Giving up...", static_threads[i].name); - } - } - else - netdata_log_info("All threads finished."); - - freez(static_threads); - static_threads = NULL; -} - static const struct option_def { const char val; const char *description; diff --git a/src/daemon/main.h b/src/daemon/main.h index f5da3feb6e4a92..ef2f84d784bf91 100644 --- a/src/daemon/main.h +++ b/src/daemon/main.h @@ -4,39 +4,7 @@ #define NETDATA_MAIN_H 1 #include "common.h" - -extern struct config netdata_config; - -void cancel_main_threads(void); - -typedef enum { - ABILITY_DATA_QUERIES = (1 << 0), - ABILITY_WEB_REQUESTS = (1 << 1), - ABILITY_STREAMING_CONNECTIONS = (1 << 2), - SERVICE_MAINTENANCE = (1 << 3), - SERVICE_COLLECTORS = (1 << 4), - SERVICE_REPLICATION = (1 << 5), - SERVICE_WEB_SERVER = (1 << 6), - SERVICE_ACLK = (1 << 7), - SERVICE_HEALTH = (1 << 8), - SERVICE_STREAMING = (1 << 9), - SERVICE_CONTEXT = (1 << 10), - SERVICE_ANALYTICS = (1 << 11), - SERVICE_EXPORTERS = (1 << 12), - SERVICE_HTTPD = (1 << 13) -} SERVICE_TYPE; - -typedef enum { - SERVICE_THREAD_TYPE_NETDATA, - SERVICE_THREAD_TYPE_LIBUV, - SERVICE_THREAD_TYPE_EVENT_LOOP, -} SERVICE_THREAD_TYPE; - -typedef void (*force_quit_t)(void *data); -typedef void (*request_quit_t)(void *data); - -void service_exits(void); -bool service_running(SERVICE_TYPE service); -struct service_thread *service_register(SERVICE_THREAD_TYPE thread_type, request_quit_t request_quit_callback, force_quit_t force_quit_callback, void *data, bool update __maybe_unused); +#include "daemon-service.h" +#include "daemon-shutdown.h" #endif /* NETDATA_MAIN_H */ diff --git a/src/daemon/pulse/pulse-aral.c b/src/daemon/pulse/pulse-aral.c index 494516df487533..1acc0944103af9 100644 --- a/src/daemon/pulse/pulse-aral.c +++ b/src/daemon/pulse/pulse-aral.c @@ -19,7 +19,7 @@ static struct { ARAL_STATS_JudyLSet idx; } globals = { 0 }; -static void pulse_aral_register_statistics(struct aral_statistics *stats, const char *name) { +void pulse_aral_register_statistics(struct aral_statistics *stats, const char *name) { if(!name || !stats) return; @@ -60,6 +60,7 @@ void pulse_aral_unregister(ARAL *ar) { void pulse_aral_init(void) { pulse_aral_register_statistics(aral_by_size_statistics(), "by-size"); + pulse_aral_register_statistics(judy_aral_statistics(), "judy"); } void pulse_aral_do(bool extended) { @@ -86,14 +87,17 @@ void pulse_aral_do(bool extended) { mmap_allocated_bytes = mmap_used_bytes; size_t mmap_free_bytes = mmap_allocated_bytes - mmap_used_bytes; + size_t allocated_total = malloc_allocated_bytes + mmap_allocated_bytes; + size_t used_total = malloc_used_bytes + mmap_used_bytes; + size_t structures_bytes = __atomic_load_n(&stats->structures.allocated_bytes, __ATOMIC_RELAXED); size_t padding_bytes = __atomic_load_n(&stats->malloc.padding_bytes, __ATOMIC_RELAXED) + __atomic_load_n(&stats->mmap.padding_bytes, __ATOMIC_RELAXED); NETDATA_DOUBLE utilization; - if((malloc_used_bytes + mmap_used_bytes != 0) && (malloc_allocated_bytes + mmap_allocated_bytes != 0)) - utilization = 100.0 * (NETDATA_DOUBLE)(malloc_used_bytes + mmap_used_bytes) / (NETDATA_DOUBLE)(malloc_allocated_bytes + mmap_allocated_bytes); + if(allocated_total) + utilization = 100.0 * (NETDATA_DOUBLE)used_total / (NETDATA_DOUBLE)allocated_total; else utilization = 100.0; @@ -160,10 +164,10 @@ void pulse_aral_do(bool extended) { rrdlabels_add(ai->st_utilization->rrdlabels, "ARAL", ai->name, RRDLABEL_SRC_AUTO); - ai->rd_utilization = rrddim_add(ai->st_utilization, "utilization", NULL, 1, 10000, RRD_ALGORITHM_ABSOLUTE); + ai->rd_utilization = 
rrddim_add(ai->st_utilization, "utilization", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); } - rrddim_set_by_pointer(ai->st_utilization, ai->rd_utilization, (collected_number)(utilization * 10000.0)); + rrddim_set_by_pointer(ai->st_utilization, ai->rd_utilization, (collected_number)utilization * 1000LL); rrdset_done(ai->st_utilization); } } diff --git a/src/daemon/pulse/pulse-aral.h b/src/daemon/pulse/pulse-aral.h index de254f65079828..64257736072818 100644 --- a/src/daemon/pulse/pulse-aral.h +++ b/src/daemon/pulse/pulse-aral.h @@ -5,6 +5,7 @@ #include "daemon/common.h" +void pulse_aral_register_statistics(struct aral_statistics *stats, const char *name); void pulse_aral_register(ARAL *ar, const char *name); void pulse_aral_unregister(ARAL *ar); diff --git a/src/daemon/pulse/pulse-workers.c b/src/daemon/pulse/pulse-workers.c index fbe831534a0966..adf553dd519500 100644 --- a/src/daemon/pulse/pulse-workers.c +++ b/src/daemon/pulse/pulse-workers.c @@ -113,7 +113,7 @@ struct worker_utilization { static struct worker_utilization all_workers_utilization[] = { { .name = "PULSE", .family = "workers pulse", .priority = 1000000 }, - { .name = "HEALTH", .family = "workers health alarms", .priority = 1000000 }, + { .name = "HEALTH", .family = "workers health alerts", .priority = 1000000 }, { .name = "MLTRAIN", .family = "workers ML training", .priority = 1000000 }, { .name = "MLDETECT", .family = "workers ML detection", .priority = 1000000 }, { .name = "STREAM", .family = "workers streaming", .priority = 1000000 }, diff --git a/src/daemon/pulse/pulse.c b/src/daemon/pulse/pulse.c index 2c30e8931ab7eb..6af2adec639e3f 100644 --- a/src/daemon/pulse/pulse.c +++ b/src/daemon/pulse/pulse.c @@ -55,7 +55,6 @@ static void pulse_cleanup(void *pptr) pulse_workers_cleanup(); worker_unregister(); - netdata_log_info("cleaning up..."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } @@ -158,8 +157,6 @@ static void pulse_thread_sqlite3_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_info("cleaning up..."); - worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; diff --git a/src/daemon/service.c b/src/daemon/service.c index b015213f043bb9..e298b344cd8134 100644 --- a/src/daemon/service.c +++ b/src/daemon/service.c @@ -268,7 +268,6 @@ static void service_main_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_debug(D_SYSTEM, "Cleaning up..."); worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; diff --git a/src/database/engine/cache.c b/src/database/engine/cache.c index cf381bd33288e1..03be316c9b83e6 100644 --- a/src/database/engine/cache.c +++ b/src/database/engine/cache.c @@ -580,8 +580,12 @@ static void pgc_section_pages_static_aral_init(void) { spinlock_lock(&spinlock); - if(!pgc_sections_aral) - pgc_sections_aral = aral_by_size_acquire(sizeof(struct section_pages)); + if(!pgc_sections_aral) { + pgc_sections_aral = aral_create( + "pgc-sections", sizeof(struct section_pages), 0, 0, &pgc_aral_statistics, NULL, NULL, false, false); + + pulse_aral_register_statistics(&pgc_aral_statistics, "pgc"); + } spinlock_unlock(&spinlock); } @@ -1182,16 +1186,18 @@ static bool evict_pages_with_filter(PGC *cache, size_t max_skip, size_t max_evic else if(unlikely(wait)) { // evict as many as necessary for the cache to go at the predefined threshold per1000 = cache_usage_per1000(cache, &max_size_to_evict); - max_size_to_evict /= 2; // do it in 2 steps + max_size_to_evict /= 3; // do it in 3 steps if(per1000 >= 
cache->config.severe_pressure_per1000) { under_sever_pressure = true; - max_pages_to_evict = max_pages_to_evict ? max_pages_to_evict * 2 : 4096; - // max_pages_to_evict = 1; + max_pages_to_evict = max_pages_to_evict ? max_pages_to_evict * 2 : 512; + if(max_pages_to_evict > 4096) + max_pages_to_evict = 4096; } else if(per1000 >= cache->config.aggressive_evict_per1000) { under_sever_pressure = false; - max_pages_to_evict = max_pages_to_evict ? max_pages_to_evict * 2 : 128; - // max_pages_to_evict = 1; + max_pages_to_evict = max_pages_to_evict ? max_pages_to_evict * 2 : 32; + if(max_pages_to_evict > 1024) + max_pages_to_evict = 1024; } else { under_sever_pressure = false; @@ -1948,7 +1954,8 @@ static void *pgc_evict_thread(void *ptr) { worker_register_job_name(0, "signaled"); worker_register_job_name(1, "scheduled"); - unsigned job_id = 0, severe_pressure_counter = 0; + unsigned job_id = 0; + usec_t last_malloc_release_ut = 0; while (true) { worker_is_idle(); @@ -1965,18 +1972,17 @@ static void *pgc_evict_thread(void *ptr) { size_t size_to_evict = 0; if(cache_usage_per1000(cache, &size_to_evict) > cache->config.severe_pressure_per1000) { - severe_pressure_counter++; + usec_t now_ut = now_monotonic_usec(); + + if(last_malloc_release_ut + USEC_PER_SEC < now_ut) { + last_malloc_release_ut = now_ut; - if(severe_pressure_counter > 100) { - // so, we tried 100 times to reduce memory, + // so, we tried 100 times to reduce memory, and a second has passed, // but it is still severe! mallocz_release_as_much_memory_to_the_system(); - severe_pressure_counter = 0; } } - else - severe_pressure_counter = 0; } worker_unregister(); @@ -2066,8 +2072,6 @@ PGC *pgc_create(const char *name, false); } } - - pulse_aral_register(cache->index[0].aral, "pgc"); #endif diff --git a/src/database/engine/dbengine-stresstest.c b/src/database/engine/dbengine-stresstest.c index 098a4f0eebb6b0..9be15349225f36 100644 --- a/src/database/engine/dbengine-stresstest.c +++ b/src/database/engine/dbengine-stresstest.c @@ -447,7 +447,7 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi } freez(query_threads); rrd_wrlock(); - rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].si); + rrdeng_quiesce((struct rrdengine_instance *)host->db[0].si); rrdeng_exit((struct rrdengine_instance *)host->db[0].si); rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); rrd_wrunlock(); diff --git a/src/database/engine/dbengine-unittest.c b/src/database/engine/dbengine-unittest.c index 0dbc05a23ada59..fb332b092320e9 100644 --- a/src/database/engine/dbengine-unittest.c +++ b/src/database/engine/dbengine-unittest.c @@ -408,7 +408,7 @@ int test_dbengine(void) { } rrd_wrlock(); - rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].si); + rrdeng_quiesce((struct rrdengine_instance *)host->db[0].si); rrdeng_exit((struct rrdengine_instance *)host->db[0].si); rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); rrd_wrunlock(); diff --git a/src/database/engine/metric.c b/src/database/engine/metric.c index bf7ac657e85a7a..0ca20ec24b6340 100644 --- a/src/database/engine/metric.c +++ b/src/database/engine/metric.c @@ -380,7 +380,7 @@ inline MRG *mrg_create(ssize_t partitions) { mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false); } - pulse_aral_register(mrg->index[0].aral, "mrg"); + pulse_aral_register_statistics(&mrg_aral_statistics, "mrg"); return 
mrg; } diff --git a/src/database/engine/page.c b/src/database/engine/page.c index a2b6d750e4acdd..b0e66c75715d1f 100644 --- a/src/database/engine/page.c +++ b/src/database/engine/page.c @@ -200,7 +200,7 @@ void pgd_init_arals(void) { pgd_alloc_globals.sizeof_gorilla_writer_t = aral_actual_element_size(pgd_alloc_globals.aral_gorilla_writer[0]); pgd_alloc_globals.sizeof_gorilla_buffer_32bit = aral_actual_element_size(pgd_alloc_globals.aral_gorilla_buffer[0]); - pulse_aral_register(pgd_alloc_globals.aral_pgd[0], "pgd"); + pulse_aral_register_statistics(&pgd_aral_statistics, "pgd"); } static ARAL *pgd_get_aral_by_size_and_partition(size_t size, size_t partition) { diff --git a/src/database/engine/pagecache.c b/src/database/engine/pagecache.c index e03ee1f6e83327..09397bfcd075b5 100644 --- a/src/database/engine/pagecache.c +++ b/src/database/engine/pagecache.c @@ -783,6 +783,8 @@ void rrdeng_prep_query(struct page_details_control *pdc, bool worker) { usec_t start_ut = now_monotonic_usec(); if(likely(pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS)) pdc_route_synchronously(pdc->ctx, pdc); + else if(likely(pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS_FIRST)) + pdc_route_synchronously_first(pdc->ctx, pdc); else pdc_route_asynchronously(pdc->ctx, pdc); __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_to_route, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED); @@ -827,7 +829,7 @@ void pg_cache_preload(struct rrdeng_query_handle *handle) { if(ctx_is_available_for_queries(handle->ctx)) { handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread - if(unlikely(handle->pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS)) + if(unlikely(handle->pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS || handle->pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS_FIRST)) rrdeng_prep_query(handle->pdc, false); else rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL); diff --git a/src/database/engine/rrdengine.c b/src/database/engine/rrdengine.c index 3d3cdb8b9df2c8..a2e5f253c43be6 100644 --- a/src/database/engine/rrdengine.c +++ b/src/database/engine/rrdengine.c @@ -265,7 +265,7 @@ void page_descriptors_init(void) { NULL, NULL, NULL, false, false); - pulse_aral_register(rrdeng_main.xt_io_descr.ar, "descriptors"); + pulse_aral_register(rrdeng_main.descriptors.ar, "descriptors"); } struct page_descr_with_data *page_descriptor_get(void) { @@ -1314,7 +1314,10 @@ static void after_flush_all_hot_and_dirty_pages_of_section(struct rrdengine_inst static void *flush_all_hot_and_dirty_pages_of_section_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { worker_is_busy(UV_EVENT_DBENGINE_QUIESCE); pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx); - completion_mark_complete(&ctx->quiesce.completion); + + for(size_t i = 0; i < pgc_max_flushers() ; i++) + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_FLUSH_MAIN, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + return data; } @@ -1499,6 +1502,10 @@ void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_synchronously, epdl_populate_pages_synchronously); } +void pdc_route_synchronously_first(struct rrdengine_instance *ctx, struct page_details_control *pdc) { + pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_synchronously, epdl_populate_pages_asynchronously); +} + #define MAX_RETRIES_TO_START_INDEX (100) static 
void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { unsigned count = 0; diff --git a/src/database/engine/rrdengine.h b/src/database/engine/rrdengine.h index be897bd31b686a..0fbb8467b1775c 100644 --- a/src/database/engine/rrdengine.h +++ b/src/database/engine/rrdengine.h @@ -448,7 +448,6 @@ struct rrdengine_instance { struct { bool exit_mode; bool enabled; // when set (before shutdown), queries are prohibited - struct completion completion; } quiesce; struct { @@ -527,6 +526,7 @@ void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, v void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc); void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc); +void pdc_route_synchronously_first(struct rrdengine_instance *ctx, struct page_details_control *pdc); void pdc_acquire(PDC *pdc); bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router); diff --git a/src/database/engine/rrdengineapi.c b/src/database/engine/rrdengineapi.c index 1301d2e411d350..d2ad324d7f9bf3 100755 --- a/src/database/engine/rrdengineapi.c +++ b/src/database/engine/rrdengineapi.c @@ -1125,9 +1125,6 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) { netdata_log_info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier); } -void rrdeng_exit_mode(struct rrdengine_instance *ctx) { - __atomic_store_n(&ctx->quiesce.exit_mode, true, __ATOMIC_RELAXED); -} /* * Returns 0 on success, negative on error */ @@ -1228,13 +1225,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx) { count--; } - netdata_log_info("DBENGINE: flushing main cache for tier %d", ctx->config.tier); pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx); - netdata_log_info("DBENGINE: shutting down tier %d", ctx->config.tier); struct completion completion = {}; completion_init(&completion); rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_SHUTDOWN, NULL, &completion, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); + completion_wait_for(&completion); completion_destroy(&completion); @@ -1247,14 +1243,13 @@ int rrdeng_exit(struct rrdengine_instance *ctx) { return 0; } -void rrdeng_prepare_exit(struct rrdengine_instance *ctx) { +void rrdeng_quiesce(struct rrdengine_instance *ctx) { if (NULL == ctx) return; // FIXME - ktsaou - properly cleanup ctx // 1. 
make sure all collectors are stopped - completion_init(&ctx->quiesce.completion); rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_QUIESCE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); } diff --git a/src/database/engine/rrdengineapi.h b/src/database/engine/rrdengineapi.h index 8c44c3be0bba62..bcb083e7f97c43 100644 --- a/src/database/engine/rrdengineapi.h +++ b/src/database/engine/rrdengineapi.h @@ -71,10 +71,9 @@ int rrdeng_init( time_t max_retention_s); void rrdeng_readiness_wait(struct rrdengine_instance *ctx); -void rrdeng_exit_mode(struct rrdengine_instance *ctx); int rrdeng_exit(struct rrdengine_instance *ctx); -void rrdeng_prepare_exit(struct rrdengine_instance *ctx); +void rrdeng_quiesce(struct rrdengine_instance *ctx); bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, nd_uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s); extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *si, nd_uuid_t *uuid); diff --git a/src/database/rrd.h b/src/database/rrd.h index e41816e4e174c8..8679109288ef49 100644 --- a/src/database/rrd.h +++ b/src/database/rrd.h @@ -61,6 +61,7 @@ typedef enum __attribute__ ((__packed__)) storage_priority { // synchronous query, not to be dispatched to workers or queued STORAGE_PRIORITY_SYNCHRONOUS, + STORAGE_PRIORITY_SYNCHRONOUS_FIRST, STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE, } STORAGE_PRIORITY; diff --git a/src/database/sqlite/sqlite_aclk_node.c b/src/database/sqlite/sqlite_aclk_node.c index 3ceb2baea16a40..cfe902e8a539d4 100644 --- a/src/database/sqlite/sqlite_aclk_node.c +++ b/src/database/sqlite/sqlite_aclk_node.c @@ -122,6 +122,10 @@ void aclk_check_node_info_and_collectors(void) size_t replicating = 0; size_t context_pp = 0; + STRING *context_loading_host = NULL; + STRING *replicating_host = NULL; + STRING *context_pp_host = NULL; + time_t now = now_realtime_sec(); dfe_start_reentrant(rrdhost_root_index, host) { @@ -132,6 +136,7 @@ void aclk_check_node_info_and_collectors(void) if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host)); context_loading++; + context_loading_host = host->hostname; continue; } @@ -141,13 +146,16 @@ void aclk_check_node_info_and_collectors(void) if (unlikely(rrdhost_receiver_replicating_charts(host))) { internal_error(true, "ACLK SYNC: Host %s is still replicating", rrdhost_hostname(host)); replicating++; + replicating_host = host->hostname; continue; } bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue)); - if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send)) + if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send)) { context_pp++; + context_pp_host = host->hostname; + } if (pp_queue_empty && wc->node_info_send_time && wc->node_info_send_time + 30 < now) { wc->node_info_send_time = 0; @@ -165,14 +173,30 @@ void aclk_check_node_info_and_collectors(void) dfe_done(host); if (context_loading || replicating || context_pp) { + const char *context_loading_pre = "", *context_loading_body = "", *context_loading_post = ""; + if(context_loading == 1) { + context_loading_pre = " (host '"; + context_loading_body = string2str(context_loading_host); + context_loading_post = "')"; + } + const char *replicating_pre = "", *replicating_body = "", *replicating_post = ""; + if(replicating == 1) { + replicating_pre = " (host '"; + replicating_body = string2str(replicating_host); + replicating_post = "')"; + } + const char 
*context_pp_pre = "", *context_pp_body = "", *context_pp_post = ""; + if(context_pp == 1) { + context_pp_pre = " (host '"; + context_pp_body = string2str(context_pp_host); + context_pp_post = "')"; + } nd_log_limit_static_thread_var(erl, 10, 100 * USEC_PER_MS); - nd_log_limit( - &erl, - NDLS_DAEMON, - NDLP_INFO, - "%zu nodes loading contexts, %zu replicating data, %zu pending context post processing", - context_loading, - replicating, - context_pp); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_INFO, + "%zu nodes loading contexts%s%s%s, %zu replicating data%s%s%s, %zu pending context post processing%s%s%s", + context_loading, context_loading_pre, context_loading_body, context_loading_post, + replicating, replicating_pre, replicating_body, replicating_post, + context_pp, context_pp_pre, context_pp_body, context_pp_post + ); } } diff --git a/src/database/sqlite/sqlite_metadata.c b/src/database/sqlite/sqlite_metadata.c index 2b7cceb38070cf..ca225d197a4619 100644 --- a/src/database/sqlite/sqlite_metadata.c +++ b/src/database/sqlite/sqlite_metadata.c @@ -225,6 +225,7 @@ struct metadata_wc { /* FIFO command queue */ SPINLOCK cmd_queue_lock; struct metadata_cmd *cmd_base; + ARAL *ar; }; #define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag)) @@ -447,7 +448,8 @@ struct node_instance_list *get_node_list(void) continue; if (rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)) { - netdata_log_info( + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_INFO, "ACLK: 'host:%s' skipping get node list because context is initializing", rrdhost_hostname(host)); continue; } @@ -1490,7 +1492,7 @@ static void metadata_free_cmd_queue(struct metadata_wc *wc) while(wc->cmd_base) { struct metadata_cmd *t = wc->cmd_base; DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next); - freez(t); + aral_freez(wc->ar, t); } spinlock_unlock(&wc->cmd_queue_lock); } @@ -1505,7 +1507,7 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) goto wakeup_event_loop; - struct metadata_cmd *t = mallocz(sizeof(*t)); + struct metadata_cmd *t = aral_mallocz(wc->ar); *t = *cmd; t->prev = t->next = NULL; @@ -1519,14 +1521,14 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) { - struct metadata_cmd ret; + struct metadata_cmd ret, *to_free = NULL; spinlock_lock(&wc->cmd_queue_lock); if(wc->cmd_base) { struct metadata_cmd *t = wc->cmd_base; DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next); ret = *t; - freez(t); + to_free = t; } else { ret.opcode = METADATA_DATABASE_NOOP; @@ -1534,6 +1536,8 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) } spinlock_unlock(&wc->cmd_queue_lock); + aral_freez(wc->ar, to_free); + return ret; } @@ -2063,6 +2067,8 @@ static void metadata_event_loop(void *arg) struct metadata_wc *wc = arg; enum metadata_opcode opcode; + wc->ar = aral_by_size_acquire(sizeof(struct metadata_cmd)); + uv_thread_set_name_np("METASYNC"); loop = wc->loop = mallocz(sizeof(uv_loop_t)); ret = uv_loop_init(loop); @@ -2263,6 +2269,7 @@ static void metadata_event_loop(void *arg) fatal_assert(0 == uv_loop_close(loop)); error_after_loop_init: freez(loop); + aral_by_size_release(wc->ar); worker_unregister(); } @@ -2316,6 +2323,22 @@ void metadata_sync_shutdown_prepare(void) nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: 
Host scan complete; can continue with shutdown"); } +void *metadata_sync_shutdown_thread(void *ptr __maybe_unused) { + metadata_sync_shutdown_prepare(); + return NULL; +} + +static ND_THREAD *metdata_sync_shutdown_background_wait_thread = NULL; +void metadata_sync_shutdown_background(void) { + metdata_sync_shutdown_background_wait_thread = nd_thread_create( + "METASYNC-SHUTDOWN", NETDATA_THREAD_OPTION_JOINABLE, metadata_sync_shutdown_thread, NULL); +} + +void metadata_sync_shutdown_background_wait(void) { + nd_thread_join(metdata_sync_shutdown_background_wait_thread); + metadata_sync_shutdown(); +} + // ------------------------------------------------------------- // Init function called on agent startup diff --git a/src/database/sqlite/sqlite_metadata.h b/src/database/sqlite/sqlite_metadata.h index bd5a98e597aab2..59a00a1a4ff475 100644 --- a/src/database/sqlite/sqlite_metadata.h +++ b/src/database/sqlite/sqlite_metadata.h @@ -70,6 +70,9 @@ void metadata_queue_ae_save(RRDHOST *host, ALARM_ENTRY *ae); void metadata_queue_ae_deletion(ALARM_ENTRY *ae); void commit_alert_transitions(RRDHOST *host); +void metadata_sync_shutdown_background(void); +void metadata_sync_shutdown_background_wait(void); + // UNIT TEST int metadata_unittest(void); #endif //NETDATA_SQLITE_METADATA_H diff --git a/src/exporting/exporting_engine.c b/src/exporting/exporting_engine.c index 7abe0b5ce18ea0..c35312365c614e 100644 --- a/src/exporting/exporting_engine.c +++ b/src/exporting/exporting_engine.c @@ -124,8 +124,6 @@ static void exporting_main_cleanup(void *pptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_info("cleaning up..."); - if (!engine) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return; diff --git a/src/health/health_event_loop.c b/src/health/health_event_loop.c index df323faffe6188..1b9c84aea5cdb2 100644 --- a/src/health/health_event_loop.c +++ b/src/health/health_event_loop.c @@ -660,39 +660,31 @@ static void health_event_loop(void) { health_alarm_log_process_to_send_notifications(host, hrm); alerts_raised_summary_free(hrm); - if (unlikely(!service_running(SERVICE_HEALTH))) { - // wait for all notifications to finish before allowing health to be cleaned up - wait_for_all_notifications_to_finish_before_allowing_health_to_be_cleaned_up(); - break; - } - } + int32_t pending = __atomic_load_n(&host->health.pending_transitions, __ATOMIC_RELAXED); + if (pending) + commit_alert_transitions(host); - int32_t pending = __atomic_load_n(&host->health.pending_transitions, __ATOMIC_RELAXED); - if (pending) - commit_alert_transitions(host); - - if (!__atomic_load_n(&host->health.pending_transitions, __ATOMIC_RELAXED)) { - struct aclk_sync_cfg_t *wc = host->aclk_config; - if (wc && wc->send_snapshot == 1) { - wc->send_snapshot = 2; - rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - } else { - if (process_alert_pending_queue(host)) + if (!__atomic_load_n(&host->health.pending_transitions, __ATOMIC_RELAXED)) { + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (wc && wc->send_snapshot == 1) { + wc->send_snapshot = 2; rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + } else { + if (process_alert_pending_queue(host)) + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + } } } - dfe_done(host); - // wait for all notifications to finish before allowing health to be cleaned up - wait_for_all_notifications_to_finish_before_allowing_health_to_be_cleaned_up(); - if(unlikely(!service_running(SERVICE_HEALTH))) break; health_sleep(next_run, loop); - } // forever + + // 
wait for all notifications to finish before allowing health to be cleaned up + wait_for_all_notifications_to_finish_before_allowing_health_to_be_cleaned_up(); } @@ -702,7 +694,6 @@ static void health_main_cleanup(void *pptr) { worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_info("cleaning up..."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; nd_log(NDLS_DAEMON, NDLP_DEBUG, "Health thread ended."); @@ -716,8 +707,8 @@ void *health_main(void *ptr) { worker_register_job_name(WORKER_HEALTH_JOB_CALC_EVAL, "calc eval"); worker_register_job_name(WORKER_HEALTH_JOB_WARNING_EVAL, "warning eval"); worker_register_job_name(WORKER_HEALTH_JOB_CRITICAL_EVAL, "critical eval"); - worker_register_job_name(WORKER_HEALTH_JOB_ALARM_LOG_ENTRY, "alarm log entry"); - worker_register_job_name(WORKER_HEALTH_JOB_ALARM_LOG_PROCESS, "alarm log process"); + worker_register_job_name(WORKER_HEALTH_JOB_ALARM_LOG_ENTRY, "alert log entry"); + worker_register_job_name(WORKER_HEALTH_JOB_ALARM_LOG_PROCESS, "alert log process"); worker_register_job_name(WORKER_HEALTH_JOB_DELAYED_INIT_RRDSET, "rrdset init"); worker_register_job_name(WORKER_HEALTH_JOB_DELAYED_INIT_RRDDIM, "rrddim init"); diff --git a/src/libnetdata/functions_evloop/functions_evloop.c b/src/libnetdata/functions_evloop/functions_evloop.c index 4aa4a2a2f6a0a2..23729ea1aa97ea 100644 --- a/src/libnetdata/functions_evloop/functions_evloop.c +++ b/src/libnetdata/functions_evloop/functions_evloop.c @@ -59,6 +59,20 @@ struct functions_evloop_globals { } dyncfg; struct rrd_functions_expectation *expectations; + + struct buffered_reader reader; + BUFFER *buffer; + char *words[MAX_FUNCTION_PARAMETERS]; + struct { + size_t last_len; // to remember the last pos - do not use a pointer, the buffer may realloc... + bool enabled; + char *transaction; + char *function; + char *timeout_s; + char *access; + char *source; + char *content_type; + } deferred; }; static void rrd_functions_worker_canceller(void *data) { @@ -137,7 +151,8 @@ static void worker_add_job(struct functions_evloop_globals *wg, const char *keyw function?function:"(unset)"); } else { - // nd_log(NDLS_COLLECTORS, NDLP_INFO, "WORKER JOB WITH PAYLOAD '%s'", payload ? buffer_tostring(payload) : "NONE"); +// nd_log(NDLS_COLLECTORS, NDLP_INFO, "WORKER JOB: keyword '%s', transaction '%s', function '%s', timeout '%s', access '%s', source '%s', payload '%s'", +// keyword, transaction, function, timeout_s, access, source, payload ? 
buffer_tostring(payload) : "NONE"); int timeout = str2i(timeout_s); @@ -187,28 +202,116 @@ static void worker_add_job(struct functions_evloop_globals *wg, const char *keyw } } +static bool rrd_function_worker_global_process_input(struct functions_evloop_globals *wg) { + if(wg->deferred.enabled) { + char *s = (char *)buffer_tostring(wg->buffer); + + if(strstr(&s[wg->deferred.last_len], PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") != NULL) { + // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PAYLOAD END"); + + if(wg->deferred.last_len > 0) + // remove the trailing newline from the buffer + wg->deferred.last_len--; + + s[wg->deferred.last_len] = '\0'; + wg->buffer->len = wg->deferred.last_len; + wg->buffer->content_type = content_type_string2id(wg->deferred.content_type); + worker_add_job(wg, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN, + wg->deferred.transaction, wg->deferred.function, + wg->deferred.timeout_s, wg->buffer, wg->deferred.access, wg->deferred.source); + buffer_flush(wg->buffer); + + freez(wg->deferred.transaction); + freez(wg->deferred.function); + freez(wg->deferred.timeout_s); + freez(wg->deferred.access); + freez(wg->deferred.source); + freez(wg->deferred.content_type); + memset(&wg->deferred, 0, sizeof(wg->deferred)); + } + else + wg->deferred.last_len = wg->buffer->len; + + return false; + } + + size_t num_words = quoted_strings_splitter_whitespace((char *)buffer_tostring(wg->buffer), wg->words, _countof(wg->words)); + const char *keyword = get_word(wg->words, num_words, 0); + + char **words = wg->words; + if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION) == 0)) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + char *access = get_word(words, num_words, 4); + char *source = get_word(words, num_words, 5); + worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, access, source); + } + else if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0)) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + char *access = get_word(words, num_words, 4); + char *source = get_word(words, num_words, 5); + char *content_type = get_word(words, num_words, 6); + + wg->deferred.transaction = strdupz(transaction ? transaction : ""); + wg->deferred.timeout_s = strdupz(timeout_s ? timeout_s : ""); + wg->deferred.function = strdupz(function ? function : ""); + wg->deferred.access = strdupz(access ? access : ""); + wg->deferred.source = strdupz(source ? source : ""); + wg->deferred.content_type = strdupz(content_type ? 
content_type : ""); + wg->deferred.last_len = 0; + wg->deferred.enabled = true; + } + else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) { + char *transaction = get_word(words, num_words, 1); + const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); + if(acquired) { + struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); + __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED); + dictionary_acquired_item_release(wg->worker_queue, acquired); + dictionary_del(wg->worker_queue, transaction); + dictionary_garbage_collect(wg->worker_queue); + } + else + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction); + } + else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) { + char *transaction = get_word(words, num_words, 1); + const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); + if(acquired) { + struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); + + functions_stop_monotonic_update_on_progress(&j->stop_monotonic_ut); + + dictionary_acquired_item_release(wg->worker_queue, acquired); + } + else + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction); + } + else if(keyword && strcmp(keyword, PLUGINSD_CALL_QUIT) == 0) { + *wg->plugin_should_exit = true; + return true; + } + else + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword ? keyword : "(unset)"); + + buffer_flush(wg->buffer); + + return false; +} + static void *rrd_functions_worker_globals_reader_main(void *arg) { struct functions_evloop_globals *wg = arg; - struct { - size_t last_len; // to remember the last pos - do not use a pointer, the buffer may realloc... 
- bool enabled; - char *transaction; - char *function; - char *timeout_s; - char *access; - char *source; - char *content_type; - } deferred = { 0 }; - - struct buffered_reader reader = { 0 }; - buffered_reader_init(&reader); - BUFFER *buffer = buffer_create(sizeof(reader.read_buffer) + 2, NULL); + buffered_reader_init(&wg->reader); + wg->buffer = buffer_create(sizeof(wg->reader.read_buffer) + 2, NULL); while(!(*wg->plugin_should_exit)) { - if(unlikely(!buffered_reader_next_line(&reader, buffer))) { + if(unlikely(!buffered_reader_next_line(&wg->reader, wg->buffer))) { buffered_reader_ret_t ret = buffered_reader_read_timeout( - &reader, + &wg->reader, fileno((FILE *)stdin), 2 * 60 * MSEC_PER_SEC, false @@ -220,106 +323,8 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { continue; } - if(deferred.enabled) { - char *s = (char *)buffer_tostring(buffer); - - if(strstr(&s[deferred.last_len], PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") != NULL) { - // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PAYLOAD END"); - - if(deferred.last_len > 0) - // remove the trailing newline from the buffer - deferred.last_len--; - - s[deferred.last_len] = '\0'; - buffer->len = deferred.last_len; - buffer->content_type = content_type_string2id(deferred.content_type); - worker_add_job(wg, - PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN, deferred.transaction, deferred.function, - deferred.timeout_s, buffer, deferred.access, deferred.source); - buffer_flush(buffer); - - freez(deferred.transaction); - freez(deferred.function); - freez(deferred.timeout_s); - freez(deferred.access); - freez(deferred.source); - freez(deferred.content_type); - memset(&deferred, 0, sizeof(deferred)); - } - else - deferred.last_len = buffer->len; - - continue; - } - - char *words[MAX_FUNCTION_PARAMETERS] = { NULL }; - size_t num_words = quoted_strings_splitter_whitespace((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS); - - const char *keyword = get_word(words, num_words, 0); - - if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION) == 0)) { - // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION CALL"); - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); - char *access = get_word(words, num_words, 4); - char *source = get_word(words, num_words, 5); - worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, access, source); - } - else if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0)) { - // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PAYLOAD CALL"); - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); - char *access = get_word(words, num_words, 4); - char *source = get_word(words, num_words, 5); - char *content_type = get_word(words, num_words, 6); - - deferred.transaction = strdupz(transaction ? transaction : ""); - deferred.timeout_s = strdupz(timeout_s ? timeout_s : ""); - deferred.function = strdupz(function ? function : ""); - deferred.access = strdupz(access ? access : ""); - deferred.source = strdupz(source ? source : ""); - deferred.content_type = strdupz(content_type ? 
content_type : ""); - deferred.last_len = 0; - deferred.enabled = true; - } - else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) { - // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION CANCEL"); - char *transaction = get_word(words, num_words, 1); - const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); - if(acquired) { - struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); - __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED); - dictionary_acquired_item_release(wg->worker_queue, acquired); - dictionary_del(wg->worker_queue, transaction); - dictionary_garbage_collect(wg->worker_queue); - } - else - nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction); - } - else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) { - // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PROGRESS"); - char *transaction = get_word(words, num_words, 1); - const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); - if(acquired) { - struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); - - functions_stop_monotonic_update_on_progress(&j->stop_monotonic_ut); - - dictionary_acquired_item_release(wg->worker_queue, acquired); - } - else - nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction); - } - else if(keyword && strcmp(keyword, PLUGINSD_CALL_QUIT) == 0) { - *wg->plugin_should_exit = true; + if(rrd_function_worker_global_process_input(wg)) break; - } - else - nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword ? keyword : "(unset)"); - - buffer_flush(buffer); } int status = 0; @@ -329,6 +334,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { } *wg->plugin_should_exit = true; + buffer_free(wg->buffer); exit(status); } diff --git a/src/libnetdata/libjudy/judy-malloc.c b/src/libnetdata/libjudy/judy-malloc.c index 7ce6f0d4b7fa2c..376f6cb80cbb2e 100644 --- a/src/libnetdata/libjudy/judy-malloc.c +++ b/src/libnetdata/libjudy/judy-malloc.c @@ -42,6 +42,10 @@ size_t judy_aral_structures(void) { return aral_structures_bytes_from_stats(&judy_sizes_aral_statistics); } +struct aral_statistics *judy_aral_statistics(void) { + return &judy_sizes_aral_statistics; +} + static ARAL *judy_size_aral(Word_t Words) { if(Words <= MAX_JUDY_SIZE_TO_ARAL && judy_sizes_aral[Words]) return judy_sizes_aral[Words]; diff --git a/src/libnetdata/libjudy/judy-malloc.h b/src/libnetdata/libjudy/judy-malloc.h index 23e10ca100ced4..2f6cf69391a625 100644 --- a/src/libnetdata/libjudy/judy-malloc.h +++ b/src/libnetdata/libjudy/judy-malloc.h @@ -7,6 +7,7 @@ size_t judy_aral_free_bytes(void); size_t judy_aral_structures(void); +struct aral_statistics *judy_aral_statistics(void); void JudyAllocThreadPulseReset(void); int64_t JudyAllocThreadPulseGetAndReset(void); diff --git a/src/ml/ml.cc b/src/ml/ml.cc index f828fad027b28b..d8881a4a0ae323 100644 --- a/src/ml/ml.cc +++ b/src/ml/ml.cc @@ -161,7 +161,8 @@ ml_dimension_add_model(const nd_uuid_t *metric_uuid, const ml_kmeans_inlined_t * int rc = 0; if (unlikely(!ml_db)) { - error_report("Database has not been initialized"); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "ML: Database has not been initialized to add ML models"); return 1; } @@ -235,7 +236,8 @@ ml_dimension_delete_models(const nd_uuid_t *metric_uuid, time_t 
before) int param = 0; if (unlikely(!ml_db)) { - error_report("Database has not been initialized"); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "ML: Database has not been initialized to delete ML models"); return 1; } @@ -285,7 +287,8 @@ ml_prune_old_models(size_t num_models_to_prune) int param = 0; if (unlikely(!ml_db)) { - error_report("Database has not been initialized"); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "ML: Database has not been initialized to prune old ML models"); return 1; } @@ -348,7 +351,8 @@ int ml_dimension_load_models(RRDDIM *rd, sqlite3_stmt **active_stmt) { int param = 0; if (unlikely(!ml_db)) { - error_report("Database has not been initialized"); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "ML: Database has not been initialized to load ML models"); return 1; } diff --git a/src/ml/ml_config.cc b/src/ml/ml_config.cc index 25f6f890fe1a5d..61de1e32a6fe17 100644 --- a/src/ml/ml_config.cc +++ b/src/ml/ml_config.cc @@ -45,7 +45,11 @@ void ml_config_load(ml_config_t *cfg) { std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average"); time_t anomaly_detection_query_duration = config_get_duration_seconds(config_section_ml, "anomaly detection grouping duration", 5 * 60); - size_t num_worker_threads = config_get_number(config_section_ml, "num training threads", os_get_system_cpus() / 4); + size_t num_worker_threads = stream_conf_configured_as_parent() ? get_netdata_cpus() / 4 : 1; + if (num_worker_threads < 1) num_worker_threads = 1; + else if (num_worker_threads > 256) num_worker_threads = 256; + num_worker_threads = config_get_number(config_section_ml, "num training threads", num_worker_threads); + size_t flush_models_batch_size = config_get_number(config_section_ml, "flush models batch size", 256); size_t suppression_window = diff --git a/src/plugins.d/pluginsd_parser.c b/src/plugins.d/pluginsd_parser.c index cd82a3186703d7..1427acc7d7a1d8 100644 --- a/src/plugins.d/pluginsd_parser.c +++ b/src/plugins.d/pluginsd_parser.c @@ -376,13 +376,19 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p return PARSER_RC_OK; } -static void backfill_callback(size_t successful_dims __maybe_unused, size_t failed_dims __maybe_unused, struct backfill_request_data *brd) { +static bool backfill_callback(size_t successful_dims __maybe_unused, size_t failed_dims __maybe_unused, struct backfill_request_data *brd) { if(!rrdhost_state_acquire(brd->host, brd->rrdhost_receiver_state_id)) - return; + return false; - if (!replicate_chart_request(send_to_plugin, brd->parser, brd->host, brd->st, - brd->first_entry_child, brd->last_entry_child, brd->child_wall_clock_time, - 0, 0)) { + bool rc = replicate_chart_request(send_to_plugin, brd->parser, brd->host, brd->st, + brd->first_entry_child, brd->last_entry_child, brd->child_wall_clock_time, + 0, 0); + if (rc) { + rrdset_flag_set(brd->st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(brd->st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdhost_receiver_replicating_charts_plus_one(brd->st->rrdhost); + } + else { netdata_log_error( "PLUGINSD: 'host:%s' failed to initiate replication for 'chart:%s'", rrdhost_hostname(brd->host), @@ -390,6 +396,7 @@ static void backfill_callback(size_t successful_dims __maybe_unused, size_t fail } rrdhost_state_release(brd->host); + return rc; } static inline PARSER_RC 
pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) { @@ -416,10 +423,6 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w st->replay.before = 0; #endif - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); - struct backfill_request_data brd = { .rrdhost_receiver_state_id = rrdhost_state_id(host), .parser = parser, @@ -432,8 +435,7 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w ok = backfill_request_add(st, backfill_callback, &brd); if(!ok) - ok = replicate_chart_request( - send_to_plugin, parser, host, st, first_entry_child, last_entry_child, child_wall_clock_time, 0, 0); + ok = backfill_callback(0, 0, &brd); } #ifdef NETDATA_LOG_REPLICATION_REQUESTS else { diff --git a/src/streaming/replication.c b/src/streaming/replication.c index 1d970ed73d7b4b..09b3a61387e908 100644 --- a/src/streaming/replication.c +++ b/src/streaming/replication.c @@ -32,7 +32,7 @@ #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 #define SECONDS_TO_RESET_POINT_IN_TIME 10 -#define MAX_REPLICATION_THREADS 32 +#define MAX_REPLICATION_THREADS 256 #define REQUESTS_AHEAD_PER_THREAD 1 // 1 = enable synchronous queries static struct replication_query_statistics replication_queries = { @@ -190,7 +190,7 @@ static struct replication_query *replication_query_prepare( d->rd = rd; STORAGE_PRIORITY priority = q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW; - if(synchronous) priority = STORAGE_PRIORITY_SYNCHRONOUS; + if(synchronous) priority = STORAGE_PRIORITY_SYNCHRONOUS_FIRST; stream_control_replication_query_started(); storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, @@ -1889,9 +1889,7 @@ void *replication_thread_main(void *ptr) { replication_initialize_workers(true); - int nodes = (int)dictionary_entries(rrdhost_root_index); - int cpus = (int)get_netdata_cpus(); - int threads = cpus / 2; + int threads = stream_conf_configured_as_parent() ? 
(int)(get_netdata_cpus() / 2) : 1; if (threads < 1) threads = 1; else if (threads > MAX_REPLICATION_THREADS) threads = MAX_REPLICATION_THREADS; @@ -1907,8 +1905,6 @@ void *replication_thread_main(void *ptr) { config_set_number(CONFIG_SECTION_DB, "replication threads", threads); } - netdata_log_info("replication threads set to %d (cpu cores = %d, nodes = %d)", threads, cpus, nodes); - if(--threads) { replication_globals.main_thread.threads = threads; replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(ND_THREAD *)); diff --git a/src/streaming/stream-connector.c b/src/streaming/stream-connector.c index 2145039339b771..b184028dd207c4 100644 --- a/src/streaming/stream-connector.c +++ b/src/streaming/stream-connector.c @@ -549,10 +549,6 @@ void stream_connector_add(struct sender_state *s) { } static void stream_connector_remove(struct sender_state *s) { - nd_log(NDLS_DAEMON, NDLP_NOTICE, - "STREAM CONNECT '%s' [stopped]: stopped streaming connector for host, reason: %s", - rrdhost_hostname(s->host), stream_handshake_error_to_string(s->exit.reason)); - struct connector *sc = stream_connector_get(s); __atomic_sub_fetch(&sc->nodes, 1, __ATOMIC_RELAXED); diff --git a/src/streaming/stream-receiver.c b/src/streaming/stream-receiver.c index d6fb223ce20798..cfe6c716062102 100644 --- a/src/streaming/stream-receiver.c +++ b/src/streaming/stream-receiver.c @@ -466,14 +466,18 @@ void stream_receiver_move_entire_queue_to_running_unsafe(struct stream_thread *s static void stream_receiver_remove(struct stream_thread *sth, struct receiver_state *rpt, const char *why) { internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ ); + PARSER *parser = __atomic_load_n(&rpt->thread.parser, __ATOMIC_RELAXED); + size_t count = parser ? parser->user.data_collections_count : 0; + errno_clear(); nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM RCV[%zu] '%s' [from [%s]:%s]: " - "receiver disconnected: %s" + "receiver disconnected (after %zu received messages): %s" , sth->id , rpt->hostname ? rpt->hostname : "-" , rpt->client_ip ? rpt->client_ip : "-" , rpt->client_port ? rpt->client_port : "-" + , count , why ? 
why : ""); rrdhost_state_disconnected(rpt->host); @@ -498,8 +502,6 @@ static void stream_receiver_remove(struct stream_thread *sth, struct receiver_st buffer_free(rpt->thread.buffer); rpt->thread.buffer = NULL; - size_t count = 0; - PARSER *parser = __atomic_load_n(&rpt->thread.parser, __ATOMIC_RELAXED); if(parser) { parser->user.v2.stream_buffer.wb = NULL; @@ -509,19 +511,11 @@ static void stream_receiver_remove(struct stream_thread *sth, struct receiver_st parser->fd_output = -1; parser->sock = NULL; spinlock_unlock(&parser->writer.spinlock); - - count = parser->user.data_collections_count; } // the parser stopped receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false); - { - char msg[100 + 1]; - snprintfz(msg, sizeof(msg) - 1, "receiver disconnected (completed %zu updates)", count); - stream_receiver_log_status(rpt, msg, STREAM_STATUS_DISCONNECTED, NDLP_WARNING); - } - // in case we are connected to netdata cloud, // we inform cloud that a child got disconnected uint64_t total_reboot = rrdhost_stream_path_total_reboot_time_ms(rpt->host); @@ -786,6 +780,7 @@ void stream_receiver_cleanup(struct stream_thread *sth) { m = META_NEXT(&sth->run.meta, &idx)) { if (m->type != POLLFD_TYPE_RECEIVER) continue; struct receiver_state *rpt = m->rpt; + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); stream_receiver_remove(sth, rpt, "shutdown"); } } diff --git a/src/streaming/stream-sender.c b/src/streaming/stream-sender.c index a5984cdfed6b06..08f3b52306e743 100644 --- a/src/streaming/stream-sender.c +++ b/src/streaming/stream-sender.c @@ -619,6 +619,8 @@ void stream_sender_cleanup(struct stream_thread *sth) { }; ND_LOG_STACK_PUSH(lgs); + s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN; + s->exit.shutdown = true; stream_sender_move_running_to_connector_or_remove(sth, s, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); } } diff --git a/src/web/api/queries/backfill.h b/src/web/api/queries/backfill.h index 5cce069d1cc173..7df7c2a141f2a8 100644 --- a/src/web/api/queries/backfill.h +++ b/src/web/api/queries/backfill.h @@ -16,7 +16,7 @@ struct backfill_request_data { time_t child_wall_clock_time; }; -typedef void (*backfill_callback_t)(size_t successful_dims, size_t failed_dims, struct backfill_request_data *brd); +typedef bool (*backfill_callback_t)(size_t successful_dims, size_t failed_dims, struct backfill_request_data *brd); void *backfill_thread(void *ptr); bool backfill_request_add(RRDSET *st, backfill_callback_t cb, struct backfill_request_data *data); diff --git a/src/web/api/queries/query.c b/src/web/api/queries/query.c index 86630ce506754b..1f7f62013a1317 100644 --- a/src/web/api/queries/query.c +++ b/src/web/api/queries/query.c @@ -2002,7 +2002,7 @@ bool backfill_tier_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s) { long before_wanted = smaller_tier_last_time; struct rrddim_tier *tmp = &rd->tiers[read_tier]; - storage_engine_query_init(tmp->seb, tmp->smh, &seqh, after_wanted, before_wanted, STORAGE_PRIORITY_SYNCHRONOUS); + storage_engine_query_init(tmp->seb, tmp->smh, &seqh, after_wanted, before_wanted, STORAGE_PRIORITY_SYNCHRONOUS_FIRST); size_t points_read = 0; diff --git a/src/web/api/queries/weights.c b/src/web/api/queries/weights.c index c43d0116641b43..e94823342da787 100644 --- a/src/web/api/queries/weights.c +++ b/src/web/api/queries/weights.c @@ -1281,7 +1281,7 @@ NETDATA_DOUBLE *rrd2rrdr_ks2( .time_group_options = time_group_options, .tier = tier, .query_source = QUERY_SOURCE_API_WEIGHTS, - .priority = 
STORAGE_PRIORITY_SYNCHRONOUS, + .priority = STORAGE_PRIORITY_SYNCHRONOUS_FIRST, }; QUERY_TARGET *qt = query_target_create(&qtr); @@ -1420,7 +1420,7 @@ static void rrdset_metric_correlations_volume( QUERY_VALUE baseline_average = rrdmetric2value(host, rca, ria, rma, baseline_after, baseline_before, options, time_group_method, time_group_options, tier, 0, - QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS); + QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS_FIRST); merge_query_value_to_stats(&baseline_average, stats, 1); if(!netdata_double_isnumber(baseline_average.value)) { @@ -1430,7 +1430,7 @@ static void rrdset_metric_correlations_volume( QUERY_VALUE highlight_average = rrdmetric2value(host, rca, ria, rma, after, before, options, time_group_method, time_group_options, tier, 0, - QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS); + QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS_FIRST); merge_query_value_to_stats(&highlight_average, stats, 1); if(!netdata_double_isnumber(highlight_average.value)) @@ -1450,7 +1450,7 @@ static void rrdset_metric_correlations_volume( snprintfz(highlight_countif_options, 50, "%s" NETDATA_DOUBLE_FORMAT, highlight_average.value < baseline_average.value ? "<" : ">", baseline_average.value); QUERY_VALUE highlight_countif = rrdmetric2value(host, rca, ria, rma, after, before, options, RRDR_GROUPING_COUNTIF, highlight_countif_options, tier, 0, - QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS); + QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS_FIRST); merge_query_value_to_stats(&highlight_countif, stats, 1); if(!netdata_double_isnumber(highlight_countif.value)) { @@ -1494,7 +1494,7 @@ static void rrdset_weights_value( QUERY_VALUE qv = rrdmetric2value(host, rca, ria, rma, after, before, options, time_group_method, time_group_options, tier, 0, - QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS); + QUERY_SOURCE_API_WEIGHTS, STORAGE_PRIORITY_SYNCHRONOUS_FIRST); merge_query_value_to_stats(&qv, stats, 1);