Skip to content

Commit

Permalink
replace ephemeral deletion logic
Browse files Browse the repository at this point in the history
this commit replaces the way we remove ephemeral nodes,
currently they are deleted in a loop and we look at last seen
time. This time is now only set when a node disconnects and
there was a bug (juanfont#2006) where nodes that had never disconnected
was deleted since they did not have a last seen.

The new logic will start an expiry timer when the node disconnects
and delete the node from the database when the timer is up.

If the node reconnects within the expiry, the timer is cancelled.

Fixes juanfont#2006

Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Jul 12, 2024
1 parent eb1591d commit 157930e
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 162 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
- TestPingAllByIPPublicDERP
- TestAuthKeyLogoutAndRelogin
- TestEphemeral
- TestEphemeral2006DeletedTooQuickly
- TestPingAllByHostname
- TestTaildrop
- TestResolveMagicDNS
Expand Down
54 changes: 9 additions & 45 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Headscale struct {
db *db.HSDatabase
ipAlloc *db.IPAllocator
noisePrivateKey *key.MachinePrivate
ephemeralGC *db.EphemeralGarbageCollector

DERPMap *tailcfg.DERPMap
DERPServer *derpServer.DERPServer
Expand Down Expand Up @@ -152,6 +153,12 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
return nil, err
}

app.ephemeralGC = db.NewEphemeralGarbageCollector(func(ni types.NodeID) {
if err := app.db.DeleteEphemeralNode(ni); err != nil {
log.Err(err).Uint64("node.id", ni.Uint64()).Msgf("failed to delete ephemeral node")
}
})

if cfg.OIDC.Issuer != "" {
err = app.initOIDC()
if err != nil {
Expand Down Expand Up @@ -216,47 +223,6 @@ func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, target, http.StatusFound)
}

// deleteExpireEphemeralNodes deletes ephemeral node records that have not been
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
func (h *Headscale) deleteExpireEphemeralNodes(ctx context.Context, every time.Duration) {
ticker := time.NewTicker(every)

for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
var removed []types.NodeID
var changed []types.NodeID
if err := h.db.Write(func(tx *gorm.DB) error {
removed, changed = db.DeleteExpiredEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)

return nil
}); err != nil {
log.Error().Err(err).Msg("database error while expiring ephemeral nodes")
continue
}

if removed != nil {
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerRemoved,
Removed: removed,
})
}

if changed != nil {
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: changed,
})
}
}
}
}

// expireExpiredNodes expires nodes that have an explicit expiry set
// after that expiry time has passed.
func (h *Headscale) expireExpiredNodes(ctx context.Context, every time.Duration) {
Expand Down Expand Up @@ -552,9 +518,7 @@ func (h *Headscale) Serve() error {
return errEmptyInitialDERPMap
}

expireEphemeralCtx, expireEphemeralCancel := context.WithCancel(context.Background())
defer expireEphemeralCancel()
go h.deleteExpireEphemeralNodes(expireEphemeralCtx, updateInterval)
go h.ephemeralGC.Start()

expireNodeCtx, expireNodeCancel := context.WithCancel(context.Background())
defer expireNodeCancel()
Expand Down Expand Up @@ -810,7 +774,7 @@ func (h *Headscale) Serve() error {
Msg("Received signal to stop, shutting down gracefully")

expireNodeCancel()
expireEphemeralCancel()
h.ephemeralGC.Close()

trace("waiting for netmap stream to close")
h.pollNetMapStreamWG.Wait()
Expand Down
125 changes: 80 additions & 45 deletions hscontrol/db/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/netip"
"sort"
"sync"
"time"

"github.com/juanfont/headscale/hscontrol/types"
Expand Down Expand Up @@ -286,6 +287,20 @@ func DeleteNode(tx *gorm.DB,
return changed, nil
}

// DeleteEphemeralNode deletes a Node from the database, note that this method
// will remove it straight, and not notify any changes or consider any routes.
// It is intended for Ephemeral nodes.
func (hsdb *HSDatabase) DeleteEphemeralNode(
nodeID types.NodeID,
) error {
return hsdb.Write(func(tx *gorm.DB) error {
if err := tx.Unscoped().Delete(&types.Node{}, nodeID).Error; err != nil {
return err
}
return nil
})
}

// SetLastSeen sets a node's last seen field indicating that we
// have recently communicating with this node.
func SetLastSeen(tx *gorm.DB, nodeID types.NodeID, lastSeen time.Time) error {
Expand Down Expand Up @@ -660,51 +675,6 @@ func GenerateGivenName(
return givenName, nil
}

func DeleteExpiredEphemeralNodes(tx *gorm.DB,
inactivityThreshold time.Duration,
) ([]types.NodeID, []types.NodeID) {
users, err := ListUsers(tx)
if err != nil {
return nil, nil
}

var expired []types.NodeID
var changedNodes []types.NodeID
for _, user := range users {
nodes, err := ListNodesByUser(tx, user.Name)
if err != nil {
return nil, nil
}

for idx, node := range nodes {
if node.IsEphemeral() && node.LastSeen != nil &&
time.Now().
After(node.LastSeen.Add(inactivityThreshold)) {
expired = append(expired, node.ID)

log.Info().
Str("node", node.Hostname).
Msg("Ephemeral client removed from database")

// empty isConnected map as ephemeral nodes are not routes
changed, err := DeleteNode(tx, nodes[idx], nil)
if err != nil {
log.Error().
Err(err).
Str("node", node.Hostname).
Msg("🤮 Cannot delete ephemeral node from the database")
}

changedNodes = append(changedNodes, changed...)
}
}

// TODO(kradalby): needs to be moved out of transaction
}

return expired, changedNodes
}

func ExpireExpiredNodes(tx *gorm.DB,
lastCheck time.Time,
) (time.Time, types.StateUpdate, bool) {
Expand Down Expand Up @@ -737,3 +707,68 @@ func ExpireExpiredNodes(tx *gorm.DB,

return started, types.StateUpdate{}, false
}

type EphemeralGarbageCollector struct {
mu sync.Mutex

deleteFunc func(types.NodeID)
toBeDeleted map[types.NodeID]*time.Timer

deleteCh chan types.NodeID
cancelCh chan struct{}
}

func NewEphemeralGarbageCollector(deleteFunc func(types.NodeID)) *EphemeralGarbageCollector {
return &EphemeralGarbageCollector{
toBeDeleted: make(map[types.NodeID]*time.Timer),
deleteCh: make(chan types.NodeID, 10),
deleteFunc: deleteFunc,
}
}

func (e *EphemeralGarbageCollector) Close() {
e.cancelCh <- struct{}{}
}

func (e *EphemeralGarbageCollector) Schedule(nodeID types.NodeID, expiry time.Duration) {
e.mu.Lock()
defer e.mu.Unlock()

timer := time.NewTimer(expiry)
e.toBeDeleted[nodeID] = timer

go func() {
select {
case _, ok := <-timer.C:
if ok {
e.deleteCh <- nodeID
}
}
}()
}

func (e *EphemeralGarbageCollector) Cancel(nodeID types.NodeID) {
e.mu.Lock()
defer e.mu.Unlock()

if timer, ok := e.toBeDeleted[nodeID]; ok {
timer.Stop()
delete(e.toBeDeleted, nodeID)
}
}

func (e *EphemeralGarbageCollector) Start() {
for {
select {
case <-e.cancelCh:
return
case nodeID := <-e.deleteCh:
// TODO(kradalby): deadlock here?
e.mu.Lock()
delete(e.toBeDeleted, nodeID)
e.mu.Unlock()

go e.deleteFunc(nodeID)
}
}
}
24 changes: 24 additions & 0 deletions hscontrol/db/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/juanfont/headscale/hscontrol/policy"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/util"
Expand Down Expand Up @@ -599,3 +600,26 @@ func (s *Suite) TestAutoApproveRoutes(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(enabledRoutes, check.HasLen, 4)
}

func TestEphemeralGarbageCollector(t *testing.T) {
want := []types.NodeID{1, 3}
got := []types.NodeID{}

e := NewEphemeralGarbageCollector(func(ni types.NodeID) {
got = append(got, ni)
})
go e.Start()

e.Schedule(1, 1*time.Second)
e.Schedule(2, 2*time.Second)
e.Schedule(3, 3*time.Second)
e.Schedule(4, 4*time.Second)
e.Cancel(2)
e.Cancel(4)

time.Sleep(10 * time.Second)

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("wrong nodes deleted, unexpected result (-want +got):\n%s", diff)
}
}
72 changes: 0 additions & 72 deletions hscontrol/db/preauth_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/util"
"gopkg.in/check.v1"
"gorm.io/gorm"
)

func (*Suite) TestCreatePreAuthKey(c *check.C) {
Expand Down Expand Up @@ -127,77 +126,6 @@ func (*Suite) TestNotReusableNotBeingUsedKey(c *check.C) {
c.Assert(key.ID, check.Equals, pak.ID)
}

func (*Suite) TestEphemeralKeyReusable(c *check.C) {
user, err := db.CreateUser("test7")
c.Assert(err, check.IsNil)

pak, err := db.CreatePreAuthKey(user.Name, true, true, nil, nil)
c.Assert(err, check.IsNil)

now := time.Now().Add(-time.Second * 30)
pakID := uint(pak.ID)
node := types.Node{
ID: 0,
Hostname: "testest",
UserID: user.ID,
RegisterMethod: util.RegisterMethodAuthKey,
LastSeen: &now,
AuthKeyID: &pakID,
}
trx := db.DB.Save(&node)
c.Assert(trx.Error, check.IsNil)

_, err = db.ValidatePreAuthKey(pak.Key)
c.Assert(err, check.IsNil)

_, err = db.getNode("test7", "testest")
c.Assert(err, check.IsNil)

db.Write(func(tx *gorm.DB) error {
DeleteExpiredEphemeralNodes(tx, time.Second*20)
return nil
})

// The machine record should have been deleted
_, err = db.getNode("test7", "testest")
c.Assert(err, check.NotNil)
}

func (*Suite) TestEphemeralKeyNotReusable(c *check.C) {
user, err := db.CreateUser("test7")
c.Assert(err, check.IsNil)

pak, err := db.CreatePreAuthKey(user.Name, false, true, nil, nil)
c.Assert(err, check.IsNil)

now := time.Now().Add(-time.Second * 30)
pakId := uint(pak.ID)
node := types.Node{
ID: 0,
Hostname: "testest",
UserID: user.ID,
RegisterMethod: util.RegisterMethodAuthKey,
LastSeen: &now,
AuthKeyID: &pakId,
}
db.DB.Save(&node)

_, err = db.ValidatePreAuthKey(pak.Key)
c.Assert(err, check.NotNil)

_, err = db.getNode("test7", "testest")
c.Assert(err, check.IsNil)

db.Write(func(tx *gorm.DB) error {
DeleteExpiredEphemeralNodes(tx, time.Second*20)
return nil
})

// The machine record should have been deleted
_, err = db.getNode("test7", "testest")
c.Assert(err, check.NotNil)
}

func (*Suite) TestExpirePreauthKey(c *check.C) {
user, err := db.CreateUser("test3")
c.Assert(err, check.IsNil)
Expand Down
Loading

0 comments on commit 157930e

Please sign in to comment.