diff --git a/.gitignore b/.gitignore index 21604e100df..193169189ef 100644 --- a/.gitignore +++ b/.gitignore @@ -70,3 +70,5 @@ cache/ xb/write xb/write_file + +*next_gen.md \ No newline at end of file diff --git a/daos_flow.md b/daos_flow.md new file mode 100644 index 00000000000..8a5ec6cf53f --- /dev/null +++ b/daos_flow.md @@ -0,0 +1,99 @@ +'参考: https://plantuml.com/zh/sequence-diagram' + +@startuml + +title DAOS用户态文件系统写流程 + +note right + +dmg pool create sxb -z 4g; dmg pool list --verbose +daos container create sxb --type POSIX sxb; daos container query sxb sxb --verbose; daos cont get-prop sxb sxb +mkdir -p /tmp/sxb; dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb; df -h +cd /tmp/sxb +for i in {0..5};do + echo "$i, `date`" + dd if=/dev/zero of=$i bs=1M count=100 oflag=direct + sleep 3 +done + +end note + + +APP -> FS: write(写10字节数据到/tmp/sxb/file) + +FS -> libfuse3: write_buf + +dfuse -> dfuse: dfuse_do_work 循环处理 \n fuse_session_process_buf_int \n se->op.write_buf \n dfuse_cb_write \n \ +fuse_buf_copy(&ibuf, bufv, 0) 拷贝用户数据 + + +dfuse -> dfs: dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) \n\ +daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) \n\ +dc_task_create(dc_array_write, NULL, ev, &task) 创建任务 \n\ +dc_task_schedule(task, true) 调度客户端写数组任务 + +dfs -> array: dc_array_write 写数组(IO) +array -> array: dc_array_io 写数组(IO) +array -> array: compute_dkey 计算分布式key\n\ +create_sgl 创建分散聚集列表 \n\ +daos_task_create DAOS_OPC_OBJ_UPDATE 创建对象更新任务 \n\ +tse_task_register_deps(task, 1, &io_task) 注册依赖任务 \n\ +tse_task_list_sched(&io_task_list, false) 批量任务调度执行 +note left: 在给定此范围的数组索引的情况下计算 dkey + +array -> obj: dc_obj_update_task 执行对象更新任务 + +obj -> obj: obj_req_valid 校验请求\n\ +tse_task_stack_push 任务压栈 \n\ +dc_io_epoch_set 设置epoch \n\ +tse_task_stack_pop 任务出栈 \n\ + +obj -> obj: dc_obj_update 提交对象更新 \n\ +obj_task_init(task, DAOS_OBJ_RPC_UPDATE 初始化对象更新任务 \n\ +obj_shards_2_fwtgts 根据分片查找转发的目标 \n\ +obj_rw_bulk_prep 准备读写用的大块内存 \n\ +obj_req_fanout 扇出对象写请求(准备读写和执行对象分片读写) + + +obj -> obj: dc_obj_shard_rw 对象分片读写\n操作码: DAOS_OBJ_RPC_UPDATE +note right: 设置目标(组,TAG,RANK) + +obj -> daos_engine: crt_req_send(rpc, daos_rpc_cb, task)\n发送对象分片读写RPC给engine + +daos_engine -> srv_obj: ds_obj_rw_handler 服务端对象读写处理器 +srv_obj -> srv_obj: obj_ioc_begin 访问VOS前的检查 \n\ +process_epoch 处理纪元\n\ +dtx_leader_begin \n\ +dtx_leader_exec_ops obj_tgt_update \n\ +在所有目标上执行对象更新操作 \n\ +obj_local_rw 执行一次本地对象读写 \n\ +vos_update_begin 准备更新VOS \n\ +bio_iod_prep 准备块IO(IO描述) \n\ +bio_iod_post 提交块IO(IO描述) \n\ +dma_rw 内存直接访问 \n\ +nvme_rw 执行nvme读写 + +srv_obj -> spdk: spdk_blob_io_write 通过SPDK接口写blob + +spdk -> nvme_disk: spdk_bdev_write_blocks SPDK写NVME(落盘) +nvme_disk -> spdk: callback +spdk -> srv_obj: rw_completion 读写回调 \n\ +iod_dma_completion -> 完成DMA \n\ +biod->bd_completion -> wal_completion | data_completion + + +srv_obj -> srv_obj: obj_rw_complete 完成对象读写请求 \n\ +对象读写完成, 更新延迟计数器, 发送回复, 释放资源等 + +srv_obj -> obj: obj_rw_reply 发送回复 + +obj -> obj: dc_rw_cb 读写回调,释放资源, 逐层往上回调 + +obj -> dfuse: 回复 +dfuse -> app: 回复 + + + + + +@enduml diff --git a/readme b/readme index 0ca604558c5..81ce9775d6a 100644 --- a/readme +++ b/readme @@ -976,11 +976,8 @@ vos_blob_format_cb spdk_bdev_write_blocks -rw_completion - spdk_thread_send_msg - -客户端mount, master, gdb --args /opt/daos/bin/dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb -f -> 默认后台启动 +客户端mount, master, gdb --args /opt/daos/bin/dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb -f -> 强制前台, 默认后台启动(dfuse_bg) dfuse -m /mnt/sxb --pool sxb --cont sxb | dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb dfuse_main.c -> main daos_debug_init(DAOS_LOG_DEFAULT) @@ -1172,54 +1169,6 @@ check_short_read_cb 通过以下对象连接 .cpf_name = daos_opc_t -dc_funcs[opc].task_func 客户端方法数组 - -DAOS_OPC_OBJ_UPDATE 写 - dc_obj_update_task DAOS_OBJ_RPC_UPDATE - obj_req_valid(task, args, DAOS_OBJ_RPC_UPDATE - obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) - tse_task_stack_pop - dc_tx_attach(args->th, obj, DAOS_OBJ_RPC_UPDATE, task) 如果事务有效(hdl.cookie == 1), 则走dtx - dc_obj_update 否则 - obj_task_init_common(task, DAOS_OBJ_RPC_UPDATE - tse_task_stack_push - shard_task_list_init(obj_auxi) - obj_rw_req_reassemb 重新组装 - dkey_hash = obj_dkey2hash - obj_req_get_tgts 获取对象对应的目标 - obj_dkey2grpmemb - obj_dkey2grpidx - pool_map_ver = pool_map_get_version(pool->dp_map) - grp_size = obj_get_grp_size(obj) - grp_idx = d_hash_jump(hash, obj->cob_shards_nr / grp_size) how hash generate? obj with pool - obj_shards_2_fwtgts - obj_shard_tgts_query 分片目标查询 - obj_shard_open - dc_obj_shard_open - pool_map_find_target 二分查找 - comp_sorter_find_target(sorter, id) - daos_array_find - array_bin_search - obj_grp_leader_get - pl_select_leader obj_get_shard - array_bin_search 二分查找 daos_obj_classes - tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) - obj_csum_update(obj, args, obj_auxi) - obj_rw_bulk_prep - obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) 扇出 shard_io_cb = io_cb = dc_obj_shard_rw - - -ds_obj_rw_handler 接收端的回调 - obj_ioc_begin 访问VOS前的各种检查 - obj_rpc_is_fetch - process_epoch - obj_rpc_is_fetch - rc = dtx_begin 返回超时? - dtx_handle_init - dtx_shares_init(dth) 初始化以下链表, 提交,中断,活动,检查 - dtx_epoch_bound - vos_dtx_rsrvd_init(dth) - obj_local_rw 本地读写 创建任务, daos_task_create @@ -1231,35 +1180,6 @@ dfuse_start -pool_map -dfuse_cb_write position=0 - dfuse_cb_write_complete 回调 - dfs_write - dc_array_write - daos_task_create DAOS_OPC_OBJ_UPDATE - dc_obj_update_task - dc_obj_update - obj_update_shards_get - obj_rw_bulk_prep - obj_req_fanout shard_rw_prep dc_obj_shard_rw = shard_io_cb 请求扇出 - io_prep_cb shard_rw_prep 发送io前执行的回调 - shard_io 分片IO - obj_shard_open - map_ver - dc_obj_shard_open 打开分片 - dc_cont_tgt_idx2ptr 根据容器handler和taget索引,获取池的目标pool_target - pool - dc_hdl2pool - daos_hhash_link_lookup(poh.cookie) - pool_map_find_target - dc_pool_put - daos_hhash_link_putref - dc_cont_put - shard_auxi->shard_io_cb(obj_shard,...) -> dc_obj_shard_rw 对象分片读写回调 - obj_shard_close - - - import struct pool_target @@ -1277,20 +1197,6 @@ dfuse_start dfuse_cb_write step? - - -dc_obj_shard_rw 客户端对象分片读写(读写对象分片) - obj_shard_ptr2pool(shard) 根据分片获取池 - obj_req_create opc = DAOS_OBJ_RPC_UPDATE -> ds_obj_rw_handler - uuid_copy - daos_dti_copy 拷贝dtx_id - 跳过ec逻辑 - tse_task_register_comp_cb - daos_rpc_send - crt_req_send daos_rpc_cb -> tse_task_complete 发送完成回调流程:hg -> crt_hg_req_send_cb -> crp_complete_cb -> -> daos_rpc_cb -> dc_rw_cb - -超时检测和业务回调是在不同的线程中并发执行 - ds_obj_rw_handler obj_rw_reply obj_reply_set_status 与 obj_reply_get_status 成对使用 @@ -1601,7 +1507,6 @@ dc_obj_open_task_create -obj_rw_req_reassemb 重新组装对象读写请求 编译镜像: docker build . -f utils/docker/Dockerfile.el.8 -t daos @@ -1910,27 +1815,63 @@ crt_rpc_handler_common 更新目标, 落盘 ds_obj_tgt_update_handler obj_ioc_begin 访问vos前的各种检查 - obj_ioc_begin_lite 设置 lite IO 上下文,目前仅适用于复合 RPC - obj_capa_check + obj_ioc_begin_lite -> 设置lite IO上下文,到目前为止仅适用于复合RPC,1.还没有关联对象,2.权限检查(不确定它是读/写) + obj_ioc_init -> 查找并返回容器句柄,如果是重建句柄,永远不会关联特定容器,则容器结构将返回给ioc::ioc_coc + ds_cont_csummer_init(coc) -> 如果尚未加载,则按需加载 csummer 进行重建 + dss_rpc_cntr_enter(DSS_RC_OBJ) -> 增加 RPC 类型的活动计数器和总计数器 + tls = obj_tls_get() + ioc->ioc_start_time = daos_get_ntime() -> IO开始时间 + obj_inflight_io_check + 如果传入 I/O 处于集成期间(integration),则需要等待 vos 丢弃完成,否则可能会丢弃这些新的正在进行的 I/O 更新 + 重建过程中的所有I/O,都需要等待重建栅栏生成(参见rebuild_prepare_one()),这将为重建创建一个边界,因此不应重建boundary(epoch)之后的数据,否则可能会被写入 重复,这可能会导致 VOS 失败 + obj_capa_check -> DAOS-7235 obj:当 cont_rf 损坏时关闭读/写权限,(#5822) 对于活动打开的容器句柄,DAOS 将在内部 - 1. 当 cont_rf 损坏时关闭读/写权限 2. 当 cont_rf 损坏时打开读/写权限 通过以下方式恢复 - 重新集成故障设备,或通过清除 UNCLEAN 容器状态 - “daos cont set-prop --properties=status:healthy ...” 将测试用例添加到 co_rf_simple()。 现在DAOS_PROP_CO_STATUS仅用于存储cont_create的pm_ver,以及当用户清除UNCLEAN状态时。 并且不要将 UNCLEAN 状态设置为 RDB 以避免在检查活动容器句柄时出现歧义 obj_ioc_init_oca - dtx_handle_resend - vos_dtx_commit - dtx_begin - obj_local_rw - obj_local_rw_internal - bio_iod_prep - obj_bulk_transfer - bio_iod_post - dma_rw - nvme_rw - spdk_blob_io_write 完成回调 rw_completion - blob_request_submit_op - blob_request_submit_op_single - bs_batch_write_dev - blob_bdev->bs_dev.write - bdev_blob_write - spdk_bdev_write_blocks - spdk_blob_io_read + process_epoch -> 处理传入操作的纪元状态。 一旦该函数返回,纪元状态将包含选定的纪元。 另外,如果返回值为PE_OK_LOCAL,则该纪元可以毫无不确定性地用于本地RDG操作 + dtx_leader_begin + dtx_handle_init + vos_dtx_attach + dtx_leader_exec_ops(dlh, obj_tgt_update, NULL, 0, &exec_arg) -> 在所有目标上执行对象更新操作 + dtx_leader_end(dlh, ioc.ioc_coh, rc) + local_rc = func(dlh, func_arg, -1, NULL) -> obj_tgt_update -> 本地仅执行一次对象读写 -> obj_local_rw(exec_arg->rpc, exec_arg->ioc, &dlh->dlh_handle) +-------------------------- +XXX:对于非单独DTX,领导者和非领导者将并行地进行各自的局部修改。 如果非领导者的速度太快,以至于非领导者可能已经开始处理下一个 RPC,但领导者还没有真正开始当前的修改,例如在批量数据传输阶段被阻止。 在这种情况下,有可能当non-leader处理下一个请求时,它命中了本地刚刚准备好的DTX,那么non-leader就会向leader检查这样的DTX状态。 但此时,Leader 上的 DTX 条目不存在,这会误导非 Leader 错过中止此类 DTX。 为了避免这种糟糕的情况,领导者需要在将当前请求分派给非领导者之前在 DRAM 中构建其 DTX 条目。 另一方面,即使对于单独的DTX,由于服务器端负载过重,PRC也可能被延迟处理。 如果是大数据传输的更新RPC,那么客户端有可能认为更新RPC超时,在原来的RPC批量数据传输期间重新发送RPC,这会导致CPU消耗,然后服务器上的重发逻辑将找不到相关的 DTX 条目,因为原始 RPC 的 DTX 尚未“准备好”。 在这种情况下,更新请求将在服务器上双重执行。 应该避免这种情况。 因此,在批量数据传输之前预分配 DTX 条目是必要的 +obj_local_rw + obj_get_iods_offs + obj_local_rw_internal + csum_verify_keys + obj_singv_ec_rw_filter + vos_update_begin + vos_check_akeys + vos_ioc_create + vos_space_hold + dkey_update_begin + bio_iod_prep CRT_BULK_RW + iod_map_iovs(biod, arg) + bio_iod_copy(biod, orw->orw_sgls.ca_arrays, iods_nr) + vos_dedup_verify + obj_verify_bio_csum + bio_iod_post_async(biod, rc) + obj_bulk_transfer + bio_iod_post + 写入时将数据从缓冲区转移到介质 + dma_rw + nvme_rw + xs_ctxt = biod->bd_ctxt->bic_xs_ctxt + ... + drain_inflight_ios(xs_ctxt, bxb) + spdk_thread_poll(ctxt->bxc_thread, 0, 0) + if (biod->bd_type == BIO_IOD_TYPE_UPDATE) + spdk_blob_io_write(blob, channel, payload, + page2io_unit(biod->bd_ctxt, pg_idx, BIO_DMA_PAGE_SZ), + page2io_unit(biod->bd_ctxt, rw_cnt, BIO_DMA_PAGE_SZ), + rw_completion, biod); -> 完成回调 rw_completion + blob_request_submit_op + blob_request_submit_op_single + bs_batch_write_dev + blob_bdev->bs_dev.write + bdev_blob_write + spdk_bdev_write_blocks + spdk_blob_io_read @@ -2964,10 +2905,6 @@ create_map_refresh_rpc ds_pool_tgt_query_map_handler - -obj_ioc_init - - umount -t fuse.daos /home/cont_ssd dfuse_launch_fuse 阻塞,直到完成 fuse_session_unmount @@ -4256,8 +4193,12 @@ vos_blob_format_cb spdk_bdev_write_blocks -rw_completion - spdk_thread_send_msg +rw_completion(void *cb_arg, int err) + biod = cb_arg -> 拿到BIO描述 + spdk_thread_send_msg bio_media_error -> 如果有错误,则通知错误处理线程 + iod_dma_completion -> 完成DMA + biod->bd_completion(biod->bd_comp_arg, err) -> wal_completion | data_completion + ... 客户端mount: @@ -4401,17 +4342,16 @@ check_short_read_cb daos_opc_t dc_funcs[opc].task_func 客户端方法数组 -DAOS_OPC_OBJ_UPDATE 写 - dc_obj_update_task DAOS_OBJ_RPC_UPDATE - obj_req_valid(task, args, DAOS_OBJ_RPC_UPDATE - obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) -> 将任务压栈 - pushed_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top - ... - dc_io_epoch_set(epoch, opc) - tse_task_stack_pop -> 将任务从栈上弹出来 - poped_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top - dc_tx_attach(args->th, obj, DAOS_OBJ_RPC_UPDATE, task) 如果事务有效(hdl.cookie == 1), 则走dtx - dc_obj_update(task, &epoch, map_ver, args, obj) -> 提交对象更新 +dc_obj_update_task(tse_task_t *task) DAOS_OPC_OBJ_UPDATE 写 + obj_req_valid(task, args, DAOS_OBJ_RPC_UPDATE + obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) -> 将任务压栈 + pushed_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top + ... + dc_io_epoch_set(epoch, opc) + tse_task_stack_pop -> 将任务从栈上弹出来 + poped_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top + dc_tx_attach(args->th, obj, DAOS_OBJ_RPC_UPDATE, task) 如果事务有效(hdl.cookie == 1), 则走dtx + return dc_obj_update(task, &epoch, map_ver, args, obj) -> 提交对象更新 obj_task_init(task, DAOS_OBJ_RPC_UPDATE, map_ver, args->th, &obj_auxi, obj) obj_task_init_common(task, opc, map_ver, th, auxi, obj) tse_task_stack_push @@ -4419,38 +4359,48 @@ DAOS_OPC_OBJ_UPDATE 写 obj_auxi->is_ec_obj = obj_is_ec(obj) -> 设置EC对象标志 tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) -> 为任务注册对象完成回调, 弹出任务, 重试, 错误处理等 ---------------------- - obj_update_sgls_dup(obj_auxi, args) - --------------------------- old - obj_rw_req_reassemb 重新组装 - dkey_hash = obj_dkey2hash - obj_req_get_tgts 获取对象对应的目标 - obj_dkey2grpmemb - obj_dkey2grpidx - pool_map_ver = pool_map_get_version(pool->dp_map) - grp_size = obj_get_grp_size(obj) - grp_idx = d_hash_jump(hash, obj->cob_shards_nr / grp_size) how hash generate? obj with pool - obj_shards_2_fwtgts - obj_shard_tgts_query 分片目标查询 - obj_shard_open - dc_obj_shard_open - pool_map_find_target 二分查找 - comp_sorter_find_target(sorter, id) - daos_array_find - array_bin_search - obj_grp_leader_get - pl_select_leader obj_get_shard - array_bin_search 二分查找 daos_obj_classes - tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) - obj_csum_update(obj, args, obj_auxi) - obj_rw_bulk_prep(obj, args->iods, args->sgls, args->nr, true, obj_auxi->req_tgts.ort_srv_disp, task, obj_auxi) -> 准备读写大块数据 - daos_sgls_packed_size -> 内联提取需要将 sqls 缓冲区打包到 RPC 中,因此使用它来检查是否需要批量传输 - obj_bulk_prep - crt_bulk_create - crt_bulk_bind - obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) -> 扇出 shard_io_cb = io_cb = dc_obj_shard_rw - - -ds_obj_rw_handler 接收端的回调 + obj_update_sgls_dup(obj_auxi, args) -> 用户可能提供 iov_len < iov_buf_len 的 sql,这可能会给内部处理带来一些麻烦,例如 crt_bulk_create/daos_iov_left() 总是使用 iov_buf_len。 对于这种情况,我们复制 sql 并使其 iov_buf_len = iov_len + obj_auxi->dkey_hash = obj_dkey2hash(obj->cob_md.omd_id, args->dkey) -> 比如为1 + if (obj_is_ec(obj)) + obj_rw_req_reassemb(obj, args, NULL, obj_auxi) -> EC对象需要, 重新组装对象读写请求 + obj_update_shards_get + obj_shards_2_fwtgts -> 根据分片查找转发的目标 + req_tgts->ort_shard_tgts = req_tgts->ort_tgts_inline -> 分片目标数组,包含 (ort_grp_nr * ort_grp_size) 个目标。 如果#targets <= OBJ_TGT_INLINE_NR 那么它指向ort_tgts_inline。 在数组中,[0, ort_grp_size - 1] 表示第一组,[ort_grp_size, ort_grp_size * 2 - 1] 表示第二组,依此类推。 如果 (ort_srv_disp == 1),则在每个组中,第一个目标是领导分片,后面的 (ort_grp_size - 1) 目标是前向非领导分片。 现在只有一种情况 (ort_grp_nr > 1) 用于对象打孔,所有其他情况均为 (ort_grp_nr == 1) + obj_shard_tgts_query -> 分片目标查询 + obj_csum_update + ------------------- + obj_req_get_tgts 获取对象对应的目标 + obj_dkey2grpmemb + obj_dkey2grpidx + pool_map_ver = pool_map_get_version(pool->dp_map) + grp_size = obj_get_grp_size(obj) + grp_idx = d_hash_jump(hash, obj->cob_shards_nr / grp_size) how hash generate? obj with pool + obj_shards_2_fwtgts + obj_shard_tgts_query 分片目标查询 + obj_shard_open + dc_obj_shard_open + pool_map_find_target 二分查找 + comp_sorter_find_target(sorter, id) + daos_array_find + array_bin_search + obj_shard2tgtid + *tgt_id = obj->cob_shards->do_shards[shard].do_target_id -> dc_obj_layout 客户端对象布局 + obj_shard_close(obj_shard) + obj_auxi->flags |= ORF_CONTAIN_LEADER -> 要求转发给容器leader + obj_grp_leader_get + pl_select_leader obj_get_shard + array_bin_search 二分查找 daos_obj_classes + tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) + obj_csum_update(obj, args, obj_auxi) + obj_rw_bulk_prep(obj, args->iods, args->sgls, args->nr, true, obj_auxi->req_tgts.ort_srv_disp, task, obj_auxi) -> 准备读写大块数据 + daos_sgls_packed_size -> 内联提取需要将 sqls 缓冲区打包到 RPC 中,因此使用它来检查是否需要批量传输 + obj_bulk_prep + crt_bulk_create + crt_bulk_bind + obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) -> 扇出 shard_io_cb = io_cb = dc_obj_shard_rw + + +ds_obj_rw_handler(crt_rpc_t *rpc) -> 服务端对象读写处理器(DAOS_OBJ_RPC_UPDATE) obj_ioc_begin 访问VOS前的各种检查 obj_rpc_is_fetch process_epoch @@ -4526,18 +4476,24 @@ dfuse_start dc_obj_shard_rw 客户端对象分片读写(读写对象分片) + dc_cont2uuid(shard->do_co, &cont_hdl_uuid, &cont_uuid) -> 设置容器uuid obj_shard_ptr2pool(shard) 根据分片获取池 + tgt_ep.ep_grp = pool->dp_sys->sy_group + tgt_ep.ep_tag = shard->do_target_idx + tgt_ep.ep_rank = shard->do_target_rank -> 设置cart端点(目标组,tag,rank) obj_req_create opc = DAOS_OBJ_RPC_UPDATE -> ds_obj_rw_handler + crt_req_create(crt_ctx, tgt_ep, opcode, req) uuid_copy daos_dti_copy 拷贝dtx_id + orw... > 填充容器读写结构体 跳过ec逻辑 - tse_task_register_comp_cb + tse_task_register_comp_cb(task, dc_rw_cb -> 注册容器读写任务回调 daos_rpc_send - crt_req_send daos_rpc_cb -> tse_task_complete 发送完成回调流程:hg -> crt_hg_req_send_cb -> crp_complete_cb -> -> daos_rpc_cb -> dc_rw_cb + crt_req_send(rpc, daos_rpc_cb, task) -> engine收到后处理 -> ds_obj_rw_handler -> tse_task_complete 发送完成回调流程:hg -> crt_hg_req_send_cb -> crp_complete_cb -> -> daos_rpc_cb -> dc_rw_cb 超时检测和业务回调是在不同的线程中并发执行 -ds_obj_rw_handler +ds_obj_rw_handler(crt_rpc_t *rpc) obj_rw_reply obj_reply_set_status 与 obj_reply_get_status 成对使用 ((struct obj_rw_out *)reply)->orw_ret = status 设置回复状态 @@ -4885,7 +4841,6 @@ dc_obj_open_task_create -obj_rw_req_reassemb 重新组装对象读写请求 终结/销毁流程 @@ -5719,3 +5674,13 @@ int daos_event_fini(struct daos_event *ev) -> 终止事件, 从各种列表、pa ---------------------------------------- DL ---------------------------------------- +obj_rw_complete(crt_rpc_t *rpc, struct obj_io_context *ioc, + daos_handle_t ioh, int status, struct dtx_handle *dth) -> 对象读写完成, 更新延迟计数器, 发送回复, 释放资源等 +obj_update_latency +obj_rw_reply(rpc, rc, epoch.oe_value, &ioc) + crt_reply_send(rpc) +obj_ioc_end(&ioc, rc) + dss_rpc_cntr_exit + obj_update_sensors +obj_ioc_fini +