Waiting Queue (netdata#19302)
* add strict checks to waiting queue

* waiting queue implementation purely using atomics

* trylock should insist if we are the potential winner
ktsaou authored Dec 31, 2024
1 parent 0a9eb3c commit a801fd3
Showing 13 changed files with 377 additions and 595 deletions.
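
The commit message says the new waiting queue is implemented purely with atomics, and that trylock should insist when the caller is the potential winner. The new src/libnetdata/locks/waitq.c itself is not among the files shown in this view, so the following is only a rough sketch of how such a lock could work. Of the names below, only WAITQ, WAITQ_PRIORITY, the WAITQ_PRIO_* levels and the waitq_init/waitq_try_acquire/waitq_acquire/waitq_release/waitq_destroy entry points appear in this diff; the struct fields, the helper waitq_outranked() and all function bodies are assumptions.

#include <stdbool.h>
#include <stdint.h>
#include <sched.h>

typedef enum {
    WAITQ_PRIO_LOW = 0,
    WAITQ_PRIO_NORMAL,
    WAITQ_PRIO_HIGH,
    WAITQ_PRIO_URGENT,
    WAITQ_PRIO_MAX,
} WAITQ_PRIORITY;

typedef struct waitq {
    uint32_t lock;                    // 0 = free, 1 = held
    uint32_t waiters[WAITQ_PRIO_MAX]; // spinners registered per priority
} WAITQ;

static inline void waitq_init(WAITQ *wq) {
    __atomic_store_n(&wq->lock, 0, __ATOMIC_RELAXED);
    for(int p = 0; p < WAITQ_PRIO_MAX ; p++)
        __atomic_store_n(&wq->waiters[p], 0, __ATOMIC_RELAXED);
}

static inline void waitq_destroy(WAITQ *wq) {
    (void)wq; // embedded struct, nothing to free in this sketch
}

static inline bool waitq_outranked(WAITQ *wq, WAITQ_PRIORITY prio) {
    // true when a higher-priority thread is already waiting
    for(int p = prio + 1; p < WAITQ_PRIO_MAX ; p++)
        if(__atomic_load_n(&wq->waiters[p], __ATOMIC_ACQUIRE))
            return true;
    return false;
}

static inline bool waitq_try_acquire(WAITQ *wq, WAITQ_PRIORITY prio) {
    // "insist if we are the potential winner": fail fast only when a
    // higher-priority waiter exists; otherwise compete for the lock
    if(waitq_outranked(wq, prio))
        return false;
    uint32_t expected = 0;
    return __atomic_compare_exchange_n(&wq->lock, &expected, 1, false,
                                       __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
}

static inline void waitq_acquire(WAITQ *wq, WAITQ_PRIORITY prio) {
    __atomic_add_fetch(&wq->waiters[prio], 1, __ATOMIC_ACQ_REL);
    uint32_t expected = 0;
    while(waitq_outranked(wq, prio) ||
          !__atomic_compare_exchange_n(&wq->lock, &expected, 1, false,
                                       __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
        expected = 0;  // a failed CAS overwrites it with the observed value
        sched_yield(); // the real code likely uses a smarter backoff
    }
    __atomic_sub_fetch(&wq->waiters[prio], 1, __ATOMIC_ACQ_REL);
}

static inline void waitq_release(WAITQ *wq) {
    __atomic_store_n(&wq->lock, 0, __ATOMIC_RELEASE);
}

Note also the API shift visible in the hunks below: waitq_init()/waitq_destroy() operate on an embedded WAITQ value, where the old waiting_queue_create()/waiting_queue_destroy() returned and freed a pointer, so each queue costs one heap allocation less.
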
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -949,8 +949,8 @@ set(LIBNETDATA_FILES
src/libnetdata/locks/rw-spinlock.h
src/libnetdata/atomics/atomic_flags.h
src/libnetdata/atomics/atomics.h
-src/libnetdata/waiting-queue/waiting-queue.c
-src/libnetdata/waiting-queue/waiting-queue.h
+src/libnetdata/locks/waitq.c
+src/libnetdata/locks/waitq.h
src/libnetdata/object-state/object-state.c
src/libnetdata/object-state/object-state.h
)
@@ -611,7 +611,6 @@ static void netdata_systemd_journal_message_ids_init(void) {
msgid_into_dict("9ce0cb58ab8b44df82c4bf1ad9ee22de", "Netdata alert transition");
msgid_into_dict("6db0018e83e34320ae2a659d78019fb7", "Netdata alert notification");
msgid_into_dict("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message");
-
msgid_into_dict("8ddaf5ba33a74078b609250db1e951f3", "Sensor state transition");
}

62 changes: 31 additions & 31 deletions src/database/engine/cache.c
@@ -22,8 +22,7 @@ typedef int32_t REFCOUNT;
#define PGC_WITH_ARAL 1
#endif

-// unfortunately waiting queue is significantly slower than spinlock
-// #define PGC_QUEUE_LOCK_AS_WAITING_QUEUE 1
+#define PGC_QUEUE_LOCK_AS_WAITING_QUEUE 1

typedef enum __attribute__ ((__packed__)) {
// mutually exclusive flags
@@ -75,7 +74,7 @@ struct pgc_page {

struct pgc_queue {
#if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE)
-WAITING_QUEUE *wq;
+WAITQ wq;
#else
SPINLOCK spinlock;
#endif
@@ -246,17 +245,17 @@ static inline size_t pgc_indexing_partition(PGC *cache, Word_t metric_id) {
_result; \
})

-#define PGC_QUEUE_LOCK_PRIO_COLLECTORS WAITING_QUEUE_PRIO_URGENT
-#define PGC_QUEUE_LOCK_PRIO_EVICTORS WAITING_QUEUE_PRIO_HIGH
-#define PGC_QUEUE_LOCK_PRIO_FLUSHERS WAITING_QUEUE_PRIO_NORMAL
-#define PGC_QUEUE_LOCK_PRIO_OTHERS WAITING_QUEUE_PRIO_LOW
+#define PGC_QUEUE_LOCK_PRIO_COLLECTORS WAITQ_PRIO_URGENT
+#define PGC_QUEUE_LOCK_PRIO_EVICTORS WAITQ_PRIO_HIGH
+#define PGC_QUEUE_LOCK_PRIO_FLUSHERS WAITQ_PRIO_NORMAL
+#define PGC_QUEUE_LOCK_PRIO_LOW WAITQ_PRIO_LOW

#if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE)
-#define pgc_queue_trylock(cache, ll) waiting_queue_try_acquire((ll)->wq)
-#define pgc_queue_lock(cache, ll, prio) waiting_queue_acquire((ll)->wq, prio)
-#define pgc_queue_unlock(cache, ll) waiting_queue_release((ll)->wq)
+#define pgc_queue_trylock(cache, ll, prio) waitq_try_acquire(&((ll)->wq), prio)
+#define pgc_queue_lock(cache, ll, prio) waitq_acquire(&((ll)->wq), prio)
+#define pgc_queue_unlock(cache, ll) waitq_release(&((ll)->wq))
#else
-#define pgc_queue_trylock(cache, ll) spinlock_trylock(&((ll)->spinlock))
+#define pgc_queue_trylock(cache, ll, prio) spinlock_trylock(&((ll)->spinlock))
#define pgc_queue_lock(cache, ll, prio) spinlock_lock(&((ll)->spinlock))
#define pgc_queue_unlock(cache, ll) spinlock_unlock(&((ll)->spinlock))
#endif
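
Note that pgc_queue_trylock() now takes a priority under both backends; the spinlock variant simply ignores it, so call sites compile unchanged either way and the backend is chosen purely at compile time. A hypothetical call site following the macros above:

pgc_queue_lock(cache, &cache->dirty, PGC_QUEUE_LOCK_PRIO_FLUSHERS);
// ... detach dirty pages for flushing ...
pgc_queue_unlock(cache, &cache->dirty);
// under PGC_QUEUE_LOCK_AS_WAITING_QUEUE the lock line expands to
// waitq_acquire(&cache->dirty.wq, WAITQ_PRIO_NORMAL)
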
@@ -608,7 +607,7 @@ static inline void pgc_stats_index_judy_change(PGC *cache, size_t mem_before_jud
}
}

-static void pgc_queue_add(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock, WAITING_QUEUE_PRIORITY prio __maybe_unused) {
+static void pgc_queue_add(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock, WAITQ_PRIORITY prio __maybe_unused) {
if(!having_lock)
pgc_queue_lock(cache, q, prio);

@@ -677,7 +676,8 @@ static void pgc_queue_add(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PA
pgc_size_histogram_add(cache, &q->stats->size_histogram, page);
}

-static void pgc_queue_del(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock, WAITING_QUEUE_PRIORITY prio __maybe_unused) {
+static void pgc_queue_del(PGC *cache __maybe_unused, struct pgc_queue *q, PGC_PAGE *page, bool having_lock,
+                          WAITQ_PRIORITY prio __maybe_unused) {
if(cache->config.stats)
pgc_size_histogram_del(cache, &q->stats->size_histogram, page);

@@ -736,7 +736,7 @@ static inline void page_has_been_accessed(PGC *cache, PGC_PAGE *page) {
__atomic_add_fetch(&page->accesses, 1, __ATOMIC_RELAXED);

if (flags & PGC_PAGE_CLEAN) {
-if(pgc_queue_trylock(cache, &cache->clean)) {
+if(pgc_queue_trylock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_EVICTORS)) {
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
pgc_queue_unlock(cache, &cache->clean);
@@ -752,7 +752,7 @@ static inline void page_has_been_accessed(PGC *cache, PGC_PAGE *page) {
// ----------------------------------------------------------------------------
// state transitions

-static inline void page_set_clean(PGC *cache, PGC_PAGE *page, bool having_transition_lock, bool having_clean_lock, WAITING_QUEUE_PRIORITY prio) {
+static inline void page_set_clean(PGC *cache, PGC_PAGE *page, bool having_transition_lock, bool having_clean_lock, WAITQ_PRIORITY prio) {
if(!having_transition_lock)
page_transition_lock(cache, page);

@@ -777,7 +777,7 @@ static inline void page_set_clean(PGC *cache, PGC_PAGE *page, bool having_transi
page_transition_unlock(cache, page);
}

-static inline void page_set_dirty(PGC *cache, PGC_PAGE *page, bool having_hot_lock, WAITING_QUEUE_PRIORITY prio) {
+static inline void page_set_dirty(PGC *cache, PGC_PAGE *page, bool having_hot_lock, WAITQ_PRIORITY prio) {
if(!having_hot_lock)
// to avoid deadlocks, we have to get the hot lock before the page transition
// since this is what all_hot_to_dirty() does
@@ -819,7 +819,7 @@ static inline void page_set_dirty(PGC *cache, PGC_PAGE *page, bool having_hot_lo
page_transition_unlock(cache, page);
}

-static inline void page_set_hot(PGC *cache, PGC_PAGE *page, WAITING_QUEUE_PRIORITY prio) {
+static inline void page_set_hot(PGC *cache, PGC_PAGE *page, WAITQ_PRIORITY prio) {
page_transition_lock(cache, page);

PGC_PAGE_FLAGS flags = page_get_status_flags(page);
@@ -1222,7 +1222,7 @@ static bool evict_pages_with_filter(PGC *cache, size_t max_skip, size_t max_evic
timing_dbengine_evict_init();

if(!all_of_them && !wait) {
-if(!pgc_queue_trylock(cache, &cache->clean)) {
+if(!pgc_queue_trylock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_EVICTORS)) {
stopped_before_finishing = true;
goto premature_exit;
}
@@ -1759,7 +1759,7 @@ static bool flush_pages(PGC *cache, size_t max_flushes, Word_t section, bool wai
// we have been called from a data collection thread
// let's not waste its time...

-if(!pgc_queue_trylock(cache, &cache->dirty)) {
+if(!pgc_queue_trylock(cache, &cache->dirty, PGC_QUEUE_LOCK_PRIO_FLUSHERS)) {
// we would block, so give up...
return false;
}
@@ -1942,7 +1942,7 @@ static bool flush_pages(PGC *cache, size_t max_flushes, Word_t section, bool wai
, "DBENGINE CACHE: flushing pages mismatch");

if(!all_of_them && !wait) {
-if(pgc_queue_trylock(cache, &cache->dirty))
+if(pgc_queue_trylock(cache, &cache->dirty, PGC_QUEUE_LOCK_PRIO_FLUSHERS))
have_dirty_lock = true;

else {
@@ -2099,9 +2099,9 @@ PGC *pgc_create(const char *name,


#if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE)
-cache->hot.wq = waiting_queue_create();
-cache->dirty.wq = waiting_queue_create();
-cache->clean.wq = waiting_queue_create();
+waitq_init(&cache->hot.wq);
+waitq_init(&cache->dirty.wq);
+waitq_init(&cache->clean.wq);
#else
spinlock_init(&cache->hot.spinlock);
spinlock_init(&cache->dirty.spinlock);
@@ -2174,9 +2174,9 @@ void pgc_destroy(PGC *cache) {
}

#if defined(PGC_QUEUE_LOCK_AS_WAITING_QUEUE)
-waiting_queue_destroy(cache->hot.wq);
-waiting_queue_destroy(cache->dirty.wq);
-waiting_queue_destroy(cache->clean.wq);
+waitq_destroy(&cache->hot.wq);
+waitq_destroy(&cache->dirty.wq);
+waitq_destroy(&cache->clean.wq);
#endif
freez(cache->index);
freez(cache);
@@ -2453,7 +2453,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_indexing_started, 1, __ATOMIC_RELAXED);
p2_add_fetch(&cache->stats.p2_workers_jv2_flush, 1);

-pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_OTHERS);
+pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_LOW);

Pvoid_t JudyL_metrics = NULL;
Pvoid_t JudyL_extents_pos = NULL;
@@ -2584,7 +2584,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
}

yield_the_processor(); // do not lock too aggressively
-pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_OTHERS);
+pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_LOW);
}

spinlock_unlock(&sp->migration_to_v2_spinlock);
@@ -2608,7 +2608,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_

// balance-parents: transition from hot to clean directly
yield_the_processor(); // do not lock too aggressively
-page_set_clean(cache, pi->page, true, false, PGC_QUEUE_LOCK_PRIO_OTHERS);
+page_set_clean(cache, pi->page, true, false, PGC_QUEUE_LOCK_PRIO_LOW);
page_transition_unlock(cache, pi->page);
page_release(cache, pi->page, true);

@@ -2659,7 +2659,7 @@ void pgc_open_evict_clean_pages_of_datafile(PGC *cache, struct rrdengine_datafil
size_t pgc_count_clean_pages_having_data_ptr(PGC *cache, Word_t section, void *ptr) {
size_t found = 0;

-pgc_queue_lock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_OTHERS);
+pgc_queue_lock(cache, &cache->clean, PGC_QUEUE_LOCK_PRIO_LOW);
for(PGC_PAGE *page = cache->clean.base; page ;page = page->link.next)
found += (page->data == ptr && page->section == section) ? 1 : 0;
pgc_queue_unlock(cache, &cache->clean);
@@ -2670,7 +2670,7 @@ size_t pgc_count_clean_pages_having_data_ptr(PGC *cache, Word_t section, void *p
size_t pgc_count_hot_pages_having_data_ptr(PGC *cache, Word_t section, void *ptr) {
size_t found = 0;

-pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_OTHERS);
+pgc_queue_lock(cache, &cache->hot, PGC_QUEUE_LOCK_PRIO_LOW);
Pvoid_t *section_pages_pptr = JudyLGet(cache->hot.sections_judy, section, PJE0);
if(section_pages_pptr) {
struct section_pages *sp = *section_pages_pptr;
2 changes: 1 addition & 1 deletion src/libnetdata/libnetdata.h
@@ -125,7 +125,7 @@ extern const char *netdata_configured_host_prefix;
#include "locks/spinlock.h"
#include "locks/rw-spinlock.h"
#include "completion/completion.h"
-#include "waiting-queue/waiting-queue.h"
+#include "libnetdata/locks/waitq.h"
#include "clocks/clocks.h"
#include "simple_pattern/simple_pattern.h"
#include "libnetdata/log/nd_log.h"
2 changes: 1 addition & 1 deletion src/libnetdata/locks/spinlock.c
@@ -28,7 +28,7 @@ void spinlock_lock_with_trace(SPINLOCK *spinlock, const char *func) {
// Backoff strategy with exponential growth
spins++;
microsleep(usec);
-usec = usec > MAX_USEC ? MAX_USEC : usec * 2;
+usec = usec >= MAX_USEC ? MAX_USEC : usec * 2;
}

#ifdef NETDATA_INTERNAL_CHECKS
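
The operator change closes a small overshoot in the exponential backoff: with >, a delay that has just reached MAX_USEC is doubled once more to twice the cap before being clamped. A tiny standalone trace of both rules (the MAX_USEC value here is assumed for illustration; the real constant is presumably defined earlier in spinlock.c):

#include <stdio.h>

#define MAX_USEC 512 // illustrative cap, not the real value

int main(void) {
    unsigned usec = 256;
    for(int i = 0; i < 4 ; i++) {
        printf("%u ", usec);
        usec = usec > MAX_USEC ? MAX_USEC : usec * 2;  // old: prints 256 512 1024 512
        // usec = usec >= MAX_USEC ? MAX_USEC : usec * 2; // new: prints 256 512 512 512
    }
    printf("\n");
    return 0;
}
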