From 3230c736a7d10be42dcc11cfbfb8d660b9349d2e Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 15 Nov 2023 13:25:33 +0100 Subject: [PATCH] stuff Signed-off-by: Kristoffer Dalby --- hscontrol/app.go | 40 +++++++++ hscontrol/poll.go | 186 ++++++++++++++++------------------------ hscontrol/poll_noise.go | 48 ++++------- 3 files changed, 133 insertions(+), 141 deletions(-) diff --git a/hscontrol/app.go b/hscontrol/app.go index 012ea4122d..12db0c924c 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -70,6 +70,31 @@ const ( registerCacheCleanup = time.Minute * 20 ) +type NodeMap struct { + m map[string]*types.Node +} + +func (nk *NodeMap) Get(nKey string) (*types.Node, bool) { + if node, ok := nk.m[nKey]; ok { + return node, true + } + + return nil, false +} + +func (nk *NodeMap) Peers(nKey string) []*types.Node { + var ret []*types.Node + for key, node := range nk.m { + if key == nKey { + continue + } + + ret = append(ret, node) + } + + return ret +} + // Headscale represents the base app of the service. type Headscale struct { cfg *types.Config @@ -85,6 +110,9 @@ type Headscale struct { ACLPolicy *policy.ACLPolicy + nodes NodeMap + nodesMu sync.Mutex + nodeNotifier *notifier.Notifier oidcProvider *oidc.Provider @@ -211,6 +239,18 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) { app.DERPServer = embeddedDERPServer } + nodes, err := app.db.ListNodes() + if err != nil { + log.Fatal().Msgf("cannot read nodes from database: %s", err) + } + + app.nodes = NodeMap{ + m: map[string]*types.Node{}, + } + for idx, node := range nodes { + app.nodes.m[node.NodeKey] = &nodes[idx] + } + return &app, nil } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index e8249ad0eb..a51979e28a 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -10,7 +10,6 @@ import ( "github.com/juanfont/headscale/hscontrol/types" "github.com/juanfont/headscale/hscontrol/util" "github.com/rs/zerolog/log" - xslices "golang.org/x/exp/slices" "tailscale.com/tailcfg" ) @@ -101,47 +100,47 @@ func (h *Headscale) handlePoll( // If the hostinfo has changed, but not the routes, just update // hostinfo and let the function continue. if hi := node.GetHostInfo(); !hi.Equal(mapRequest.Hostinfo) { - oldRoutes := hi.RoutableIPs - newRoutes := mapRequest.Hostinfo.RoutableIPs + // oldRoutes := hi.RoutableIPs + // newRoutes := mapRequest.Hostinfo.RoutableIPs node.HostInfo = types.HostInfo(*mapRequest.Hostinfo) - if !xslices.Equal(oldRoutes, newRoutes) { - err := h.db.SaveNodeRoutes(node) - if err != nil { - logErr(err, "Error processing node routes") - http.Error(writer, "", http.StatusInternalServerError) - - return - } - - if err := h.db.NodeSave(node); err != nil { - logErr(err, "Failed to persist/update node in the database") - http.Error(writer, "", http.StatusInternalServerError) - - return - } - - stateUpdate := types.StateUpdate{ - Type: types.StatePeerChanged, - ChangeNodes: types.Nodes{node}, - } - if stateUpdate.Valid() { - h.nodeNotifier.NotifyWithIgnore( - stateUpdate, - node.MachineKey) - } - - return - } + // if !xslices.Equal(oldRoutes, newRoutes) { + // err := h.db.SaveNodeRoutes(node) + // if err != nil { + // logErr(err, "Error processing node routes") + // http.Error(writer, "", http.StatusInternalServerError) + + // return + // } + + // if err := h.db.NodeSave(node); err != nil { + // logErr(err, "Failed to persist/update node in the database") + // http.Error(writer, "", http.StatusInternalServerError) + + // return + // } + + // stateUpdate := types.StateUpdate{ + // Type: types.StatePeerChanged, + // ChangeNodes: types.Nodes{node}, + // } + // if stateUpdate.Valid() { + // h.nodeNotifier.NotifyWithIgnore( + // stateUpdate, + // node.MachineKey) + // } + + // return + // } } - if err := h.db.NodeSave(node); err != nil { - logErr(err, "Failed to persist/update node in the database") - http.Error(writer, "", http.StatusInternalServerError) + // if err := h.db.NodeSave(node); err != nil { + // logErr(err, "Failed to persist/update node in the database") + // http.Error(writer, "", http.StatusInternalServerError) - return - } + // return + // } stateUpdate := types.StateUpdate{ Type: types.StatePeerChangedPatch, @@ -193,40 +192,42 @@ func (h *Headscale) handlePoll( // Only save HostInfo if changed, update routes if changed // TODO(kradalby): Remove when capver is over 68 if hi := node.GetHostInfo(); !hi.Equal(mapRequest.Hostinfo) { - oldRoutes := hi.RoutableIPs - newRoutes := mapRequest.Hostinfo.RoutableIPs + // oldRoutes := hi.RoutableIPs + // newRoutes := mapRequest.Hostinfo.RoutableIPs node.HostInfo = types.HostInfo(*mapRequest.Hostinfo) - if !xslices.Equal(oldRoutes, newRoutes) { - err := h.db.SaveNodeRoutes(node) - if err != nil { - logErr(err, "Error processing node routes") - http.Error(writer, "", http.StatusInternalServerError) + // if !xslices.Equal(oldRoutes, newRoutes) { + // err := h.db.SaveNodeRoutes(node) + // if err != nil { + // logErr(err, "Error processing node routes") + // http.Error(writer, "", http.StatusInternalServerError) - return - } - } + // return + // } + // } } - if err := h.db.NodeSave(node); err != nil { - logErr(err, "Failed to persist/update node in the database") - http.Error(writer, "", http.StatusInternalServerError) + // if err := h.db.NodeSave(node); err != nil { + // logErr(err, "Failed to persist/update node in the database") + // http.Error(writer, "", http.StatusInternalServerError) - return - } + // return + // } // When a node connects to control, list the peers it has at // that given point, further updates are kept in memory in // the Mapper, which lives for the duration of the polling // session. - peers, err := h.db.ListPeers(node) - if err != nil { - logErr(err, "Failed to list peers when opening poller") - http.Error(writer, "", http.StatusInternalServerError) + // peers, err := h.db.ListPeers(node) + // if err != nil { + // logErr(err, "Failed to list peers when opening poller") + // http.Error(writer, "", http.StatusInternalServerError) - return - } + // return + // } + + peers := h.nodes.Peers(node.NodeKey) mapp := mapper.NewMapper( node, @@ -241,18 +242,16 @@ func (h *Headscale) handlePoll( ) // update ACLRules with peer informations (to update server tags if necessary) - if h.ACLPolicy != nil { - // update routes with peer information - err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node) - if err != nil { - logErr(err, "Error running auto approved routes") - } - } + // if h.ACLPolicy != nil { + // // update routes with peer information + // err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node) + // if err != nil { + // logErr(err, "Error running auto approved routes") + // } + // } logInfo("Sending initial map") - log.Trace().Msgf("NODE FULLMAP BEGIN %s", node.Hostname) - h.populateIsOnline(peers) mapResp, err := mapp.FullMapResponse(mapRequest, node, peers, h.ACLPolicy) if err != nil { logErr(err, "Failed to create MapResponse") @@ -260,7 +259,6 @@ func (h *Headscale) handlePoll( return } - log.Trace().Msgf("NODE FULLMAP END %s", node.Hostname) // Send the client an update to make sure we send an initial mapresponse _, err = writer.Write(mapResp) @@ -290,7 +288,7 @@ func (h *Headscale) handlePoll( h.pollNetMapStreamWG.Add(1) defer h.pollNetMapStreamWG.Done() - updateChan := make(chan types.StateUpdate) + updateChan := make(chan types.StateUpdate, 8) defer closeChanWithLog(updateChan, node.Hostname, "updateChan") // Register the node's update channel @@ -356,43 +354,21 @@ func (h *Headscale) handlePoll( case types.StateFullUpdate: logInfo("Sending Full MapResponse") - peers, err = h.db.ListPeers(node) - if err != nil { - logErr(err, "Failed to list peers when opening poller") - http.Error(writer, "", http.StatusInternalServerError) - - return - } + peers = h.nodes.Peers(node.NodeKey) log.Trace().Msgf("NODE FULLMAP BEGIN %s", node.Hostname) - h.populateIsOnline(peers) data, err = mapp.FullMapResponse(mapRequest, node, peers, h.ACLPolicy) log.Trace().Msgf("NODE FULLMAP END %s", node.Hostname) case types.StatePeerChanged: logInfo("Sending Changed MapResponse") - peers, err = h.db.ListPeers(node) - if err != nil { - logErr(err, "Failed to list peers when opening poller") - http.Error(writer, "", http.StatusInternalServerError) - - return - } - - log.Trace().Msgf("NODE CHANGEDMAP BEGIN %s", node.Hostname) - h.populateIsOnline(update.ChangeNodes) + peers = h.nodes.Peers(node.NodeKey) data, err = mapp.PeerChangedResponse(mapRequest, node, peers, update.ChangeNodes, h.ACLPolicy) - log.Trace().Msgf("NODE CHANGEDMAP END %s", node.Hostname) case types.StatePeerChangedNoPolicy: logInfo("Sending PeerChangedWithoutACL MapResponse") - - log.Trace().Msgf("NODE PATCHMAP BEGIN %s", node.Hostname) - h.populateIsOnline(update.ChangeNodes) - data, err = mapp.PeerChangedWithoutACLResponse(mapRequest, node, update.ChangeNodes, h.ACLPolicy) - log.Trace().Msgf("NODE PATCHMAP BEGIN %s", node.Hostname) case types.StatePeerChangedPatch: logInfo("Sending PeerChangedPatch MapResponse") data, err = mapp.PeerChangedPatchResponse(mapRequest, node, update.ChangePatches, h.ACLPolicy) @@ -447,7 +423,7 @@ func (h *Headscale) handlePoll( go h.updateNodeOnlineStatus(false, node) // Failover the node's routes if any. - go h.db.FailoverNodeRoutesWithNotify(node) + // go h.db.FailoverNodeRoutesWithNotify(node) // The connection has been closed, so we can stop polling. return @@ -467,6 +443,7 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) { now := time.Now() node.LastSeen = &now + node.IsOnline = &online statusUpdate := types.StateUpdate{ Type: types.StatePeerChangedPatch, @@ -482,25 +459,12 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) { h.nodeNotifier.NotifyWithIgnore(statusUpdate, node.MachineKey) } - err := h.db.UpdateLastSeen(node) - if err != nil { - log.Error().Err(err).Msg("Cannot update node LastSeen") - - return - } -} - -func (h *Headscale) populateIsOnline(nodes types.Nodes) { - // Fetch a list over online nodes, dont ask individually to avoid - // locking the notifier. - onlineMap := h.nodeNotifier.IsConnectedMap() + // err := h.db.UpdateLastSeen(node) + // if err != nil { + // log.Error().Err(err).Msg("Cannot update node LastSeen") - // Populate the machines online status. - for idx := range nodes { - if online, ok := onlineMap[nodes[idx].MachineKey]; ok { - nodes[idx].IsOnline = &online - } - } + // return + // } } func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](channel C, node, name string) { diff --git a/hscontrol/poll_noise.go b/hscontrol/poll_noise.go index 933350c6cb..a794d7ec96 100644 --- a/hscontrol/poll_noise.go +++ b/hscontrol/poll_noise.go @@ -7,9 +7,7 @@ import ( "net/http" "github.com/rs/zerolog/log" - "gorm.io/gorm" "tailscale.com/tailcfg" - "tailscale.com/types/key" ) // NoisePollNetMapHandler takes care of /machine/:id/map using the Noise protocol @@ -49,13 +47,8 @@ func (ns *noiseServer) NoisePollNetMapHandler( ns.nodeKey = mapRequest.NodeKey - node, err := ns.headscale.db.GetNodeByAnyKey( - ns.conn.Peer(), - mapRequest.NodeKey, - key.NodePublic{}, - ) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { + if node, ok := ns.headscale.nodes.Get(mapRequest.NodeKey.String()); !ok { + if node == nil { log.Warn(). Str("handler", "NoisePollNetMap"). Msgf("Ignoring request, cannot find node with key %s", mapRequest.NodeKey.String()) @@ -63,30 +56,25 @@ func (ns *noiseServer) NoisePollNetMapHandler( return } - log.Error(). + } else { + log.Debug(). Str("handler", "NoisePollNetMap"). - Msgf("Failed to fetch node from the database with node key: %s", mapRequest.NodeKey.String()) - http.Error(writer, "Internal error", http.StatusInternalServerError) + Str("node", node.Hostname). + Msg("A node sending a MapRequest with Noise protocol") - return - } - log.Debug(). - Str("handler", "NoisePollNetMap"). - Str("node", node.Hostname). - Msg("A node sending a MapRequest with Noise protocol") + capVer, err := parseCabailityVersion(req) + if err != nil && !errors.Is(err, ErrNoCapabilityVersion) { + log.Error(). + Caller(). + Err(err). + Msg("failed to parse capVer") + http.Error(writer, "Internal error", http.StatusInternalServerError) - capVer, err := parseCabailityVersion(req) - if err != nil && !errors.Is(err, ErrNoCapabilityVersion) { - log.Error(). - Caller(). - Err(err). - Msg("failed to parse capVer") - http.Error(writer, "Internal error", http.StatusInternalServerError) + return + } - return + // TODO(kradalby): since we are now passing capVer, we could arguably stop passing + // isNoise, and rather have a isNoise function that takes capVer + ns.headscale.handlePoll(writer, req.Context(), node, mapRequest, true, capVer) } - - // TODO(kradalby): since we are now passing capVer, we could arguably stop passing - // isNoise, and rather have a isNoise function that takes capVer - ns.headscale.handlePoll(writer, req.Context(), node, mapRequest, true, capVer) }