diff --git a/src/margo-core.c b/src/margo-core.c index d6a90ab..318f867 100644 --- a/src/margo-core.c +++ b/src/margo-core.c @@ -1965,47 +1965,40 @@ void __margo_hg_progress_fn(void* foo) ret = margo_internal_trigger(mid, 0, 1, &actual_count); } while ((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); - /* once we have processed callbacks, give the ES an opportunity to - * run other ULTs if it needs to. + + /* Yield now to give an opportunity for this ES to either a) run other + * ULTs that are eligible in this pool or b) check for runnable ULTs + * in other pools that the ES is associated with. */ ABT_thread_yield(); - /* Check to see if there are any runnable ULTs in the pool now. If - * so, then we yield here to allow them a chance to execute. - * We check here because new ULTs may now be elegible as a result of - * being spawned by the trigger, but existing ones also may have been - * activated by an external event. - * - * NOTE: the output size value does not count the calling ULT itself, - * because it is not technically in the pool as a runnable thread at - * the moment. - */ - ABT_pool progress_pool = MARGO_PROGRESS_POOL(mid); - ABT_pool_get_size(progress_pool, &size); - if (size) ABT_thread_yield(); - - /* Are there any other threads in this pool that *might* need to - * execute at some point in the future? If so, then it's not - * necessarily safe for Mercury to sleep here in progress. It - * doesn't matter whether they are runnable now or not, because an - * external event could resume them. + /* Determine if it is reasonably safe to briefly block on Mercury + * progress. We check two conditions: are there any RPCs currently + * being processed (i.e. pending_operations) or are there any other + * threads associated with the current pool that might become + * runnable while this thread is blocked. If either condition is + * met, then we use a zero timeout to Mercury to avoid blocking this + * ULT for too long. 
* - * NOTE: we use ABT_pool_get_total_size() rather than - * ABT_pool_get_size() in order to include suspended ULTs in our - * count. Note that this function *does* count the caller, so it - * will always be at least one, unlike ABT_pool_get_size(). - */ - ABT_pool_get_total_size(progress_pool, &size); - - /* Are there any RPCs in flight, regardless of what pool they were - * issued to? If so, then we also cannot block in Mercury, because - * they may issue self forward() calls that cannot complete until we - * get through this progress/trigger cycle + * Note that there is no easy way to determine if this ES is expected to + * also execute work in other pools, so we may still introduce + * hg_progress_timeout_ub of latency in that configuration scenario. + * Latency-sensitive use cases should avoid running the Margo + * progress function in pools that share execution streams with + * other pools. */ ABT_mutex_lock(mid->pending_operations_mtx); pending = mid->pending_operations; ABT_mutex_unlock(mid->pending_operations_mtx); + /* + * Note that we intentionally use get_total_size() rather than + * get_size() to make sure that we count suspended ULTs, not just + * currently runnable ULTs. The resulting count includes this ULT + * so we look for a count > 1 instead of a count > 0. + */ + ABT_pool_get_total_size(MARGO_PROGRESS_POOL(mid), &size); + if (pending || size > 1) { hg_progress_timeout = 0; } else {