(WIP - consul like approach) fix cluster bootstrapping & fix cluster leaving & fix serf events handling
ivan-kripakov-m10 committed Dec 6, 2023
1 parent ae67d49 commit db69358
Showing 5 changed files with 149 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build-and-deploy-to-minikube.sh
@@ -1,4 +1,4 @@
# shellcheck disable=SC2046
eval $(minikube -p minikube docker-env)
docker build -t m10-payments-dkron .
docker build -t m10-payments-dkron-2 .
helm upgrade --install dkron --namespace dkron --create-namespace ./helm/dkron
112 changes: 97 additions & 15 deletions dkron/agent.go
@@ -36,6 +36,10 @@ const (
// This is used to reduce disk I/O for the recently committed entries.
raftLogCacheSize = 512
minRaftProtocol = 3

// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
)

var (
@@ -263,25 +267,101 @@ func (a *Agent) JoinLAN(addrs []string) (int, error) {
func (a *Agent) Stop() error {
a.logger.Info("agent: Called member stop, now stopping")

if a.config.Server {
if a.sched.Started() {
<-a.sched.Stop().Done()
// Check the number of known peers
configFuture := a.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
return err
}
cfg := configFuture.Configuration()

var numPeers int
for _, server := range cfg.Servers {
if server.Suffrage == raft.Voter {
numPeers++
}
}

// TODO: Check why Shutdown().Error() is not working
_ = a.raft.Shutdown()
addr := a.raftTransport.LocalAddr()

if err := a.Store.Shutdown(); err != nil {
return err
// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemoveServer/RemovePeer to safely reduce the quorum size.
// If we are not the leader, then we should issue our leave intention and wait to be
// removed for some reasonable period of time.
isLeader := a.IsLeader()
if isLeader && numPeers > 1 {
if err := a.leadershipTransfer(); err == nil {
isLeader = false
if a.sched.Started() {
<-a.sched.Stop().Done()
}
} else {
future := a.raft.RemoveServer(raft.ServerID(a.config.NodeName), 0, 0)
if err := future.Error(); err != nil {
a.logger.Error("failed to remove ourself as raft peer", "error", err)
}
}
}

if err := a.serf.Leave(); err != nil {
return err
// Leave the LAN pool
if a.serf != nil {
if err := a.serf.Leave(); err != nil {
a.logger.Error("failed to leave LAN Serf cluster", "error", err)
}
}

if err := a.serf.Shutdown(); err != nil {
return err
// Start refusing RPCs now that we've left the LAN pool. It's important
// to do this *after* we've left the LAN pool so that clients will know
// to shift onto another server if they perform a retry. We also wake up
// all queries in the RPC retry state.
a.logger.Info("Waiting to drain RPC traffic", "drain_time", time.Second*10)
close(a.shutdownCh)
time.Sleep(time.Second * 10) // todo cfg on start

// If we were not leader, wait to be safely removed from the cluster. We
// must wait to allow the raft replication to take place, otherwise an
// immediate shutdown could cause a loss of quorum.
if !isLeader {
left := false
limit := time.Now().Add(raftRemoveGracePeriod)
for !left && time.Now().Before(limit) {
// Sleep a while before we check.
time.Sleep(50 * time.Millisecond)

// Get the latest configuration.
future := a.raft.GetConfiguration()
if err := future.Error(); err != nil {
a.logger.Error("failed to get raft configuration", "error", err)
break
}

// See if we are no longer included.
left = true
for _, server := range future.Configuration().Servers {
if server.Address == addr {
left = false
break
}
}
}

// TODO (slackpad) With the old Raft library we used to force the
// peers set to empty when a graceful leave occurred. This would
// keep voting spam down if the server was restarted, but it was
// dangerous because the peers was inconsistent with the logs and
// snapshots, so it wasn't really safe in all cases for the server
// to become leader. This is now safe, but the log spam is noisy.
// The next new version of the library will have a "you are not a
// peer stop it" behavior that should address this. We will have
// to evaluate during the RC period if this interim situation is
// not too confusing for operators.

// TODO (slackpad) When we take a later new version of the Raft
// library it won't try to complete replication, so this peer
// may not realize that it has been removed. Need to revisit this
// and the warning here.
if !left {
a.logger.Warn("failed to leave raft configuration gracefully, timeout")
}
}

return nil
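
A sketch (not part of the commit) of one common way the graceful-leave sequence above gets triggered: trap SIGTERM/SIGINT and call Stop() so leadership transfer, the serf leave, the RPC drain, and the raftRemoveGracePeriod wait all get a chance to run. agentStopper, waitForShutdown and noopAgent are illustrative names, not dkron API.

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// agentStopper stands in for *dkron.Agent here; only Stop() matters.
type agentStopper interface {
	Stop() error
}

// waitForShutdown blocks until SIGTERM/SIGINT, then runs the graceful leave.
// terminationGracePeriodSeconds must exceed the ~10s RPC drain plus the 5s
// raftRemoveGracePeriod used in Stop(), or Kubernetes kills the pod mid-leave.
func waitForShutdown(a agentStopper) {
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
	sig := <-sigCh
	log.Printf("received %s, leaving cluster gracefully", sig)
	if err := a.Stop(); err != nil {
		log.Printf("graceful leave failed: %v", err)
	}
}

type noopAgent struct{}

func (noopAgent) Stop() error { return nil }

func main() { waitForShutdown(noopAgent{}) }
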
@@ -719,14 +799,16 @@ func (a *Agent) eventLoop() {
case serf.EventMemberJoin:
a.nodeJoin(me)
a.localMemberEvent(me)
case serf.EventMemberLeave, serf.EventMemberFailed:
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
a.nodeFailed(me)
a.localMemberEvent(me)
case serf.EventMemberReap:
a.localMemberEvent(me)
case serf.EventMemberUpdate:
a.logger.WithField("event", e.String()).Info("agent: event member update")
case serf.EventUser, serf.EventQuery: // Ignore
a.nodeJoin(me)
a.localMemberEvent(me)
case serf.EventUser:
a.localMemberEvent(me)
case serf.EventQuery: // Ignore
default:
a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
}
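
Routing serf.EventMemberReap into the failed-member path matters because serf only emits a reap event after it has completely given up on a failed or departed member, so handling the reap like a failure is what finally purges the node. A standalone snippet (not part of the commit) that prints the serf timeouts controlling when that happens; ReconnectTimeout and TombstoneTimeout are real serf config fields, but whether dkron tunes them is not shown in this diff.

package main

import (
	"fmt"

	"github.com/hashicorp/serf/serf"
)

func main() {
	// EventMemberReap fires only after these windows expire.
	conf := serf.DefaultConfig()
	fmt.Println("failed members reaped after ReconnectTimeout:", conf.ReconnectTimeout)
	fmt.Println("left members reaped after TombstoneTimeout:", conf.TombstoneTimeout)
}
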
51 changes: 48 additions & 3 deletions dkron/serf.go
@@ -1,16 +1,19 @@
package dkron

import (
"strings"

"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"strings"
"time"
)

const (
// StatusReap is used to update the status of a node if we
// are handling a EventMemberReap
StatusReap = serf.MemberStatus(-1)

// maxPeerRetries limits how many invalidate attempts are made
maxPeerRetries = 6
)

// nodeJoin is used to handle join events on the serf cluster
@@ -111,16 +114,58 @@ func (a *Agent) maybeBootstrap() {
}

// TODO: Query each of the servers and make sure they report no Raft peers.
// Query each of the servers and make sure they report no Raft peers.
for _, server := range servers {
var peers []string

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String())
if err != nil {
nextRetry := (1 << attempt) * time.Second
a.logger.Error("Failed to confirm peer status for server (will retry).",
"server", server.Name,
"retry_interval", nextRetry.String(),
"error", err,
)
time.Sleep(nextRetry)
} else {
for _, peer := range configuration.Servers {
peers = append(peers, peer.Id)
}
break
}
}

// Found a node with some Raft peers, stop bootstrap since there's
// evidence of an existing cluster. We should get folded in by the
// existing servers if that's the case, so it's cleaner to sit as a
// candidate with no peers so we don't cause spurious elections.
// It's OK this is racy, because even with an initial bootstrap
// as long as one peer runs bootstrap things will work, and if we
// have multiple peers bootstrap in the same way, that's OK. We
// just don't want a server added much later to do a live bootstrap
// and interfere with the cluster. This isn't required for Raft's
// correctness because no server in the existing cluster will vote
// for this server, but it makes things much more stable.
if len(peers) > 0 {
a.logger.Info("Existing Raft peers reported by server, disabling bootstrap mode", "server", server.Name)
a.config.BootstrapExpect = 0
return
}
}

// Update the peer set
// Attempt a live bootstrap!
var configuration raft.Configuration
var addrs []string
var ids []string

for _, server := range servers {
addr := server.Addr.String()
addrs = append(addrs, addr)
id := raft.ServerID(server.ID)
ids = append(ids, server.ID)
suffrage := raft.Voter
peer := raft.Server{
ID: id,
@@ -130,7 +175,7 @@
configuration.Servers = append(configuration.Servers, peer)
}
a.logger.Info("agent: found expected number of peers, attempting to bootstrap cluster...",
"peers", strings.Join(addrs, ","))
"peersAddrs", strings.Join(addrs, ","), "peersIDs", ";", strings.Join(ids, ","))
future := a.raft.BootstrapCluster(configuration)
if err := future.Error(); err != nil {
a.logger.WithError(err).Error("agent: failed to bootstrap cluster")
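
For reference, the backoff in the peer-status loop above, with maxPeerRetries = 6 and the (1 << attempt) * time.Second formula, sleeps 1s, 2s, 4s, 8s, 16s and 32s, so an unreachable server can hold up bootstrap for roughly a minute before it is skipped. A standalone snippet (not part of the commit) that prints that schedule:

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxPeerRetries = 6
	var total time.Duration
	for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
		nextRetry := (1 << attempt) * time.Second // same formula as in maybeBootstrap
		total += nextRetry
		fmt.Printf("attempt %d: sleep %s (cumulative %s)\n", attempt, nextRetry, total)
	}
}
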
2 changes: 1 addition & 1 deletion helm/dkron/templates/server-statefulSet.yaml
@@ -64,7 +64,7 @@ spec:
lifecycle:
preStop:
exec:
command: [ /usr/local/bin/dkron, leave ]
command: [ "dkron", "leave" ]
resources:
{{- toYaml .Values.server.resources | nindent 12 }}
envFrom:
4 changes: 2 additions & 2 deletions helm/dkron/values.yaml
@@ -3,7 +3,7 @@
# Declare variables to be passed into your templates.

image:
repository: m10-payments-dkron
repository: m10-payments-dkron-2
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""
@@ -79,7 +79,7 @@ server:
size: 1Gi

agent:
replicaCount: 1
replicaCount: 3

log:
level: "info"
