enhance: retry build processing on failed server comms (#594)
* enhance: retry build processing on failed server comms

* only pop from queue once
ecrupper authored Aug 26, 2024
1 parent dd70412 commit 66403e5
Showing 2 changed files with 154 additions and 66 deletions.
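Both files apply the same pattern: wrap the calls to the server in a bounded retry loop (retries = 3), sleep i*10 seconds before each retry, and only surface the error once the attempts are exhausted. A minimal, self-contained sketch of that pattern; retryWithBackoff and doCall are hypothetical names used for illustration, not part of the worker:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff is a hypothetical helper illustrating the pattern the
// commit applies: try up to maxRetries times, sleep i*10 seconds before
// retry i, and only return the error after the last attempt fails.
func retryWithBackoff(maxRetries int, doCall func() error) error {
	var err error

	for i := 0; i < maxRetries; i++ {
		// skip the sleep on the first attempt, then back off incrementally
		// (10s, 20s, ...), mirroring the loops added in exec.go and register.go
		if i > 0 {
			time.Sleep(time.Duration(i*10) * time.Second)
		}

		err = doCall()
		if err == nil {
			return nil
		}

		fmt.Printf("attempt %d failed: %v\n", i+1, err)
	}

	return err
}

func main() {
	// example: a call that never succeeds, so all three attempts (and
	// roughly 30s of backoff) are consumed before the error is returned
	err := retryWithBackoff(3, func() error { return errors.New("server unreachable") })
	fmt.Println("final result:", err)
}
```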
146 changes: 100 additions & 46 deletions cmd/vela-worker/exec.go
@@ -11,10 +11,12 @@ import (

"github.com/sirupsen/logrus"

"github.com/go-vela/sdk-go/vela"
api "github.com/go-vela/server/api/types"
"github.com/go-vela/server/queue/models"
"github.com/go-vela/types"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
@@ -31,64 +33,116 @@ func (w *Worker) exec(index int, config *api.Worker) error {
// setup the version
v := version.New()

// get worker from database
worker, _, err := w.VelaClient.Worker.Get(w.Config.API.Address.Hostname())
if err != nil {
logrus.Errorf("unable to retrieve worker from server: %s", err)
var (
execBuildClient *vela.Client
execBuildExecutable *library.BuildExecutable
p *pipeline.Build
item *models.Item
retries = 3
)

for i := 0; i < retries; i++ {
// check if we're on the first iteration of the loop
if i > 0 {
// incrementally sleep in between retries
time.Sleep(time.Duration(i*10) * time.Second)
}

return err
}
logrus.Debugf("queue item prep - attempt %d", i+1)

// capture an item from the queue
item, err := w.Queue.Pop(context.Background(), worker.GetRoutes())
if err != nil {
logrus.Errorf("queue pop failed: %v", err)
// get worker from database
worker, _, err := w.VelaClient.Worker.Get(w.Config.API.Address.Hostname())
if err != nil {
logrus.Errorf("unable to retrieve worker from server: %s", err)

// returning immediately on queue pop fail will attempt
// to pop in quick succession, so we honor the configured timeout
time.Sleep(w.Config.Queue.Timeout)
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// returning nil to avoid unregistering the worker on pop failure;
// sometimes queue could be unavailable due to blip or maintenance
return nil
}
// continue to the next iteration of the loop
continue
}

if item == nil {
return nil
}
return err
}

// retrieve a build token from the server to setup the execBuildClient
bt, resp, err := w.VelaClient.Build.GetBuildToken(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
logrus.Errorf("unable to retrieve build token: %s", err)
// capture an item from the queue only on first loop iteration (failures here return nil)
if i == 0 {
item, err = w.Queue.Pop(context.Background(), worker.GetRoutes())
if err != nil {
logrus.Errorf("queue pop failed: %v", err)

// build is not in pending state — user canceled build while it was in queue. Pop, discard, move on.
if resp != nil && resp.StatusCode == http.StatusConflict {
return nil
// returning immediately on queue pop fail will attempt
// to pop in quick succession, so we honor the configured timeout
time.Sleep(w.Config.Queue.Timeout)

// returning nil to avoid unregistering the worker on pop failure;
// sometimes queue could be unavailable due to blip or maintenance
return nil
}

if item == nil {
return nil
}
}

// something else is amiss (auth, server down, etc.) — shut down worker, will have to re-register if registration enabled.
return err
}
// retrieve a build token from the server to setup the execBuildClient
bt, resp, err := w.VelaClient.Build.GetBuildToken(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
logrus.Errorf("unable to retrieve build token: %s", err)

// set up build client with build token as auth
execBuildClient, err := setupClient(w.Config.Server, bt.GetToken())
if err != nil {
return err
}
// build is not in pending state — user canceled build while it was in queue. Pop, discard, move on.
if resp != nil && resp.StatusCode == http.StatusConflict {
return nil
}

// request build executable containing pipeline.Build data using exec client
execBuildExecutable, _, err := execBuildClient.Build.GetBuildExecutable(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
return err
}
// check if the retry limit has been exceeded
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// get the build pipeline from the build executable
pipeline := new(pipeline.Build)
// continue to the next iteration of the loop
continue
}

err = json.Unmarshal(execBuildExecutable.GetData(), pipeline)
if err != nil {
return err
return err
}

// set up build client with build token as auth
execBuildClient, err = setupClient(w.Config.Server, bt.GetToken())
if err != nil {
// check if the retry limit has been exceeded
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// continue to the next iteration of the loop
continue
}

return err
}

// request build executable containing pipeline.Build data using exec client
execBuildExecutable, _, err = execBuildClient.Build.GetBuildExecutable(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber())
if err != nil {
// check if the retry limit has been exceeded
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// continue to the next iteration of the loop
continue
}

return err
}

// get the build pipeline from the build executable
p = new(pipeline.Build)

err = json.Unmarshal(execBuildExecutable.GetData(), p)
if err != nil {
return err
}

break
}

// create logger with extra metadata
@@ -180,7 +234,7 @@ func (w *Worker) exec(index int, config *api.Worker) error {
Hostname: w.Config.API.Address.Hostname(),
Runtime: w.Runtime,
Build: item.Build,
Pipeline: pipeline.Sanitize(w.Config.Runtime.Driver),
Pipeline: p.Sanitize(w.Config.Runtime.Driver),
Version: v.Semantic(),
})

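The `if i == 0` guard in exec.go is what the second commit message bullet refers to: the queue is popped a single time, and later retry iterations reuse the popped item instead of pulling additional builds. A pop failure still returns nil immediately (the worker simply polls again later) rather than consuming retries. A standalone sketch of that shape, with pop and process as hypothetical stand-ins for the worker's queue pop and the follow-on server calls:

```go
package main

import (
	"errors"
	"fmt"
)

// popOnce illustrates the "only pop from queue once" guard: the queue is
// drained a single time, and retry iterations reuse the same item while the
// downstream calls are re-attempted.
func popOnce(retries int, pop func() (string, error), process func(string) error) error {
	var (
		item string
		err  error
	)

	for i := 0; i < retries; i++ {
		// only touch the queue on the first iteration
		if i == 0 {
			item, err = pop()
			if err != nil {
				// a pop failure is not retried here; the caller polls again later
				return nil
			}
		}

		// the downstream calls are retried against the same item
		if err = process(item); err != nil {
			continue
		}

		return nil
	}

	return err
}

func main() {
	pops := 0
	pop := func() (string, error) { pops++; return "build-42", nil }

	calls := 0
	process := func(string) error {
		calls++
		if calls < 3 {
			return errors.New("server unavailable")
		}
		return nil
	}

	err := popOnce(3, pop, process)
	fmt.Printf("err=%v pops=%d calls=%d\n", err, pops, calls) // err=<nil> pops=1 calls=3
}
```

Running it shows one pop and three process attempts, which is exactly the property the guard is there to preserve.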
74 changes: 54 additions & 20 deletions cmd/vela-worker/register.go
@@ -6,44 +6,78 @@ import (
"context"
"fmt"
"net/http"
"time"

"github.com/sirupsen/logrus"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
)

// checkIn is a helper function to phone home to the server.
func (w *Worker) checkIn(config *api.Worker) (bool, string, error) {
// check to see if the worker already exists in the database
logrus.Infof("retrieving worker %s from the server", config.GetHostname())

_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
// if server is down, the worker status will not be updated
if resp == nil {
return false, "", respErr
var (
tkn *library.Token
retries = 3
)

for i := 0; i < retries; i++ {
logrus.Debugf("check in loop - attempt %d", i+1)
// check if we're on the first iteration of the loop
if i > 0 {
// incrementally sleep in between retries
time.Sleep(time.Duration(i*10) * time.Second)
}
// if we receive a 404 the worker needs to be registered
if resp.StatusCode == http.StatusNotFound {
return w.register(config)

_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
// if server is down, the worker status will not be updated
if resp == nil || (resp.StatusCode != http.StatusNotFound) {
return false, "", respErr
}
// if we receive a 404 the worker needs to be registered
if resp.StatusCode == http.StatusNotFound {
registered, strToken, regErr := w.register(config)
if regErr != nil {
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

// continue to the next iteration of the loop
continue
}
}

return registered, strToken, regErr
}
}

return false, "", respErr
}
// if we were able to GET the worker, update it
logrus.Infof("checking worker %s into the server", config.GetHostname())

// if we were able to GET the worker, update it
logrus.Infof("checking worker %s into the server", config.GetHostname())
tkn, _, err = w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
if i < retries-1 {
logrus.WithError(err).Warningf("retrying #%d", i+1)

tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
// set to error when check in fails
w.updateWorkerStatus(config, constants.WorkerStatusError)
return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
// continue to the next iteration of the loop
continue
}

// set to error when check in fails
w.updateWorkerStatus(config, constants.WorkerStatusError)

return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
}
// update worker status to Idle when checkIn is successful.
w.updateWorkerStatus(config, constants.WorkerStatusIdle)

break
}
// update worker status to Idle when checkIn is successful.
w.updateWorkerStatus(config, constants.WorkerStatusIdle)

return true, tkn.GetToken(), nil
}
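The check-in path follows the same decision tree on every attempt: a missing response or a non-404 error aborts immediately, a 404 falls through to registration, and a failed registration or token refresh is retried until the attempts run out (with the worker marked errored when the final refresh fails). A simplified sketch of a single attempt, assuming hypothetical getWorker, register, and refresh closures rather than the real SDK calls:

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// checkInOnce models one pass of the retried check-in and reports whether
// the caller should retry. This is an illustrative simplification, not the
// worker's actual signature.
func checkInOnce(getWorker func() (int, error), register, refresh func() error) (retry bool, err error) {
	code, err := getWorker()
	if err != nil {
		// no response or an unexpected status: give up and surface the error
		if code != http.StatusNotFound {
			return false, err
		}

		// a 404 means the worker is unknown to the server, so register it;
		// a registration failure is worth another attempt
		if err := register(); err != nil {
			return true, err
		}

		return false, nil
	}

	// the worker exists, so refresh its auth; failures are retried
	if err := refresh(); err != nil {
		return true, err
	}

	return false, nil
}

func main() {
	// example: the first refresh fails, the second succeeds
	calls := 0
	refresh := func() error {
		calls++
		if calls == 1 {
			return errors.New("token refresh hiccup")
		}
		return nil
	}
	getWorker := func() (int, error) { return http.StatusOK, nil }
	register := func() error { return nil }

	for i := 0; i < 3; i++ {
		retry, err := checkInOnce(getWorker, register, refresh)
		fmt.Printf("attempt %d: retry=%v err=%v\n", i+1, retry, err)

		if !retry {
			break
		}
	}
}
```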
