Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: add context to Workers #940

Merged
merged 3 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/build/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func CancelBuild(c *gin.Context) {
switch b.GetStatus() {
case constants.StatusRunning:
// retrieve the worker info
w, err := database.FromContext(c).GetWorkerForHostname(b.GetHost())
w, err := database.FromContext(c).GetWorkerForHostname(ctx, b.GetHost())
if err != nil {
retErr := fmt.Errorf("unable to get worker for build %s: %w", entry, err)
util.HandleError(c, http.StatusNotFound, retErr)
Expand Down
2 changes: 1 addition & 1 deletion api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func recordGauges(c *gin.Context) {
// worker_build_limit, active_worker_count, inactive_worker_count, idle_worker_count, available_worker_count, busy_worker_count, error_worker_count
if q.WorkerBuildLimit || q.ActiveWorkerCount || q.InactiveWorkerCount || q.IdleWorkerCount || q.AvailableWorkerCount || q.BusyWorkerCount || q.ErrorWorkerCount {
// send API call to capture the workers
workers, err := database.FromContext(c).ListWorkers()
workers, err := database.FromContext(c).ListWorkers(ctx)
if err != nil {
logrus.Errorf("unable to get workers: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion api/worker/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func CreateWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
cl := claims.Retrieve(c)
ctx := c.Request.Context()

// capture body from API request
input := new(library.Worker)
Expand Down Expand Up @@ -89,7 +90,7 @@ func CreateWorker(c *gin.Context) {
"worker": input.GetHostname(),
}).Infof("creating new worker %s", input.GetHostname())

_, err = database.FromContext(c).CreateWorker(input)
_, err = database.FromContext(c).CreateWorker(ctx, input)
if err != nil {
retErr := fmt.Errorf("unable to create worker: %w", err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func DeleteWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
w := worker.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -57,7 +58,7 @@ func DeleteWorker(c *gin.Context) {
}).Infof("deleting worker %s", w.GetHostname())

// send API call to remove the step
err := database.FromContext(c).DeleteWorker(w)
err := database.FromContext(c).DeleteWorker(ctx, w)
if err != nil {
retErr := fmt.Errorf("unable to delete worker %s: %w", w.GetHostname(), err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func GetWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
w := worker.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -56,7 +57,7 @@ func GetWorker(c *gin.Context) {
"worker": w.GetHostname(),
}).Infof("reading worker %s", w.GetHostname())

w, err := database.FromContext(c).GetWorkerForHostname(w.GetHostname())
w, err := database.FromContext(c).GetWorkerForHostname(ctx, w.GetHostname())
if err != nil {
retErr := fmt.Errorf("unable to get workers: %w", err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
func ListWorkers(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -49,7 +50,7 @@ func ListWorkers(c *gin.Context) {
"user": u.GetName(),
}).Info("reading workers")

w, err := database.FromContext(c).ListWorkers()
w, err := database.FromContext(c).ListWorkers(ctx)
if err != nil {
retErr := fmt.Errorf("unable to get workers: %w", err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func Refresh(c *gin.Context) {
// capture middleware values
w := worker.Retrieve(c)
cl := claims.Retrieve(c)
ctx := c.Request.Context()

// if we are not using a symmetric token, and the subject does not match the input, request should be denied
if !strings.EqualFold(cl.TokenType, constants.ServerWorkerTokenType) && !strings.EqualFold(cl.Subject, w.GetHostname()) {
Expand All @@ -79,7 +80,7 @@ func Refresh(c *gin.Context) {
w.SetLastCheckedIn(time.Now().Unix())

// send API call to update the worker
_, err := database.FromContext(c).UpdateWorker(w)
_, err := database.FromContext(c).UpdateWorker(ctx, w)
if err != nil {
retErr := fmt.Errorf("unable to update worker %s: %w", w.GetHostname(), err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func UpdateWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
w := worker.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand Down Expand Up @@ -124,7 +125,7 @@ func UpdateWorker(c *gin.Context) {
}

// send API call to update the worker
w, err = database.FromContext(c).UpdateWorker(w)
w, err = database.FromContext(c).UpdateWorker(ctx, w)
if err != nil {
retErr := fmt.Errorf("unable to update worker %s: %w", w.GetHostname(), err)

Expand Down
12 changes: 6 additions & 6 deletions database/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1760,15 +1760,15 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {

// create the workers
for _, worker := range resources.Workers {
_, err := db.CreateWorker(worker)
_, err := db.CreateWorker(context.TODO(), worker)
if err != nil {
t.Errorf("unable to create worker %d: %v", worker.GetID(), err)
}
}
methods["CreateWorker"] = true

// count the workers
count, err := db.CountWorkers()
count, err := db.CountWorkers(context.TODO())
if err != nil {
t.Errorf("unable to count workers: %v", err)
}
Expand All @@ -1778,7 +1778,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {
methods["CountWorkers"] = true

// list the workers
list, err := db.ListWorkers()
list, err := db.ListWorkers(context.TODO())
if err != nil {
t.Errorf("unable to list workers: %v", err)
}
Expand All @@ -1789,7 +1789,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {

// lookup the workers by hostname
for _, worker := range resources.Workers {
got, err := db.GetWorkerForHostname(worker.GetHostname())
got, err := db.GetWorkerForHostname(context.TODO(), worker.GetHostname())
if err != nil {
t.Errorf("unable to get worker %d by hostname: %v", worker.GetID(), err)
}
Expand All @@ -1802,7 +1802,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {
// update the workers
for _, worker := range resources.Workers {
worker.SetActive(false)
got, err := db.UpdateWorker(worker)
got, err := db.UpdateWorker(context.TODO(), worker)
if err != nil {
t.Errorf("unable to update worker %d: %v", worker.GetID(), err)
}
Expand All @@ -1816,7 +1816,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {

// delete the workers
for _, worker := range resources.Workers {
err = db.DeleteWorker(worker)
err = db.DeleteWorker(context.TODO(), worker)
if err != nil {
t.Errorf("unable to delete worker %d: %v", worker.GetID(), err)
}
Expand Down
1 change: 1 addition & 0 deletions database/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (e *engine) NewResources(ctx context.Context) error {

// create the database agnostic engine for workers
e.WorkerInterface, err = worker.New(
worker.WithContext(e.ctx),
worker.WithClient(e.client),
worker.WithLogger(e.logger),
worker.WithSkipCreation(e.config.SkipCreation),
Expand Down
4 changes: 3 additions & 1 deletion database/worker/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
)

// CountWorkers gets the count of all workers from the database.
func (e *engine) CountWorkers() (int64, error) {
func (e *engine) CountWorkers(ctx context.Context) (int64, error) {
e.logger.Tracef("getting count of all workers from the database")

// variable to store query results
Expand Down
7 changes: 4 additions & 3 deletions database/worker/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -37,12 +38,12 @@ func TestWorker_Engine_CountWorkers(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_workerOne)
_, err := _sqlite.CreateWorker(context.TODO(), _workerOne)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}

_, err = _sqlite.CreateWorker(_workerTwo)
_, err = _sqlite.CreateWorker(context.TODO(), _workerTwo)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestWorker_Engine_CountWorkers(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.CountWorkers()
got, err := test.database.CountWorkers(context.TODO())

if test.failure {
if err == nil {
Expand Down
4 changes: 3 additions & 1 deletion database/worker/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// CreateWorker creates a new worker in the database.
func (e *engine) CreateWorker(w *library.Worker) (*library.Worker, error) {
func (e *engine) CreateWorker(ctx context.Context, w *library.Worker) (*library.Worker, error) {
e.logger.WithFields(logrus.Fields{
"worker": w.GetHostname(),
}).Tracef("creating worker %s in the database", w.GetHostname())
Expand Down
3 changes: 2 additions & 1 deletion database/worker/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -56,7 +57,7 @@ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) RETURNING "id"`).
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.CreateWorker(_worker)
got, err := test.database.CreateWorker(context.TODO(), _worker)

if test.failure {
if err == nil {
Expand Down
4 changes: 3 additions & 1 deletion database/worker/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// DeleteWorker deletes an existing worker from the database.
func (e *engine) DeleteWorker(w *library.Worker) error {
func (e *engine) DeleteWorker(ctx context.Context, w *library.Worker) error {
e.logger.WithFields(logrus.Fields{
"worker": w.GetHostname(),
}).Tracef("deleting worker %s from the database", w.GetHostname())
Expand Down
5 changes: 3 additions & 2 deletions database/worker/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -29,7 +30,7 @@ func TestWorker_Engine_DeleteWorker(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_worker)
_, err := _sqlite.CreateWorker(context.TODO(), _worker)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand All @@ -55,7 +56,7 @@ func TestWorker_Engine_DeleteWorker(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err = test.database.DeleteWorker(_worker)
err = test.database.DeleteWorker(context.TODO(), _worker)

if test.failure {
if err == nil {
Expand Down
4 changes: 3 additions & 1 deletion database/worker/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
)

// GetWorker gets a worker by ID from the database.
func (e *engine) GetWorker(id int64) (*library.Worker, error) {
func (e *engine) GetWorker(ctx context.Context, id int64) (*library.Worker, error) {
e.logger.Tracef("getting worker %d from the database", id)

// variable to store query results
Expand Down
4 changes: 3 additions & 1 deletion database/worker/get_hostname.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// GetWorkerForHostname gets a worker by hostname from the database.
func (e *engine) GetWorkerForHostname(hostname string) (*library.Worker, error) {
func (e *engine) GetWorkerForHostname(ctx context.Context, hostname string) (*library.Worker, error) {
e.logger.WithFields(logrus.Fields{
"worker": hostname,
}).Tracef("getting worker %s from the database", hostname)
Expand Down
5 changes: 3 additions & 2 deletions database/worker/get_hostname_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestWorker_Engine_GetWorkerForName(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_worker)
_, err := _sqlite.CreateWorker(context.TODO(), _worker)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestWorker_Engine_GetWorkerForName(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.GetWorkerForHostname("worker_0")
got, err := test.database.GetWorkerForHostname(context.TODO(), "worker_0")

if test.failure {
if err == nil {
Expand Down
5 changes: 3 additions & 2 deletions database/worker/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestWorker_Engine_GetWorker(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_worker)
_, err := _sqlite.CreateWorker(context.TODO(), _worker)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestWorker_Engine_GetWorker(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.GetWorker(1)
got, err := test.database.GetWorker(context.TODO(), 1)

if test.failure {
if err == nil {
Expand Down
Loading