Skip to content

Commit

Permalink
experimental changes to minimize eventfd activity
Browse files Browse the repository at this point in the history
- revert prio_pool to use condition variable at all times for its own
  internal purposes
- add user-space tracking of eventfd state to avoid writing events when
  it is already activated
- clear eventfd only when it has produced epoll events
  • Loading branch information
carns committed Sep 17, 2024
1 parent 4859d06 commit e16bd45
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 92 deletions.
14 changes: 8 additions & 6 deletions src/margo-abt-config.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ bool __margo_abt_pool_init_from_json(const json_object_t* jpool,
access = ABT_POOL_ACCESS_SPMC;
}

pool->efd = -1;
int ret;
if (kind != (ABT_pool_kind)(-1)) {
if (!pool->access) pool->access = strdup("mpmc");
Expand All @@ -130,18 +129,21 @@ bool __margo_abt_pool_init_from_json(const json_object_t* jpool,
* to rearrange code a little to make sure we can determine those
* things before initializing pool.
*/
pool->efd = eventfd(0, EFD_NONBLOCK);
if (pool->efd < 0) {
pool->efd.efd = eventfd(0, EFD_NONBLOCK);
pool->efd.efd_count = 0;
if (pool->efd.efd < 0) {
margo_error(mid, "eventfd failed with error code %d", errno);
}
ret = ABT_pool_config_create(&prio_pool_config);
if (ret != ABT_SUCCESS) {
margo_error(mid, "ABT_pool_config_create failed with error code %d",
ret);
}
ret = ABT_pool_config_set(prio_pool_config,
MARGO_PRIO_POOL_CONFIG_KEY_EFD,
ABT_POOL_CONFIG_INT, &pool->efd);
// fprintf(stderr, "DBG: config_set with %p\n", &pool->efd);
void* foo = &pool->efd;
ret = ABT_pool_config_set(prio_pool_config,
MARGO_PRIO_POOL_CONFIG_KEY_EFD,
ABT_POOL_CONFIG_PTR, &foo);
if (ret != ABT_SUCCESS) {
margo_error(mid, "ABT_pool_config_set failed with error code %d",
ret);
Expand Down
3 changes: 1 addition & 2 deletions src/margo-abt-config.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ typedef struct margo_abt_pool {
bool margo_free_flag; /* flag if Margo is responsible for freeing */
bool used_by_primary; /* flag indicating the this pool is used by the
primary ES */
int efd; /* event fd for notification of transition from idle to busy; -1 if
not supported or not enabled */
struct margo_prio_pool_efd efd;
} margo_abt_pool_t;

bool __margo_abt_pool_validate_json(const json_object_t* config,
Expand Down
15 changes: 12 additions & 3 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -2084,9 +2084,9 @@ void __margo_hg_event_progress_fn(void* foo)
epevs[0].events = EPOLLIN;
epevs[0].data.u32 = 0;
// fprintf(stderr, "DBG: progress pool efd: %d\n",
// mid->abt.pools[mid->progress_pool_idx].efd);
epoll_ctl(epfd, EPOLL_CTL_ADD, mid->abt.pools[mid->progress_pool_idx].efd,
&epevs[0]);
// mid->abt.pools[mid->progress_pool_idx].efd.efd);
epoll_ctl(epfd, EPOLL_CTL_ADD,
mid->abt.pools[mid->progress_pool_idx].efd.efd, &epevs[0]);

/* watch Mercury */
epevs[1].events = EPOLLIN;
Expand Down Expand Up @@ -2127,6 +2127,7 @@ void __margo_hg_event_progress_fn(void* foo)
* sure we can rely on notifications
*/
// fprintf(stderr, "DBG: calling epoll_wait()\n");
// fprintf(stderr, "DBG: progress waiting.\n");
ret = epoll_wait(epfd, epevs, 4, -1);
if (ret == 0) {
/* didn't detect anything that needs attention; continue
Expand All @@ -2139,6 +2140,14 @@ void __margo_hg_event_progress_fn(void* foo)
for (i = 0; i < ret; i++) {
switch (epevs[i].data.u32) {
case 0: /* pool needs attention */
if (ret == 1) {
uint64_t tmp_val;
eventfd_read(
mid->abt.pools[mid->progress_pool_idx].efd.efd,
&tmp_val);
mid->abt.pools[mid->progress_pool_idx].efd.efd_count
= 0;
}
break;
case 1: /* mercury needs attention */
mercury_attention_flag = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ margo_instance_id margo_init_ext(const char* address,
* supports event fds and b) the progress pool supports event fds.
* Otherwise we need to stick to the traditional progress loop method.
*/
if (mid->abt.pools[mid->progress_pool_idx].efd > -1
if (mid->abt.pools[mid->progress_pool_idx].efd.efd > -1
&& HG_Event_get_wait_fd(mid->hg.hg_context) > -1) {
MARGO_TRACE(0, "Using event-driven progress loop");
margo_progress_fn_ptr = __margo_hg_event_progress_fn;
Expand Down
105 changes: 25 additions & 80 deletions src/margo-prio-pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <assert.h>

#include "margo-prio-pool.h"

Expand Down Expand Up @@ -85,14 +86,13 @@ static inline unit_t* queue_pop(queue_t* p_queue)
}

typedef struct pool_t {
queue_t high_prio_queue;
queue_t low_prio_queue;
int num;
int cnt;
pthread_mutex_t mutex;
pthread_cond_t cond;
int efd;
int epfd;
queue_t high_prio_queue;
queue_t low_prio_queue;
int num;
int cnt;
pthread_mutex_t mutex;
pthread_cond_t cond;
struct margo_prio_pool_efd* efd;
} pool_t;

static ABT_unit pool_unit_create_from_thread(ABT_thread thread)
Expand All @@ -113,35 +113,21 @@ static void pool_unit_free(ABT_unit* p_unit)

static int pool_init(ABT_pool pool, ABT_pool_config config)
{
struct epoll_event epev = {0};
pool_t* p_pool = (pool_t*)calloc(1, sizeof(pool_t));
pool_t* p_pool = (pool_t*)calloc(1, sizeof(pool_t));
p_pool->high_prio_queue.p_tail = NULL;
p_pool->high_prio_queue.p_head = NULL;
p_pool->low_prio_queue.p_tail = NULL;
p_pool->low_prio_queue.p_head = NULL;
p_pool->num = 0;
p_pool->cnt = 0;
p_pool->efd = -1;
p_pool->epfd = -1;
p_pool->efd = NULL;
pthread_mutex_init(&p_pool->mutex, NULL);
pthread_cond_init(&p_pool->cond, NULL);
/* NOTE: we don't check return code here because efd will still be -1 if
* it fails.
*/
ABT_pool_config_get(config, MARGO_PRIO_POOL_CONFIG_KEY_EFD, NULL,
&p_pool->efd);
// fprintf(stderr, "DBG: prio_pool got efd %d\n", p_pool->efd);

/* if we have been asked to trigger an eventfd, then configure an epoll
* set so that the pool can also monitor it.
*/
/* TODO: error handling */
if (p_pool->efd > -1) {
p_pool->epfd = epoll_create(1);
epev.events = EPOLLIN;
epev.data.fd = p_pool->efd;
epoll_ctl(p_pool->epfd, EPOLL_CTL_ADD, p_pool->efd, &epev);
}
int ret = ABT_pool_config_get(config, MARGO_PRIO_POOL_CONFIG_KEY_EFD, NULL,
&p_pool->efd);
assert(ret == 0);
// fprintf(stderr, "DBG: prio_pool got efd %p\n", p_pool->efd);
assert(p_pool->efd);

ABT_pool_set_data(pool, (void*)p_pool);
return ABT_SUCCESS;
Expand Down Expand Up @@ -192,11 +178,13 @@ static void pool_push(ABT_pool pool, ABT_unit unit)
/* only signal on the transition from empty to non-empty; in other cases
* there will be no one waiting on the condition.
*/
if (p_pool->num == 1) {
if (p_pool->epfd)
eventfd_write(p_pool->efd, 1);
else
pthread_cond_signal(&p_pool->cond);
// fprintf(stderr, "DBG: pool signaling\n");
pthread_cond_signal(&p_pool->cond);
if (!p_pool->efd->efd_count
&& sched_counter < SCHED_COUNTER_PRIORITY_LIMIT) {
// fprintf(stderr, "DBG: pool also eventfd'ing\n");
eventfd_write(p_pool->efd->efd, 1);
p_pool->efd->efd_count++;
}
pthread_mutex_unlock(&p_pool->mutex);
}
Expand Down Expand Up @@ -236,10 +224,6 @@ static ABT_unit pool_pop(ABT_pool pool)
}
} while (0);
if (p_unit) p_pool->num--;
if (p_pool->num == 0) {
/* clear eventfd state when we transition to empty */
eventfd_read(p_pool->efd, &tmp_val);
}
pthread_mutex_unlock(&p_pool->mutex);
return p_unit ? (ABT_unit)p_unit : ABT_UNIT_NULL;
}
Expand All @@ -260,43 +244,9 @@ static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
pthread_mutex_lock(&p_pool->mutex);
unit_t* p_unit = NULL;
if (p_pool->num == 0) {
if (p_pool->efd > -1) {
struct epoll_event event;
int timeout_ms;
int ret;
/* if the pool is configured to signal an event file descriptor
* when it transitions to non-idle, then go ahead and reuse it for
* internal signalling as well.
*/
if (!abstime_secs) {
timeout_ms = 0;
} else {
/* TODO: if we used the new pop_wait() interface we could
* avoid two time calls per iteration.
*/
timeout_ms = (int)((ABT_get_wtime() - abstime_secs) * 1000.0);
if (timeout_ms < 0) timeout_ms = 100;
}
// fprintf(stderr,
// "DBG: calling epoll_wait with timeout %d (abstime_secs
// was "
// "%f)\n",
// timeout_ms, abstime_secs);
/* TODO: error handling */
/* NOTE: we have to drop mutex while blocking in epoll to avoid
* deadlock. Note that we only set and clear events with the
* lock held.
*/
pthread_mutex_unlock(&p_pool->mutex);
ret = epoll_wait(p_pool->epfd, &event, 1, timeout_ms);
pthread_mutex_lock(&p_pool->mutex);
// fprintf(stderr, "DBG: epoll_wait returned %d, num %d\n", ret,
// p_pool->num);
} else {
struct timespec ts;
convert_double_sec_to_timespec(&ts, abstime_secs);
pthread_cond_timedwait(&p_pool->cond, &p_pool->mutex, &ts);
}
struct timespec ts;
convert_double_sec_to_timespec(&ts, abstime_secs);
pthread_cond_timedwait(&p_pool->cond, &p_pool->mutex, &ts);
}
do {
if ((p_pool->cnt++ & 0xFF) != 0) {
Expand All @@ -312,10 +262,6 @@ static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
}
} while (0);
if (p_unit) p_pool->num--;
if (p_pool->num == 0) {
/* clear eventfd state when we transition to empty */
eventfd_read(p_pool->efd, &tmp_val);
}
pthread_mutex_unlock(&p_pool->mutex);
return p_unit ? (ABT_unit)p_unit : ABT_UNIT_NULL;
}
Expand All @@ -326,7 +272,6 @@ static int pool_free(ABT_pool pool)
ABT_pool_get_data(pool, (void**)&p_pool);
pthread_mutex_destroy(&p_pool->mutex);
pthread_cond_destroy(&p_pool->cond);
if (p_pool->epfd > -1) close(p_pool->epfd);
free(p_pool);

return ABT_SUCCESS;
Expand Down
5 changes: 5 additions & 0 deletions src/margo-prio-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ extern "C" {
#endif

#include <abt.h>
#include <stdatomic.h>

void margo_create_prio_pool_def(ABT_pool_def* p_def);
#define MARGO_PRIO_POOL_CONFIG_KEY_EFD 0
struct margo_prio_pool_efd {
_Atomic int efd_count;
int efd;
};

#ifdef __cplusplus
}
Expand Down

0 comments on commit e16bd45

Please sign in to comment.