diff --git a/hscontrol/db/node.go b/hscontrol/db/node.go index 913d7bf95b..61c952a027 100644 --- a/hscontrol/db/node.go +++ b/hscontrol/db/node.go @@ -289,9 +289,9 @@ func DeleteNode(tx *gorm.DB, return changed, nil } -// UpdateLastSeen sets a node's last seen field indicating that we +// SetLastSeen sets a node's last seen field indicating that we // have recently communicating with this node. -func UpdateLastSeen(tx *gorm.DB, nodeID types.NodeID, lastSeen time.Time) error { +func SetLastSeen(tx *gorm.DB, nodeID types.NodeID, lastSeen time.Time) error { return tx.Model(&types.Node{}).Where("id = ?", nodeID).Update("last_seen", lastSeen).Error } diff --git a/hscontrol/mapper/mapper.go b/hscontrol/mapper/mapper.go index 395eb3ccbf..3a92cae65f 100644 --- a/hscontrol/mapper/mapper.go +++ b/hscontrol/mapper/mapper.go @@ -218,6 +218,7 @@ func (m *Mapper) FullMapResponse( mapRequest tailcfg.MapRequest, node *types.Node, pol *policy.ACLPolicy, + messages ...string, ) ([]byte, error) { peers, err := m.ListPeers(node.ID) if err != nil { @@ -229,7 +230,7 @@ func (m *Mapper) FullMapResponse( return nil, err } - return m.marshalMapResponse(mapRequest, resp, node, mapRequest.Compress) + return m.marshalMapResponse(mapRequest, resp, node, mapRequest.Compress, messages...) } // ReadOnlyResponse returns a MapResponse for the given node. diff --git a/hscontrol/noise.go b/hscontrol/noise.go index 2f4b62c9eb..3debd3785f 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -228,6 +228,7 @@ func (ns *noiseServer) NoisePollNetMapHandler( if errors.Is(err, gorm.ErrRecordNotFound) { log.Warn(). Str("handler", "NoisePollNetMap"). + Uint64("node.id", node.ID.Uint64()). Msgf("Ignoring request, cannot find node with key %s", mapRequest.NodeKey.String()) http.Error(writer, "Internal error", http.StatusNotFound) @@ -235,6 +236,7 @@ func (ns *noiseServer) NoisePollNetMapHandler( } log.Error(). Str("handler", "NoisePollNetMap"). + Uint64("node.id", node.ID.Uint64()). Msgf("Failed to fetch node from the database with node key: %s", mapRequest.NodeKey.String()) http.Error(writer, "Internal error", http.StatusInternalServerError) @@ -244,6 +246,7 @@ func (ns *noiseServer) NoisePollNetMapHandler( Str("handler", "NoisePollNetMap"). Str("node", node.Hostname). Int("cap_ver", int(mapRequest.Version)). + Uint64("node.id", node.ID.Uint64()). Msg("A node sending a MapRequest with Noise protocol") session := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node) @@ -251,10 +254,6 @@ func (ns *noiseServer) NoisePollNetMapHandler( // If a streaming mapSession exists for this node, close it // and start a new one. if session.isStreaming() { - defer func() { - delete(ns.headscale.mapSessions, node.ID) - }() - log.Debug(). Caller(). Uint64("node.id", node.ID.Uint64()). @@ -264,7 +263,7 @@ func (ns *noiseServer) NoisePollNetMapHandler( if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok { log.Info(). Caller(). - Int("node.id", int(node.ID)). + Uint64("node.id", node.ID.Uint64()). Msg("Node has an open streaming session, replacing") oldSession.close() } @@ -279,4 +278,22 @@ func (ns *noiseServer) NoisePollNetMapHandler( } session.serve() + + if session.isStreaming() { + log.Debug(). + Caller(). + Uint64("node.id", node.ID.Uint64()). + Int("cap_ver", int(mapRequest.Version)). + Msg("Aquiring lock to remove stream") + ns.headscale.mapSessionMu.Lock() + + delete(ns.headscale.mapSessions, node.ID) + + ns.headscale.mapSessionMu.Unlock() + log.Debug(). + Caller(). + Uint64("node.id", node.ID.Uint64()). + Int("cap_ver", int(mapRequest.Version)). + Msg("Releasing lock to remove stream") + } } diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index d572d5c904..61f3594e86 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -113,17 +113,17 @@ func (n *Notifier) NotifyWithIgnore( case <-ctx.Done(): log.Error(). Err(ctx.Err()). - Uint64("node.ID", nodeID.Uint64()). + Uint64("node.id", nodeID.Uint64()). Any("origin", ctx.Value("origin")). - Any("hostname", ctx.Value("hostname")). + Any("origin-hostname", ctx.Value("hostname")). Msgf("update not sent, context cancelled") return case c <- update: log.Trace(). - Uint64("node.ID", nodeID.Uint64()). + Uint64("node.id", nodeID.Uint64()). Any("origin", ctx.Value("origin")). - Any("hostname", ctx.Value("hostname")). + Any("origin-hostname", ctx.Value("hostname")). Msgf("update successfully sent on chan") } } @@ -148,17 +148,17 @@ func (n *Notifier) NotifyByMachineKey( case <-ctx.Done(): log.Error(). Err(ctx.Err()). - Uint64("node.ID", nodeID.Uint64()). + Uint64("node.id", nodeID.Uint64()). Any("origin", ctx.Value("origin")). - Any("hostname", ctx.Value("hostname")). + Any("origin-hostname", ctx.Value("hostname")). Msgf("update not sent, context cancelled") return case c <- update: log.Trace(). - Uint64("node.ID", nodeID.Uint64()). + Uint64("node.id", nodeID.Uint64()). Any("origin", ctx.Value("origin")). - Any("hostname", ctx.Value("hostname")). + Any("origin-hostname", ctx.Value("hostname")). Msgf("update successfully sent on chan") } } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index d771c1eb8a..b14c54fd54 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -76,7 +76,7 @@ func (h *Headscale) newMapSession( mapper: h.mapper, ch: updateChan, - cancelCh: make(chan struct{}), + cancelCh: make(chan struct{}, 1), // Loggers warnf: warnf, @@ -278,7 +278,7 @@ func (m *mapSession) serve() { // Finally, if a DERP map is the only request, send that alone. if full { m.tracef("Sending Full MapResponse") - data, err = m.mapper.FullMapResponse(m.req, m.node, m.h.ACLPolicy) + data, err = m.mapper.FullMapResponse(m.req, m.node, m.h.ACLPolicy, fmt.Sprintf("from mapSession: %p, stream: %t", m, m.isStreaming())) } else if changed != nil { m.tracef(fmt.Sprintf("Sending Changed MapResponse: %v", lastMessage)) data, err = m.mapper.PeerChangedResponse(m.req, m.node, changed, patches, m.h.ACLPolicy, lastMessage) @@ -336,8 +336,10 @@ func (m *mapSession) serve() { select { case <-m.cancelCh: m.tracef("ROUTE DEBUG: CLOSE RECEIVED RETURNING") + m.tracef("poll cancelled received") return case <-ctx.Done(): + m.tracef("poll context done") return // Avoid infinite block that would potentially leave @@ -427,30 +429,36 @@ func (m *mapSession) pollFailoverRoutes(where string, node *types.Node) { // about change in their online/offline status. // It takes a StateUpdateType of either StatePeerOnlineChanged or StatePeerOfflineChanged. func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) { - now := time.Now() - node.LastSeen = &now + change := &tailcfg.PeerChange{ + NodeID: tailcfg.NodeID(node.ID), + Online: &online, + } + + if !online { + now := time.Now() + + // lastSeen is only relevant if the node is disconnected. + node.LastSeen = &now + change.LastSeen = &now + + err := h.db.DB.Transaction(func(tx *gorm.DB) error { + return db.SetLastSeen(tx, node.ID, *node.LastSeen) + }) + if err != nil { + log.Error().Err(err).Msg("Cannot update node LastSeen") + + return + } + } ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-onlinestatus", node.Hostname) h.nodeNotifier.NotifyWithIgnore(ctx, types.StateUpdate{ Type: types.StatePeerChangedPatch, ChangePatches: []*tailcfg.PeerChange{ - { - NodeID: tailcfg.NodeID(node.ID), - Online: &online, - LastSeen: &now, - }, + change, }, }, node.ID) - - err := h.db.DB.Transaction(func(tx *gorm.DB) error { - return db.UpdateLastSeen(tx, node.ID, *node.LastSeen) - }) - if err != nil { - log.Error().Err(err).Msg("Cannot update node LastSeen") - - return - } } func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](channel C, node, name string) { @@ -634,7 +642,7 @@ func (m *mapSession) handleReadOnlyRequest() { } func logTracePeerChange(hostname string, hostinfoChange bool, change *tailcfg.PeerChange) { - trace := log.Trace().Str("node_id", change.NodeID.String()).Str("hostname", hostname) + trace := log.Trace().Uint64("node.id", uint64(change.NodeID)).Str("hostname", hostname) if change.Key != nil { trace = trace.Str("node_key", change.Key.ShortString()) diff --git a/integration/general_test.go b/integration/general_test.go index d42c780d22..975b4c21fb 100644 --- a/integration/general_test.go +++ b/integration/general_test.go @@ -882,17 +882,23 @@ func TestPingAllByIPManyUpDown(t *testing.T) { success := pingAllHelper(t, allClients, allAddrs) t.Logf("%d successful pings out of %d", success, len(allClients)*len(allIps)) - for range 3 { + for run := range 3 { + t.Logf("Starting DownUpPing run %d", run+1) + for _, client := range allClients { + t.Logf("taking down %q", client.Hostname()) client.Down() } time.Sleep(5 * time.Second) for _, client := range allClients { + t.Logf("bringing up %q", client.Hostname()) client.Up() } + time.Sleep(5 * time.Second) + err = scenario.WaitForTailscaleSync() assertNoErrSync(t, err)