From a801fd3dcaa892593033d3108bb6d4eba7db6a5d Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Tue, 31 Dec 2024 10:13:19 +0000 Subject: [PATCH] Waiting Queue (#19302) * add strict checks to waiting queue * waiting queue implementation purely using atomics * trylock should insist if we are the potential winner --- CMakeLists.txt | 4 +- .../systemd-journal-annotations.c | 1 - src/database/engine/cache.c | 62 +-- src/libnetdata/libnetdata.h | 2 +- src/libnetdata/locks/spinlock.c | 2 +- src/libnetdata/locks/waitq.c | 264 ++++++++++ src/libnetdata/locks/waitq.h | 64 +++ src/libnetdata/log/nd_log.c | 5 +- src/libnetdata/waiting-queue/waiting-queue.c | 485 ------------------ src/libnetdata/waiting-queue/waiting-queue.h | 60 --- src/streaming/stream-sender-api.c | 5 +- src/streaming/stream-sender-commit.c | 16 +- src/streaming/stream-sender-internals.h | 2 +- 13 files changed, 377 insertions(+), 595 deletions(-) create mode 100644 src/libnetdata/locks/waitq.c create mode 100644 src/libnetdata/locks/waitq.h delete mode 100644 src/libnetdata/waiting-queue/waiting-queue.c delete mode 100644 src/libnetdata/waiting-queue/waiting-queue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 48bd4d97a4662e..460aaa2759a447 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -949,8 +949,8 @@ set(LIBNETDATA_FILES src/libnetdata/locks/rw-spinlock.h src/libnetdata/atomics/atomic_flags.h src/libnetdata/atomics/atomics.h - src/libnetdata/waiting-queue/waiting-queue.c - src/libnetdata/waiting-queue/waiting-queue.h + src/libnetdata/locks/waitq.c + src/libnetdata/locks/waitq.h src/libnetdata/object-state/object-state.c src/libnetdata/object-state/object-state.h ) diff --git a/src/collectors/systemd-journal.plugin/systemd-journal-annotations.c b/src/collectors/systemd-journal.plugin/systemd-journal-annotations.c index 440138d5f98131..a72e57981c27e3 100644 --- a/src/collectors/systemd-journal.plugin/systemd-journal-annotations.c +++ b/src/collectors/systemd-journal.plugin/systemd-journal-annotations.c @@ -611,7 +611,6 @@ static void netdata_systemd_journal_message_ids_init(void) { msgid_into_dict("9ce0cb58ab8b44df82c4bf1ad9ee22de", "Netdata alert transition"); msgid_into_dict("6db0018e83e34320ae2a659d78019fb7", "Netdata alert notification"); msgid_into_dict("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"); - msgid_into_dict("8ddaf5ba33a74078b609250db1e951f3", "Sensor state transition"); } diff --git a/src/database/engine/cache.c b/src/database/engine/cache.c index 7ebddcd6ac903b..99a135caebfab9 100644 --- a/src/database/engine/cache.c +++ b/src/database/engine/cache.c @@ -22,8 +22,7 @@ typedef int32_t REFCOUNT; #define PGC_WITH_ARAL 1 #endif -// unfortunately waiting queue is significantly slower than spinlock -// #define PGC_QUEUE_LOCK_AS_WAITING_QUEUE 1 +#define PGC_QUEUE_LOCK_AS_WAITING_QUEUE 1 typedef enum __attribute__ ((__packed__)) { // mutually exclusive flags @@ -75,7 +74,7 @@ struct pgc_page { struct pgc_queue { #if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE) - WAITING_QUEUE *wq; + WAITQ wq; #else SPINLOCK spinlock; #endif @@ -246,17 +245,17 @@ static inline size_t pgc_indexing_partition(PGC *cache, Word_t metric_id) { _result; \ }) -#define PGC_QUEUE_LOCK_PRIO_COLLECTORS WAITING_QUEUE_PRIO_URGENT -#define PGC_QUEUE_LOCK_PRIO_EVICTORS WAITING_QUEUE_PRIO_HIGH -#define PGC_QUEUE_LOCK_PRIO_FLUSHERS WAITING_QUEUE_PRIO_NORMAL -#define PGC_QUEUE_LOCK_PRIO_OTHERS WAITING_QUEUE_PRIO_LOW +#define PGC_QUEUE_LOCK_PRIO_COLLECTORS WAITQ_PRIO_URGENT +#define PGC_QUEUE_LOCK_PRIO_EVICTORS WAITQ_PRIO_HIGH +#define PGC_QUEUE_LOCK_PRIO_FLUSHERS WAITQ_PRIO_NORMAL +#define PGC_QUEUE_LOCK_PRIO_LOW WAITQ_PRIO_LOW #if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE) -#define pgc_queue_trylock(cache, ll) waiting_queue_try_acquire((ll)->wq) -#define pgc_queue_lock(cache, ll, prio) waiting_queue_acquire((ll)->wq, prio) -#define pgc_queue_unlock(cache, ll) waiting_queue_release((ll)->wq) +#define pgc_queue_trylock(cache, ll, prio) waitq_try_acquire(&((ll)->wq), prio) +#define pgc_queue_lock(cache, ll, prio) waitq_acquire(&((ll)->wq), prio) +#define pgc_queue_unlock(cache, ll) waitq_release(&((ll)->wq)) #else -#define pgc_queue_trylock(cache, ll) spinlock_trylock(&((ll)->spinlock)) +#define pgc_queue_trylock(cache, ll, prio) spinlock_trylock(&((ll)->spinlock)) #define pgc_queue_lock(cache, ll, prio) spinlock_lock(&((ll)->spinlock)) #define pgc_queue_unlock(cache, ll) spinlock_unlock(&((ll)->spinlock)) #endif @@ -608,7 +607,7 @@ static inline void pgc_stats_index_judy_change(PGC *cache, size_t mem_before_jud } } -static void pgc_queue_add(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock, WAITING_QUEUE_PRIORITY prio __maybe_unused) { +static void pgc_queue_add(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock, WAITQ_PRIORITY prio __maybe_unused) { if(!having_lock) pgc_queue_lock(cache, q, prio); @@ -677,7 +676,8 @@ static void pgc_queue_add(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PA pgc_size_histogram_add(cache, &q->stats->size_histogram, page); } -static void pgc_queue_del(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock, WAITING_QUEUE_PRIORITY prio __maybe_unused) { +static void pgc_queue_del(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock, + WAITQ_PRIORITY prio __maybe_unused) { if(cache->config.stats) pgc_size_histogram_del(cache, &q->stats->size_histogram, page); @@ -736,7 +736,7 @@ static inline void page_has_been_accessed(PGC *cache, PGC_PAGE *page) { __atomic_add_fetch(&page->accesses, 1, __ATOMIC_RELAXED); if (flags & PGC_PAGE_CLEAN) { - if(pgc_queue_trylock(cache, &cache->clean)) { + if(pgc_queue_trylock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_EVICTORS)) { DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next); pgc_queue_unlock(cache, &cache->clean); @@ -752,7 +752,7 @@ static inline void page_has_been_accessed(PGC *cache, PGC_PAGE *page) { // ---------------------------------------------------------------------------- // state transitions -static inline void page_set_clean(PGC *cache, PGC_PAGE *page, bool having_transition_lock, bool having_clean_lock, WAITING_QUEUE_PRIORITY prio) { +static inline void page_set_clean(PGC *cache, PGC_PAGE *page, bool having_transition_lock, bool having_clean_lock, WAITQ_PRIORITY prio) { if(!having_transition_lock) page_transition_lock(cache, page); @@ -777,7 +777,7 @@ static inline void page_set_clean(PGC *cache, PGC_PAGE *page, bool having_transi page_transition_unlock(cache, page); } -static inline void page_set_dirty(PGC *cache, PGC_PAGE *page, bool having_hot_lock, WAITING_QUEUE_PRIORITY prio) { +static inline void page_set_dirty(PGC *cache, PGC_PAGE *page, bool having_hot_lock, WAITQ_PRIORITY prio) { if(!having_hot_lock) // to avoid deadlocks, we have to get the hot lock before the page transition // since this is what all_hot_to_dirty() does @@ -819,7 +819,7 @@ static inline void page_set_dirty(PGC *cache, PGC_PAGE *page, bool having_hot_lo page_transition_unlock(cache, page); } -static inline void page_set_hot(PGC *cache, PGC_PAGE *page, WAITING_QUEUE_PRIORITY prio) { +static inline void page_set_hot(PGC *cache, PGC_PAGE *page, WAITQ_PRIORITY prio) { page_transition_lock(cache, page); PGC_PAGE_FLAGS flags = page_get_status_flags(page); @@ -1222,7 +1222,7 @@ static bool evict_pages_with_filter(PGC *cache, size_t max_skip, size_t max_evic timing_dbengine_evict_init(); if(!all_of_them && !wait) { - if(!pgc_queue_trylock(cache, &cache->clean)) { + if(!pgc_queue_trylock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_EVICTORS)) { stopped_before_finishing = true; goto premature_exit; } @@ -1759,7 +1759,7 @@ static bool flush_pages(PGC *cache, size_t max_flushes, Word_t section, bool wai // we have been called from a data collection thread // let's not waste its time... - if(!pgc_queue_trylock(cache, &cache->dirty)) { + if(!pgc_queue_trylock(cache, &cache->dirty, PGC_QUEUE_LOCK_PRIO_FLUSHERS)) { // we would block, so give up... return false; } @@ -1942,7 +1942,7 @@ static bool flush_pages(PGC *cache, size_t max_flushes, Word_t section, bool wai , "DBENGINE CACHE: flushing pages mismatch"); if(!all_of_them && !wait) { - if(pgc_queue_trylock(cache, &cache->dirty)) + if(pgc_queue_trylock(cache, &cache->dirty, PGC_QUEUE_LOCK_PRIO_FLUSHERS)) have_dirty_lock = true; else { @@ -2099,9 +2099,9 @@ PGC *pgc_create(const char *name, #if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE) - cache->hot.wq = waiting_queue_create(); - cache->dirty.wq = waiting_queue_create(); - cache->clean.wq = waiting_queue_create(); + waitq_init(&cache->hot.wq); + waitq_init(&cache->dirty.wq); + waitq_init(&cache->clean.wq); #else spinlock_init(&cache->hot.spinlock); spinlock_init(&cache->dirty.spinlock); @@ -2174,9 +2174,9 @@ void pgc_destroy(PGC *cache) { } #if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE) - waiting_queue_destroy(cache->hot.wq); - waiting_queue_destroy(cache->dirty.wq); - waiting_queue_destroy(cache->clean.wq); + waitq_destroy(&cache->hot.wq); + waitq_destroy(&cache->dirty.wq); + waitq_destroy(&cache->clean.wq); #endif freez(cache->index); freez(cache); @@ -2453,7 +2453,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_indexing_started, 1, __ATOMIC_RELAXED); p2_add_fetch(&cache->stats.p2_workers_jv2_flush, 1); - pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_OTHERS); + pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_LOW); Pvoid_t JudyL_metrics = NULL; Pvoid_t JudyL_extents_pos = NULL; @@ -2584,7 +2584,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ } yield_the_processor(); // do not lock too aggressively - pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_OTHERS); + pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_LOW); } spinlock_unlock(&sp->migration_to_v2_spinlock); @@ -2608,7 +2608,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ // balance-parents: transition from hot to clean directly yield_the_processor(); // do not lock too aggressively - page_set_clean(cache, pi->page, true, false, PGC_QUEUE_LOCK_PRIO_OTHERS); + page_set_clean(cache, pi->page, true, false, PGC_QUEUE_LOCK_PRIO_LOW); page_transition_unlock(cache, pi->page); page_release(cache, pi->page, true); @@ -2659,7 +2659,7 @@ void pgc_open_evict_clean_pages_of_datafile(PGC *cache, struct rrdengine_datafil size_t pgc_count_clean_pages_having_data_ptr(PGC *cache, Word_t section, void *ptr) { size_t found = 0; - pgc_queue_lock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_OTHERS); + pgc_queue_lock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_LOW); for(PGC_PAGE *page = cache->clean.base; page ;page = page->link.next) found += (page->data == ptr && page->section == section) ? 1 : 0; pgc_queue_unlock(cache, &cache->clean); @@ -2670,7 +2670,7 @@ size_t pgc_count_clean_pages_having_data_ptr(PGC *cache, Word_t section, void *p size_t pgc_count_hot_pages_having_data_ptr(PGC *cache, Word_t section, void *ptr) { size_t found = 0; - pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_OTHERS); + pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_LOW); Pvoid_t *section_pages_pptr = JudyLGet(cache->hot.sections_judy, section, PJE0); if(section_pages_pptr) { struct section_pages *sp = *section_pages_pptr; diff --git a/src/libnetdata/libnetdata.h b/src/libnetdata/libnetdata.h index c68cc21092c729..61b256b5e8ad95 100644 --- a/src/libnetdata/libnetdata.h +++ b/src/libnetdata/libnetdata.h @@ -125,7 +125,7 @@ extern const char *netdata_configured_host_prefix; #include "locks/spinlock.h" #include "locks/rw-spinlock.h" #include "completion/completion.h" -#include "waiting-queue/waiting-queue.h" +#include "libnetdata/locks/waitq.h" #include "clocks/clocks.h" #include "simple_pattern/simple_pattern.h" #include "libnetdata/log/nd_log.h" diff --git a/src/libnetdata/locks/spinlock.c b/src/libnetdata/locks/spinlock.c index 3f89d17d640991..58729d905bbae3 100644 --- a/src/libnetdata/locks/spinlock.c +++ b/src/libnetdata/locks/spinlock.c @@ -28,7 +28,7 @@ void spinlock_lock_with_trace(SPINLOCK *spinlock, const char *func) { // Backoff strategy with exponential growth spins++; microsleep(usec); - usec = usec > MAX_USEC ? MAX_USEC : usec * 2; + usec = usec >= MAX_USEC ? MAX_USEC : usec * 2; } #ifdef NETDATA_INTERNAL_CHECKS diff --git a/src/libnetdata/locks/waitq.c b/src/libnetdata/locks/waitq.c new file mode 100644 index 00000000000000..760bb57d406512 --- /dev/null +++ b/src/libnetdata/locks/waitq.c @@ -0,0 +1,264 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "waitq.h" + +#define MAX_USEC 512 // Maximum backoff limit in microseconds + +#define PRIORITY_SHIFT 32 +#define NO_PRIORITY 0 + +static inline uint64_t make_order(WAITQ_PRIORITY priority, uint64_t seqno) { + return ((uint64_t)priority << PRIORITY_SHIFT) + seqno; +} + +static inline uint64_t get_our_order(WAITQ *waitq, WAITQ_PRIORITY priority) { + uint64_t seqno = __atomic_add_fetch(&waitq->last_seqno, 1, __ATOMIC_RELAXED); + return make_order(priority, seqno); +} + +void waitq_init(WAITQ *waitq) { + spinlock_init(&waitq->spinlock); + waitq->current_priority = 0; + waitq->last_seqno = 0; +} + +void waitq_destroy(WAITQ *wq __maybe_unused) { ; } + +static inline bool write_our_priority(WAITQ *waitq, uint64_t our_order) { + uint64_t current = __atomic_load_n(&waitq->current_priority, __ATOMIC_RELAXED); + if(current == our_order) return true; + + do { + + if(current > our_order) + return false; + + } while(!__atomic_compare_exchange_n( + &waitq->current_priority, + ¤t, + our_order, + false, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + + return true; +} + +static inline bool clear_our_priority(WAITQ *waitq, uint64_t our_order) { + uint64_t expected = our_order; + + return + __atomic_compare_exchange_n( + &waitq->current_priority, + &expected, + NO_PRIORITY, + false, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED); +} + +bool waitq_try_acquire_with_trace(WAITQ *waitq, WAITQ_PRIORITY priority, const char *func __maybe_unused) { + uint64_t our_order = get_our_order(waitq, priority); + + bool rc = write_our_priority(waitq, our_order) && spinlock_trylock(&waitq->spinlock); + if(rc) + waitq->writer = gettid_cached(); + + clear_our_priority(waitq, our_order); + + return rc; +} + +void waitq_acquire_with_trace(WAITQ *waitq, WAITQ_PRIORITY priority, const char *func) { + uint64_t our_order = get_our_order(waitq, priority); + + size_t spins = 0; + usec_t usec = 1; + + while(true) { + while (write_our_priority(waitq, our_order)) { + if(spinlock_trylock(&waitq->spinlock)) { + waitq->writer = gettid_cached(); + clear_our_priority(waitq, our_order); + worker_spinlock_contention(func, spins); + return; + } + tinysleep(); + } + + // Back off + spins++; + microsleep(usec); + usec = usec >= MAX_USEC ? MAX_USEC : usec * 2; + } +} + +void waitq_release(WAITQ *waitq) { + spinlock_unlock(&waitq->spinlock); +} + +// -------------------------------------------------------------------------------------------------------------------- + +#define THREADS_PER_PRIORITY 2 +#define TEST_DURATION_SEC 2 + +// For stress test statistics +typedef struct thread_stats { + WAITQ_PRIORITY priority; + size_t executions; // how many times we got through + usec_t total_wait_time; // total time spent waiting + usec_t max_wait_time; // maximum time spent waiting +} THREAD_STATS; + +struct thread_args { + THREAD_STATS *stats; + WAITQ *wq; + bool with_sleep; + bool *stop_flag; +}; + +static const char *priority_to_string(WAITQ_PRIORITY p) { + switch(p) { + case WAITQ_PRIO_URGENT: return "URGENT"; + case WAITQ_PRIO_HIGH: return "HIGH"; + case WAITQ_PRIO_NORMAL: return "NORMAL"; + case WAITQ_PRIO_LOW: return "LOW"; + default: return "UNKNOWN"; + } +} + +static void *stress_thread(void *arg) { + struct thread_args *args = arg; + + THREAD_STATS *stats = args->stats; + WAITQ *wq = args->wq; + bool with_sleep = args->with_sleep; + bool *stop_flag = args->stop_flag; + + while(!__atomic_load_n(stop_flag, __ATOMIC_ACQUIRE)) { + usec_t waiting_since_ut = now_monotonic_usec(); + waitq_acquire(wq, stats->priority); + usec_t wait_time = now_monotonic_usec() - waiting_since_ut; + stats->executions++; + stats->total_wait_time += wait_time; + if(wait_time > stats->max_wait_time) + stats->max_wait_time = wait_time; + + if(with_sleep) + tinysleep(); + + waitq_release(wq); + } + + return NULL; +} + +static void print_thread_stats(THREAD_STATS *stats, size_t count, usec_t duration) { + fprintf(stderr, "\n%-8s %12s %12s %12s %12s %12s\n", + "PRIORITY", "EXECUTIONS", "EXEC/SEC", "AVG WAIT", "MAX WAIT", "% WAITING"); + + size_t total_execs = 0; + for(size_t i = 0; i < count; i++) + total_execs += stats[i].executions; + + double total_time_sec = duration / (double)USEC_PER_SEC; + + for(size_t i = 0; i < count; i++) { + double execs_per_sec = stats[i].executions / total_time_sec; + double avg_wait = stats[i].executions ? (double)stats[i].total_wait_time / stats[i].executions : 0; + double percent_waiting = stats[i].total_wait_time * 100.0 / duration; + + fprintf(stderr, "%-8s %12zu %12.1f %12.1f %12"PRIu64" %12.1f%%\n", + priority_to_string(stats[i].priority), + stats[i].executions, + execs_per_sec, + avg_wait, + stats[i].max_wait_time, + percent_waiting); + } +} + +static int unittest_stress(void) { + int errors = 0; + fprintf(stderr, "\nStress testing waiting queue...\n"); + + WAITQ wq = WAITQ_INITIALIZER; + const size_t num_priorities = 4; + const size_t total_threads = num_priorities * THREADS_PER_PRIORITY; + + // Test both with and without sleep + for(int test = 0; test < 2; test++) { + bool with_sleep = (test == 1); + bool stop_flag = false; + + fprintf(stderr, "\nRunning %ds stress test %s sleep:\n", + TEST_DURATION_SEC, with_sleep ? "with" : "without"); + + // Prepare thread stats and args + THREAD_STATS stats[total_threads]; + struct thread_args thread_args[total_threads]; + ND_THREAD *threads[total_threads]; + + fprintf(stderr, "Starting %zu threads for %ds test %s sleep...\n", + total_threads, + TEST_DURATION_SEC, + with_sleep ? "with" : "without"); + + // Initialize stats and create threads + size_t thread_idx = 0; + for(int prio = WAITQ_PRIO_URGENT; prio >= WAITQ_PRIO_LOW; prio--) { + for(int t = 0; t < THREADS_PER_PRIORITY; t++) { + stats[thread_idx] = (THREAD_STATS){ + .priority = prio, + .executions = 0, + .total_wait_time = 0, + .max_wait_time = 0 + }; + thread_args[thread_idx] = (struct thread_args){ + .stats = &stats[thread_idx], // Pass pointer to stats + .wq = &wq, + .with_sleep = with_sleep, + .stop_flag = &stop_flag + }; + + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), "STRESS%d-%d", prio, t); + threads[thread_idx] = nd_thread_create( + thread_name, + NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE, + stress_thread, + &thread_args[thread_idx]); + thread_idx++; + } + } + + // Let it run + time_t start = now_monotonic_sec(); + fprintf(stderr, "Running..."); + while(now_monotonic_sec() - start < TEST_DURATION_SEC) { + fprintf(stderr, "."); + sleep_usec(500000); // Print a dot every 0.5 seconds + } + fprintf(stderr, "\n"); + + + fprintf(stderr, "Stopping threads...\n"); + __atomic_store_n(&stop_flag, true, __ATOMIC_RELEASE); + + // Wait for threads and collect stats + fprintf(stderr, "Waiting for %zu threads to finish...\n", total_threads); + for(size_t i = 0; i < total_threads; i++) + nd_thread_join(threads[i]); + + // Print stats + print_thread_stats(stats, total_threads, TEST_DURATION_SEC * USEC_PER_SEC); + } + + waitq_destroy(&wq); + return errors; +} + +int unittest_waiting_queue(void) { + int errors = unittest_stress(); + return errors; +} diff --git a/src/libnetdata/locks/waitq.h b/src/libnetdata/locks/waitq.h new file mode 100644 index 00000000000000..7f79d64d206809 --- /dev/null +++ b/src/libnetdata/locks/waitq.h @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_WAITQ_H +#define NETDATA_WAITQ_H + +#include "libnetdata/libnetdata.h" + +/* + * WAITING QUEUE + * Like a spinlock, but: + * + * 1. Waiters get a sequence number (FIFO) + * 2. FIFO is respected within each priority + * 3. Higher priority threads get in first + * + * This is equivalent to 3 atomic operations for lock, and 1 for unlock. + * + * As lightweight and fast as it can be. + * About 3M thread switches/s per WAITING QUEUE, on modern hardware. + * + * Be careful: higher priority threads can starve the rest! + * + */ + +typedef enum __attribute__((packed)) { + WAITQ_PRIO_LOW = 0, // will be last + WAITQ_PRIO_NORMAL, // will be third + WAITQ_PRIO_HIGH, // will be second + WAITQ_PRIO_URGENT, // will be first + + // terminator + WAITQ_PRIO_MAX, +} WAITQ_PRIORITY; + +typedef struct waiting_queue { + SPINLOCK spinlock; // protects the actual resource + pid_t writer; // the pid the thread currently holding the lock + uint64_t current_priority; // current highest priority attempting to acquire + uint64_t last_seqno; // for FIFO ordering within same priority +} WAITQ; + +#define WAITQ_INITIALIZER (WAITQ){ .spinlock = SPINLOCK_INITIALIZER, .current_priority = 0, .last_seqno = 0, } + +// Initialize a waiting queue +void waitq_init(WAITQ *waitq); + +// Destroy a waiting queue - must be empty +void waitq_destroy(WAITQ *wq); + +// Returns true when the queue is acquired +bool waitq_try_acquire_with_trace(WAITQ *waitq, WAITQ_PRIORITY priority, const char *func); +#define waitq_try_acquire(waitq, priority) waitq_try_acquire_with_trace(waitq, priority, __FUNCTION__) + +// Returns when it is our turn to run +// Returns time spent waiting in microseconds +void waitq_acquire_with_trace(WAITQ *waitq, WAITQ_PRIORITY priority, const char *func); +#define waitq_acquire(waitq, priority) waitq_acquire_with_trace(waitq, priority, __FUNCTION__) + +// Mark that we are done - wakes up the next in line +void waitq_release(WAITQ *waitq); + +int unittest_waiting_queue(void); + +#endif // NETDATA_WAITQ_H diff --git a/src/libnetdata/log/nd_log.c b/src/libnetdata/log/nd_log.c index ec77610dba6520..d931ce070bd69f 100644 --- a/src/libnetdata/log/nd_log.c +++ b/src/libnetdata/log/nd_log.c @@ -415,8 +415,9 @@ void netdata_logger_fatal(const char *file, const char *function, const unsigned size_t recursion = __atomic_add_fetch(&already_in_fatal, 1, __ATOMIC_SEQ_CST); if(recursion > 1) { // exit immediately, nothing more to be done - fprintf(stderr, "RECURSIVE FATAL STATEMENTS, latest from %lu@%s() of %s, EXITING NOW!\n", - line, function, file); + sleep(2); // give the first fatal the chance to be written + fprintf(stderr, "\nRECURSIVE FATAL STATEMENTS, latest from %s() of %lu@%s, EXITING NOW! 23e93dfccbf64e11aac858b9410d8a82\n", + function, line, file); fflush(stderr); _exit(1); } diff --git a/src/libnetdata/waiting-queue/waiting-queue.c b/src/libnetdata/waiting-queue/waiting-queue.c deleted file mode 100644 index e12c109b83f3bc..00000000000000 --- a/src/libnetdata/waiting-queue/waiting-queue.c +++ /dev/null @@ -1,485 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "waiting-queue.h" - -// #define WAITING_QUEUE_USE_FUTEX 1 - -#if defined(WAITING_QUEUE_USE_FUTEX) -#include -#include - -typedef int WQ_COND; -typedef SPINLOCK WQ_MUTEX; - -static inline int futex_wait(int *uaddr, int val) { - return syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0); -} - -static inline int futex_wake(int *uaddr, int n) { - return syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, n, NULL, NULL, 0); -} - -static int WQ_COND_init(WQ_COND *cond) { *(cond) = 0; return 0; } -#define WQ_COND_destroy(cond) debug_dummy() - -static inline int WQ_COND_wait(WQ_COND *cond, WQ_MUTEX *mutex) { - int value = *cond; - spinlock_unlock(mutex); - futex_wait(cond, value); - spinlock_lock(mutex); - return 0; -} - -static inline int WQ_COND_signal(WQ_COND *cond) { - __atomic_add_fetch(cond, 1, __ATOMIC_SEQ_CST); - futex_wake(cond, 1); - return 0; -} - -#define WQ_MUTEX_init(mutex) ({ spinlock_init(mutex) ; 0; }) -#define WQ_MUTEX_destroy(mutex) debug_dummy() -#define WQ_MUTEX_lock(mutex) spinlock_lock(mutex) -#define WQ_MUTEX_unlock(mutex) spinlock_unlock(mutex) - -#else // !USE_FUTEX - -typedef uv_cond_t WQ_COND; -typedef uv_mutex_t WQ_MUTEX; - -#define WQ_COND_init(cond) uv_cond_init(cond) -#define WQ_COND_destroy(cond) uv_cond_destroy(cond) -#define WQ_COND_wait(cond, mutex) uv_cond_wait(cond, mutex) -#define WQ_COND_signal(cond) uv_cond_signal(cond) - -#define WQ_MUTEX_init(mutex) uv_mutex_init(mutex) -#define WQ_MUTEX_destroy(mutex) uv_mutex_destroy(mutex) -#define WQ_MUTEX_lock(mutex) uv_mutex_lock(mutex) -#define WQ_MUTEX_unlock(mutex) uv_mutex_unlock(mutex) - -#endif // USE_FUTEX - -typedef struct waiting_thread { - WQ_COND cond; // condition variable for this thread - usec_t waiting_since_ut; // when we started waiting - Word_t priority; - struct waiting_thread *prev, *next; -} WAITING_THREAD; - -struct waiting_queue { - WQ_MUTEX mutex; // ensure acquirers and releasers are synchronized - Word_t last_seqno; // incrementing sequence counter - SPINLOCK spinlock; // ensures there is only 1 runner at a time - REFCOUNT running; // number of threads, including the one holding the lock - pid_t writer; - WAITING_THREAD *list; // the list of threads waiting, not including the 1 holding the lock -}; - -// Determine available bits based on system word size -#if SIZEOF_VOID_P == 8 -#define PRIORITY_SHIFT 62ULL -#define SEQNO_MASK ((1ULL << PRIORITY_SHIFT) - 1) -#else -#define PRIORITY_SHIFT 30U -#define SEQNO_MASK ((1U << PRIORITY_SHIFT) - 1) -#endif - -static inline Word_t make_key(WAITING_QUEUE_PRIORITY priority, Word_t seqno) { - return ((Word_t)priority << PRIORITY_SHIFT) | (seqno & SEQNO_MASK); -} - -static inline WAITING_QUEUE_PRIORITY key_get_priority(Word_t key) { - return (WAITING_QUEUE_PRIORITY)(key >> PRIORITY_SHIFT); -} - -static inline Word_t key_get_seqno(Word_t key) { - return key & SEQNO_MASK; -} - -WAITING_QUEUE *waiting_queue_create(void) { - WAITING_QUEUE *wq = callocz(1, sizeof(WAITING_QUEUE)); - - int ret = WQ_MUTEX_init(&wq->mutex); - if(ret != 0) { - freez(wq); - return NULL; - } - - spinlock_init(&wq->spinlock); - - wq->running = 0; - return wq; -} - -void waiting_queue_destroy(WAITING_QUEUE *wq) { - if(!wq) return; - - if(wq->running) - fatal("WAITING_QUEUE: destroying waiting queue that still has %d threads running/waiting", wq->running); - - WQ_MUTEX_destroy(&wq->mutex); - freez(wq); -} - -static inline void WAITERS_SET(WAITING_QUEUE *wq, WAITING_THREAD *wt) { - for(WAITING_THREAD *t = wq->list ; t ;t = t->next) { - if(wt->priority < t->priority) { - DOUBLE_LINKED_LIST_INSERT_ITEM_BEFORE_UNSAFE(wq->list, t, wt, prev, next); - return; - } - } - DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wq->list, wt, prev, next); -} - -static inline void WAITERS_DEL(WAITING_QUEUE *wq, WAITING_THREAD *wt) { - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wq->list, wt, prev, next); -} - -static inline WAITING_THREAD *WAITERS_FIRST(WAITING_QUEUE *wq) { - return wq->list; -} - -static inline void WAITING_THREAD_init(WAITING_QUEUE *wq, WAITING_THREAD *wt, WAITING_QUEUE_PRIORITY priority) { - Word_t seqno = __atomic_add_fetch(&wq->last_seqno, 1, __ATOMIC_RELAXED); - wt->priority = make_key(priority, seqno); - wt->waiting_since_ut = now_monotonic_usec(); - wt->prev = wt->next = NULL; - - int ret = WQ_COND_init(&wt->cond); - if(ret != 0) - fatal("WAITING_QUEUE: cannot initialize condition variable"); -} - -static inline void WAITING_THREAD_cleanup(WAITING_QUEUE *wq __maybe_unused, WAITING_THREAD *wt __maybe_unused) { - WQ_COND_destroy(&wt->cond); -} - -bool waiting_queue_try_acquire(WAITING_QUEUE *wq) { - if(__atomic_add_fetch(&wq->running, 1, __ATOMIC_RELAXED) == 1 && - spinlock_trylock(&wq->spinlock)) { - return true; - } - - __atomic_sub_fetch(&wq->running, 1, __ATOMIC_RELAXED); - return false; -} - -usec_t waiting_queue_acquire(WAITING_QUEUE *wq, WAITING_QUEUE_PRIORITY priority) { - // Try fast path first - if we're the only one, just go - if(__atomic_add_fetch(&wq->running, 1, __ATOMIC_RELAXED) == 1 && - spinlock_trylock(&wq->spinlock)) { - return 0; - } - - // Slow path - need to wait - - WAITING_THREAD wt; - WAITING_THREAD_init(wq, &wt, priority); - - WQ_MUTEX_lock(&wq->mutex); - WAITERS_SET(wq, &wt); - - // Wait for our turn - do { - if (WAITERS_FIRST(wq) == &wt && spinlock_trylock(&wq->spinlock)) - break; - else - WQ_COND_wait(&wt.cond, &wq->mutex); - } while(true); - - wq->writer = gettid_cached(); - WAITERS_DEL(wq, &wt); - WQ_MUTEX_unlock(&wq->mutex); - WAITING_THREAD_cleanup(wq, &wt); - - return now_monotonic_usec() - wt.waiting_since_ut; -} - -void waiting_queue_release(WAITING_QUEUE *wq) { - wq->writer = 0; - spinlock_unlock(&wq->spinlock); - - // Fast path if we're alone - if(__atomic_sub_fetch(&wq->running, 1, __ATOMIC_RELAXED) == 0) - return; - - // Slow path - need to signal next in line - WQ_MUTEX_lock(&wq->mutex); - - // Wake up next in line if any - if(wq->list) - WQ_COND_signal(&wq->list->cond); - - WQ_MUTEX_unlock(&wq->mutex); -} - -size_t waiting_queue_waiting(WAITING_QUEUE *wq) { - return __atomic_load_n(&wq->running, __ATOMIC_RELAXED); -} - - -// -------------------------------------------------------------------------------------------------------------------- - -// For stress test statistics -typedef struct thread_stats { - WAITING_QUEUE_PRIORITY priority; - size_t executions; // how many times we got through - usec_t total_wait_time; // total time spent waiting - usec_t max_wait_time; // maximum time spent waiting -} THREAD_STATS; - -struct thread_args { - THREAD_STATS *stats; - WAITING_QUEUE *wq; - bool with_sleep; - bool *stop_flag; -}; - -static const char *priority_to_string(WAITING_QUEUE_PRIORITY p) { - switch(p) { - case WAITING_QUEUE_PRIO_URGENT: return "URGENT"; - case WAITING_QUEUE_PRIO_HIGH: return "HIGH"; - case WAITING_QUEUE_PRIO_NORMAL: return "NORMAL"; - case WAITING_QUEUE_PRIO_LOW: return "LOW"; - default: return "UNKNOWN"; - } -} - -static int unittest_functional(void) { - int errors = 0; - fprintf(stderr, "\nTesting waiting queue...\n"); - - WAITING_QUEUE *wq = waiting_queue_create(); - - // Test 1: Fast path should work with no contention - fprintf(stderr, " Test 1: Fast path - no contention: "); - usec_t wait_time = waiting_queue_acquire(wq, WAITING_QUEUE_PRIO_NORMAL); - waiting_queue_release(wq); - if(wait_time != 0) { - fprintf(stderr, "FAILED (waited %"PRIu64" usec)\n", wait_time); - errors++; - } - else - fprintf(stderr, "OK\n"); - - // Test 2: Priorities should be respected - fprintf(stderr, " Test 2: Priority ordering: "); - WAITING_THREAD threads[100]; - for(size_t t = 0; t < _countof(threads); t++) { - __atomic_add_fetch(&wq->running, 1, __ATOMIC_RELAXED); - WAITING_THREAD_init(wq, &threads[t], os_random(WAITING_QUEUE_PRIO_MAX)); - WAITERS_SET(wq, &threads[t]); - } - - bool failed = false; - size_t prio_counts[WAITING_QUEUE_PRIO_MAX] = { 0 }; - WAITING_QUEUE_PRIORITY last_prio = WAITING_QUEUE_PRIO_URGENT; - Word_t last_seqno = 0; - for(size_t t = 0; t < _countof(threads); t++) { - WAITING_THREAD *wt = WAITERS_FIRST(wq); - WAITERS_DEL(wq, wt); - __atomic_sub_fetch(&wq->running, 1, __ATOMIC_RELAXED); - - WAITING_QUEUE_PRIORITY prio = key_get_priority(wt->priority); - Word_t seqno = key_get_seqno(wt->priority); - - prio_counts[prio]++; - if(prio < last_prio) { - if(!failed) - fprintf(stderr, "FAILED\n"); - - fprintf(stderr, " > ERROR: prio %u is before prio %u\n", prio, last_prio); - errors++; - failed = true; - } - else if(prio == last_prio && seqno < last_seqno) { - if(!failed) - fprintf(stderr, "FAILED\n"); - - fprintf(stderr, " > ERROR: seqno %lu is before seqno %lu\n", seqno, last_seqno); - errors++; - failed = true; - } - - last_seqno = seqno; - last_prio = prio; - WAITING_THREAD_cleanup(wq, wt); - } - - if(!failed) - fprintf(stderr, "OK\n"); - - for(size_t p = 0; p < WAITING_QUEUE_PRIO_MAX ;p++) - fprintf(stderr, " > prio %zu got %zu waiters\n", p, prio_counts[p]); - - // Test 3: Queue stats should be accurate - fprintf(stderr, " Test 3: Queue statistics: "); - size_t waiting = waiting_queue_waiting(wq); - if(waiting != 0) { - fprintf(stderr, "FAILED (queue shows %zu waiting)\n", waiting); - errors++; - } - else - fprintf(stderr, "OK\n"); - - waiting_queue_destroy(wq); - return errors; -} - -static void *stress_thread(void *arg) { - struct thread_args *args = arg; - - THREAD_STATS *stats = args->stats; - WAITING_QUEUE *wq = args->wq; - bool with_sleep = args->with_sleep; - bool *stop_flag = args->stop_flag; - - while(!__atomic_load_n(stop_flag, __ATOMIC_ACQUIRE)) { - usec_t wait_time = waiting_queue_acquire(wq, stats->priority); - stats->executions++; - stats->total_wait_time += wait_time; - if(wait_time > stats->max_wait_time) - stats->max_wait_time = wait_time; - - if(with_sleep) - tinysleep(); - - waiting_queue_release(wq); - } - - return NULL; -} - -static void print_thread_stats(THREAD_STATS *stats, size_t count, usec_t duration) { - fprintf(stderr, "\n%-8s %12s %12s %12s %12s %12s\n", - "PRIORITY", "EXECUTIONS", "EXEC/SEC", "AVG WAIT", "MAX WAIT", "% WAITING"); - - size_t total_execs = 0; - for(size_t i = 0; i < count; i++) - total_execs += stats[i].executions; - - double total_time_sec = duration / (double)USEC_PER_SEC; - - for(size_t i = 0; i < count; i++) { - double execs_per_sec = stats[i].executions / total_time_sec; - double avg_wait = stats[i].executions ? (double)stats[i].total_wait_time / stats[i].executions : 0; - double percent_waiting = stats[i].total_wait_time * 100.0 / duration; - - fprintf(stderr, "%-8s %12zu %12.1f %12.1f %12"PRIu64" %12.1f%%\n", - priority_to_string(stats[i].priority), - stats[i].executions, - execs_per_sec, - avg_wait, - stats[i].max_wait_time, - percent_waiting); - } -} - -#define THREADS_PER_PRIORITY 2 -#define TEST_DURATION_SEC 5 - -static int unittest_stress(void) { - int errors = 0; - fprintf(stderr, "\nStress testing waiting queue...\n"); - - WAITING_QUEUE *wq = waiting_queue_create(); - const size_t num_priorities = 4; - const size_t total_threads = num_priorities * THREADS_PER_PRIORITY; - - // Test both with and without sleep - for(int test = 0; test < 2; test++) { - bool with_sleep = (test == 1); - bool stop_flag = false; - - fprintf(stderr, "\nRunning %ds stress test %s sleep:\n", - TEST_DURATION_SEC, with_sleep ? "with" : "without"); - - // Prepare thread stats and args - THREAD_STATS stats[total_threads]; - struct thread_args thread_args[total_threads]; - ND_THREAD *threads[total_threads]; - - fprintf(stderr, "Starting %zu threads for %ds test %s sleep...\n", - total_threads, - TEST_DURATION_SEC, - with_sleep ? "with" : "without"); - - // Initialize stats and create threads - size_t thread_idx = 0; - for(int prio = WAITING_QUEUE_PRIO_URGENT; prio <= WAITING_QUEUE_PRIO_LOW; prio++) { - for(int t = 0; t < THREADS_PER_PRIORITY; t++) { - stats[thread_idx] = (THREAD_STATS){ - .priority = prio, - .executions = 0, - .total_wait_time = 0, - .max_wait_time = 0 - }; - thread_args[thread_idx] = (struct thread_args){ - .stats = &stats[thread_idx], // Pass pointer to stats - .wq = wq, - .with_sleep = with_sleep, - .stop_flag = &stop_flag - }; - - char thread_name[32]; - snprintf(thread_name, sizeof(thread_name), "STRESS%d-%d", prio, t); - threads[thread_idx] = nd_thread_create( - thread_name, - NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE, - stress_thread, - &thread_args[thread_idx]); - thread_idx++; - } - } - - // Let it run - time_t start = now_monotonic_sec(); - fprintf(stderr, "Running..."); - while(now_monotonic_sec() - start < TEST_DURATION_SEC) { - fprintf(stderr, "."); - sleep_usec(500000); // Print a dot every 0.5 seconds - } - fprintf(stderr, "\n"); - - - fprintf(stderr, "Stopping threads...\n"); - __atomic_store_n(&stop_flag, true, __ATOMIC_RELEASE); - - // Wait for threads and collect stats - fprintf(stderr, "Waiting for %zu threads to finish...\n", total_threads); - for(size_t i = 0; i < total_threads; i++) - nd_thread_join(threads[i]); - - // Print stats - print_thread_stats(stats, total_threads, TEST_DURATION_SEC * USEC_PER_SEC); - -// // Basic validation -// for(size_t i = 0; i < total_threads - THREADS_PER_PRIORITY; i++) { -// if(stats[i].executions < stats[i + THREADS_PER_PRIORITY].executions) { -// fprintf(stderr, "ERROR: Higher priority thread got fewer executions!\n"); -// errors++; -// } -// } -// -// // Check fairness within same priority -// for(size_t i = 0; i < total_threads; i += THREADS_PER_PRIORITY) { -// for(size_t j = i + 1; j < i + THREADS_PER_PRIORITY; j++) { -// double diff = (double)(stats[i].executions - stats[j].executions) / -// (double)(stats[i].executions + stats[j].executions); -// if(fabs(diff) > 0.1) { // allow 10% difference -// fprintf(stderr, "ERROR: Unfair distribution within same priority!\n"); -// errors++; -// } -// } -// } - } - - waiting_queue_destroy(wq); - return errors; -} - -int unittest_waiting_queue(void) { - int errors = unittest_functional(); - errors += unittest_stress(); - - return errors; -} diff --git a/src/libnetdata/waiting-queue/waiting-queue.h b/src/libnetdata/waiting-queue/waiting-queue.h deleted file mode 100644 index c808e788cfac6c..00000000000000 --- a/src/libnetdata/waiting-queue/waiting-queue.h +++ /dev/null @@ -1,60 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_WAITING_QUEUE_H -#define NETDATA_WAITING_QUEUE_H - -#include "libnetdata/libnetdata.h" - -/* - * WAITING QUEUE - * Like a mutex, or a spinlock, but: - * - * 1. Waiters get a sequence number (FIFO) - * 2. FIFO is respected within each priority - * 3. Higher priority threads get in first - * 4. No wasting of resources, there are no spins - * - * When there are no other waiters, this is equivalent to 2 atomic - * operations for lock, and 2 for unlock. - * - * As lightweight and fast as it can be. - * About 0.5M thread switches/s per WAITING QUEUE, on modern hardware. - * - * Be careful: higher priority threads can starve the rest! - * - */ - -typedef struct waiting_queue WAITING_QUEUE; - -typedef enum __attribute__((packed)) { - WAITING_QUEUE_PRIO_URGENT = 0, // will be first - WAITING_QUEUE_PRIO_HIGH, // will be second - WAITING_QUEUE_PRIO_NORMAL, // will be third - WAITING_QUEUE_PRIO_LOW, // will be last - - // terminator - WAITING_QUEUE_PRIO_MAX, -} WAITING_QUEUE_PRIORITY; - -// Initialize a waiting queue -WAITING_QUEUE *waiting_queue_create(void); - -// Destroy a waiting queue - must be empty -void waiting_queue_destroy(WAITING_QUEUE *wq); - -// Returns true when the queue is acquired -bool waiting_queue_try_acquire(WAITING_QUEUE *wq); - -// Returns when it is our turn to run -// Returns time spent waiting in microseconds -usec_t waiting_queue_acquire(WAITING_QUEUE *wq, WAITING_QUEUE_PRIORITY priority); - -// Mark that we are done - wakes up the next in line -void waiting_queue_release(WAITING_QUEUE *wq); - -// Return the number of threads currently waiting -size_t waiting_queue_waiting(WAITING_QUEUE *wq); - -int unittest_waiting_queue(void); - -#endif // NETDATA_WAITING_QUEUE_H diff --git a/src/streaming/stream-sender-api.c b/src/streaming/stream-sender-api.c index f84c91d808976e..a904c6ae8881f7 100644 --- a/src/streaming/stream-sender-api.c +++ b/src/streaming/stream-sender-api.c @@ -34,7 +34,7 @@ void stream_sender_structures_init(RRDHOST *host, bool stream, STRING *parents, host->sender->connector.id = -1; host->sender->host = host; host->sender->scb = stream_circular_buffer_create(); - host->sender->wait_queue = waiting_queue_create(); + waitq_init(&host->sender->waitq); host->sender->capabilities = stream_our_capabilities(host, true); nd_sock_init(&host->sender->sock, netdata_ssl_streaming_sender_ctx, netdata_ssl_validate_certificate_sender); @@ -65,8 +65,7 @@ void stream_sender_structures_free(struct rrdhost *host) { stream_sender_signal_to_stop_and_wait(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); stream_circular_buffer_destroy(host->sender->scb); host->sender->scb = NULL; - waiting_queue_destroy(host->sender->wait_queue); - host->sender->wait_queue = NULL; + waitq_destroy(&host->sender->waitq); stream_compressor_destroy(&host->sender->compressor); replication_sender_cleanup(host->sender); diff --git a/src/streaming/stream-sender-commit.c b/src/streaming/stream-sender-commit.c index 55135ef3a02993..5ef0f61f5ddcbd 100644 --- a/src/streaming/stream-sender-commit.c +++ b/src/streaming/stream-sender-commit.c @@ -68,11 +68,11 @@ void sender_buffer_commit(struct sender_state *s, BUFFER *wb, struct sender_buff if (unlikely(!src || !src_len)) return; - waiting_queue_acquire( - s->wait_queue, + waitq_acquire( + &s->waitq, (s->host->stream.rcv.status.tid == gettid_cached() || s->host->stream.snd.status.tid == gettid_cached()) ? - WAITING_QUEUE_PRIO_HIGH : - WAITING_QUEUE_PRIO_NORMAL); + WAITQ_PRIO_HIGH : + WAITQ_PRIO_NORMAL); stream_sender_lock(s); // copy the sequence number of sender buffer recreates, while having our lock @@ -87,7 +87,7 @@ void sender_buffer_commit(struct sender_state *s, BUFFER *wb, struct sender_buff sender_buffer_destroy(commit); stream_sender_unlock(s); - waiting_queue_release(s->wait_queue); + waitq_release(&s->waitq); return; } @@ -183,7 +183,7 @@ void sender_buffer_commit(struct sender_state *s, BUFFER *wb, struct sender_buff msg = s->thread.msg; stream_sender_unlock(s); - waiting_queue_release(s->wait_queue); + waitq_release(&s->waitq); if (enable_sending) { msg.opcode = STREAM_OPCODE_SENDER_POLLOUT; @@ -195,7 +195,7 @@ void sender_buffer_commit(struct sender_state *s, BUFFER *wb, struct sender_buff overflow_with_lock: { msg = s->thread.msg; stream_sender_unlock(s); - waiting_queue_release(s->wait_queue); + waitq_release(&s->waitq); msg.opcode = STREAM_OPCODE_SENDER_BUFFER_OVERFLOW; stream_sender_send_opcode(s, msg); nd_log_limit_static_global_var(erl, 1, 0); @@ -211,7 +211,7 @@ compression_failed_with_lock: { stream_compression_deactivate(s); msg = s->thread.msg; stream_sender_unlock(s); - waiting_queue_release(s->wait_queue); + waitq_release(&s->waitq); msg.opcode = STREAM_OPCODE_SENDER_RECONNECT_WITHOUT_COMPRESSION; stream_sender_send_opcode(s, msg); nd_log_limit_static_global_var(erl, 1, 0); diff --git a/src/streaming/stream-sender-internals.h b/src/streaming/stream-sender-internals.h index 3960343c384ef7..df6b80458f1ad4 100644 --- a/src/streaming/stream-sender-internals.h +++ b/src/streaming/stream-sender-internals.h @@ -59,7 +59,7 @@ struct sender_state { char remote_ip[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c time_t last_state_since_t; // the timestamp of the last state (online/offline) change - WAITING_QUEUE *wait_queue; + WAITQ waitq; STREAM_CIRCULAR_BUFFER *scb; struct {