Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added margo_set_progress_when_needed #296

Merged
merged 1 commit into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/margo.h
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,20 @@ int margo_set_progress_timeout_ub_msec(margo_instance_id mid, unsigned timeout);
int margo_get_progress_timeout_ub_msec(margo_instance_id mid,
unsigned* timeout);

/**
 * @brief Set the progress mode. By default, the progress ULT will run
 * whenever it is scheduled, even if there aren't any on-going operations.
 * For clients, it may be useful to prevent it from being scheduled if
 * there is no reason for making progress (i.e. there is no on-going RPC).
 * Setting when_needed to true does just that. Setting when_needed back to
 * false restores the default behavior and wakes up the progress loop if
 * it is currently waiting for an operation to be posted.
 *
 * @param mid Margo instance.
 * @param when_needed If true, progress only when needed; if false, progress
 * whenever the progress ULT is scheduled (default).
 *
 * @return 0 in case of success, -1 otherwise (e.g. if mid is
 * MARGO_INSTANCE_NULL).
 */
int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed);

/**
* @brief Sets configurable parameters/hints.
*
Expand Down
36 changes: 31 additions & 5 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,13 @@

/* tell progress thread to wrap things up */
mid->hg_progress_shutdown_flag = 1;
PROGRESS_NEEDED_INCR(mid);

/* wait for it to shutdown cleanly */
MARGO_TRACE(mid, "Waiting for progress thread to complete");
ABT_thread_join(mid->hg_progress_tid);
ABT_thread_free(&mid->hg_progress_tid);
PROGRESS_NEEDED_DECR(mid);
mid->refcount--;

ABT_mutex_lock(mid->finalize_mutex);
Expand Down Expand Up @@ -701,11 +703,13 @@

hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
(void*)eventual, name, HG_OP_ID_IGNORE);
PROGRESS_NEEDED_INCR(mid);
if (hret == HG_SUCCESS) {
ABT_eventual_wait(eventual, (void**)&evt);
*addr = evt->addr;
hret = evt->hret;
}
PROGRESS_NEEDED_DECR(mid);

ABT_eventual_free(&eventual);
#endif
Expand Down Expand Up @@ -902,6 +906,8 @@
// handed to the user, hence it has to be freed here.
if (req->kind == MARGO_REQ_CALLBACK) free(req);

PROGRESS_NEEDED_DECR(mid);

return HG_SUCCESS;
}

Expand Down Expand Up @@ -1115,6 +1121,7 @@
req->timer = NULL;
// LCOV_EXCL_END
}
PROGRESS_NEEDED_INCR(mid);

finish:

Expand Down Expand Up @@ -1319,6 +1326,8 @@

hret = HG_Respond(handle, margo_cb, (void*)req, (void*)&respond_args);

if (hret == HG_SUCCESS) { PROGRESS_NEEDED_INCR(mid); }

finish:

/* monitoring */
Expand Down Expand Up @@ -1677,6 +1686,7 @@
hret = HG_Bulk_transfer(mid->hg.hg_context, margo_cb, (void*)req, op,
origin_addr, origin_handle, origin_offset,
local_handle, local_offset, size, HG_OP_ID_IGNORE);
if (hret == HG_SUCCESS) { PROGRESS_NEEDED_INCR(mid); }

finish:

Expand Down Expand Up @@ -1816,6 +1826,8 @@
ABT_POOL_NULL, &sleep_timer);
margo_timer_start(sleep_timer, timeout_ms);

PROGRESS_NEEDED_INCR(mid);

Check warning on line 1829 in src/margo-core.c

View check run for this annotation

Codecov / codecov/patch

src/margo-core.c#L1829

Added line #L1829 was not covered by tests

/* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex);
while (sleep_cb_dat.is_asleep)
Expand All @@ -1828,6 +1840,8 @@

margo_timer_destroy(sleep_timer);

PROGRESS_NEEDED_DECR(mid);

Check warning on line 1843 in src/margo-core.c

View check run for this annotation

Codecov / codecov/patch

src/margo-core.c#L1843

Added line #L1843 was not covered by tests

/* monitoring */
__MARGO_MONITOR(mid, FN_END, sleep, monitoring_args);
}
Expand Down Expand Up @@ -1956,14 +1970,15 @@
unsigned int hg_progress_timeout;
double next_timer_exp;
unsigned int pending;
int spin_flag = 0;
int spin_flag = 0;
double spin_start_ts = 0;

while (!mid->hg_progress_shutdown_flag) {

/* Wait for progress to actually be needed */
WAIT_FOR_PROGRESS_TO_BE_NEEDED(mid);

do {
/* save value of instance diag variable, in case it is modified
* while we are in loop
*/
ret = margo_internal_trigger(mid, 0, 1, &actual_count);
} while ((ret == HG_SUCCESS) && actual_count
&& !mid->hg_progress_shutdown_flag);
Expand All @@ -1978,7 +1993,7 @@
/* We used a zero progress timeout (busy spinning) on the last
* iteration. See if spindown time has elapsed yet.
*/
if (((ABT_get_wtime() - spin_start_ts)*1000)
if (((ABT_get_wtime() - spin_start_ts) * 1000)
< (double)mid->hg_progress_spindown_msec) {
/* We are still in the spindown window; continue spinning
* regardless of current conditions.
Expand Down Expand Up @@ -2416,3 +2431,14 @@
margo_destroy(handle);
return HG_SUCCESS;
}

/**
 * Enable or disable the "progress when needed" mode of a margo instance.
 *
 * When the mode is enabled, the progress loop blocks in
 * WAIT_FOR_PROGRESS_TO_BE_NEEDED until an operation is pending. When the
 * mode is disabled, the condition variable is signaled so that a progress
 * loop currently blocked there wakes up and resumes unconditional progress.
 *
 * NOTE(review): PROGRESS_NEEDED_INCR/DECR are no-ops while the flag is
 * false, so operations posted before enabling the mode are not reflected
 * in the pending counter. Presumably this function is meant to be called
 * while no operation is in flight -- confirm with callers.
 *
 * @param mid Margo instance.
 * @param when_needed true to progress only when needed, false otherwise.
 *
 * @return 0 on success, -1 if mid is MARGO_INSTANCE_NULL.
 */
int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed)
{
    if (mid == MARGO_INSTANCE_NULL) return -1;

    /* The waiter evaluates its predicate and calls ABT_cond_wait while
     * holding progress_when_needed.mutex. Updating the flag and signaling
     * under the same mutex guarantees the signal cannot slip in between
     * the waiter's predicate check and its call to ABT_cond_wait, which
     * would otherwise be a lost wakeup.
     * NOTE(review): the waiter only re-checks the flag after waking up, so
     * a flag change that happens before it takes the mutex can still be
     * missed; closing that window requires a change in the wait macro. */
    ABT_mutex_lock(
        ABT_MUTEX_MEMORY_GET_HANDLE(&mid->progress_when_needed.mutex));
    mid->progress_when_needed.flag = when_needed;
    if (!when_needed) {
        ABT_cond_signal(
            ABT_COND_MEMORY_GET_HANDLE(&mid->progress_when_needed.cond));
    }
    ABT_mutex_unlock(
        ABT_MUTEX_MEMORY_GET_HANDLE(&mid->progress_when_needed.mutex));

    return 0;
}
63 changes: 58 additions & 5 deletions src/margo-instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ struct margo_instance {
_Atomic unsigned hg_progress_timeout_ub;
_Atomic unsigned hg_progress_spindown_msec;

/* "when_needed" progress logic */
struct {
bool flag;
uint64_t pending;
ABT_mutex_memory mutex;
ABT_cond_memory cond;
} progress_when_needed;

uint16_t num_registered_rpcs; /* number of registered rpc's by all providers
on this instance */
/* list of rpcs registered on this instance for debugging and profiling
Expand Down Expand Up @@ -131,7 +139,8 @@ struct margo_instance {

#define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool

typedef enum margo_request_kind {
typedef enum margo_request_kind
{
MARGO_REQ_EVENTUAL,
MARGO_REQ_CALLBACK
} margo_request_kind;
Expand Down Expand Up @@ -159,10 +168,10 @@ struct margo_request_struct {
struct margo_rpc_data {
margo_instance_id mid;
_Atomic(ABT_pool) pool;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
void (*user_free_callback)(void*);
};

Expand Down Expand Up @@ -196,6 +205,50 @@ typedef struct {
char is_asleep;
} margo_thread_sleep_cb_dat;

/*
 * Record that one more operation requires the progress loop to run.
 *
 * Does nothing unless "progress when needed" mode is enabled on the
 * instance. Otherwise the pending counter is incremented under the
 * progress_when_needed mutex and, if this operation is the first pending
 * one (the counter just became 1), the condition variable is signaled
 * after the mutex has been released.
 */
#define PROGRESS_NEEDED_INCR(__mid__)                                        \
    do {                                                                     \
        uint64_t in_flight_;                                                 \
        if (!(__mid__)->progress_when_needed.flag) break;                    \
        ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(                          \
            &(__mid__)->progress_when_needed.mutex));                        \
        (__mid__)->progress_when_needed.pending += 1;                        \
        in_flight_ = (__mid__)->progress_when_needed.pending;                \
        ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(                        \
            &(__mid__)->progress_when_needed.mutex));                        \
        if (in_flight_ == 1) {                                               \
            ABT_cond_signal(ABT_COND_MEMORY_GET_HANDLE(                      \
                &(__mid__)->progress_when_needed.cond));                     \
        }                                                                    \
    } while (0)

/*
 * Record that one operation no longer requires the progress loop to run.
 *
 * Does nothing unless "progress when needed" mode is enabled on the
 * instance. Otherwise the pending counter is decremented under the
 * progress_when_needed mutex.
 *
 * The decrement saturates at 0: PROGRESS_NEEDED_INCR is skipped while the
 * mode is disabled, so an operation posted before the mode was enabled can
 * reach this macro without a matching increment. Letting the unsigned
 * counter wrap around would make the next increment land on 0 without
 * signaling, and the progress loop could then sleep while an operation is
 * in flight.
 */
#define PROGRESS_NEEDED_DECR(__mid__)                                        \
    do {                                                                     \
        if ((__mid__)->progress_when_needed.flag) {                          \
            ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(                      \
                &(__mid__)->progress_when_needed.mutex));                    \
            if ((__mid__)->progress_when_needed.pending > 0) {               \
                --(__mid__)->progress_when_needed.pending;                   \
            }                                                                \
            ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(                    \
                &(__mid__)->progress_when_needed.mutex));                    \
        }                                                                    \
    } while (0)

#define WAIT_FOR_PROGRESS_TO_BE_NEEDED(__mid__) \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe these macros can be disabled (i.e. no locking etc.) if margo is initialized in listening mode? In that case (on the server side, when unexpected requests could arrive at any time) we always have to call progress, but on clients you only need to call progress when there is margo activity.

do { \
if ((__mid__)->progress_when_needed.flag) { \
ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
while (!(__mid__)->progress_when_needed.pending) { \
ABT_cond_wait(ABT_COND_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.cond), \
ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
if (!(__mid__)->progress_when_needed.flag) break; \
} \
ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE( \
&(__mid__)->progress_when_needed.mutex)); \
} \
} while (0)

#define MARGO_TRACE margo_trace
#define MARGO_DEBUG margo_debug
#define MARGO_INFO margo_info
Expand Down
17 changes: 16 additions & 1 deletion tests/unit-tests/margo-forward.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ static void* test_context_setup(const MunitParameter params[], void* user_data)
}
munit_assert_not_null(ctx->mid);

const char* progress_when_needed = munit_parameters_get(params, "progress_when_needed");
if(progress_when_needed && strcmp(progress_when_needed, "true") == 0)
munit_assert_int(
margo_set_progress_when_needed(ctx->mid, true), ==, 0);
else
munit_assert_int(
margo_set_progress_when_needed(ctx->mid, false), ==, 0);

return ctx;
}

Expand Down Expand Up @@ -666,8 +674,15 @@ static MunitResult test_provider_cforward(const MunitParameter params[],

/* Candidate values for each munit test parameter (NULL-terminated lists). */
static char* protocol_params[] = {"na+sm", NULL};
static char* progress_pool_params[] = {"fifo_wait", "prio_wait", "earliest_first", NULL};
/* test_context_setup enables "progress when needed" mode only when this
 * parameter is the string "true"; any other value (or no value) disables it. */
static char* progress_when_needed_params[] = {"true", "false", NULL};

/* Parameter set that includes "progress_when_needed", so tests using it are
 * parameterized over both the "true" and "false" values. */
static MunitParameterEnum test_params[]
= {{"protocol", protocol_params},
{"progress_pool", progress_pool_params},
{"progress_when_needed", progress_when_needed_params},
{NULL, NULL}};

/* Parameter set without "progress_when_needed": test_context_setup then finds
 * no value for it and calls margo_set_progress_when_needed(mid, false).
 * Used by the /forward_with_shim test. */
static MunitParameterEnum test_params2[]
= {{"protocol", protocol_params},
{"progress_pool", progress_pool_params},
{NULL, NULL}};
Expand All @@ -678,7 +693,7 @@ static MunitTest test_suite_tests[] = {
{(char*)"/forward_with_args", test_forward_with_args, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
{(char*)"/forward_with_shim", test_forward_with_shim, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params2},
{(char*)"/forward_to_null", test_forward_to_null, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
{(char*)"/self_forward_to_null", test_self_forward_to_null, test_context_setup,
Expand Down
Loading