Skip to content

Commit

Permalink
Merge pull request #188 from PFZheng/master
Browse files Browse the repository at this point in the history
Pick patchs from the internal stable branch of braft in Baidu.
  • Loading branch information
Edward-xk authored Feb 19, 2020
2 parents 591f0cf + d552db7 commit 0a9ec3f
Show file tree
Hide file tree
Showing 35 changed files with 3,934 additions and 807 deletions.
5 changes: 2 additions & 3 deletions src/braft/builtin_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ void RaftStatImpl::default_method(::google::protobuf::RpcController* controller,
brpc::Controller* cntl = (brpc::Controller*)controller;
std::string group_id = cntl->http_request().unresolved_path();
std::vector<scoped_refptr<NodeImpl> > nodes;
NodeManager* nm = NodeManager::GetInstance();
if (group_id.empty()) {
nm->get_all_nodes(&nodes);
global_node_manager->get_all_nodes(&nodes);
} else {
nm->get_nodes_by_group_id(group_id, &nodes);
global_node_manager->get_nodes_by_group_id(group_id, &nodes);
}
const bool html = brpc::UseHTML(cntl->http_request());
if (html) {
Expand Down
11 changes: 5 additions & 6 deletions src/braft/cli_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,20 @@ void CliServiceImpl::get_leader(::google::protobuf::RpcController* controller,
brpc::Controller* cntl = (brpc::Controller*)controller;
brpc::ClosureGuard done_guard(done);
std::vector<scoped_refptr<NodeImpl> > nodes;
NodeManager* const nm = NodeManager::GetInstance();
if (request->has_peer_id()) {
PeerId peer;
if (peer.parse(request->peer_id()) != 0) {
cntl->SetFailed(EINVAL, "Fail to parse %s",
request->peer_id().c_str());
return;
}
scoped_refptr<NodeImpl> node = nm->get(request->group_id(), peer);
scoped_refptr<NodeImpl> node =
global_node_manager->get(request->group_id(), peer);
if (node) {
nodes.push_back(node);
}
} else {
nm->get_nodes_by_group_id(request->group_id(), &nodes);
global_node_manager->get_nodes_by_group_id(request->group_id(), &nodes);
}
if (nodes.empty()) {
cntl->SetFailed(ENOENT, "No nodes in group %s",
Expand All @@ -232,17 +232,16 @@ void CliServiceImpl::get_leader(::google::protobuf::RpcController* controller,
butil::Status CliServiceImpl::get_node(scoped_refptr<NodeImpl>* node,
const GroupId& group_id,
const std::string& peer_id) {
NodeManager* const nm = NodeManager::GetInstance();
if (!peer_id.empty()) {
*node = nm->get(group_id, peer_id);
*node = global_node_manager->get(group_id, peer_id);
if (!(*node)) {
return butil::Status(ENOENT, "Fail to find node %s in group %s",
peer_id.c_str(),
group_id.c_str());
}
} else {
std::vector<scoped_refptr<NodeImpl> > nodes;
nm->get_nodes_by_group_id(group_id, &nodes);
global_node_manager->get_nodes_by_group_id(group_id, &nodes);
if (nodes.empty()) {
return butil::Status(ENOENT, "Fail to find node in group %s",
group_id.c_str());
Expand Down
2 changes: 2 additions & 0 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
namespace braft {

typedef std::string GroupId;
// GroupId with version, format: {group_id}_{index}
typedef std::string VersionedGroupId;

// Represent a participant in a replicating group.
struct PeerId {
Expand Down
20 changes: 14 additions & 6 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
delete iter->status;
break;
case LEADER_START:
caller->do_leader_start(iter->term);
caller->do_leader_start(*(iter->leader_start_context));
delete iter->leader_start_context;
break;
case START_FOLLOWING:
caller->_cur_task = START_FOLLOWING;
Expand Down Expand Up @@ -430,19 +431,26 @@ int FSMCaller::on_leader_stop(const butil::Status& status) {
return 0;
}

int FSMCaller::on_leader_start(int64_t term) {
int FSMCaller::on_leader_start(int64_t term, int64_t lease_epoch) {
ApplyTask task;
task.type = LEADER_START;
task.term = term;
return bthread::execution_queue_execute(_queue_id, task);
LeaderStartContext* on_leader_start_context =
new LeaderStartContext(term, lease_epoch);
task.leader_start_context = on_leader_start_context;
if (bthread::execution_queue_execute(_queue_id, task) != 0) {
delete on_leader_start_context;
return -1;
}
return 0;
}

// Forwards the leader-stop notification, together with the status describing
// why leadership ended, to _fsm->on_leader_stop().
void FSMCaller::do_leader_stop(const butil::Status& status) {
_fsm->on_leader_stop(status);
}

void FSMCaller::do_leader_start(int64_t term) {
_fsm->on_leader_start(term);
void FSMCaller::do_leader_start(const LeaderStartContext& leader_start_context) {
_node->leader_lease_start(leader_start_context.lease_epoch);
_fsm->on_leader_start(leader_start_context.term);
}

int FSMCaller::on_start_following(const LeaderChangeContext& start_following_context) {
Expand Down
16 changes: 13 additions & 3 deletions src/braft/fsm_caller.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "braft/closure_queue.h"
#include "braft/macros.h"
#include "braft/log_entry.h"
#include "braft/lease.h"

namespace braft {

Expand Down Expand Up @@ -112,7 +113,7 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
BRAFT_MOCK int on_snapshot_load(LoadSnapshotClosure* done);
BRAFT_MOCK int on_snapshot_save(SaveSnapshotClosure* done);
int on_leader_stop(const butil::Status& status);
int on_leader_start(int64_t term);
int on_leader_start(int64_t term, int64_t lease_epoch);
int on_start_following(const LeaderChangeContext& start_following_context);
int on_stop_following(const LeaderChangeContext& stop_following_context);
BRAFT_MOCK int on_error(const Error& e);
Expand All @@ -138,14 +139,23 @@ friend class IteratorImpl;
ERROR,
};

// Bundles the arguments of on_leader_start() so they can travel through an
// ApplyTask as a single pointer. on_leader_start() heap-allocates the context
// and the LEADER_START branch of run() deletes it after do_leader_start().
struct LeaderStartContext {
LeaderStartContext(int64_t term_, int64_t lease_epoch_)
: term(term_), lease_epoch(lease_epoch_)
{}

// Term passed on to _fsm->on_leader_start().
int64_t term;
// Lease epoch passed on to _node->leader_lease_start().
int64_t lease_epoch;
};

struct ApplyTask {
TaskType type;
union {
// For applying log entry (including configuration change)
int64_t committed_index;

// For on_leader_start
int64_t term;
LeaderStartContext* leader_start_context;

// For on_leader_stop
butil::Status* status;
Expand All @@ -167,7 +177,7 @@ friend class IteratorImpl;
void do_snapshot_load(LoadSnapshotClosure* done);
void do_on_error(OnErrorClousre* done);
void do_leader_stop(const butil::Status& status);
void do_leader_start(int64_t term);
void do_leader_start(const LeaderStartContext& leader_start_context);
void do_start_following(const LeaderChangeContext& start_following_context);
void do_stop_following(const LeaderChangeContext& stop_following_context);
void set_error(const Error& e);
Expand Down
141 changes: 141 additions & 0 deletions src/braft/lease.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) 2019 Baidu.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Authors: Pengfei Zheng ([email protected])

#include <gflags/gflags.h>
#include <brpc/reloadable_flags.h>
#include "braft/lease.h"

namespace braft {

// Global switch for the lease logic in this file, off by default.
// When false, LeaderLease::get_lease_info() reports DISABLED and
// FollowerLease::votable_time_from_now() always returns 0.
DEFINE_bool(raft_enable_leader_lease, false,
"Enable or disable leader lease. only when all peers in a raft group "
"set this configuration to true, leader lease check and vote are safe.");
// NOTE(review): PassValidate presumably accepts any value so the flag can be
// changed at runtime - confirm against brpc/reloadable_flags.h.
BRPC_VALIDATE_GFLAG(raft_enable_leader_lease, ::brpc::PassValidate);

// Stores the election timeout (ms) that get_lease_info() adds to the last
// active timestamp when deciding whether the lease is still valid.
// NOTE(review): unlike reset_election_timeout_ms(), this does not take _mutex;
// presumably init() runs before the lease is shared between threads - confirm.
void LeaderLease::init(int64_t election_timeout_ms) {
_election_timeout_ms = election_timeout_ms;
}

// Begins a new leadership period under _mutex: the lease epoch is advanced,
// |term| is recorded, and the last active timestamp is cleared so that the
// lease is not reported VALID until on_lease_start()/renew() sets a timestamp.
void LeaderLease::on_leader_start(int64_t term) {
    BAIDU_SCOPED_LOCK(_mutex);
    _last_active_timestamp = 0;
    _term = term;
    _lease_epoch += 1;
}

// Ends the current leadership period under _mutex. A zero term makes
// get_lease_info() report EXPIRED and makes on_lease_start() a no-op.
// The lease epoch is intentionally left untouched here.
void LeaderLease::on_leader_stop() {
    BAIDU_SCOPED_LOCK(_mutex);
    _term = 0;
    _last_active_timestamp = 0;
}

void LeaderLease::on_lease_start(int64_t expect_lease_epoch, int64_t last_active_timestamp) {
BAIDU_SCOPED_LOCK(_mutex);
if (_term == 0 || expect_lease_epoch != _lease_epoch) {
return;
}
_last_active_timestamp = last_active_timestamp;
}

// Unconditionally overwrites the last active timestamp under _mutex.
// Unlike on_lease_start(), no term or epoch check is performed here.
void LeaderLease::renew(int64_t last_active_timestamp) {
BAIDU_SCOPED_LOCK(_mutex);
_last_active_timestamp = last_active_timestamp;
}

// Fills |lease_info| with the current lease state.
//
// term and lease_epoch are zero in every state except VALID:
//   DISABLED  - FLAGS_raft_enable_leader_lease is false (checked without lock)
//   EXPIRED   - no term is recorded (_term == 0)
//   NOT_READY - a term is recorded but no active timestamp has been set yet
//   VALID     - now < last active timestamp + election timeout
//   SUSPECT   - the window above has elapsed
void LeaderLease::get_lease_info(LeaseInfo* lease_info) {
    lease_info->term = 0;
    lease_info->lease_epoch = 0;

    if (!FLAGS_raft_enable_leader_lease) {
        lease_info->state = LeaderLease::DISABLED;
        return;
    }

    BAIDU_SCOPED_LOCK(_mutex);
    if (_term == 0) {
        lease_info->state = LeaderLease::EXPIRED;
    } else if (_last_active_timestamp == 0) {
        lease_info->state = LeaderLease::NOT_READY;
    } else {
        // The clock is read only once the cheaper checks have passed.
        const int64_t lease_deadline_ms =
                _last_active_timestamp + _election_timeout_ms;
        if (butil::monotonic_time_ms() >= lease_deadline_ms) {
            lease_info->state = LeaderLease::SUSPECT;
        } else {
            lease_info->term = _term;
            lease_info->lease_epoch = _lease_epoch;
            lease_info->state = LeaderLease::VALID;
        }
    }
}

// Returns a snapshot of the current lease epoch, read under _mutex.
int64_t LeaderLease::lease_epoch() {
    BAIDU_SCOPED_LOCK(_mutex);
    const int64_t current_epoch = _lease_epoch;
    return current_epoch;
}

// Replaces the election timeout (ms) under _mutex; subsequent
// get_lease_info() calls use the new value for the validity window.
void LeaderLease::reset_election_timeout_ms(int64_t election_timeout_ms) {
BAIDU_SCOPED_LOCK(_mutex);
_election_timeout_ms = election_timeout_ms;
}

// Initializes the follower lease with the election timeout and the maximum
// tolerated clock drift (both in ms).
void FollowerLease::init(int64_t election_timeout_ms, int64_t max_clock_drift_ms) {
    // After a restart we cannot know when the previous lease actually
    // expires, so be conservative and treat it as renewed right now.
    _last_leader_timestamp = butil::monotonic_time_ms();
    _max_clock_drift_ms = max_clock_drift_ms;
    _election_timeout_ms = election_timeout_ms;
}

// Records |leader_id| as the most recent leader and restarts the lease
// window from the current monotonic time.
void FollowerLease::renew(const PeerId& leader_id) {
    _last_leader = leader_id;
    const int64_t renewed_at_ms = butil::monotonic_time_ms();
    _last_leader_timestamp = renewed_at_ms;
}

// Returns how many milliseconds remain until the follower lease window
// (last leader timestamp + election timeout + max clock drift) has elapsed.
// Returns 0 when the window has already elapsed or when
// FLAGS_raft_enable_leader_lease is false.
int64_t FollowerLease::votable_time_from_now() {
    if (!FLAGS_raft_enable_leader_lease) {
        return 0;
    }

    const int64_t now_ms = butil::monotonic_time_ms();
    const int64_t votable_at_ms = _last_leader_timestamp
                                  + _election_timeout_ms
                                  + _max_clock_drift_ms;
    return (now_ms < votable_at_ms) ? (votable_at_ms - now_ms) : 0;
}

// Returns a reference to the leader recorded by the latest renew(), or to a
// default-constructed PeerId after reset(). The reference aliases a member,
// so it reflects later renew()/reset() calls.
const PeerId& FollowerLease::last_leader() {
return _last_leader;
}

// Returns true once at least (election timeout + max clock drift) ms have
// passed since the last leader timestamp. Unlike votable_time_from_now(),
// this does not consult FLAGS_raft_enable_leader_lease.
bool FollowerLease::expired() {
    const int64_t elapsed_ms =
            butil::monotonic_time_ms() - _last_leader_timestamp;
    const int64_t lease_span_ms = _election_timeout_ms + _max_clock_drift_ms;
    return elapsed_ms >= lease_span_ms;
}

// Forgets the last leader and zeroes its timestamp. The configured timeout
// and clock drift values are kept.
void FollowerLease::reset() {
    _last_leader_timestamp = 0;
    _last_leader = PeerId();
}

// Replaces both the election timeout and the maximum clock drift (ms) used
// by votable_time_from_now() and expired(). The last leader and its
// timestamp are left unchanged.
void FollowerLease::reset_election_timeout_ms(int64_t election_timeout_ms,
                                              int64_t max_clock_drift_ms) {
    _max_clock_drift_ms = max_clock_drift_ms;
    _election_timeout_ms = election_timeout_ms;
}

} // namespace braft
88 changes: 88 additions & 0 deletions src/braft/lease.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2019 Baidu.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Authors: Pengfei Zheng ([email protected])

#ifndef PUBLIC_RAFT_LEASE_H
#define PUBLIC_RAFT_LEASE_H

#include "braft/util.h"

namespace braft {

// Leader-side lease bookkeeping: remembers the term and epoch of the current
// leadership and the last timestamp at which the lease was known active.
// Every method except init() takes _mutex before touching the fields.
class LeaderLease {
public:
// State reported by get_lease_info().
enum InternalState {
// FLAGS_raft_enable_leader_lease is false.
DISABLED,
// No term is recorded (_term == 0).
EXPIRED,
// A term is recorded but no active timestamp has been set yet.
NOT_READY,
// now < last active timestamp + election timeout.
VALID,
// The window above has elapsed.
SUSPECT,
};

// Result of get_lease_info(); term and lease_epoch are non-zero only
// when state == VALID.
struct LeaseInfo {
InternalState state;
int64_t term;
int64_t lease_epoch;
};

LeaderLease()
: _election_timeout_ms(0)
, _last_active_timestamp(0)
, _term(0)
, _lease_epoch(0)
{}

// Sets the election timeout (ms). Does not take _mutex.
void init(int64_t election_timeout_ms);
// Advances the lease epoch, records |term| and clears the active timestamp.
void on_leader_start(int64_t term);
// Clears the term and the active timestamp; the epoch is kept.
void on_leader_stop();
// Sets the active timestamp only if a term is recorded and
// |expect_lease_epoch| equals the current epoch; otherwise does nothing.
void on_lease_start(int64_t expect_lease_epoch, int64_t last_active_timestamp);
// Writes the current state (and, when VALID, term and epoch) to |lease_info|.
void get_lease_info(LeaseInfo* lease_info);
// Unconditionally overwrites the active timestamp.
void renew(int64_t last_active_timestamp);
// Returns the current lease epoch.
int64_t lease_epoch();
// Replaces the election timeout (ms).
void reset_election_timeout_ms(int64_t election_timeout_ms);

private:
raft_mutex_t _mutex;
int64_t _election_timeout_ms;
// 0 means "not set"; compared against butil::monotonic_time_ms().
int64_t _last_active_timestamp;
// 0 means no leadership is currently recorded.
int64_t _term;
// Incremented by every on_leader_start().
int64_t _lease_epoch;
};

// Follower-side lease bookkeeping: remembers the last leader and when it was
// last heard from, and answers whether/when the window of
// (election timeout + max clock drift) ms since then has elapsed.
// NOTE(review): the implementation takes no lock; callers presumably
// serialize access - confirm.
class FollowerLease {
public:
FollowerLease()
: _election_timeout_ms(0), _max_clock_drift_ms(0)
, _last_leader_timestamp(0)
{}

// Stores both durations (ms) and starts the lease window at the current
// monotonic time.
void init(int64_t election_timeout_ms, int64_t max_clock_drift_ms);
// Records |leader_id| and restarts the lease window from now.
void renew(const PeerId& leader_id);
// Milliseconds left in the lease window; 0 if it has elapsed or if
// FLAGS_raft_enable_leader_lease is false.
int64_t votable_time_from_now();
// Leader recorded by the latest renew(); default PeerId after reset().
const PeerId& last_leader();
// True once the lease window has elapsed (the flag is not consulted).
bool expired();
// Clears the last leader and zeroes its timestamp.
void reset();
// Replaces both durations (ms) without touching the last leader state.
void reset_election_timeout_ms(int64_t election_timeout_ms, int64_t max_clock_drift_ms);

private:
int64_t _election_timeout_ms;
int64_t _max_clock_drift_ms;
PeerId _last_leader;
// Value of butil::monotonic_time_ms() at the last init()/renew(); 0 after reset().
int64_t _last_leader_timestamp;
};

} // namespace braft

#endif // PUBLIC_RAFT_LEASE_H
Loading

0 comments on commit 0a9ec3f

Please sign in to comment.