Skip to content

Commit

Permalink
added margo_set_progress_when_needed
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Nov 1, 2024
1 parent ac07b7d commit ff65a22
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 11 deletions.
14 changes: 14 additions & 0 deletions include/margo.h
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,20 @@ int margo_set_progress_timeout_ub_msec(margo_instance_id mid, unsigned timeout);
int margo_get_progress_timeout_ub_msec(margo_instance_id mid,
unsigned* timeout);

/**
* @brief Set the progress mode. By default, the progress ULT will run
* whenever it is scheduled, even if there aren't any on-going operations.
* For clients, it may be useful to prevent it from being scheduled if
* there is no reason for making progress (i.e. there is no on-going RPC).
* Setting when_needed to true does just that.
*
* @param mid Margo instance.
* @param when_needed Progress only when needed.
*
* @return 0 in case of success, -1 otherwise.
*/
int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed);

/**
* @brief Sets configurable parameters/hints.
*
Expand Down
36 changes: 31 additions & 5 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,13 @@ void margo_finalize(margo_instance_id mid)

/* tell progress thread to wrap things up */
mid->hg_progress_shutdown_flag = 1;
PROGRESS_NEEDED_INCR(mid);

/* wait for it to shutdown cleanly */
MARGO_TRACE(mid, "Waiting for progress thread to complete");
ABT_thread_join(mid->hg_progress_tid);
ABT_thread_free(&mid->hg_progress_tid);
PROGRESS_NEEDED_DECR(mid);
mid->refcount--;

ABT_mutex_lock(mid->finalize_mutex);
Expand Down Expand Up @@ -701,11 +703,13 @@ margo_addr_lookup(margo_instance_id mid, const char* name, hg_addr_t* addr)

hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
(void*)eventual, name, HG_OP_ID_IGNORE);
PROGRESS_NEEDED_INCR(mid);
if (hret == HG_SUCCESS) {
ABT_eventual_wait(eventual, (void**)&evt);
*addr = evt->addr;
hret = evt->hret;
}
PROGRESS_NEEDED_DECR(mid);

ABT_eventual_free(&eventual);
#endif
Expand Down Expand Up @@ -902,6 +906,8 @@ static hg_return_t margo_cb(const struct hg_cb_info* info)
// handed to the user, hence it has to be freed here.
if (req->kind == MARGO_REQ_CALLBACK) free(req);

PROGRESS_NEEDED_DECR(mid);

return HG_SUCCESS;
}

Expand Down Expand Up @@ -1115,6 +1121,7 @@ static hg_return_t margo_provider_iforward_internal(
req->timer = NULL;
// LCOV_EXCL_END
}
PROGRESS_NEEDED_INCR(mid);

finish:

Expand Down Expand Up @@ -1319,6 +1326,8 @@ margo_irespond_internal(hg_handle_t handle,

hret = HG_Respond(handle, margo_cb, (void*)req, (void*)&respond_args);

if (hret == HG_SUCCESS) { PROGRESS_NEEDED_INCR(mid); }

finish:

/* monitoring */
Expand Down Expand Up @@ -1677,6 +1686,7 @@ static hg_return_t margo_bulk_itransfer_internal(
hret = HG_Bulk_transfer(mid->hg.hg_context, margo_cb, (void*)req, op,
origin_addr, origin_handle, origin_offset,
local_handle, local_offset, size, HG_OP_ID_IGNORE);
if (hret == HG_SUCCESS) { PROGRESS_NEEDED_INCR(mid); }

finish:

Expand Down Expand Up @@ -1816,6 +1826,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
ABT_POOL_NULL, &sleep_timer);
margo_timer_start(sleep_timer, timeout_ms);

PROGRESS_NEEDED_INCR(mid);

Check warning on line 1829 in src/margo-core.c

View check run for this annotation

Codecov / codecov/patch

src/margo-core.c#L1829

Added line #L1829 was not covered by tests

/* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex);
while (sleep_cb_dat.is_asleep)
Expand All @@ -1828,6 +1840,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)

margo_timer_destroy(sleep_timer);

PROGRESS_NEEDED_DECR(mid);

Check warning on line 1843 in src/margo-core.c

View check run for this annotation

Codecov / codecov/patch

src/margo-core.c#L1843

Added line #L1843 was not covered by tests

/* monitoring */
__MARGO_MONITOR(mid, FN_END, sleep, monitoring_args);
}
Expand Down Expand Up @@ -1956,14 +1970,15 @@ void __margo_hg_progress_fn(void* foo)
unsigned int hg_progress_timeout;
double next_timer_exp;
unsigned int pending;
int spin_flag = 0;
int spin_flag = 0;
double spin_start_ts = 0;

while (!mid->hg_progress_shutdown_flag) {

/* Wait for progress to actually be needed */
WAIT_FOR_PROGRESS_TO_BE_NEEDED(mid);

do {
/* save value of instance diag variable, in case it is modified
* while we are in loop
*/
ret = margo_internal_trigger(mid, 0, 1, &actual_count);
} while ((ret == HG_SUCCESS) && actual_count
&& !mid->hg_progress_shutdown_flag);
Expand All @@ -1978,7 +1993,7 @@ void __margo_hg_progress_fn(void* foo)
/* We used a zero progress timeout (busy spinning) on the last
* iteration. See if spindown time has elapsed yet.
*/
if (((ABT_get_wtime() - spin_start_ts)*1000)
if (((ABT_get_wtime() - spin_start_ts) * 1000)
< (double)mid->hg_progress_spindown_msec) {
/* We are still in the spindown window; continue spinning
* regardless of current conditions.
Expand Down Expand Up @@ -2416,3 +2431,14 @@ hg_return_t _handler_for_NULL(hg_handle_t handle)
margo_destroy(handle);
return HG_SUCCESS;
}

int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed)
{
if (mid == MARGO_INSTANCE_NULL) return -1;
mid->progress_when_needed.flag = when_needed;
if (!when_needed) {
ABT_cond_signal(
ABT_COND_MEMORY_GET_HANDLE(&mid->progress_when_needed.cond));
}
return 0;
}
63 changes: 58 additions & 5 deletions src/margo-instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ struct margo_instance {
_Atomic unsigned hg_progress_timeout_ub;
_Atomic unsigned hg_progress_spindown_msec;

/* "when_needed" progress logic */
struct {
bool flag;
uint64_t pending;
ABT_mutex_memory mutex;
ABT_cond_memory cond;
} progress_when_needed;

uint16_t num_registered_rpcs; /* number of registered rpc's by all providers
on this instance */
/* list of rpcs registered on this instance for debugging and profiling
Expand Down Expand Up @@ -131,7 +139,8 @@ struct margo_instance {

#define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool

typedef enum margo_request_kind {
typedef enum margo_request_kind
{
MARGO_REQ_EVENTUAL,
MARGO_REQ_CALLBACK
} margo_request_kind;
Expand Down Expand Up @@ -159,10 +168,10 @@ struct margo_request_struct {
struct margo_rpc_data {
margo_instance_id mid;
_Atomic(ABT_pool) pool;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
void (*user_free_callback)(void*);
};

Expand Down Expand Up @@ -196,6 +205,50 @@ typedef struct {
char is_asleep;
} margo_thread_sleep_cb_dat;

#define PROGRESS_NEEDED_INCR(__mid__) \
do { \
bool notify = false; \
if ((__mid__)->progress_when_needed.flag) { \
ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
notify = ++(__mid__)->progress_when_needed.pending == 1; \
ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
} \
if (notify) { \
ABT_cond_signal(ABT_COND_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.cond)); \
} \
} while (0)

#define PROGRESS_NEEDED_DECR(__mid__) \
do { \
if ((__mid__)->progress_when_needed.flag) { \
ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
--(__mid__)->progress_when_needed.pending; \
ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
} \
} while (0)

#define WAIT_FOR_PROGRESS_TO_BE_NEEDED(__mid__) \
do { \
if ((__mid__)->progress_when_needed.flag) { \
ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
while (!(__mid__)->progress_when_needed.pending) { \
ABT_cond_wait(ABT_COND_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.cond), \
ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
if (!(__mid__)->progress_when_needed.flag) break; \
} \
ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
} \
} while (0)

#define MARGO_TRACE margo_trace
#define MARGO_DEBUG margo_debug
#define MARGO_INFO margo_info
Expand Down
17 changes: 16 additions & 1 deletion tests/unit-tests/margo-forward.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ static void* test_context_setup(const MunitParameter params[], void* user_data)
}
munit_assert_not_null(ctx->mid);

const char* progress_when_needed = munit_parameters_get(params, "progress_when_needed");
if(progress_when_needed && strcmp(progress_when_needed, "true") == 0)
munit_assert_int(
margo_set_progress_when_needed(ctx->mid, true), ==, 0);
else
munit_assert_int(
margo_set_progress_when_needed(ctx->mid, false), ==, 0);

return ctx;
}

Expand Down Expand Up @@ -666,8 +674,15 @@ static MunitResult test_provider_cforward(const MunitParameter params[],

static char* protocol_params[] = {"na+sm", NULL};
static char* progress_pool_params[] = {"fifo_wait", "prio_wait", "earliest_first", NULL};
static char* progress_when_needed_params[] = {"true", "false", NULL};

static MunitParameterEnum test_params[]
= {{"protocol", protocol_params},
{"progress_pool", progress_pool_params},
{"progress_when_needed", progress_when_needed_params},
{NULL, NULL}};

static MunitParameterEnum test_params2[]
= {{"protocol", protocol_params},
{"progress_pool", progress_pool_params},
{NULL, NULL}};
Expand All @@ -678,7 +693,7 @@ static MunitTest test_suite_tests[] = {
{(char*)"/forward_with_args", test_forward_with_args, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
{(char*)"/forward_with_shim", test_forward_with_shim, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params2},
{(char*)"/forward_to_null", test_forward_to_null, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
{(char*)"/self_forward_to_null", test_self_forward_to_null, test_context_setup,
Expand Down

0 comments on commit ff65a22

Please sign in to comment.