Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved timers #272

Merged
merged 8 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ jobs:

- name: Build code and run unit tests
run: |
spack env activate -d tests
eval `spack env activate --sh tests`
./prepare.sh
./configure --enable-coverage --prefix=`pwd`
eval `spack env activate --sh tests` &&
./prepare.sh &&
./configure --enable-coverage --prefix=`pwd` &&
make check
make clean

- uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: true
gcov: true
42 changes: 39 additions & 3 deletions include/margo-timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct margo_timer* margo_timer_t;

/**
* @brief Creates a timer object.
* The callback will be submitted to the handler pool.
*
* @param mid Margo instance
* @param cb_fn Callback to call when the timer finishes
Expand All @@ -39,6 +40,33 @@ int margo_timer_create(margo_instance_id mid,
void* cb_dat,
margo_timer_t* timer);

/**
* @brief Creates a timer object and specifies the pool
* in which to run the callback.
*
* @note Passing ABT_POOL_NULL as the pool is allowed.
* In this case, the callback will be invoked directly
* within the ULT that runs the progress loop. This should
* generally be avoided unless the callback is very short,
* and does not call any margo_* or HG_* function. A typical
* example of a valid callback would be one that simply
* sets the value of an ABT_eventual, or one that submits
* a ULT and returns.
*
* @param mid Margo instance
* @param cb_fn Callback to call when the timer finishes
* @param cb_dat Callback data
* @param pool Pool in which to run the callback
* @param timer Resulting timer
*
* @return 0 on success, negative value on failure
*/
int margo_timer_create_with_pool(margo_instance_id mid,
margo_timer_callback_fn cb_fn,
void* cb_dat,
ABT_pool pool,
margo_timer_t* timer);

/**
* @brief Start the timer with a provided timeout.
*
Expand All @@ -50,7 +78,12 @@ int margo_timer_create(margo_instance_id mid,
int margo_timer_start(margo_timer_t timer, double timeout_ms);

/**
* @brief Cancel a started timer.
* @brief Cancel a started timer. If the timer's callback
* has already been submitted as a ULT, this ULT will
* eventually be executed. Hence, calling margo_timer_cancel
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function comment is incorrect. There is no wait_pending() function. By my reading of the code it will block until timer ULTs are complete (if they have been launched already). I like that because it is much simpler anyway.

In place of the note about wait_pending() it should probably warn callers that this particular could block (in an Argobots-safe way, that is).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I forgot to update this comment.

* does not guarantee that the timer won't actually fire
* later on. To ensure that no such ULT is pending, call
* margo_timer_wait_pending().
*
* @param timer Timer to cancel
*
Expand All @@ -59,8 +92,11 @@ int margo_timer_start(margo_timer_t timer, double timeout_ms);
int margo_timer_cancel(margo_timer_t timer);

/**
* @brief Destroys a timer. If the timer was started,
* this function will also cancel it.
* @brief Destroys a timer.
*
* @important This function will not cancel the timer.
* If it was started, it will still fire, and the timer's
* memory will be freed afterward.
*
* @param timer Timer to destroy.
*
Expand Down
45 changes: 26 additions & 19 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ static void margo_cleanup(margo_instance_id mid)

/* shut down pending timers */
MARGO_TRACE(mid, "Cleaning up pending timers");
__margo_timer_list_free(mid, mid->timer_list);
__margo_timer_list_free(mid);

MARGO_TRACE(mid, "Destroying mutex and condition variables");
ABT_mutex_free(&mid->finalize_mutex);
Expand Down Expand Up @@ -883,12 +883,11 @@ static hg_return_t margo_cb(const struct hg_cb_info* info)

if (hret == HG_CANCELED && req->timer) { hret = HG_TIMEOUT; }

/* remove timer if there is one and it is still in place (i.e., not timed
* out) */
if (hret != HG_TIMEOUT && req->timer && req->handle) {
__margo_timer_destroy(mid, req->timer);
/* remove timer if there is one and it is still in place */
if (req->timer) {
margo_timer_cancel(req->timer);
margo_timer_destroy(req->timer);
}
if (req->timer) { free(req->timer); }

if (req->kind == MARGO_REQ_CALLBACK) {
if (req->u.callback.cb) req->u.callback.cb(req->u.callback.uargs, hret);
Expand Down Expand Up @@ -955,7 +954,6 @@ static void margo_forward_timeout_cb(void* arg)
margo_request req = (margo_request)arg;
/* cancel the Mercury op if the forward timed out */
HG_Cancel(req->handle);
return;
}

static hg_return_t margo_provider_iforward_internal(
Expand Down Expand Up @@ -1084,18 +1082,24 @@ static hg_return_t margo_provider_iforward_internal(

if (timeout_ms > 0) {
/* set a timer object to expire when this forward times out */
req->timer = calloc(1, sizeof(*(req->timer)));
if (!(req->timer)) {
hret = margo_timer_create_with_pool(mid, margo_forward_timeout_cb, req,
ABT_POOL_NULL, &req->timer);
if (hret != HG_SUCCESS) {
// LCOV_EXCL_START
MARGO_EVENTUAL_FREE(&eventual);
margo_error(mid, "in %s: could not allocate memory for timer",
__func__);
hret = HG_NOMEM_ERROR;
margo_error(mid, "in %s: could not create timer", __func__);
goto finish;
// LCOV_EXCL_END
}
hret = margo_timer_start(req->timer, timeout_ms);
if (hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_timer_destroy(req->timer);
MARGO_EVENTUAL_FREE(&eventual);
margo_error(mid, "in %s: could not start timer", __func__);
goto finish;
// LCOV_EXCL_END
}
__margo_timer_init(mid, req->timer, margo_forward_timeout_cb, req,
timeout_ms);
}

// get parent RPC id
Expand All @@ -1120,8 +1124,8 @@ static hg_return_t margo_provider_iforward_internal(
/* remove timer if HG_Forward failed */
if (hret != HG_SUCCESS && req->timer) {
// LCOV_EXCL_START
__margo_timer_destroy(mid, req->timer);
free(req->timer);
margo_timer_cancel(req->timer);
margo_timer_destroy(req->timer);
req->timer = NULL;
// LCOV_EXCL_END
}
Expand Down Expand Up @@ -1805,7 +1809,7 @@ static void margo_thread_sleep_cb(void* arg)

void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
{
margo_timer sleep_timer;
margo_timer_t sleep_timer;
margo_thread_sleep_cb_dat sleep_cb_dat;

/* monitoring */
Expand All @@ -1822,8 +1826,9 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
sleep_cb_dat.is_asleep = 1;

/* initialize the sleep timer */
__margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb, &sleep_cb_dat,
timeout_ms);
margo_timer_create_with_pool(mid, margo_thread_sleep_cb, &sleep_cb_dat,
ABT_POOL_NULL, &sleep_timer);
margo_timer_start(sleep_timer, timeout_ms);

/* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex);
Expand All @@ -1835,6 +1840,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
ABT_mutex_free(&sleep_cb_dat.mutex);
ABT_cond_free(&sleep_cb_dat.cond);

margo_timer_destroy(sleep_timer);

/* monitoring */
__MARGO_MONITOR(mid, FN_END, sleep, monitoring_args);
}
Expand Down
2 changes: 1 addition & 1 deletion src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@
error:
if (mid) {
__margo_handle_cache_destroy(mid);
__margo_timer_list_free(mid, mid->timer_list);
__margo_timer_list_free(mid);

Check warning on line 372 in src/margo-init.c

View check run for this annotation

Codecov / codecov/patch

src/margo-init.c#L372

Added line #L372 was not covered by tests
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
Expand Down
14 changes: 8 additions & 6 deletions src/margo-instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <math.h>

#include "margo.h"
#include "margo-timer.h"
#include "margo-config.h"
#include "margo-abt-config.h"
#include "margo-hg-config.h"
Expand Down Expand Up @@ -129,13 +130,14 @@ struct margo_instance {

#define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool

typedef enum margo_request_kind {
typedef enum margo_request_kind
{
MARGO_REQ_EVENTUAL,
MARGO_REQ_CALLBACK
} margo_request_kind;

struct margo_request_struct {
margo_timer* timer;
margo_timer_t timer;
margo_instance_id mid;
hg_handle_t handle;
margo_monitor_data_t monitor_data;
Expand All @@ -157,10 +159,10 @@ struct margo_request_struct {
struct margo_rpc_data {
margo_instance_id mid;
_Atomic(ABT_pool) pool;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
void (*user_free_callback)(void*);
};

Expand Down
43 changes: 4 additions & 39 deletions src/margo-timer-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,19 @@
extern "C" {
#endif

typedef struct margo_timer {
margo_instance_id mid;
margo_timer_callback_fn cb_fn;
void* cb_dat;
double expiration;
struct margo_timer* next;
struct margo_timer* prev;
} margo_timer;
struct margo_timer_list;

/**
* Creates a margo_timer_list.
* @returns a new margo_timer_list, or NULL if failed
* Creates and initializes the margo_timer_list associated with the
* margo instance.
*/
struct margo_timer_list* __margo_timer_list_create();

/**
* Frees the timer list
* @param [in] timer_lst timer list to free
*/
void __margo_timer_list_free(margo_instance_id mid,
struct margo_timer_list* timer_lst);

/**
* Initializes a margo timer object which will perform some action
* after a specified time duration
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be initialized
* @param [in] cb_fn callback function for timeout action
* @param [in] cb_dat callback data passed to the callback function
* @param [in] timeout_ms timeout duration in milliseconds
*/
void __margo_timer_init(margo_instance_id mid,
margo_timer* timer,
margo_timer_callback_fn cb_fn,
void* cb_dat,
double timeout_ms);

/**
* Destroys a margo timer object which was previously initialized
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be destroyed
*/
void __margo_timer_destroy(margo_instance_id mid, margo_timer* timer);
void __margo_timer_list_free(margo_instance_id mid);

/**
* Checks for expired timers and performs specified timeout action
Expand All @@ -73,11 +43,6 @@ void __margo_check_timers(margo_instance_id mid);
int __margo_timer_get_next_expiration(margo_instance_id mid,
double* next_timer_exp);

/**
* Gets the margo_timer_list from the margo instance.
*/
struct margo_timer_list* __margo_get_timer_list(margo_instance_id mid);

#ifdef __cplusplus
}
#endif
Expand Down
Loading
Loading