From b985604c88a8a3c52738d2fafc1c0adc657409be Mon Sep 17 00:00:00 2001 From: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Date: Mon, 23 Dec 2024 16:39:07 +0200 Subject: [PATCH] Fix metric retention check and cleanup (#19278) fixed bug in mrg cleanup, not deleting metrics that do not have retention fix for mrg acquired and referenced going negative Co-authored-by: Costa Tsaousis --- src/daemon/pulse/pulse-db-dbengine.c | 2 +- src/database/engine/metric.c | 24 ++++++++++++++++-------- src/database/engine/metric.h | 4 ++-- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/daemon/pulse/pulse-db-dbengine.c b/src/daemon/pulse/pulse-db-dbengine.c index a057ba70bb46e7..ba00818f9cf893 100644 --- a/src/daemon/pulse/pulse-db-dbengine.c +++ b/src/daemon/pulse/pulse-db-dbengine.c @@ -836,7 +836,7 @@ void pulse_dbengine_do(bool extended) { priority++; rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_metrics, (collected_number)mrg_stats.entries); - rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_acquired, (collected_number)mrg_stats.entries_referenced); + rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_acquired, (collected_number)mrg_stats.entries_acquired); rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_collected, (collected_number)mrg_stats.writers); rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_multiple_writers, (collected_number)mrg_stats.writers_conflicts); diff --git a/src/database/engine/metric.c b/src/database/engine/metric.c index a20a9cc054ecda..a2b99dd4931ca1 100644 --- a/src/database/engine/metric.c +++ b/src/database/engine/metric.c @@ -154,7 +154,12 @@ static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) { time_t first, last; mrg_metric_get_retention(mrg, metric, &first, &last, NULL); - return (!first || !last || first > last); + bool rc = (first != 0 && last != 0 && first <= last); + + if(!rc && __atomic_load_n(&mrg->index[metric->partition].stats.writers, __ATOMIC_RELAXED) > 0) + rc = true; + + return rc; } static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) { @@ -217,14 +222,14 @@ static inline bool metric_acquire(MRG *mrg, METRIC *metric) { size_t partition = metric->partition; if(desired == 1) - __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.entries_acquired, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); return true; } -static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) { +static inline bool metric_release(MRG *mrg, METRIC *metric) { size_t partition = metric->partition; REFCOUNT expected, desired; @@ -236,7 +241,7 @@ static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_ fatal("METRIC: refcount is %d (zero or negative) during release", expected); } - if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric)) + if(expected == 1 && !acquired_metric_has_retention(mrg, metric)) desired = REFCOUNT_DELETING; else desired = expected - 1; @@ -244,7 +249,7 @@ static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); if(desired == 0 || desired == REFCOUNT_DELETING) { - __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.entries_acquired, 1, __ATOMIC_RELAXED); if(desired == REFCOUNT_DELETING) acquired_for_deletion_metric_delete(mrg, metric); @@ -318,6 +323,9 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r metric->partition = partition; *PValue = metric; + __atomic_add_fetch(&mrg->index[partition].stats.entries_acquired, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); + MRG_STATS_ADDED_METRIC(mrg, partition); mrg_index_write_unlock(mrg, partition); @@ -411,7 +419,7 @@ inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, nd_uuid_t *uuid, Word_t sect } inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { - return metric_release(mrg, metric, true); + return metric_release(mrg, metric); } inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { @@ -420,7 +428,7 @@ inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { } inline void mrg_metric_release(MRG *mrg, METRIC *metric) { - metric_release(mrg, metric, false); + metric_release(mrg, metric); } inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { @@ -717,7 +725,7 @@ inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) { for(size_t i = 0; i < mrg->partitions ;i++) { s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED); - s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED); + s->entries_acquired += __atomic_load_n(&mrg->index[i].stats.entries_acquired, __ATOMIC_RELAXED); s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED); s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED); s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED); diff --git a/src/database/engine/metric.h b/src/database/engine/metric.h index b92cc399c05761..da4fbdb1f4e37e 100644 --- a/src/database/engine/metric.h +++ b/src/database/engine/metric.h @@ -31,10 +31,10 @@ struct mrg_statistics { // --- atomic --- multiple readers / writers CACHE_LINE_PADDING(); - size_t entries_referenced; + ssize_t entries_acquired; CACHE_LINE_PADDING(); - size_t current_references; + ssize_t current_references; CACHE_LINE_PADDING(); size_t search_hits;