Skip to content

Commit

Permalink
Merge pull request #84 from kradalby/integration-tests-ci
Browse files Browse the repository at this point in the history
Improve logic to keep nodes up to date with the network state
  • Loading branch information
juanfont authored Aug 23, 2021
2 parents 98e63d5 + 059f13f commit 74d2fe1
Show file tree
Hide file tree
Showing 9 changed files with 648 additions and 259 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/test-integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: CI

on: [pull_request]

jobs:
# The "build" workflow
integration-test:
# The type of runner that the job will run on
runs-on: ubuntu-latest

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2

# Setup Go
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: "1.16.3"

- name: Run Integration tests
run: go test -tags integration -timeout 30m
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ config.json
*.key
/db.sqlite
*.sqlite3

test_output/
203 changes: 7 additions & 196 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/gin-gonic/gin"
"github.com/klauspost/compress/zstd"
"gorm.io/datatypes"
"gorm.io/gorm"
"inet.af/netaddr"
"tailscale.com/tailcfg"
Expand Down Expand Up @@ -82,14 +81,16 @@ func (h *Headscale) RegistrationHandler(c *gin.Context) {
return
}

now := time.Now().UTC()
var m Machine
if result := h.db.Preload("Namespace").First(&m, "machine_key = ?", mKey.HexString()); errors.Is(result.Error, gorm.ErrRecordNotFound) {
log.Info().Str("machine", req.Hostinfo.Hostname).Msg("New machine")
m = Machine{
Expiry: &req.Expiry,
MachineKey: mKey.HexString(),
Name: req.Hostinfo.Hostname,
NodeKey: wgkey.Key(req.NodeKey).HexString(),
Expiry: &req.Expiry,
MachineKey: mKey.HexString(),
Name: req.Hostinfo.Hostname,
NodeKey: wgkey.Key(req.NodeKey).HexString(),
LastSuccessfulUpdate: &now,
}
if err := h.db.Create(&m).Error; err != nil {
log.Error().
Expand Down Expand Up @@ -215,196 +216,6 @@ func (h *Headscale) RegistrationHandler(c *gin.Context) {
c.Data(200, "application/json; charset=utf-8", respBody)
}

// PollNetMapHandler takes care of /machine/:id/map
//
// This is the busiest endpoint, as it keeps the HTTP long poll that updates
// the clients when something in the network changes.
//
// The clients POST stuff like HostInfo and their Endpoints here, but
// only after their first request (marked with the ReadOnly field).
//
// At this moment the updates are sent in a quite horrendous way, but they kinda work.
func (h *Headscale) PollNetMapHandler(c *gin.Context) {
log.Trace().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Msg("PollNetMapHandler called")
body, _ := io.ReadAll(c.Request.Body)
mKeyStr := c.Param("id")
mKey, err := wgkey.ParseHex(mKeyStr)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Err(err).
Msg("Cannot parse client key")
c.String(http.StatusBadRequest, "")
return
}
req := tailcfg.MapRequest{}
err = decode(body, &req, &mKey, h.privateKey)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Err(err).
Msg("Cannot decode message")
c.String(http.StatusBadRequest, "")
return
}

var m Machine
if result := h.db.Preload("Namespace").First(&m, "machine_key = ?", mKey.HexString()); errors.Is(result.Error, gorm.ErrRecordNotFound) {
log.Warn().
Str("handler", "PollNetMap").
Msgf("Ignoring request, cannot find machine with key %s", mKey.HexString())
c.String(http.StatusUnauthorized, "")
return
}
log.Trace().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Msg("Found machine in database")

hostinfo, _ := json.Marshal(req.Hostinfo)
m.Name = req.Hostinfo.Hostname
m.HostInfo = datatypes.JSON(hostinfo)
m.DiscoKey = wgkey.Key(req.DiscoKey).HexString()
now := time.Now().UTC()

// From Tailscale client:
//
// ReadOnly is whether the client just wants to fetch the MapResponse,
// without updating their Endpoints. The Endpoints field will be ignored and
// LastSeen will not be updated and peers will not be notified of changes.
//
// The intended use is for clients to discover the DERP map at start-up
// before their first real endpoint update.
if !req.ReadOnly {
endpoints, _ := json.Marshal(req.Endpoints)
m.Endpoints = datatypes.JSON(endpoints)
m.LastSeen = &now
}
h.db.Save(&m)

data, err := h.getMapResponse(mKey, req, m)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Err(err).
Msg("Failed to get Map response")
c.String(http.StatusInternalServerError, ":(")
return
}

// We update our peers if the client is not sending ReadOnly in the MapRequest
// so we don't distribute its initial request (it comes with
// empty endpoints to peers)

// Details on the protocol can be found in https://github.com/tailscale/tailscale/blob/main/tailcfg/tailcfg.go#L696
log.Debug().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Bool("readOnly", req.ReadOnly).
Bool("omitPeers", req.OmitPeers).
Bool("stream", req.Stream).
Msg("Client map request processed")

if req.ReadOnly {
log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Client is starting up. Asking for DERP map")
c.Data(200, "application/json; charset=utf-8", *data)
return
}
if req.OmitPeers && !req.Stream {
log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Client sent endpoint update and is ok with a response without peer list")
c.Data(200, "application/json; charset=utf-8", *data)
return
} else if req.OmitPeers && req.Stream {
log.Warn().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Ignoring request, don't know how to handle it")
c.String(http.StatusBadRequest, "")
return
}

// Only create update channel if it has not been created
var update chan []byte
log.Trace().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Msg("Creating or loading update channel")
if result, ok := h.clientsPolling.LoadOrStore(m.ID, make(chan []byte, 1)); ok {
update = result.(chan []byte)
}

pollData := make(chan []byte, 1)
defer close(pollData)

cancelKeepAlive := make(chan []byte, 1)
defer close(cancelKeepAlive)

log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Client is ready to access the tailnet")
log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Sending initial map")
pollData <- *data

log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Notifying peers")
// TODO: Why does this block?
go h.notifyChangesToPeers(&m)

h.PollNetMapStream(c, m, req, mKey, pollData, update, cancelKeepAlive)
log.Trace().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Msg("Finished stream, closing PollNetMap session")
}

func (h *Headscale) keepAlive(cancel chan []byte, pollData chan []byte, mKey wgkey.Key, req tailcfg.MapRequest, m Machine) {
for {
select {
case <-cancel:
return

default:
data, err := h.getMapKeepAliveResponse(mKey, req, m)
if err != nil {
log.Error().
Str("func", "keepAlive").
Err(err).
Msg("Error generating the keep alive msg")
return
}

log.Debug().
Str("func", "keepAlive").
Str("machine", m.Name).
Msg("Sending keepalive")
pollData <- *data

time.Sleep(60 * time.Second)
}
}
}

func (h *Headscale) getMapResponse(mKey wgkey.Key, req tailcfg.MapRequest, m Machine) (*[]byte, error) {
log.Trace().
Str("func", "getMapResponse").
Expand Down Expand Up @@ -542,7 +353,7 @@ func (h *Headscale) handleAuthKey(c *gin.Context, db *gorm.DB, idKey wgkey.Key,
Str("func", "handleAuthKey").
Str("machine", m.Name).
Str("ip", ip.String()).
Msgf("Assining %s to %s", ip, m.Name)
Msgf("Assigning %s to %s", ip, m.Name)

m.AuthKeyID = uint(pak.ID)
m.IPAddress = ip.String()
Expand Down
43 changes: 37 additions & 6 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ type Headscale struct {
aclPolicy *ACLPolicy
aclRules *[]tailcfg.FilterRule

clientsPolling sync.Map
clientsUpdateChannels sync.Map
clientsUpdateChannelMutex sync.Mutex

lastStateChange sync.Map
}

// NewHeadscale returns the Headscale app
Expand Down Expand Up @@ -165,9 +168,18 @@ func (h *Headscale) Serve() error {
r.POST("/machine/:id", h.RegistrationHandler)
var err error

timeout := 30 * time.Second

go h.watchForKVUpdates(5000)
go h.expireEphemeralNodes(5000)

s := &http.Server{
Addr: h.cfg.Addr,
Handler: r,
ReadTimeout: timeout,
WriteTimeout: timeout,
}

if h.cfg.TLSLetsEncryptHostname != "" {
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
Expand All @@ -179,9 +191,11 @@ func (h *Headscale) Serve() error {
Cache: autocert.DirCache(h.cfg.TLSLetsEncryptCacheDir),
}
s := &http.Server{
Addr: h.cfg.Addr,
TLSConfig: m.TLSConfig(),
Handler: r,
Addr: h.cfg.Addr,
TLSConfig: m.TLSConfig(),
Handler: r,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
if h.cfg.TLSLetsEncryptChallengeType == "TLS-ALPN-01" {
// Configuration via autocert with TLS-ALPN-01 (https://tools.ietf.org/html/rfc8737)
Expand All @@ -206,12 +220,29 @@ func (h *Headscale) Serve() error {
if !strings.HasPrefix(h.cfg.ServerURL, "http://") {
log.Warn().Msg("Listening without TLS but ServerURL does not start with http://")
}
err = r.Run(h.cfg.Addr)
err = s.ListenAndServe()
} else {
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
}
err = r.RunTLS(h.cfg.Addr, h.cfg.TLSCertPath, h.cfg.TLSKeyPath)
err = s.ListenAndServeTLS(h.cfg.TLSCertPath, h.cfg.TLSKeyPath)
}
return err
}

func (h *Headscale) setLastStateChangeToNow(namespace string) {
now := time.Now().UTC()
h.lastStateChange.Store(namespace, now)
}

func (h *Headscale) getLastStateChange(namespace string) time.Time {
if wrapped, ok := h.lastStateChange.Load(namespace); ok {
lastChange, _ := wrapped.(time.Time)
return lastChange

}

now := time.Now().UTC()
h.lastStateChange.Store(namespace, now)
return now
}
2 changes: 1 addition & 1 deletion cmd/headscale/cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func LoadConfig(path string) error {

viper.SetDefault("ip_prefix", "100.64.0.0/10")

viper.SetDefault("log_level", "debug")
viper.SetDefault("log_level", "info")

err := viper.ReadInConfig()
if err != nil {
Expand Down
Loading

0 comments on commit 74d2fe1

Please sign in to comment.