diff --git a/dkron/agent.go b/dkron/agent.go index a261f397c..4a05b2aea 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -113,9 +113,10 @@ type Agent struct { // peers is used to track the known Dkron servers. This is // used for region forwarding and clustering. - peers map[string][]*ServerParts - localPeers map[raft.ServerAddress]*ServerParts - peerLock sync.RWMutex + peers map[string][]*ServerParts + localPeers map[raft.ServerAddress]*ServerParts + peerLock sync.RWMutex + serverLookup *ServerLookup activeExecutions sync.Map @@ -316,7 +317,19 @@ func (a *Agent) setupRaft() error { logger = a.logger.Logger.Writer() } - transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger) + var serverAddressProvider raft.ServerAddressProvider = a.serverLookup + + transConfig := &raft.NetworkTransportConfig{ + Stream: a.raftLayer, + MaxPool: 3, + Timeout: 10 * time.Second, + ServerAddressProvider: serverAddressProvider, + } + transport := raft.NewNetworkTransportWithConfig(transConfig) + rpcIP := net.ParseIP(a.config.Tags["rpc_addr"]) + port, err := strconv.Atoi(a.config.Tags["port"]) + rpcAddr := &net.TCPAddr{IP: rpcIP, Port: port} + a.serverLookup.AddServer(&ServerParts{ID: a.config.NodeName, RPCAddr: rpcAddr}) a.raftTransport = transport config := raft.DefaultConfig() @@ -540,6 +553,7 @@ func (a *Agent) SetConfig(c *Config) { // StartServer launch a new dkron server process func (a *Agent) StartServer() { + a.serverLookup = NewServerLookup() if a.Store == nil { s, err := NewStore(a.logger) if err != nil { @@ -710,7 +724,9 @@ func (a *Agent) eventLoop() { a.localMemberEvent(me) case serf.EventMemberReap: a.localMemberEvent(me) - case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore + case serf.EventMemberUpdate: + a.logger.WithField("event", e.String()).Info("agent: event member update") + case serf.EventUser, serf.EventQuery: // Ignore default: a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event") } diff --git a/dkron/serf.go b/dkron/serf.go index 6311aee02..2e18f174c 100644 --- a/dkron/serf.go +++ b/dkron/serf.go @@ -22,7 +22,7 @@ func (a *Agent) nodeJoin(me serf.MemberEvent) { continue } a.logger.WithField("server", parts.Name).Info("adding server") - + a.serverLookup.AddServer(parts) // Check if this server is known found := false a.peerLock.Lock() @@ -174,6 +174,7 @@ func (a *Agent) nodeFailed(me serf.MemberEvent) { delete(a.localPeers, raft.ServerAddress(parts.Addr.String())) } a.peerLock.Unlock() + a.serverLookup.RemoveServer(parts) } } diff --git a/dkron/server_lookup.go b/dkron/server_lookup.go new file mode 100644 index 000000000..1ac7c31e5 --- /dev/null +++ b/dkron/server_lookup.go @@ -0,0 +1,75 @@ +package dkron + +import ( + "fmt" + "sync" + + "github.com/hashicorp/raft" +) + +// ServerLookup encapsulates looking up servers by id and address +type ServerLookup struct { + lock sync.RWMutex + addressToServer map[raft.ServerAddress]*ServerParts + idToServer map[raft.ServerID]*ServerParts +} + +func NewServerLookup() *ServerLookup { + return &ServerLookup{ + addressToServer: make(map[raft.ServerAddress]*ServerParts), + idToServer: make(map[raft.ServerID]*ServerParts), + } +} + +func (sl *ServerLookup) AddServer(server *ServerParts) { + sl.lock.Lock() + defer sl.lock.Unlock() + sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server + sl.idToServer[raft.ServerID(server.ID)] = server +} + +func (sl *ServerLookup) RemoveServer(server *ServerParts) { + sl.lock.Lock() + defer sl.lock.Unlock() + delete(sl.addressToServer, raft.ServerAddress(server.RPCAddr.String())) + delete(sl.idToServer, raft.ServerID(server.ID)) +} + +// Implements the ServerAddressProvider interface +func (sl *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) { + sl.lock.RLock() + defer sl.lock.RUnlock() + svr, ok := sl.idToServer[id] + if !ok { + return "", fmt.Errorf("Could not find address for server id %v", id) + } + return raft.ServerAddress(svr.RPCAddr.String()), nil +} + +// Server looks up the server by address, returns a boolean if not found +func (sl *ServerLookup) Server(addr raft.ServerAddress) *ServerParts { + sl.lock.RLock() + defer sl.lock.RUnlock() + return sl.addressToServer[addr] +} + +func (sl *ServerLookup) Servers() []*ServerParts { + sl.lock.RLock() + defer sl.lock.RUnlock() + var ret []*ServerParts + for _, svr := range sl.addressToServer { + ret = append(ret, svr) + } + return ret +} + +func (sl *ServerLookup) CheckServers(fn func(srv *ServerParts) bool) { + sl.lock.RLock() + defer sl.lock.RUnlock() + + for _, srv := range sl.addressToServer { + if !fn(srv) { + return + } + } +}