diff --git a/flow/plant_uml.md b/flow/plant_uml.md new file mode 100644 index 00000000000..eedf5d34f2a --- /dev/null +++ b/flow/plant_uml.md @@ -0,0 +1,15 @@ +@startuml + +Client -> HttpServer: request + +HttpServer -> WebApp: application_callable() +note right: application_callable(environ, start_response) + +WebApp -> HttpServer: start_response() +note right: start_response(status, headers, exc_info) + +WebApp -> HttpServer: return iterator + +HttpServer -> Client: dddds + +@enduml diff --git a/readme b/readme index 9e38efeed83..0ca604558c5 100644 --- a/readme +++ b/readme @@ -761,6 +761,8 @@ run_all_tests(int keys) -> 所有的测试 dts_create_config(cfg_desc_io, "keys=%d", keys) run_ts_tests -> lru ... + run_wal_tests(cfg_desc_io) + ... 启动容器: @@ -943,10 +945,14 @@ eqt_idx = atomic_fetch_add_relaxed(&fs_handle->di_eqt_idx, 1) -> 原子递增, eqt = &fs_handle->di_eqt[eqt_idx % fs_handle->di_eq_count] -> 取余打散到eq ev = d_slab_acquire(eqt->de_write_slab) -> 分配EV, 需要提前注册: d_slab_register(&fs_handle->di_slab, &write_slab, eqt, &eqt->de_write_slab) ev->de_complete_cb = dfuse_cb_write_complete -dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev) +oh->doh_ie->ie_stat.st_size = len + position -> 比如写10字节 +此时SGL上已经有用户数据 +dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev) -> 初始化IO描述(iod, daos_array_iod_t, IO范围), 将EV继续透传 daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) -> daos_array_write(daos_handle_t oh, daos_handle_t th, daos_array_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev) - dc_task_create(dc_array_write, NULL, ev, &task) -> 关联EV和task - sched = daos_ev2sched(ev) -> 拿到调度器指针, 初始化调度器 + dc_task_create(dc_array_write, NULL, ev, &task) -> 初始化task, 关联EV和task, 此时调度器为空, 但是EV不为空 + sched = daos_ev2sched(ev) -> 从EV上拿到调度器 + tse_task_create(func, sched, NULL, &task) -> 创建任务 + args = dc_task_get_args(task) -> 从task参数联合体上得到参数, 并将SGL和IO描述设置到参数上 return dc_task_schedule(task, true) sem_post(&eqt->de_sem) -> 唤醒EQ d_slab_restock(eqt->de_write_slab) -> 重用slab @@ -1100,6 +1106,7 @@ write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3 fuse_buf_size(bufv) ibuf = FUSE_BUFVEC_INIT(len) 分配本地缓冲区 DFUSE_TRA_DEBUG 调试 + dfuse_mcache_evict -> 清除此处的元数据缓存,以便查找不会返回过时的大小/时间信息 fuse_buf_copy(&ibuf, bufv, 0) dfuse_cache_evict d_slab_acquire 以高效的方式分配数据 @@ -1132,8 +1139,10 @@ dc_array_write array_hdl2ptr io_extent_same D_INIT_LIST_HEAD(&io_task_list) - daos_task_create(DAOS_OPC_ARRAY_GET_SIZE 短读任务 DAOS_OPC_ARRAY_READ + daos_task_create(DAOS_OPC_ARRAY_GET_SIZE 短读任务 DAOS_OPC_ARRAY_READ, 写不走此逻辑 while (u < rg_iod->arr_nr) 遍历每个范围,但同时组合属于同一 dkey 的连续范围。 如果用户给出的范围不增加偏移量,则它们可能不会合并,除非分隔范围也属于同一个 dkey + tse_task_t *io_task = NULL -> io任务 + struct io_params *params -> IO参数 compute_dkey 计算分布式key 在给定此范围的数组索引的情况下计算 dkey。 还计算从我们开始的索引开始,dkey 可以保存的记录数写作。 相对于 dkey 的记录索引 struct io_params *prev, *current 如果有多个dkey io, 则通过链表连接起来 num_ios++ @@ -1142,15 +1151,16 @@ dc_array_write compute_dkey 再次计算dkey create_sgl 创建分散聚集列表 daos_task_create(DAOS_OPC_OBJ_FETCH 读: DAOS_OPC_ARRAY_READ 按索引号 -> dc_obj_fetch_task - daos_task_create(DAOS_OPC_OBJ_UPDATE 写 或 DAOS_OPC_ARRAY_PUNCH truncate dc_funcs[opc].task_func 客户端方法数组 + daos_task_create(DAOS_OPC_OBJ_UPDATE 写 或 DAOS_OPC_ARRAY_PUNCH truncate dc_funcs[opc].task_func 客户端方法数组 -> dc_obj_update_task(tse_task_t *task) daos_task_get_args tse_task_register_deps 注册在计划任务之前需要完成的依赖任务。 依赖任务无法进行, 如果一个任务依赖于其他任务,只有依赖的任务完成了,才可以将任务添加到调度器列表中 - tse_task_list_add(io_task, &io_task_list) d_list_add_tail(&dtp->dtp_task_list, head); 添加任务到链表 - tse_task_register_comp_cb(task, free_io_params_cb, &head, sizeof(head)) 为任务注册完成回调 + tse_task_list_add(io_task, &io_task_list) -> d_list_add_tail(&dtp->dtp_task_list, head); 添加IO任务到链表 + tse_task_register_comp_cb(task, free_io_params_cb, &head, sizeof(head)) -> 为任务注册完成回调, 用于释放IO参数 if (op_type == DAOS_OPC_ARRAY_READ && array->byte_array) 短读 tse_task_register_deps(task, 1, &stask) 注册依赖任务 tse_task_list_add(stask, &io_task_list) 加到io任务列表 - tse_task_list_sched(&io_task_list, false); 调度执行 + tse_task_list_sched(&io_task_list, true) -> 遍历临时任务列表, 立即调度执行 + tse_task_schedule(task, instant) array_decref(array) tse_task_register_cbs(stask, check_short_read_cb 读回调 tse_sched_progress(tse_task2sched(task)) 推进/处理, 先处理依赖任务 @@ -1220,11 +1230,6 @@ dfuse_start dfuse_progress_thread -重要结构: - -tse_task_t *io_task = NULL; io任务 - - pool_map dfuse_cb_write position=0 @@ -1335,7 +1340,7 @@ tse: 调度, 1300+400, 涉及文件: event.h, daos_event.h, tse.h, tse.c, tse_in dc_task_create sched = daos_ev2sched(ev) -> EV事件转调度器 tse_task_create(func, sched, NULL, &task) -> 初始化 tse_task。 该任务(task)会被添加到调度器(sched)任务列表中,稍后被调度,如果提供了依赖任务,则该任务将被添加到依赖任务的dep列表中,一旦依赖任务完成,则添加该任务到调度程序列表(先完成依赖任务, 然后添加主任务) - task_ptr2args 指针转参数 + task_ptr2args -> task指针转参数args, D_INIT_LIST_HEAD(&dtp->dtp_list) -> 初始化任务链表, 调度时插入到调度器的初始化队列(dsp_init_list) -> d_list_add_tail(&dtp->dtp_list, &dsp->dsp_init_list) D_INIT_LIST_HEAD(&dtp->dtp_task_list) D_INIT_LIST_HEAD(&dtp->dtp_dep_list) @@ -1344,13 +1349,16 @@ dc_task_create ... dtp->dtp_func = task_func ... - tse_task_buf_embedded 获取任务的嵌入式缓冲区,用户可以使用它来携带功能参数。 任务的嵌入式缓冲区有大小限制,如果 buf_size 大于限制,此函数将返回 NULL。 用户应通过 tse_task_set_priv() 使用私有数据来传递大参数。 MSC - 我将其更改为只是一个缓冲区,而不是像以前那样, 不断给一个额外的指针指向大的预涂层缓冲区。 以前的方式不适用于公共用途。我们现在应该使它更简单,更通用,如下面的评论 + tse_task_buf_embedded 获取任务的嵌入式缓冲区,用户可以使用它来携带功能参数。 任务的嵌入式缓冲区有大小限制,如果 buf_size 大于限制,此函数将返回 NULL。 用户应通过 tse_task_set_priv() 使用私有数据来传递大参数。 MSC - 我将其更改为只是一个缓冲区,而不是像以前那样, 不断给一个额外的指针指向大的预涂层缓冲区。 以前的方式不适用于公共用途。我们现在应该使它更简单,更通用,如下面的评论, 比如size=152B tse_task_buf_size return (size + 7) & ~0x7 + avail_size = sizeof(dtp->dtp_buf) - dtp->dtp_stack_top -> 可用大小880B + return (void *)dtp->dtp_buf -> 任务增加栈空间并将任务内存返回 args->ta_magic = DAOS_TASK_MAGIC tse_task_register_comp_cb(task, task_comp_event, NULL, 0) -> dtc->dtc_cb = cb task_comp_event 注册完成回调 register_cb(task, true, comp_cb, arg, arg_size) d_list_add(&dtc->dtc_list, &dtp->dtp_comp_cb_list) 插入到列表开始处 + args->ta_ev = ev -> 将EV设置到task参数上 failed -> tse_task_decref(task) -> 如果失败则判断引用计数 @@ -1629,7 +1637,7 @@ dfuse_do_work if (type->st_free_count == 0) int count = restock(type, 1) -> 剩余个数为0时需要补货 ptr = (void *)entry - type->st_reg.sr_offset -> 从空闲表中拿一个 - ibuf.buf[0].mem = ev->de_iov.iov_buf -> 将ev与ibuf通过iov_buf关联 + ibuf.buf[0].mem = ev->de_iov.iov_buf -> 将ev与ibuf通过iov_buf关联, 用户数据与SGL关联 fuse_buf_copy(&ibuf, bufv, 0) ev->de_complete_cb = dfuse_cb_write_complete -> 为ev设置回调 dfs_write @@ -4321,7 +4329,7 @@ struct fuse_file_info libfuse 客户端写数据:write.c -write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3 +write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3, 将用户数据拷贝到用户态EV的向量IO(iov_buf)上, 在用户态EV上注册完成回调, 调用DAOS文件系统写接口, 传入EV上的DAOS_EV, fuse_req_userdata fuse_req_ctx fuse_buf_size(bufv) @@ -4363,7 +4371,7 @@ dc_array_write D_INIT_LIST_HEAD(&io_task_list) daos_task_create(DAOS_OPC_ARRAY_GET_SIZE 短读任务 DAOS_OPC_ARRAY_READ while (u < rg_iod->arr_nr) 遍历每个范围,但同时组合属于同一 dkey 的连续范围。 如果用户给出的范围不增加偏移量,则它们可能不会合并,除非分隔范围也属于同一个 dkey - compute_dkey 计算分布式key 在给定此范围的数组索引的情况下计算 dkey。 还计算从我们开始的索引开始,dkey 可以保存的记录数写作。 相对于 dkey 的记录索引 + compute_dkey 计算分布式key 在给定此范围的数组索引的情况下计算 dkey。 还计算从我们开始的索引开始,dkey 可以保存的记录数写作。 相对于 dkey 的记录索引, 比如10B, dkey_val=1 struct io_params *prev, *current 如果有多个dkey io, 则通过链表连接起来 num_ios++ d_iov_set(dkey, ¶ms->dkey_val, sizeof(uint64_t)); @@ -4396,13 +4404,23 @@ dc_funcs[opc].task_func 客户端方法数组 DAOS_OPC_OBJ_UPDATE 写 dc_obj_update_task DAOS_OBJ_RPC_UPDATE obj_req_valid(task, args, DAOS_OBJ_RPC_UPDATE - obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) - tse_task_stack_pop + obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) -> 将任务压栈 + pushed_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top + ... + dc_io_epoch_set(epoch, opc) + tse_task_stack_pop -> 将任务从栈上弹出来 + poped_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top dc_tx_attach(args->th, obj, DAOS_OBJ_RPC_UPDATE, task) 如果事务有效(hdl.cookie == 1), 则走dtx - dc_obj_update 否则 - obj_task_init_common(task, DAOS_OBJ_RPC_UPDATE + dc_obj_update(task, &epoch, map_ver, args, obj) -> 提交对象更新 + obj_task_init(task, DAOS_OBJ_RPC_UPDATE, map_ver, args->th, &obj_auxi, obj) + obj_task_init_common(task, opc, map_ver, th, auxi, obj) tse_task_stack_push shard_task_list_init(obj_auxi) + obj_auxi->is_ec_obj = obj_is_ec(obj) -> 设置EC对象标志 + tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) -> 为任务注册对象完成回调, 弹出任务, 重试, 错误处理等 + ---------------------- + obj_update_sgls_dup(obj_auxi, args) + --------------------------- old obj_rw_req_reassemb 重新组装 dkey_hash = obj_dkey2hash obj_req_get_tgts 获取对象对应的目标 @@ -4424,8 +4442,12 @@ DAOS_OPC_OBJ_UPDATE 写 array_bin_search 二分查找 daos_obj_classes tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) obj_csum_update(obj, args, obj_auxi) - obj_rw_bulk_prep - obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) 扇出 shard_io_cb = io_cb = dc_obj_shard_rw + obj_rw_bulk_prep(obj, args->iods, args->sgls, args->nr, true, obj_auxi->req_tgts.ort_srv_disp, task, obj_auxi) -> 准备读写大块数据 + daos_sgls_packed_size -> 内联提取需要将 sqls 缓冲区打包到 RPC 中,因此使用它来检查是否需要批量传输 + obj_bulk_prep + crt_bulk_create + crt_bulk_bind + obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) -> 扇出 shard_io_cb = io_cb = dc_obj_shard_rw ds_obj_rw_handler 接收端的回调 @@ -4451,11 +4473,6 @@ dfuse_start 重要结构: -tse_task_t *io_task = NULL; io任务 - - - - ubix: @@ -4468,7 +4485,7 @@ dfuse_cb_write position=0 daos_task_create DAOS_OPC_OBJ_UPDATE dc_obj_update_task dc_obj_update - obj_update_shards_get + obj_update_shards_get -> 走非EC的处理逻辑 obj_rw_bulk_prep obj_req_fanout shard_rw_prep dc_obj_shard_rw = shard_io_cb 请求扇出 io_prep_cb shard_rw_prep 发送io前执行的回调 @@ -4858,10 +4875,6 @@ ds_obj_query_key_handler_1 全局rpc操作码: cg_opc_map -open shard -obj_shard_open 18 - - shard_query_key_task daos_task_create(DAOS_OPC_OBJ_QUERY_KEY @@ -5382,8 +5395,10 @@ daos_eq_query -> 查询EQ中有多少个未完成的事件,如果events不为N daos_event_parent_barrier -> 将父事件标记为已启动的屏障,这意味着在所有其他子事件完成并且父事件从 EQ 中轮询或测试完成(如果它不在 EQ 中)之前,无法添加更多子事件。 在所有子级完成之前,不会从 EQ 中轮询父级,也不会使用 daos_event_test 返回完成。 请注意,如果父事件作为另一个 daos 操作的一部分启动,则不应再调用此函数,并且启动事件的函数将成为屏障操作。 在这种情况下,操作本身可以在子级完成之前完成,但在所有子级完成之前该事件不会被标记为就绪, 将此事件标记为障碍事件,由最后一个完成的子事件后触发完成 +daos_event_test -> 测试事件的完成情况。 如果ev是一个孩子,操作将会失败。 如果事件在事件队列中初始化,并且测试完成该事件,则该事件将从事件队列中拉出 + -daos_event_abort -> 尝试中止与此事件相关的操作。 在此调用之后,用户仍然需要等待或轮询该事件。 目前,这不会中止任何内部 DAOS 操作,并且实际上是无操作, commit: daos_event_abort() 将事件标记为已中止,但 DAOS 内部任务并未中止,目前无法中止该操作。 因此,稍后如果用户继续(通过测试或轮询)中止事件,事件 API 会释放该事件并将其标记为准备初始化,因为它之前已中止,但 DAOS 内部任务仍未完成。 当 DAOS 任务最终进行时,可能会导致损坏,因为用户可能重新使用了该事件,甚至重新使用了传递给该事件的缓冲区。 由于我们不支持取消内部 DAOS 任务和 RPC,因此暂时将中止实现更改为无操作,这将导致对事件的测试或轮询等待内部任务完成,以避免此类损坏问题 +daos_event_abort -> 尝试终止与此事件相关的操作。 在此调用之后,用户仍然需要等待或轮询该事件。 目前,这不会中止任何内部 DAOS 操作,并且实际上是无操作, commit: daos_event_abort() 将事件标记为已中止,但 DAOS 内部任务并未中止,目前无法中止该操作。 因此,稍后如果用户继续(通过测试或轮询)中止事件,事件 API 会释放该事件并将其标记为准备初始化,因为它之前已中止,但 DAOS 内部任务仍未完成。 当 DAOS 任务最终进行时,可能会导致损坏,因为用户可能重新使用了该事件,甚至重新使用了传递给该事件的缓冲区。 由于我们不支持取消内部 DAOS 任务和 RPC,因此暂时将中止实现更改为无操作,这将导致对事件的测试或轮询等待内部任务完成,以避免此类损坏问题 ---------------------------------------- DL ---------------------------------------- @@ -5694,7 +5709,7 @@ int daos_eq_poll(daos_handle_t eqh, int wait_running, int64_t timeout, unsigned static int eq_progress_cb(void *arg) -> 网络层触发事件回调(网络首先调用一次事件回调, 避免错过恰好满足条件的场景), 从参数中拿到EQ tse_sched_progress(&epa->eqx->eqx_sched) -> 根据EQ上的调度器, 对调度器加锁和引用计数, 执行一轮任务调度, EQ单元测试可能是空转 - 遍历EQ完成队列中的事件, 如果该EV还有子EV在运行, 则跳过次轮处理, 否则EQ完成计数-1, 从完成队列中删除该事件, 该事件只能是已完成或被终止, 重新设置EV状态为已就绪, 将该事件加入到返回的事件列表中, 如果事件列表个数达到指定的轮询个数,则终止遍历EQ的完成队列, 最后如果事件列表中有事件则对EQ解锁,并返回真值1 + 遍历EQ完成队列中的事件, 如果该EV还有子EV在运行, 则跳过本轮处理, 否则EQ完成计数-1, 从完成队列中删除该事件, 该事件只能是已完成或被终止, 重新设置EV状态为已就绪, 将该事件加入到返回的事件列表中, 如果事件列表个数达到指定的轮询个数,则终止遍历EQ的完成队列, 最后如果事件列表中有事件则对EQ解锁,并返回真值1 int daos_event_fini(struct daos_event *ev) -> 终止事件, 从各种列表、parent_list、子列表和事件队列哈希列表中取消事件链接,并销毁所有子事件, EV不能是运行中, 销毁EV锁, 处理子EV, 处理父EV, 删除EV的链表, EV上的网络上下文置空, EQ引用-1 diff --git a/readme_client b/readme_client new file mode 100644 index 00000000000..8e3ef690020 --- /dev/null +++ b/readme_client @@ -0,0 +1,15 @@ +DAOS客户端写流程 +启动DFUSE -> dfuse_do_work +dfuse_cb_write + ibuf.buf[0].mem = ev->de_iov.iov_buf + rc = fuse_buf_copy(&ibuf, bufv, 0) + ev->de_iov.iov_len = len -> 用户数据 -> SGL + --------- + dfs_write + rg.rg_len = buf_size + iod.arr_rgs = &rg -> 通过IO描述, 将SGL -> DAOS连续记录的范围 + ---------------- + daos_array_write + + +---------------------------------------- DL ---------------------------------------- \ No newline at end of file diff --git a/src/vos/storage_estimator/README.md b/src/vos/storage_estimator/README.md index 69509020d28..cfd65ad2638 100644 --- a/src/vos/storage_estimator/README.md +++ b/src/vos/storage_estimator/README.md @@ -1,5 +1,5 @@ -# Storage Estimation Tool +# Storage Estimation Tool 存储估算工具(vos元数据等) The daos_storage_estimator.py tool estimates the utilization of the Storage Class Memory (SCM) required for DAOS deployments. DAOS uses the Versioning Object Store (VOS) to keep track of the DAOS objects metadata. There are three options to feed the tool with the description of the items that will be stored in the DAOS File system.