Skip to content

Commit

Permalink
Fix metric retention check and cleanup (netdata#19278)
Browse files Browse the repository at this point in the history
Fixed a bug in the MRG cleanup that failed to delete metrics that have no retention.
Fixed the MRG acquired and referenced counters going negative.

Co-authored-by: Costa Tsaousis <[email protected]>
  • Loading branch information
stelfrag and ktsaou authored Dec 23, 2024
1 parent 5897fe0 commit b985604
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/daemon/pulse/pulse-db-dbengine.c
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ void pulse_dbengine_do(bool extended) {
priority++;

rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_metrics, (collected_number)mrg_stats.entries);
rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_acquired, (collected_number)mrg_stats.entries_referenced);
rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_acquired, (collected_number)mrg_stats.entries_acquired);
rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_collected, (collected_number)mrg_stats.writers);
rrddim_set_by_pointer(st_mrg_metrics, rd_mrg_multiple_writers, (collected_number)mrg_stats.writers_conflicts);

Expand Down
24 changes: 16 additions & 8 deletions src/database/engine/metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg)
static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) {
time_t first, last;
mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
return (!first || !last || first > last);
bool rc = (first != 0 && last != 0 && first <= last);

if(!rc && __atomic_load_n(&mrg->index[metric->partition].stats.writers, __ATOMIC_RELAXED) > 0)
rc = true;

return rc;
}

static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) {
Expand Down Expand Up @@ -217,14 +222,14 @@ static inline bool metric_acquire(MRG *mrg, METRIC *metric) {
size_t partition = metric->partition;

if(desired == 1)
__atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->index[partition].stats.entries_acquired, 1, __ATOMIC_RELAXED);

__atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);

return true;
}

static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) {
static inline bool metric_release(MRG *mrg, METRIC *metric) {
size_t partition = metric->partition;
REFCOUNT expected, desired;

Expand All @@ -236,15 +241,15 @@ static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_
fatal("METRIC: refcount is %d (zero or negative) during release", expected);
}

if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric))
if(expected == 1 && !acquired_metric_has_retention(mrg, metric))
desired = REFCOUNT_DELETING;
else
desired = expected - 1;

} while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));

if(desired == 0 || desired == REFCOUNT_DELETING) {
__atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&mrg->index[partition].stats.entries_acquired, 1, __ATOMIC_RELAXED);

if(desired == REFCOUNT_DELETING)
acquired_for_deletion_metric_delete(mrg, metric);
Expand Down Expand Up @@ -318,6 +323,9 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
metric->partition = partition;
*PValue = metric;

__atomic_add_fetch(&mrg->index[partition].stats.entries_acquired, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);

MRG_STATS_ADDED_METRIC(mrg, partition);

mrg_index_write_unlock(mrg, partition);
Expand Down Expand Up @@ -411,7 +419,7 @@ inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, nd_uuid_t *uuid, Word_t sect
}

inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
return metric_release(mrg, metric, true);
return metric_release(mrg, metric);
}

inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
Expand All @@ -420,7 +428,7 @@ inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
}

inline void mrg_metric_release(MRG *mrg, METRIC *metric) {
metric_release(mrg, metric, false);
metric_release(mrg, metric);
}

inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
Expand Down Expand Up @@ -717,7 +725,7 @@ inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {

for(size_t i = 0; i < mrg->partitions ;i++) {
s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
s->entries_acquired += __atomic_load_n(&mrg->index[i].stats.entries_acquired, __ATOMIC_RELAXED);
s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
Expand Down
4 changes: 2 additions & 2 deletions src/database/engine/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ struct mrg_statistics {
// --- atomic --- multiple readers / writers

CACHE_LINE_PADDING();
size_t entries_referenced;
ssize_t entries_acquired;

CACHE_LINE_PADDING();
size_t current_references;
ssize_t current_references;

CACHE_LINE_PADDING();
size_t search_hits;
Expand Down

0 comments on commit b985604

Please sign in to comment.