Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: retry build processing on failed server comms #594

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 100 additions & 46 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@

"github.com/sirupsen/logrus"

"github.com/go-vela/sdk-go/vela"
api "github.com/go-vela/server/api/types"
"github.com/go-vela/server/queue/models"
"github.com/go-vela/types"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
Expand All @@ -24,71 +26,123 @@
// exec is a helper function to poll the queue
// and execute Vela pipelines for the Worker.
//
//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker

Check failure on line 29 in cmd/vela-worker/exec.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] cmd/vela-worker/exec.go#L29

directive `//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker` is unused for linter "nilerr" (nolintlint)
Raw output
cmd/vela-worker/exec.go:29:1: directive `//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker` is unused for linter "nilerr" (nolintlint)
//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker
^
func (w *Worker) exec(index int, config *api.Worker) error {

Check failure on line 30 in cmd/vela-worker/exec.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] cmd/vela-worker/exec.go#L30

cyclomatic complexity 31 of func `(*Worker).exec` is high (> 30) (gocyclo)
Raw output
cmd/vela-worker/exec.go:30:1: cyclomatic complexity 31 of func `(*Worker).exec` is high (> 30) (gocyclo)
func (w *Worker) exec(index int, config *api.Worker) error {
^
var err error

// setup the version
v := version.New()

// get worker from database
worker, _, err := w.VelaClient.Worker.Get(w.Config.API.Address.Hostname())
if err != nil {
logrus.Errorf("unable to retrieve worker from server: %s", err)
var (
execBuildClient *vela.Client
execBuildExecutable *library.BuildExecutable
p *pipeline.Build
item *models.Item
retries = 3
)

for i := 0; i < retries; i++ {
// check if we're on the first iteration of the loop
if i > 0 {
// incrementally sleep in between retries
time.Sleep(time.Duration(i*10) * time.Second)
}

return err
}
logrus.Debugf("queue item prep - attempt %d", i+1)

// capture an item from the queue
item, err := w.Queue.Pop(context.Background(), worker.GetRoutes())
if err != nil {
logrus.Errorf("queue pop failed: %v", err)
// get worker from database
worker, _, err := w.VelaClient.Worker.Get(w.Config.API.Address.Hostname())
if err != nil {
logrus.Errorf("unable to retrieve worker from server: %s", err)

// returning immediately on queue pop fail will attempt
// to pop in quick succession, so we honor the configured timeout
time.Sleep(w.Config.Queue.Timeout)
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// returning nil to avoid unregistering the worker on pop failure;
// sometimes queue could be unavailable due to blip or maintenance
return nil
}
// continue to the next iteration of the loop
continue
}

if item == nil {
return nil
}
return err
}

// retrieve a build token from the server to setup the execBuildClient
bt, resp, err := w.VelaClient.Build.GetBuildToken(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
logrus.Errorf("unable to retrieve build token: %s", err)
// capture an item from the queue only on first loop iteration (failures here return nil)
if i == 0 {
item, err = w.Queue.Pop(context.Background(), worker.GetRoutes())
if err != nil {
logrus.Errorf("queue pop failed: %v", err)

// build is not in pending state — user canceled build while it was in queue. Pop, discard, move on.
if resp != nil && resp.StatusCode == http.StatusConflict {
return nil
// returning immediately on queue pop fail will attempt
// to pop in quick succession, so we honor the configured timeout
time.Sleep(w.Config.Queue.Timeout)

// returning nil to avoid unregistering the worker on pop failure;
// sometimes queue could be unavailable due to blip or maintenance
return nil
}

if item == nil {
return nil
}
}

// something else is amiss (auth, server down, etc.) — shut down worker, will have to re-register if registration enabled.
return err
}
// retrieve a build token from the server to setup the execBuildClient
bt, resp, err := w.VelaClient.Build.GetBuildToken(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
logrus.Errorf("unable to retrieve build token: %s", err)

// set up build client with build token as auth
execBuildClient, err := setupClient(w.Config.Server, bt.GetToken())
if err != nil {
return err
}
// build is not in pending state — user canceled build while it was in queue. Pop, discard, move on.
if resp != nil && resp.StatusCode == http.StatusConflict {
return nil
}

// request build executable containing pipeline.Build data using exec client
execBuildExecutable, _, err := execBuildClient.Build.GetBuildExecutable(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
return err
}
// check if the retry limit has been exceeded
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// get the build pipeline from the build executable
pipeline := new(pipeline.Build)
// continue to the next iteration of the loop
continue
}

err = json.Unmarshal(execBuildExecutable.GetData(), pipeline)
if err != nil {
return err
return err
}

// set up build client with build token as auth
execBuildClient, err = setupClient(w.Config.Server, bt.GetToken())
if err != nil {
// check if the retry limit has been exceeded
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// continue to the next iteration of the loop
continue
}

return err
}

// request build executable containing pipeline.Build data using exec client
execBuildExecutable, _, err = execBuildClient.Build.GetBuildExecutable(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
// check if the retry limit has been exceeded
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// continue to the next iteration of the loop
continue
}

return err
}

// get the build pipeline from the build executable
p = new(pipeline.Build)

err = json.Unmarshal(execBuildExecutable.GetData(), p)
if err != nil {
return err
}

break
}

// create logger with extra metadata
Expand Down Expand Up @@ -180,7 +234,7 @@
Hostname: w.Config.API.Address.Hostname(),
Runtime: w.Runtime,
Build: item.Build,
Pipeline: pipeline.Sanitize(w.Config.Runtime.Driver),
Pipeline: p.Sanitize(w.Config.Runtime.Driver),
Version: v.Semantic(),
})

Expand Down
74 changes: 54 additions & 20 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,78 @@ import (
"context"
"fmt"
"net/http"
"time"

"github.com/sirupsen/logrus"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
)

// checkIn is a helper function to phone home to the server.
func (w *Worker) checkIn(config *api.Worker) (bool, string, error) {
// check to see if the worker already exists in the database
logrus.Infof("retrieving worker %s from the server", config.GetHostname())

_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
// if server is down, the worker status will not be updated
if resp == nil {
return false, "", respErr
var (
tkn *library.Token
retries = 3
)

for i := 0; i < retries; i++ {
logrus.Debugf("check in loop - attempt %d", i+1)
// check if we're on the first iteration of the loop
if i > 0 {
// incrementally sleep in between retries
time.Sleep(time.Duration(i*10) * time.Second)
}
// if we receive a 404 the worker needs to be registered
if resp.StatusCode == http.StatusNotFound {
return w.register(config)

_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
// if server is down, the worker status will not be updated
if resp == nil || (resp.StatusCode != http.StatusNotFound) {
return false, "", respErr
}
// if we receive a 404 the worker needs to be registered
if resp.StatusCode == http.StatusNotFound {
registered, strToken, regErr := w.register(config)
if regErr != nil {
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// continue to the next iteration of the loop
continue
}
}

return registered, strToken, regErr
}
}

return false, "", respErr
}
// if we were able to GET the worker, update it
logrus.Infof("checking worker %s into the server", config.GetHostname())

// if we were able to GET the worker, update it
logrus.Infof("checking worker %s into the server", config.GetHostname())
tkn, _, err = w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
// set to error when check in fails
w.updateWorkerStatus(config, constants.WorkerStatusError)
return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
// continue to the next iteration of the loop
continue
}

// set to error when check in fails
w.updateWorkerStatus(config, constants.WorkerStatusError)

return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
}
// update worker status to Idle when checkIn is successful.
w.updateWorkerStatus(config, constants.WorkerStatusIdle)

break
}
// update worker status to Idle when checkIn is successful.
w.updateWorkerStatus(config, constants.WorkerStatusIdle)

return true, tkn.GetToken(), nil
}
Expand Down
Loading