From d3b09d814096654fcafd4fa5207133d44b164676 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Mon, 30 Dec 2024 10:51:10 +0000 Subject: [PATCH] nd_poll() fairness (#19298) * nd_poll() fairness * ensure we dont return events the user may have deleted * fixed warnings * fixed test compilation * minor fixes * statsd fix * track writer tid in rw spinlock * log replication counters on sender disconnect --- CMakeLists.txt | 1 + .../debugfs.plugin/module-libsensors.c | 6 +- src/libnetdata/locks/rw-spinlock.c | 32 ++- src/libnetdata/locks/rw-spinlock.h | 3 +- src/libnetdata/socket/nd-poll.c | 263 ++++++++++++------ src/libnetdata/socket/nd-poll.h | 17 +- src/libnetdata/socket/poll-events.c | 30 +- src/streaming/stream-receiver.c | 6 +- src/streaming/stream-sender.c | 12 +- src/streaming/stream-thread.c | 6 +- 10 files changed, 245 insertions(+), 131 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d2579e2aafaaae..48bd4d97a4662e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -562,6 +562,7 @@ void my_function() { ; } " HAVE_FUNC_ATTRIBUTE_NOINLINE) check_c_source_compiles(" +#include void my_exit_function() __attribute__((noreturn)); int main() { my_exit_function(); // Call the noreturn function diff --git a/src/collectors/debugfs.plugin/module-libsensors.c b/src/collectors/debugfs.plugin/module-libsensors.c index 67aa0a6e5ee401..8065eec71b6aae 100644 --- a/src/collectors/debugfs.plugin/module-libsensors.c +++ b/src/collectors/debugfs.plugin/module-libsensors.c @@ -664,7 +664,7 @@ static inline void check_value_greater_than_zero(SENSOR *s, SENSOR_SUBFEATURE_TY s->state = state; string_freez(s->log_msg); - char buf[100]; + char buf[1024]; snprintf(buf, sizeof(buf), "%s == %f (kernel driver generated)", SENSOR_SUBFEATURE_TYPE_2str(*config), status); s->log_msg = string_strdupz(buf); @@ -674,7 +674,7 @@ static inline void check_value_greater_than_zero(SENSOR *s, SENSOR_SUBFEATURE_TY static void userspace_evaluation_log_msg(SENSOR *s, const char *reading_txt, const char *condition, const char *threshold_txt, double reading, double threshold) { string_freez(s->log_msg); - char buf[200]; + char buf[1024]; snprintf(buf, sizeof(buf), "%s %f %s %s %f (userspace evaluation using kernel provided thresholds)", reading_txt, reading, condition, threshold_txt, threshold); s->log_msg = string_strdupz(buf); @@ -1210,7 +1210,7 @@ static int sensors_collect_data(void) { static bool libsensors_running = false; static int libsensors_update_every = 1; -void *libsensors_thread(void *ptr) { +void *libsensors_thread(void *ptr __maybe_unused) { int update_every = libsensors_update_every; // first try the default directory for libsensors diff --git a/src/libnetdata/locks/rw-spinlock.c b/src/libnetdata/locks/rw-spinlock.c index a8b6111fecc33a..f07274615bdd66 100644 --- a/src/libnetdata/locks/rw-spinlock.c +++ b/src/libnetdata/locks/rw-spinlock.c @@ -2,10 +2,13 @@ #include "libnetdata/libnetdata.h" +#define WRITER_LOCKED (-65536) + // ---------------------------------------------------------------------------- // rw_spinlock implementation void rw_spinlock_init_with_trace(RW_SPINLOCK *rw_spinlock, const char *func __maybe_unused) { + rw_spinlock->writer = 0; rw_spinlock->counter = 0; } @@ -14,10 +17,13 @@ bool rw_spinlock_tryread_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *f REFCOUNT expected = rw_spinlock->counter; while (true) { - if(expected < 0) + if(expected == WRITER_LOCKED) // writer is active return false; + if(expected < 0) + fatal("RW_SPINLOCK: refcount found negative, on %s(), called from %s()", __FUNCTION__, func); + // increment reader count if (__atomic_compare_exchange_n( &rw_spinlock->counter, @@ -43,9 +49,12 @@ void rw_spinlock_read_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *func REFCOUNT expected = rw_spinlock->counter; // we should not increase it if it is negative (a writer holds the lock) - if(expected < 0) expected = 0; + if(expected == WRITER_LOCKED) expected = 0; while (true) { + if(expected < 0) + fatal("RW_SPINLOCK: refcount found negative, on %s(), called from %s()", __FUNCTION__, func); + // Attempt to increment reader count if (__atomic_compare_exchange_n( &rw_spinlock->counter, @@ -59,7 +68,7 @@ void rw_spinlock_read_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *func spins++; - if (expected < 0) { + if (expected == WRITER_LOCKED) { // writer is active // we should not increase it if it is negative (a writer holds the lock) @@ -76,13 +85,9 @@ void rw_spinlock_read_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *func } void rw_spinlock_read_unlock_with_trace(RW_SPINLOCK *rw_spinlock, const char *func __maybe_unused) { -#ifndef NETDATA_INTERNAL_CHECKS - __atomic_sub_fetch(&rw_spinlock->counter, 1, __ATOMIC_RELEASE); -#else REFCOUNT x = __atomic_sub_fetch(&rw_spinlock->counter, 1, __ATOMIC_RELEASE); if (x < 0) - fatal("RW_SPINLOCK: readers is negative %d", x); -#endif + fatal("RW_SPINLOCK: readers is negative %d, on %s called from %s()", x, __FUNCTION__, func); nd_thread_rwspinlock_read_unlocked(); } @@ -94,7 +99,7 @@ bool rw_spinlock_trywrite_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char * if (!__atomic_compare_exchange_n( &rw_spinlock->counter, &expected, - -1, + WRITER_LOCKED, false, // Strong CAS __ATOMIC_ACQUIRE, // Success memory order __ATOMIC_RELAXED // Failure memory order @@ -102,6 +107,7 @@ bool rw_spinlock_trywrite_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char * return false; } + __atomic_store_n(&rw_spinlock->writer, gettid_cached(), __ATOMIC_RELAXED); worker_spinlock_contention(func, 0); nd_thread_rwspinlock_write_locked(); return true; @@ -117,7 +123,7 @@ void rw_spinlock_write_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *fun if (__atomic_compare_exchange_n( &rw_spinlock->counter, &expected, - -1, + WRITER_LOCKED, false, // Strong CAS __ATOMIC_ACQUIRE, // Success memory order __ATOMIC_RELAXED // Failure memory order @@ -129,6 +135,7 @@ void rw_spinlock_write_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *fun tinysleep(); } + __atomic_store_n(&rw_spinlock->writer, gettid_cached(), __ATOMIC_RELAXED); worker_spinlock_contention(func, spins); nd_thread_rwspinlock_write_locked(); } @@ -136,10 +143,11 @@ void rw_spinlock_write_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *fun void rw_spinlock_write_unlock_with_trace(RW_SPINLOCK *rw_spinlock, const char *func __maybe_unused) { #ifdef NETDATA_INTERNAL_CHECKS int32_t x = __atomic_load_n(&rw_spinlock->counter, __ATOMIC_RELAXED); - if (x != -1) - fatal("RW_SPINLOCK: writer unlock encountered unexpected state: %d", x); + if (x != WRITER_LOCKED) + fatal("RW_SPINLOCK: writer unlock encountered unexpected state: %d, on %s() called from %s()", x, __FUNCTION__, func); #endif + __atomic_store_n(&rw_spinlock->writer, 0, __ATOMIC_RELAXED); __atomic_store_n(&rw_spinlock->counter, 0, __ATOMIC_RELEASE); // Release writer lock nd_thread_rwspinlock_write_unlocked(); } diff --git a/src/libnetdata/locks/rw-spinlock.h b/src/libnetdata/locks/rw-spinlock.h index 1d7aa62f874447..8edcfc00c6267a 100644 --- a/src/libnetdata/locks/rw-spinlock.h +++ b/src/libnetdata/locks/rw-spinlock.h @@ -7,10 +7,11 @@ #include "spinlock.h" typedef struct netdata_rw_spinlock { + pid_t writer; REFCOUNT counter; // positive is readers, negative is a writer } RW_SPINLOCK; -#define RW_SPINLOCK_INITIALIZER { .counter = 0, } +#define RW_SPINLOCK_INITIALIZER { .counter = 0, .writer = 0, } void rw_spinlock_init_with_trace(RW_SPINLOCK *rw_spinlock, const char *func); void rw_spinlock_read_lock_with_trace(RW_SPINLOCK *rw_spinlock, const char *func); diff --git a/src/libnetdata/socket/nd-poll.c b/src/libnetdata/socket/nd-poll.c index 7582a3f2a186b6..a0b7f98ec81c60 100644 --- a/src/libnetdata/socket/nd-poll.c +++ b/src/libnetdata/socket/nd-poll.c @@ -9,15 +9,28 @@ #if defined(OS_LINUX) #include +struct fd_info { + uint32_t events; + uint32_t last_served; + const void *data; +}; + +DEFINE_JUDYL_TYPED(POINTERS, struct fd_info *); + #define MAX_EVENTS_PER_CALL 100 // Event poll context struct nd_poll_t { int epoll_fd; + struct epoll_event ev[MAX_EVENTS_PER_CALL]; size_t last_pos; size_t used; - size_t nfds; + + POINTERS_JudyLSet pointers; // Judy array to store user data + + uint32_t nfds; // the number of sockets we have + uint32_t iteration_counter; }; // Initialize the event poll context @@ -33,97 +46,161 @@ nd_poll_t *nd_poll_create() { return ndpl; } -static inline void nd_poll_replace_data_on_loaded_events(nd_poll_t *ndpl, int fd __maybe_unused, void *old_data, void *new_data) { - for(size_t i = ndpl->last_pos; i < ndpl->used; i++) { - if(ndpl->ev[i].data.ptr == old_data) - ndpl->ev[i].data.ptr = new_data; - } +static inline uint32_t nd_poll_events_to_epoll_events(nd_poll_event_t events) { + uint32_t pevents = EPOLLERR | EPOLLHUP; + if (events & ND_POLL_READ) pevents |= EPOLLIN; + if (events & ND_POLL_WRITE) pevents |= EPOLLOUT; + return pevents; +} + +static inline nd_poll_event_t nd_poll_events_from_epoll_events(uint32_t events) { + nd_poll_event_t nd_poll_events = ND_POLL_NONE; + + if (events & (EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND)) + nd_poll_events |= ND_POLL_READ; + + if (events & (EPOLLOUT|EPOLLWRNORM|EPOLLWRBAND)) + nd_poll_events |= ND_POLL_WRITE; + + if (events & EPOLLERR) + nd_poll_events |= ND_POLL_ERROR; + + if (events & (EPOLLHUP|EPOLLRDHUP)) + nd_poll_events |= ND_POLL_HUP; + + return nd_poll_events; } // Add a file descriptor to the event poll -bool nd_poll_add(nd_poll_t *ndpl, int fd, nd_poll_event_t events, void *data) { +bool nd_poll_add(nd_poll_t *ndpl, int fd, nd_poll_event_t events, const void *data) { internal_fatal(!data, "nd_poll() does not support NULL data pointers"); + struct fd_info *fdi = mallocz(sizeof(*fdi)); + fdi->data = data; + fdi->last_served = 0; + fdi->events = nd_poll_events_to_epoll_events(events); + + if(POINTERS_GET(&ndpl->pointers, fd) || !POINTERS_SET(&ndpl->pointers, fd, fdi)) { + freez(fdi); + return false; + } + struct epoll_event ev = { - .events = (events & ND_POLL_READ ? EPOLLIN : 0) | (events & ND_POLL_WRITE ? EPOLLOUT : 0), - .data.ptr = data, + .events = fdi->events, + .data.fd = fd, }; + bool rc = epoll_ctl(ndpl->epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0; - if(rc) ndpl->nfds++; + if(rc) + ndpl->nfds++; + else { + POINTERS_DEL(&ndpl->pointers, fd); + freez(fdi); + } + internal_fatal(!rc, "epoll_ctl() failed"); + return rc; } // Remove a file descriptor from the event poll -bool nd_poll_del(nd_poll_t *ndpl, int fd, void *data) { - internal_fatal(!data, "nd_poll() does not support NULL data pointers"); +bool nd_poll_del(nd_poll_t *ndpl, int fd) { + struct fd_info *fdi = POINTERS_GET(&ndpl->pointers, fd); + if(!fdi) return false; + + POINTERS_DEL(&ndpl->pointers, fd); + freez(fdi); ndpl->nfds--; // we can't check for success/failure here, because epoll() removes fds when they are closed bool rc = epoll_ctl(ndpl->epoll_fd, EPOLL_CTL_DEL, fd, NULL) == 0; internal_error(!rc, "epoll_ctl() failed (is the socket already closed)"); // this is ok if the socket is already closed - - // we may have an event pending for this fd. - // but epoll() does not give us fd in the events, - // so we use the data pointer to invalidate it - nd_poll_replace_data_on_loaded_events(ndpl, fd, data, NULL); return rc; } // Update an existing file descriptor in the event poll -bool nd_poll_upd(nd_poll_t *ndpl, int fd, nd_poll_event_t events, void *data) { - internal_fatal(!data, "nd_poll() does not support NULL data pointers - you should also NEVER change the pointer with an update"); +bool nd_poll_upd(nd_poll_t *ndpl, int fd, nd_poll_event_t events) { + struct fd_info *fdi = POINTERS_GET(&ndpl->pointers, fd); + if(!fdi) return false; + + fdi->events = nd_poll_events_to_epoll_events(events); struct epoll_event ev = { - .events = (events & ND_POLL_READ ? EPOLLIN : 0) | (events & ND_POLL_WRITE ? EPOLLOUT : 0), - .data.ptr = data, + .events = fdi->events, + .data.fd = fd, }; bool rc = epoll_ctl(ndpl->epoll_fd, EPOLL_CTL_MOD, fd, &ev) == 0; - internal_fatal(!rc, "epoll_ctl() failed"); + internal_fatal(!rc, "epoll_ctl() failed"); // this may happen if fd is closed return rc; } -static inline nd_poll_event_t nd_poll_events_from_epoll_events(uint32_t events) { - nd_poll_event_t nd_poll_events = ND_POLL_NONE; - - if (events & (EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND)) - nd_poll_events |= ND_POLL_READ; - - if (events & (EPOLLOUT|EPOLLWRNORM|EPOLLWRBAND)) - nd_poll_events |= ND_POLL_WRITE; - - if (events & EPOLLERR) - nd_poll_events |= ND_POLL_ERROR; - - if (events & (EPOLLHUP|EPOLLRDHUP)) - nd_poll_events |= ND_POLL_HUP; - - return nd_poll_events; -} - static inline bool nd_poll_get_next_event(nd_poll_t *ndpl, nd_poll_result_t *result) { while(ndpl->last_pos < ndpl->used) { - void *data = ndpl->ev[ndpl->last_pos].data.ptr; + struct fd_info *fdi = POINTERS_GET(&ndpl->pointers, ndpl->ev[ndpl->last_pos].data.fd); // Skip events that have been invalidated by nd_poll_del() - if(!data) { + if(!fdi || !fdi->data) { ndpl->last_pos++; continue; } *result = (nd_poll_result_t){ - .events = nd_poll_events_from_epoll_events(ndpl->ev[ndpl->last_pos].events), - .data = data, + .events = nd_poll_events_from_epoll_events(ndpl->ev[ndpl->last_pos].events & fdi->events), + .data = fdi->data, }; ndpl->last_pos++; + + if(!result->events) + // nd_poll_upd() may have removed some flags since we got this + continue; + + fdi->last_served = ndpl->iteration_counter; return true; } return false; } +typedef struct { + struct epoll_event event; + uint32_t last_served; +} sortable_event_t; + +static int compare_last_served(const void *a, const void *b) { + const sortable_event_t *ev_a = (const sortable_event_t *)a; + const sortable_event_t *ev_b = (const sortable_event_t *)b; + + if (ev_a->last_served < ev_b->last_served) + return -1; + if (ev_a->last_served > ev_b->last_served) + return 1; + + return 0; +} + +static void sort_events(nd_poll_t *ndpl) { + if(ndpl->used <= 1) return; + + sortable_event_t sortable_array[ndpl->used]; + for (size_t i = 0; i < ndpl->used; ++i) { + struct fd_info *fdi = POINTERS_GET(&ndpl->pointers, ndpl->ev[i].data.fd); + sortable_array[i] = (sortable_event_t){ + .event = ndpl->ev[i], + .last_served = fdi ? fdi->last_served : UINT32_MAX, + }; + } + + qsort(sortable_array, ndpl->used, sizeof(sortable_event_t), compare_last_served); + + // Reorder `ndpl->ev` based on the sorted order + for (size_t i = 0; i < ndpl->used; ++i) + ndpl->ev[i] = sortable_array[i].event; +} + // Wait for events int nd_poll_wait(nd_poll_t *ndpl, int timeout_ms, nd_poll_result_t *result) { + ndpl->iteration_counter++; + if(nd_poll_get_next_event(ndpl, result)) return 1; @@ -132,11 +209,7 @@ int nd_poll_wait(nd_poll_t *ndpl, int timeout_ms, nd_poll_result_t *result) { ndpl->last_pos = 0; ndpl->used = 0; - int maxevents = ndpl->nfds / 2; - if(maxevents > (int)_countof(ndpl->ev)) maxevents = (int)_countof(ndpl->ev); - if(maxevents < 2) maxevents = 2; - - int n = epoll_wait(ndpl->epoll_fd, &ndpl->ev[0], maxevents, timeout_ms); + int n = epoll_wait(ndpl->epoll_fd, &ndpl->ev[0], _countof(ndpl->ev), timeout_ms); if(unlikely(n <= 0)) { if(n == 0) { @@ -155,6 +228,7 @@ int nd_poll_wait(nd_poll_t *ndpl, int timeout_ms, nd_poll_result_t *result) { ndpl->used = n; ndpl->last_pos = 0; + sort_events(ndpl); if (nd_poll_get_next_event(ndpl, result)) return 1; @@ -162,16 +236,21 @@ int nd_poll_wait(nd_poll_t *ndpl, int timeout_ms, nd_poll_result_t *result) { } while(true); } +static void nd_poll_free_callback(Word_t fd __maybe_unused, struct fd_info *fdi) { + freez(fdi); +} + // Destroy the event poll context void nd_poll_destroy(nd_poll_t *ndpl) { if (ndpl) { close(ndpl->epoll_fd); + POINTERS_FREE(&ndpl->pointers, nd_poll_free_callback); freez(ndpl); } } #else -DEFINE_JUDYL_TYPED(POINTERS, void *); +DEFINE_JUDYL_TYPED(POINTERS, const void *); struct nd_poll_t { struct pollfd *fds; // Array of file descriptors @@ -206,31 +285,60 @@ static void ensure_capacity(nd_poll_t *ndpl) { ndpl->capacity = new_capacity; } -bool nd_poll_add(nd_poll_t *ndpl, int fd, nd_poll_event_t events, void *data) { +static inline short int nd_poll_events_to_poll_events(nd_poll_event_t events) { + short int pevents = POLLERR | POLLHUP | POLLNVAL; + if (events & ND_POLL_READ) pevents |= POLLIN; + if (events & ND_POLL_WRITE) pevents |= POLLOUT; + return pevents; +} + +static inline nd_poll_event_t nd_poll_events_from_poll_revents(short int events) { + nd_poll_event_t nd_poll_events = ND_POLL_NONE; + + if (events & (POLLIN|POLLPRI|POLLRDNORM|POLLRDBAND)) + nd_poll_events |= ND_POLL_READ; + + if (events & (POLLOUT|POLLWRNORM|POLLWRBAND)) + nd_poll_events |= ND_POLL_WRITE; + + if (events & POLLERR) + nd_poll_events |= ND_POLL_ERROR; + + if (events & (POLLHUP|POLLRDHUP)) + nd_poll_events |= ND_POLL_HUP; + + if (events & (POLLNVAL)) + nd_poll_events |= ND_POLL_INVALID; + + return nd_poll_events; +} + +bool nd_poll_add(nd_poll_t *ndpl, int fd, nd_poll_event_t events, const void *data) { internal_fatal(POINTERS_GET(&ndpl->pointers, fd) != NULL, "File descriptor %d is already served - cannot add", fd); + if(POINTERS_GET(&ndpl->pointers, fd) || !POINTERS_SET(&ndpl->pointers, fd, data)) + return false; + ensure_capacity(ndpl); struct pollfd *pfd = &ndpl->fds[ndpl->nfds++]; pfd->fd = fd; - pfd->events = 0; - if (events & ND_POLL_READ) pfd->events |= POLLIN; - if (events & ND_POLL_WRITE) pfd->events |= POLLOUT; + pfd->events = nd_poll_events_to_poll_events(events); pfd->revents = 0; - POINTERS_SET(&ndpl->pointers, fd, data); - return true; } // Remove a file descriptor from the event poll -bool nd_poll_del(nd_poll_t *ndpl, int fd, void *data __maybe_unused) { +bool nd_poll_del(nd_poll_t *ndpl, int fd) { + if(!POINTERS_DEL(&ndpl->pointers, fd)) + return false; + for (nfds_t i = 0; i < ndpl->nfds; i++) { if (ndpl->fds[i].fd == fd) { // Remove the file descriptor by shifting the array memmove(&ndpl->fds[i], &ndpl->fds[i + 1], (ndpl->nfds - i - 1) * sizeof(struct pollfd)); ndpl->nfds--; - POINTERS_DEL(&ndpl->pointers, fd); if(i < ndpl->last_pos) ndpl->last_pos--; @@ -243,16 +351,11 @@ bool nd_poll_del(nd_poll_t *ndpl, int fd, void *data __maybe_unused) { } // Update an existing file descriptor in the event poll -bool nd_poll_upd(nd_poll_t *ndpl, int fd, nd_poll_event_t events, void *data) { - internal_fatal(POINTERS_GET(&ndpl->pointers, fd) == NULL, "File descriptor %d is not found - cannot modify", fd); - +bool nd_poll_upd(nd_poll_t *ndpl, int fd, nd_poll_event_t events) { for (nfds_t i = 0; i < ndpl->nfds; i++) { if (ndpl->fds[i].fd == fd) { struct pollfd *pfd = &ndpl->fds[i]; - pfd->events = 0; - if (events & ND_POLL_READ) pfd->events |= POLLIN; - if (events & ND_POLL_WRITE) pfd->events |= POLLOUT; - POINTERS_SET(&ndpl->pointers, fd, data); + pfd->events = nd_poll_events_to_poll_events(events); return true; } } @@ -261,33 +364,19 @@ bool nd_poll_upd(nd_poll_t *ndpl, int fd, nd_poll_event_t events, void *data) { return false; } -static inline nd_poll_event_t nd_poll_events_from_poll_revents(short int events) { - nd_poll_event_t nd_poll_events = ND_POLL_NONE; - - if (events & (POLLIN|POLLPRI|POLLRDNORM|POLLRDBAND)) - nd_poll_events |= ND_POLL_READ; - - if (events & (POLLOUT|POLLWRNORM|POLLWRBAND)) - nd_poll_events |= ND_POLL_WRITE; - - if (events & POLLERR) - nd_poll_events |= ND_POLL_ERROR; - - if (events & (POLLHUP|POLLRDHUP)) - nd_poll_events |= ND_POLL_HUP; - - if (events & (POLLNVAL)) - nd_poll_events |= ND_POLL_INVALID; - - return nd_poll_events; -} - static inline bool nd_poll_get_next_event(nd_poll_t *ndpl, nd_poll_result_t *result) { for (nfds_t i = ndpl->last_pos; i < ndpl->nfds; i++) { if (ndpl->fds[i].revents != 0) { result->data = POINTERS_GET(&ndpl->pointers, ndpl->fds[i].fd); - result->events = nd_poll_events_from_poll_revents(ndpl->fds[i].revents); + if(!result->data) + continue; + + result->events = nd_poll_events_from_poll_revents(ndpl->fds[i].revents & ndpl->fds[i].events); + if(!result->events) + // nd_poll_upd() may have removed some flags since we got this + continue; + ndpl->fds[i].revents = 0; ndpl->last_pos = i + 1; diff --git a/src/libnetdata/socket/nd-poll.h b/src/libnetdata/socket/nd-poll.h index d0778ec6c144cd..f9cfe1544ccb99 100644 --- a/src/libnetdata/socket/nd-poll.h +++ b/src/libnetdata/socket/nd-poll.h @@ -20,26 +20,23 @@ typedef enum __attribute__((packed)) { typedef struct { nd_poll_event_t events; - void *data; + const void *data; } nd_poll_result_t; typedef struct nd_poll_t nd_poll_t; -nd_poll_t *nd_poll_create(); +nd_poll_t *nd_poll_create() WARNUNUSED; void nd_poll_destroy(nd_poll_t *ndpl); // the events can be updated with nd_poll_upd // the data pointer SHOULD NEVER be changed and cannot be NULL -bool nd_poll_add(nd_poll_t *ndpl, int fd, nd_poll_event_t events, void *data); +bool nd_poll_add(nd_poll_t *ndpl, int fd, nd_poll_event_t events, const void *data); -// give the same data pointer used in nd_poll_add() -// otherwise, you may receive back invalid events -bool nd_poll_del(nd_poll_t *ndpl, int fd, void *data); +// delete an fd +bool nd_poll_del(nd_poll_t *ndpl, int fd); -// this is for updating events -// the data pointer must be the same used in nd_poll_add() -// to change the data pointer, delete and add the same fd again -bool nd_poll_upd(nd_poll_t *ndpl, int fd, nd_poll_event_t events, void *data); +// update the expected events on an fd +bool nd_poll_upd(nd_poll_t *ndpl, int fd, nd_poll_event_t events); // returns -1 = error, 0 = timeout, 1 = event in result int nd_poll_wait(nd_poll_t *ndpl, int timeout_ms, nd_poll_result_t *result); diff --git a/src/libnetdata/socket/poll-events.c b/src/libnetdata/socket/poll-events.c index 244c00a2c620c0..c0ac736b318a25 100644 --- a/src/libnetdata/socket/poll-events.c +++ b/src/libnetdata/socket/poll-events.c @@ -4,7 +4,7 @@ static inline void poll_process_updated_events(POLLINFO *pi) { if(pi->events != pi->events_we_wait_for) { - if(!nd_poll_upd(pi->p->ndpl, pi->fd, pi->events, pi)) + if(!nd_poll_upd(pi->p->ndpl, pi->fd, pi->events)) nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to update socket %d to nd_poll", pi->fd); pi->events_we_wait_for = pi->events; } @@ -76,7 +76,7 @@ static inline void poll_close_fd(POLLINFO *pi, const char *func) { POLLJOB *p = pi->p; DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(p->ll, pi, prev, next); - if(!nd_poll_del(p->ndpl, pi->fd, pi)) + if(!nd_poll_del(p->ndpl, pi->fd)) // this is ok, if the socket is already closed nd_log(NDLS_DAEMON, NDLP_DEBUG, "Failed to delete socket %d from nd_poll() - called from %s() - is the socket already closed?", @@ -438,7 +438,7 @@ void poll_events(LISTEN_SOCKETS *sockets ; } else { - POLLINFO *pi = result.data; + POLLINFO *pi = (POLLINFO *)result.data; if(result.events & (ND_POLL_HUP | ND_POLL_INVALID | ND_POLL_ERROR)) poll_process_error(pi, result.events); @@ -466,8 +466,24 @@ void poll_events(LISTEN_SOCKETS *sockets } } else if (pi->flags & POLLINFO_FLAG_SERVER_SOCKET) { - if(!p.limit || p.used < p.limit) - poll_process_new_tcp_connection(pi, now); + if(pi->socktype == SOCK_DGRAM) + poll_process_udp_read(pi, now); + + else if(pi->socktype == SOCK_STREAM) { + if (!p.limit || p.used < p.limit) + poll_process_new_tcp_connection(pi, now); + } + else { + nd_log(NDLS_DAEMON, NDLP_ERR, + "POLLFD: LISTENER: server slot %zu (fd %d) connection from %s port %s using unhandled socket type %d.", + i, + pi->fd, + pi->client_ip ? pi->client_ip : "", + pi->client_port ? pi->client_port : "", + pi->socktype); + + poll_close_fd(pi, "poll_events2"); + } } else { nd_log(NDLS_DAEMON, NDLP_ERR, @@ -479,7 +495,7 @@ void poll_events(LISTEN_SOCKETS *sockets , pi->flags ); - poll_close_fd(pi, "poll_events2"); + poll_close_fd(pi, "poll_events3"); } } else { @@ -492,7 +508,7 @@ void poll_events(LISTEN_SOCKETS *sockets , (int)result.events ); - poll_close_fd(pi, "poll_events3"); + poll_close_fd(pi, "poll_events4"); } } diff --git a/src/streaming/stream-receiver.c b/src/streaming/stream-receiver.c index 5092f7c6b46d0c..4e4e7f1a904b8a 100644 --- a/src/streaming/stream-receiver.c +++ b/src/streaming/stream-receiver.c @@ -562,7 +562,7 @@ static void stream_receiver_remove(struct stream_thread *sth, struct receiver_st META_DEL(&sth->run.meta, (Word_t)&rpt->thread.meta); rpt->thread.wanted = 0; - if(!nd_poll_del(sth->run.ndpl, rpt->sock.fd, &rpt->thread.meta)) + if(!nd_poll_del(sth->run.ndpl, rpt->sock.fd)) nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to delete receiver socket from nd_poll()"); rpt->host->stream.rcv.status.tid = 0; @@ -727,7 +727,7 @@ bool stream_receiver_send_data(struct stream_thread *sth, struct receiver_state stream_circular_buffer_del_unsafe(scb, rc, now_ut); if (!stats->bytes_outstanding) { rpt->thread.wanted = ND_POLL_READ; - if (!nd_poll_upd(sth->run.ndpl, rpt->sock.fd, rpt->thread.wanted, &rpt->thread.meta)) + if (!nd_poll_upd(sth->run.ndpl, rpt->sock.fd, rpt->thread.wanted)) nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM RCV[%zu] '%s' [from [%s]:%s]: cannot update nd_poll()", sth->id, rrdhost_hostname(rpt->host), rpt->remote_ip, rpt->remote_port); @@ -983,7 +983,7 @@ void stream_receiver_check_all_nodes_from_poll(struct stream_thread *sth, usec_t } rpt->thread.wanted = ND_POLL_READ | (stats.bytes_outstanding ? ND_POLL_WRITE : 0); - if(!nd_poll_upd(sth->run.ndpl, rpt->sock.fd, rpt->thread.wanted, &rpt->thread.meta)) + if(!nd_poll_upd(sth->run.ndpl, rpt->sock.fd, rpt->thread.wanted)) nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM RCV[%zu] '%s' [from %s]: failed to update nd_poll().", sth->id, rrdhost_hostname(rpt->host), rpt->remote_ip); diff --git a/src/streaming/stream-sender.c b/src/streaming/stream-sender.c index c59998bba0f929..9a98b99029f4d5 100644 --- a/src/streaming/stream-sender.c +++ b/src/streaming/stream-sender.c @@ -371,8 +371,10 @@ static void stream_sender_log_disconnection(struct stream_thread *sth, struct se ND_LOG_STACK_PUSH(lgs); nd_log(NDLS_DAEMON, NDLP_NOTICE, - "STREAM SND[%zu] '%s' [to %s]: sender disconnected from parent, reason: %s", - sth->id, rrdhost_hostname(s->host), s->remote_ip, stream_handshake_error_to_string(reason)); + "STREAM SND[%zu] '%s' [to %s]: sender disconnected from parent, reason: %s (replication in: %u, out: %u, pending: %u)", + sth->id, rrdhost_hostname(s->host), s->remote_ip, stream_handshake_error_to_string(reason), + s->host->stream.snd.status.replication.counter_in, s->host->stream.snd.status.replication.counter_out, + dictionary_entries(s->replication.requests)); } static void stream_sender_move_running_to_connector_or_remove(struct stream_thread *sth, struct sender_state *s, STREAM_HANDSHAKE reason, bool reconnect) { @@ -392,7 +394,7 @@ static void stream_sender_move_running_to_connector_or_remove(struct stream_thre META_DEL(&sth->run.meta, (Word_t)&s->thread.meta); s->thread.wanted = 0; - if(!nd_poll_del(sth->run.ndpl, s->sock.fd, &s->thread.meta)) + if(!nd_poll_del(sth->run.ndpl, s->sock.fd)) nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM SND[%zu] '%s' [to %s]: failed to delete sender socket from nd_poll()", sth->id, rrdhost_hostname(s->host), s->remote_ip); @@ -488,7 +490,7 @@ void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t n bytes_uncompressed += stats.bytes_uncompressed; s->thread.wanted = ND_POLL_READ | (stats.bytes_outstanding ? ND_POLL_WRITE : 0); - if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, s->thread.wanted, &s->thread.meta)) + if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, s->thread.wanted)) nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM SND[%zu] '%s' [to %s]: failed to update nd_poll().", sth->id, rrdhost_hostname(s->host), s->remote_ip); @@ -617,7 +619,7 @@ bool stream_sender_send_data(struct stream_thread *sth, struct sender_state *s, if (!stats->bytes_outstanding) { // we sent them all - remove ND_POLL_WRITE s->thread.wanted = ND_POLL_READ; - if (!nd_poll_upd(sth->run.ndpl, s->sock.fd, s->thread.wanted, &s->thread.meta)) + if (!nd_poll_upd(sth->run.ndpl, s->sock.fd, s->thread.wanted)) nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM SND[%zu] '%s' [to %s]: failed to update nd_poll().", sth->id, rrdhost_hostname(s->host), s->remote_ip); diff --git a/src/streaming/stream-thread.c b/src/streaming/stream-thread.c index 1c956048574720..edaf178ed524c2 100644 --- a/src/streaming/stream-thread.c +++ b/src/streaming/stream-thread.c @@ -30,7 +30,7 @@ static void stream_thread_handle_op(struct stream_thread *sth, struct stream_opc if(m->type == POLLFD_TYPE_SENDER) { if(msg->opcode & STREAM_OPCODE_SENDER_POLLOUT) { m->s->thread.wanted = ND_POLL_READ | ND_POLL_WRITE; - if(!nd_poll_upd(sth->run.ndpl, m->s->sock.fd, m->s->thread.wanted, m)) { + if(!nd_poll_upd(sth->run.ndpl, m->s->sock.fd, m->s->thread.wanted)) { nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_DEBUG, "STREAM SND[%zu] '%s' [to %s]: cannot enable output on sender socket %d.", @@ -50,7 +50,7 @@ static void stream_thread_handle_op(struct stream_thread *sth, struct stream_opc else if(m->type == POLLFD_TYPE_RECEIVER) { if (msg->opcode & STREAM_OPCODE_RECEIVER_POLLOUT) { m->rpt->thread.wanted = ND_POLL_READ | ND_POLL_WRITE; - if (!nd_poll_upd(sth->run.ndpl, m->rpt->sock.fd, m->rpt->thread.wanted, m)) { + if (!nd_poll_upd(sth->run.ndpl, m->rpt->sock.fd, m->rpt->thread.wanted)) { nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "STREAM RCV[%zu] '%s' [from [%s]:%s]: cannot enable output on receiver socket %d.", @@ -328,7 +328,7 @@ static void stream_thread_messages_resize_unsafe(struct stream_thread *sth) { static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_result_t *ev, usec_t now_ut, size_t *replay_entries) { internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ ); - struct pollfd_meta *m = ev->data; + struct pollfd_meta *m = (struct pollfd_meta *)ev->data; if(!m) { nd_log(NDLS_DAEMON, NDLP_ERR, "STREAM THREAD[%zu]: cannot get meta from nd_poll() event. Ignoring event.", sth->id);