From 22e8736b9b8ca028e74414c57feb99421c2c647b Mon Sep 17 00:00:00 2001 From: xb Date: Fri, 20 Oct 2023 23:08:50 +0800 Subject: [PATCH] ult_cart --- category/ult_cart_progress_recv_call_back | 109 ++++++++++++++++++++++ readme | 32 ++++++- 2 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 category/ult_cart_progress_recv_call_back diff --git a/category/ult_cart_progress_recv_call_back b/category/ult_cart_progress_recv_call_back new file mode 100644 index 00000000000..b3153d5b279 --- /dev/null +++ b/category/ult_cart_progress_recv_call_back @@ -0,0 +1,109 @@ +引擎启动: +server_init -> daos_debug_init -> dss_engine_metrics_init -> drpc_init -> register_dbtree_classes -> dss_topo_init -> abt_init -> dss_module_init interface初始化 -> crt_init_opt 网络初始化-> dss_module_init_all -> vos,rdb,rsvc,security,mgmt,dtx,pool,cont,obj,rebuild 模块初始化 -> dss_srv_init 服务初始化 + +... +dss_module_init_all(&dss_mod_facs) -> 初始化所有模块, vos(vos_mod_init), ... +dss_srv_init() 初始化服务(服务初始化) + dss_xstreams_init() -> xs初始化(协程池) + dss_start_xs_id(tags, xs_id) -> 按tag和xs_id启动xs, 启动系统服务, 主IO服务, 负载等多个XS + dss_start_one_xstream(obj->cpuset, tag, xs_id) -> 服务端启动单个XS + dss_sched_init(dx); -> 创建XS调度器 + daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_NET_POLL], dss_srv_handler, dx, attr, &dx->dx_progress) -> 启动轮询(驱动ULT)线程 + ... + drpc_listener_init 启动drpc监听 + ... + + +... +dss_srv_handler + if (dx->dx_comm) + rc = crt_context_create(&dmi->dmi_ctx) -> 创建私有的传输上下文 + rc = crt_context_register_rpc_task(dmi->dmi_ctx, dss_rpc_hdlr, dss_iv_resp_hdlr, dx); -> 为RPC任务注册两个公共回调(rpc控制器和iv回复) + cc_rpc_cb = dss_rpc_hdlr -> 公共RPC回调(被 progress 后由 crt_rpc_handler_common 触发执行) + cc_iv_resp_cb = dss_iv_resp_hdlr -> iv RPC回调 + crt_context_idx(dmi->dmi_ctx, &dmi->dmi_ctx_id) -> 获取cart上下文索引 + tse_sched_init(&dx->dx_sched_dsc, NULL, dmi->dmi_ctx) -> 为支持服务端调用客户端的API, 初始化调度器 + for (;;) -> 死循环 + if (dx->dx_comm) + crt_progress(dmi->dmi_ctx, dx->dx_timeout) -> 轮询/驱动网络回调 + ABT_thread_yield() -> 每次循环让出cpu一次 + +... +rpc公共回调, 调用栈1 +dss_rpc_hdlr + sched_req_enqueue 调度请求入队 + should_enqueue_req?(dx->dx_main_xs) 主线程才入队(如VOS) + req_get(对端 req_put) -> req_enqueue 请求入队 -> d_list_add_tail(&req->sr_link, &sri->sri_req_list) + ... + +调度器初始化后就开始处理队列(process_all) +dss_sched_init + sched_run + sched_start_cycle(data, pools) + process_all + policy_ops[sched_policy].process_io(dx) 处理io + policy_fifo_process 先进先出 + process_req_list + req_kickoff + req_kickoff_internal(dx, &req->sr_attr, req->sr_func,req->sr_arg) sr_func -> crt_handle_rpc 不入队列,直接处理回调 + sched_create_thread(dx, func func -> crt_handle_rpc + ABT_thread_create(abt_pool, func, arg, t_attr, thread) + crt_handle_rpc + +process_all -> policy_ops[sched_policy].process_io(dx) -> .process_io = policy_fifo_process -> + process_req_list + d_list_for_each_entry_safe(req, tmp, list, sr_link) -> 遍历队列另一端, 并处理请求 + process_req -> req_kickoff(dx, req) 开始处理请求 -> crt_handle_rpc + + +调用栈2 +dss_rpc_hdlr(crt_context_t *ctx, void *hdlr_arg, void (*real_rpc_hdlr)(void *), void *arg) + opc_get_mod_id(rpc->cr_opc) 获取模块id 偏移+掩码 daos_modeul_id + SCHED_REQ_ANONYM 匿名 sra=调度请求属性 + struct dss_module *module = dss_module_get(mod_id); + module->sm_mod_ops->dms_get_req_attr 获取属性 + sched_req_enqueue 入队 real_rpc_hdlr = crt_handle_rpc req->sr_func + should_enqueue_req SCHED_REQ_ANONYM 匿名不入队列 + req_enqueue + d_list_add_tail(&req->sr_link, &sri->sri_req_list) + + + +crt_rpc_handler_common + HG_Get_info + HG_Context_get_data + crt_hg_unpack_header + crt_opc_lookup + crt_hg_header_copy + crt_rpc_priv_init + crp_completed = 0 + crt_rpc_common_hdlr 不是集合rpc + crt_grp_priv_get_primary_rank + crt_rpc_cb_customized 自定义回调, 并且非心跳rpc, crt_opc_is_swim, 那么就执行自定义回调 + crt_ctx->cc_rpc_cb(..., crt_handle_rpc) cart的rpc控制器 + cc_rpc_cb = dss_rpc_hdlr -> sched_create_thread(dx, func -> ABT_thread_create -> crt_handle_rpc + ... 入队,出队 + crt_handle_rpc + rpc_priv->crp_opc_info->coi_rpc_cb(rpc_pub) 执行回调,如 obj_req_create DAOS_OBJ_RPC_TGT_UPDATE 对应的 ds_obj_tgt_update_handler + + + +crt_context_create -> 创建上下文的时候已经注册好了rpc公共回调(crt_rpc_handler_common), 等待被 progress 触发执行 + crt_context_init + crt_hg_ctx_init + crt_hg_class_init + HG_Init_opt + HG_Init_opt + NA_Initialize_opt 网络抽象类初始化 class: ofi Protocal: verbs;ofi_rxm Hostname: mlx5_bond_1/ip:50177 + na_private_class->na_class.ops = na_class_table[plugin_index] 抽象网络表 na_ofi_class_ops_g + na_class.ops->initialize 初始化 + na_class_ops NA_PLUGIN_OPS(ofi) 插件实现 + fi_getinfo + ofi_check + crt_hg_get_addr + crt_hg_reg_rpcid + crt_hg_reg CRT_HG_RPCID | CRT_HG_ONEWAY_RPCID 单程 -> 注册公共回调 + crt_proc_in_common + crt_proc_out_common + rpc_priv = container_of(data, struct crt_rpc_priv, crp_pub.cr_output) -> reply 回复数据 + crt_rpc_handler_common <- hg_proc_info->rpc_cb <- hg_core_rpc_cb <- hg_core_rpc_info->rpc_cb <- hg_core_process diff --git a/readme b/readme index 1b74b3010ff..afc07c87b0f 100644 --- a/readme +++ b/readme @@ -6333,7 +6333,7 @@ ec_setup -> ec_setup(void **state) arg->setup_state = SETUP_POOL_CREATE return test_setup_pool_create(state, pool -ec_tests +ec_tests 通过一些标记位才控制ec特性, 比如: DAOS_OBJ_SKIP_PARITY, 并利用cart的故障注入功能 test_teardown ec_full_stripe_snapshot @@ -6342,6 +6342,10 @@ ec_full_stripe_snapshot ec_verify_parity_data +trigger_and_wait_ec_aggreation -> 触发并等待ec聚合 + +full_update_size -> 满条带 + ec_dkey_list_punch if (!test_runable(arg, 6)) @@ -6411,5 +6415,29 @@ enum daos_obj_redun { OR_RP_8, - + + +加入集群失败, +case *mgmtpb.JoinResp: + fmt.Fprintf(&bld, "%T rank:%d (state:%s, local:%t) map:%d", m, m.Rank, m.State, m.LocalJoin, m.MapVersion) +据观察,MS领导者可能会将版本2的系统映射{rank 1,rank 2}传递给本地rank 0,他可能会错误地认为自己已被排除在系统之外(实际上它甚至还没有第一次加入系统) 类似地,如果某个等级加入了系统地图版本V,并且以某种方式收到了不包括其自身的系统地图版本V-1,则它也可能会错误地认为自己已被排除在系统之外。 为了解决这两种情况,我们通过JoinResp将最小组版本(Rank加入系统的系统映射版本)从MS(主服务)传递到该Rank上的crt PG(主要组),以便Rank将拒绝任何传入低于最低组版本的 PG 版本。 这也实现了 crt_swim_cli_cb 中的待办事项 + + +聚合(聚合是非满条带聚合满条带,下一代全部都是满条带了), 客户端设置给所有服务端的参数, DAOS_FORCE_EC_AGG +enum { + DMG_KEY_FAIL_LOC = 0, + DMG_KEY_FAIL_VALUE, + DMG_KEY_FAIL_NUM, + DMG_KEY_NUM, +}; +DAOS-3749 聚合:仅在没有 I/O 时聚合 (#1488) +- 仅在默认情况下没有对象 RPC 时才运行聚合,这是池的默认属性,但可以更改它,以便将来在聚合性能调整后聚合可以正常优先级运行 +- 每 4 秒检查一次聚合属性更改,而不是 90 秒 +- 删除DMG禁用聚合命令 +- 代码清理 +添加 obj_io_context 来存储 RPC 所需的数据结构 将相关函数一起移动 将 DMG 控制操作码的前缀从 DSS 重命名为 DMG + + + +agg_process_stripe -> 处理先前的条带。 当迭代器移动到后续的第一个范围时调用