Skip to content

Commit

Permalink
Streaming improvements #1 (netdata#19137)
Browse files Browse the repository at this point in the history
* prefer tinysleep over yielding the processor

* split spinlocks to separate files

* rename spinlock initializers

* Optimize ML queuing operations.

- Allocate 25% of cores for ML.
- Split queues by request type.
- Accurate stats for queue operations by type.

* abstracted circular buffer into a new private structure to enable using it in receiver sending side - no features added yet, only abstracted the existing functionality - not tested yet

* completed the abstraction of stream circular buffer

* unified list of receivers and senders; opcodes now support both receivers and senders

* use strings in pluginsd

* stream receivers send data back to the child using the event loop

* do not share pgc aral between caches

* pgc uses 4 to 256 partitions, by default equal to the number of CPU cores

* add forgotten worker job

* workers now monitor spinlock contention

* stream sender tries to lock the sender, but does not wait for it - it will be handled later

* increase the number of web server threads to the number of cpu cores, with a minimum of 6

* use the nowait versions of nd_sock functions

* handle EAGAIN properly

* add spinlock contention tracing for rw_spinlock

* aral lock/unlock contention tracing

* allocate the compressed buffer

* use 128KiB for aral default page size; limit memory protection to 5GiB

* aral uses mmap() for big pages

* enrich log messages

* renamed telemetry to pulse

* unified sender and receiver socket event loops

* logging improvements

* NETDATA_LOG_STREAM_SENDER logs inbound and outbound traffic

* 16k receiver buffer size to improve interactivity

* fix NETDATA_LOG_STREAM_SENDER in sender_execute

* do not stream ML models for charts and dimensions that have not been exposed

* add support for sending QUIT to plugins and waiting for some time for them to quit gracefully

* global spinlock contention per function

* use an aral per pgc partition; use 8 partitions for PGD

* rrdcalc: do not change the frequency of alerts - it uses arbitrary values used during replication, changing permanently the frequency of alerts
replication: use 1/3 of the cores or 1 core every 10 nodes (min of the two)
pgd: use as many aral partitions as the CPU cores, up to 256

* aral does 1 allocation per page (the structure and the elements together), instead of two

* use the evitor thread only when we run out of memory; restore the optimization about prepending or appending clean pages based on their accesses; use the main cache free memory for the other caches, reducing I/O when the main cache has enough room

* reduce the number of events per poll() to 10

* aral allocates pages of up to 1MiB; restore processing 100 events per nd_poll() call

* drain the sockets while reading

* receiver sockets should be non-blocking

* add stability detector to aral

* increase the receivers send buffer

* do not remove the sender or the receiver while we drain the input sockets

---------

Co-authored-by: vkalintiris <[email protected]>
  • Loading branch information
ktsaou and vkalintiris authored Dec 9, 2024
1 parent 8f53c4c commit 9ecf021
Show file tree
Hide file tree
Showing 165 changed files with 3,010 additions and 1,939 deletions.
70 changes: 38 additions & 32 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,10 @@ set(LIBNETDATA_FILES
src/libnetdata/os/system_memory.h
src/libnetdata/socket/nd-poll.c
src/libnetdata/socket/nd-poll.h
src/libnetdata/locks/spinlock.c
src/libnetdata/locks/spinlock.h
src/libnetdata/locks/rw-spinlock.c
src/libnetdata/locks/rw-spinlock.h
)

set(LIBH2O_FILES
Expand Down Expand Up @@ -1037,8 +1041,8 @@ set(DAEMON_FILES
src/daemon/daemon.h
src/daemon/libuv_workers.c
src/daemon/libuv_workers.h
src/daemon/telemetry/telemetry.c
src/daemon/telemetry/telemetry.h
src/daemon/pulse/pulse.c
src/daemon/pulse/pulse.h
src/daemon/analytics.c
src/daemon/analytics.h
src/daemon/main.c
Expand Down Expand Up @@ -1068,36 +1072,36 @@ set(DAEMON_FILES
src/daemon/dyncfg/dyncfg-internals.h
src/daemon/dyncfg/dyncfg-intercept.c
src/daemon/dyncfg/dyncfg-tree.c
src/daemon/telemetry/telemetry-http-api.c
src/daemon/telemetry/telemetry-http-api.h
src/daemon/telemetry/telemetry-queries.c
src/daemon/telemetry/telemetry-queries.h
src/daemon/telemetry/telemetry-ingestion.c
src/daemon/telemetry/telemetry-ingestion.h
src/daemon/telemetry/telemetry-ml.c
src/daemon/telemetry/telemetry-ml.h
src/daemon/telemetry/telemetry-gorilla.c
src/daemon/telemetry/telemetry-gorilla.h
src/daemon/telemetry/telemetry-daemon.c
src/daemon/telemetry/telemetry-daemon.h
src/daemon/telemetry/telemetry-daemon-memory.c
src/daemon/telemetry/telemetry-daemon-memory.h
src/daemon/telemetry/telemetry-sqlite3.c
src/daemon/telemetry/telemetry-sqlite3.h
src/daemon/telemetry/telemetry-dbengine.c
src/daemon/telemetry/telemetry-dbengine.h
src/daemon/telemetry/telemetry-string.c
src/daemon/telemetry/telemetry-string.h
src/daemon/telemetry/telemetry-heartbeat.c
src/daemon/telemetry/telemetry-heartbeat.h
src/daemon/telemetry/telemetry-dictionary.c
src/daemon/telemetry/telemetry-dictionary.h
src/daemon/telemetry/telemetry-workers.c
src/daemon/telemetry/telemetry-workers.h
src/daemon/telemetry/telemetry-trace-allocations.c
src/daemon/telemetry/telemetry-trace-allocations.h
src/daemon/telemetry/telemetry-aral.c
src/daemon/telemetry/telemetry-aral.h
src/daemon/pulse/pulse-http-api.c
src/daemon/pulse/pulse-http-api.h
src/daemon/pulse/pulse-queries.c
src/daemon/pulse/pulse-queries.h
src/daemon/pulse/pulse-ingestion.c
src/daemon/pulse/pulse-ingestion.h
src/daemon/pulse/pulse-ml.c
src/daemon/pulse/pulse-ml.h
src/daemon/pulse/pulse-gorilla.c
src/daemon/pulse/pulse-gorilla.h
src/daemon/pulse/pulse-daemon.c
src/daemon/pulse/pulse-daemon.h
src/daemon/pulse/pulse-daemon-memory.c
src/daemon/pulse/pulse-daemon-memory.h
src/daemon/pulse/pulse-sqlite3.c
src/daemon/pulse/pulse-sqlite3.h
src/daemon/pulse/pulse-dbengine.c
src/daemon/pulse/pulse-dbengine.h
src/daemon/pulse/pulse-string.c
src/daemon/pulse/pulse-string.h
src/daemon/pulse/pulse-heartbeat.c
src/daemon/pulse/pulse-heartbeat.h
src/daemon/pulse/pulse-dictionary.c
src/daemon/pulse/pulse-dictionary.h
src/daemon/pulse/pulse-workers.c
src/daemon/pulse/pulse-workers.h
src/daemon/pulse/pulse-trace-allocations.c
src/daemon/pulse/pulse-trace-allocations.h
src/daemon/pulse/pulse-aral.c
src/daemon/pulse/pulse-aral.h
src/daemon/config/netdata-conf-db.c
src/daemon/config/netdata-conf-db.h
src/daemon/config/netdata-conf.h
Expand Down Expand Up @@ -1543,6 +1547,8 @@ set(STREAMING_PLUGIN_FILES
src/streaming/stream-receiver-connection.c
src/streaming/stream-sender-commit.h
src/streaming/stream-traffic-types.h
src/streaming/stream-circular-buffer.c
src/streaming/stream-circular-buffer.h
)

set(WEB_PLUGIN_FILES
Expand Down
2 changes: 1 addition & 1 deletion src/aclk/aclk_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct pending_req_list {
};

static struct pending_req_list *pending_req_list_head = NULL;
static SPINLOCK pending_req_list_lock = NETDATA_SPINLOCK_INITIALIZER;
static SPINLOCK pending_req_list_lock = SPINLOCK_INITIALIZER;

void aclk_config_get_query_scope(void) {
const char *s = config_get(CONFIG_SECTION_CLOUD, "scope", "full");
Expand Down
2 changes: 1 addition & 1 deletion src/aclk/https_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include "aclk_util.h"

#include "daemon/telemetry/telemetry.h"
#include "daemon/pulse/pulse.h"

static const char *http_req_type_to_str(http_req_type_t req) {
switch (req) {
Expand Down
4 changes: 2 additions & 2 deletions src/claim/claim-with-api.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ static bool send_curl_request(const char *machine_guid, const char *hostname, co
}

bool claim_agent(const char *url, const char *token, const char *rooms, const char *proxy, bool insecure) {
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
static SPINLOCK spinlock = SPINLOCK_INITIALIZER;
spinlock_lock(&spinlock);

if (!check_and_generate_certificates()) {
Expand Down Expand Up @@ -411,7 +411,7 @@ bool claim_agent_from_environment(void) {

bool claim_agent_from_claim_conf(void) {
static struct config claim_config = APPCONFIG_INITIALIZER;
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
static SPINLOCK spinlock = SPINLOCK_INITIALIZER;
bool ret = false;

spinlock_lock(&spinlock);
Expand Down
2 changes: 1 addition & 1 deletion src/claim/claim_id.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ static struct {
ND_UUID claim_uuid;
ND_UUID claim_uuid_saved;
} claim = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.spinlock = SPINLOCK_INITIALIZER,
};

void claim_id_clear_previous_working(void) {
Expand Down
2 changes: 1 addition & 1 deletion src/collectors/cgroups.plugin/cgroup-network.c
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ static void read_from_spawned(SPAWN_INSTANCE *si, const char *name __maybe_unuse
}
fclose(fp);
spawn_server_instance_read_fd_unset(si);
spawn_server_exec_kill(spawn_server, si);
spawn_server_exec_kill(spawn_server, si, 0);
}

void detect_veth_interfaces_spawn(pid_t pid) {
Expand Down
2 changes: 1 addition & 1 deletion src/collectors/ebpf.plugin/ebpf_apps.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ int ebpf_read_apps_groups_conf(struct ebpf_target **agdt, struct ebpf_target **a
#define MAX_CMDLINE 16384

Pvoid_t ebpf_pid_judyL = NULL;
SPINLOCK ebpf_pid_spinlock = NETDATA_SPINLOCK_INITIALIZER;
SPINLOCK ebpf_pid_spinlock = SPINLOCK_INITIALIZER;

void ebpf_pid_del(pid_t pid)
{
Expand Down
4 changes: 2 additions & 2 deletions src/collectors/freeipmi.plugin/freeipmi_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,7 @@ int main (int argc, char **argv) {
struct ipmi_collection_thread sensors_data = {
.type = IPMI_COLLECT_TYPE_SENSORS,
.freq_s = update_every,
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.spinlock = SPINLOCK_INITIALIZER,
.debug = debug,
.state = {
.debug = debug,
Expand All @@ -1974,7 +1974,7 @@ int main (int argc, char **argv) {
}, sel_data = {
.type = IPMI_COLLECT_TYPE_SEL,
.freq_s = update_every_sel,
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.spinlock = SPINLOCK_INITIALIZER,
.debug = debug,
.state = {
.debug = debug,
Expand Down
2 changes: 1 addition & 1 deletion src/collectors/proc.plugin/proc_net_dev_renames.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ static void dictionary_netdev_rename_delete_cb(const DICTIONARY_ITEM *item __may
}

void netdev_renames_init(void) {
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
static SPINLOCK spinlock = SPINLOCK_INITIALIZER;

spinlock_lock(&spinlock);
if(!netdev_renames) {
Expand Down
4 changes: 2 additions & 2 deletions src/collectors/statsd.plugin/statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -2654,7 +2654,7 @@ void *statsd_main(void *ptr) {
RRDSET *st_pcharts = NULL;
RRDDIM *rd_pcharts = NULL;

if(telemetry_enabled) {
if(pulse_enabled) {
st_metrics = rrdset_create_localhost(
"netdata",
"statsd_metrics",
Expand Down Expand Up @@ -2851,7 +2851,7 @@ void *statsd_main(void *ptr) {
if(unlikely(!service_running(SERVICE_COLLECTORS)))
break;

if(telemetry_enabled) {
if(pulse_enabled) {
rrddim_set_by_pointer(st_metrics, rd_metrics_gauge, (collected_number)statsd.gauges.metrics);
rrddim_set_by_pointer(st_metrics, rd_metrics_counter, (collected_number)statsd.counters.metrics);
rrddim_set_by_pointer(st_metrics, rd_metrics_timer, (collected_number)statsd.timers.metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ int filenames_compar(const void *a, const void *b) {
}

void journal_files_registry_update(void) {
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
static SPINLOCK spinlock = SPINLOCK_INITIALIZER;

if(spinlock_trylock(&spinlock)) {
usec_t scan_monotonic_ut = now_monotonic_usec();
Expand Down
2 changes: 1 addition & 1 deletion src/collectors/tc.plugin/plugin_tc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ void *tc_main(void *ptr) {
}

// fgets() failed or loop broke
int code = spawn_popen_kill(tc_child_instance);
int code = spawn_popen_kill(tc_child_instance, 0);
tc_child_instance = NULL;

if(unlikely(device)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static struct {
ARAL *aral_providers;
ARAL *aral_handles;
} pbc = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.spinlock = SPINLOCK_INITIALIZER,
};

static void provider_load_list(PROVIDER_META_HANDLE *h, WEVT_VARIANT *content, WEVT_VARIANT *property,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ WEVT_SOURCE_TYPE categorize_channel(const wchar_t *channel_path, const char **pr
}

void wevt_sources_scan(void) {
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
static SPINLOCK spinlock = SPINLOCK_INITIALIZER;
LPWSTR channel = NULL;
EVT_HANDLE hChannelEnum = NULL;

Expand Down
2 changes: 1 addition & 1 deletion src/daemon/buildinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ __attribute__((constructor)) void initialize_build_info(void) {
int get_system_info(struct rrdhost_system_info *system_info);
static void populate_system_info(void) {
static bool populated = false;
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
static SPINLOCK spinlock = SPINLOCK_INITIALIZER;

if(populated)
return;
Expand Down
2 changes: 1 addition & 1 deletion src/daemon/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extern "C" {
#include "daemon/config/netdata-conf.h"
#include "daemon/dyncfg/dyncfg.h"

#include "daemon/telemetry/telemetry.h"
#include "daemon/pulse/pulse.h"

// health monitoring and alarm notifications
#include "health/health.h"
Expand Down
20 changes: 16 additions & 4 deletions src/daemon/config/netdata-conf-backwards-compatibility.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,26 @@ void netdata_conf_backwards_compatibility(void) {
config_move(CONFIG_SECTION_GLOBAL, "enable zero metrics",
CONFIG_SECTION_DB, "enable zero metrics");

config_move("global statistics", "update every",
CONFIG_SECTION_TELEMETRY, "update every");
// ----------------------------------------------------------------------------------------------------------------
// global statistics -> telemetry -> pulse

config_move(CONFIG_SECTION_PLUGINS, "netdata monitoring",
CONFIG_SECTION_PLUGINS, "netdata telemetry");
CONFIG_SECTION_PLUGINS, "netdata pulse");

config_move(CONFIG_SECTION_PLUGINS, "netdata telemetry",
CONFIG_SECTION_PLUGINS, "netdata pulse");

config_move(CONFIG_SECTION_PLUGINS, "netdata monitoring extended",
CONFIG_SECTION_TELEMETRY, "extended telemetry");
CONFIG_SECTION_PULSE, "extended");

config_move("telemetry", "extended telemetry",
CONFIG_SECTION_PULSE, "extended");

config_move("global statistics", "update every",
CONFIG_SECTION_PULSE, "update every");

config_move("telemetry", "update every",
CONFIG_SECTION_PULSE, "update every");


// ----------------------------------------------------------------------------------------------------------------
Expand Down
5 changes: 4 additions & 1 deletion src/daemon/config/netdata-conf-db.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,11 @@ void netdata_conf_dbengine_init(const char *hostname) {
OS_SYSTEM_MEMORY sm = os_system_memory(true);
if(sm.ram_total_bytes && sm.ram_available_bytes && sm.ram_total_bytes > sm.ram_available_bytes) {
// calculate the default out of memory protection size
uint64_t keep_free = sm.ram_total_bytes / 10;
if(keep_free > 5ULL * 1024 * 1024 * 1024)
keep_free = 5ULL * 1024 * 1024 * 1024;
char buf[64];
size_snprintf(buf, sizeof(buf), sm.ram_total_bytes / 10, "B", false);
size_snprintf(buf, sizeof(buf), keep_free, "B", false);
size_parse(buf, &dbengine_out_of_memory_protection, "B");
}

Expand Down
10 changes: 5 additions & 5 deletions src/daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1424,14 +1424,14 @@ int netdata_main(int argc, char **argv) {
default_stacksize = 1 * 1024 * 1024;

#ifdef NETDATA_INTERNAL_CHECKS
telemetry_enabled = true;
telemetry_extended_enabled = true;
pulse_enabled = true;
pulse_extended_enabled = true;
#endif

telemetry_extended_enabled =
config_get_boolean(CONFIG_SECTION_TELEMETRY, "extended telemetry", telemetry_extended_enabled);
pulse_extended_enabled =
config_get_boolean(CONFIG_SECTION_PULSE, "extended", pulse_extended_enabled);

if(telemetry_extended_enabled)
if(pulse_extended_enabled)
// this has to run before starting any other threads that use workers
workers_utilization_enable();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#define TELEMETRY_INTERNALS 1
#include "telemetry-aral.h"
#define PULSE_INTERNALS 1
#include "pulse-aral.h"

struct aral_info {
const char *name;
Expand All @@ -19,7 +19,7 @@ static struct {
ARAL_STATS_JudyLSet idx;
} globals = { 0 };

static void telemetry_aral_register_statistics(struct aral_statistics *stats, const char *name) {
static void pulse_aral_register_statistics(struct aral_statistics *stats, const char *name) {
if(!name || !stats)
return;

Expand All @@ -33,18 +33,18 @@ static void telemetry_aral_register_statistics(struct aral_statistics *stats, co
spinlock_unlock(&globals.spinlock);
}

void telemetry_aral_register(ARAL *ar, const char *name) {
void pulse_aral_register(ARAL *ar, const char *name) {
if(!ar) return;

if(!name)
name = aral_name(ar);

struct aral_statistics *stats = aral_get_statistics(ar);

telemetry_aral_register_statistics(stats, name);
pulse_aral_register_statistics(stats, name);
}

void telemetry_aral_unregister(ARAL *ar) {
void pulse_aral_unregister(ARAL *ar) {
if(!ar) return;
struct aral_statistics *stats = aral_get_statistics(ar);

Expand All @@ -58,11 +58,11 @@ void telemetry_aral_unregister(ARAL *ar) {
spinlock_unlock(&globals.spinlock);
}

void telemerty_aral_init(void) {
telemetry_aral_register_statistics(aral_by_size_statistics(), "by-size");
void pulse_aral_init(void) {
pulse_aral_register_statistics(aral_by_size_statistics(), "by-size");
}

void telemetry_aral_do(bool extended) {
void pulse_aral_do(bool extended) {
if(!extended) return;

spinlock_lock(&globals.spinlock);
Expand Down Expand Up @@ -111,7 +111,7 @@ void telemetry_aral_do(bool extended) {
"Array Allocator Memory Utilization",
"bytes",
"netdata",
"telemetry",
"pulse",
910000,
localhost->rrd_update_every,
RRDSET_TYPE_STACKED);
Expand Down Expand Up @@ -145,7 +145,7 @@ void telemetry_aral_do(bool extended) {
"Array Allocator Memory Utilization",
"%",
"netdata",
"telemetry",
"pulse",
910001,
localhost->rrd_update_every,
RRDSET_TYPE_LINE);
Expand Down
Loading

0 comments on commit 9ecf021

Please sign in to comment.