Skip to content

Commit

Permalink
stuff
Browse files Browse the repository at this point in the history
Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Nov 23, 2023
1 parent f1afce7 commit 3230c73
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 141 deletions.
40 changes: 40 additions & 0 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,31 @@ const (
registerCacheCleanup = time.Minute * 20
)

type NodeMap struct {
m map[string]*types.Node
}

func (nk *NodeMap) Get(nKey string) (*types.Node, bool) {
if node, ok := nk.m[nKey]; ok {
return node, true
}

return nil, false
}

func (nk *NodeMap) Peers(nKey string) []*types.Node {
var ret []*types.Node
for key, node := range nk.m {
if key == nKey {
continue
}

ret = append(ret, node)
}

return ret
}

// Headscale represents the base app of the service.
type Headscale struct {
cfg *types.Config
Expand All @@ -85,6 +110,9 @@ type Headscale struct {

ACLPolicy *policy.ACLPolicy

nodes NodeMap
nodesMu sync.Mutex

nodeNotifier *notifier.Notifier

oidcProvider *oidc.Provider
Expand Down Expand Up @@ -211,6 +239,18 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
app.DERPServer = embeddedDERPServer
}

nodes, err := app.db.ListNodes()
if err != nil {
log.Fatal().Msgf("cannot read nodes from database: %s", err)
}

app.nodes = NodeMap{
m: map[string]*types.Node{},
}
for idx, node := range nodes {
app.nodes.m[node.NodeKey] = &nodes[idx]
}

return &app, nil
}

Expand Down
186 changes: 75 additions & 111 deletions hscontrol/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/util"
"github.com/rs/zerolog/log"
xslices "golang.org/x/exp/slices"
"tailscale.com/tailcfg"
)

Expand Down Expand Up @@ -101,47 +100,47 @@ func (h *Headscale) handlePoll(
// If the hostinfo has changed, but not the routes, just update
// hostinfo and let the function continue.
if hi := node.GetHostInfo(); !hi.Equal(mapRequest.Hostinfo) {
oldRoutes := hi.RoutableIPs
newRoutes := mapRequest.Hostinfo.RoutableIPs
// oldRoutes := hi.RoutableIPs
// newRoutes := mapRequest.Hostinfo.RoutableIPs

node.HostInfo = types.HostInfo(*mapRequest.Hostinfo)

if !xslices.Equal(oldRoutes, newRoutes) {
err := h.db.SaveNodeRoutes(node)
if err != nil {
logErr(err, "Error processing node routes")
http.Error(writer, "", http.StatusInternalServerError)

return
}

if err := h.db.NodeSave(node); err != nil {
logErr(err, "Failed to persist/update node in the database")
http.Error(writer, "", http.StatusInternalServerError)

return
}

stateUpdate := types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: types.Nodes{node},
}
if stateUpdate.Valid() {
h.nodeNotifier.NotifyWithIgnore(
stateUpdate,
node.MachineKey)
}

return
}
// if !xslices.Equal(oldRoutes, newRoutes) {
// err := h.db.SaveNodeRoutes(node)
// if err != nil {
// logErr(err, "Error processing node routes")
// http.Error(writer, "", http.StatusInternalServerError)

// return
// }

// if err := h.db.NodeSave(node); err != nil {
// logErr(err, "Failed to persist/update node in the database")
// http.Error(writer, "", http.StatusInternalServerError)

// return
// }

// stateUpdate := types.StateUpdate{
// Type: types.StatePeerChanged,
// ChangeNodes: types.Nodes{node},
// }
// if stateUpdate.Valid() {
// h.nodeNotifier.NotifyWithIgnore(
// stateUpdate,
// node.MachineKey)
// }

// return
// }
}

if err := h.db.NodeSave(node); err != nil {
logErr(err, "Failed to persist/update node in the database")
http.Error(writer, "", http.StatusInternalServerError)
// if err := h.db.NodeSave(node); err != nil {
// logErr(err, "Failed to persist/update node in the database")
// http.Error(writer, "", http.StatusInternalServerError)

return
}
// return
// }

stateUpdate := types.StateUpdate{
Type: types.StatePeerChangedPatch,
Expand Down Expand Up @@ -193,40 +192,42 @@ func (h *Headscale) handlePoll(
// Only save HostInfo if changed, update routes if changed
// TODO(kradalby): Remove when capver is over 68
if hi := node.GetHostInfo(); !hi.Equal(mapRequest.Hostinfo) {
oldRoutes := hi.RoutableIPs
newRoutes := mapRequest.Hostinfo.RoutableIPs
// oldRoutes := hi.RoutableIPs
// newRoutes := mapRequest.Hostinfo.RoutableIPs

node.HostInfo = types.HostInfo(*mapRequest.Hostinfo)

if !xslices.Equal(oldRoutes, newRoutes) {
err := h.db.SaveNodeRoutes(node)
if err != nil {
logErr(err, "Error processing node routes")
http.Error(writer, "", http.StatusInternalServerError)
// if !xslices.Equal(oldRoutes, newRoutes) {
// err := h.db.SaveNodeRoutes(node)
// if err != nil {
// logErr(err, "Error processing node routes")
// http.Error(writer, "", http.StatusInternalServerError)

return
}
}
// return
// }
// }
}

if err := h.db.NodeSave(node); err != nil {
logErr(err, "Failed to persist/update node in the database")
http.Error(writer, "", http.StatusInternalServerError)
// if err := h.db.NodeSave(node); err != nil {
// logErr(err, "Failed to persist/update node in the database")
// http.Error(writer, "", http.StatusInternalServerError)

return
}
// return
// }

// When a node connects to control, list the peers it has at
// that given point, further updates are kept in memory in
// the Mapper, which lives for the duration of the polling
// session.
peers, err := h.db.ListPeers(node)
if err != nil {
logErr(err, "Failed to list peers when opening poller")
http.Error(writer, "", http.StatusInternalServerError)
// peers, err := h.db.ListPeers(node)
// if err != nil {
// logErr(err, "Failed to list peers when opening poller")
// http.Error(writer, "", http.StatusInternalServerError)

return
}
// return
// }

peers := h.nodes.Peers(node.NodeKey)

mapp := mapper.NewMapper(
node,
Expand All @@ -241,26 +242,23 @@ func (h *Headscale) handlePoll(
)

// update ACLRules with peer informations (to update server tags if necessary)
if h.ACLPolicy != nil {
// update routes with peer information
err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
if err != nil {
logErr(err, "Error running auto approved routes")
}
}
// if h.ACLPolicy != nil {
// // update routes with peer information
// err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
// if err != nil {
// logErr(err, "Error running auto approved routes")
// }
// }

logInfo("Sending initial map")

log.Trace().Msgf("NODE FULLMAP BEGIN %s", node.Hostname)
h.populateIsOnline(peers)
mapResp, err := mapp.FullMapResponse(mapRequest, node, peers, h.ACLPolicy)
if err != nil {
logErr(err, "Failed to create MapResponse")
http.Error(writer, "", http.StatusInternalServerError)

return
}
log.Trace().Msgf("NODE FULLMAP END %s", node.Hostname)

// Send the client an update to make sure we send an initial mapresponse
_, err = writer.Write(mapResp)
Expand Down Expand Up @@ -290,7 +288,7 @@ func (h *Headscale) handlePoll(
h.pollNetMapStreamWG.Add(1)
defer h.pollNetMapStreamWG.Done()

updateChan := make(chan types.StateUpdate)
updateChan := make(chan types.StateUpdate, 8)
defer closeChanWithLog(updateChan, node.Hostname, "updateChan")

// Register the node's update channel
Expand Down Expand Up @@ -356,43 +354,21 @@ func (h *Headscale) handlePoll(
case types.StateFullUpdate:
logInfo("Sending Full MapResponse")

peers, err = h.db.ListPeers(node)
if err != nil {
logErr(err, "Failed to list peers when opening poller")
http.Error(writer, "", http.StatusInternalServerError)

return
}
peers = h.nodes.Peers(node.NodeKey)

log.Trace().Msgf("NODE FULLMAP BEGIN %s", node.Hostname)
h.populateIsOnline(peers)

data, err = mapp.FullMapResponse(mapRequest, node, peers, h.ACLPolicy)
log.Trace().Msgf("NODE FULLMAP END %s", node.Hostname)
case types.StatePeerChanged:
logInfo("Sending Changed MapResponse")

peers, err = h.db.ListPeers(node)
if err != nil {
logErr(err, "Failed to list peers when opening poller")
http.Error(writer, "", http.StatusInternalServerError)

return
}

log.Trace().Msgf("NODE CHANGEDMAP BEGIN %s", node.Hostname)
h.populateIsOnline(update.ChangeNodes)
peers = h.nodes.Peers(node.NodeKey)

data, err = mapp.PeerChangedResponse(mapRequest, node, peers, update.ChangeNodes, h.ACLPolicy)
log.Trace().Msgf("NODE CHANGEDMAP END %s", node.Hostname)
case types.StatePeerChangedNoPolicy:
logInfo("Sending PeerChangedWithoutACL MapResponse")

log.Trace().Msgf("NODE PATCHMAP BEGIN %s", node.Hostname)
h.populateIsOnline(update.ChangeNodes)

data, err = mapp.PeerChangedWithoutACLResponse(mapRequest, node, update.ChangeNodes, h.ACLPolicy)
log.Trace().Msgf("NODE PATCHMAP BEGIN %s", node.Hostname)
case types.StatePeerChangedPatch:
logInfo("Sending PeerChangedPatch MapResponse")
data, err = mapp.PeerChangedPatchResponse(mapRequest, node, update.ChangePatches, h.ACLPolicy)
Expand Down Expand Up @@ -447,7 +423,7 @@ func (h *Headscale) handlePoll(
go h.updateNodeOnlineStatus(false, node)

// Failover the node's routes if any.
go h.db.FailoverNodeRoutesWithNotify(node)
// go h.db.FailoverNodeRoutesWithNotify(node)

// The connection has been closed, so we can stop polling.
return
Expand All @@ -467,6 +443,7 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) {
now := time.Now()

node.LastSeen = &now
node.IsOnline = &online

statusUpdate := types.StateUpdate{
Type: types.StatePeerChangedPatch,
Expand All @@ -482,25 +459,12 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) {
h.nodeNotifier.NotifyWithIgnore(statusUpdate, node.MachineKey)
}

err := h.db.UpdateLastSeen(node)
if err != nil {
log.Error().Err(err).Msg("Cannot update node LastSeen")

return
}
}

func (h *Headscale) populateIsOnline(nodes types.Nodes) {
// Fetch a list over online nodes, dont ask individually to avoid
// locking the notifier.
onlineMap := h.nodeNotifier.IsConnectedMap()
// err := h.db.UpdateLastSeen(node)
// if err != nil {
// log.Error().Err(err).Msg("Cannot update node LastSeen")

// Populate the machines online status.
for idx := range nodes {
if online, ok := onlineMap[nodes[idx].MachineKey]; ok {
nodes[idx].IsOnline = &online
}
}
// return
// }
}

func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](channel C, node, name string) {
Expand Down
Loading

0 comments on commit 3230c73

Please sign in to comment.