From 0d24ff506c57e1b1c344adfc07a602136a9cfdb3 Mon Sep 17 00:00:00 2001 From: xb Date: Thu, 21 Sep 2023 18:17:35 +0800 Subject: [PATCH] next_gen --- category/bulk | 5 +- libfuse | 1 - one_stor_next_gen.md | 72 -------------------- readme_next_gen | 147 ----------------------------------------- readme_next_gen_fake | 87 ------------------------ readme_next_gen_fake_h | 0 6 files changed, 3 insertions(+), 309 deletions(-) delete mode 160000 libfuse delete mode 100644 one_stor_next_gen.md delete mode 100644 readme_next_gen delete mode 100644 readme_next_gen_fake delete mode 100644 readme_next_gen_fake_h diff --git a/category/bulk b/category/bulk index 19d10b57ca2..64694f18b77 100644 --- a/category/bulk +++ b/category/bulk @@ -52,6 +52,7 @@ dc_pool_query(tse_task_t *task) sgl.sg_nr_out = 0; sgl.sg_iovs = &iov; rc = crt_bulk_create(ctx, &sgl, CRT_BULK_RW, bulk); + return daos_rpc_send(rpc, task) 发送RPC 客户端 --------------------------- 服务端 @@ -90,11 +91,11 @@ ds_pool_query_handler(crt_rpc_t *rpc, int version) segments[i].len, flags, mem_type, device, &na_mem_handles[i], &na_mem_serialize_sizes[i]) ... - map_desc.bd_bulk_op = CRT_BULK_PUT -> 设置为服务端通过RDMA写内存的方式, 将map写给客户端 + map_desc.bd_bulk_op = CRT_BULK_PUT -> 设置为服务端通过RDMA写操作(wr.opcode = IBV_WR_RDMA_WRITE), 将池map, DMA给客户端 map_desc.bd_remote_hdl = remote_bulk map_desc.bd_local_hdl = bulk crt_bulk_transfer(&map_desc, bulk_cb, &eventual, &map_opid) -> 传输BULK, 在: rc = bulk_cbinfo->bci_cb(&crt_bulk_cbinfo) 中执行回调 - + diff --git a/libfuse b/libfuse deleted file mode 160000 index 5fd503935c0..00000000000 --- a/libfuse +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 5fd503935c0fc268bc59c271aa361a1552476a4c diff --git a/one_stor_next_gen.md b/one_stor_next_gen.md deleted file mode 100644 index fdff00b21ca..00000000000 --- a/one_stor_next_gen.md +++ /dev/null @@ -1,72 +0,0 @@ -'参考: https://plantuml.com/zh/sequence-diagram' - -@startuml - -client -> librados: rados_connect -librados -> librados: create_cct -librados -> librados: crt_context_create(&daos_eq_ctx) -librados -> librados: tse_sched_init(&daos_sched_g, NULL, daos_eq_ctx) 初始化全局调度器 -librados -> librados: daos_eq_create(daos_handle_t *eqh, nolock, poll) -librados -> librados: monclient.init() -librados -> librados: monclient.sub_want("mgrmap", 0, 0) -librados -> librados: monclient.renew_subs() - - -client -> librados: OP(如:rados_aio_write) -librados -> librados: 准备OP -librados -> librados: tse_sched_init(&sched, NULL, 0) -librados -> librados: daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL) -librados -> librados: 拷贝数据: copy_user_buf_to_sgl(user_buf, sgl.iov_buf) -librados -> librados: dc_task_create(op_submit, NULL, ev, &task)\n tse_task_create(func, sched, NULL, &task) 通过ev上的调度器将ev和task关联起来\n tse_task_register_comp_cb(task, task_comp_event, NULL, 0) 注册任务完成回调(在其中完成EV) - -librados -> librados: args = dc_task_get_args(task) \n\ -args = args->iod = iod \n\ -args->sgl = sgl \n\ -args->op = op -note right: 将OP封装到task参数中 - - - - -librados -> objecter: object_write(*task) - -objecter -> objecter: _calc_target(&op->target, nullptr) \n \ -enginemap->get_bucket_pool(t->base_oloc.pool) \n \ -enginemap->object_locator_to_bucket(t->base_oid, t->base_oloc, bucketid) 计算对象的bucket \n \ -crush_buffer_map_dse.find(bucketid.m_pool) 先查引擎缓存 \n \ -bucket_to_raw_engines(cct, bucketid, &engineid) 未命中,计算引擎ID - - - -objecter -> objecter: !engineid.valid() 引擎ID无效,加入homeless - -alt 大IO -objecter -> objecter: big_io 大IO分片(>1MB) -objecter -> objecter: tse_task_register_deps(task, n, &io_split_task) -else 小IO -end - -objecter -> rpc: hrpc_create_req -objecter -> objecter: tse_task_schedule(task, instant) 立即调度任务 \n\t\ -op_submit(*task) 提交OP - -objecter -> rpc: hrpc_send/bulk_transfer hrpc_cb 发送RPC/BULK -rpc -> objecter: hrpc_reply_send 接收端底层处理完成后, 发送回复给发送端(cr_output) - - -client -> librados: daos_eq_poll(eq, 1, wait_time, num, ev_arr) -librados -> librados: check_homess_head_timeout - -alt 轮训 -librados -> librados: num = hrpc_trigger(ctx, timeout, eq_progress_cb, &ev_args) \n\t\ -eq_progress_cb(void *arg) \n\t\ -tse_sched_progress(&epa->eqx->eqx_sched) - - -else 中断 -end - - - - -@enduml diff --git a/readme_next_gen b/readme_next_gen deleted file mode 100644 index 32ba05a661d..00000000000 --- a/readme_next_gen +++ /dev/null @@ -1,147 +0,0 @@ - -FR004支持OP请求发送:SR002, FR001, FR003 - ... 前置准备(sclient创建) - 客户端创建集群(rados_create2), 入参:集群地址, 集群名, 用户名, 标记 - 实例化日志 - 注册日志观察者 - 注册MGR观察者 - 实例化化性能统计(计数器) - 实例化 asok, 实例化套接字钩子, 注册命令行命令 - 实例化内存池 - 读配置文件(可选)(rados_conf_read_file) - 解析命令行参数(可选)(rados_conf_parse_argv) - 连接集群(rados_connect) - 启动日志线程 - 启动monitor客户端(MonC) - 构造OP连接Monitor(待细化) - 启动定时任务线程 - 启动性能统计线程 - 启动MAP订阅线程 - 启动asok线程(并定端口, 监听和处理命令行) - 初始化HomeLess队列(IO无家可归(osd异常)时入该队列, flag: CEPH_OSDMAP_PAUSERD, homeless_session) - 对象初始化 - 序列化的初始化(如性能统计) - 更新本地crush - 添加观察者(map订阅) - 订阅mgrmap及执行 - 创建池级别IO上下文(rados_ioctx_create), 入参(集群,池名,IO上下文地址) - 根据池名在map中查询池ID - 实例化IO上下文(绑定对象, 池ID) - ----------------------------------------------------- - 1. 客户端写数据(同步写)(rados_write) - 2. 调用池IO上下文的write方法 - 3. 构造对象OP - 4. 调用对象操作的write方法, 封装为对象操作 - 添加数据: (1) 封装OP, 添加操作码, 比如: CEPH_OSD_OP_WRITE (2) 拷贝客户端IO - 添加OP参数: 如: 截断参数 - 5. 执行对象操作 - 准备可变OP(将对象操作封装为更灵活的可变OP) - 实例化可变OP, 设置回调 - 在OP上初始化调试跟踪(trace) - -------------------------------- FR从这里开始 - 提交操作(发送OP) - 计算目标OSD(调用crush算法) - 检查OSD状态,是否将IO放入homeless -> op->target.paused -> _maybe_request_map -> epoch过期 -> 订阅和更新map - 拆分EC/副本请求 - OP操作序列化为RPC/BULK, 生成副本任务(依赖任务) - 发送RPC/BULK(hrpc_req_send) - 6. 业务调用trigger: 轮训/中断(等响应事件)(hrpc_trigger/epoll_wait) - 执行发送完成回调函数(如有) - - -FR005支持OP回复处理, 轮训/中断, 模式切换 -业务调用trigger: 轮训/中断(等响应事件)(hrpc_trigger/epoll_wait) -轮训模式: - hrpc_trrigger调用RPC公用回调, 如: hrpc_cb(const struct rpc_cb_info *cb_info), 参考: daos_rpc_cb - tse_task_t *task = cb_info->cci_arg -> 从参数中拿到task指针 - tse_task_complete(task, rc) -> 执行task完成 .. - 最终执行业务回调 - - -中断模式: -单独线程 -监听eq相关的fd -epoll_wait -网络回包触发fd可读 -该线程从eq中拿出事件ev -解析出task -> 执行回调(参考轮训模式) - - - - - -FR007engine故障场景OP发送控制, 引擎故障, backoff, map刷新, 超时检测和返回(网络层), 1, 2 homeless队列如何保证性能 -参考: osd_state: up/down/in/out - -AR001(原FR006支持map订阅和更新) - MonC实例化和初始化 - 构造OP连接Monitor, 完成认证 - 向Monitor注册订阅(monc->sub_want, renew_subs) // monitor通知monc - ... - 处理Monitor发来的Map更新(MonClient::ms_dispatch), osdmap和enginemap - case 消息类型 -> 执行各自的map更新处理逻辑, 如: case CEPH_MSG_OSD_MAP -> return handle_osd_map - ... - handle_engine_map - ... - -OSD异常 -... -计算目标节点 -测试目标节点连通性 - 如果OSD无法访问, 判断是否将IO放入 homeless_session -... - - - -FR009性能统计(嵌入发送流程), dump指标: ceph daemon osd.0 perf dump, 增加计数: logger->inc, 指标demo: l_osdc_linger_send | l_osdc_op_w -定义枚举(指标变量): enum -> 如: l_osdc_linger_send -对象初始化中(注册指标): void Objecter::init() - pcb.add_u64_counter(l_osdc_linger_send, "linger_send", -> 增加计数器指标 | pcb.add_u64_avg -> 或增加均值指标 - ... -void Objecter::_send_linger - logger->inc(l_osdc_linger_send) -> 发送心跳业务中增加该计数器 -> 或: _send_op_account 登记OP中增加计数器(l_osdc_op_w) -使用指标: ceph daemon osd.0 perf dump -重置指标 - - - - - - -FR007 asock功能支持(嵌入发送流程) - 实例化asok, 实例化套接字钩子, 注册命令行命令 - 启动asok线程(并定端口, 监听和处理命令行) - - - -F8: 池实例创建销毁和池级别io操作 -创建集群, 解析参数(可选), 连接集群 -创建池(rados_pool_create(rados, pool_name)) -> rados_pool_create(*cluster, pool_name.c_str()) -更新OSD_MAP -设置创池操作(POOL_OP_CREATE) -通过MonC给Monitor发送创池消息 - - -F9: fio压测对象语义 -实现fio引擎语义(ioengine_ops ioengine) -rados初始化 -rados IO入队(aio和回调) -获取IO完成事件(判断当前是否还有没有处理完的io events) -统计IO完成事件 -清理 -打开文件(不实现) -IO初始化(io单元) -IO释放 - - -F12: 支持op调度(tse,预计1k) -tse_task_create 创建任务 -tse_task_register_comp_cb 注册回调 -tse_task_schedule 调度任务 -daos_event_priv_wait 等待任务完成 - hrpc_trigger tse_sched_progress - tse_sched_run 执行调度 - tse_sched_process_complete 处理完成 -业务trigger -> tse_task_complete - - diff --git a/readme_next_gen_fake b/readme_next_gen_fake deleted file mode 100644 index 2dc9f51eb54..00000000000 --- a/readme_next_gen_fake +++ /dev/null @@ -1,87 +0,0 @@ ----------------------------- api ---------------------------------- -int daos_eq_create(daos_handle_t *eqh); -创建事件队列。 事件队列用于保存和池化多个事件。 创建的每个事件队列都将创建一个与事件队列关联的网络上下文 - -int set_eq_mode(bool block, bool polling); - - - - - - --------------------------- app ------------------------------------ -static const daos_handle_t eqh; - -daos_eq_create(daos_handle_t *eqh, nolock, noblock); -rados_connect(cluster); -rados_aio_write(test_data.m_ioctx /* rados_ioctx_t */, "foo" /* oid */, my_completion, buf, UINT_MAX /* len */, 0 /* off */)); -daos_eq_poll(eqh); - ... -> my_completion - -------------------------------------------------------------------- -int daos_eq_create(daos_handle_t *eqh, bool lock, bool block) - struct daos_eq_private *eqx - struct daos_eq *eq - eq = daos_eq_alloc() - D_INIT_LIST_HEAD(&eq->eq_running) - D_INIT_LIST_HEAD(&eq->eq_comp) - daos_hhash_hlink_init(&eqx->eqx_hlink, &eq_h_ops) - eq.lock=lock; - eq.block=block; - rc = crt_context_create(&eqx->eqx_ctx); // block ? - daos_eq_insert(eqx); - daos_eq_handle(eqx, eqh); - daos_hhash_link_key(&eqx->eqx_hlink, &h->cookie) -> 关联key - rc = tse_sched_init(&eqx->eqx_sched, NULL, eqx->eqx_ctx); - -------------------------------------------------------------------- -rados_connect() -> extern "C" int _rados_connect(rados_t cluster) - create_cct - monclient.init() - monclient.sub_want("mgrmap", 0, 0) - monclient.renew_subs() - daos_eq_create | argobots_schedule - - -------------------------------------------------------------------- -rados_aio_write 异步写 -CEPH_RADOS_API int rados_aio_write(rados_ioctx_t io, const char *oid, rados_completion_t completion, const char *buf, size_t len, uint64_t off) - ev - task - daos_event_init(&ev, eqh, NULL) - evx->evx_sched = &eqx->eqx_sched - ev->de_iov.iov_buf - ev.cb = NULL; - prepare_write_op -> CEPH_OSD_OP_WRITE - tse_task_create(tse_task_func_t task_func, tse_sched_t *sched, void *priv tse_task_t **taskp) -> aio_write - object_write(*task) ------------------------------ objecter ---------------------------- - check_for_latest_map = _calc_target(&op->target, nullptr) - add to homeless? - if(len > 1M) - split_ec | split_replicate - tse_task_register_deps - hrpc_create_req - tse_task_schedule(*task) -> run task_func - objecter->op_submit(o, &c->tid) - hrpc_send - ------------------------------- app poll ---------------------------- -while(1) - if(!eq.block) - rc = daos_eq_poll(eq, 1, DAOS_EQ_WAIT, 128, &dev[0]) - check_homeless_head_timeout - hrpc_trigger(ctx, hrpc_cb) -> cb - *task = cb_info - tse_task_complete - ev.cb -> completion - return 0 - nready = epoll_wait(ctx->event_fd, events, *num_events, timeout_ms) - rc = daos_eq_poll(eq, 1, DAOS_EQ_WAIT, num_events, &dev[0]) - check_homeless_head_timeout - hrpc_trigger(ctx, hrpc_cb) -> cb - *task = cb_info - tse_task_complete - ev.cb -> completion -------------------------------------------------------------------- - diff --git a/readme_next_gen_fake_h b/readme_next_gen_fake_h deleted file mode 100644 index e69de29bb2d..00000000000