Skip to content

Commit

Permalink
replace ephemeral deletion logic (#2008)
Browse files Browse the repository at this point in the history
* replace ephemeral deletion logic

this commit replaces the way we remove ephemeral nodes,
currently they are deleted in a loop and we look at last seen
time. This time is now only set when a node disconnects and
there was a bug (#2006) where nodes that had never disconnected
was deleted since they did not have a last seen.

The new logic will start an expiry timer when the node disconnects
and delete the node from the database when the timer is up.

If the node reconnects within the expiry, the timer is cancelled.

Fixes #2006

Signed-off-by: Kristoffer Dalby <[email protected]>

* use uint64 as authekyid and ptr helper in tests

Signed-off-by: Kristoffer Dalby <[email protected]>

* add test db helper

Signed-off-by: Kristoffer Dalby <[email protected]>

* add list ephemeral node func

Signed-off-by: Kristoffer Dalby <[email protected]>

* schedule ephemeral nodes for removal on startup

Signed-off-by: Kristoffer Dalby <[email protected]>

* fix gorm query for postgres

Signed-off-by: Kristoffer Dalby <[email protected]>

* add godoc

Signed-off-by: Kristoffer Dalby <[email protected]>

---------

Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby authored Jul 18, 2024
1 parent 58bd38a commit 7e62031
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 206 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
- TestPingAllByIPPublicDERP
- TestAuthKeyLogoutAndRelogin
- TestEphemeral
- TestEphemeral2006DeletedTooQuickly
- TestPingAllByHostname
- TestTaildrop
- TestResolveMagicDNS
Expand Down
65 changes: 20 additions & 45 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Headscale struct {
db *db.HSDatabase
ipAlloc *db.IPAllocator
noisePrivateKey *key.MachinePrivate
ephemeralGC *db.EphemeralGarbageCollector

DERPMap *tailcfg.DERPMap
DERPServer *derpServer.DERPServer
Expand Down Expand Up @@ -153,6 +154,12 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
return nil, err
}

app.ephemeralGC = db.NewEphemeralGarbageCollector(func(ni types.NodeID) {
if err := app.db.DeleteEphemeralNode(ni); err != nil {
log.Err(err).Uint64("node.id", ni.Uint64()).Msgf("failed to delete ephemeral node")
}
})

if cfg.OIDC.Issuer != "" {
err = app.initOIDC()
if err != nil {
Expand Down Expand Up @@ -217,47 +224,6 @@ func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, target, http.StatusFound)
}

// deleteExpireEphemeralNodes deletes ephemeral node records that have not been
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
func (h *Headscale) deleteExpireEphemeralNodes(ctx context.Context, every time.Duration) {
ticker := time.NewTicker(every)

for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
var removed []types.NodeID
var changed []types.NodeID
if err := h.db.Write(func(tx *gorm.DB) error {
removed, changed = db.DeleteExpiredEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)

return nil
}); err != nil {
log.Error().Err(err).Msg("database error while expiring ephemeral nodes")
continue
}

if removed != nil {
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerRemoved,
Removed: removed,
})
}

if changed != nil {
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: changed,
})
}
}
}
}

// expireExpiredNodes expires nodes that have an explicit expiry set
// after that expiry time has passed.
func (h *Headscale) expireExpiredNodes(ctx context.Context, every time.Duration) {
Expand Down Expand Up @@ -557,9 +523,18 @@ func (h *Headscale) Serve() error {
return errEmptyInitialDERPMap
}

expireEphemeralCtx, expireEphemeralCancel := context.WithCancel(context.Background())
defer expireEphemeralCancel()
go h.deleteExpireEphemeralNodes(expireEphemeralCtx, updateInterval)
// Start ephemeral node garbage collector and schedule all nodes
// that are already in the database and ephemeral. If they are still
// around between restarts, they will reconnect and the GC will
// be cancelled.
go h.ephemeralGC.Start()
ephmNodes, err := h.db.ListEphemeralNodes()
if err != nil {
return fmt.Errorf("failed to list ephemeral nodes: %w", err)
}
for _, node := range ephmNodes {
h.ephemeralGC.Schedule(node.ID, h.cfg.EphemeralNodeInactivityTimeout)
}

expireNodeCtx, expireNodeCancel := context.WithCancel(context.Background())
defer expireNodeCancel()
Expand Down Expand Up @@ -809,7 +784,7 @@ func (h *Headscale) Serve() error {
Msg("Received signal to stop, shutting down gracefully")

expireNodeCancel()
expireEphemeralCancel()
h.ephemeralGC.Close()

trace("waiting for netmap stream to close")
h.pollNetMapStreamWG.Wait()
Expand Down
8 changes: 4 additions & 4 deletions hscontrol/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gorm.io/gorm"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/ptr"
)

func logAuthFunc(
Expand Down Expand Up @@ -314,9 +315,8 @@ func (h *Headscale) handleAuthKey(
Msg("node was already registered before, refreshing with new auth key")

node.NodeKey = nodeKey
pakID := uint(pak.ID)
if pakID != 0 {
node.AuthKeyID = &pakID
if pak.ID != 0 {
node.AuthKeyID = ptr.To(pak.ID)
}

node.Expiry = &registerRequest.Expiry
Expand Down Expand Up @@ -394,7 +394,7 @@ func (h *Headscale) handleAuthKey(

pakID := uint(pak.ID)
if pakID != 0 {
nodeToRegister.AuthKeyID = &pakID
nodeToRegister.AuthKeyID = ptr.To(pak.ID)
}
node, err = h.db.RegisterNode(
nodeToRegister,
Expand Down
146 changes: 101 additions & 45 deletions hscontrol/db/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/patrickmn/go-cache"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog/log"
"github.com/sasha-s/go-deadlock"
"gorm.io/gorm"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
Expand Down Expand Up @@ -78,6 +79,17 @@ func ListNodes(tx *gorm.DB) (types.Nodes, error) {
return nodes, nil
}

func (hsdb *HSDatabase) ListEphemeralNodes() (types.Nodes, error) {
return Read(hsdb.DB, func(rx *gorm.DB) (types.Nodes, error) {
nodes := types.Nodes{}
if err := rx.Joins("AuthKey").Where(`"AuthKey"."ephemeral" = true`).Find(&nodes).Error; err != nil {
return nil, err
}

return nodes, nil
})
}

func listNodesByGivenName(tx *gorm.DB, givenName string) (types.Nodes, error) {
nodes := types.Nodes{}
if err := tx.
Expand Down Expand Up @@ -286,6 +298,20 @@ func DeleteNode(tx *gorm.DB,
return changed, nil
}

// DeleteEphemeralNode deletes a Node from the database, note that this method
// will remove it straight, and not notify any changes or consider any routes.
// It is intended for Ephemeral nodes.
func (hsdb *HSDatabase) DeleteEphemeralNode(
nodeID types.NodeID,
) error {
return hsdb.Write(func(tx *gorm.DB) error {
if err := tx.Unscoped().Delete(&types.Node{}, nodeID).Error; err != nil {
return err
}
return nil
})
}

// SetLastSeen sets a node's last seen field indicating that we
// have recently communicating with this node.
func SetLastSeen(tx *gorm.DB, nodeID types.NodeID, lastSeen time.Time) error {
Expand Down Expand Up @@ -660,51 +686,6 @@ func GenerateGivenName(
return givenName, nil
}

func DeleteExpiredEphemeralNodes(tx *gorm.DB,
inactivityThreshold time.Duration,
) ([]types.NodeID, []types.NodeID) {
users, err := ListUsers(tx)
if err != nil {
return nil, nil
}

var expired []types.NodeID
var changedNodes []types.NodeID
for _, user := range users {
nodes, err := ListNodesByUser(tx, user.Name)
if err != nil {
return nil, nil
}

for idx, node := range nodes {
if node.IsEphemeral() && node.LastSeen != nil &&
time.Now().
After(node.LastSeen.Add(inactivityThreshold)) {
expired = append(expired, node.ID)

log.Info().
Str("node", node.Hostname).
Msg("Ephemeral client removed from database")

// empty isConnected map as ephemeral nodes are not routes
changed, err := DeleteNode(tx, nodes[idx], nil)
if err != nil {
log.Error().
Err(err).
Str("node", node.Hostname).
Msg("🤮 Cannot delete ephemeral node from the database")
}

changedNodes = append(changedNodes, changed...)
}
}

// TODO(kradalby): needs to be moved out of transaction
}

return expired, changedNodes
}

func ExpireExpiredNodes(tx *gorm.DB,
lastCheck time.Time,
) (time.Time, types.StateUpdate, bool) {
Expand Down Expand Up @@ -737,3 +718,78 @@ func ExpireExpiredNodes(tx *gorm.DB,

return started, types.StateUpdate{}, false
}

// EphemeralGarbageCollector is a garbage collector that will delete nodes after
// a certain amount of time.
// It is used to delete ephemeral nodes that have disconnected and should be
// cleaned up.
type EphemeralGarbageCollector struct {
mu deadlock.Mutex

deleteFunc func(types.NodeID)
toBeDeleted map[types.NodeID]*time.Timer

deleteCh chan types.NodeID
cancelCh chan struct{}
}

// NewEphemeralGarbageCollector creates a new EphemeralGarbageCollector, it takes
// a deleteFunc that will be called when a node is scheduled for deletion.
func NewEphemeralGarbageCollector(deleteFunc func(types.NodeID)) *EphemeralGarbageCollector {
return &EphemeralGarbageCollector{
toBeDeleted: make(map[types.NodeID]*time.Timer),
deleteCh: make(chan types.NodeID, 10),
cancelCh: make(chan struct{}),
deleteFunc: deleteFunc,
}
}

// Close stops the garbage collector.
func (e *EphemeralGarbageCollector) Close() {
e.cancelCh <- struct{}{}
}

// Schedule schedules a node for deletion after the expiry duration.
func (e *EphemeralGarbageCollector) Schedule(nodeID types.NodeID, expiry time.Duration) {
e.mu.Lock()
defer e.mu.Unlock()

timer := time.NewTimer(expiry)
e.toBeDeleted[nodeID] = timer

go func() {
select {
case _, ok := <-timer.C:
if ok {
e.deleteCh <- nodeID
}
}
}()
}

// Cancel cancels the deletion of a node.
func (e *EphemeralGarbageCollector) Cancel(nodeID types.NodeID) {
e.mu.Lock()
defer e.mu.Unlock()

if timer, ok := e.toBeDeleted[nodeID]; ok {
timer.Stop()
delete(e.toBeDeleted, nodeID)
}
}

// Start starts the garbage collector.
func (e *EphemeralGarbageCollector) Start() {
for {
select {
case <-e.cancelCh:
return
case nodeID := <-e.deleteCh:
e.mu.Lock()
delete(e.toBeDeleted, nodeID)
e.mu.Unlock()

go e.deleteFunc(nodeID)
}
}
}
Loading

0 comments on commit 7e62031

Please sign in to comment.