Skip to content

Commit

Permalink
tse
Browse files Browse the repository at this point in the history
  • Loading branch information
ssbandjl committed Aug 30, 2023
1 parent 1227745 commit b4b57ed
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 23 deletions.
117 changes: 95 additions & 22 deletions readme
Original file line number Diff line number Diff line change
Expand Up @@ -732,8 +732,23 @@ vos_self_init(vos_path, false, BIO_STANDALONE_TGT_ID)
dbtree_class_register(DBTREE_CLASS_IV, BTR_FEAT_UINT_KEY | BTR_FEAT_DIRECT_KEY, &dbtree_iv_ops)
dbtree_class_register(DBTREE_CLASS_IFV, BTR_FEAT_UINT_KEY | BTR_FEAT_DIRECT_KEY,&dbtree_ifv_ops) -> AOS-11757 common:为固定大小值添加 DBTREE_CLASS_IFV (#10457),为了避免当值是固定大小时在 btree 记录中进行额外的、不必要的分配,请内联用户定义的记录。 我们可以将其应用于内存树和持久格式树,因为 vea_format 将在池创建的上下文中运行,并且新创建的池不需要保持向后兼容性。 vea 没有从旧池升级的路径,但这是我们可以忍受的限制。 稍微修改兼容性代码以简化兼容性标志的设置。从最近意外添加的 run_test.sh 中删除 -x
fd = open(nvme_conf, O_RDONLY, 0600)
rc = bio_nvme_init(nvme_conf, VOS_NVME_NUMA_NODE, VOS_NVME_MEM_SIZE, VOS_NVME_HUGEPAGE_SIZE, VOS_NVME_NR_TARGET, true) -> bio_nvme_init (nvme_conf=nvme_conf@entry=0x55555687e0f0 "/mnt/daos/daos_nvme.conf", numa_node=numa_node@entry=-1, mem_size=mem_size@entry=1024, hugepage_size=hugepage_size@entry=2, tgt_nr=tgt_nr@entry=1,bypass_health_collect=bypass_health_collect@entry=true)

rc = bio_nvme_init(nvme_conf, VOS_NVME_NUMA_NODE, VOS_NVME_MEM_SIZE, VOS_NVME_HUGEPAGE_SIZE, VOS_NVME_NR_TARGET, true) -> bio_nvme_init (nvme_conf=nvme_conf@entry=0x55555687e0f0 "/mnt/daos/daos_nvme.conf", numa_node=numa_node@entry=-1, mem_size=mem_size@entry=1024, hugepage_size=hugepage_size@entry=2, tgt_nr=tgt_nr@entry=1,bypass_health_collect=bypass_health_collect@entry=true) -> 全局nvme初始化, struct bio_nvme_data nvme_glb -> Rename EIO (Extent I/O) to BIO (Blob I/O)
D_INIT_LIST_HEAD(&nvme_glb.bd_bdevs)
ABT_mutex_create(&nvme_glb.bd_mutex)
ABT_cond_create(&nvme_glb.bd_barrier)
DAOS_DMA_CHUNK_MB 8 /* 8MB DMA chunks */ -> 每个 chunk 8MB
bio_chk_sz = ((uint64_t)size_mb << 20) >> BIO_DMA_PAGE_SHIFT -> 8<<20>>12=2048
d_getenv_int("DAOS_SPDK_MAX_UNMAP_CNT", &bio_spdk_max_unmap_cnt) -> 32
d_getenv_int("DAOS_MAX_ASYNC_SZ", &bio_max_async_sz) -> 1MB
spdk_bs_opts_init(&nvme_glb.bd_bs_opts, sizeof(nvme_glb.bd_bs_opts))
nvme_glb.bd_bs_opts.cluster_sz = DAOS_BS_CLUSTER_SZ -> 32MB
nvme_glb.bd_bs_opts.max_channel_ops = BIO_BS_MAX_CHANNEL_OPS -> 通道数=4096
bio_spdk_env_init()
spdk_log_set_print_level(SPDK_LOG_ERROR)
spdk_env_opts_init(&opts)
bio_nvme_configured -> 检查是否配置了指定类型的NVMe设备,当指定SMD_DEV_TYPE_MAX时,如果配置了任意类型的设备则返回true, TODO:将 opts.mem_size 设置为 nvme_glb.bd_mem_size 目前我们无法保证干净关闭(没有泄漏大页)。 设置mem_size可能会导致EAL:没有足够的内存可用错误,并且DPDK将无法初始化
set_faulty_criteria
...
vos_mod_init()
vos_db_init_ex(db_path, "self_db", true, true)
db = vos_db_get()
Expand Down Expand Up @@ -1131,7 +1146,7 @@ dc_array_write
tse_task_list_sched(&io_task_list, false); 调度执行
array_decref(array)
tse_task_register_cbs(stask, check_short_read_cb 读回调
tse_sched_progress(tse_task2sched(task)) 推进/处理
tse_sched_progress(tse_task2sched(task)) 推进/处理, 先处理依赖任务
tse_sched_run


Expand Down Expand Up @@ -1204,11 +1219,6 @@ tse_task_t *io_task = NULL; io任务






ubix:

pool_map
dfuse_cb_write position=0
dfuse_cb_write_complete 回调
Expand Down Expand Up @@ -1280,7 +1290,40 @@ ds_obj_rw_handler
crt_swim_cli_cb
ca_arrays

tse测试, test: src/common/tests/sched.c
tse测试, test: src/common/tests/sched.c -> gdb build/debug/gcc/src/common/tests/sched -> cd build/debug/gcc/src/common/tests/; gdb sched -> int main(int argc, char **argv)
sched_ut_setup -> daos_debug_init(DAOS_LOG_DEFAULT)
sched_uts
sched_test_1 -> Scheduler create/complete/cancel
sched_test_3 -> Task Reinitialization in Completion CB -> 反复执行,直到满足重入次数(REINITS)
sched_test_6 -> Task Dependencies 依赖任务
tse_sched_init(&sched, NULL, 0)
Test N -> 1 dependencies
tse_task_create(check_func_n, &sched, counter, &task) -> 主任务
tse_task_create(inc_func, &sched, counter, &tasks[i]) -> 创建依赖任务
tse_task_schedule(tasks[i], false) -> 调度依赖任务
tse_task_register_deps(task, NUM_DEPS, tasks) -> 注册依赖任务
tse_task_schedule(task, false) -> 调度主任务
tse_sched_progress(&sched) -> 执行调度器
tse_task_complete(tasks[i], 0) -> 完成依赖
tse_sched_progress(&sched) -> 再次执行调度器
tse_task_complete(task, 0) -> 完成主任务
tse_sched_check_complete(&sched)
-------------------------------------------
Test 1 -> N dependencies
tse_task_create(inc_func, &sched, counter, &task)
tse_task_create(check_func_1, &sched, counter, &tasks[i])
tse_task_register_deps(tasks[i], 1, &task)
tse_task_schedule(tasks[i], false)
tse_task_schedule(task, false)
tse_sched_progress(&sched);
tse_task_complete(task, 0);
tse_sched_progress(&sched);
tse_task_complete(tasks[i], 0)
tse_sched_check_complete(&sched)
...
sched_ut_teardown -> daos_debug_fini()


tse: 调度, 1300+400, 涉及文件: event.h, daos_event.h, tse.h, tse.c, tse_internal.h
dc_task_create
sched = daos_ev2sched(ev) -> EV事件转调度器
Expand Down Expand Up @@ -1338,7 +1381,7 @@ crt_rpc_completed call 2? 重复完成,duplicated completions
daos_rpc_send -> crt_req_send(rpc, daos_rpc_cb, task)
daos_rpc_cb
tse_task_complete
tse_sched_process_complete(dsp)
tse_sched_process_complete(dsp) -> 更新调度器中的任务列表(状态)
post_procee
tse_task_complete_callback
dtc_cb 执行注册时的回调 register_cb -> dc_rw_cb
Expand Down Expand Up @@ -2379,7 +2422,7 @@ func (cmd *startCmd) Execute(args []string)
GetDeviceClassFn func GetDeviceClass cat /sys/class/net/enp2s0/type
updateFabricEnvars
initStorage 初始化存储
prepBdevStorage iommuDetected iommu内存管理
prepBdevStorage iommuDetected iommu内存管理 -> 仅当使用非模拟 NVMe 并且用户无特权时才执行这些检查, 比如非root用户
NvmePrepare
PrepareBdevs
func (p *Provider) Prepare
Expand Down Expand Up @@ -2744,7 +2787,7 @@ dfuse_do_work
dfs_read
dfs_read_int dfs_read_int
dc_task_create(dc_array_read, NULL, ev, &task)
tse_task_register_cbs(task, NULL, NULL, 0, read_cb, NULL, 0)
tse_task_register_cbs(task, NULL, NULL, 0, read_cb, NULL, 0) -> 支持前置cb和后置cb
dc_task_schedule(task, true)
tse_task_schedule
tse_task_schedule_with_delay
Expand All @@ -2755,16 +2798,23 @@ dfuse_do_work

...
tse_sched_progress task2
tse_sched_run
processed += tse_sched_process_init(dsp) -> 处理调度程序的初始化列表和休眠列表。 这首先将所有现在应该醒来的任务从睡眠列表移动到初始化列表的尾部,然后执行调度程序初始化列表中没有依赖项的所有任务的所有主体函数, 调度器主函数, 在队列间移动任务
tse_sched_run -> while (1) -> 循环处理
processed += tse_sched_process_init(dsp) -> 处理调度程序的初始化列表和休眠列表。 这首先将所有现在应该醒来的任务从睡眠列表移动到初始化列表的尾部,然后执行调度程序初始化列表中没有依赖项的所有任务的所有主体函数, 调度器主函数, 在队列间移动任务, 从头部开始处理任务
processed += tse_sched_process_complete(dsp)
d_list_splice_init(&dsp->dsp_complete_list, &comp_list) -> 连接两个列表并重新初始化空列表。 列表的内容添加到头部的开头。 返回时列表为初始化为空(可理解为从一个队列移动到新的队列)
d_list_for_each_entry_safe(dtp, tmp, &comp_list, dtp_list) -> 遍历新的完成队列
d_list_del_init(&dtp->dtp_list)
tse_task_post_process(task) -> 检查完成列表中的任务、依赖任务状态检查、调度状态更新等。此后任务将移至完成列表
while (!d_list_empty(&dtp->dtp_dep_list)) -> 任务依赖队列不为空(还有依赖任务)
...
bumped = true ->
dsp->dsp_inflight-- 调度器中的飞行计数器减1
tse_sched_priv_decref(dsp) -> 调度器引用-1
tse_task_decref(task) -> 任务引用-1 -> 或释放task -> zombie -> D_FREE(task)
completed = tse_sched_check_complete(sched) -> 检查调度器是否是完成状态, 即: 初始队列和睡眠队列都为空, 且无飞行的任务
completed = (d_list_empty(&dsp->dsp_init_list) &&
d_list_empty(&dsp->dsp_sleeping_list) &&
dsp->dsp_inflight == 0); -> 完成条件: 调度器初始队列和睡眠队列为空,且调度器上的飞行任务个数为0
tse_task_prep_callback
check_short_read_cb
tse_task_complete
Expand Down Expand Up @@ -3092,12 +3142,23 @@ server_init(int argc, char *argv[])
smd_init(db) 服务元数据管理(每个服务) Per-Server Metadata Management (SMD),smd使用sysdb作为kv存储元数据
spdk_bs_opts_init 将 spdk_bs_opts 结构初始化为默认的 blobstore 选项值。
bio_spdk_env_init 环境初始化
spdk_env_opts_init(&opts) 初始化选项 lib/env_dpdk/init.c
spdk_env_opts_init(&opts) 初始化选项 lib/env_dpdk/init.c -> spdk默认初始化
opts->name = SPDK_ENV_DPDK_DEFAULT_NAME;
opts->core_mask = SPDK_ENV_DPDK_DEFAULT_CORE_MASK;
opts->shm_id = SPDK_ENV_DPDK_DEFAULT_SHM_ID;
opts->mem_size = SPDK_ENV_DPDK_DEFAULT_MEM_SIZE;
opts->main_core = SPDK_ENV_DPDK_DEFAULT_MAIN_CORE;
opts->mem_channel = SPDK_ENV_DPDK_DEFAULT_MEM_CHANNEL;
opts->base_virtaddr = SPDK_ENV_DPDK_DEFAULT_BASE_VIRTADDR;
bio_add_allowed_alloc
read_config
spdk_json_parse
spdk_json_find_array
check_vmd_status
spdk_json_find_array(ctx->values, "subsystems", NULL, &ctx->subsystems)
ctx->subsystems_it = spdk_json_array_first(ctx->subsystems)
spdk_json_decode_object
spdk_json_strequal(ctx->subsystem_name, "bdev")
spdk_json_strequal(ctx->subsystem_name, BIO_DEV_TYPE_VMD) -> vmd
check_vmd_status(ctx, vmd_ss, &vmd_enabled)
add_bdevs_to_opts
spdk_env_init 初始化或重新初始化环境库。 对于初始化,必须在使用此库中的任何其他函数之前调用它。 对于重新初始化,参数 `opts` 必须设置为 NULL,并且必须在同一进程中通过 spdk_env_fini() 完成环境库后调用
pci_env_reinit
Expand Down Expand Up @@ -4487,7 +4548,7 @@ daos_rpc_complete | daos_rpc_cb
tse_task_complete
tse_sched_process_complete(dsp)
post_procee
tse_task_complete_callback
tse_task_complete_callback -> 返回true/false
dtc_cb 执行注册时的回调 register_cb -> dc_rw_cb
task_comp_event
event_complete
Expand Down Expand Up @@ -4735,11 +4796,11 @@ dfs_lookupx 查询条目,获取属性, 生成?
daos_task_create(DAOS_OPC_OBJ_OPEN 创建对象打开任务, 创建一个异步任务并将其与 daos 客户端操作相关联。 对于同步操作,请为该操作使用特定的 API。 通常,此 API 用于需要将一系列 daos 操作排队到 DAOS 异步引擎中的用例,这些任务之间的执行顺序具有特定的依赖性。 例如,用户可以创建任务来打开一个对象,然后使用插入到打开任务更新中的依赖项来更新该对象。 对于更简单的工作流程,用户可以使用基于事件的 API 而不是任务
tse_task_register_deps 注册依赖任务
for num_deps 遍历依赖任务数量
tse_task_add_dependent(task, dep_tasks[i])
tse_task_add_dependent(task, dep_tasks[i]) -> 主任务和依赖任务可以是不同的调度器, 跨调度器时需要锁依赖任务调度器上的锁
Add dependent 依赖 -> 主任务
d_list_add_tail(&tlink->tl_link, &dep_dtp->dtp_dep_list) 添加到依赖链表
tse_task_register_comp_cb(task, open_handle_cb 注册打开完成回调
tse_task_schedule(open_task, false) 调度任务
tse_task_schedule(open_task, false) 调度任务, 仅添加到初始队列
tse_sched_progress(tse_task2sched(task)) 推进任务
daos_task_create(DAOS_OPC_OBJ_FETCH 查看元数据
atomic_store_relaxed(&ie->ie_ref, 1); 初始化引用计数 原子操作
Expand Down Expand Up @@ -4802,10 +4863,11 @@ obj_rw_req_reassemb 重新组装对象读写请求

终结/销毁流程
daos_eq_lib_fini / daos_eq_destroy /
tse_sched_complete -> 等待调度程序中的所有任务完成并终结。 如果另一个线程正在完成调度程序,则立即返回
tse_sched_complete(&sched, 0, true) -> 销毁调度器, 等待调度程序中的所有任务完成并终结。 如果另一个线程正在完成调度程序,则立即返回, 终止或销毁中需要取消调度器中所有的任务
if (dsp->dsp_cancelling)
tse_sched_complete_inflight
d_list_for_each_entry_safe(dtp, tmp, &dsp->dsp_running_list -> 将任务从运行队列移除
tse_sched_complete_cb(sched) -> MSC - 我们可能只需要 1 个完成 cb 而不是列表



Expand Down Expand Up @@ -5405,7 +5467,9 @@ daos_cont_create2 -> daos_cont_create
cont_req_create(daos_task2ctx(task), &ep, CONT_CREATE, &rpc) -> 创建容器操作 -> ds_cont_op_handler
tse_task_register_comp_cb(task, cont_create_complete 完成回调
cont_rsvc_client_complete_rpc
tse_task_reinit(task) 重新初始化任务并将其移至调度程序的初始化列表中。 该任务必须具有要重新插入调度程序的主体函数。 如果任务在其完成 CB 之一中重新初始化,则该回调和已执行的回调将从 cb 列表中删除,并且需要在重新插入后由用户重新注册
tse_task_reinit(task) 重新初始化任务并将其移至调度程序的初始化列表中。 该任务必须具有要重新插入调度程序的主体函数。 如果任务在其完成 CB 之一中重新初始化,则该回调和已执行的回调将从 cb 列表中删除,并且需要在重新插入后由用户重新注册 -> tse_task_reinit_with_delay(task, 0 /* delay */)
dtp_generation_inc(dtp) -> 任务的生成,每次任务重新初始化或添加依赖任务时+1
d_list_move_tail(&dtp->dtp_list, &dsp->dsp_init_list) -> 重新加入初始队列



Expand Down Expand Up @@ -5484,3 +5548,12 @@ vea_uts

vea_tx_publish -> 使预订持久化。 它应该是调用者操纵的事务的一部分


daos_task_reset -> 使用另一个操作码重置 DAOS 任务。 该任务必须已经完成或尚未处于运行状态,并且尚未被释放(使用时必须对该任务进行引用计数,以防止在 DAOS 操作完成后它被释放)


参考配置: utils/config/examples/daos_server_verbs.yml


指标: engine_io_latency_tgt_update_{min|mean|max} and engine_io_latency_update_{min|mean|max}
更新副本对象。 客户端需要先将更新RPC发送给一个引擎(称为领导者),然后该引擎将更新RPC(tgt_update)转发给其他引擎,一旦所有引擎完成更新,领导者就会回复客户端。 所以更新 RPC 是从客户端到领导者的 RPC。 (engine_io_latency_update用于描述整个更新的延迟)。 tgt_update RPC是从leader到其他引擎的RPC,(engine_io_latency_tgt_update用于描述一个引擎的更新延迟,即其他引擎处理来自leader的tgt_update RPC
3 changes: 2 additions & 1 deletion src/common/tests/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,8 @@ sched_test_5(void **state)
TSE_TEST_EXIT(rc);
}

#define NUM_DEPS 128
// #define NUM_DEPS 128
#define NUM_DEPS 2

int
inc_func(tse_task_t *task)
Expand Down

0 comments on commit b4b57ed

Please sign in to comment.