diff --git a/build-and-deploy-to-minikube.sh b/build-and-deploy-to-minikube.sh
index 912c2ba67..ea6634307 100644
--- a/build-and-deploy-to-minikube.sh
+++ b/build-and-deploy-to-minikube.sh
@@ -1,4 +1,4 @@
 # shellcheck disable=SC2046
 eval $(minikube -p minikube docker-env)
-docker build -t m10-payments-dkron .
+docker build -t m10-payments-dkron-2 .
 helm upgrade --install dkron --namespace dkron --create-namespace ./helm/dkron
\ No newline at end of file
diff --git a/dkron/agent.go b/dkron/agent.go
index 26687ed27..d4dbbd93e 100644
--- a/dkron/agent.go
+++ b/dkron/agent.go
@@ -36,6 +36,10 @@ const (
 	// This is used to reduce disk I/O for the recently committed entries.
 	raftLogCacheSize = 512
 	minRaftProtocol = 3
+
+	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
+	// to replicate so that we can gracefully leave the cluster.
+	raftRemoveGracePeriod = 5 * time.Second
 )
 
 var (
@@ -263,25 +267,101 @@ func (a *Agent) JoinLAN(addrs []string) (int, error) {
 func (a *Agent) Stop() error {
 	a.logger.Info("agent: Called member stop, now stopping")
 
-	if a.config.Server {
-		if a.sched.Started() {
-			<-a.sched.Stop().Done()
+	// Check the number of known peers
+	configFuture := a.raft.GetConfiguration()
+	if err := configFuture.Error(); err != nil {
+		return err
+	}
+	cfg := configFuture.Configuration()
+
+	var numPeers int
+	for _, server := range cfg.Servers {
+		if server.Suffrage == raft.Voter {
+			numPeers++
 		}
+	}
 
-		// TODO: Check why Shutdown().Error() is not working
-		_ = a.raft.Shutdown()
+	addr := a.raftTransport.LocalAddr()
 
-		if err := a.Store.Shutdown(); err != nil {
-			return err
+	// If we are the current leader and the cluster has other peers, first try
+	// to transfer leadership, and fall back to RemoveServer to safely reduce
+	// the quorum size. If we are not the leader, issue our leave intention
+	// and wait to be removed for some reasonable period of time.
+	isLeader := a.IsLeader()
+	if isLeader && numPeers > 1 {
+		if err := a.leadershipTransfer(); err == nil {
+			isLeader = false
+			if a.sched.Started() {
+				<-a.sched.Stop().Done()
+			}
+		} else {
+			future := a.raft.RemoveServer(raft.ServerID(a.config.NodeName), 0, 0)
+			if err := future.Error(); err != nil {
+				a.logger.WithError(err).Error("agent: failed to remove ourselves as raft peer")
+			}
 		}
 	}
 
-	if err := a.serf.Leave(); err != nil {
-		return err
+	// Leave the LAN pool
+	if a.serf != nil {
+		if err := a.serf.Leave(); err != nil {
+			a.logger.WithError(err).Error("agent: failed to leave LAN Serf cluster")
+		}
 	}
 
-	if err := a.serf.Shutdown(); err != nil {
-		return err
+	// Start refusing RPCs now that we've left the LAN pool. It's important
+	// to do this *after* we've left the LAN pool so that clients will know
+	// to shift onto another server if they perform a retry. We also wake up
+	// all queries in the RPC retry state.
+	a.logger.WithField("drain_time", time.Second*10).Info("agent: waiting to drain RPC traffic")
+	close(a.shutdownCh)
+	time.Sleep(time.Second * 10) // TODO: make the drain time configurable at agent start
+
+	// If we were not leader, wait to be safely removed from the cluster. We
+	// must wait to allow the raft replication to take place, otherwise an
+	// immediate shutdown could cause a loss of quorum.
+	if !isLeader {
+		left := false
+		limit := time.Now().Add(raftRemoveGracePeriod)
+		for !left && time.Now().Before(limit) {
+			// Sleep a while before we check.
+			time.Sleep(50 * time.Millisecond)
+
+			// Get the latest configuration.
+			future := a.raft.GetConfiguration()
+			if err := future.Error(); err != nil {
+				a.logger.WithError(err).Error("agent: failed to get raft configuration")
+				break
+			}
+
+			// See if we are no longer included.
+			left = true
+			for _, server := range future.Configuration().Servers {
+				if server.Address == addr {
+					left = false
+					break
+				}
+			}
+		}
+
+		// TODO (slackpad) With the old Raft library we used to force the
+		// peers set to empty when a graceful leave occurred. This would
+		// keep voting spam down if the server was restarted, but it was
+		// dangerous because the peers set was inconsistent with the logs and
+		// snapshots, so it wasn't really safe in all cases for the server
+		// to become leader. This is now safe, but the log spam is noisy.
+		// The next new version of the library will have a "you are not a
+		// peer stop it" behavior that should address this. We will have
+		// to evaluate during the RC period if this interim situation is
+		// not too confusing for operators.
+
+		// TODO (slackpad) When we take a later new version of the Raft
+		// library it won't try to complete replication, so this peer
+		// may not realize that it has been removed. Need to revisit this
+		// and the warning here.
+		if !left {
+			a.logger.Warn("agent: failed to leave raft configuration gracefully, timeout")
+		}
 	}
 
 	return nil
@@ -719,14 +799,16 @@ func (a *Agent) eventLoop() {
 				case serf.EventMemberJoin:
 					a.nodeJoin(me)
 					a.localMemberEvent(me)
-				case serf.EventMemberLeave, serf.EventMemberFailed:
+				case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
 					a.nodeFailed(me)
 					a.localMemberEvent(me)
-				case serf.EventMemberReap:
-					a.localMemberEvent(me)
 				case serf.EventMemberUpdate:
 					a.logger.WithField("event", e.String()).Info("agent: event member update")
-				case serf.EventUser, serf.EventQuery: // Ignore
+					a.nodeJoin(me)
+					a.localMemberEvent(me)
+				case serf.EventUser:
+					a.localMemberEvent(me)
+				case serf.EventQuery: // Ignore
 				default:
 					a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
 				}
diff --git a/dkron/serf.go b/dkron/serf.go
index 2e18f174c..349ac7991 100644
--- a/dkron/serf.go
+++ b/dkron/serf.go
@@ -1,16 +1,20 @@
 package dkron
 
 import (
 	"strings"
+	"time"
 
 	"github.com/hashicorp/raft"
 	"github.com/hashicorp/serf/serf"
 )
 
 const (
 	// StatusReap is used to update the status of a node if we
 	// are handling a EventMemberReap
 	StatusReap = serf.MemberStatus(-1)
+
+	// maxPeerRetries limits how many attempts are made to confirm a
+	// server's peer status before giving up
+	maxPeerRetries = 6
 )
 
 // nodeJoin is used to handle join events on the serf cluster
@@ -111,16 +115,55 @@ func (a *Agent) maybeBootstrap() {
 	}
 
-	// TODO: Query each of the servers and make sure they report no Raft peers.
+	// Query each of the servers and make sure they report no Raft peers.
+	for _, server := range servers {
+		var peers []string
+
+		// Retry with exponential backoff to get peer status from this server
+		for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
+			configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String())
+			if err != nil {
+				nextRetry := (1 << attempt) * time.Second
+				a.logger.WithError(err).
+					WithField("server", server.Name).
+					WithField("retry_interval", nextRetry.String()).
+					Error("agent: failed to confirm peer status for server (will retry)")
+				time.Sleep(nextRetry)
+			} else {
+				for _, peer := range configuration.Servers {
+					peers = append(peers, peer.Id)
+				}
+				break
+			}
+		}
+
+		// Found a node with some Raft peers, stop bootstrap since there's
+		// evidence of an existing cluster. We should get folded in by the
+		// existing servers if that's the case, so it's cleaner to sit as a
+		// candidate with no peers so we don't cause spurious elections.
+		// It's OK this is racy, because even with an initial bootstrap
+		// as long as one peer runs bootstrap things will work, and if we
+		// have multiple peers bootstrap in the same way, that's OK. We
+		// just don't want a server added much later to do a live bootstrap
+		// and interfere with the cluster. This isn't required for Raft's
+		// correctness because no server in the existing cluster will vote
+		// for this server, but it makes things much more stable.
+		if len(peers) > 0 {
+			a.logger.WithField("server", server.Name).Info("agent: existing Raft peers reported by server, disabling bootstrap mode")
+			a.config.BootstrapExpect = 0
+			return
+		}
+	}
 
 	// Update the peer set
 	// Attempt a live bootstrap!
 	var configuration raft.Configuration
 	var addrs []string
+	var ids []string
 	for _, server := range servers {
 		addr := server.Addr.String()
 		addrs = append(addrs, addr)
 		id := raft.ServerID(server.ID)
+		ids = append(ids, server.ID)
 		suffrage := raft.Voter
 		peer := raft.Server{
 			ID:       id,
@@ -130,7 +173,7 @@ func (a *Agent) maybeBootstrap() {
 		configuration.Servers = append(configuration.Servers, peer)
 	}
 	a.logger.Info("agent: found expected number of peers, attempting to bootstrap cluster...",
-		"peers", strings.Join(addrs, ","))
+		"peersAddrs", strings.Join(addrs, ","), "peersIDs", strings.Join(ids, ","))
 	future := a.raft.BootstrapCluster(configuration)
 	if err := future.Error(); err != nil {
 		a.logger.WithError(err).Error("agent: failed to bootstrap cluster")
diff --git a/helm/dkron/templates/server-statefulSet.yaml b/helm/dkron/templates/server-statefulSet.yaml
index 115cccbd6..9581d7974 100644
--- a/helm/dkron/templates/server-statefulSet.yaml
+++ b/helm/dkron/templates/server-statefulSet.yaml
@@ -64,7 +64,7 @@ spec:
           lifecycle:
             preStop:
               exec:
-                command: [ /usr/local/bin/dkron, leave ]
+                command: [ "dkron", "leave" ]
           resources:
             {{- toYaml .Values.server.resources | nindent 12 }}
           envFrom:
diff --git a/helm/dkron/values.yaml b/helm/dkron/values.yaml
index 0e08604c9..f7fe3b02a 100644
--- a/helm/dkron/values.yaml
+++ b/helm/dkron/values.yaml
@@ -3,7 +3,7 @@
 # Declare variables to be passed into your templates.
 
 image:
-  repository: m10-payments-dkron
+  repository: m10-payments-dkron-2
   pullPolicy: IfNotPresent
   # Overrides the image tag whose default is the chart appVersion.
   tag: ""
@@ -79,7 +79,7 @@ server:
     size: 1Gi
 
 agent:
-  replicaCount: 1
+  replicaCount: 3
 
   log:
     level: "info"
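
The retry loop added to maybeBootstrap is the subtlest part of this patch, so here is the pattern in isolation as a minimal, self-contained Go sketch. fetchPeers, peersWithBackoff, and main are hypothetical stand-ins (the patch itself calls a.GRPCClient.RaftGetConfiguration inside the agent); only the backoff arithmetic and control flow mirror the diff.

package main

import (
	"errors"
	"fmt"
	"time"
)

// maxPeerRetries mirrors the constant added in dkron/serf.go.
const maxPeerRetries = 6

// fetchPeers is a hypothetical stand-in for a.GRPCClient.RaftGetConfiguration:
// it asks one server which Raft peer IDs it currently reports.
func fetchPeers(rpcAddr string) ([]string, error) {
	return nil, errors.New("server unreachable") // placeholder failure
}

// peersWithBackoff retries fetchPeers with the same exponential backoff as the
// maybeBootstrap loop: the untyped constant 1 takes the type time.Duration from
// the multiplication, so (1 << attempt) * time.Second yields waits of
// 1s, 2s, 4s, 8s, 16s, 32s across the six attempts.
func peersWithBackoff(rpcAddr string) []string {
	for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
		peers, err := fetchPeers(rpcAddr)
		if err == nil {
			return peers // first successful response wins
		}
		nextRetry := (1 << attempt) * time.Second
		fmt.Printf("peer query failed, retrying in %s: %v\n", nextRetry, err)
		time.Sleep(nextRetry)
	}
	return nil // callers treat this as "no peers reported"
}

func main() {
	fmt.Println(peersWithBackoff("10.0.0.1:6868"))
}

As in the patch, the loop also sleeps after the final failed attempt; an empty result is what lets maybeBootstrap fall through to the live-bootstrap path.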