diff --git a/dkron/agent.go b/dkron/agent.go index d14de14fc..e51787643 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -327,10 +327,6 @@ func (a *Agent) setupRaft() error { ServerAddressProvider: serverAddressProvider, } transport := raft.NewNetworkTransportWithConfig(transConfig) - rpcIP := net.ParseIP(a.config.Tags["rpc_addr"]) - port, err := strconv.Atoi(a.config.Tags["port"]) - rpcAddr := &net.TCPAddr{IP: rpcIP, Port: port} - a.serverLookup.AddServer(&ServerParts{ID: a.config.NodeName, RPCAddr: rpcAddr}) a.raftTransport = transport config := raft.DefaultConfig() diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 37ddc4ba1..a9a33b0d0 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -16,7 +16,7 @@ import ( ) var ( - logLevel = "error" + logLevel = "info" ) func TestAgentCommand_runForElection(t *testing.T) { @@ -636,3 +636,95 @@ func Test_selectNodes(t *testing.T) { }) } } + +func Test_clusterWillRecoverAfterIpChange(t *testing.T) { + a1, rfn1 := buildAndRunAgent("test8", []string{}, 3) + defer rfn1() + a2, rfn2 := buildAndRunAgent("test9", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946"}, 3) + defer rfn2() + a3, rfn3 := buildAndRunAgent("test10", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946", a2.bindRPCAddr()[:len(a2.bindRPCAddr())-4] + "8946"}, 3) + defer rfn3() + time.Sleep(2 * time.Second) + assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader()) + servers := a1.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a2.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a3.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + + _ = a1.Stop() + + time.Sleep(30 * time.Second) + + assert.True(t, !a1.IsLeader() && (a2.IsLeader() || a3.IsLeader())) + + //servers = a2.raft.GetConfiguration().Configuration().Servers + //assert.Equal(t, 2, len(servers)) + //servers = 
a3.raft.GetConfiguration().Configuration().Servers + //assert.Equal(t, 2, len(servers)) + + _ = a2.Stop() + + time.Sleep(20 * time.Second) + + assert.True(t, !a1.IsLeader() && !a2.IsLeader() && !a3.IsLeader()) + + //servers = a3.raft.GetConfiguration().Configuration().Servers + //assert.Equal(t, 1, len(servers)) + + a1, rfn1 = buildAndRunAgent("test8", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3) + defer rfn1() + a2, rfn2 = buildAndRunAgent("test9", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3) + defer rfn2() + + time.Sleep(10 * time.Second) + + assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader()) + servers = a1.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a2.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a3.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) +} + +func buildAndRunAgent( + nodeName string, + startJoin []string, + bootstrapExpect int, +) (*Agent, func()) { + + dir, err := os.MkdirTemp("", fmt.Sprintf("test-%s", nodeName)) + if err != nil { + panic(err.Error()) + } + // NOTE: no defer os.RemoveAll(dir) here — dir must outlive this function; it is removed by the returned cleanup func + ip, returnFn := testutil.TakeIP() + // NOTE: no defer returnFn() here — the IP must stay reserved while the agent runs; it is released by the returned cleanup func + addr := ip.String() + + // Start another agent + c := DefaultConfig() + c.BindAddr = addr + c.StartJoin = startJoin + c.NodeName = nodeName + c.Server = true + c.LogLevel = logLevel + c.BootstrapExpect = bootstrapExpect + c.DevMode = true + c.DataDir = dir + c.RaftMultiplier = 1 + + a2 := NewAgent(c) + err = a2.Start() + if err != nil { + panic(err.Error()) + } + + return a2, func() { + _ = a2.Stop() + returnFn() + _ = os.RemoveAll(dir) + } +} diff --git a/dkron/serf.go index 980139dc8..249f2651d 100644 --- a/dkron/serf.go +++ b/dkron/serf.go @@ -2,6 +2,7 @@ package dkron import ( "strings" + "time" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -11,6 +12,9 @@ const ( // StatusReap is used to
update the status of a node if we // are handling a EventMemberReap StatusReap = serf.MemberStatus(-1) + + // maxPeerRetries limits how many invalidate attempts are made + maxPeerRetries = 6 ) // nodeJoin is used to handle join events on the serf cluster @@ -110,7 +114,46 @@ func (a *Agent) maybeBootstrap() { return } - // TODO: Query each of the servers and make sure they report no Raft peers. + // Query each of the servers and make sure they report no Raft peers. + for _, server := range servers { + var peers []string + + // Retry with exponential backoff to get peer status from this server + for attempt := uint(0); attempt < maxPeerRetries; attempt++ { + configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String()) + if err != nil { + nextRetry := (1 << attempt) * time.Second + a.logger.Error("Failed to confirm peer status for server (will retry).", + "server", server.Name, + "retry_interval", nextRetry.String(), + "error", err, + ) + time.Sleep(nextRetry) + } else { + for _, peer := range configuration.Servers { + peers = append(peers, peer.Id) + } + break + } + } + + // Found a node with some Raft peers, stop bootstrap since there's + // evidence of an existing cluster. We should get folded in by the + // existing servers if that's the case, so it's cleaner to sit as a + // candidate with no peers so we don't cause spurious elections. + // It's OK this is racy, because even with an initial bootstrap + // as long as one peer runs bootstrap things will work, and if we + // have multiple peers bootstrap in the same way, that's OK. We + // just don't want a server added much later to do a live bootstrap + // and interfere with the cluster. This isn't required for Raft's + // correctness because no server in the existing cluster will vote + // for this server, but it makes things much more stable. 
+ if len(peers) > 0 { + a.logger.Info("Existing Raft peers reported by server, disabling bootstrap mode", "server", server.Name) + a.config.BootstrapExpect = 0 + return + } + } // Update the peer set // Attempt a live bootstrap! diff --git a/dkron/server_lookup.go b/dkron/server_lookup.go index 2343cc433..eac4038c9 100644 --- a/dkron/server_lookup.go +++ b/dkron/server_lookup.go @@ -26,7 +26,7 @@ func (sl *ServerLookup) AddServer(server *ServerParts) { sl.lock.Lock() defer sl.lock.Unlock() sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server - sl.idToServer[raft.ServerID(server.ID)] = server + sl.idToServer[raft.ServerID(server.Name)] = server } func (sl *ServerLookup) RemoveServer(server *ServerParts) {