From e0494e8c95ed664fe3de72057acaae7873929167 Mon Sep 17 00:00:00 2001 From: Shachar Hasson Date: Tue, 1 Oct 2024 10:23:07 +0300 Subject: [PATCH] UCP/WIREUP: Support EP reconfiguration for non-reused wired-up scenarios --- src/ucp/proto/proto_multi.c | 4 +- src/ucp/rndv/proto_rndv.c | 4 + src/ucp/rndv/proto_rndv.inl | 1 - src/ucp/rndv/rndv_rkey_ptr.c | 22 +++- src/ucp/wireup/select.c | 38 +----- src/ucp/wireup/wireup.c | 98 ++++++++------ src/ucp/wireup/wireup_ep.c | 6 +- src/ucp/wireup/wireup_ep.h | 2 + test/gtest/ucp/test_ucp_ep_reconfig.cc | 169 ++++++++++++++++++++++--- test/gtest/ucp/test_ucp_request.cc | 64 ++++++++-- 10 files changed, 293 insertions(+), 115 deletions(-) diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index a75667781e1..3aa0f4937b5 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -41,7 +41,6 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, uint32_t weight_sum; ucs_status_t status; - ucs_assert(params->max_lanes >= 1); ucs_assert(params->max_lanes <= UCP_PROTO_MAX_LANES); if ((ucp_proto_select_op_flags(params->super.super.select_param) & @@ -50,7 +49,8 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, return UCS_ERR_UNSUPPORTED; } - if (!ucp_proto_common_init_check_err_handling(&params->super)) { + if (!ucp_proto_common_init_check_err_handling(&params->super) || + (params->max_lanes == 0)) { return UCS_ERR_UNSUPPORTED; } diff --git a/src/ucp/rndv/proto_rndv.c b/src/ucp/rndv/proto_rndv.c index 7ddd92f0ce3..89b92be2f28 100644 --- a/src/ucp/rndv/proto_rndv.c +++ b/src/ucp/rndv/proto_rndv.c @@ -668,6 +668,10 @@ size_t ucp_proto_rndv_common_pack_ack(void *dest, void *arg) ucs_status_t ucp_proto_rndv_ats_complete(ucp_request_t *req) { ucp_datatype_iter_cleanup(&req->send.state.dt_iter, 1, UCP_DT_MASK_ALL); + if (req->send.rndv.rkey != NULL) { + ucp_proto_rndv_rkey_destroy(req); + } + return ucp_proto_rndv_recv_complete(req); } diff --git 
a/src/ucp/rndv/proto_rndv.inl b/src/ucp/rndv/proto_rndv.inl index 7d5f54ceab7..e4196cb7381 100644 --- a/src/ucp/rndv/proto_rndv.inl +++ b/src/ucp/rndv/proto_rndv.inl @@ -162,7 +162,6 @@ ucp_proto_rndv_rkey_destroy(ucp_request_t *req) static UCS_F_ALWAYS_INLINE void ucp_proto_rndv_recv_complete_with_ats(ucp_request_t *req, uint8_t ats_stage) { - ucp_proto_rndv_rkey_destroy(req); ucp_proto_request_set_stage(req, ats_stage); ucp_request_send(req); } diff --git a/src/ucp/rndv/rndv_rkey_ptr.c b/src/ucp/rndv/rndv_rkey_ptr.c index f88b526e288..6d8d8cc3d63 100644 --- a/src/ucp/rndv/rndv_rkey_ptr.c +++ b/src/ucp/rndv/rndv_rkey_ptr.c @@ -101,7 +101,9 @@ ucp_proto_rndv_rkey_ptr_probe(const ucp_proto_init_params_t *init_params) ucs_status_t status; if (!ucp_proto_rndv_op_check(init_params, UCP_OP_ID_RNDV_RECV, 0) || - !ucp_proto_common_init_check_err_handling(&params.super)) { + !ucp_proto_common_init_check_err_handling(&params.super) || + (ucp_proto_select_op_flags(params.super.super.select_param) & + UCP_PROTO_SELECT_OP_FLAG_RESUME)) { return; } @@ -214,6 +216,22 @@ ucp_proto_rndv_rkey_ptr_fetch_progress(uct_pending_req_t *uct_req) return UCS_OK; } +static ucs_status_t ucp_proto_rndv_rkey_ptr_reset(ucp_request_t *request) +{ + switch (request->send.proto_stage) { + case UCP_PROTO_RNDV_RKEY_PTR_STAGE_COPY: + ucs_queue_remove(&request->send.ep->worker->rkey_ptr_reqs, + &request->send.rndv.rkey_ptr.queue_elem); + break; + case UCP_PROTO_RNDV_RKEY_PTR_STAGE_ACK: + break; + default: + ucp_proto_fatal_invalid_stage(request, "reset"); + } + + return UCS_OK; +} + ucp_proto_t ucp_rndv_rkey_ptr_proto = { .name = "rndv/rkey_ptr", .desc = "copy from mapped remote memory", @@ -225,7 +243,7 @@ ucp_proto_t ucp_rndv_rkey_ptr_proto = { [UCP_PROTO_RNDV_RKEY_PTR_STAGE_ACK] = ucp_proto_rndv_ats_progress }, .abort = ucp_proto_abort_fatal_not_implemented, - .reset = (ucp_request_reset_func_t)ucp_proto_reset_fatal_not_implemented + .reset = ucp_proto_rndv_rkey_ptr_reset }; static void diff --git 
a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 23cd6666a33..9b75006f69a 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -1558,29 +1558,6 @@ ucp_wireup_add_fast_lanes(ucp_worker_h worker, return num_lanes; } -static unsigned ucp_wireup_get_current_num_lanes( - const ucp_wireup_select_params_t *select_params, ucp_lane_type_t type) -{ - ucp_ep_h ep = select_params->ep; - unsigned current_num_lanes = 0; - ucp_lane_index_t lane; - - /* First initialization (current lanes weren't chosen yet) or - * CM is enabled (so we can reconfigure the endpoint). - */ - if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) || - ucp_ep_has_cm_lane(ep)) { - return ucp_wireup_bw_max_lanes(select_params); - } - - for (lane = 0; lane < ucp_ep_config(ep)->key.num_lanes; ++lane) { - if (ucp_ep_config(ep)->key.lanes[lane].lane_types & UCS_BIT(type)) { - ++current_num_lanes; - } - } - return current_num_lanes; -} - static unsigned ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params, ucp_wireup_select_bw_info_t *bw_info, @@ -1602,25 +1579,17 @@ ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params, ucp_rsc_index_t rsc_index; unsigned addr_index; ucp_wireup_select_info_t *sinfo; - unsigned max_lanes, num_lanes; + unsigned num_lanes; unsigned local_num_paths, remote_num_paths; local_dev_bitmap = bw_info->local_dev_bitmap; remote_dev_bitmap = bw_info->remote_dev_bitmap; bw_info->criteria.arg = &dev_count; - /* Restrict choosing more lanes that were already chosen when CM is disabled - * to prevent EP reconfiguration in case of dropping lanes due to low BW. - * TODO: Remove when endpoint reconfiguration is supported. 
- */ - max_lanes = ucp_wireup_get_current_num_lanes(select_params, - bw_info->criteria.lane_type); - max_lanes = ucs_min(max_lanes, bw_info->max_lanes); - /* lookup for requested number of lanes or limit of MD map * (we have to limit MD's number to avoid malloc in * memory registration) */ - while (ucs_array_length(&sinfo_array) < max_lanes) { + while (ucs_array_length(&sinfo_array) < bw_info->max_lanes) { if (excl_lane == UCP_NULL_LANE) { sinfo = ucs_array_append(&sinfo_array, break); status = ucp_wireup_select_transport(select_ctx, select_params, @@ -1848,7 +1817,8 @@ ucp_wireup_add_rma_bw_lanes(const ucp_wireup_select_params_t *select_params, ucp_wireup_init_select_flags(&iface_rma_flags, 0, 0); ucp_wireup_init_select_flags(&peer_rma_flags, 0, 0); - if (ep_init_flags & UCP_EP_INIT_CREATE_AM_LANE_ONLY) { + if ((ep_init_flags & UCP_EP_INIT_CREATE_AM_LANE_ONLY) || + (context->config.ext.max_rndv_lanes == 0)) { return UCS_OK; } diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 009feb46426..39ed0cd5957 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -1085,11 +1085,9 @@ ucp_wireup_connect_lane_to_iface(ucp_ep_h ep, ucp_lane_index_t lane, } } else { /* If EP already exists, it's a wireup proxy, and we need to update - * its next_ep instead of replacing it. The wireup EP was created - * during CM pack_cb() on a client side */ + * its next_ep instead of replacing it. 
*/ ucs_assert(ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))); ucs_assert(ucp_proxy_ep_extract(ucp_ep_get_lane(ep, lane)) == NULL); - ucs_assert(ucp_ep_has_cm_lane(ep)); ucp_wireup_ep_lane_set_next_ep(ep, lane, uct_ep); } @@ -1355,23 +1353,7 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps, } } -static int -ucp_wireup_are_all_lanes_p2p(ucp_ep_h ep, const ucp_ep_config_key_t *key) -{ - ucp_lane_index_t lane; - ucp_rsc_index_t rsc_index; - - for (lane = 0; lane < key->num_lanes; ++lane) { - rsc_index = ucp_ep_get_rsc_index(ep, lane); - ucs_assert(rsc_index != UCP_NULL_RESOURCE); - - if (ucp_worker_is_tl_2iface(ep->worker, rsc_index)) { - return 0; - } - } - return 1; -} static unsigned ucp_ep_num_reused_lanes(ucp_ep_h ep, const ucp_lane_index_t *reuse_lane_map) @@ -1394,20 +1376,22 @@ ucp_wireup_check_is_reconfigurable(ucp_ep_h ep, { ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES]; const ucp_ep_config_key_t *old_key; + ucp_lane_index_t lane; - if (ucp_ep_has_cm_lane(ep)) { + if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) || + ucp_ep_has_cm_lane(ep)) { return 1; } - old_key = &ucp_ep_config(ep)->key; - - /* Verify both old/new configurations have only p2p lanes */ - if (!ucp_wireup_are_all_lanes_p2p(ep, old_key) || - !ucp_wireup_are_all_lanes_p2p(ep, new_key) || - (old_key->num_lanes != new_key->num_lanes)) { - return 0; + /* TODO: Support reconfiguration when lanes are created without a wireup_ep + * wrapper */ + for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { + if (!ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) { + return 0; + } } + old_key = &ucp_ep_config(ep)->key; ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address, addr_indices, reuse_lane_map); @@ -1452,7 +1436,6 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key, /* Use existing EP from CM lane */ new_wireup_ep = ucp_ep_get_cm_wireup_ep(ep); ucs_assert(new_wireup_ep != NULL); - aux_rsc_index = ucp_ep_get_rsc_index(ep, old_lane); } else { /* 
Create new EP for non-CM flow */ status = ucp_wireup_ep_create(ep, &uct_ep); @@ -1461,10 +1444,14 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key, } new_wireup_ep = ucp_wireup_ep(uct_ep); - aux_rsc_index = ucp_wireup_ep_get_aux_rsc_index( - &old_wireup_ep->super.super); } + /* Get correct aux_rsc_index either from next_ep or aux_ep */ + aux_rsc_index = ucp_wireup_ep_is_next_ep_active(old_wireup_ep) ? + ucp_ep_get_rsc_index(ep, old_lane) : + ucp_wireup_ep_get_aux_rsc_index( + &old_wireup_ep->super.super); + ucs_assert(aux_rsc_index != UCP_NULL_RESOURCE); is_p2p = ucp_ep_config_connect_p2p(ep->worker, &ucp_ep_config(ep)->key, aux_rsc_index); @@ -1500,7 +1487,7 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key, uct_ep_h new_uct_eps[UCP_MAX_LANES] = {NULL}; ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES] = {UCP_NULL_LANE}; ucp_ep_config_key_t *old_key; - ucp_lane_index_t lane, reuse_lane; + ucp_lane_index_t lane, reuse_lane, wireup_lane; uct_ep_h uct_ep; ucs_status_t status; @@ -1532,11 +1519,13 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key, "new_key->wireup_msg_lane=%u", new_key->wireup_msg_lane); } + wireup_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST); + /* wireup lane has to be selected for the old configuration */ - ucs_assert(old_key->wireup_msg_lane != UCP_NULL_LANE); + ucs_assert(wireup_lane != UCP_NULL_LANE); /* set the correct WIREUP MSG lane */ - reuse_lane = reuse_lane_map[old_key->wireup_msg_lane]; + reuse_lane = reuse_lane_map[wireup_lane]; if (reuse_lane != UCP_NULL_LANE) { /* previous wireup lane is part of the new configuration, so reuse it */ new_key->wireup_msg_lane = reuse_lane; @@ -1622,6 +1611,23 @@ ucp_wireup_try_select_lanes(ucp_ep_h ep, unsigned ep_init_flags, return UCS_OK; } +static void +ucp_wireup_gather_pending_reqs(ucp_ep_h ep, + ucs_queue_head_t *replay_pending_queue) +{ + ucp_request_t *req; + + ucp_wireup_eps_pending_extract(ep, 
replay_pending_queue); + + ucs_queue_for_each(req, &ep->worker->rkey_ptr_reqs, + send.rndv.rkey_ptr.queue_elem) { + if (req->send.ep == ep) { + ucs_queue_push(replay_pending_queue, + (ucs_queue_elem_t*)&req->send.uct.priv); + } + } +} + ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, const ucp_tl_bitmap_t *local_tl_bitmap, const ucp_unpacked_address_t *remote_address, @@ -1639,6 +1645,7 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, char str[32]; ucs_queue_head_t replay_pending_queue; ucp_rsc_index_t dst_mds_mem[UCP_MAX_MDS]; + int is_reconfigurable; tl_bitmap = UCS_STATIC_BITMAP_AND(*local_tl_bitmap, worker->context->tl_bitmap); @@ -1646,7 +1653,14 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, ucs_trace("ep %p: initialize lanes", ep); ucs_log_indent(1); - ucp_wireup_eps_pending_extract(ep, &replay_pending_queue); + + if (!ucp_ep_has_cm_lane(ep)) { + ucp_ep_update_flags(ep, 0, + UCP_EP_FLAG_LOCAL_CONNECTED | + UCP_EP_FLAG_REMOTE_CONNECTED); + } + + ucp_wireup_gather_pending_reqs(ep, &replay_pending_queue); status = ucp_wireup_try_select_lanes(ep, ep_init_flags, &tl_bitmap, remote_address, addr_indices, &key, @@ -1655,10 +1669,14 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, goto out; } - if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) && - !ucp_ep_config_is_equal(&ucp_ep_config(ep)->key, &key) && - !ucp_wireup_check_is_reconfigurable(ep, &key, remote_address, - addr_indices)) { + /* This function must be used before uct_eps are discarded. Store return + * value for later use. */ + is_reconfigurable = ucp_wireup_check_is_reconfigurable(ep, &key, + remote_address, + addr_indices); + + if (!is_reconfigurable && + !ucp_ep_config_is_equal(&ucp_ep_config(ep)->key, &key)) { /* Allow to choose only the lanes that were already chosen for case * without CM to prevent reconfiguration error. 
*/ @@ -1710,9 +1728,7 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, cm_idx = ep->ext->cm_idx; - if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) && - !ucp_wireup_check_is_reconfigurable(ep, &key, remote_address, - addr_indices)) { + if (!is_reconfigurable) { /* * TODO handle a case where we have to change lanes and reconfigure the ep: * diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index 2f1b9101bd6..fab2bca23ce 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -52,10 +52,10 @@ static ssize_t ucp_wireup_ep_bcopy_send_func(uct_ep_h uct_ep) return UCS_ERR_NO_RESOURCE; } -static int ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep_t *wireup_ep) +int ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep_t *wireup_ep) { - return (wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) || - (wireup_ep->aux_ep == NULL); + ucs_assert(wireup_ep->super.uct_ep != NULL); + return wireup_ep->aux_ep == NULL; } uct_ep_h ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep_t *wireup_ep) diff --git a/src/ucp/wireup/wireup_ep.h b/src/ucp/wireup/wireup_ep.h index 7d9a5e0ec4e..46df730c201 100644 --- a/src/ucp/wireup/wireup_ep.h +++ b/src/ucp/wireup/wireup_ep.h @@ -113,6 +113,8 @@ uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep); uct_ep_h ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep_t *wireup_ep); +int ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep_t *wireup_ep); + void ucp_wireup_ep_destroy_next_ep(ucp_wireup_ep_t *wireup_ep); int ucp_wireup_ep_test(uct_ep_h uct_ep); diff --git a/test/gtest/ucp/test_ucp_ep_reconfig.cc b/test/gtest/ucp/test_ucp_ep_reconfig.cc index 8d9795f4ece..5bedb9126d2 100644 --- a/test/gtest/ucp/test_ucp_ep_reconfig.cc +++ b/test/gtest/ucp/test_ucp_ep_reconfig.cc @@ -52,6 +52,17 @@ class test_ucp_ep_reconfig : public ucp_test { bool m_exclude_ifaces; }; + bool is_single_transport() + { + return GetParam().transports.size() == 1; + } + + bool has_p2p_transport() + { + return has_resource(sender(), "rc_verbs") || + 
has_resource(sender(), "rc_mlx5"); + } + void create_entity(bool push_front, bool exclude_ifaces) { auto e = new entity(GetParam(), m_ucp_config, get_worker_params(), this, @@ -64,40 +75,85 @@ class test_ucp_ep_reconfig : public ucp_test { } } - void create_entities_and_connect(bool exclude_ifaces) + virtual void create_entities_and_connect() { - create_entity(true, exclude_ifaces); - create_entity(false, exclude_ifaces); + create_entity(true, true); + create_entity(false, true); sender().connect(&receiver(), get_ep_params()); receiver().connect(&sender(), get_ep_params()); } public: + void init() + { + ucp_test::init(); + + /* num_tls = single device + UD */ + if (sender().ucph()->num_tls <= 2) { + UCS_TEST_SKIP_R("test requires at least 2 ifaces to work"); + } + } + static void get_test_variants(std::vector &variants) { add_variant_with_value(variants, UCP_FEATURE_TAG, 0, ""); } - void send_recv() + void run(bool bidirectional = false); + void skip_non_p2p(); + + void send_message(const ucp_test_base::entity &e1, + const ucp_test_base::entity &e2, const mem_buffer &sbuf, + const mem_buffer &rbuf, size_t msg_size, + std::vector &reqs) { - static const unsigned num_iterations = 100; - static const size_t msg_sizes[] = {8, 1024, 16384, UCS_MBYTE}; - const ucp_request_param_t param = { + const ucp_request_param_t param = { .op_attr_mask = UCP_OP_ATTR_FLAG_NO_IMM_CMPL }; + void *sreq = ucp_tag_send_nbx(e1.ep(), sbuf.ptr(), msg_size, 0, &param); + void *sreq_sync = ucp_tag_send_sync_nbx(e1.ep(), sbuf.ptr(), msg_size, + 0, &param); + reqs.insert(reqs.end(), {sreq, sreq_sync}); + + for (unsigned iter = 0; iter < 2; iter++) { + void *rreq = ucp_tag_recv_nbx(e2.worker(), rbuf.ptr(), msg_size, 0, + 0, &param); + reqs.push_back(rreq); + } + } + + void send_recv(bool bidirectional) + { + static const unsigned num_iterations = 1000; +/* TODO: remove this when 100MB asan bug is solved */ +#ifdef __SANITIZE_ADDRESS__ + static const size_t msg_sizes[] = {8, 1024, 16384, 65536}; +#else + 
static const size_t msg_sizes[] = {8, 1024, 16384, UCS_MBYTE}; +#endif + for (auto msg_size : msg_sizes) { for (unsigned i = 0; i < num_iterations; ++i) { mem_buffer sbuf(msg_size, UCS_MEMORY_TYPE_HOST, i); mem_buffer rbuf(msg_size, UCS_MEMORY_TYPE_HOST, ucs::rand()); + mem_buffer o_sbuf(msg_size, UCS_MEMORY_TYPE_HOST, i); + mem_buffer o_rbuf(msg_size, UCS_MEMORY_TYPE_HOST, ucs::rand()); + + std::vector reqs; + send_message(sender(), receiver(), sbuf, rbuf, msg_size, reqs); - void *sreq = ucp_tag_send_nbx(sender().ep(), sbuf.ptr(), - msg_size, 0, &param); - void *rreq = ucp_tag_recv_nbx(receiver().worker(), rbuf.ptr(), - msg_size, 0, 0, &param); - request_wait(rreq); - request_wait(sreq); + if (bidirectional) { + send_message(receiver(), sender(), o_sbuf, o_rbuf, msg_size, + reqs); + } + + requests_wait(reqs); rbuf.pattern_check(i); + + if (bidirectional) { + o_rbuf.pattern_check(i); + } } } } @@ -161,7 +217,9 @@ void test_ucp_ep_reconfig::entity::connect(const ucp_test_base::entity *other, ucs::handle(ucp_ep, ucp_ep_destroy)); - ASSERT_UCS_OK(ucp_wireup_send_request(ucp_ep)); + if (!(ucp_ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) { + ASSERT_UCS_OK(ucp_wireup_send_request(ucp_ep)); + } store_config(); UCS_ASYNC_UNBLOCK(&worker()->async); @@ -230,10 +288,10 @@ test_ucp_ep_reconfig::entity::get_address(const ucp_tl_bitmap_t &tl_bitmap) cons return address; } -UCS_TEST_P(test_ucp_ep_reconfig, basic) +void test_ucp_ep_reconfig::run(bool bidirectional) { - create_entities_and_connect(true); - send_recv(); + create_entities_and_connect(); + send_recv(bidirectional); auto r_sender = static_cast(&sender()); auto r_receiver = static_cast(&receiver()); @@ -243,4 +301,79 @@ UCS_TEST_P(test_ucp_ep_reconfig, basic) r_receiver->verify_configuration(*r_sender); } -UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_ep_reconfig, rc, "rc"); +void test_ucp_ep_reconfig::skip_non_p2p() +{ + if (!has_p2p_transport()) { + UCS_TEST_SKIP_R("No p2p TLs available, config will be non-wireup"); + } +} + +/* TODO: 
Remove skip condition after next PRs are merged. */ +UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, basic, + !has_transport("rc_x") || !has_transport("rc_v")) +{ + run(); +} + +UCS_TEST_P(test_ucp_ep_reconfig, request_reset, "PROTO_REQUEST_RESET=y") +{ + if (is_single_transport()) { + /* One side will consume all ifaces and the other side will have no ifaces left to use */ + UCS_TEST_SKIP_R("exclude_iface requires at least 2 transports to work " + "(for example DC + SHM)"); + } + + skip_non_p2p(); + run(); +} + +/* SHM causes lane reuse, disable for now. */ +UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, resolve_remote_id, + has_transport("shm") || is_self(), "MAX_RNDV_LANES=0") +{ + if (has_transport("tcp")) { + UCS_TEST_SKIP_R("lanes are reused (not supported yet)"); + } + + run(true); +} + +UCP_INSTANTIATE_TEST_CASE(test_ucp_ep_reconfig); +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_ep_reconfig, rc_x_v, "rc"); + +class test_reconfig_asymmetric : public test_ucp_ep_reconfig { +protected: + void create_entities_and_connect() override + { + create_entity(true, false); + + modify_config("NUM_EPS", "200"); + create_entity(false, false); + + sender().connect(&receiver(), get_ep_params()); + receiver().connect(&sender(), get_ep_params()); + } +}; + +/* Will be relevant when reuse + non-wireup is supported */ +UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, basic, has_transport("shm")) +{ + run(); +} + +/* Will be relevant when reuse + non-wireup is supported */ +UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, request_reset, + has_transport("shm"), "PROTO_REQUEST_RESET=y") +{ + skip_non_p2p(); + run(); +} + +/* SHM causes lane reuse, disable for now. 
*/ +UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, resolve_remote_id, + has_transport("shm") || is_self(), "MAX_RNDV_LANES=0") +{ + run(true); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_reconfig_asymmetric, shm_ib, "shm,ib"); diff --git a/test/gtest/ucp/test_ucp_request.cc b/test/gtest/ucp/test_ucp_request.cc index 54e243db585..9e12b480349 100644 --- a/test/gtest/ucp/test_ucp_request.cc +++ b/test/gtest/ucp/test_ucp_request.cc @@ -374,10 +374,9 @@ class test_proto_reset : public ucp_test { } void reset_protocol(operation_t op, bool sync = false, - unsigned reqs_count = 1000) + unsigned reqs_count = 1000, + size_t msg_size = UCS_KBYTE * 70) { - static const size_t msg_size = UCS_KBYTE * 70; - for (int i = 0; i < reqs_count; ++i) { mapped_buffer *rbuf = new mapped_buffer(msg_size, receiver()); rbuf->memset(0); @@ -468,6 +467,8 @@ class test_proto_reset : public ucp_test { NULL; } + const ucp_request_t *get_rndv_request(const void *ureq); + std::vector> m_sbufs; std::vector> m_rbufs; std::vector> m_rkeys; @@ -578,22 +579,23 @@ UCP_INSTANTIATE_TEST_CASE(test_proto_reset) /* The following tests require ENABLE_DEBUG_DATA flag in order to access * req->recv.proto_rndv_request, which is only present with this flag. 
*/ #if ENABLE_DEBUG_DATA +const ucp_request_t *test_proto_reset::get_rndv_request(const void *ureq) +{ + auto req = get_request(ureq, false); + if ((req == NULL) || (req->recv.proto_rndv_request == NULL)) { + return NULL; + } + + return req->recv.proto_rndv_request; +} + class test_proto_reset_rndv_get : public test_proto_reset { protected: void wait_and_restart(const std::vector &reqs) override { wait_any(reqs, [this](const void *ureq) { - const ucp_request_t *req = get_request(ureq, false); - if (req == NULL) { - return false; - } - - const ucp_request_t *rndv_req = req->recv.proto_rndv_request; - if (rndv_req == NULL) { - return false; - } - - return is_request_in_the_middle(rndv_req); + auto rndv_req = get_rndv_request(ureq); + return (rndv_req != NULL) && is_request_in_the_middle(rndv_req); }); restart(receiver().ep()); @@ -613,6 +615,40 @@ UCS_TEST_P(test_proto_reset_rndv_get, rndv_get, "RNDV_THRESH=0", UCP_INSTANTIATE_TEST_CASE(test_proto_reset_rndv_get) +class test_proto_reset_rkey_ptr : public test_proto_reset { +protected: + void wait_and_restart(const std::vector &reqs) override + { + wait_any(reqs, [this](const void *ureq) { + auto rndv_req = get_rndv_request(ureq); + return (rndv_req != NULL) && + (rndv_req->send.state.completed_size > 0); + }); + + ucs_queue_iter_t iter; + ucp_request_t *req; + + ucs_queue_for_each_safe(req, iter, &receiver().worker()->rkey_ptr_reqs, + send.rndv.rkey_ptr.queue_elem) { + m_pending.push_back(req); + } + + restart(receiver().ep()); + } +}; + +UCS_TEST_P(test_proto_reset_rkey_ptr, rkey_ptr, "RNDV_THRESH=0", + "RKEY_PTR_SEG_SIZE=1024") +{ + if (!has_resource(sender(), "xpmem")) { + UCS_TEST_SKIP_R("xpmem must be present for rkey_ptr protocol"); + } + + reset_protocol(TAG, false, 1000, UCS_KBYTE * 20); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_proto_reset_rkey_ptr, shm_ib, "shm,ib") + /* This test is intended to check resetting request during ATP phase for * RNDV_PUT protocol. 
* We use uct hooks on some UCT level functions in order to simulate the