From 086eb287752b4f2737e90aaf4dce7fe6c3d6ea9a Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Thu, 11 Jul 2024 17:32:18 -0500 Subject: [PATCH] HG Core: add multi_recv_copy_threshold Use this new parameter to fallback to memcpy to prevent starvation of multi-recv buffers and potential deadlock situations. Fix auto sm cap disabled with multi-recv --- src/mercury_core.c | 133 +++++++++++++++++++++++++++++---------- src/mercury_core_types.h | 10 ++- src/mercury_private.h | 6 +- 3 files changed, 113 insertions(+), 36 deletions(-) diff --git a/src/mercury_core.c b/src/mercury_core.c index 74aad8d0..9f4135f5 100644 --- a/src/mercury_core.c +++ b/src/mercury_core.c @@ -66,9 +66,10 @@ #endif /* Handle flags */ -#define HG_CORE_HANDLE_LISTEN (1 << 1) /* Listener handle */ -#define HG_CORE_HANDLE_MULTI_RECV (1 << 2) /* Handle used for multi-recv */ -#define HG_CORE_HANDLE_USER (1 << 3) /* User-created handle */ +#define HG_CORE_HANDLE_LISTEN (1 << 1) /* Listener handle */ +#define HG_CORE_HANDLE_MULTI_RECV (1 << 2) /* Handle used for multi-recv */ +#define HG_CORE_HANDLE_USER (1 << 3) /* User-created handle */ +#define HG_CORE_HANDLE_MULTI_RECV_COPY (1 << 4) /* Copy on multi-recv */ /* Op status bits */ #define HG_CORE_OP_COMPLETED (1 << 0) /* Operation completed */ @@ -136,6 +137,7 @@ struct hg_core_init_info { uint32_t request_post_init; /* Init request count */ uint32_t request_post_incr; /* Increment request count */ uint32_t multi_recv_op_max; /* Multi-recv op max */ + uint32_t multi_recv_copy_threshold; /* Copy threshold */ hg_checksum_level_t checksum_level; /* Checksum level */ uint8_t progress_mode; /* Progress mode */ bool loopback; /* Use loopback capability */ @@ -344,6 +346,8 @@ struct hg_core_private_handle { na_op_id_t *na_recv_op_id; /* Operation ID for recv */ na_op_id_t *na_ack_op_id; /* Operation ID for ack */ struct hg_core_multi_recv_op *multi_recv_op; /* Multi-recv operation */ + void *in_buf_storage; /* Storage input buffer */ + size_t in_buf_storage_size; /* Storage input buffer size */ size_t in_buf_used; /* Amount of input buffer used */ size_t out_buf_used; /* Amount of output buffer used */ na_tag_t tag; /* Tag used for request and response */ @@ -358,6 +362,7 @@ struct hg_core_private_handle { enum hg_core_op_type op_type; /* Core operation type */ hg_return_t ret; /* Return code associated to handle */ uint8_t cookie; /* Cookie */ + bool multi_recv_copy; /* Copy on multi-recv */ bool reuse; /* Re-use handle once ref_count is 0 */ }; @@ -1240,8 +1245,19 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version, (hg_init_info.multi_recv_op_max == 0) ? HG_CORE_MULTI_RECV_OP_COUNT : hg_init_info.multi_recv_op_max; - /* Save checksum level */ + HG_CHECK_SUBSYS_ERROR(cls, + hg_init_info.multi_recv_copy_threshold > + (unsigned int) hg_core_class->init_info.multi_recv_op_max, + error, ret, HG_INVALID_ARG, + "multi_recv_copy_threshold (%u) cannot exceed multi_recv_op_max " + "(%u)", + hg_init_info.multi_recv_copy_threshold, + (unsigned int) hg_core_class->init_info.multi_recv_op_max); + hg_core_class->init_info.multi_recv_copy_threshold = + hg_init_info.multi_recv_copy_threshold; + #ifdef HG_HAS_CHECKSUMS + /* Save checksum level */ hg_core_class->init_info.checksum_level = hg_init_info.checksum_level; #else HG_CHECK_SUBSYS_WARNING(cls, @@ -1289,11 +1305,11 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version, na_info_string, hg_core_class->init_info.listen); } - /* Multi-recv capability (currently not compatible with auto_sm) */ + /* Multi-recv capability */ hg_core_class->init_info.multi_recv = NA_Has_opt_feature( hg_core_class->core_class.na_class, NA_OPT_MULTI_RECV) && - !hg_init_info.no_multi_recv && !hg_init_info.auto_sm; + !hg_init_info.no_multi_recv; HG_LOG_SUBSYS_DEBUG( cls, "Multi-recv set to %" PRIu8, hg_core_class->init_info.multi_recv); if (hg_core_class->init_info.multi_recv && @@ -1885,6 +1901,8 @@ hg_core_context_post(struct hg_core_private_context *context) HG_CHECK_SUBSYS_HG_ERROR( ctx, error, ret, "Could not allocate multi-recv resources"); flags |= HG_CORE_HANDLE_MULTI_RECV; + if (hg_core_class->init_info.multi_recv_copy_threshold > 0) + flags |= HG_CORE_HANDLE_MULTI_RECV_COPY; } /* Create pool of handles */ @@ -1900,7 +1918,7 @@ hg_core_context_post(struct hg_core_private_context *context) if (context->core_context.na_sm_context != NULL) { ret = hg_core_handle_pool_create(context, hg_core_class->core_class.na_sm_class, - context->core_context.na_sm_context, flags, + context->core_context.na_sm_context, HG_CORE_HANDLE_LISTEN, hg_core_class->init_info.request_post_init, hg_core_class->init_info.request_post_incr, &context->sm_handle_pool); @@ -3490,23 +3508,37 @@ hg_core_alloc_na(struct hg_core_private_handle *hg_core_handle, /* When using multi-recv, only allocate resources per handle for expected * messages. */ if (flags & HG_CORE_HANDLE_MULTI_RECV) { + if (flags & HG_CORE_HANDLE_MULTI_RECV_COPY) { + hg_core_handle->in_buf_storage_size = + NA_Msg_get_max_unexpected_size(na_class); + + hg_core_handle->in_buf_storage = + NA_Msg_buf_alloc(na_class, hg_core_handle->in_buf_storage_size, + NA_RECV, &hg_core_handle->in_buf_plugin_data); + HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->in_buf_storage == NULL, + error, ret, HG_NOMEM, "Could not allocate buffer for input"); + } hg_core_handle->core_handle.in_buf = NULL; hg_core_handle->core_handle.in_buf_size = 0; } else { /* Initialize in/out buffers and use unexpected message size */ - hg_core_handle->core_handle.in_buf_size = + hg_core_handle->in_buf_storage_size = NA_Msg_get_max_unexpected_size(na_class); - hg_core_handle->core_handle.in_buf = - NA_Msg_buf_alloc(na_class, hg_core_handle->core_handle.in_buf_size, + hg_core_handle->in_buf_storage = + NA_Msg_buf_alloc(na_class, hg_core_handle->in_buf_storage_size, (flags & HG_CORE_HANDLE_LISTEN) ? NA_RECV : NA_SEND, &hg_core_handle->in_buf_plugin_data); - HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->core_handle.in_buf == NULL, + HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->in_buf_storage == NULL, error, ret, HG_NOMEM, "Could not allocate buffer for input"); + hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage; + hg_core_handle->core_handle.in_buf_size = + hg_core_handle->in_buf_storage_size; + na_ret = - NA_Msg_init_unexpected(na_class, hg_core_handle->core_handle.in_buf, - hg_core_handle->core_handle.in_buf_size); + NA_Msg_init_unexpected(na_class, hg_core_handle->in_buf_storage, + hg_core_handle->in_buf_storage_size); HG_CHECK_SUBSYS_ERROR(rpc, na_ret != NA_SUCCESS, error, ret, (hg_return_t) na_ret, "Could not initialize input buffer (%s)", NA_Error_to_string(na_ret)); @@ -3575,16 +3607,14 @@ hg_core_free_na(struct hg_core_private_handle *hg_core_handle) hg_core_handle->na_ack_op_id = NULL; /* Free buffers */ - if (hg_atomic_get32(&hg_core_handle->status) & HG_CORE_OP_MULTI_RECV) { - if (hg_core_handle->multi_recv_op != NULL) { - hg_atomic_decr32(&hg_core_handle->multi_recv_op->ref_count); - hg_core_handle->multi_recv_op = NULL; - } - } else { - NA_Msg_buf_free(hg_core_handle->na_class, - hg_core_handle->core_handle.in_buf, - hg_core_handle->in_buf_plugin_data); + if ((hg_atomic_get32(&hg_core_handle->status) & HG_CORE_OP_MULTI_RECV) && + (hg_core_handle->multi_recv_op != NULL)) { + hg_atomic_decr32(&hg_core_handle->multi_recv_op->ref_count); + hg_core_handle->multi_recv_op = NULL; } + NA_Msg_buf_free(hg_core_handle->na_class, hg_core_handle->in_buf_storage, + hg_core_handle->in_buf_plugin_data); + hg_core_handle->in_buf_storage = NULL; hg_core_handle->core_handle.in_buf = NULL; hg_core_handle->in_buf_plugin_data = NULL; @@ -3673,7 +3703,7 @@ hg_core_reset_post(struct hg_core_private_handle *hg_core_handle) hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) hg_core_handle->ret); /* Multi-recv buffers */ - if (use_multi_recv && multi_recv_op != NULL) { + if (use_multi_recv) { hg_core_handle->core_handle.in_buf = NULL; hg_core_handle->core_handle.in_buf_size = 0; hg_core_handle->multi_recv_op = NULL; @@ -3860,8 +3890,10 @@ hg_core_release_input(struct hg_core_private_handle *hg_core_handle) /* Multi-recv buffers */ if (multi_recv_op != NULL) { - hg_core_handle->core_handle.in_buf = NULL; - hg_core_handle->core_handle.in_buf_size = 0; + if (!hg_core_handle->multi_recv_copy) { + hg_core_handle->core_handle.in_buf = NULL; + hg_core_handle->core_handle.in_buf_size = 0; + } hg_core_handle->multi_recv_op = NULL; if (hg_atomic_decr32(&multi_recv_op->ref_count) == 0 && @@ -4436,12 +4468,14 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info) hg_core_handle->core_handle.info.addr->na_addr = hg_core_handle->na_addr; hg_core_handle->tag = na_cb_info_recv_unexpected->tag; - HG_CHECK_SUBSYS_ERROR_NORET(rpc, - na_cb_info_recv_unexpected->actual_buf_size > - hg_core_handle->core_handle.in_buf_size, - error, "Actual transfer size is too large for unexpected recv"); hg_core_handle->in_buf_used = na_cb_info_recv_unexpected->actual_buf_size; + HG_CHECK_SUBSYS_ERROR_NORET(rpc, + hg_core_handle->in_buf_used > + hg_core_handle->core_handle.in_buf_size, + error, + "Actual transfer size (%zu) is too large for unexpected recv", + hg_core_handle->in_buf_used); HG_LOG_SUBSYS_DEBUG(rpc, "Processing input for handle %p, tag=%u, buf_size=%zu", @@ -4512,6 +4546,9 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info) hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_MULTI_RECV); /* Prevent from reposting multi-recv buffer until done with handle */ hg_atomic_incr32(&multi_recv_op->ref_count); + hg_core_handle->multi_recv_copy = + (unsigned int) hg_atomic_get32(&context->multi_recv_op_count) <= + HG_CORE_CONTEXT_CLASS(context)->init_info.multi_recv_copy_threshold; if (na_cb_info_multi_recv_unexpected->last) { HG_LOG_SUBSYS_DEBUG(rpc, @@ -4539,11 +4576,41 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info) hg_core_handle->core_handle.info.addr->na_addr = hg_core_handle->na_addr; hg_core_handle->tag = na_cb_info_multi_recv_unexpected->tag; - hg_core_handle->core_handle.in_buf_size = + hg_core_handle->in_buf_used = na_cb_info_multi_recv_unexpected->actual_buf_size; - hg_core_handle->in_buf_used = hg_core_handle->core_handle.in_buf_size; - hg_core_handle->core_handle.in_buf = - na_cb_info_multi_recv_unexpected->actual_buf; + + /* Either copy the buffer to release early or point to the actual + * multi-recv buffer space to save a memcpy */ + if (hg_core_handle->multi_recv_copy) { + HG_CHECK_SUBSYS_ERROR_NORET(rpc, + hg_core_handle->in_buf_used > + hg_core_handle->in_buf_storage_size, + error, + "Actual transfer size (%zu) is too large for unexpected recv", + hg_core_handle->in_buf_used); + HG_LOG_SUBSYS_DEBUG(rpc, + "Copying multi-recv payload of size %zu for handle (%p)", + hg_core_handle->in_buf_used, (void *) hg_core_handle); + memcpy(hg_core_handle->in_buf_storage, + na_cb_info_multi_recv_unexpected->actual_buf, + hg_core_handle->in_buf_used); + hg_core_handle->core_handle.in_buf_size = + hg_core_handle->in_buf_storage_size; + hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage; + + ret = hg_core_release_input(hg_core_handle); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, + "Could not release input for handle (%p)", + (void *) hg_core_handle); + } else { + HG_LOG_SUBSYS_DEBUG(rpc, + "Using direct multi-recv payload of size %zu for handle (%p)", + hg_core_handle->in_buf_used, (void *) hg_core_handle); + hg_core_handle->core_handle.in_buf_size = + hg_core_handle->in_buf_used; + hg_core_handle->core_handle.in_buf = + na_cb_info_multi_recv_unexpected->actual_buf; + } HG_LOG_SUBSYS_DEBUG(rpc, "Processing input for handle %p, tag=%u, buf_size=%zu", diff --git a/src/mercury_core_types.h b/src/mercury_core_types.h index a557fd9d..a1ad3a7e 100644 --- a/src/mercury_core_types.h +++ b/src/mercury_core_types.h @@ -110,6 +110,13 @@ struct hg_init_info { * existing buffers from being reposted. * Default value is: 4 */ unsigned int multi_recv_op_max; + + /* Controls when we should start copying data in an effort to release + * multi-recv buffers. Copy will occur when at most + * multi_recv_copy_threshold buffers remain. Value should not exceed + * multi_recv_op_max. + * Default value is: 0 (never copy) */ + unsigned int multi_recv_copy_threshold; }; /* Error return codes: @@ -204,7 +211,8 @@ typedef enum { .sm_info_string = NULL, .checksum_level = HG_CHECKSUM_NONE, \ .no_bulk_eager = false, .no_loopback = false, .stats = false, \ .no_multi_recv = false, .release_input_early = false, \ - .no_overflow = false, .multi_recv_op_max = 0 \ + .no_overflow = false, .multi_recv_op_max = 0, \ + .multi_recv_copy_threshold = 0 \ } #endif /* MERCURY_CORE_TYPES_H */ diff --git a/src/mercury_private.h b/src/mercury_private.h index 4883f24d..0710016c 100644 --- a/src/mercury_private.h +++ b/src/mercury_private.h @@ -171,7 +171,8 @@ hg_init_info_dup_2_3( .release_input_early = old_info->release_input_early, .traffic_class = NA_TC_UNSPEC, .no_overflow = false, - .multi_recv_op_max = 0}; + .multi_recv_op_max = 0, + .multi_recv_copy_threshold = 0}; } /*---------------------------------------------------------------------------*/ @@ -193,7 +194,8 @@ hg_init_info_dup_2_2( .release_input_early = false, .traffic_class = NA_TC_UNSPEC, .no_overflow = false, - .multi_recv_op_max = 0}; + .multi_recv_op_max = 0, + .multi_recv_copy_threshold = 0}; } #ifdef __cplusplus