Skip to content

Commit

Permalink
sketch event based progress loop
Browse files Browse the repository at this point in the history
- disabled for now, need to make some other adjustments
  • Loading branch information
carns committed Sep 9, 2024
1 parent d7bb0b6 commit 74ef181
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 68 deletions.
129 changes: 64 additions & 65 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -2056,13 +2056,15 @@ void __margo_hg_progress_fn(void* foo)
void __margo_hg_event_progress_fn(void* foo)
{
int ret;
unsigned int actual_count;
unsigned int progress_count;
unsigned int trigger_count;
struct margo_instance* mid = (struct margo_instance*)foo;
size_t size;
unsigned int hg_progress_timeout;
unsigned int pending;
int epfd;
struct epoll_event epev = {0};
// size_t size;
// unsigned int hg_progress_timeout;
// unsigned int pending;
int epfd;
struct epoll_event epevs[4] = {0};
int i;

/* set up an epoll file descriptor that will be used to multiplex
* events. It may need to watch Mercury, the Argobots pool used to
Expand All @@ -2071,11 +2073,22 @@ void __margo_hg_event_progress_fn(void* foo)
/* TODO: error handling */
epfd = epoll_create(1);
assert(epfd > -1);
epev.events = EPOLLIN;
epev.data.fd = mid->abt.pools[mid->progress_pool_idx].efd;
fprintf(stderr, "DBG: progress pool efd: %d\n", epev.data.fd);
/* TODO: could use ptr, u32, or u64 in data */
epoll_ctl(epfd, EPOLL_CTL_ADD, epev.data.fd, &epev);

/* watch progress pool */
epevs[0].events = EPOLLIN | EPOLLET;
epevs[0].data.u32 = 0;
// fprintf(stderr, "DBG: progress pool efd: %d\n",
// mid->abt.pools[mid->progress_pool_idx].efd);
epoll_ctl(epfd, EPOLL_CTL_ADD, mid->abt.pools[mid->progress_pool_idx].efd,
&epevs[0]);

/* watch Mercury */
epevs[1].events = EPOLLIN;
epevs[1].data.u32 = 1;
// fprintf(stderr, "DBG: mercury event fd: %d\n",
// HG_Event_get_wait_fd(mid->hg.hg_context));
epoll_ctl(epfd, EPOLL_CTL_ADD, HG_Event_get_wait_fd(mid->hg.hg_context),
&epevs[1]);

/* TODO: for now this is just a stripped down version of the normal loop
* to use as a starting point.
Expand All @@ -2093,68 +2106,54 @@ void __margo_hg_event_progress_fn(void* foo)
* on epoll.
*/
while (!mid->hg_progress_shutdown_flag) {
do {
/* save value of instance diag variable, in case it is modified
* while we are in loop
*/
ret = HG_Trigger(mid->hg.hg_context, 0, 1, &actual_count);
} while ((ret == HG_SUCCESS) && actual_count
&& !mid->hg_progress_shutdown_flag);
/* once we have processed callbacks, give the ES an opportunity to
* run other ULTs if it needs to.
*/
ABT_thread_yield();

/* Check to see if there are any runnable ULTs in the pool now. If
* so, then we yield here to allow them a chance to execute.
* We check here because new ULTs may now be elegible as a result of
* being spawned by the trigger, but existing ones also may have been
* activated by an external event.
*
* NOTE: the output size value does not count the calling ULT itself,
* because it is not technically in the pool as a runnable thread at
* the moment.
*/
ABT_pool progress_pool = MARGO_PROGRESS_POOL(mid);
ABT_pool_get_size(progress_pool, &size);
if (size) ABT_thread_yield();

/* Are there any other threads in this pool that *might* need to
* execute at some point in the future? If so, then it's not
* necessarily safe for Mercury to sleep here in progress. It
* doesn't matter whether they are runnable now or not, because an
* external event could resume them.
*
* NOTE: we use ABT_pool_get_total_size() rather than
* ABT_pool_get_size() in order to include suspended ULTs in our
* count. Note that this function *does* count the caller, so it
* will always be at least one, unlike ABT_pool_get_size().
*/
ABT_pool_get_total_size(progress_pool, &size);

/* Are there any RPCs in flight, regardless of what pool they were
* issued to? If so, then we also cannot block in Mercury, because
* they may issue self forward() calls that cannot complete until we
* get through this progress/trigger cycle
/* TODO: reorganize this loop; expected triggers first and lest
* dependency on counts. For now mimic'ing HG test program examples
* to minimize chance of bugs
*/
ABT_mutex_lock(mid->pending_operations_mtx);
pending = mid->pending_operations;
ABT_mutex_unlock(mid->pending_operations_mtx);

if (pending || size > 1) {
hg_progress_timeout = 0;
} else {
hg_progress_timeout = mid->hg_progress_timeout_ub;
ABT_thread_yield();
if (!HG_Event_ready(mid->hg.hg_context)) {
/* TODO: use mid->hg_progress_timeout_ub; check type/units */
// fprintf(stderr, "DBG: calling epoll_wait()\n");
ABT_thread_yield();
ret = epoll_wait(epfd, epevs, 1, 1000);
/* TODO: error handling */
assert(ret > -1);
for (i = 0; i < ret; i++) {
if (epevs[i].data.u32 == 0) {
// fprintf(stderr, "DBG: pool needs attention.\n");
/* the progress pool has something new; yield to let it
* run
*/
} else if (epevs[i].data.u32 == 1) {
// fprintf(stderr, "DBG: mercury needs attention.\n");
} else {
assert(0);
}
}
}

ret = HG_Progress(mid->hg.hg_context, hg_progress_timeout);
// fprintf(stderr, "DBG: calling HG_Event_progress()\n");
ret = HG_Event_progress(mid->hg.hg_context, &progress_count);
if (ret != HG_SUCCESS && ret != HG_TIMEOUT) {
/* TODO: error handling */
MARGO_CRITICAL(mid,
"unexpected return code (%d: %s) from HG_Progress()",
ret, HG_Error_to_string(ret));
MARGO_CRITICAL(
mid, "unexpected return code (%d: %s) from HG_Event_progress()",
ret, HG_Error_to_string(ret));
assert(0);
}
if (!progress_count) continue;

// fprintf(stderr, "DBG: calling HG_Event_trigger()\n");
ret = HG_Event_trigger(mid->hg.hg_context, progress_count,
&trigger_count);
if (ret == HG_SUCCESS && trigger_count) {
/* once we have processed callbacks, give the ES an opportunity to
* run other ULTs if it needs to.
*/
// fprintf(stderr, "DBG: triggered something.\n");
ABT_thread_yield();
}
}

close(epfd);
Expand Down
5 changes: 2 additions & 3 deletions src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,8 @@ margo_instance_id margo_init_ext(const char* address,
= MARGO_REGISTER(mid, "__identity__", void, hg_string_t, NULL);

MARGO_TRACE(0, "Starting progress loop");
ret = ABT_thread_create(MARGO_PROGRESS_POOL(mid),
__margo_hg_event_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
ret = ABT_thread_create(MARGO_PROGRESS_POOL(mid), __margo_hg_progress_fn,
mid, ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if (ret != ABT_SUCCESS) goto error;

mid->refcount = 1;
Expand Down

0 comments on commit 74ef181

Please sign in to comment.