Skip to content

Commit

Permalink
complete overhaul of the timer system
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Apr 11, 2024
1 parent 8bee0a5 commit fd2769b
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 183 deletions.
9 changes: 9 additions & 0 deletions include/margo-timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ int margo_timer_create(margo_instance_id mid,
* @brief Creates a timer object and specifies the pool
* in which to run the callback.
*
* @note Passing ABT_POOL_NULL as the pool is allowed.
* In this case, the callback will be invoked directly
* within the ULT that runs the progress loop. This should
* generally be avoided unless the callback is very short,
* and does not call any margo_* or HG_* function. A typical
* example of a valid callback would be one that simply
* sets the value of an ABT_eventual, or one that submits
* a ULT and returns.
*
* @param mid Margo instance
* @param cb_fn Callback to call when the timer finishes
* @param cb_dat Callback data
Expand Down
45 changes: 26 additions & 19 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ static void margo_cleanup(margo_instance_id mid)

/* shut down pending timers */
MARGO_TRACE(mid, "Cleaning up pending timers");
__margo_timer_list_free(mid, mid->timer_list);
__margo_timer_list_free(mid);

MARGO_TRACE(mid, "Destroying mutex and condition variables");
ABT_mutex_free(&mid->finalize_mutex);
Expand Down Expand Up @@ -883,12 +883,11 @@ static hg_return_t margo_cb(const struct hg_cb_info* info)

if (hret == HG_CANCELED && req->timer) { hret = HG_TIMEOUT; }

/* remove timer if there is one and it is still in place (i.e., not timed
* out) */
if (hret != HG_TIMEOUT && req->timer && req->handle) {
__margo_timer_destroy(mid, req->timer);
/* remove timer if there is one and it is still in place */
if (req->timer) {
margo_timer_cancel(req->timer);
margo_timer_destroy(req->timer);
}
if (req->timer) { free(req->timer); }

if (req->kind == MARGO_REQ_CALLBACK) {
if (req->u.callback.cb) req->u.callback.cb(req->u.callback.uargs, hret);
Expand Down Expand Up @@ -955,7 +954,6 @@ static void margo_forward_timeout_cb(void* arg)
margo_request req = (margo_request)arg;
/* cancel the Mercury op if the forward timed out */
HG_Cancel(req->handle);
return;
}

static hg_return_t margo_provider_iforward_internal(
Expand Down Expand Up @@ -1084,18 +1082,24 @@ static hg_return_t margo_provider_iforward_internal(

if (timeout_ms > 0) {
/* set a timer object to expire when this forward times out */
req->timer = calloc(1, sizeof(*(req->timer)));
if (!(req->timer)) {
hret = margo_timer_create_with_pool(mid, margo_forward_timeout_cb, req,
ABT_POOL_NULL, &req->timer);
if (hret != HG_SUCCESS) {
// LCOV_EXCL_START
MARGO_EVENTUAL_FREE(&eventual);
margo_error(mid, "in %s: could not allocate memory for timer",
__func__);
hret = HG_NOMEM_ERROR;
margo_error(mid, "in %s: could not create timer", __func__);
goto finish;
// LCOV_EXCL_END
}
hret = margo_timer_start(req->timer, timeout_ms);
if (hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_timer_destroy(req->timer);
MARGO_EVENTUAL_FREE(&eventual);
margo_error(mid, "in %s: could not start timer", __func__);
goto finish;
// LCOV_EXCL_END
}
__margo_timer_init(mid, req->timer, margo_forward_timeout_cb, req,
timeout_ms);
}

// get parent RPC id
Expand All @@ -1120,8 +1124,8 @@ static hg_return_t margo_provider_iforward_internal(
/* remove timer if HG_Forward failed */
if (hret != HG_SUCCESS && req->timer) {
// LCOV_EXCL_START
__margo_timer_destroy(mid, req->timer);
free(req->timer);
margo_timer_cancel(req->timer);
margo_timer_destroy(req->timer);
req->timer = NULL;
// LCOV_EXCL_END
}
Expand Down Expand Up @@ -1805,7 +1809,7 @@ static void margo_thread_sleep_cb(void* arg)

void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
{
margo_timer sleep_timer;
margo_timer_t sleep_timer;
margo_thread_sleep_cb_dat sleep_cb_dat;

/* monitoring */
Expand All @@ -1822,8 +1826,9 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
sleep_cb_dat.is_asleep = 1;

/* initialize the sleep timer */
__margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb, &sleep_cb_dat,
timeout_ms);
margo_timer_create_with_pool(mid, margo_thread_sleep_cb, &sleep_cb_dat,
ABT_POOL_NULL, &sleep_timer);
margo_timer_start(sleep_timer, timeout_ms);

/* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex);
Expand All @@ -1835,6 +1840,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
ABT_mutex_free(&sleep_cb_dat.mutex);
ABT_cond_free(&sleep_cb_dat.cond);

margo_timer_destroy(sleep_timer);

/* monitoring */
__MARGO_MONITOR(mid, FN_END, sleep, monitoring_args);
}
Expand Down
2 changes: 1 addition & 1 deletion src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ margo_instance_id margo_init_ext(const char* address,
error:
if (mid) {
__margo_handle_cache_destroy(mid);
__margo_timer_list_free(mid, mid->timer_list);
__margo_timer_list_free(mid);

Check warning on line 372 in src/margo-init.c

View check run for this annotation

Codecov / codecov/patch

src/margo-init.c#L372

Added line #L372 was not covered by tests
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
Expand Down
14 changes: 8 additions & 6 deletions src/margo-instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <math.h>

#include "margo.h"
#include "margo-timer.h"
#include "margo-config.h"
#include "margo-abt-config.h"
#include "margo-hg-config.h"
Expand Down Expand Up @@ -129,13 +130,14 @@ struct margo_instance {

#define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool

typedef enum margo_request_kind {
typedef enum margo_request_kind
{
MARGO_REQ_EVENTUAL,
MARGO_REQ_CALLBACK
} margo_request_kind;

struct margo_request_struct {
margo_timer* timer;
margo_timer_t timer;
margo_instance_id mid;
hg_handle_t handle;
margo_monitor_data_t monitor_data;
Expand All @@ -157,10 +159,10 @@ struct margo_request_struct {
struct margo_rpc_data {
margo_instance_id mid;
_Atomic(ABT_pool) pool;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
void (*user_free_callback)(void*);
};

Expand Down
51 changes: 4 additions & 47 deletions src/margo-timer-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,19 @@
extern "C" {
#endif

typedef struct margo_timer {
margo_instance_id mid;
margo_timer_callback_fn cb_fn;
void* cb_dat;
ABT_pool pool;

ABT_mutex_memory mtx_mem;
ABT_cond_memory cv_mem;
size_t num_pending;
_Atomic bool canceled;
_Atomic bool destroy_requested;

double expiration;
struct margo_timer* next;
struct margo_timer* prev;
} margo_timer;
struct margo_timer_list;

/**
* Creates a margo_timer_list.
* @returns a new margo_timer_list, or NULL if failed
* Creates and initializes the margo_timer_list associated with the
* margo instance.
*/
struct margo_timer_list* __margo_timer_list_create();

/**
* Frees the timer list
* @param [in] timer_lst timer list to free
*/
void __margo_timer_list_free(margo_instance_id mid,
struct margo_timer_list* timer_lst);

/**
* Initializes a margo timer object which will perform some action
* after a specified time duration
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be initialized
* @param [in] cb_fn callback function for timeout action
* @param [in] cb_dat callback data passed to the callback function
* @param [in] timeout_ms timeout duration in milliseconds
*/
void __margo_timer_init(margo_instance_id mid,
margo_timer* timer,
margo_timer_callback_fn cb_fn,
void* cb_dat,
double timeout_ms);

/**
* Destroys a margo timer object which was previously initialized
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be destroyed
*/
void __margo_timer_destroy(margo_instance_id mid, margo_timer* timer);
void __margo_timer_list_free(margo_instance_id mid);

/**
* Checks for expired timers and performs specified timeout action
Expand All @@ -81,11 +43,6 @@ void __margo_check_timers(margo_instance_id mid);
int __margo_timer_get_next_expiration(margo_instance_id mid,
double* next_timer_exp);

/**
* Gets the margo_timer_list from the margo instance.
*/
struct margo_timer_list* __margo_get_timer_list(margo_instance_id mid);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit fd2769b

Please sign in to comment.