Skip to content

Commit

Permalink
tse
Browse files Browse the repository at this point in the history
  • Loading branch information
xb committed Sep 2, 2023
1 parent c895791 commit 5727285
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 37 deletions.
15 changes: 15 additions & 0 deletions flow/plant_uml.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
@startuml

Client -> HttpServer: request

HttpServer -> WebApp: application_callable()
note right: application_callable(environ, start_response)

WebApp -> HttpServer: start_response()
note right: start_response(status, headers, exc_info)

WebApp -> HttpServer: return iterator

HttpServer -> Client: dddds

@enduml
87 changes: 51 additions & 36 deletions readme
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ run_all_tests(int keys) -> 所有的测试
dts_create_config(cfg_desc_io, "keys=%d", keys)
run_ts_tests -> lru
...
run_wal_tests(cfg_desc_io)
...


启动容器:
Expand Down Expand Up @@ -943,10 +945,14 @@ eqt_idx = atomic_fetch_add_relaxed(&fs_handle->di_eqt_idx, 1) -> 原子递增,
eqt = &fs_handle->di_eqt[eqt_idx % fs_handle->di_eq_count] -> 取余打散到eq
ev = d_slab_acquire(eqt->de_write_slab) -> 分配EV, 需要提前注册: d_slab_register(&fs_handle->di_slab, &write_slab, eqt, &eqt->de_write_slab)
ev->de_complete_cb = dfuse_cb_write_complete
dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev)
oh->doh_ie->ie_stat.st_size = len + position -> 比如写10字节
此时SGL上已经有用户数据
dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev) -> 初始化IO描述(iod, daos_array_iod_t, IO范围), 将EV继续透传
daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) -> daos_array_write(daos_handle_t oh, daos_handle_t th, daos_array_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev)
dc_task_create(dc_array_write, NULL, ev, &task) -> 关联EV和task
sched = daos_ev2sched(ev) -> 拿到调度器指针, 初始化调度器
dc_task_create(dc_array_write, NULL, ev, &task) -> 初始化task, 关联EV和task, 此时调度器为空, 但是EV不为空
sched = daos_ev2sched(ev) -> 从EV上拿到调度器
tse_task_create(func, sched, NULL, &task) -> 创建任务
args = dc_task_get_args(task) -> 从task参数联合体上得到参数, 并将SGL和IO描述设置到参数上
return dc_task_schedule(task, true)
sem_post(&eqt->de_sem) -> 唤醒EQ
d_slab_restock(eqt->de_write_slab) -> 重用slab
Expand Down Expand Up @@ -1100,6 +1106,7 @@ write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3
fuse_buf_size(bufv)
ibuf = FUSE_BUFVEC_INIT(len) 分配本地缓冲区
DFUSE_TRA_DEBUG 调试
dfuse_mcache_evict -> 清除此处的元数据缓存,以便查找不会返回过时的大小/时间信息
fuse_buf_copy(&ibuf, bufv, 0)
dfuse_cache_evict
d_slab_acquire 以高效的方式分配数据
Expand Down Expand Up @@ -1132,8 +1139,10 @@ dc_array_write
array_hdl2ptr
io_extent_same
D_INIT_LIST_HEAD(&io_task_list)
daos_task_create(DAOS_OPC_ARRAY_GET_SIZE 短读任务 DAOS_OPC_ARRAY_READ
daos_task_create(DAOS_OPC_ARRAY_GET_SIZE 短读任务 DAOS_OPC_ARRAY_READ, 写不走此逻辑
while (u < rg_iod->arr_nr) 遍历每个范围,但同时组合属于同一 dkey 的连续范围。 如果用户给出的范围不增加偏移量,则它们可能不会合并,除非分隔范围也属于同一个 dkey
tse_task_t *io_task = NULL -> io任务
struct io_params *params -> IO参数
compute_dkey 计算分布式key 在给定此范围的数组索引的情况下计算 dkey。 还计算从我们开始的索引开始,dkey 可以保存的记录数写作。 相对于 dkey 的记录索引
struct io_params *prev, *current 如果有多个dkey io, 则通过链表连接起来
num_ios++
Expand All @@ -1142,15 +1151,16 @@ dc_array_write
compute_dkey 再次计算dkey
create_sgl 创建分散聚集列表
daos_task_create(DAOS_OPC_OBJ_FETCH 读: DAOS_OPC_ARRAY_READ 按索引号 -> dc_obj_fetch_task
daos_task_create(DAOS_OPC_OBJ_UPDATE 写 或 DAOS_OPC_ARRAY_PUNCH truncate dc_funcs[opc].task_func 客户端方法数组
daos_task_create(DAOS_OPC_OBJ_UPDATE 写 或 DAOS_OPC_ARRAY_PUNCH truncate dc_funcs[opc].task_func 客户端方法数组 -> dc_obj_update_task(tse_task_t *task)
daos_task_get_args
tse_task_register_deps 注册在计划任务之前需要完成的依赖任务。 依赖任务无法进行, 如果一个任务依赖于其他任务,只有依赖的任务完成了,才可以将任务添加到调度器列表中
tse_task_list_add(io_task, &io_task_list) d_list_add_tail(&dtp->dtp_task_list, head); 添加任务到链表
tse_task_register_comp_cb(task, free_io_params_cb, &head, sizeof(head)) 为任务注册完成回调
tse_task_list_add(io_task, &io_task_list) -> d_list_add_tail(&dtp->dtp_task_list, head); 添加IO任务到链表
tse_task_register_comp_cb(task, free_io_params_cb, &head, sizeof(head)) -> 为任务注册完成回调, 用于释放IO参数
if (op_type == DAOS_OPC_ARRAY_READ && array->byte_array) 短读
tse_task_register_deps(task, 1, &stask) 注册依赖任务
tse_task_list_add(stask, &io_task_list) 加到io任务列表
tse_task_list_sched(&io_task_list, false); 调度执行
tse_task_list_sched(&io_task_list, true) -> 遍历临时任务列表, 立即调度执行
tse_task_schedule(task, instant)
array_decref(array)
tse_task_register_cbs(stask, check_short_read_cb 读回调
tse_sched_progress(tse_task2sched(task)) 推进/处理, 先处理依赖任务
Expand Down Expand Up @@ -1220,11 +1230,6 @@ dfuse_start
dfuse_progress_thread


重要结构:

tse_task_t *io_task = NULL; io任务



pool_map
dfuse_cb_write position=0
Expand Down Expand Up @@ -1335,7 +1340,7 @@ tse: 调度, 1300+400, 涉及文件: event.h, daos_event.h, tse.h, tse.c, tse_in
dc_task_create
sched = daos_ev2sched(ev) -> EV事件转调度器
tse_task_create(func, sched, NULL, &task) -> 初始化 tse_task。 该任务(task)会被添加到调度器(sched)任务列表中,稍后被调度,如果提供了依赖任务,则该任务将被添加到依赖任务的dep列表中,一旦依赖任务完成,则添加该任务到调度程序列表(先完成依赖任务, 然后添加主任务)
task_ptr2args 指针转参数
task_ptr2args -> task指针转参数args,
D_INIT_LIST_HEAD(&dtp->dtp_list) -> 初始化任务链表, 调度时插入到调度器的初始化队列(dsp_init_list) -> d_list_add_tail(&dtp->dtp_list, &dsp->dsp_init_list)
D_INIT_LIST_HEAD(&dtp->dtp_task_list)
D_INIT_LIST_HEAD(&dtp->dtp_dep_list)
Expand All @@ -1344,13 +1349,16 @@ dc_task_create
...
dtp->dtp_func = task_func
...
tse_task_buf_embedded 获取任务的嵌入式缓冲区,用户可以使用它来携带功能参数。 任务的嵌入式缓冲区有大小限制,如果 buf_size 大于限制,此函数将返回 NULL。 用户应通过 tse_task_set_priv() 使用私有数据来传递大参数。 MSC - 我将其更改为只是一个缓冲区,而不是像以前那样, 不断给一个额外的指针指向大的预涂层缓冲区。 以前的方式不适用于公共用途。我们现在应该使它更简单,更通用,如下面的评论
tse_task_buf_embedded 获取任务的嵌入式缓冲区,用户可以使用它来携带功能参数。 任务的嵌入式缓冲区有大小限制,如果 buf_size 大于限制,此函数将返回 NULL。 用户应通过 tse_task_set_priv() 使用私有数据来传递大参数。 MSC - 我将其更改为只是一个缓冲区,而不是像以前那样, 不断给一个额外的指针指向大的预涂层缓冲区。 以前的方式不适用于公共用途。我们现在应该使它更简单,更通用,如下面的评论, 比如size=152B
tse_task_buf_size
return (size + 7) & ~0x7
avail_size = sizeof(dtp->dtp_buf) - dtp->dtp_stack_top -> 可用大小880B
return (void *)dtp->dtp_buf -> 任务增加栈空间并将任务内存返回
args->ta_magic = DAOS_TASK_MAGIC
tse_task_register_comp_cb(task, task_comp_event, NULL, 0) -> dtc->dtc_cb = cb task_comp_event 注册完成回调
register_cb(task, true, comp_cb, arg, arg_size)
d_list_add(&dtc->dtc_list, &dtp->dtp_comp_cb_list) 插入到列表开始处
args->ta_ev = ev -> 将EV设置到task参数上
failed -> tse_task_decref(task) -> 如果失败则判断引用计数


Expand Down Expand Up @@ -1629,7 +1637,7 @@ dfuse_do_work
if (type->st_free_count == 0)
int count = restock(type, 1) -> 剩余个数为0时需要补货
ptr = (void *)entry - type->st_reg.sr_offset -> 从空闲表中拿一个
ibuf.buf[0].mem = ev->de_iov.iov_buf -> 将ev与ibuf通过iov_buf关联
ibuf.buf[0].mem = ev->de_iov.iov_buf -> 将ev与ibuf通过iov_buf关联, 用户数据与SGL关联
fuse_buf_copy(&ibuf, bufv, 0)
ev->de_complete_cb = dfuse_cb_write_complete -> 为ev设置回调
dfs_write
Expand Down Expand Up @@ -4321,7 +4329,7 @@ struct fuse_file_info libfuse


客户端写数据:write.c
write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3
write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3, 将用户数据拷贝到用户态EV的向量IO(iov_buf)上, 在用户态EV上注册完成回调, 调用DAOS文件系统写接口, 传入EV上的DAOS_EV,
fuse_req_userdata
fuse_req_ctx
fuse_buf_size(bufv)
Expand Down Expand Up @@ -4363,7 +4371,7 @@ dc_array_write
D_INIT_LIST_HEAD(&io_task_list)
daos_task_create(DAOS_OPC_ARRAY_GET_SIZE 短读任务 DAOS_OPC_ARRAY_READ
while (u < rg_iod->arr_nr) 遍历每个范围,但同时组合属于同一 dkey 的连续范围。 如果用户给出的范围不增加偏移量,则它们可能不会合并,除非分隔范围也属于同一个 dkey
compute_dkey 计算分布式key 在给定此范围的数组索引的情况下计算 dkey。 还计算从我们开始的索引开始,dkey 可以保存的记录数写作。 相对于 dkey 的记录索引
compute_dkey 计算分布式key 在给定此范围的数组索引的情况下计算 dkey。 还计算从我们开始的索引开始,dkey 可以保存的记录数写作。 相对于 dkey 的记录索引, 比如10B, dkey_val=1
struct io_params *prev, *current 如果有多个dkey io, 则通过链表连接起来
num_ios++
d_iov_set(dkey, &params->dkey_val, sizeof(uint64_t));
Expand Down Expand Up @@ -4396,13 +4404,23 @@ dc_funcs[opc].task_func 客户端方法数组
DAOS_OPC_OBJ_UPDATE 写
dc_obj_update_task DAOS_OBJ_RPC_UPDATE
obj_req_valid(task, args, DAOS_OBJ_RPC_UPDATE
obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi))
tse_task_stack_pop
obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) -> 将任务压栈
pushed_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top
...
dc_io_epoch_set(epoch, opc)
tse_task_stack_pop -> 将任务从栈上弹出来
poped_ptr = dtp->dtp_buf + sizeof(dtp->dtp_buf) - dtp->dtp_stack_top
dc_tx_attach(args->th, obj, DAOS_OBJ_RPC_UPDATE, task) 如果事务有效(hdl.cookie == 1), 则走dtx
dc_obj_update 否则
obj_task_init_common(task, DAOS_OBJ_RPC_UPDATE
dc_obj_update(task, &epoch, map_ver, args, obj) -> 提交对象更新
obj_task_init(task, DAOS_OBJ_RPC_UPDATE, map_ver, args->th, &obj_auxi, obj)
obj_task_init_common(task, opc, map_ver, th, auxi, obj)
tse_task_stack_push
shard_task_list_init(obj_auxi)
obj_auxi->is_ec_obj = obj_is_ec(obj) -> 设置EC对象标志
tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) -> 为任务注册对象完成回调, 弹出任务, 重试, 错误处理等
----------------------
obj_update_sgls_dup(obj_auxi, args)
--------------------------- old
obj_rw_req_reassemb 重新组装
dkey_hash = obj_dkey2hash
obj_req_get_tgts 获取对象对应的目标
Expand All @@ -4424,8 +4442,12 @@ DAOS_OPC_OBJ_UPDATE 写
array_bin_search 二分查找 daos_obj_classes
tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0)
obj_csum_update(obj, args, obj_auxi)
obj_rw_bulk_prep
obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) 扇出 shard_io_cb = io_cb = dc_obj_shard_rw
obj_rw_bulk_prep(obj, args->iods, args->sgls, args->nr, true, obj_auxi->req_tgts.ort_srv_disp, task, obj_auxi) -> 准备读写大块数据
daos_sgls_packed_size -> 内联提取需要将 sqls 缓冲区打包到 RPC 中,因此使用它来检查是否需要批量传输
obj_bulk_prep
crt_bulk_create
crt_bulk_bind
obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) -> 扇出 shard_io_cb = io_cb = dc_obj_shard_rw


ds_obj_rw_handler 接收端的回调
Expand All @@ -4451,11 +4473,6 @@ dfuse_start

重要结构:

tse_task_t *io_task = NULL; io任务






ubix:
Expand All @@ -4468,7 +4485,7 @@ dfuse_cb_write position=0
daos_task_create DAOS_OPC_OBJ_UPDATE
dc_obj_update_task
dc_obj_update
obj_update_shards_get
obj_update_shards_get -> 走非EC的处理逻辑
obj_rw_bulk_prep
obj_req_fanout shard_rw_prep dc_obj_shard_rw = shard_io_cb 请求扇出
io_prep_cb shard_rw_prep 发送io前执行的回调
Expand Down Expand Up @@ -4858,10 +4875,6 @@ ds_obj_query_key_handler_1
全局rpc操作码: cg_opc_map


open shard
obj_shard_open 18


shard_query_key_task

daos_task_create(DAOS_OPC_OBJ_QUERY_KEY
Expand Down Expand Up @@ -5382,8 +5395,10 @@ daos_eq_query -> 查询EQ中有多少个未完成的事件,如果events不为N

daos_event_parent_barrier -> 将父事件标记为已启动的屏障,这意味着在所有其他子事件完成并且父事件从 EQ 中轮询或测试完成(如果它不在 EQ 中)之前,无法添加更多子事件。 在所有子级完成之前,不会从 EQ 中轮询父级,也不会使用 daos_event_test 返回完成。 请注意,如果父事件作为另一个 daos 操作的一部分启动,则不应再调用此函数,并且启动事件的函数将成为屏障操作。 在这种情况下,操作本身可以在子级完成之前完成,但在所有子级完成之前该事件不会被标记为就绪, 将此事件标记为障碍事件,由最后一个完成的子事件后触发完成

daos_event_test -> 测试事件的完成情况。 如果ev是一个孩子,操作将会失败。 如果事件在事件队列中初始化,并且测试完成该事件,则该事件将从事件队列中拉出


daos_event_abort -> 尝试中止与此事件相关的操作。 在此调用之后,用户仍然需要等待或轮询该事件。 目前,这不会中止任何内部 DAOS 操作,并且实际上是无操作, commit: daos_event_abort() 将事件标记为已中止,但 DAOS 内部任务并未中止,目前无法中止该操作。 因此,稍后如果用户继续(通过测试或轮询)中止事件,事件 API 会释放该事件并将其标记为准备初始化,因为它之前已中止,但 DAOS 内部任务仍未完成。 当 DAOS 任务最终进行时,可能会导致损坏,因为用户可能重新使用了该事件,甚至重新使用了传递给该事件的缓冲区。 由于我们不支持取消内部 DAOS 任务和 RPC,因此暂时将中止实现更改为无操作,这将导致对事件的测试或轮询等待内部任务完成,以避免此类损坏问题
daos_event_abort -> 尝试终止与此事件相关的操作。 在此调用之后,用户仍然需要等待或轮询该事件。 目前,这不会中止任何内部 DAOS 操作,并且实际上是无操作, commit: daos_event_abort() 将事件标记为已中止,但 DAOS 内部任务并未中止,目前无法中止该操作。 因此,稍后如果用户继续(通过测试或轮询)中止事件,事件 API 会释放该事件并将其标记为准备初始化,因为它之前已中止,但 DAOS 内部任务仍未完成。 当 DAOS 任务最终进行时,可能会导致损坏,因为用户可能重新使用了该事件,甚至重新使用了传递给该事件的缓冲区。 由于我们不支持取消内部 DAOS 任务和 RPC,因此暂时将中止实现更改为无操作,这将导致对事件的测试或轮询等待内部任务完成,以避免此类损坏问题


---------------------------------------- DL ----------------------------------------
Expand Down Expand Up @@ -5694,7 +5709,7 @@ int daos_eq_poll(daos_handle_t eqh, int wait_running, int64_t timeout, unsigned

static int eq_progress_cb(void *arg) -> 网络层触发事件回调(网络首先调用一次事件回调, 避免错过恰好满足条件的场景), 从参数中拿到EQ
tse_sched_progress(&epa->eqx->eqx_sched) -> 根据EQ上的调度器, 对调度器加锁和引用计数, 执行一轮任务调度, EQ单元测试可能是空转
遍历EQ完成队列中的事件, 如果该EV还有子EV在运行, 则跳过次轮处理, 否则EQ完成计数-1, 从完成队列中删除该事件, 该事件只能是已完成或被终止, 重新设置EV状态为已就绪, 将该事件加入到返回的事件列表中, 如果事件列表个数达到指定的轮询个数,则终止遍历EQ的完成队列, 最后如果事件列表中有事件则对EQ解锁,并返回真值1
遍历EQ完成队列中的事件, 如果该EV还有子EV在运行, 则跳过本轮处理, 否则EQ完成计数-1, 从完成队列中删除该事件, 该事件只能是已完成或被终止, 重新设置EV状态为已就绪, 将该事件加入到返回的事件列表中, 如果事件列表个数达到指定的轮询个数,则终止遍历EQ的完成队列, 最后如果事件列表中有事件则对EQ解锁,并返回真值1

int daos_event_fini(struct daos_event *ev) -> 终止事件, 从各种列表、parent_list、子列表和事件队列哈希列表中取消事件链接,并销毁所有子事件, EV不能是运行中, 销毁EV锁, 处理子EV, 处理父EV, 删除EV的链表, EV上的网络上下文置空, EQ引用-1

Expand Down
15 changes: 15 additions & 0 deletions readme_client
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
DAOS客户端写流程
启动DFUSE -> dfuse_do_work
dfuse_cb_write
ibuf.buf[0].mem = ev->de_iov.iov_buf
rc = fuse_buf_copy(&ibuf, bufv, 0)
ev->de_iov.iov_len = len -> 用户数据 -> SGL
---------
dfs_write
rg.rg_len = buf_size
iod.arr_rgs = &rg -> 通过IO描述, 将SGL -> DAOS连续记录的范围
----------------
daos_array_write


---------------------------------------- DL ----------------------------------------
2 changes: 1 addition & 1 deletion src/vos/storage_estimator/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

# Storage Estimation Tool
# Storage Estimation Tool 存储估算工具(vos元数据等)

The daos_storage_estimator.py tool estimates the utilization of the Storage Class Memory (SCM) required for DAOS deployments. DAOS uses the <a href="https://github.com/daos-stack/daos/blob/master/src/vos/README.md">Versioning Object Store (VOS)</a> to keep track of the DAOS objects metadata.
There are three options to feed the tool with the description of the items that will be stored in the <a href="https://github.com/daos-stack/daos/blob/master/src/client/dfs/README.md">DAOS File system</a>.
Expand Down

0 comments on commit 5727285

Please sign in to comment.