Skip to content

Commit

Permalink
Streaming improvements No 7 (netdata#19204)
Browse files Browse the repository at this point in the history
* db ram pulse statistics using an API

* dbengine total memory should be available in basic pulse statistics; buffers sum should be consistent with the buffers chart

* unification of threads count for aclk and the web server; now we use 2x the cores for parents, 1x the cores for children

* retention should be a duration

* buffer size should be a size

* up to 5 replication workers work on the same host

* up to 2 replication workers work on the same host

* Revert "up to 2 replication workers work on the same host"

This reverts commit 314e586.

* Revert "up to 5 replication workers work on the same host"

This reverts commit 55d253b.
  • Loading branch information
ktsaou authored Dec 14, 2024
1 parent 8560701 commit fad05b0
Show file tree
Hide file tree
Showing 30 changed files with 184 additions and 103 deletions.
8 changes: 6 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1086,8 +1086,8 @@ set(DAEMON_FILES
src/daemon/pulse/pulse-daemon-memory.h
src/daemon/pulse/pulse-sqlite3.c
src/daemon/pulse/pulse-sqlite3.h
src/daemon/pulse/pulse-dbengine.c
src/daemon/pulse/pulse-dbengine.h
src/daemon/pulse/pulse-db-dbengine.c
src/daemon/pulse/pulse-db-dbengine.h
src/daemon/pulse/pulse-string.c
src/daemon/pulse/pulse-string.h
src/daemon/pulse/pulse-heartbeat.c
Expand Down Expand Up @@ -1118,6 +1118,10 @@ set(DAEMON_FILES
src/daemon/daemon-shutdown.h
src/daemon/daemon-service.c
src/daemon/daemon-service.h
src/daemon/pulse/pulse-db-rrd.c
src/daemon/pulse/pulse-db-rrd.h
src/daemon/config/netdata-conf-cloud.c
src/daemon/config/netdata-conf-cloud.h
)

set(H2O_FILES
Expand Down
2 changes: 1 addition & 1 deletion src/daemon/analytics.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ void analytics_gather_mutable_meta_data(void)
analytics_alarms_notifications();

analytics_set_data(
&analytics_data.netdata_config_is_parent, (rrdhost_hosts_available() > 1 || stream_conf_configured_as_parent()) ? "true" : "false");
&analytics_data.netdata_config_is_parent, (rrdhost_hosts_available() > 1 || stream_conf_is_parent(false)) ? "true" : "false");

analytics_set_data(&analytics_data.netdata_host_agent_claimed, is_agent_claimed() ? "true" : "false");

Expand Down
13 changes: 8 additions & 5 deletions src/daemon/config/netdata-conf-backwards-compatibility.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ void netdata_conf_backwards_compatibility(void) {
config_move(CONFIG_SECTION_DB, "page cache size",
CONFIG_SECTION_DB, "dbengine page cache size MB");

config_move(CONFIG_SECTION_GLOBAL, "page cache uses malloc",
CONFIG_SECTION_DB, "dbengine page cache with malloc");

config_move(CONFIG_SECTION_DB, "page cache with malloc",
CONFIG_SECTION_DB, "dbengine page cache with malloc");
// config_move(CONFIG_SECTION_GLOBAL, "page cache uses malloc",
// CONFIG_SECTION_DB, "dbengine page cache with malloc");
//
// config_move(CONFIG_SECTION_DB, "page cache with malloc",
// CONFIG_SECTION_DB, "dbengine page cache with malloc");

config_move(CONFIG_SECTION_GLOBAL, "memory deduplication (ksm)",
CONFIG_SECTION_DB, "memory deduplication (ksm)");
Expand Down Expand Up @@ -177,6 +177,9 @@ void netdata_conf_backwards_compatibility(void) {
config_move(CONFIG_SECTION_GLOBAL, "enable zero metrics",
CONFIG_SECTION_DB, "enable zero metrics");

config_move(CONFIG_SECTION_CLOUD, "query thread count",
CONFIG_SECTION_CLOUD, "query threads");

// ----------------------------------------------------------------------------------------------------------------
// global statistics -> telemetry -> pulse

Expand Down
18 changes: 18 additions & 0 deletions src/daemon/config/netdata-conf-cloud.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#include "netdata-conf-cloud.h"
#include "../common.h"

// Resolve how many cloud query threads to run.
//
// The built-in default is one thread per CPU (CPU count capped at 256),
// doubled when stream_conf_is_parent(false) reports a parent, limited to half
// of libuv_worker_threads, and never lower than 6. That default is offered to
// [CONFIG_SECTION_CLOUD].query threads, so a value set in netdata.conf wins.
// A configured value below 1 is rejected: it is logged, rewritten to 1 in the
// configuration, and 1 is returned.
int netdata_conf_cloud_query_threads(void) {
    int cores = MIN(get_netdata_cpus(), 256); // max 256 cores

    int default_threads = MIN(cores * (stream_conf_is_parent(false) ? 2 : 1), libuv_worker_threads / 2);
    if(default_threads < 6)
        default_threads = 6; // floor applied after the libuv cap

    int threads = config_get_number(CONFIG_SECTION_CLOUD, "query threads", default_threads);
    if(threads >= 1)
        return threads;

    // the user configured something unusable; fall back to a single thread
    netdata_log_error("[" CONFIG_SECTION_CLOUD "].query threads in netdata.conf needs to be at least 1. Overwriting it.");
    config_set_number(CONFIG_SECTION_CLOUD, "query threads", 1);
    return 1;
}
10 changes: 10 additions & 0 deletions src/daemon/config/netdata-conf-cloud.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_NETDATA_CONF_CLOUD_H
#define NETDATA_NETDATA_CONF_CLOUD_H

#include "libnetdata/libnetdata.h"

// Returns the number of cloud query threads to use, read from
// [CONFIG_SECTION_CLOUD].query threads. The default is derived from the CPU
// count (doubled when the agent is a streaming parent), limited to half of
// libuv_worker_threads and never below 6. A configured value below 1 is
// overwritten with 1. The result is always >= 1.
int netdata_conf_cloud_query_threads(void);

#endif //NETDATA_NETDATA_CONF_CLOUD_H
4 changes: 2 additions & 2 deletions src/daemon/config/netdata-conf-db.c
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,13 @@ void netdata_conf_section_db(void) {
// get default database size

if(default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && default_rrd_memory_mode != RRD_MEMORY_MODE_NONE) {
default_rrd_history_entries = (int)config_get_number(
default_rrd_history_entries = (int)config_get_duration_seconds(
CONFIG_SECTION_DB, "retention",
align_entries_to_pagesize(default_rrd_memory_mode, RRD_DEFAULT_HISTORY_ENTRIES));

long h = align_entries_to_pagesize(default_rrd_memory_mode, default_rrd_history_entries);
if (h != default_rrd_history_entries) {
config_set_number(CONFIG_SECTION_DB, "retention", h);
config_set_duration_seconds(CONFIG_SECTION_DB, "retention", h);
default_rrd_history_entries = (int)h;
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/daemon/config/netdata-conf-web.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@
#include "netdata-conf-web.h"
#include "daemon/static_threads.h"

// Resolve how many web server threads to run.
//
// With OpenSSL older than 1.1.0 the setting is pinned to 1 and the function
// returns immediately. Otherwise the built-in default is one thread per CPU
// (CPU count capped at 256), doubled when stream_conf_is_parent(false) reports
// a parent, and never lower than 6. That default is offered to
// [CONFIG_SECTION_WEB].web server threads, so a value set in netdata.conf wins.
// A configured value below 1 is logged, rewritten to 1, and 1 is returned.
int netdata_conf_web_query_threads(void) {
    // See https://github.com/netdata/netdata/issues/11081#issuecomment-831998240 for more details
    if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) {
        config_set_number(CONFIG_SECTION_WEB, "web server threads", 1);
        netdata_log_info("You are running an OpenSSL older than 1.1.0, web server will not enable multithreading.");
        return 1;
    }

    int cores = MIN(get_netdata_cpus(), 256); // max 256 cores

    int default_threads = cores * (stream_conf_is_parent(false) ? 2 : 1);
    if(default_threads < 6)
        default_threads = 6; // small machines still get a minimum of 6

    int threads = config_get_number(CONFIG_SECTION_WEB, "web server threads", default_threads);
    if(threads >= 1)
        return threads;

    // the user configured something unusable; fall back to a single thread
    netdata_log_error("[" CONFIG_SECTION_WEB "].web server threads in netdata.conf needs to be at least 1. Overwriting it.");
    config_set_number(CONFIG_SECTION_WEB, "web server threads", 1);
    return 1;
}

static int make_dns_decision(const char *section_name, const char *config_name, const char *default_value, SIMPLE_PATTERN *p) {
const char *value = config_get(section_name,config_name,default_value);

Expand Down Expand Up @@ -141,3 +162,4 @@ void netdata_conf_web_security_init(void) {

netdata_ssl_initialize_openssl();
}

2 changes: 2 additions & 0 deletions src/daemon/config/netdata-conf-web.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ void netdata_conf_section_web(void);
void web_server_threading_selection(void);
void netdata_conf_web_security_init(void);

// Returns the number of web server threads to use, read from
// [CONFIG_SECTION_WEB].web server threads. Always 1 when built against an
// OpenSSL older than 1.1.0. Otherwise the default is derived from the CPU
// count (doubled when the agent is a streaming parent) and never below 6.
// A configured value below 1 is overwritten with 1.
int netdata_conf_web_query_threads(void);

#endif //NETDATA_NETDATA_CONF_WEB_H
1 change: 1 addition & 0 deletions src/daemon/config/netdata-conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bool netdata_conf_load(char *filename, char overwrite_used, const char **user);
#include "netdata-conf-global.h"
#include "netdata-conf-logs.h"
#include "netdata-conf-web.h"
#include "netdata-conf-cloud.h"

#include "daemon/common.h"

Expand Down
25 changes: 15 additions & 10 deletions src/daemon/pulse/pulse-daemon-memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ struct netdata_buffers_statistics netdata_buffers_statistics = {};
void pulse_daemon_memory_do(bool extended) {
{
static RRDSET *st_memory = NULL;
static RRDDIM *rd_database = NULL;
static RRDDIM *rd_db_dbengine = NULL;
static RRDDIM *rd_db_rrd = NULL;

#ifdef DICT_WITH_STATS
static RRDDIM *rd_collectors = NULL;
static RRDDIM *rd_rrdhosts = NULL;
Expand Down Expand Up @@ -50,7 +52,8 @@ void pulse_daemon_memory_do(bool extended) {
localhost->rrd_update_every,
RRDSET_TYPE_STACKED);

rd_database = rrddim_add(st_memory, "db", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_db_dbengine = rrddim_add(st_memory, "dbengine", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_db_rrd = rrddim_add(st_memory, "rrd", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);

#ifdef DICT_WITH_STATS
rd_collectors = rrddim_add(st_memory, "collectors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
Expand All @@ -75,8 +78,9 @@ void pulse_daemon_memory_do(bool extended) {
rd_other = rrddim_add(st_memory, "other", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}

// each of these should also be analyzed below at the buffers chart
size_t buffers =
netdata_buffers_statistics.query_targets_size +
netdata_buffers_statistics.query_targets_size + onewayalloc_allocated_memory() +
netdata_buffers_statistics.rrdset_done_rda_size +
netdata_buffers_statistics.buffers_aclk +
netdata_buffers_statistics.buffers_api +
Expand All @@ -92,8 +96,8 @@ void pulse_daemon_memory_do(bool extended) {
size_t strings = 0;
string_statistics(NULL, NULL, NULL, NULL, NULL, &strings, NULL, NULL);

rrddim_set_by_pointer(st_memory, rd_database,
(collected_number)pulse_dbengine_total_memory + (collected_number)rrddim_db_memory_size);
rrddim_set_by_pointer(st_memory, rd_db_dbengine, (collected_number)pulse_dbengine_total_memory);
rrddim_set_by_pointer(st_memory, rd_db_rrd, (collected_number)pulse_rrd_memory_size);

#ifdef DICT_WITH_STATS
rrddim_set_by_pointer(st_memory, rd_collectors,
Expand Down Expand Up @@ -157,11 +161,11 @@ void pulse_daemon_memory_do(bool extended) {
rrddim_set_by_pointer(st_memory, rd_aral,
(collected_number)aral_by_size_structures_bytes());

rrddim_set_by_pointer(st_memory,
rd_judy, (collected_number) judy_aral_structures());
rrddim_set_by_pointer(st_memory, rd_judy,
(collected_number) judy_aral_structures());

rrddim_set_by_pointer(st_memory,
rd_other, (collected_number)dictionary_stats_memory_total(dictionary_stats_category_other));
rrddim_set_by_pointer(st_memory, rd_other,
(collected_number)dictionary_stats_memory_total(dictionary_stats_category_other));

rrdset_done(st_memory);
}
Expand Down Expand Up @@ -221,6 +225,7 @@ void pulse_daemon_memory_do(bool extended) {
rd_buffers_judy = rrddim_add(st_memory_buffers, "aral-judy free", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}

// the sum of all these needs to be above at the total buffers calculation
rrddim_set_by_pointer(st_memory_buffers, rd_queries, (collected_number)netdata_buffers_statistics.query_targets_size + (collected_number) onewayalloc_allocated_memory());
rrddim_set_by_pointer(st_memory_buffers, rd_collectors, (collected_number)netdata_buffers_statistics.rrdset_done_rda_size);
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_aclk, (collected_number)netdata_buffers_statistics.buffers_aclk);
Expand All @@ -231,8 +236,8 @@ void pulse_daemon_memory_do(bool extended) {
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_health, (collected_number)netdata_buffers_statistics.buffers_health);
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_streaming, (collected_number)netdata_buffers_statistics.buffers_streaming);
rrddim_set_by_pointer(st_memory_buffers, rd_cbuffers_streaming, (collected_number)netdata_buffers_statistics.cbuffers_streaming);
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_replication, (collected_number)replication_allocated_buffers());
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_web, (collected_number)netdata_buffers_statistics.buffers_web);
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_replication, (collected_number)replication_allocated_buffers());
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_aral, (collected_number)aral_by_size_free_bytes());
rrddim_set_by_pointer(st_memory_buffers, rd_buffers_judy, (collected_number)judy_aral_free_bytes());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#define PULSE_INTERNALS 1
#include "pulse-dbengine.h"
#include "pulse-db-dbengine.h"

size_t pulse_dbengine_total_memory = 0;

Expand Down Expand Up @@ -638,11 +638,7 @@ static void dbengine2_cache_statistics_charts(struct dbengine2_cache_pointers *p
}
}


void pulse_dbengine_do(bool extended) {
if(!main_cache || !main_mrg || !extended)
return;

static struct dbengine2_cache_pointers main_cache_ptrs = {}, open_cache_ptrs = {}, extent_cache_ptrs = {};
static struct rrdeng_cache_efficiency_stats cache_efficiency_stats = {}, cache_efficiency_stats_old = {};
static struct pgc_statistics pgc_main_stats = {}, pgc_main_stats_old = {}; (void)pgc_main_stats_old;
Expand Down Expand Up @@ -686,6 +682,10 @@ void pulse_dbengine_do(bool extended) {
mrg_stats.size +
buffers_total_size + aral_structures_total_size + aral_padding_total_size + pgd_padding_bytes();

// we need all the above for the total dbengine memory as reported by the non-extended netdata memory chart
if(!main_cache || !main_mrg || !extended)
return;

size_t priority = 135000;

{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_PULSE_DBENGINE_H
#define NETDATA_PULSE_DBENGINE_H
#ifndef NETDATA_PULSE_DB_DBENGINE_H
#define NETDATA_PULSE_DB_DBENGINE_H

#include "daemon/common.h"

Expand All @@ -14,4 +14,4 @@ void pulse_dbengine_do(bool extended);

#endif

#endif //NETDATA_PULSE_DBENGINE_H
#endif //NETDATA_PULSE_DB_DBENGINE_H
18 changes: 18 additions & 0 deletions src/daemon/pulse/pulse-db-rrd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#include "pulse-db-rrd.h"

// Running total of the memory accounted through the functions below.
// It is signed so that pulse_db_rrd_memory_change() can apply negative deltas.
// All writers in this file use relaxed atomic read-modify-write operations.
int64_t pulse_rrd_memory_size = 0;

// Apply a signed delta (positive grows the total, negative shrinks it).
void pulse_db_rrd_memory_change(int64_t value) {
    (void)__atomic_fetch_add(&pulse_rrd_memory_size, value, __ATOMIC_RELAXED);
}

// Grow the total by an unsigned amount.
void pulse_db_rrd_memory_add(uint64_t value) {
    int64_t delta = (int64_t)value; // same conversion the counter's type imposes
    (void)__atomic_fetch_add(&pulse_rrd_memory_size, delta, __ATOMIC_RELAXED);
}

// Shrink the total by an unsigned amount.
void pulse_db_rrd_memory_sub(uint64_t value) {
    int64_t delta = (int64_t)value; // same conversion the counter's type imposes
    (void)__atomic_fetch_sub(&pulse_rrd_memory_size, delta, __ATOMIC_RELAXED);
}

16 changes: 16 additions & 0 deletions src/daemon/pulse/pulse-db-rrd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_PULSE_DB_RRD_H
#define NETDATA_PULSE_DB_RRD_H

#include "libnetdata/libnetdata.h"

// Accounting API for the pulse_rrd_memory_size counter.
// Each call atomically (relaxed ordering) adjusts the counter:
//  - pulse_db_rrd_memory_change() adds a signed delta, so it can grow or shrink it
//  - pulse_db_rrd_memory_add() adds an unsigned amount
//  - pulse_db_rrd_memory_sub() subtracts an unsigned amount
void pulse_db_rrd_memory_change(int64_t value);
void pulse_db_rrd_memory_add(uint64_t value);
void pulse_db_rrd_memory_sub(uint64_t value);

// The raw counter is only visible to translation units that define
// PULSE_INTERNALS before including this header; other code is expected to go
// through the functions above.
#if defined(PULSE_INTERNALS)
extern int64_t pulse_rrd_memory_size;
#endif

#endif //NETDATA_PULSE_DB_RRD_H
2 changes: 0 additions & 2 deletions src/daemon/pulse/pulse-dictionary.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ struct dictionary_stats dictionary_stats_category_rrdhealth = { .name = "health"
struct dictionary_stats dictionary_stats_category_functions = { .name = "functions" };
struct dictionary_stats dictionary_stats_category_replication = { .name = "replication" };

size_t rrddim_db_memory_size = 0;

#ifdef DICT_WITH_STATS
struct dictionary_categories {
struct dictionary_stats *stats;
Expand Down
2 changes: 0 additions & 2 deletions src/daemon/pulse/pulse-dictionary.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ extern struct dictionary_stats dictionary_stats_category_rrdhealth;
extern struct dictionary_stats dictionary_stats_category_functions;
extern struct dictionary_stats dictionary_stats_category_replication;

extern size_t rrddim_db_memory_size;

#if defined(PULSE_INTERNALS)
void pulse_dictionary_do(bool extended);
#endif
Expand Down
3 changes: 2 additions & 1 deletion src/daemon/pulse/pulse.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ extern bool pulse_extended_enabled;
#include "pulse-daemon.h"
#include "pulse-daemon-memory.h"
#include "pulse-sqlite3.h"
#include "pulse-dbengine.h"
#include "pulse-db-dbengine.h"
#include "pulse-db-rrd.h"
#include "pulse-string.h"
#include "pulse-heartbeat.h"
#include "pulse-dictionary.h"
Expand Down
Loading

0 comments on commit fad05b0

Please sign in to comment.