From e16bd45668d61b118d92f5b6467d18b009b4f91d Mon Sep 17 00:00:00 2001 From: Phil Carns Date: Tue, 17 Sep 2024 17:24:09 -0400 Subject: [PATCH] experimental changes to minimize eventfd activity - revert prio_pool to use condition variable at all times for its own internal purposes - add user-space tracking of eventfd state to avoid writing events when it is already activated - clear eventfd only when it has produced epoll events --- src/margo-abt-config.c | 14 +++--- src/margo-abt-config.h | 3 +- src/margo-core.c | 15 ++++-- src/margo-init.c | 2 +- src/margo-prio-pool.c | 105 ++++++++++------------------------------- src/margo-prio-pool.h | 5 ++ 6 files changed, 52 insertions(+), 92 deletions(-) diff --git a/src/margo-abt-config.c b/src/margo-abt-config.c index d0c3217..24a16c3 100644 --- a/src/margo-abt-config.c +++ b/src/margo-abt-config.c @@ -108,7 +108,6 @@ bool __margo_abt_pool_init_from_json(const json_object_t* jpool, access = ABT_POOL_ACCESS_SPMC; } - pool->efd = -1; int ret; if (kind != (ABT_pool_kind)(-1)) { if (!pool->access) pool->access = strdup("mpmc"); @@ -130,8 +129,9 @@ bool __margo_abt_pool_init_from_json(const json_object_t* jpool, * to rearrange code a little to make sure we can determine those * things before initializing pool. */ - pool->efd = eventfd(0, EFD_NONBLOCK); - if (pool->efd < 0) { + pool->efd.efd = eventfd(0, EFD_NONBLOCK); + pool->efd.efd_count = 0; + if (pool->efd.efd < 0) { margo_error(mid, "eventfd failed with error code %d", errno); } ret = ABT_pool_config_create(&prio_pool_config); @@ -139,9 +139,11 @@ bool __margo_abt_pool_init_from_json(const json_object_t* jpool, margo_error(mid, "ABT_pool_config_create failed with error code %d", ret); } - ret = ABT_pool_config_set(prio_pool_config, - MARGO_PRIO_POOL_CONFIG_KEY_EFD, - ABT_POOL_CONFIG_INT, &pool->efd); + // fprintf(stderr, "DBG: config_set with %p\n", &pool->efd); + void* foo = &pool->efd; + ret = ABT_pool_config_set(prio_pool_config, + MARGO_PRIO_POOL_CONFIG_KEY_EFD, + ABT_POOL_CONFIG_PTR, &foo); if (ret != ABT_SUCCESS) { margo_error(mid, "ABT_pool_config_set failed with error code %d", ret); diff --git a/src/margo-abt-config.h b/src/margo-abt-config.h index 62133f1..1852643 100644 --- a/src/margo-abt-config.h +++ b/src/margo-abt-config.h @@ -62,8 +62,7 @@ typedef struct margo_abt_pool { bool margo_free_flag; /* flag if Margo is responsible for freeing */ bool used_by_primary; /* flag indicating the this pool is used by the primary ES */ - int efd; /* event fd for notification of transition from idle to busy; -1 if - not supported or not enabled */ + struct margo_prio_pool_efd efd; } margo_abt_pool_t; bool __margo_abt_pool_validate_json(const json_object_t* config, diff --git a/src/margo-core.c b/src/margo-core.c index c44e676..740cc20 100644 --- a/src/margo-core.c +++ b/src/margo-core.c @@ -2084,9 +2084,9 @@ void __margo_hg_event_progress_fn(void* foo) epevs[0].events = EPOLLIN; epevs[0].data.u32 = 0; // fprintf(stderr, "DBG: progress pool efd: %d\n", - // mid->abt.pools[mid->progress_pool_idx].efd); - epoll_ctl(epfd, EPOLL_CTL_ADD, mid->abt.pools[mid->progress_pool_idx].efd, - &epevs[0]); + // mid->abt.pools[mid->progress_pool_idx].efd.efd); + epoll_ctl(epfd, EPOLL_CTL_ADD, + mid->abt.pools[mid->progress_pool_idx].efd.efd, &epevs[0]); /* watch Mercury */ epevs[1].events = EPOLLIN; @@ -2127,6 +2127,7 @@ void __margo_hg_event_progress_fn(void* foo) * sure we can rely on notifications */ // fprintf(stderr, "DBG: calling epoll_wait()\n"); + // fprintf(stderr, "DBG: progress waiting.\n"); ret = epoll_wait(epfd, epevs, 4, -1); if (ret == 0) { /* didn't detect anything that needs attention; continue @@ -2139,6 +2140,14 @@ void __margo_hg_event_progress_fn(void* foo) for (i = 0; i < ret; i++) { switch (epevs[i].data.u32) { case 0: /* pool needs attention */ + if (ret == 1) { + uint64_t tmp_val; + eventfd_read( + mid->abt.pools[mid->progress_pool_idx].efd.efd, + &tmp_val); + mid->abt.pools[mid->progress_pool_idx].efd.efd_count + = 0; + } break; case 1: /* mercury needs attention */ mercury_attention_flag = 1; diff --git a/src/margo-init.c b/src/margo-init.c index d78fdbc..e33793a 100644 --- a/src/margo-init.c +++ b/src/margo-init.c @@ -376,7 +376,7 @@ margo_instance_id margo_init_ext(const char* address, * supports event fds and b) the progress pool supports event fds. * Otherwise we need to stick to the traditional progress loop method. */ - if (mid->abt.pools[mid->progress_pool_idx].efd > -1 + if (mid->abt.pools[mid->progress_pool_idx].efd.efd > -1 && HG_Event_get_wait_fd(mid->hg.hg_context) > -1) { MARGO_TRACE(0, "Using event-driven progress loop"); margo_progress_fn_ptr = __margo_hg_event_progress_fn; diff --git a/src/margo-prio-pool.c b/src/margo-prio-pool.c index 6d947af..6063ffc 100644 --- a/src/margo-prio-pool.c +++ b/src/margo-prio-pool.c @@ -10,6 +10,7 @@ #include #include #include +#include #include "margo-prio-pool.h" @@ -85,14 +86,13 @@ static inline unit_t* queue_pop(queue_t* p_queue) } typedef struct pool_t { - queue_t high_prio_queue; - queue_t low_prio_queue; - int num; - int cnt; - pthread_mutex_t mutex; - pthread_cond_t cond; - int efd; - int epfd; + queue_t high_prio_queue; + queue_t low_prio_queue; + int num; + int cnt; + pthread_mutex_t mutex; + pthread_cond_t cond; + struct margo_prio_pool_efd* efd; } pool_t; static ABT_unit pool_unit_create_from_thread(ABT_thread thread) @@ -113,35 +113,21 @@ static void pool_unit_free(ABT_unit* p_unit) static int pool_init(ABT_pool pool, ABT_pool_config config) { - struct epoll_event epev = {0}; - pool_t* p_pool = (pool_t*)calloc(1, sizeof(pool_t)); + pool_t* p_pool = (pool_t*)calloc(1, sizeof(pool_t)); p_pool->high_prio_queue.p_tail = NULL; p_pool->high_prio_queue.p_head = NULL; p_pool->low_prio_queue.p_tail = NULL; p_pool->low_prio_queue.p_head = NULL; p_pool->num = 0; p_pool->cnt = 0; - p_pool->efd = -1; - p_pool->epfd = -1; + p_pool->efd = NULL; pthread_mutex_init(&p_pool->mutex, NULL); pthread_cond_init(&p_pool->cond, NULL); - /* NOTE: we don't check return code here because efd will still be -1 if - * it fails. - */ - ABT_pool_config_get(config, MARGO_PRIO_POOL_CONFIG_KEY_EFD, NULL, - &p_pool->efd); - // fprintf(stderr, "DBG: prio_pool got efd %d\n", p_pool->efd); - - /* if we have been asked to trigger an eventfd, then configure an epoll - * set so that the pool can also monitor it. - */ - /* TODO: error handling */ - if (p_pool->efd > -1) { - p_pool->epfd = epoll_create(1); - epev.events = EPOLLIN; - epev.data.fd = p_pool->efd; - epoll_ctl(p_pool->epfd, EPOLL_CTL_ADD, p_pool->efd, &epev); - } + int ret = ABT_pool_config_get(config, MARGO_PRIO_POOL_CONFIG_KEY_EFD, NULL, + &p_pool->efd); + assert(ret == 0); + // fprintf(stderr, "DBG: prio_pool got efd %p\n", p_pool->efd); + assert(p_pool->efd); ABT_pool_set_data(pool, (void*)p_pool); return ABT_SUCCESS; @@ -192,11 +178,13 @@ static void pool_push(ABT_pool pool, ABT_unit unit) /* only signal on the transition from empty to non-empty; in other cases * there will be no one waiting on the condition. */ - if (p_pool->num == 1) { - if (p_pool->epfd) - eventfd_write(p_pool->efd, 1); - else - pthread_cond_signal(&p_pool->cond); + // fprintf(stderr, "DBG: pool signaling\n"); + pthread_cond_signal(&p_pool->cond); + if (!p_pool->efd->efd_count + && sched_counter < SCHED_COUNTER_PRIORITY_LIMIT) { + // fprintf(stderr, "DBG: pool also eventfd'ing\n"); + eventfd_write(p_pool->efd->efd, 1); + p_pool->efd->efd_count++; } pthread_mutex_unlock(&p_pool->mutex); } @@ -236,10 +224,6 @@ static ABT_unit pool_pop(ABT_pool pool) } } while (0); if (p_unit) p_pool->num--; - if (p_pool->num == 0) { - /* clear eventfd state when we transition to empty */ - eventfd_read(p_pool->efd, &tmp_val); - } pthread_mutex_unlock(&p_pool->mutex); return p_unit ? (ABT_unit)p_unit : ABT_UNIT_NULL; } @@ -260,43 +244,9 @@ static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs) pthread_mutex_lock(&p_pool->mutex); unit_t* p_unit = NULL; if (p_pool->num == 0) { - if (p_pool->efd > -1) { - struct epoll_event event; - int timeout_ms; - int ret; - /* if the pool is configured to signal an event file descriptor - * when it transitions to non-idle, then go ahead and reuse it for - * internal signalling as well. - */ - if (!abstime_secs) { - timeout_ms = 0; - } else { - /* TODO: if we used the new pop_wait() interface we could - * avoid two time calls per iteration. - */ - timeout_ms = (int)((ABT_get_wtime() - abstime_secs) * 1000.0); - if (timeout_ms < 0) timeout_ms = 100; - } - // fprintf(stderr, - // "DBG: calling epoll_wait with timeout %d (abstime_secs - // was " - // "%f)\n", - // timeout_ms, abstime_secs); - /* TODO: error handling */ - /* NOTE: we have to drop mutex while blocking in epoll to avoid - * deadlock. Note that we only set and clear events with the - * lock held. - */ - pthread_mutex_unlock(&p_pool->mutex); - ret = epoll_wait(p_pool->epfd, &event, 1, timeout_ms); - pthread_mutex_lock(&p_pool->mutex); - // fprintf(stderr, "DBG: epoll_wait returned %d, num %d\n", ret, - // p_pool->num); - } else { - struct timespec ts; - convert_double_sec_to_timespec(&ts, abstime_secs); - pthread_cond_timedwait(&p_pool->cond, &p_pool->mutex, &ts); - } + struct timespec ts; + convert_double_sec_to_timespec(&ts, abstime_secs); + pthread_cond_timedwait(&p_pool->cond, &p_pool->mutex, &ts); } do { if ((p_pool->cnt++ & 0xFF) != 0) { @@ -312,10 +262,6 @@ static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs) } } while (0); if (p_unit) p_pool->num--; - if (p_pool->num == 0) { - /* clear eventfd state when we transition to empty */ - eventfd_read(p_pool->efd, &tmp_val); - } pthread_mutex_unlock(&p_pool->mutex); return p_unit ? (ABT_unit)p_unit : ABT_UNIT_NULL; } @@ -326,7 +272,6 @@ static int pool_free(ABT_pool pool) ABT_pool_get_data(pool, (void**)&p_pool); pthread_mutex_destroy(&p_pool->mutex); pthread_cond_destroy(&p_pool->cond); - if (p_pool->epfd > -1) close(p_pool->epfd); free(p_pool); return ABT_SUCCESS; diff --git a/src/margo-prio-pool.h b/src/margo-prio-pool.h index 3e2cae6..40b6d53 100644 --- a/src/margo-prio-pool.h +++ b/src/margo-prio-pool.h @@ -12,9 +12,14 @@ extern "C" { #endif #include +#include void margo_create_prio_pool_def(ABT_pool_def* p_def); #define MARGO_PRIO_POOL_CONFIG_KEY_EFD 0 +struct margo_prio_pool_efd { + _Atomic int efd_count; + int efd; +}; #ifdef __cplusplus }