Skip to content

Commit

Permalink
UCP/WIREUP: Support EP reconfiguration for non-reused wired-up scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
shasson5 committed Oct 1, 2024
1 parent 667d7b5 commit e0494e8
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 115 deletions.
4 changes: 2 additions & 2 deletions src/ucp/proto/proto_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params,
uint32_t weight_sum;
ucs_status_t status;

ucs_assert(params->max_lanes >= 1);
ucs_assert(params->max_lanes <= UCP_PROTO_MAX_LANES);

if ((ucp_proto_select_op_flags(params->super.super.select_param) &
Expand All @@ -50,7 +49,8 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params,
return UCS_ERR_UNSUPPORTED;
}

if (!ucp_proto_common_init_check_err_handling(&params->super)) {
if (!ucp_proto_common_init_check_err_handling(&params->super) ||
(params->max_lanes == 0)) {
return UCS_ERR_UNSUPPORTED;
}

Expand Down
4 changes: 4 additions & 0 deletions src/ucp/rndv/proto_rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,10 @@ size_t ucp_proto_rndv_common_pack_ack(void *dest, void *arg)
/*
 * Finish a rendezvous request on the ATS path.
 *
 * Cleans up the send datatype iterator, releases the remote key attached to
 * the request (only when one is set), and then hands the request over to
 * ucp_proto_rndv_recv_complete().
 *
 * @param req  Request being completed.
 *
 * @return Whatever ucp_proto_rndv_recv_complete() returns for this request.
 */
ucs_status_t ucp_proto_rndv_ats_complete(ucp_request_t *req)
{
    ucs_status_t status;

    ucp_datatype_iter_cleanup(&req->send.state.dt_iter, 1, UCP_DT_MASK_ALL);

    /* The rkey may be absent, so destroy it only when it is present */
    if (NULL != req->send.rndv.rkey) {
        ucp_proto_rndv_rkey_destroy(req);
    }

    status = ucp_proto_rndv_recv_complete(req);
    return status;
}

Expand Down
1 change: 0 additions & 1 deletion src/ucp/rndv/proto_rndv.inl
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ ucp_proto_rndv_rkey_destroy(ucp_request_t *req)
/*
 * Move a rendezvous request to its ATS stage and (re)start sending it.
 *
 * @param req        Request to advance.
 * @param ats_stage  Protocol stage value to switch the request to.
 */
static UCS_F_ALWAYS_INLINE void
ucp_proto_rndv_recv_complete_with_ats(ucp_request_t *req, uint8_t ats_stage)
{
/* Release the request's rkey before changing the stage */
ucp_proto_rndv_rkey_destroy(req);
/* Select the stage whose progress function will run next, then send */
ucp_proto_request_set_stage(req, ats_stage);
ucp_request_send(req);
}
Expand Down
22 changes: 20 additions & 2 deletions src/ucp/rndv/rndv_rkey_ptr.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ ucp_proto_rndv_rkey_ptr_probe(const ucp_proto_init_params_t *init_params)
ucs_status_t status;

if (!ucp_proto_rndv_op_check(init_params, UCP_OP_ID_RNDV_RECV, 0) ||
!ucp_proto_common_init_check_err_handling(&params.super)) {
!ucp_proto_common_init_check_err_handling(&params.super) ||
(ucp_proto_select_op_flags(params.super.super.select_param) &
UCP_PROTO_SELECT_OP_FLAG_RESUME)) {
return;
}

Expand Down Expand Up @@ -214,6 +216,22 @@ ucp_proto_rndv_rkey_ptr_fetch_progress(uct_pending_req_t *uct_req)
return UCS_OK;
}

/*
 * Reset a request according to its current protocol stage.
 *
 * - COPY stage: the request is removed from the worker's rkey_ptr_reqs queue.
 * - ACK stage:  nothing needs to be undone.
 * - any other stage is reported through ucp_proto_fatal_invalid_stage().
 *
 * @param request  Request to reset.
 *
 * @return UCS_OK.
 */
static ucs_status_t ucp_proto_rndv_rkey_ptr_reset(ucp_request_t *request)
{
    if (request->send.proto_stage == UCP_PROTO_RNDV_RKEY_PTR_STAGE_COPY) {
        /* Unlink the request from the worker queue it was placed on */
        ucs_queue_remove(&request->send.ep->worker->rkey_ptr_reqs,
                         &request->send.rndv.rkey_ptr.queue_elem);
    } else if (request->send.proto_stage !=
               UCP_PROTO_RNDV_RKEY_PTR_STAGE_ACK) {
        ucp_proto_fatal_invalid_stage(request, "reset");
    }

    return UCS_OK;
}

ucp_proto_t ucp_rndv_rkey_ptr_proto = {
.name = "rndv/rkey_ptr",
.desc = "copy from mapped remote memory",
Expand All @@ -225,7 +243,7 @@ ucp_proto_t ucp_rndv_rkey_ptr_proto = {
[UCP_PROTO_RNDV_RKEY_PTR_STAGE_ACK] = ucp_proto_rndv_ats_progress
},
.abort = ucp_proto_abort_fatal_not_implemented,
.reset = (ucp_request_reset_func_t)ucp_proto_reset_fatal_not_implemented
.reset = ucp_proto_rndv_rkey_ptr_reset
};

static void
Expand Down
38 changes: 4 additions & 34 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -1558,29 +1558,6 @@ ucp_wireup_add_fast_lanes(ucp_worker_h worker,
return num_lanes;
}

/*
 * Return how many lanes of the given type may be selected for the endpoint
 * referenced by select_params.
 *
 * When the endpoint has no configuration yet, or it has a CM lane, the result
 * is ucp_wireup_bw_max_lanes(select_params). Otherwise the result is the
 * number of lanes in the endpoint's current configuration key whose
 * lane_types mask has the bit for 'type' set.
 *
 * @param select_params  Selection parameters; provides the endpoint.
 * @param type           Lane type to count.
 *
 * @return Number of lanes as described above.
 */
static unsigned ucp_wireup_get_current_num_lanes(
        const ucp_wireup_select_params_t *select_params, ucp_lane_type_t type)
{
    ucp_ep_h ep              = select_params->ep;
    unsigned num_typed_lanes = 0;
    ucp_lane_index_t idx     = 0;

    /* An already-configured endpoint without CM: count the lanes it has now */
    if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
        !ucp_ep_has_cm_lane(ep)) {
        while (idx < ucp_ep_config(ep)->key.num_lanes) {
            if (ucp_ep_config(ep)->key.lanes[idx].lane_types & UCS_BIT(type)) {
                ++num_typed_lanes;
            }
            ++idx;
        }

        return num_typed_lanes;
    }

    /* First initialization (current lanes weren't chosen yet) or CM is
     * enabled (so the endpoint can be reconfigured) */
    return ucp_wireup_bw_max_lanes(select_params);
}

static unsigned
ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params,
ucp_wireup_select_bw_info_t *bw_info,
Expand All @@ -1602,25 +1579,17 @@ ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params,
ucp_rsc_index_t rsc_index;
unsigned addr_index;
ucp_wireup_select_info_t *sinfo;
unsigned max_lanes, num_lanes;
unsigned num_lanes;
unsigned local_num_paths, remote_num_paths;

local_dev_bitmap = bw_info->local_dev_bitmap;
remote_dev_bitmap = bw_info->remote_dev_bitmap;
bw_info->criteria.arg = &dev_count;

/* Restrict choosing more lanes than were already chosen when CM is disabled
* to prevent EP reconfiguration in case of dropping lanes due to low BW.
* TODO: Remove when endpoint reconfiguration is supported.
*/
max_lanes = ucp_wireup_get_current_num_lanes(select_params,
bw_info->criteria.lane_type);
max_lanes = ucs_min(max_lanes, bw_info->max_lanes);

/* lookup for requested number of lanes or limit of MD map
* (we have to limit MD's number to avoid malloc in
* memory registration) */
while (ucs_array_length(&sinfo_array) < max_lanes) {
while (ucs_array_length(&sinfo_array) < bw_info->max_lanes) {
if (excl_lane == UCP_NULL_LANE) {
sinfo = ucs_array_append(&sinfo_array, break);
status = ucp_wireup_select_transport(select_ctx, select_params,
Expand Down Expand Up @@ -1848,7 +1817,8 @@ ucp_wireup_add_rma_bw_lanes(const ucp_wireup_select_params_t *select_params,
ucp_wireup_init_select_flags(&iface_rma_flags, 0, 0);
ucp_wireup_init_select_flags(&peer_rma_flags, 0, 0);

if (ep_init_flags & UCP_EP_INIT_CREATE_AM_LANE_ONLY) {
if ((ep_init_flags & UCP_EP_INIT_CREATE_AM_LANE_ONLY) ||
(context->config.ext.max_rndv_lanes == 0)) {
return UCS_OK;
}

Expand Down
98 changes: 57 additions & 41 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -1085,11 +1085,9 @@ ucp_wireup_connect_lane_to_iface(ucp_ep_h ep, ucp_lane_index_t lane,
}
} else {
/* If EP already exists, it's a wireup proxy, and we need to update
* its next_ep instead of replacing it. The wireup EP was created
* during CM pack_cb() on a client side */
* its next_ep instead of replacing it. */
ucs_assert(ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane)));
ucs_assert(ucp_proxy_ep_extract(ucp_ep_get_lane(ep, lane)) == NULL);
ucs_assert(ucp_ep_has_cm_lane(ep));
ucp_wireup_ep_lane_set_next_ep(ep, lane, uct_ep);
}

Expand Down Expand Up @@ -1355,23 +1353,7 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps,
}
}

/*
 * Check the first key->num_lanes lanes of the endpoint and report whether
 * none of their resources satisfies ucp_worker_is_tl_2iface().
 *
 * NOTE(review): the resource index of every lane is looked up through 'ep'
 * (ucp_ep_get_rsc_index), while only the lane count is taken from 'key'.
 * Confirm this is intended when 'key' is not the endpoint's current
 * configuration key.
 *
 * @param ep   Endpoint whose lanes are inspected.
 * @param key  Configuration key supplying the number of lanes to inspect.
 *
 * @return 1 if no inspected lane uses a resource for which
 *         ucp_worker_is_tl_2iface() is true, 0 otherwise.
 */
static int
ucp_wireup_are_all_lanes_p2p(ucp_ep_h ep, const ucp_ep_config_key_t *key)
{
    ucp_lane_index_t idx = 0;
    ucp_rsc_index_t rsc;

    while (idx < key->num_lanes) {
        rsc = ucp_ep_get_rsc_index(ep, idx);
        ucs_assert(rsc != UCP_NULL_RESOURCE);

        /* A single matching lane is enough to give a negative answer */
        if (ucp_worker_is_tl_2iface(ep->worker, rsc)) {
            return 0;
        }

        ++idx;
    }

    return 1;
}

static unsigned
ucp_ep_num_reused_lanes(ucp_ep_h ep, const ucp_lane_index_t *reuse_lane_map)
Expand All @@ -1394,20 +1376,22 @@ ucp_wireup_check_is_reconfigurable(ucp_ep_h ep,
{
ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES];
const ucp_ep_config_key_t *old_key;
ucp_lane_index_t lane;

if (ucp_ep_has_cm_lane(ep)) {
if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) ||
ucp_ep_has_cm_lane(ep)) {
return 1;
}

old_key = &ucp_ep_config(ep)->key;

/* Verify both old/new configurations have only p2p lanes */
if (!ucp_wireup_are_all_lanes_p2p(ep, old_key) ||
!ucp_wireup_are_all_lanes_p2p(ep, new_key) ||
(old_key->num_lanes != new_key->num_lanes)) {
return 0;
/* TODO: Support reconfiguration when lanes are created without a wireup_ep
* wrapper */
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (!ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) {
return 0;
}
}

old_key = &ucp_ep_config(ep)->key;
ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
addr_indices, reuse_lane_map);

Expand Down Expand Up @@ -1452,7 +1436,6 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
/* Use existing EP from CM lane */
new_wireup_ep = ucp_ep_get_cm_wireup_ep(ep);
ucs_assert(new_wireup_ep != NULL);
aux_rsc_index = ucp_ep_get_rsc_index(ep, old_lane);
} else {
/* Create new EP for non-CM flow */
status = ucp_wireup_ep_create(ep, &uct_ep);
Expand All @@ -1461,10 +1444,14 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
}

new_wireup_ep = ucp_wireup_ep(uct_ep);
aux_rsc_index = ucp_wireup_ep_get_aux_rsc_index(
&old_wireup_ep->super.super);
}

/* Get correct aux_rsc_index either from next_ep or aux_ep */
aux_rsc_index = ucp_wireup_ep_is_next_ep_active(old_wireup_ep) ?
ucp_ep_get_rsc_index(ep, old_lane) :
ucp_wireup_ep_get_aux_rsc_index(
&old_wireup_ep->super.super);

ucs_assert(aux_rsc_index != UCP_NULL_RESOURCE);
is_p2p = ucp_ep_config_connect_p2p(ep->worker, &ucp_ep_config(ep)->key,
aux_rsc_index);
Expand Down Expand Up @@ -1500,7 +1487,7 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
uct_ep_h new_uct_eps[UCP_MAX_LANES] = {NULL};
ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES] = {UCP_NULL_LANE};
ucp_ep_config_key_t *old_key;
ucp_lane_index_t lane, reuse_lane;
ucp_lane_index_t lane, reuse_lane, wireup_lane;
uct_ep_h uct_ep;
ucs_status_t status;

Expand Down Expand Up @@ -1532,11 +1519,13 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
"new_key->wireup_msg_lane=%u", new_key->wireup_msg_lane);
}

wireup_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);

/* wireup lane has to be selected for the old configuration */
ucs_assert(old_key->wireup_msg_lane != UCP_NULL_LANE);
ucs_assert(wireup_lane != UCP_NULL_LANE);

/* set the correct WIREUP MSG lane */
reuse_lane = reuse_lane_map[old_key->wireup_msg_lane];
reuse_lane = reuse_lane_map[wireup_lane];
if (reuse_lane != UCP_NULL_LANE) {
/* previous wireup lane is part of the new configuration, so reuse it */
new_key->wireup_msg_lane = reuse_lane;
Expand Down Expand Up @@ -1622,6 +1611,23 @@ ucp_wireup_try_select_lanes(ucp_ep_h ep, unsigned ep_init_flags,
return UCS_OK;
}

/*
 * Collect the requests of an endpoint that have to be replayed into
 * replay_pending_queue.
 *
 * Two sources are gathered:
 *  1. whatever ucp_wireup_eps_pending_extract() extracts for the endpoint;
 *  2. requests found on the worker's rkey_ptr_reqs queue that belong to this
 *     endpoint.
 *
 * @param ep                    Endpoint whose requests are gathered.
 * @param replay_pending_queue  Queue that receives the gathered requests.
 */
static void
ucp_wireup_gather_pending_reqs(ucp_ep_h ep,
ucs_queue_head_t *replay_pending_queue)
{
ucp_request_t *req;

ucp_wireup_eps_pending_extract(ep, replay_pending_queue);

/* The rkey_ptr_reqs queue belongs to the worker, so filter by endpoint.
 * NOTE(review): matching requests are not unlinked from rkey_ptr_reqs in
 * this function - presumably that is done elsewhere; confirm. */
ucs_queue_for_each(req, &ep->worker->rkey_ptr_reqs,
send.rndv.rkey_ptr.queue_elem) {
if (req->send.ep == ep) {
/* The request is linked into the replay queue through the storage of
 * send.uct.priv, reinterpreted as a queue element */
ucs_queue_push(replay_pending_queue,
(ucs_queue_elem_t*)&req->send.uct.priv);
}
}
}

ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
const ucp_tl_bitmap_t *local_tl_bitmap,
const ucp_unpacked_address_t *remote_address,
Expand All @@ -1639,14 +1645,22 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
char str[32];
ucs_queue_head_t replay_pending_queue;
ucp_rsc_index_t dst_mds_mem[UCP_MAX_MDS];
int is_reconfigurable;

tl_bitmap = UCS_STATIC_BITMAP_AND(*local_tl_bitmap,
worker->context->tl_bitmap);
ucs_assert(!UCS_STATIC_BITMAP_IS_ZERO(tl_bitmap));

ucs_trace("ep %p: initialize lanes", ep);
ucs_log_indent(1);
ucp_wireup_eps_pending_extract(ep, &replay_pending_queue);

if (!ucp_ep_has_cm_lane(ep)) {
ucp_ep_update_flags(ep, 0,
UCP_EP_FLAG_LOCAL_CONNECTED |
UCP_EP_FLAG_REMOTE_CONNECTED);
}

ucp_wireup_gather_pending_reqs(ep, &replay_pending_queue);

status = ucp_wireup_try_select_lanes(ep, ep_init_flags, &tl_bitmap,
remote_address, addr_indices, &key,
Expand All @@ -1655,10 +1669,14 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
goto out;
}

if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
!ucp_ep_config_is_equal(&ucp_ep_config(ep)->key, &key) &&
!ucp_wireup_check_is_reconfigurable(ep, &key, remote_address,
addr_indices)) {
/* This function must be used before uct_eps are discarded. Store return
* value for later use. */
is_reconfigurable = ucp_wireup_check_is_reconfigurable(ep, &key,
remote_address,
addr_indices);

if (!is_reconfigurable &&
!ucp_ep_config_is_equal(&ucp_ep_config(ep)->key, &key)) {
/* Allow to choose only the lanes that were already chosen for case
* without CM to prevent reconfiguration error.
*/
Expand Down Expand Up @@ -1710,9 +1728,7 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,

cm_idx = ep->ext->cm_idx;

if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
!ucp_wireup_check_is_reconfigurable(ep, &key, remote_address,
addr_indices)) {
if (!is_reconfigurable) {
/*
* TODO handle a case where we have to change lanes and reconfigure the ep:
*
Expand Down
6 changes: 3 additions & 3 deletions src/ucp/wireup/wireup_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ static ssize_t ucp_wireup_ep_bcopy_send_func(uct_ep_h uct_ep)
return UCS_ERR_NO_RESOURCE;
}

static int ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep_t *wireup_ep)
int ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep_t *wireup_ep)
{
return (wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) ||
(wireup_ep->aux_ep == NULL);
ucs_assert(wireup_ep->super.uct_ep != NULL);
return wireup_ep->aux_ep == NULL;
}

uct_ep_h ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep_t *wireup_ep)
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/wireup/wireup_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep);

uct_ep_h ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep_t *wireup_ep);

int ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep_t *wireup_ep);

void ucp_wireup_ep_destroy_next_ep(ucp_wireup_ep_t *wireup_ep);

int ucp_wireup_ep_test(uct_ep_h uct_ep);
Expand Down
Loading

0 comments on commit e0494e8

Please sign in to comment.