Skip to content

Commit

Permalink
HG Core: add multi_recv_copy_threshold
Browse files Browse the repository at this point in the history
Use this new parameter to fallback to memcpy to prevent
starvation of multi-recv buffers and potential deadlock situations.

Fix auto sm cap disabled with multi-recv
  • Loading branch information
soumagne committed Jul 29, 2024
1 parent 7d34ab7 commit 086eb28
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 36 deletions.
133 changes: 100 additions & 33 deletions src/mercury_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@
#endif

/* Handle flags */
#define HG_CORE_HANDLE_LISTEN (1 << 1) /* Listener handle */
#define HG_CORE_HANDLE_MULTI_RECV (1 << 2) /* Handle used for multi-recv */
#define HG_CORE_HANDLE_USER (1 << 3) /* User-created handle */
#define HG_CORE_HANDLE_LISTEN (1 << 1) /* Listener handle */
#define HG_CORE_HANDLE_MULTI_RECV (1 << 2) /* Handle used for multi-recv */
#define HG_CORE_HANDLE_USER (1 << 3) /* User-created handle */
#define HG_CORE_HANDLE_MULTI_RECV_COPY (1 << 4) /* Copy on multi-recv */

/* Op status bits */
#define HG_CORE_OP_COMPLETED (1 << 0) /* Operation completed */
Expand Down Expand Up @@ -136,6 +137,7 @@ struct hg_core_init_info {
uint32_t request_post_init; /* Init request count */
uint32_t request_post_incr; /* Increment request count */
uint32_t multi_recv_op_max; /* Multi-recv op max */
uint32_t multi_recv_copy_threshold; /* Copy threshold */
hg_checksum_level_t checksum_level; /* Checksum level */
uint8_t progress_mode; /* Progress mode */
bool loopback; /* Use loopback capability */
Expand Down Expand Up @@ -344,6 +346,8 @@ struct hg_core_private_handle {
na_op_id_t *na_recv_op_id; /* Operation ID for recv */
na_op_id_t *na_ack_op_id; /* Operation ID for ack */
struct hg_core_multi_recv_op *multi_recv_op; /* Multi-recv operation */
void *in_buf_storage; /* Storage input buffer */
size_t in_buf_storage_size; /* Storage input buffer size */
size_t in_buf_used; /* Amount of input buffer used */
size_t out_buf_used; /* Amount of output buffer used */
na_tag_t tag; /* Tag used for request and response */
Expand All @@ -358,6 +362,7 @@ struct hg_core_private_handle {
enum hg_core_op_type op_type; /* Core operation type */
hg_return_t ret; /* Return code associated to handle */
uint8_t cookie; /* Cookie */
bool multi_recv_copy; /* Copy on multi-recv */
bool reuse; /* Re-use handle once ref_count is 0 */
};

Expand Down Expand Up @@ -1240,8 +1245,19 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version,
(hg_init_info.multi_recv_op_max == 0) ? HG_CORE_MULTI_RECV_OP_COUNT
: hg_init_info.multi_recv_op_max;

/* Save checksum level */
HG_CHECK_SUBSYS_ERROR(cls,
hg_init_info.multi_recv_copy_threshold >
(unsigned int) hg_core_class->init_info.multi_recv_op_max,
error, ret, HG_INVALID_ARG,
"multi_recv_copy_threshold (%u) cannot exceed multi_recv_op_max "
"(%u)",
hg_init_info.multi_recv_copy_threshold,
(unsigned int) hg_core_class->init_info.multi_recv_op_max);
hg_core_class->init_info.multi_recv_copy_threshold =
hg_init_info.multi_recv_copy_threshold;

#ifdef HG_HAS_CHECKSUMS
/* Save checksum level */
hg_core_class->init_info.checksum_level = hg_init_info.checksum_level;
#else
HG_CHECK_SUBSYS_WARNING(cls,
Expand Down Expand Up @@ -1289,11 +1305,11 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version,
na_info_string, hg_core_class->init_info.listen);
}

/* Multi-recv capability (currently not compatible with auto_sm) */
/* Multi-recv capability */
hg_core_class->init_info.multi_recv =
NA_Has_opt_feature(
hg_core_class->core_class.na_class, NA_OPT_MULTI_RECV) &&
!hg_init_info.no_multi_recv && !hg_init_info.auto_sm;
!hg_init_info.no_multi_recv;
HG_LOG_SUBSYS_DEBUG(
cls, "Multi-recv set to %" PRIu8, hg_core_class->init_info.multi_recv);
if (hg_core_class->init_info.multi_recv &&
Expand Down Expand Up @@ -1885,6 +1901,8 @@ hg_core_context_post(struct hg_core_private_context *context)
HG_CHECK_SUBSYS_HG_ERROR(
ctx, error, ret, "Could not allocate multi-recv resources");
flags |= HG_CORE_HANDLE_MULTI_RECV;
if (hg_core_class->init_info.multi_recv_copy_threshold > 0)
flags |= HG_CORE_HANDLE_MULTI_RECV_COPY;
}

/* Create pool of handles */
Expand All @@ -1900,7 +1918,7 @@ hg_core_context_post(struct hg_core_private_context *context)
if (context->core_context.na_sm_context != NULL) {
ret = hg_core_handle_pool_create(context,
hg_core_class->core_class.na_sm_class,
context->core_context.na_sm_context, flags,
context->core_context.na_sm_context, HG_CORE_HANDLE_LISTEN,
hg_core_class->init_info.request_post_init,
hg_core_class->init_info.request_post_incr,
&context->sm_handle_pool);
Expand Down Expand Up @@ -3490,23 +3508,37 @@ hg_core_alloc_na(struct hg_core_private_handle *hg_core_handle,
/* When using multi-recv, only allocate resources per handle for expected
* messages. */
if (flags & HG_CORE_HANDLE_MULTI_RECV) {
if (flags & HG_CORE_HANDLE_MULTI_RECV_COPY) {
hg_core_handle->in_buf_storage_size =
NA_Msg_get_max_unexpected_size(na_class);

hg_core_handle->in_buf_storage =
NA_Msg_buf_alloc(na_class, hg_core_handle->in_buf_storage_size,
NA_RECV, &hg_core_handle->in_buf_plugin_data);
HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->in_buf_storage == NULL,
error, ret, HG_NOMEM, "Could not allocate buffer for input");
}
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
} else {
/* Initialize in/out buffers and use unexpected message size */
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size =
NA_Msg_get_max_unexpected_size(na_class);

hg_core_handle->core_handle.in_buf =
NA_Msg_buf_alloc(na_class, hg_core_handle->core_handle.in_buf_size,
hg_core_handle->in_buf_storage =
NA_Msg_buf_alloc(na_class, hg_core_handle->in_buf_storage_size,
(flags & HG_CORE_HANDLE_LISTEN) ? NA_RECV : NA_SEND,
&hg_core_handle->in_buf_plugin_data);
HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->core_handle.in_buf == NULL,
HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->in_buf_storage == NULL,
error, ret, HG_NOMEM, "Could not allocate buffer for input");

hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage;
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size;

na_ret =
NA_Msg_init_unexpected(na_class, hg_core_handle->core_handle.in_buf,
hg_core_handle->core_handle.in_buf_size);
NA_Msg_init_unexpected(na_class, hg_core_handle->in_buf_storage,
hg_core_handle->in_buf_storage_size);
HG_CHECK_SUBSYS_ERROR(rpc, na_ret != NA_SUCCESS, error, ret,
(hg_return_t) na_ret, "Could not initialize input buffer (%s)",
NA_Error_to_string(na_ret));
Expand Down Expand Up @@ -3575,16 +3607,14 @@ hg_core_free_na(struct hg_core_private_handle *hg_core_handle)
hg_core_handle->na_ack_op_id = NULL;

/* Free buffers */
if (hg_atomic_get32(&hg_core_handle->status) & HG_CORE_OP_MULTI_RECV) {
if (hg_core_handle->multi_recv_op != NULL) {
hg_atomic_decr32(&hg_core_handle->multi_recv_op->ref_count);
hg_core_handle->multi_recv_op = NULL;
}
} else {
NA_Msg_buf_free(hg_core_handle->na_class,
hg_core_handle->core_handle.in_buf,
hg_core_handle->in_buf_plugin_data);
if ((hg_atomic_get32(&hg_core_handle->status) & HG_CORE_OP_MULTI_RECV) &&
(hg_core_handle->multi_recv_op != NULL)) {
hg_atomic_decr32(&hg_core_handle->multi_recv_op->ref_count);
hg_core_handle->multi_recv_op = NULL;
}
NA_Msg_buf_free(hg_core_handle->na_class, hg_core_handle->in_buf_storage,
hg_core_handle->in_buf_plugin_data);
hg_core_handle->in_buf_storage = NULL;
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->in_buf_plugin_data = NULL;

Expand Down Expand Up @@ -3673,7 +3703,7 @@ hg_core_reset_post(struct hg_core_private_handle *hg_core_handle)
hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) hg_core_handle->ret);

/* Multi-recv buffers */
if (use_multi_recv && multi_recv_op != NULL) {
if (use_multi_recv) {
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
hg_core_handle->multi_recv_op = NULL;
Expand Down Expand Up @@ -3860,8 +3890,10 @@ hg_core_release_input(struct hg_core_private_handle *hg_core_handle)

/* Multi-recv buffers */
if (multi_recv_op != NULL) {
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
if (!hg_core_handle->multi_recv_copy) {
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
}
hg_core_handle->multi_recv_op = NULL;

if (hg_atomic_decr32(&multi_recv_op->ref_count) == 0 &&
Expand Down Expand Up @@ -4436,12 +4468,14 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_recv_unexpected->tag;
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
na_cb_info_recv_unexpected->actual_buf_size >
hg_core_handle->core_handle.in_buf_size,
error, "Actual transfer size is too large for unexpected recv");
hg_core_handle->in_buf_used =
na_cb_info_recv_unexpected->actual_buf_size;
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
hg_core_handle->core_handle.in_buf_size,
error,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
Expand Down Expand Up @@ -4512,6 +4546,9 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_MULTI_RECV);
/* Prevent from reposting multi-recv buffer until done with handle */
hg_atomic_incr32(&multi_recv_op->ref_count);
hg_core_handle->multi_recv_copy =
(unsigned int) hg_atomic_get32(&context->multi_recv_op_count) <=
HG_CORE_CONTEXT_CLASS(context)->init_info.multi_recv_copy_threshold;

if (na_cb_info_multi_recv_unexpected->last) {
HG_LOG_SUBSYS_DEBUG(rpc,
Expand Down Expand Up @@ -4539,11 +4576,41 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_multi_recv_unexpected->tag;
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_used =
na_cb_info_multi_recv_unexpected->actual_buf_size;
hg_core_handle->in_buf_used = hg_core_handle->core_handle.in_buf_size;
hg_core_handle->core_handle.in_buf =
na_cb_info_multi_recv_unexpected->actual_buf;

/* Either copy the buffer to release early or point to the actual
* multi-recv buffer space to save a memcpy */
if (hg_core_handle->multi_recv_copy) {
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
hg_core_handle->in_buf_storage_size,
error,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);
HG_LOG_SUBSYS_DEBUG(rpc,
"Copying multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
memcpy(hg_core_handle->in_buf_storage,
na_cb_info_multi_recv_unexpected->actual_buf,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size;
hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage;

ret = hg_core_release_input(hg_core_handle);
HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret,
"Could not release input for handle (%p)",
(void *) hg_core_handle);
} else {
HG_LOG_SUBSYS_DEBUG(rpc,
"Using direct multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_used;
hg_core_handle->core_handle.in_buf =
na_cb_info_multi_recv_unexpected->actual_buf;
}

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
Expand Down
10 changes: 9 additions & 1 deletion src/mercury_core_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ struct hg_init_info {
* existing buffers from being reposted.
* Default value is: 4 */
unsigned int multi_recv_op_max;

/* Controls when we should start copying data in an effort to release
* multi-recv buffers. Copy will occur when at most
* multi_recv_copy_threshold buffers remain. Value should not exceed
* multi_recv_op_max.
* Default value is: 0 (never copy) */
unsigned int multi_recv_copy_threshold;
};

/* Error return codes:
Expand Down Expand Up @@ -204,7 +211,8 @@ typedef enum {
.sm_info_string = NULL, .checksum_level = HG_CHECKSUM_NONE, \
.no_bulk_eager = false, .no_loopback = false, .stats = false, \
.no_multi_recv = false, .release_input_early = false, \
.no_overflow = false, .multi_recv_op_max = 0 \
.no_overflow = false, .multi_recv_op_max = 0, \
.multi_recv_copy_threshold = 0 \
}

#endif /* MERCURY_CORE_TYPES_H */
6 changes: 4 additions & 2 deletions src/mercury_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ hg_init_info_dup_2_3(
.release_input_early = old_info->release_input_early,
.traffic_class = NA_TC_UNSPEC,
.no_overflow = false,
.multi_recv_op_max = 0};
.multi_recv_op_max = 0,
.multi_recv_copy_threshold = 0};
}

/*---------------------------------------------------------------------------*/
Expand All @@ -193,7 +194,8 @@ hg_init_info_dup_2_2(
.release_input_early = false,
.traffic_class = NA_TC_UNSPEC,
.no_overflow = false,
.multi_recv_op_max = 0};
.multi_recv_op_max = 0,
.multi_recv_copy_threshold = 0};
}

#ifdef __cplusplus
Expand Down

0 comments on commit 086eb28

Please sign in to comment.