在 braft 中,Snapshot 被定义为在特定持久化存储中的文件集合,用户将状态机序列化到一个或者多个文件中,并且任何节点都能从这些文件中恢复状态机到当时的状态。
Snapshot 有两个作用:
- 启动加速:启动阶段变为加载 Snapshot 和追加之后日志两个阶段,而不需要重新执行历史上所有的操作。
- Log Compaction:在完成 Snapshot 完成之后,这个时间之前的日志都可以被删除了,这样可以减少日志占用的资源。
在 braft 的中,可以通过 SnapshotReader
和 SnapshotWriter
来控制访问相应的 Snapshot。
不同业务的 Snapshot 千差万别,因为 SnapshotStorage 并没有抽象具体读写 Snapshot 的接口,而是抽象出 SnapshotReader 和 SnapshotWriter,交由用户扩展具体的 Snapshot 创建和加载逻辑。
Snapshot 创建流程:
SnapshotStorage::create
创建一个临时的 Snapshot,并返回一个SnapshotWriter
SnapshotWriter
将状态数据写入到临时 Snapshot 中SnapshotStorage::close
来将这个 Snapshot 转为合法的 Snapshot
Snapshot 读取流程:
SnapshotStorage::open
打开最近的一个 Snapshot,并返回一个SnapshotReader
SnapshotReader
将状态数据从 Snapshot 中恢复出来SnapshotStorage::close
清理资源
有两个地方会打快照,一个是 SnapshotTimer 定时器触发的,还有一种是调用 cli 或 API 的 snapshot 接口。两者的流程类似,都会去调用NodeImpl::do_snapshot()
。
打快照操作是由 SnapshotExecutor::do_snapshot()
来完成的,主要流程如下:
-
如果正在 downloading snapshot 和 saving snapshot,跳过本次 snapshot
-
如果当前 apply 的日志 index (从 FsmCaller 中拿到)距离上次 snapshot 的日志 index(写入量小),没超过设定值,跳过 snapshot
int64_t saved_fsm_applied_index = _fsm_caller->last_applied_index(); if (saved_fsm_applied_index - _last_snapshot_index < FLAGS_raft_do_snapshot_min_index_gap) { // There might be false positive as the last_applied_index() is being // updated. But it's fine since we will do next snapshot saving in a // predictable time. lck.unlock(); _log_manager->clear_bufferred_logs(); if (done) { run_closure_in_bthread(done, _usercode_in_pthread); } return;
-
创建一个 SnapshotWriter
SnapshotWriter* writer = _snapshot_storage->create(); if (!writer) { lck.unlock(); if (done) { done->status().set_error(EIO, "Fail to create writer"); run_closure_in_bthread(done, _usercode_in_pthread); } report_error(EIO, "Fail to create SnapshotWriter"); return; }
-
_snapshot_storage
负责快照文件的读取,默认是 LocalSnapshotStorage,从本地文件读取,创建的 SnapshotWriter 是 LocalSnapshotWriter。LocalSnapshotStorage::create()
会创建一个 LocalSnapshotWriter,并把 writer 的_path
设置为 temp:SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) { LocalSnapshotWriter* writer = NULL; do { std::string snapshot_path(_path); snapshot_path.append("/"); snapshot_path.append(_s_temp_path); // delete temp // TODO: Notify watcher before deleting if (_fs->path_exists(snapshot_path) && from_empty) { if (destroy_snapshot(snapshot_path) != 0) { break; } } writer = new LocalSnapshotWriter(snapshot_path, _fs.get()); if (writer->init() != 0) { LOG(ERROR) << "Fail to init writer in path " << snapshot_path << ", " << *writer; delete writer; writer = NULL; break; } BRAFT_VLOG << "Create writer success, path: " << snapshot_path; } while (0); return writer; }
-
创建完 writer 之后,创建一个 SaveSnapshotDone,并调用
FSMCaller::on_snapshot_save()
,该函数会把它放到执行队列里面,然后执行do_snapshot_save()
_saving_snapshot = true; SaveSnapshotDone* snapshot_save_done = new SaveSnapshotDone(this, writer, done); if (_fsm_caller->on_snapshot_save(snapshot_save_done) != 0) { lck.unlock(); if (done) { snapshot_save_done->status().set_error(EHOSTDOWN, "The raft node is down"); run_closure_in_bthread(snapshot_save_done, _usercode_in_pthread); } return; } _running_jobs.add_count(1);
它会设置好 snapshot 的 meta,把它存到 done 里面,并返回 writer,最后使用 writer 和 done 去调用用户定义的
on_snapshot_save()
。用户在on_snapshot_save()
里面自己利用 writer 去写 snapshot 文件。void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) { CHECK(done); int64_t last_applied_index = _last_applied_index.load(butil::memory_order_relaxed); SnapshotMeta meta; meta.set_last_included_index(last_applied_index); meta.set_last_included_term(_last_applied_term); ConfigurationEntry conf_entry; _log_manager->get_configuration(last_applied_index, &conf_entry); for (Configuration::const_iterator iter = conf_entry.conf.begin(); iter != conf_entry.conf.end(); ++iter) { *meta.add_peers() = iter->to_string(); } for (Configuration::const_iterator iter = conf_entry.old_conf.begin(); iter != conf_entry.old_conf.end(); ++iter) { *meta.add_old_peers() = iter->to_string(); } SnapshotWriter* writer = done->start(meta); if (!writer) { done->status().set_error(EINVAL, "snapshot_storage create SnapshotWriter failed"); done->Run(); return; } _fsm->on_snapshot_save(writer, done); return; }
-
用户的函数成功写入之后,会调用
SaveSnapshotDone::Run()
,该函数会开启新 bthread 调用SaveSnapshotDone::continue_run()
。SaveSnapshotDone::continue_run()
调用SnapshotExecutor::on_snapshot_save_done()
。 该函数部分代码如下:int SnapshotExecutor::on_snapshot_save_done( const butil::Status& st, const SnapshotMeta& meta, SnapshotWriter* writer) { std::unique_lock<raft_mutex_t> lck(_mutex); int ret = st.error_code(); // ... ... if (ret == 0) { if (writer->save_meta(meta)) { LOG(WARNING) << "node " << _node->node_id() << " fail to save snapshot"; ret = EIO; } } else { if (writer->ok()) { writer->set_error(ret, "Fail to do snapshot"); } } if (_snapshot_storage->close(writer) != 0) { ret = EIO; LOG(WARNING) << "node " << _node->node_id() << " fail to close writer"; } std::stringstream ss; if (_node) { ss << "node " << _node->node_id() << ' '; } lck.lock(); if (ret == 0) { _last_snapshot_index = meta.last_included_index(); _last_snapshot_term = meta.last_included_term(); lck.unlock(); ss << "snapshot_save_done, last_included_index=" << meta.last_included_index() << " last_included_term=" << meta.last_included_term(); LOG(INFO) << ss.str(); _log_manager->set_snapshot(&meta); lck.lock(); } if (ret == EIO) { report_error(EIO, "Fail to save snapshot"); } _saving_snapshot = false; lck.unlock(); _running_jobs.signal(); return ret; }
- 首先会去调用
LocalSnapshotStorage::close()
,该函数会把 temp 文件夹重命名成 snapshot+last_included_index 这种格式的新目录,这样快照文件就成功保存下来了。 - 然后保存 _last_snapshot_index 和 _last_snapshot_term 信息,用于下一次 snapshot 最开始判断 gap。
- 最后调用
LogManager::set_snapshot()
通知 LogManager snapshot 的信息,用于删除 Log
- 首先会去调用
SnapshotMeta 定义:
message SnapshotMeta {
required int64 last_included_index = 1;
required int64 last_included_term = 2;
repeated string peers = 3;
repeated string old_peers = 4;
}
==需要保存 last_included_index 和 last_included_term 信息的原因是做完 Snapshot 之后,last_include_index 及其之前的 Log 都会被删除,这样再次重启需要恢复 term、index 和 cofiguration 等信息,考虑 Snapshot 之后没有写入并重启的情况==。
写完快照文件后,调用 LogManager::set_snapshot()
,在该函数里首先调用 ConfigManager::set_snapshot()
更新快照元数据,然后得到快照最后包含的index 对应的 term:
ConfigurationEntry entry;
entry.id = LogId(meta->last_included_index(), meta->last_included_term());
entry.conf = conf;
entry.old_conf = old_conf;
_config_manager->set_snapshot(entry);
int64_t term = unsafe_get_term(meta->last_included_index());
然后更新 _last_snapshot_id
用于 Log Truncate:
const LogId last_but_one_snapshot_id = _last_snapshot_id;
_last_snapshot_id.index = meta->last_included_index();
_last_snapshot_id.term = meta->last_included_term();
if (_last_snapshot_id > _applied_id) {
_applied_id = _last_snapshot_id;
}
然后根据 term 的值,进行不同的处理:
- 等于 0:说明快照的 index(last_included_index)大于当前最新的日志 index(last_index),发生在 Follower 收到 Leader 的 InstallSnapshot 后,则把缓存和文件里面的 log entry 从前面截断到
last_included_index
。 - 等于
meta->last_included_term()
:说明 log entry 里面还存在着这条记录,先不着急截断,把它截断到上一个快照处(如果有的话)。 - 其他情况对应 index 上的 term 不等于
meta->last_included_term()
,则可能是 Follower 处正在安装快照,这种情况,直接 reset,让_first_log_index
指向last_included_index
,_last_log_index
指向last_included_index-1
,把 entries 清空。
if (term == 0) {
// last_included_index is larger than last_index
// FIXME: what if last_included_index is less than first_index?
_virtual_first_log_id = _last_snapshot_id;
truncate_prefix(meta->last_included_index() + 1, lck);
return;
} else if (term == meta->last_included_term()) {
// Truncating log to the index of the last snapshot.
// We don't truncate log before the latest snapshot immediately since
// some log around last_snapshot_index is probably needed by some
// followers
if (last_but_one_snapshot_id.index > 0) {
// We have last snapshot index
_virtual_first_log_id = last_but_one_snapshot_id;
truncate_prefix(last_but_one_snapshot_id.index + 1, lck);
}
return;
} else {
// TODO: check the result of reset.
_virtual_first_log_id = _last_snapshot_id;
reset(meta->last_included_index() + 1, lck);
return;
}
Leader 可以通过向 Follower 发起 InstallSnapshot RPC 让它安装快照。发起 InstallSnapshot RPC 的前提是:send entry 的时候发现要发的 entry 不存在。
Replicator::_send_entries()
的时候如果 _fill_common_fields()
返回错误,则说明没有找到对应 next_index 的 entry,调用 Replicator::_install_snapshot()
发起 InstallSnapshot RPC。
-
各种 check
- 是否有另一个快照正在安装(_reader != nullptr)
- 检查 install snapshot task 数量是否超过设置的限流值(对 Leader 没限制)
-
更改状态为
INSTALLING_SNAPSHOT
-
调用
SnapshotStorage::open()
得到一个 reader(默认是 LocalSnapshotStorage 和 LocalSnapshotReader),它指向最新的快照文件,并且会把 snapshot meta 从 pb 文件加载到_meta_table
里面_reader = _options.snapshot_storage->open(); if (!_reader) { if (_options.snapshot_throttle) { _options.snapshot_throttle->finish_one_task(true); } // ... ... return; } int LocalSnapshotReader::init() { if (!_fs->directory_exists(_path)) { set_error(ENOENT, "Not such _path : %s", _path.c_str()); return ENOENT; } std::string meta_path = _path + "/" BRAFT_SNAPSHOT_META_FILE; if (_meta_table.load_from_file(_fs, meta_path) != 0) { set_error(EIO, "Fail to load meta"); return EIO; } return 0; }
-
生成 file service uri,用于 FLollower 下载 Snapshot 文件。调用
LocalSnapshotReader::generate_uri_for_copy()
,该函数会通过调用file_service_add()
把 reader 添加到 file_service 里面,然后返回一个 uri,uri 的格式为:"remote://" + _addr + "/" + _reader_id
-
从 reader 中取出 meta,然后填充 request
SnapshotMeta meta; // report error on failure if (_reader->load_meta(&meta) != 0) { // ... ... return; } request->set_term(_options.term); request->set_group_id(_options.group_id); request->set_server_id(_options.server_id.to_string()); request->set_peer_id(_options.peer_id.to_string()); request->mutable_meta()->CopyFrom(meta); request->set_uri(uri);
-
更新
_install_snapshot_in_fly
、_install_snapshot_counter
等计数_install_snapshot_in_fly = cntl->call_id(); _install_snapshot_counter++; _st.last_log_included = meta.last_included_index(); _st.last_term_included = meta.last_included_term();
-
发起 InstallSnapshot RPC,其中回调函数为
Replicator::_on_install_snapshot_returned()
google::protobuf::Closure* done = brpc::NewCallback< ReplicatorId, brpc::Controller*, InstallSnapshotRequest*, InstallSnapshotResponse*>( _on_install_snapshot_returned, _id.value, cntl, request, response); RaftService_Stub stub(&_sending_channel); stub.install_snapshot(cntl, request, response, done);
InstallSnapshotRequest 定义:
message SnapshotMeta {
required int64 last_included_index = 1;
required int64 last_included_term = 2;
repeated string peers = 3;
repeated string old_peers = 4;
}
message InstallSnapshotRequest {
required string group_id = 1;
required string server_id = 2;
required string peer_id = 3;
required int64 term = 4;
required SnapshotMeta meta = 5;
required string uri = 6;
};
Follower 收到 InstallSnapshot RPC 后,会调用 NodeImpl::handle_install_snapshot_request()
,它再去调用 SnapshotExecutor::install_snapshot()
。该函数部分代码如下:
void SnapshotExecutor::install_snapshot(brpc::Controller* cntl,
const InstallSnapshotRequest* request,
InstallSnapshotResponse* response,
google::protobuf::Closure* done) {
int ret = 0;
brpc::ClosureGuard done_guard(done);
SnapshotMeta meta = request->meta();
// check if install_snapshot tasks num exceeds threshold
if (_snapshot_throttle && !_snapshot_throttle->add_one_more_task(false)) {
LOG(WARNING) << "Fail to install snapshot";
cntl->SetFailed(EBUSY, "Fail to add install_snapshot tasks now");
return;
}
std::unique_ptr<DownloadingSnapshot> ds(new DownloadingSnapshot);
ds->cntl = cntl;
ds->done = done;
ds->response = response;
ds->request = request;
ret = register_downloading_snapshot(ds.get());
// ^^^ DON'T access request, response, done and cntl after this point
// as the retry snapshot will replace this one.
if (ret != 0) {
// ... ...
return;
}
// Release done first as this RPC might be replaced by the retry one
done_guard.release();
CHECK(_cur_copier);
_cur_copier->join();
// when copying finished or canceled, more install_snapshot tasks are allowed
if (_snapshot_throttle) {
_snapshot_throttle->finish_one_task(false);
}
return load_downloading_snapshot(ds.release(), meta);
}
SnapshotExecutor::install_snapshot()
中首先会构造一个 DownloadingSnapshot,然后调用 SnapshotExecutor::register_downloading_snapshot()
,该函大致代码如下:
int SnapshotExecutor::register_downloading_snapshot(DownloadingSnapshot* ds) {
std::unique_lock<raft_mutex_t> lck(_mutex);
// ... ...
DownloadingSnapshot* m = _downloading_snapshot.load(butil::memory_order_relaxed);
if (!m) {
_downloading_snapshot.store(ds, butil::memory_order_relaxed);
// Now this session has the right to download the snapshot.
CHECK(!_cur_copier);
_cur_copier = _snapshot_storage->start_to_copy_from(ds->request->uri());
if (_cur_copier == NULL) {
_downloading_snapshot.store(NULL, butil::memory_order_relaxed);
lck.unlock();
LOG(WARNING) << "Register failed: fail to copy file.";
ds->cntl->SetFailed(EINVAL, "Fail to copy from , %s",
ds->request->uri().c_str());
return -1;
}
_running_jobs.add_count(1);
return 0;
}
// 说明当前有另一个快照在下载, 不同情况对应的处理方式返回不同
// ... ...
}
如果 SnapshotExecutor::_downloading_snapshot
为空的话说明没有正在下载的快照,直接调用 SnapshotStorage::start_to_copy_from()
(默认为 LocalSnapshotStorage)。这个函数会构造一个 LocalSnapshotCopier,然后 init 并 start。
LocalSnapshotCopier::init()
会去调用 RemoteFileCopier::init()
,这个函数会从 uri 解析出 ip 和端口,用它们初始化 channel,并把 reader_id 提取出来。
LocalSnapshotCopier::start()
会在后台开启 bthread 执行 LocalSnapshotCopier::copy()
:
void LocalSnapshotCopier::copy() {
do {
load_meta_table();
if (!ok()) {
break;
}
filter();
if (!ok()) {
break;
}
std::vector<std::string> files;
_remote_snapshot.list_files(&files);
for (size_t i = 0; i < files.size() && ok(); ++i) {
copy_file(files[i]);
}
} while (0);
if (!ok() && _writer && _writer->ok()) {
LOG(WARNING) << "Fail to copy, error_code " << error_code()
<< " error_msg " << error_cstr()
<< " writer path " << _writer->get_path();
_writer->set_error(error_code(), error_cstr());
}
if (_writer) {
// set_error for copier only when failed to close writer and copier was
// ok before this moment
if (_storage->close(_writer, _filter_before_copy_remote) != 0 && ok()) {
set_error(EIO, "Fail to close writer");
}
_writer = NULL;
}
if (ok()) {
_reader = _storage->open();
}
}
- 首先调用
LocalSnapshotCopier::load_meta_table()
加载快照的 metaFile:
void LocalSnapshotCopier::load_meta_table() {
butil::IOBuf meta_buf;
std::unique_lock<raft_mutex_t> lck(_mutex);
if (_cancelled) {
set_error(ECANCELED, "%s", berror(ECANCELED));
return;
}
scoped_refptr<RemoteFileCopier::Session> session
= _copier.start_to_copy_to_iobuf(BRAFT_SNAPSHOT_META_FILE, &meta_buf, NULL);
_cur_session = session.get();
lck.unlock();
session->join();
lck.lock();
_cur_session = NULL;
lck.unlock();
if (!session->status().ok()) {
LOG(WARNING) << "Fail to copy meta file : " << session->status();
set_error(session->status().error_code(), session->status().error_cstr());
return;
}
if (_remote_snapshot._meta_table.load_from_iobuf_as_remote(meta_buf) != 0) {
LOG(WARNING) << "Bad meta_table format";
set_error(-1, "Bad meta_table format");
return;
}
CHECK(_remote_snapshot._meta_table.has_meta());
}
它会调用 RemoteFileCopier::start_to_copy_to_iobuf()
,然后用 RemoteFileCopier::Session::send_next_rpc()
向 Leader 的 FileService 发送 get_file 请求。Leader 的 FileService 收到 get_file 请求后用对应的 reader 去读文件,把它的 meta_table 变成 iobuf 放到 controller 的 response_attachment 里面。rpc 返回后会调用回调函数 RemoteFileCopier::Session::on_rpc_returned()
,从 response_attachment 里面取出文件放到 _file
(也就是meta_buf)里面,然后继续发送下一个 rpc。 使用 session.join() 等待 rpc 完成,然后把 meta_buf 存到 remote_snapshot 的 meta_table 里面。
- 然后调用 filter 进行一些清理操作
- 对于远端的所有文件执行 copy_file。这一步和从远端复制 meta 文件类似,调用
RemoteFileCopier::start_to_copy_to_file()
,然后向远端发 rpc 请求,Leader 收到请求之后用 request 里面的 reader 把文件读出来放到 response_attachment 里面,rpc 返回后 Follower 将得到的数据放到本地文件里面。
把快照下载到本地之后,就可以调用 load_downloading_snapshot 加载了。大致代码如下:
void SnapshotExecutor::load_downloading_snapshot(DownloadingSnapshot* ds,
const SnapshotMeta& meta) {
std::unique_ptr<DownloadingSnapshot> ds_guard(ds);
std::unique_lock<raft_mutex_t> lck(_mutex);
CHECK_EQ(ds, _downloading_snapshot.load(butil::memory_order_relaxed));
brpc::ClosureGuard done_guard(ds->done);
CHECK(_cur_copier);
SnapshotReader* reader = _cur_copier->get_reader();
if (!_cur_copier->ok()) {
// ... ...
return;
}
_snapshot_storage->close(_cur_copier);
_cur_copier = NULL;
if (reader == NULL || !reader->ok()) {
if (reader) {
_snapshot_storage->close(reader);
}
// ... ...
return;
}
// The owner of ds is on_snapshot_load_done
ds_guard.release();
done_guard.release();
_loading_snapshot = true;
// ^ After this point, this installing cannot be interrupted
_loading_snapshot_meta = meta;
lck.unlock();
InstallSnapshotDone* install_snapshot_done = new InstallSnapshotDone(this, reader);
int ret = _fsm_caller->on_snapshot_load(install_snapshot_done);
if (ret != 0) {
LOG(WARNING) << "node " << _node->node_id() << " fail to call on_snapshot_load";
install_snapshot_done->status().set_error(EHOSTDOWN, "This raft node is down");
return install_snapshot_done->Run();
}
}
它主要是调用 FSMCaller::on_snapshot_load()
,这个函数会把它放到队列里面,执行 FSMCaller::do_snapshot_load()
,它会调用用户定义的状态机的StateMachine::on_snapshot_load()
,使用快照文件把系统恢复成快照对应的状态,然后更新 _last_applied_index
和 _last_applied_term
。
最后调用回调,把节点的状态包括 log entries、conf 和 Leader 安装快照一样。除了落后的 Follower 需要安装快照外,所有节点在重启的时候也会首先去安装快照,也就是快照的启动加速功能。具体的代码在 SnapshotExecutor::init()
里。
主要是做一些清理工作,注意最后调用了 Replicator::_send_entries()
继续发送日志。