本大作业的目标是实现一个多副本分布式键值数据库原型系统,其主要分为三个部分:事务模块、KV存储模块和Raft共识模块,学生需要根据此文档的相关介绍,补全项目代码,并通过功能测试,同时进行性能测试。
上述系统架构展示了支持SQL的多副本分布式数据库原型系统,而目前对于本大作业,舍弃了SQL层,事务层为最高层级。因此只支持读写序列的输入,且未实现面向用户的可交互的界面。
-
元数据服务器 /meta_server 元数据服务器需要单独编译并执行 目前元数据服务器的功能是向事务层提供集中的时间戳授时服务,此部分不作为大作业内容,直接编译运行即可
-
存储层代码 /src/storage 头文件存放在include/storage中,此部分实现的是lsm-tree架构的KV存储功能,KVStore_beta类向外部提供了put,get和del三个接口,同时在include/storage/KVStore_new.h中KVStore继承了KVStore_beta,并接入了事务参数.
// put(key, value)
void put(const std::string & key, const std::string &value);
// value = get(key)
std::pair<bool, std::string> get(const std::string & key);
// del(key)
bool del(const std::string & key);
void put(const std::string & key, const std::string &value, Transaction *txn, bool add_writeset);
bool del(const std::string & key, Transaction *txn, bool add_writeset);
-
事务层代码 /src/transaction 头文件存放在include/transaction中,此部分实现的是事务模块,具体包含了锁管理器部分和事务管理器部分,其中锁管理器用于管理表、分片、行上的锁,用于实现SS2PL算法,其中使用了No-Wait的死锁避免策略。这部分由框架给出。而事务管理器主要用于实现事务的开始、回滚和提交。对于分布式事务,应使用两阶段提交协议(2PC)协议完成。具体需要完成的工作将在后续详细介绍。
-
日志层代码 /src/recovery 此部分实现的是数据库中的Undo/Redo日志部分,日志管理器向外提供了AppendLogRecord()等接口, 其设置有两个缓冲区,log_buffer和flush_buffer. flush_buffer不断向磁盘中刷入日志
-
执行层 /src/execution 目前这一部分不需要考虑,这一部分最初的设想是用于发送分布式执行计划到远端节点,其只作为事务测试样例的所需文件,无需对此更改,对于整体的性能测试,执行的上锁,读写数据的函数调用部分由性能测试负载给出。可见/test/benchmark/benchmark_rpc.h
-
protobuf 文件存放在 /include/proto目录下, 有关protobuf文件的编写可以查看项目地址,也可以搜索相关教程
本实验需要完成基础的两阶段提交协议的实现。两阶段提交协议是用来保证分布式事务的原子提交而广泛使用的协议。协议的具体过程简单来说即:在分布式事务中,由协调者和参与者两个角色,2PC协议将事务的执行过程分成了三个阶段:执行阶段、准备阶段和提交阶段。在执行阶段,执行层通过2PL协议对数据项进行加锁;当用户发起commit请求时,事务进入准备阶段。此时,协调者需要询问所有的参与者是否可以提交,如果所有参与者都可以提交,则事务可以提交;如果有任何一个参与者不允许提交,则事务将回滚。
以上是2PC协议的简单介绍,可以上网自行查阅更多的相关的资料。
回到本项目,具体来说,任务一的核心就是要补充事务模块中事务管理器的相关代码,实现2PC协议,其中,多个节点之间的数据通讯采用BRPC框架,它是百度开发并在百度内部广泛使用的远程过程调用框架,BRPC的具体使用可以参照BRPC的Github文档, 同时在src/brpc_test目录下给出了一个简单的BRPC样例。
首先,在src/transaction/transaction_manager.h中,需要实现以下几个方法,对于事务开始的方法,本项目采用从meta_server中获取时间戳,事务开始的代码已经给出。而对于事务准备、回滚、提交的方法如下,在本任务中,不考虑多副本之间使用Raft进行日志复制。
// 事务回滚程序入口
bool Abort(Transaction * txn);
// 回滚本地事务
bool AbortSingle(Transaction * txn);
// 事务提交入口
bool Commit(Transaction * txn);
// 提交本地事务
bool CommitSingle(Transaction * txn);
// 准备提交
bool PrepareCommit(Transaction * txn);
以上是直接对事务状态更改、写日志等操作执行的函数,对于AbortSingle(),CommitSingle(),PrepareCommit()函数, 其可能需要被远端的协调者所调用, 因此需要使用BRPC的服务端框架对这三个方法进行封装, 其应该在include/transaction_manager_rpc.h中完成, 例如:
// TODO 接收远端协调者发送的事务回滚的brpc服务端方法
virtual void AbortTransaction(::google::protobuf::RpcController* controller,
const ::transaction_manager::AbortRequest* request,
::transaction_manager::AbortResponse* response,
::google::protobuf::Closure* done){
brpc::ClosureGuard done_guard(done);
return;
}
完成以上方法的实现之后可以进行编译,并通过生成的测试可执行文件 build/test/transaction_test和build/test/transaction_test2 进行一定的测试,测试时,这两个可执行文件相当于两台数据库服务器节点,可以通过设置不同的监听端口进行伪分布式环境下的测试,运行命令如下:(执行顺序为meta_server,transaction_test2,transaction_test)
# 先启动 meta_server 节点
./meta_server/build/meta_server
# 启动一个新的终端
cd build/test/transaction/
./transaction_test2 -SERVER_NAME server2 -SERVER_LISTEN_PORT 8003 -log_path "这里改成你的第二个节点的Undo/Redo日志地址"
# 启动一个新的终端
./transaction_test -SERVER_NAME server1 -SERVER_LISTEN_PORT 8002 -log_path "这里改成你的第一个节点的Undo/Redo日志地址, 两个地址不要一样"
通过测试即可作为任务一完成
2.3.3 讨论:两阶段提交协议中的容错问题
在分布式环境下,可能出现各种各样的故障,例如某个节点系统故障、网络分区等问题,本小节在系统中最多只存在一个故障的假设下对2PC协议中的故障问题进行讨论。本小节假定数据库系统使用Undo/Redo日志,并实现了WAL(Write-Ahead Logging)预写日志技术[16]。假定分片没有多副本通过Raft共识协议容错。
2PC协议中有两个角色:协调者和参与者。首先考虑参与者故障,如果参与者在准备阶段发生故障,那么它不会返回给协调者事务准备结果,那么协调者由于RPC连接超时将判断事务回滚,当故障的参与者重启之后,读取Undo/Redo日志,由于日志中该事务没有写Prepare结果日志,因此会在Undo过程中会撤销事务。如果参与者在提交阶段发生故障,即参与者收到协调者的事务结果但未写入日志,或参与者还未收到结果就故障。由于事务结果在准备阶段由协调者已经确定,在提交阶段,所有参与者必须按照这个结果执行,因此在参与者重启时,需要向协调者请求事务结果并执行。
由于2PC协议中协调者是唯一的,因此协调者故障恢复问题也更加复杂。协调者可能在准备阶段出现故障,此时可能只有部分参与者收到了准备请求,也可能所有参与者都收到了准备请求但协调者还没有做出决策。协调者在发送准备请求的同时,将所有参与者的IP地址和端口号进行发送,当参与者收到准备请求后长时间没有收到协调者的决策结果时,参与者将轮询所有其他的参与者,如果所有参与者都在线并且事务状态都未进入提交阶段,则说明协调者出现故障并且事务仍处于准备阶段。这时该参与者将使用一种确定性算法计算出新的协调者,确定性算法保证了多个参与者所决策的新协调者是一致的,之后参与者将事务id以及所有参与者的IP地址和端口转发给新协调者并重新进入准备阶段。而参与者如果发现其他的参与者均无法连接,那么则是这个参与者网络出现了问题,参与者一旦写入了Prepare日志,那么事务最终提交还是回滚便不能由它自己决定,因此它会不断向协调者发送事务协调结果请求来获取事务结果并执行。
而如果协调者在提交阶段发生故障,即协调者已经写入了事务结果日志,但还未来得及将结果发送到参与者或只发送到了部分参与者。那么可以等待协调者重新上线并读它的协调者备份日志,或者可以轮询参与者查看它们的事务状态,如果有一个是提交状态,则它也可以提交,如果有一个是回滚状态,则它也可以回滚。
注意:以上给出的讨论并未涵盖所有的故障问题,事实上,两阶段提交协议的故障问题是一个较为复杂的问题,故障既包括了机器故障也包含了网络故障,很多网络故障是未在讨论中列出的,本任务之后所给出的伪代码也仅作参考,同学们可以加入自己的故障问题的思考并自行发挥。
对于参与者的故障问题, 可以通过/src/recovery/recovery_manager.cpp中的Redo 和Undo过程进行实现。可以参考下面的伪代码:
算法4.1 参与者故障容错算法
输入:参与者的所有Undo/Redo日志
输出:恢复参与者的正确状态
1. function Recovery() :
2. Redo();
3. Undo();
4. end function
5. function Redo() :
6. for log_record in log_set:
7. active_txn[log_record.TxnID()] = log_record.GetLsn(); //记录事务最大Lsn
8. switch (log_record.Type()):
9. case Type::Put, Type::Del: RedoOperator(); //重做Put和Delete操作
10. case Type::Prepared: prepared_set.add(log_record. TxnID ()); //加入集合
11. case Type::Abort, Type::Commit:
12. active_txn.remove(log_record. TxnID ()); //从活跃事务表移除
13. prepared_set. remove (log_record. TxnID ());//从准备事务集合移除
14. end function
15. function Undo() :
16. for txn in prepared_set:
17. if (AskCoordinatorTxnRes(txn) == Commit): //向协调者询问事务结果
18. Commit(txn); //如果事务结果为提交,则提交此事务
19. else:
20. Abort(txn); //如果事务结果为回滚,则回滚此事务
21. active_txn.remove(txn); //从活跃事务表中移除
22. for txn in active_txn: //回滚剩余的活跃事务表中所有事务
23. Abort(txn);
24. end function
对于协调者故障问题也更加复杂,首先协调者在发送准备请求时应该附带所有参与者的IP地址+监听端口,这需要修改 include/proto/transaction_manager.proto 中PrepareRequest的消息, 同时也需要修改 src/transaction/transaction_manager.cpp中的PrepareCommit()函数。还需要完善Commit()函数中两阶段提交过程中协调者写协调结果备份的过程。在include/fault_tolerance和src/fault_tolerance中,给出了一些方法 (不保证正确),可以在其之上进行添加,也可以重写。
给出协调者故障的伪代码,仅供参考:
算法4.2 协调者故障容错算法
输入:已准备事务按照事务ID排序的优先队列
输出:恢复协调者所协调事务的正确状态
1. function FaultToleranceForCoordinator(): //协调者容错方法
2. every interval: //定时检测
3. for each txn in PreparedTxnQueue: //遍历准备事务队列中的事务
4. if txn timeout: //如果事务超时,则需要对协调者状态进行判断
5. if !CoordinatorHeartbeat() && AllParticipantsHeartbeat ():
6. //其他参与者在线,协调者不在线,可判定协调者故障
7. if txn.OneOfStatus() == Commit:
8. Commit(txn); //如果任何一个参与者提交,则提交此事务
9. continue;
10. if txn.OneOfStatus() == Abort:
11. Abort(txn); //如果任何一个参与者回滚,则回滚此事务
12. continue;
13. node = DeterministicNewCoordinator(); //确定性选出新协调者
14. node.ReCommit(txn); //新协调者重新发起两阶段提交请求
15. end function
在本任务中,需要实现Raft协议在多副本分布式数据库中的日志复制并重放日志部分,其应用在两阶段提交协议过程中。 可参考论文中的第二章。
对于此任务,核心需要修改的地方有两处。
第一是在参与者Prepare的过程中,参与者中的 Raft Leader需要将此事务的写集和事务准备完成的操作转化为日志记录复制到该分片的其他Follower节点上; 所有Follower向Leader都返回结果之后, 参与者才可以向协调者返回准备完成的结果。
第二是在参与者Commit的过程中,Raft Leader需要将事务提交或回滚的日志记录复制到其他的Raft Follower中,Follower应在后台重放这些日志记录到LSM-Tree中。
本框架提供了一个性能测试工具,可以自定义开启多个线程生成事务,事务的执行过程将随机生成读写操作和参与者节点,测试文件在/build/test/benchmark中。测试过程如下:
首先要修改 benchmark_config.cpp 文件
std::vector<IP_Port> NodeSet = {{"172.22.168.27",8011},{"172.24.162.221",8012},{"172.27.163.66",8013}}; //真分布式环境
或者
std::vector<IP_Port> NodeSet = {{"127.0.0.1",8011},{"127.0.0.1",8012},{"127.0.0.1",8013}}; //伪分布式环境
之后重新编译,然后可以执行下面的命令
# transaction benchmark test
# 生成负载
./benchmark_workload -DIR ./data1
./benchmark_workload -DIR ./data2
./benchmark_workload -DIR ./data3
# 数据检验
./data_test -DIR ./data1
# 负载测试 -META_SERVER_ADDR 是meta_server运行的ip和端口,-stack_size_normal=10000000 -tc_stack_normal=1是BRPC的配置参数 -THREAD_NUM 是事务生成器的线程数 -READ_RATIO 是读写操作比例参数 -OP_MAX_NUM 是事务的最大操作数目
./benchmark_test -DIR ./data1 -SERVER_NAME server1 -SERVER_LISTEN_PORT 8011 -log_path /root/RucDDBS/data1/ -META_SERVER_ADDR 172.22.168.27:8001 -THREAD_NUM 16 -stack_size_normal=10000000 -tc_stack_normal=1 -READ_RATIO=1 -OP_MAX_NUM=30
./benchmark_test -DIR ./data2 -SERVER_NAME server2 -SERVER_LISTEN_PORT 8012 -log_path /root/RucDDBS/data2/ -META_SERVER_ADDR 172.22.168.27:8001 -THREAD_NUM 16 -stack_size_normal=10000000 -tc_stack_normal=1 -READ_RATIO=1 -OP_MAX_NUM=30
./benchmark_test -DIR ./data3 -SERVER_NAME server3 -SERVER_LISTEN_PORT 8013 -log_path /root/RucDDBS/data3/ -META_SERVER_ADDR 172.22.168.27:8001 -THREAD_NUM 16 -stack_size_normal=10000000 -tc_stack_normal=1 -READ_RATIO=1 -OP_MAX_NUM=30
测试负载支持以上几个参数,也可以自行对测试负载的代码进行修改以支持更多的参数