From fad05b0b4634336932c622fea3c6ecb4ad85874f Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Sat, 14 Dec 2024 12:01:09 +0200 Subject: [PATCH] Streaming improvements No 7 (#19204) * db ram pulse statistics using an API * dbengine total memory should be available in basic pulse statistics; buffers sum should be consistent with the buffers chart * unification of threads count for aclk and the web server; now we use 2x the cores for parents, 1x the cores for children * retention should be a duration * buffer size should be a size * up to 5 replication workers work on the same host * up to 2 replication workers work on the same host * Revert "up to 2 replication workers work on the same host" This reverts commit 314e5869e6d29fd25e19c0bdb5cd810125dab94e. * Revert "up to 5 replication workers work on the same host" This reverts commit 55d253bb57f87a12f76c3c299f24a473586c02ec. --- CMakeLists.txt | 8 ++++-- src/daemon/analytics.c | 2 +- .../netdata-conf-backwards-compatibility.c | 13 ++++++---- src/daemon/config/netdata-conf-cloud.c | 18 +++++++++++++ src/daemon/config/netdata-conf-cloud.h | 10 ++++++++ src/daemon/config/netdata-conf-db.c | 4 +-- src/daemon/config/netdata-conf-web.c | 22 ++++++++++++++++ src/daemon/config/netdata-conf-web.h | 2 ++ src/daemon/config/netdata-conf.h | 1 + src/daemon/pulse/pulse-daemon-memory.c | 25 +++++++++++-------- .../{pulse-dbengine.c => pulse-db-dbengine.c} | 10 ++++---- .../{pulse-dbengine.h => pulse-db-dbengine.h} | 6 ++--- src/daemon/pulse/pulse-db-rrd.c | 18 +++++++++++++ src/daemon/pulse/pulse-db-rrd.h | 16 ++++++++++++ src/daemon/pulse/pulse-dictionary.c | 2 -- src/daemon/pulse/pulse-dictionary.h | 2 -- src/daemon/pulse/pulse.h | 3 ++- src/database/ram/rrddim_mem.c | 16 +++++++----- src/database/rrddim.c | 14 +++++------ src/database/sqlite/sqlite_aclk.c | 21 +--------------- src/libnetdata/config/appconfig.c | 23 ++++++++++++----- src/libnetdata/config/appconfig.h | 2 +- src/ml/ml_config.cc | 2 +- src/plugins.d/plugins_d.c | 2 +- src/streaming/replication.c | 2 +- src/streaming/stream-conf.c | 16 +++++++++--- src/streaming/stream-conf.h | 2 +- src/streaming/stream.conf | 2 +- src/web/api/queries/backfill.c | 2 +- src/web/server/static/static-threaded.c | 21 +--------------- 30 files changed, 184 insertions(+), 103 deletions(-) create mode 100644 src/daemon/config/netdata-conf-cloud.c create mode 100644 src/daemon/config/netdata-conf-cloud.h rename src/daemon/pulse/{pulse-dbengine.c => pulse-db-dbengine.c} (99%) rename src/daemon/pulse/{pulse-dbengine.h => pulse-db-dbengine.h} (68%) create mode 100644 src/daemon/pulse/pulse-db-rrd.c create mode 100644 src/daemon/pulse/pulse-db-rrd.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 23726f52e20301..b03467c75d003b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1086,8 +1086,8 @@ set(DAEMON_FILES src/daemon/pulse/pulse-daemon-memory.h src/daemon/pulse/pulse-sqlite3.c src/daemon/pulse/pulse-sqlite3.h - src/daemon/pulse/pulse-dbengine.c - src/daemon/pulse/pulse-dbengine.h + src/daemon/pulse/pulse-db-dbengine.c + src/daemon/pulse/pulse-db-dbengine.h src/daemon/pulse/pulse-string.c src/daemon/pulse/pulse-string.h src/daemon/pulse/pulse-heartbeat.c @@ -1118,6 +1118,10 @@ set(DAEMON_FILES src/daemon/daemon-shutdown.h src/daemon/daemon-service.c src/daemon/daemon-service.h + src/daemon/pulse/pulse-db-rrd.c + src/daemon/pulse/pulse-db-rrd.h + src/daemon/config/netdata-conf-cloud.c + src/daemon/config/netdata-conf-cloud.h ) set(H2O_FILES diff --git a/src/daemon/analytics.c b/src/daemon/analytics.c index d1e39922a7752d..a263aff69a3b76 100644 --- a/src/daemon/analytics.c +++ b/src/daemon/analytics.c @@ -519,7 +519,7 @@ void analytics_gather_mutable_meta_data(void) analytics_alarms_notifications(); analytics_set_data( - &analytics_data.netdata_config_is_parent, (rrdhost_hosts_available() > 1 || stream_conf_configured_as_parent()) ? "true" : "false"); + &analytics_data.netdata_config_is_parent, (rrdhost_hosts_available() > 1 || stream_conf_is_parent(false)) ? "true" : "false"); analytics_set_data(&analytics_data.netdata_host_agent_claimed, is_agent_claimed() ? "true" : "false"); diff --git a/src/daemon/config/netdata-conf-backwards-compatibility.c b/src/daemon/config/netdata-conf-backwards-compatibility.c index cd8ba029dd0313..5e6c0140ca968d 100644 --- a/src/daemon/config/netdata-conf-backwards-compatibility.c +++ b/src/daemon/config/netdata-conf-backwards-compatibility.c @@ -132,11 +132,11 @@ void netdata_conf_backwards_compatibility(void) { config_move(CONFIG_SECTION_DB, "page cache size", CONFIG_SECTION_DB, "dbengine page cache size MB"); - config_move(CONFIG_SECTION_GLOBAL, "page cache uses malloc", - CONFIG_SECTION_DB, "dbengine page cache with malloc"); - - config_move(CONFIG_SECTION_DB, "page cache with malloc", - CONFIG_SECTION_DB, "dbengine page cache with malloc"); +// config_move(CONFIG_SECTION_GLOBAL, "page cache uses malloc", +// CONFIG_SECTION_DB, "dbengine page cache with malloc"); +// +// config_move(CONFIG_SECTION_DB, "page cache with malloc", +// CONFIG_SECTION_DB, "dbengine page cache with malloc"); config_move(CONFIG_SECTION_GLOBAL, "memory deduplication (ksm)", CONFIG_SECTION_DB, "memory deduplication (ksm)"); @@ -177,6 +177,9 @@ void netdata_conf_backwards_compatibility(void) { config_move(CONFIG_SECTION_GLOBAL, "enable zero metrics", CONFIG_SECTION_DB, "enable zero metrics"); + config_move(CONFIG_SECTION_CLOUD, "query thread count", + CONFIG_SECTION_CLOUD, "query threads"); + // ---------------------------------------------------------------------------------------------------------------- // global statistics -> telemetry -> pulse diff --git a/src/daemon/config/netdata-conf-cloud.c b/src/daemon/config/netdata-conf-cloud.c new file mode 100644 index 00000000000000..ecc24ebef3048d --- /dev/null +++ b/src/daemon/config/netdata-conf-cloud.c @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "netdata-conf-cloud.h" +#include "../common.h" + +int netdata_conf_cloud_query_threads(void) { + int cpus = MIN(get_netdata_cpus(), 256); // max 256 cores + int threads = MIN(cpus * (stream_conf_is_parent(false) ? 2 : 1), libuv_worker_threads / 2); + threads = MAX(threads, 6); + + threads = config_get_number(CONFIG_SECTION_CLOUD, "query threads", threads); + if(threads < 1) { + netdata_log_error("[" CONFIG_SECTION_CLOUD "].query threads in netdata.conf needs to be at least 1. Overwriting it."); + threads = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query threads", threads); + } + return threads; +} diff --git a/src/daemon/config/netdata-conf-cloud.h b/src/daemon/config/netdata-conf-cloud.h new file mode 100644 index 00000000000000..d30df61fac2494 --- /dev/null +++ b/src/daemon/config/netdata-conf-cloud.h @@ -0,0 +1,10 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_NETDATA_CONF_CLOUD_H +#define NETDATA_NETDATA_CONF_CLOUD_H + +#include "libnetdata/libnetdata.h" + +int netdata_conf_cloud_query_threads(void); + +#endif //NETDATA_NETDATA_CONF_CLOUD_H diff --git a/src/daemon/config/netdata-conf-db.c b/src/daemon/config/netdata-conf-db.c index 19f0229f97a948..0b29a4a7813d49 100644 --- a/src/daemon/config/netdata-conf-db.c +++ b/src/daemon/config/netdata-conf-db.c @@ -373,13 +373,13 @@ void netdata_conf_section_db(void) { // get default database size if(default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && default_rrd_memory_mode != RRD_MEMORY_MODE_NONE) { - default_rrd_history_entries = (int)config_get_number( + default_rrd_history_entries = (int)config_get_duration_seconds( CONFIG_SECTION_DB, "retention", align_entries_to_pagesize(default_rrd_memory_mode, RRD_DEFAULT_HISTORY_ENTRIES)); long h = align_entries_to_pagesize(default_rrd_memory_mode, default_rrd_history_entries); if (h != default_rrd_history_entries) { - config_set_number(CONFIG_SECTION_DB, "retention", h); + config_set_duration_seconds(CONFIG_SECTION_DB, "retention", h); default_rrd_history_entries = (int)h; } } diff --git a/src/daemon/config/netdata-conf-web.c b/src/daemon/config/netdata-conf-web.c index 2bb9e7337a5223..9052a393e1c754 100644 --- a/src/daemon/config/netdata-conf-web.c +++ b/src/daemon/config/netdata-conf-web.c @@ -3,6 +3,27 @@ #include "netdata-conf-web.h" #include "daemon/static_threads.h" +int netdata_conf_web_query_threads(void) { + // See https://github.com/netdata/netdata/issues/11081#issuecomment-831998240 for more details + if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) { + config_set_number(CONFIG_SECTION_WEB, "web server threads", 1); + netdata_log_info("You are running an OpenSSL older than 1.1.0, web server will not enable multithreading."); + return 1; + } + + int cpus = MIN(get_netdata_cpus(), 256); // max 256 cores + int threads = cpus * (stream_conf_is_parent(false) ? 2 : 1); + threads = MAX(threads, 6); + + threads = config_get_number(CONFIG_SECTION_WEB, "web server threads", threads); + if(threads < 1) { + netdata_log_error("[" CONFIG_SECTION_WEB "].web server threads in netdata.conf needs to be at least 1. Overwriting it."); + threads = 1; + config_set_number(CONFIG_SECTION_WEB, "web server threads", threads); + } + return threads; +} + static int make_dns_decision(const char *section_name, const char *config_name, const char *default_value, SIMPLE_PATTERN *p) { const char *value = config_get(section_name,config_name,default_value); @@ -141,3 +162,4 @@ void netdata_conf_web_security_init(void) { netdata_ssl_initialize_openssl(); } + diff --git a/src/daemon/config/netdata-conf-web.h b/src/daemon/config/netdata-conf-web.h index bc95a8202df4e7..cab09026e14c72 100644 --- a/src/daemon/config/netdata-conf-web.h +++ b/src/daemon/config/netdata-conf-web.h @@ -9,4 +9,6 @@ void netdata_conf_section_web(void); void web_server_threading_selection(void); void netdata_conf_web_security_init(void); +int netdata_conf_web_query_threads(void); + #endif //NETDATA_NETDATA_CONF_WEB_H diff --git a/src/daemon/config/netdata-conf.h b/src/daemon/config/netdata-conf.h index a9469adba4ad1f..57efb7751b2720 100644 --- a/src/daemon/config/netdata-conf.h +++ b/src/daemon/config/netdata-conf.h @@ -14,6 +14,7 @@ bool netdata_conf_load(char *filename, char overwrite_used, const char **user); #include "netdata-conf-global.h" #include "netdata-conf-logs.h" #include "netdata-conf-web.h" +#include "netdata-conf-cloud.h" #include "daemon/common.h" diff --git a/src/daemon/pulse/pulse-daemon-memory.c b/src/daemon/pulse/pulse-daemon-memory.c index dac7d317997c81..a780f4cd9f38af 100644 --- a/src/daemon/pulse/pulse-daemon-memory.c +++ b/src/daemon/pulse/pulse-daemon-memory.c @@ -12,7 +12,9 @@ struct netdata_buffers_statistics netdata_buffers_statistics = {}; void pulse_daemon_memory_do(bool extended) { { static RRDSET *st_memory = NULL; - static RRDDIM *rd_database = NULL; + static RRDDIM *rd_db_dbengine = NULL; + static RRDDIM *rd_db_rrd = NULL; + #ifdef DICT_WITH_STATS static RRDDIM *rd_collectors = NULL; static RRDDIM *rd_rrdhosts = NULL; @@ -50,7 +52,8 @@ void pulse_daemon_memory_do(bool extended) { localhost->rrd_update_every, RRDSET_TYPE_STACKED); - rd_database = rrddim_add(st_memory, "db", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_db_dbengine = rrddim_add(st_memory, "dbengine", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_db_rrd = rrddim_add(st_memory, "rrd", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); #ifdef DICT_WITH_STATS rd_collectors = rrddim_add(st_memory, "collectors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -75,8 +78,9 @@ void pulse_daemon_memory_do(bool extended) { rd_other = rrddim_add(st_memory, "other", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); } + // each of these should also be analyzed below at the buffers chart size_t buffers = - netdata_buffers_statistics.query_targets_size + + netdata_buffers_statistics.query_targets_size + onewayalloc_allocated_memory() + netdata_buffers_statistics.rrdset_done_rda_size + netdata_buffers_statistics.buffers_aclk + netdata_buffers_statistics.buffers_api + @@ -92,8 +96,8 @@ void pulse_daemon_memory_do(bool extended) { size_t strings = 0; string_statistics(NULL, NULL, NULL, NULL, NULL, &strings, NULL, NULL); - rrddim_set_by_pointer(st_memory, rd_database, - (collected_number)pulse_dbengine_total_memory + (collected_number)rrddim_db_memory_size); + rrddim_set_by_pointer(st_memory, rd_db_dbengine, (collected_number)pulse_dbengine_total_memory); + rrddim_set_by_pointer(st_memory, rd_db_rrd, (collected_number)pulse_rrd_memory_size); #ifdef DICT_WITH_STATS rrddim_set_by_pointer(st_memory, rd_collectors, @@ -157,11 +161,11 @@ void pulse_daemon_memory_do(bool extended) { rrddim_set_by_pointer(st_memory, rd_aral, (collected_number)aral_by_size_structures_bytes()); - rrddim_set_by_pointer(st_memory, - rd_judy, (collected_number) judy_aral_structures()); + rrddim_set_by_pointer(st_memory, rd_judy, + (collected_number) judy_aral_structures()); - rrddim_set_by_pointer(st_memory, - rd_other, (collected_number)dictionary_stats_memory_total(dictionary_stats_category_other)); + rrddim_set_by_pointer(st_memory, rd_other, + (collected_number)dictionary_stats_memory_total(dictionary_stats_category_other)); rrdset_done(st_memory); } @@ -221,6 +225,7 @@ void pulse_daemon_memory_do(bool extended) { rd_buffers_judy = rrddim_add(st_memory_buffers, "aral-judy free", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); } + // the sum of all these needs to be above at the total buffers calculation rrddim_set_by_pointer(st_memory_buffers, rd_queries, (collected_number)netdata_buffers_statistics.query_targets_size + (collected_number) onewayalloc_allocated_memory()); rrddim_set_by_pointer(st_memory_buffers, rd_collectors, (collected_number)netdata_buffers_statistics.rrdset_done_rda_size); rrddim_set_by_pointer(st_memory_buffers, rd_buffers_aclk, (collected_number)netdata_buffers_statistics.buffers_aclk); @@ -231,8 +236,8 @@ void pulse_daemon_memory_do(bool extended) { rrddim_set_by_pointer(st_memory_buffers, rd_buffers_health, (collected_number)netdata_buffers_statistics.buffers_health); rrddim_set_by_pointer(st_memory_buffers, rd_buffers_streaming, (collected_number)netdata_buffers_statistics.buffers_streaming); rrddim_set_by_pointer(st_memory_buffers, rd_cbuffers_streaming, (collected_number)netdata_buffers_statistics.cbuffers_streaming); - rrddim_set_by_pointer(st_memory_buffers, rd_buffers_replication, (collected_number)replication_allocated_buffers()); rrddim_set_by_pointer(st_memory_buffers, rd_buffers_web, (collected_number)netdata_buffers_statistics.buffers_web); + rrddim_set_by_pointer(st_memory_buffers, rd_buffers_replication, (collected_number)replication_allocated_buffers()); rrddim_set_by_pointer(st_memory_buffers, rd_buffers_aral, (collected_number)aral_by_size_free_bytes()); rrddim_set_by_pointer(st_memory_buffers, rd_buffers_judy, (collected_number)judy_aral_free_bytes()); diff --git a/src/daemon/pulse/pulse-dbengine.c b/src/daemon/pulse/pulse-db-dbengine.c similarity index 99% rename from src/daemon/pulse/pulse-dbengine.c rename to src/daemon/pulse/pulse-db-dbengine.c index 950aff5b09928a..b5fc64c6d68f6d 100644 --- a/src/daemon/pulse/pulse-dbengine.c +++ b/src/daemon/pulse/pulse-db-dbengine.c @@ -1,7 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #define PULSE_INTERNALS 1 -#include "pulse-dbengine.h" +#include "pulse-db-dbengine.h" size_t pulse_dbengine_total_memory = 0; @@ -638,11 +638,7 @@ static void dbengine2_cache_statistics_charts(struct dbengine2_cache_pointers *p } } - void pulse_dbengine_do(bool extended) { - if(!main_cache || !main_mrg || !extended) - return; - static struct dbengine2_cache_pointers main_cache_ptrs = {}, open_cache_ptrs = {}, extent_cache_ptrs = {}; static struct rrdeng_cache_efficiency_stats cache_efficiency_stats = {}, cache_efficiency_stats_old = {}; static struct pgc_statistics pgc_main_stats = {}, pgc_main_stats_old = {}; (void)pgc_main_stats_old; @@ -686,6 +682,10 @@ void pulse_dbengine_do(bool extended) { mrg_stats.size + buffers_total_size + aral_structures_total_size + aral_padding_total_size + pgd_padding_bytes(); + // we need all the above for the total dbengine memory as reported by the non-extended netdata memory chart + if(!main_cache || !main_mrg || !extended) + return; + size_t priority = 135000; { diff --git a/src/daemon/pulse/pulse-dbengine.h b/src/daemon/pulse/pulse-db-dbengine.h similarity index 68% rename from src/daemon/pulse/pulse-dbengine.h rename to src/daemon/pulse/pulse-db-dbengine.h index cb7a7001abb651..8954fa042a64ab 100644 --- a/src/daemon/pulse/pulse-dbengine.h +++ b/src/daemon/pulse/pulse-db-dbengine.h @@ -1,7 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#ifndef NETDATA_PULSE_DBENGINE_H -#define NETDATA_PULSE_DBENGINE_H +#ifndef NETDATA_PULSE_DB_DBENGINE_H +#define NETDATA_PULSE_DB_DBENGINE_H #include "daemon/common.h" @@ -14,4 +14,4 @@ void pulse_dbengine_do(bool extended); #endif -#endif //NETDATA_PULSE_DBENGINE_H +#endif //NETDATA_PULSE_DB_DBENGINE_H diff --git a/src/daemon/pulse/pulse-db-rrd.c b/src/daemon/pulse/pulse-db-rrd.c new file mode 100644 index 00000000000000..1e9077a3df033c --- /dev/null +++ b/src/daemon/pulse/pulse-db-rrd.c @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "pulse-db-rrd.h" + +int64_t pulse_rrd_memory_size = 0; + +void pulse_db_rrd_memory_change(int64_t value) { + __atomic_add_fetch(&pulse_rrd_memory_size, value, __ATOMIC_RELAXED); +} + +void pulse_db_rrd_memory_add(uint64_t value) { + __atomic_add_fetch(&pulse_rrd_memory_size, value, __ATOMIC_RELAXED); +} + +void pulse_db_rrd_memory_sub(uint64_t value) { + __atomic_sub_fetch(&pulse_rrd_memory_size, value, __ATOMIC_RELAXED); +} + diff --git a/src/daemon/pulse/pulse-db-rrd.h b/src/daemon/pulse/pulse-db-rrd.h new file mode 100644 index 00000000000000..7d1e16d20aead2 --- /dev/null +++ b/src/daemon/pulse/pulse-db-rrd.h @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_PULSE_DB_RRD_H +#define NETDATA_PULSE_DB_RRD_H + +#include "libnetdata/libnetdata.h" + +void pulse_db_rrd_memory_change(int64_t value); +void pulse_db_rrd_memory_add(uint64_t value); +void pulse_db_rrd_memory_sub(uint64_t value); + +#if defined(PULSE_INTERNALS) +extern int64_t pulse_rrd_memory_size; +#endif + +#endif //NETDATA_PULSE_DB_RRD_H diff --git a/src/daemon/pulse/pulse-dictionary.c b/src/daemon/pulse/pulse-dictionary.c index 4677ae7864a75f..4dd2ed86175cb8 100644 --- a/src/daemon/pulse/pulse-dictionary.c +++ b/src/daemon/pulse/pulse-dictionary.c @@ -13,8 +13,6 @@ struct dictionary_stats dictionary_stats_category_rrdhealth = { .name = "health" struct dictionary_stats dictionary_stats_category_functions = { .name = "functions" }; struct dictionary_stats dictionary_stats_category_replication = { .name = "replication" }; -size_t rrddim_db_memory_size = 0; - #ifdef DICT_WITH_STATS struct dictionary_categories { struct dictionary_stats *stats; diff --git a/src/daemon/pulse/pulse-dictionary.h b/src/daemon/pulse/pulse-dictionary.h index 2b97eecc147012..a16303887abc0d 100644 --- a/src/daemon/pulse/pulse-dictionary.h +++ b/src/daemon/pulse/pulse-dictionary.h @@ -15,8 +15,6 @@ extern struct dictionary_stats dictionary_stats_category_rrdhealth; extern struct dictionary_stats dictionary_stats_category_functions; extern struct dictionary_stats dictionary_stats_category_replication; -extern size_t rrddim_db_memory_size; - #if defined(PULSE_INTERNALS) void pulse_dictionary_do(bool extended); #endif diff --git a/src/daemon/pulse/pulse.h b/src/daemon/pulse/pulse.h index 6aca15326a768e..28b8488f364c07 100644 --- a/src/daemon/pulse/pulse.h +++ b/src/daemon/pulse/pulse.h @@ -16,7 +16,8 @@ extern bool pulse_extended_enabled; #include "pulse-daemon.h" #include "pulse-daemon-memory.h" #include "pulse-sqlite3.h" -#include "pulse-dbengine.h" +#include "pulse-db-dbengine.h" +#include "pulse-db-rrd.h" #include "pulse-string.h" #include "pulse-heartbeat.h" #include "pulse-dictionary.h" diff --git a/src/database/ram/rrddim_mem.c b/src/database/ram/rrddim_mem.c index fab1479a192a6e..7c55ebd23f3d4f 100644 --- a/src/database/ram/rrddim_mem.c +++ b/src/database/ram/rrddim_mem.c @@ -52,7 +52,9 @@ rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si __maybe_unused) { struct mem_metric_handle *mh = (struct mem_metric_handle *)rrddim_metric_get(si, &rd->metric_uuid); while(!mh) { netdata_rwlock_wrlock(&rrddim_JudyHS_rwlock); + JudyAllocThreadPulseReset(); Pvoid_t *PValue = JudyHSIns(&rrddim_JudyHS_array, &rd->metric_uuid, sizeof(nd_uuid_t), PJE0); + int64_t mem = JudyAllocThreadPulseGetAndReset(); mh = *PValue; if(!mh) { mh = callocz(1, sizeof(struct mem_metric_handle)); @@ -60,7 +62,7 @@ rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si __maybe_unused) { mh->refcount = 1; update_metric_handle_from_rrddim(mh, rd); *PValue = mh; - __atomic_add_fetch(&rrddim_db_memory_size, sizeof(struct mem_metric_handle) + JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(nd_uuid_t)), __ATOMIC_RELAXED); + pulse_db_rrd_memory_change(mem + (int64_t)sizeof(struct mem_metric_handle)); } else { if(__atomic_add_fetch(&mh->refcount, 1, __ATOMIC_RELAXED) <= 0) @@ -107,11 +109,13 @@ void rrddim_metric_release(STORAGE_METRIC_HANDLE *smh __maybe_unused) { RRDDIM *rd = mh->rd; netdata_rwlock_wrlock(&rrddim_JudyHS_rwlock); + JudyAllocThreadPulseReset(); JudyHSDel(&rrddim_JudyHS_array, &rd->metric_uuid, sizeof(nd_uuid_t), PJE0); + int64_t mem = JudyAllocThreadPulseGetAndReset(); netdata_rwlock_wrunlock(&rrddim_JudyHS_rwlock); freez(mh); - __atomic_sub_fetch(&rrddim_db_memory_size, sizeof(struct mem_metric_handle) + JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(nd_uuid_t)), __ATOMIC_RELAXED); + pulse_db_rrd_memory_change(mem - (int64_t)sizeof(struct mem_metric_handle)); } } } @@ -147,7 +151,7 @@ STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *smh, uint32_t ch->rd = rd; ch->smh = smh; - __atomic_add_fetch(&rrddim_db_memory_size, sizeof(struct mem_collect_handle), __ATOMIC_RELAXED); + pulse_db_rrd_memory_add(sizeof(struct mem_collect_handle)); return (STORAGE_COLLECT_HANDLE *)ch; } @@ -235,7 +239,7 @@ void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *sch, int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *sch) { freez(sch); - __atomic_sub_fetch(&rrddim_db_memory_size, sizeof(struct mem_collect_handle), __ATOMIC_RELAXED); + pulse_db_rrd_memory_sub(sizeof(struct mem_collect_handle)); return 0; } @@ -355,7 +359,7 @@ void rrddim_query_init(STORAGE_METRIC_HANDLE *smh, struct storage_engine_query_h // netdata_log_info("RRDDIM QUERY INIT: start %ld, end %ld, next %ld, first %ld, last %ld, dt %ld", start_time, end_time, h->next_timestamp, h->slot_timestamp, h->last_timestamp, h->dt); - __atomic_add_fetch(&rrddim_db_memory_size, sizeof(struct mem_query_handle), __ATOMIC_RELAXED); + pulse_db_rrd_memory_add(sizeof(struct mem_query_handle)); seqh->handle = (STORAGE_QUERY_HANDLE *)h; } @@ -419,7 +423,7 @@ void rrddim_query_finalize(struct storage_engine_query_handle *seqh) { #endif freez(seqh->handle); - __atomic_sub_fetch(&rrddim_db_memory_size, sizeof(struct mem_query_handle), __ATOMIC_RELAXED); + pulse_db_rrd_memory_sub(sizeof(struct mem_query_handle)); } time_t rrddim_query_align_to_optimal_before(struct storage_engine_query_handle *seqh) { diff --git a/src/database/rrddim.c b/src/database/rrddim.c index de2c6259c589ec..571543b1fce767 100644 --- a/src/database/rrddim.c +++ b/src/database/rrddim.c @@ -63,13 +63,13 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v if(!entries) entries = 5; rd->db.data = netdata_mmap(NULL, entries * sizeof(storage_number), MAP_PRIVATE, 1, false, NULL); - if(!rd->db.data) { - netdata_log_info("Failed to use memory mode ram for chart '%s', dimension '%s', falling back to alloc", rrdset_name(st), rrddim_name(rd)); - ctr->memory_mode = RRD_MEMORY_MODE_ALLOC; + if(rd->db.data) { + rd->db.memsize = entries * sizeof(storage_number); + pulse_db_rrd_memory_add(rd->db.memsize); } else { - rd->db.memsize = entries * sizeof(storage_number); - __atomic_add_fetch(&rrddim_db_memory_size, rd->db.memsize, __ATOMIC_RELAXED); + netdata_log_info("Failed to use memory mode ram for chart '%s', dimension '%s', falling back to alloc", rrdset_name(st), rrddim_name(rd)); + ctr->memory_mode = RRD_MEMORY_MODE_ALLOC; } } @@ -79,7 +79,7 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v rd->db.data = rrddim_alloc_db(entries); rd->db.memsize = entries * sizeof(storage_number); - __atomic_add_fetch(&rrddim_db_memory_size, rd->db.memsize, __ATOMIC_RELAXED); + pulse_db_rrd_memory_add(rd->db.memsize); } rd->rrd_memory_mode = ctr->memory_mode; @@ -223,7 +223,7 @@ static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v } if(rd->db.data) { - __atomic_sub_fetch(&rrddim_db_memory_size, rd->db.memsize, __ATOMIC_RELAXED); + pulse_db_rrd_memory_sub(rd->db.memsize); if(rd->rrd_memory_mode == RRD_MEMORY_MODE_RAM) netdata_munmap(rd->db.data, rd->db.memsize); diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c index 82a89ee63541d1..ea5ecdc89c0faf 100644 --- a/src/database/sqlite/sqlite_aclk.c +++ b/src/database/sqlite/sqlite_aclk.c @@ -329,25 +329,6 @@ static void aclk_run_query_job(uv_work_t *req) aclk_run_query(config, query); } -static int read_query_thread_count() -{ - int threads = MIN(get_netdata_cpus()/2, 6); - threads = MAX(threads, 2); - threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); - if(threads < 1) { - netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); - threads = 1; - config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); - } - else { - if (threads > libuv_worker_threads / 2) { - threads = MAX(libuv_worker_threads / 2, 2); - config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); - } - } - return threads; -} - static void node_update_timer_cb(uv_timer_t *handle) { struct aclk_sync_cfg_t *ahc = handle->data; @@ -398,7 +379,7 @@ static void aclk_synchronization(void *arg) sql_delete_aclk_table_list(); - int query_thread_count = read_query_thread_count(); + int query_thread_count = netdata_conf_cloud_query_threads(); netdata_log_info("Starting ACLK synchronization thread with %d parallel query threads", query_thread_count); while (likely(service_running(SERVICE_ACLK))) { diff --git a/src/libnetdata/config/appconfig.c b/src/libnetdata/config/appconfig.c index ae5008fd15b784..613a96e79695cc 100644 --- a/src/libnetdata/config/appconfig.c +++ b/src/libnetdata/config/appconfig.c @@ -62,19 +62,30 @@ bool stream_conf_needs_dbengine(struct config *root) { return ret; } -bool stream_conf_has_uuid_section(struct config *root) { +bool stream_conf_has_api_enabled(struct config *root) { struct config_section *sect = NULL; + struct config_option *opt; bool is_parent = false; APPCONFIG_LOCK(root); for (sect = root->sections; sect; sect = sect->next) { nd_uuid_t uuid; - if (uuid_parse(string2str(sect->name), uuid) != -1 && - appconfig_get_boolean_by_section(sect, "enabled", 0)) { - is_parent = true; - break; - } + if (uuid_parse(string2str(sect->name), uuid) != 0) + continue; + + opt = appconfig_option_find(sect, "type"); + // when the 'type' is missing, we assume it is 'api' + if(opt && string_strcmp(opt->value, "api") != 0) + continue; + + opt = appconfig_option_find(sect, "enabled"); + // when the 'enabled' is missing, we assume it is 'false' + if(!opt || !appconfig_test_boolean_value(string2str(opt->value))) + continue; + + is_parent = true; + break; } APPCONFIG_UNLOCK(root); diff --git a/src/libnetdata/config/appconfig.h b/src/libnetdata/config/appconfig.h index 2fabe91a58cb71..766dfeee81e8f6 100644 --- a/src/libnetdata/config/appconfig.h +++ b/src/libnetdata/config/appconfig.h @@ -181,7 +181,7 @@ _CONNECTOR_INSTANCE *add_connector_instance(struct config_section *connector, st #define config_section_option_destroy(section, name) appconfig_section_option_destroy_non_loaded(&netdata_config, section, name) bool stream_conf_needs_dbengine(struct config *root); -bool stream_conf_has_uuid_section(struct config *root); +bool stream_conf_has_api_enabled(struct config *root); void appconfig_foreach_section(struct config *root, void (*cb)(struct config *root, const char *name, void *data), void *data); diff --git a/src/ml/ml_config.cc b/src/ml/ml_config.cc index 61de1e32a6fe17..7ee94d9495e7a6 100644 --- a/src/ml/ml_config.cc +++ b/src/ml/ml_config.cc @@ -45,7 +45,7 @@ void ml_config_load(ml_config_t *cfg) { std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average"); time_t anomaly_detection_query_duration = config_get_duration_seconds(config_section_ml, "anomaly detection grouping duration", 5 * 60); - size_t num_worker_threads = stream_conf_configured_as_parent() ? get_netdata_cpus() / 4 : 1; + size_t num_worker_threads = stream_conf_is_parent(false) ? get_netdata_cpus() / 4 : 1; if (num_worker_threads < 1) num_worker_threads = 1; else if (num_worker_threads > 256) num_worker_threads = 256; num_worker_threads = config_get_number(config_section_ml, "num training threads", num_worker_threads); diff --git a/src/plugins.d/plugins_d.c b/src/plugins.d/plugins_d.c index 1b7e65cbe1d34d..9bc5fd24ae7910 100644 --- a/src/plugins.d/plugins_d.c +++ b/src/plugins.d/plugins_d.c @@ -262,7 +262,7 @@ void *pluginsd_main(void *ptr) { CLEANUP_FUNCTION_REGISTER(pluginsd_main_cleanup) cleanup_ptr = ptr; int automatic_run = config_get_boolean(CONFIG_SECTION_PLUGINS, "enable running new plugins", 1); - int scan_frequency = (int)config_get_number(CONFIG_SECTION_PLUGINS, "check for new plugins every", 60); + int scan_frequency = (int)config_get_duration_seconds(CONFIG_SECTION_PLUGINS, "check for new plugins every", 60); if (scan_frequency < 1) scan_frequency = 1; diff --git a/src/streaming/replication.c b/src/streaming/replication.c index 09b3a61387e908..afdec4f9de7064 100644 --- a/src/streaming/replication.c +++ b/src/streaming/replication.c @@ -1889,7 +1889,7 @@ void *replication_thread_main(void *ptr) { replication_initialize_workers(true); - int threads = stream_conf_configured_as_parent() ? (int)(get_netdata_cpus() / 2) : 1; + int threads = stream_conf_is_parent(false) ? (int)(get_netdata_cpus() / 2) : 1; if (threads < 1) threads = 1; else if (threads > MAX_REPLICATION_THREADS) threads = MAX_REPLICATION_THREADS; diff --git a/src/streaming/stream-conf.c b/src/streaming/stream-conf.c index 71165a683abd2b..24cbc37d067573 100644 --- a/src/streaming/stream-conf.c +++ b/src/streaming/stream-conf.c @@ -82,6 +82,7 @@ static void stream_conf_load_internal() { appconfig_move_everywhere(&stream_config, "default postpone alarms on connect seconds", "postpone alerts on connect"); appconfig_move_everywhere(&stream_config, "postpone alarms on connect seconds", "postpone alerts on connect"); appconfig_move_everywhere(&stream_config, "health enabled by default", "health enabled"); + appconfig_move_everywhere(&stream_config, "buffer size bytes", "buffer size"); } bool stream_conf_receiver_needs_dbengine(void) { @@ -119,8 +120,8 @@ void stream_conf_load() { config_get_duration_seconds(CONFIG_SECTION_DB, "replication step", stream_receive.replication.step); - stream_send.buffer_max_size = (size_t)appconfig_get_number( - &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", + stream_send.buffer_max_size = (size_t)appconfig_get_size_bytes( + &stream_config, CONFIG_SECTION_STREAM, "buffer size", stream_send.buffer_max_size); stream_send.parents.reconnect_delay_s = (unsigned int)appconfig_get_duration_seconds( @@ -185,8 +186,15 @@ void stream_conf_load() { } } -bool stream_conf_configured_as_parent() { - return stream_conf_has_uuid_section(&stream_config); +bool stream_conf_is_parent(bool recheck) { + static bool rc = false, queried = false; + if(!recheck && queried) + return rc; + + rc = stream_conf_has_api_enabled(&stream_config); + queried = true; + + return rc; } void stream_conf_receiver_config(struct receiver_state *rpt, struct stream_receiver_config *config, const char *api_key, const char *machine_guid) { diff --git a/src/streaming/stream-conf.h b/src/streaming/stream-conf.h index ab1b2712115815..0cad0cc06e3489 100644 --- a/src/streaming/stream-conf.h +++ b/src/streaming/stream-conf.h @@ -85,7 +85,7 @@ void stream_conf_receiver_config(struct receiver_state *rpt, struct stream_recei void stream_conf_load(); bool stream_conf_receiver_needs_dbengine(); -bool stream_conf_configured_as_parent(); +bool stream_conf_is_parent(bool recheck); bool stream_conf_is_key_type(const char *api_key, const char *type); bool stream_conf_api_key_is_enabled(const char *api_key, bool enabled); diff --git a/src/streaming/stream.conf b/src/streaming/stream.conf index 7232b3a7ab9120..f54dba1f85f9f8 100644 --- a/src/streaming/stream.conf +++ b/src/streaming/stream.conf @@ -80,7 +80,7 @@ # The buffer to use for sending metrics. # 10MB is good for 60 seconds of data, so increase this if you expect latencies. # The buffer is flushed on reconnects (this will not prevent gaps at the charts). - #buffer size bytes = 10485760 + #buffer size = 10MiB # If the connection fails, or it disconnects, # retry after that many seconds (randomized from 5s to whatever is here). diff --git a/src/web/api/queries/backfill.c b/src/web/api/queries/backfill.c index 8b5ee43a485b1e..4f09aeee98a9e4 100644 --- a/src/web/api/queries/backfill.c +++ b/src/web/api/queries/backfill.c @@ -228,5 +228,5 @@ void *backfill_thread(void *ptr) { } bool backfill_threads_detect_from_stream_conf(void) { - return stream_conf_configured_as_parent(); + return stream_conf_is_parent(false); } diff --git a/src/web/server/static/static-threaded.c b/src/web/server/static/static-threaded.c index b2609fa3b278db..246be4994913a3 100644 --- a/src/web/server/static/static-threaded.c +++ b/src/web/server/static/static-threaded.c @@ -506,26 +506,7 @@ void *socket_listen_main_static_threaded(void *ptr) { netdata_ssl_initialize_ctx(NETDATA_SSL_WEB_SERVER_CTX); - // 6 threads is the optimal value - // since 6 are the parallel connections browsers will do - // so, if the machine has more CPUs, avoid using resources unnecessarily - int def_thread_count = (int)get_netdata_cpus(); - if(def_thread_count < 6) def_thread_count = 6; - - if (!strcmp(config_get(CONFIG_SECTION_WEB, "mode", ""),"single-threaded")) { - netdata_log_info("Running web server with one thread, because mode is single-threaded"); - config_set(CONFIG_SECTION_WEB, "mode", "static-threaded"); - def_thread_count = 1; - } - static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count); - - if (static_threaded_workers_count < 1) static_threaded_workers_count = 1; - - // See https://github.com/netdata/netdata/issues/11081#issuecomment-831998240 for more details - if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) { - static_threaded_workers_count = 1; - netdata_log_info("You are running an OpenSSL older than 1.1.0, web server will not enable multithreading."); - } + static_threaded_workers_count = netdata_conf_web_query_threads(); size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 4));