Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Mar 15, 2024
1 parent f4a53c4 commit 13fb074
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 36 deletions.
4 changes: 2 additions & 2 deletions hscontrol/db/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ func DeleteNode(tx *gorm.DB,
return changed, nil
}

// UpdateLastSeen sets a node's last seen field indicating that we
// SetLastSeen sets a node's last seen field indicating that we
// have recently communicating with this node.
func UpdateLastSeen(tx *gorm.DB, nodeID types.NodeID, lastSeen time.Time) error {
func SetLastSeen(tx *gorm.DB, nodeID types.NodeID, lastSeen time.Time) error {
return tx.Model(&types.Node{}).Where("id = ?", nodeID).Update("last_seen", lastSeen).Error
}

Expand Down
3 changes: 2 additions & 1 deletion hscontrol/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (m *Mapper) FullMapResponse(
mapRequest tailcfg.MapRequest,
node *types.Node,
pol *policy.ACLPolicy,
messages ...string,
) ([]byte, error) {
peers, err := m.ListPeers(node.ID)
if err != nil {
Expand All @@ -229,7 +230,7 @@ func (m *Mapper) FullMapResponse(
return nil, err
}

return m.marshalMapResponse(mapRequest, resp, node, mapRequest.Compress)
return m.marshalMapResponse(mapRequest, resp, node, mapRequest.Compress, messages...)
}

// ReadOnlyResponse returns a MapResponse for the given node.
Expand Down
27 changes: 22 additions & 5 deletions hscontrol/noise.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,15 @@ func (ns *noiseServer) NoisePollNetMapHandler(
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Warn().
Str("handler", "NoisePollNetMap").
Uint64("node.id", node.ID.Uint64()).
Msgf("Ignoring request, cannot find node with key %s", mapRequest.NodeKey.String())
http.Error(writer, "Internal error", http.StatusNotFound)

return
}
log.Error().
Str("handler", "NoisePollNetMap").
Uint64("node.id", node.ID.Uint64()).
Msgf("Failed to fetch node from the database with node key: %s", mapRequest.NodeKey.String())
http.Error(writer, "Internal error", http.StatusInternalServerError)

Expand All @@ -244,17 +246,14 @@ func (ns *noiseServer) NoisePollNetMapHandler(
Str("handler", "NoisePollNetMap").
Str("node", node.Hostname).
Int("cap_ver", int(mapRequest.Version)).
Uint64("node.id", node.ID.Uint64()).
Msg("A node sending a MapRequest with Noise protocol")

session := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node)

// If a streaming mapSession exists for this node, close it
// and start a new one.
if session.isStreaming() {
defer func() {
delete(ns.headscale.mapSessions, node.ID)
}()

log.Debug().
Caller().
Uint64("node.id", node.ID.Uint64()).
Expand All @@ -264,7 +263,7 @@ func (ns *noiseServer) NoisePollNetMapHandler(
if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok {
log.Info().
Caller().
Int("node.id", int(node.ID)).
Uint64("node.id", node.ID.Uint64()).
Msg("Node has an open streaming session, replacing")
oldSession.close()
}
Expand All @@ -279,4 +278,22 @@ func (ns *noiseServer) NoisePollNetMapHandler(
}

session.serve()

if session.isStreaming() {
log.Debug().
Caller().
Uint64("node.id", node.ID.Uint64()).
Int("cap_ver", int(mapRequest.Version)).
Msg("Aquiring lock to remove stream")
ns.headscale.mapSessionMu.Lock()

delete(ns.headscale.mapSessions, node.ID)

ns.headscale.mapSessionMu.Unlock()
log.Debug().
Caller().
Uint64("node.id", node.ID.Uint64()).
Int("cap_ver", int(mapRequest.Version)).
Msg("Releasing lock to remove stream")
}
}
16 changes: 8 additions & 8 deletions hscontrol/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,17 @@ func (n *Notifier) NotifyWithIgnore(
case <-ctx.Done():
log.Error().
Err(ctx.Err()).
Uint64("node.ID", nodeID.Uint64()).
Uint64("node.id", nodeID.Uint64()).
Any("origin", ctx.Value("origin")).
Any("hostname", ctx.Value("hostname")).
Any("origin-hostname", ctx.Value("hostname")).
Msgf("update not sent, context cancelled")

return
case c <- update:
log.Trace().
Uint64("node.ID", nodeID.Uint64()).
Uint64("node.id", nodeID.Uint64()).
Any("origin", ctx.Value("origin")).
Any("hostname", ctx.Value("hostname")).
Any("origin-hostname", ctx.Value("hostname")).
Msgf("update successfully sent on chan")
}
}
Expand All @@ -148,17 +148,17 @@ func (n *Notifier) NotifyByMachineKey(
case <-ctx.Done():
log.Error().
Err(ctx.Err()).
Uint64("node.ID", nodeID.Uint64()).
Uint64("node.id", nodeID.Uint64()).
Any("origin", ctx.Value("origin")).
Any("hostname", ctx.Value("hostname")).
Any("origin-hostname", ctx.Value("hostname")).
Msgf("update not sent, context cancelled")

return
case c <- update:
log.Trace().
Uint64("node.ID", nodeID.Uint64()).
Uint64("node.id", nodeID.Uint64()).
Any("origin", ctx.Value("origin")).
Any("hostname", ctx.Value("hostname")).
Any("origin-hostname", ctx.Value("hostname")).
Msgf("update successfully sent on chan")
}
}
Expand Down
46 changes: 27 additions & 19 deletions hscontrol/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (h *Headscale) newMapSession(
mapper: h.mapper,

ch: updateChan,
cancelCh: make(chan struct{}),
cancelCh: make(chan struct{}, 1),

// Loggers
warnf: warnf,
Expand Down Expand Up @@ -278,7 +278,7 @@ func (m *mapSession) serve() {
// Finally, if a DERP map is the only request, send that alone.
if full {
m.tracef("Sending Full MapResponse")
data, err = m.mapper.FullMapResponse(m.req, m.node, m.h.ACLPolicy)
data, err = m.mapper.FullMapResponse(m.req, m.node, m.h.ACLPolicy, fmt.Sprintf("from mapSession: %p, stream: %t", m, m.isStreaming()))
} else if changed != nil {
m.tracef(fmt.Sprintf("Sending Changed MapResponse: %v", lastMessage))
data, err = m.mapper.PeerChangedResponse(m.req, m.node, changed, patches, m.h.ACLPolicy, lastMessage)
Expand Down Expand Up @@ -336,8 +336,10 @@ func (m *mapSession) serve() {
select {
case <-m.cancelCh:
m.tracef("ROUTE DEBUG: CLOSE RECEIVED RETURNING")
m.tracef("poll cancelled received")
return
case <-ctx.Done():
m.tracef("poll context done")
return

// Avoid infinite block that would potentially leave
Expand Down Expand Up @@ -427,30 +429,36 @@ func (m *mapSession) pollFailoverRoutes(where string, node *types.Node) {
// about change in their online/offline status.
// It takes a StateUpdateType of either StatePeerOnlineChanged or StatePeerOfflineChanged.
func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) {
now := time.Now()

node.LastSeen = &now
change := &tailcfg.PeerChange{
NodeID: tailcfg.NodeID(node.ID),
Online: &online,
}

if !online {
now := time.Now()

// lastSeen is only relevant if the node is disconnected.
node.LastSeen = &now
change.LastSeen = &now

err := h.db.DB.Transaction(func(tx *gorm.DB) error {
return db.SetLastSeen(tx, node.ID, *node.LastSeen)
})
if err != nil {
log.Error().Err(err).Msg("Cannot update node LastSeen")

return
}
}

ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-onlinestatus", node.Hostname)
h.nodeNotifier.NotifyWithIgnore(ctx, types.StateUpdate{
Type: types.StatePeerChangedPatch,
ChangePatches: []*tailcfg.PeerChange{
{
NodeID: tailcfg.NodeID(node.ID),
Online: &online,
LastSeen: &now,
},
change,
},
}, node.ID)

err := h.db.DB.Transaction(func(tx *gorm.DB) error {
return db.UpdateLastSeen(tx, node.ID, *node.LastSeen)
})
if err != nil {
log.Error().Err(err).Msg("Cannot update node LastSeen")

return
}
}

func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](channel C, node, name string) {
Expand Down Expand Up @@ -634,7 +642,7 @@ func (m *mapSession) handleReadOnlyRequest() {
}

func logTracePeerChange(hostname string, hostinfoChange bool, change *tailcfg.PeerChange) {
trace := log.Trace().Str("node_id", change.NodeID.String()).Str("hostname", hostname)
trace := log.Trace().Uint64("node.id", uint64(change.NodeID)).Str("hostname", hostname)

if change.Key != nil {
trace = trace.Str("node_key", change.Key.ShortString())
Expand Down
8 changes: 7 additions & 1 deletion integration/general_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,17 +882,23 @@ func TestPingAllByIPManyUpDown(t *testing.T) {
success := pingAllHelper(t, allClients, allAddrs)
t.Logf("%d successful pings out of %d", success, len(allClients)*len(allIps))

for range 3 {
for run := range 3 {
t.Logf("Starting DownUpPing run %d", run+1)

for _, client := range allClients {
t.Logf("taking down %q", client.Hostname())
client.Down()
}

time.Sleep(5 * time.Second)

for _, client := range allClients {
t.Logf("bringing up %q", client.Hostname())
client.Up()
}

time.Sleep(5 * time.Second)

err = scenario.WaitForTailscaleSync()
assertNoErrSync(t, err)

Expand Down

0 comments on commit 13fb074

Please sign in to comment.