From c3f49909ec07ea2ecdaa1f7c082b13607ed5fe0f Mon Sep 17 00:00:00 2001 From: ecrupper Date: Wed, 14 Aug 2024 11:24:05 -0500 Subject: [PATCH 1/2] enhance: retry build processing on failed server comms --- cmd/vela-worker/exec.go | 142 ++++++++++++++++++++++++------------ cmd/vela-worker/register.go | 74 ++++++++++++++----- 2 files changed, 151 insertions(+), 65 deletions(-) diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go index a2b234e5..3ffb48fc 100644 --- a/cmd/vela-worker/exec.go +++ b/cmd/vela-worker/exec.go @@ -11,10 +11,12 @@ import ( "github.com/sirupsen/logrus" + "github.com/go-vela/sdk-go/vela" api "github.com/go-vela/server/api/types" "github.com/go-vela/server/queue/models" "github.com/go-vela/types" "github.com/go-vela/types/constants" + "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" "github.com/go-vela/worker/executor" "github.com/go-vela/worker/runtime" @@ -31,64 +33,114 @@ func (w *Worker) exec(index int, config *api.Worker) error { // setup the version v := version.New() - // get worker from database - worker, _, err := w.VelaClient.Worker.Get(w.Config.API.Address.Hostname()) - if err != nil { - logrus.Errorf("unable to retrieve worker from server: %s", err) + var ( + execBuildClient *vela.Client + execBuildExecutable *library.BuildExecutable + p *pipeline.Build + item *models.Item + retries = 3 + ) + + for i := 0; i < retries; i++ { + // check if we're on the first iteration of the loop + if i > 0 { + // incrementally sleep in between retries + time.Sleep(time.Duration(i*10) * time.Second) + } - return err - } + logrus.Debugf("queue item prep - attempt %d", i+1) - // capture an item from the queue - item, err := w.Queue.Pop(context.Background(), worker.GetRoutes()) - if err != nil { - logrus.Errorf("queue pop failed: %v", err) + // get worker from database + worker, _, err := w.VelaClient.Worker.Get(w.Config.API.Address.Hostname()) + if err != nil { + logrus.Errorf("unable to retrieve worker from server: %s", err) - // returning immediately on queue pop fail will attempt - // to pop in quick succession, so we honor the configured timeout - time.Sleep(w.Config.Queue.Timeout) + if i < retries-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) - // returning nil to avoid unregistering the worker on pop failure; - // sometimes queue could be unavailable due to blip or maintenance - return nil - } + // continue to the next iteration of the loop + continue + } - if item == nil { - return nil - } + return err + } - // retrieve a build token from the server to setup the execBuildClient - bt, resp, err := w.VelaClient.Build.GetBuildToken(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber()) - if err != nil { - logrus.Errorf("unable to retrieve build token: %s", err) + // capture an item from the queue + item, err = w.Queue.Pop(context.Background(), worker.GetRoutes()) + if err != nil { + logrus.Errorf("queue pop failed: %v", err) + + // returning immediately on queue pop fail will attempt + // to pop in quick succession, so we honor the configured timeout + time.Sleep(w.Config.Queue.Timeout) - // build is not in pending state — user canceled build while it was in queue. Pop, discard, move on. - if resp != nil && resp.StatusCode == http.StatusConflict { + // returning nil to avoid unregistering the worker on pop failure; + // sometimes queue could be unavailable due to blip or maintenance return nil } - // something else is amiss (auth, server down, etc.) — shut down worker, will have to re-register if registration enabled. - return err - } + if item == nil { + return nil + } - // set up build client with build token as auth - execBuildClient, err := setupClient(w.Config.Server, bt.GetToken()) - if err != nil { - return err - } + // retrieve a build token from the server to setup the execBuildClient + bt, resp, err := w.VelaClient.Build.GetBuildToken(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber()) + if err != nil { + logrus.Errorf("unable to retrieve build token: %s", err) - // request build executable containing pipeline.Build data using exec client - execBuildExecutable, _, err := execBuildClient.Build.GetBuildExecutable(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber()) - if err != nil { - return err - } + // build is not in pending state — user canceled build while it was in queue. Pop, discard, move on. + if resp != nil && resp.StatusCode == http.StatusConflict { + return nil + } - // get the build pipeline from the build executable - pipeline := new(pipeline.Build) + // check if the retry limit has been exceeded + if i < retries-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) - err = json.Unmarshal(execBuildExecutable.GetData(), pipeline) - if err != nil { - return err + // continue to the next iteration of the loop + continue + } + + return err + } + + // set up build client with build token as auth + execBuildClient, err = setupClient(w.Config.Server, bt.GetToken()) + if err != nil { + // check if the retry limit has been exceeded + if i < retries-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) + + // continue to the next iteration of the loop + continue + } + + return err + } + + // request build executable containing pipeline.Build data using exec client + execBuildExecutable, _, err = execBuildClient.Build.GetBuildExecutable(item.Build.GetRepo().GetOrg(), item.Build.GetRepo().GetName(), item.Build.GetNumber()) + if err != nil { + // check if the retry limit has been exceeded + if i < retries-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) + + // continue to the next iteration of the loop + continue + } + + return err + } + + // get the build pipeline from the build executable + p = new(pipeline.Build) + + err = json.Unmarshal(execBuildExecutable.GetData(), p) + if err != nil { + return err + } + + break } // create logger with extra metadata @@ -180,7 +232,7 @@ func (w *Worker) exec(index int, config *api.Worker) error { Hostname: w.Config.API.Address.Hostname(), Runtime: w.Runtime, Build: item.Build, - Pipeline: pipeline.Sanitize(w.Config.Runtime.Driver), + Pipeline: p.Sanitize(w.Config.Runtime.Driver), Version: v.Semantic(), }) diff --git a/cmd/vela-worker/register.go b/cmd/vela-worker/register.go index a713613a..68967f49 100644 --- a/cmd/vela-worker/register.go +++ b/cmd/vela-worker/register.go @@ -6,11 +6,13 @@ import ( "context" "fmt" "net/http" + "time" "github.com/sirupsen/logrus" api "github.com/go-vela/server/api/types" "github.com/go-vela/types/constants" + "github.com/go-vela/types/library" ) // checkIn is a helper function to phone home to the server. @@ -18,32 +20,64 @@ func (w *Worker) checkIn(config *api.Worker) (bool, string, error) { // check to see if the worker already exists in the database logrus.Infof("retrieving worker %s from the server", config.GetHostname()) - _, resp, err := w.VelaClient.Worker.Get(config.GetHostname()) - if err != nil { - respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err) - // if server is down, the worker status will not be updated - if resp == nil { - return false, "", respErr + var ( + tkn *library.Token + retries = 3 + ) + + for i := 0; i < retries; i++ { + logrus.Debugf("check in loop - attempt %d", i+1) + // check if we're on the first iteration of the loop + if i > 0 { + // incrementally sleep in between retries + time.Sleep(time.Duration(i*10) * time.Second) } - // if we receive a 404 the worker needs to be registered - if resp.StatusCode == http.StatusNotFound { - return w.register(config) + + _, resp, err := w.VelaClient.Worker.Get(config.GetHostname()) + if err != nil { + respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err) + // if server is down, the worker status will not be updated + if resp == nil || (resp.StatusCode != http.StatusNotFound) { + return false, "", respErr + } + // if we receive a 404 the worker needs to be registered + if resp.StatusCode == http.StatusNotFound { + registered, strToken, regErr := w.register(config) + if regErr != nil { + if i < retries-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) + + // continue to the next iteration of the loop + continue + } + } + + return registered, strToken, regErr + } } - return false, "", respErr - } + // if we were able to GET the worker, update it + logrus.Infof("checking worker %s into the server", config.GetHostname()) - // if we were able to GET the worker, update it - logrus.Infof("checking worker %s into the server", config.GetHostname()) + tkn, _, err = w.VelaClient.Worker.RefreshAuth(config.GetHostname()) + if err != nil { + if i < retries-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) - tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname()) - if err != nil { - // set to error when check in fails - w.updateWorkerStatus(config, constants.WorkerStatusError) - return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err) + // continue to the next iteration of the loop + continue + } + + // set to error when check in fails + w.updateWorkerStatus(config, constants.WorkerStatusError) + + return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err) + } + // update worker status to Idle when checkIn is successful. + w.updateWorkerStatus(config, constants.WorkerStatusIdle) + + break } - // update worker status to Idle when checkIn is successful. - w.updateWorkerStatus(config, constants.WorkerStatusIdle) return true, tkn.GetToken(), nil } From 0d846bf644e5a45899215bef08de44418965ca52 Mon Sep 17 00:00:00 2001 From: ecrupper Date: Thu, 15 Aug 2024 09:28:01 -0500 Subject: [PATCH 2/2] only pop from queue once --- cmd/vela-worker/exec.go | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go index 3ffb48fc..a1568d77 100644 --- a/cmd/vela-worker/exec.go +++ b/cmd/vela-worker/exec.go @@ -65,22 +65,24 @@ func (w *Worker) exec(index int, config *api.Worker) error { return err } - // capture an item from the queue - item, err = w.Queue.Pop(context.Background(), worker.GetRoutes()) - if err != nil { - logrus.Errorf("queue pop failed: %v", err) - - // returning immediately on queue pop fail will attempt - // to pop in quick succession, so we honor the configured timeout - time.Sleep(w.Config.Queue.Timeout) - - // returning nil to avoid unregistering the worker on pop failure; - // sometimes queue could be unavailable due to blip or maintenance - return nil - } + // capture an item from the queue only on first loop iteration (failures here return nil) + if i == 0 { + item, err = w.Queue.Pop(context.Background(), worker.GetRoutes()) + if err != nil { + logrus.Errorf("queue pop failed: %v", err) + + // returning immediately on queue pop fail will attempt + // to pop in quick succession, so we honor the configured timeout + time.Sleep(w.Config.Queue.Timeout) + + // returning nil to avoid unregistering the worker on pop failure; + // sometimes queue could be unavailable due to blip or maintenance + return nil + } - if item == nil { - return nil + if item == nil { + return nil + } } // retrieve a build token from the server to setup the execBuildClient