From 1047a9acbb014d301af2bfb614452c7992e02b34 Mon Sep 17 00:00:00 2001 From: Kelly Merrick Date: Mon, 27 Feb 2023 13:56:33 -0600 Subject: [PATCH] feat(worker): get available workers --- api/metrics.go | 48 +++++++++- api/worker.go | 48 ++++++++++ database/worker/count_by_status.go | 26 ++++++ database/worker/create_test.go | 6 +- database/worker/list_by_status.go | 55 +++++++++++ database/worker/list_by_status_test.go | 117 ++++++++++++++++++++++++ database/worker/service.go | 2 + database/worker/table.go | 36 +++++--- database/worker/update_test.go | 6 +- database/worker/worker_test.go | 18 ++-- mock/server/worker.go | 21 ++++- router/middleware/worker/worker_test.go | 4 + router/worker.go | 4 +- 13 files changed, 355 insertions(+), 36 deletions(-) create mode 100644 database/worker/count_by_status.go create mode 100644 database/worker/list_by_status.go create mode 100644 database/worker/list_by_status_test.go diff --git a/api/metrics.go b/api/metrics.go index b5f6fe942..6fc1cd603 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -53,6 +53,19 @@ type MetricsQueryParameters struct { ActiveWorkerCount bool `form:"active_worker_count"` // InactiveWorkerCount represents total number of inactive workers InactiveWorkerCount bool `form:"inactive_worker_count"` + + // UnregisteredWorkerCount represents total number of workers with a status of unregistered, + UnregisteredWorkerCount bool `form:"unregistered_worker_count"` + // AvailableWorkerCount represents total number of workers with a status of available, + // where worker RunningBuildIDs.length < worker BuildLimit + AvailableWorkerCount bool `form:"available_worker_count"` + // BusyWorkerCount represents total number of workers with a status of busy, + // where worker BuildLimit == worker RunningBuildIDs.length + BusyWorkerCount bool `form:"busy_worker_count"` + // BusyWorkerCount represents total number of workers with a status of maintenance. + MaintenanceWorkerCount bool `form:"maintenance_worker_count"` + // ErrorWorkerCount represents total number of workers with a status of error + ErrorWorkerCount bool `form:"error_worker_count"` } // predefine Prometheus metrics else they will be regenerated @@ -356,14 +369,19 @@ func recordGauges(c *gin.Context) { // add worker metrics var ( - buildLimit int64 - activeWorkers int64 - inactiveWorkers int64 + buildLimit int64 + activeWorkers int64 + inactiveWorkers int64 + unregisteredWorkers int64 + availableWorkers int64 + busyWorkers int64 + maintenanceWorkers int64 + errorWorkers int64 ) // get worker metrics based on request query parameters - // worker_build_limit, active_worker_count, inactive_worker_count - if q.WorkerBuildLimit || q.ActiveWorkerCount || q.InactiveWorkerCount { + // worker_build_limit, active_worker_count, inactive_worker_count, unregistered_worker_count, available_worker_count, busy_worker_count, maintenance_worker_count, error_worker_count + if q.WorkerBuildLimit || q.ActiveWorkerCount || q.InactiveWorkerCount || q.UnregisteredWorkerCount || q.AvailableWorkerCount || q.BusyWorkerCount || q.MaintenanceWorkerCount || q.ErrorWorkerCount { // send API call to capture the workers workers, err := database.FromContext(c).ListWorkers() if err != nil { @@ -397,5 +415,25 @@ func recordGauges(c *gin.Context) { if q.InactiveWorkerCount { totals.WithLabelValues("worker", "count", "inactive").Set(float64(inactiveWorkers)) } + // unregistered_worker_count + if q.UnregisteredWorkerCount { + totals.WithLabelValues("worker", "count", "unregistered").Set(float64(unregisteredWorkers)) + } + // available_worker_count + if q.AvailableWorkerCount { + totals.WithLabelValues("worker", "count", "available").Set(float64(availableWorkers)) + } + // busy_worker_count + if q.BusyWorkerCount { + totals.WithLabelValues("worker", "count", "busy").Set(float64(busyWorkers)) + } + // maintenance_worker_count + if q.MaintenanceWorkerCount { + totals.WithLabelValues("worker", "count", "maintenance").Set(float64(maintenanceWorkers)) + } + // error_worker_count + if q.ErrorWorkerCount { + totals.WithLabelValues("worker", "count", "error").Set(float64(errorWorkers)) + } } } diff --git a/api/worker.go b/api/worker.go index 4951b182a..ace08a679 100644 --- a/api/worker.go +++ b/api/worker.go @@ -134,6 +134,34 @@ func GetWorkers(c *gin.Context) { c.JSON(http.StatusOK, w) } +// GetWorkersByStatus represents the API handler to capture a +// list of workers with specified status from the configured backend. +func GetWorkersByStatus(c *gin.Context) { + s := c.Param("status") + // capture middleware values + u := user.Retrieve(c) + + // TODO message/error if not valid status or empty string, or they get back all the workers (GetWorkers), how do other endpoints do it? prob use regex to confirm alpha charas only + + // update engine logger with API metadata + // + // https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithFields + logrus.WithFields(logrus.Fields{ + "user": u.GetName(), + }).Info("reading workers") + + w, err := database.FromContext(c).ListWorkersByStatus(s) + if err != nil { + retErr := fmt.Errorf("unable to get workers: %w", err) + + util.HandleError(c, http.StatusInternalServerError, retErr) + + return + } + + c.JSON(http.StatusOK, w) +} + // swagger:operation GET /api/v1/workers/{worker} workers GetWorker // // Retrieve a worker for the configured backend @@ -267,6 +295,26 @@ func UpdateWorker(c *gin.Context) { w.SetActive(input.GetActive()) } + if len(input.GetStatus()) > 0 { + // update status if set + w.SetStatus(input.GetStatus()) + } + + if input.GetLastStatusUpdateAt() > 0 { + // update LastStatusUpdateAt if set + w.SetLastStatusUpdateAt(input.GetLastStatusUpdateAt()) + } + + if len(input.GetRunningBuildIDs()) > 0 { + // update RunningBuildIDs if set + w.SetRunningBuildIDs(input.GetRunningBuildIDs()) + } + + if input.GetLastBuildFinishedAt() > 0 { + // update LastBuildFinishedAt if set + w.SetLastBuildFinishedAt(input.GetLastBuildFinishedAt()) + } + if input.GetLastCheckedIn() > 0 { // update LastCheckedIn if set w.SetLastCheckedIn(input.GetLastCheckedIn()) diff --git a/database/worker/count_by_status.go b/database/worker/count_by_status.go new file mode 100644 index 000000000..e4d990fd0 --- /dev/null +++ b/database/worker/count_by_status.go @@ -0,0 +1,26 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package worker + +import ( + "github.com/go-vela/types/constants" +) + +// CountWorkersByStatus gets the count of all workers from the database with the specified status. +func (e *engine) CountWorkersByStatus(status string) (int64, error) { + e.logger.Tracef("getting count of all workers from the database with the specified status") + + // variable to store query results + var w int64 + + // send query to the database and store result in variable + err := e.client. + Table(constants.TableWorker). + Where("status = ?", status). + Count(&w). + Error + + return w, err +} diff --git a/database/worker/create_test.go b/database/worker/create_test.go index 38c276d83..937e6564e 100644 --- a/database/worker/create_test.go +++ b/database/worker/create_test.go @@ -26,9 +26,9 @@ func TestWorker_Engine_CreateWorker(t *testing.T) { // ensure the mock expects the query _mock.ExpectQuery(`INSERT INTO "workers" -("hostname","address","routes","active","last_checked_in","build_limit","id") -VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING "id"`). - WithArgs("worker_0", "localhost", nil, true, nil, nil, 1). +("hostname","address","routes","active","status","last_status_update_at","running_build_ids","last_build_finished_at","last_checked_in","build_limit","id") +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) RETURNING "id"`). + WithArgs("worker_0", "localhost", nil, true, nil, nil, nil, nil, nil, nil, 1). WillReturnRows(_rows) _sqlite := testSqlite(t) diff --git a/database/worker/list_by_status.go b/database/worker/list_by_status.go new file mode 100644 index 000000000..a5d5ca89f --- /dev/null +++ b/database/worker/list_by_status.go @@ -0,0 +1,55 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package worker + +import ( + "github.com/go-vela/types/constants" + "github.com/go-vela/types/database" + "github.com/go-vela/types/library" +) + +// ListWorkersByStatus gets a list of all workers from the database with the specified status. +func (e *engine) ListWorkersByStatus(status string) ([]*library.Worker, error) { + e.logger.Trace("listing all workers from the database") + + // variables to store query results and return value + count := int64(0) + w := new([]database.Worker) + workers := []*library.Worker{} + + // count the results + count, err := e.CountWorkersByStatus(status) + if err != nil { + return nil, err + } + + // short-circuit if there are no results + if count == 0 { + return workers, nil + } + + // send query to the database and store result in variable + err = e.client. + Table(constants.TableWorker). + Where("status = ?", status). + Find(&w). + Error + if err != nil { + return nil, err + } + + // iterate through all query results + for _, worker := range *w { + // https://golang.org/doc/faq#closures_and_goroutines + tmp := worker + + // convert query result to library type + // + // https://pkg.go.dev/github.com/go-vela/types/database#Worker.ToLibrary + workers = append(workers, tmp.ToLibrary()) + } + + return workers, nil +} diff --git a/database/worker/list_by_status_test.go b/database/worker/list_by_status_test.go new file mode 100644 index 000000000..1c38a5b13 --- /dev/null +++ b/database/worker/list_by_status_test.go @@ -0,0 +1,117 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package worker + +import ( + "reflect" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-vela/types/library" +) + +func TestWorker_Engine_GetWorkersByStatus(t *testing.T) { + // setup types + _workerOne := testWorker() + _workerOne.SetID(1) + _workerOne.SetHostname("worker_0") + _workerOne.SetAddress("localhost") + _workerOne.SetActive(true) + _workerOne.SetStatus("available") + + _workerTwo := testWorker() + _workerTwo.SetID(2) + _workerTwo.SetHostname("worker_1") + _workerTwo.SetAddress("localhost") + _workerTwo.SetActive(true) + _workerTwo.SetStatus("busy") + + _workerThree := testWorker() + _workerThree.SetID(3) + _workerThree.SetHostname("worker_2") + _workerThree.SetAddress("localhost") + _workerThree.SetActive(true) + _workerThree.SetStatus("available") + + _postgres, _mock := testPostgres(t) + defer func() { _sql, _ := _postgres.client.DB(); _sql.Close() }() + + // create expected result in mock + _rows := sqlmock.NewRows([]string{"count"}).AddRow(2) + + // ensure the mock expects the query + _mock.ExpectQuery(`SELECT count(*) FROM "workers" WHERE status = $1`).WithArgs("available").WillReturnRows(_rows) + + // create expected result in mock + _rows = sqlmock.NewRows( + []string{"id", "hostname", "address", "routes", "active", "status", "last_checked_in", "build_limit"}). + AddRow(1, "worker_0", "localhost", nil, true, "available", 0, 0). + AddRow(3, "worker_2", "localhost", nil, true, "available", 0, 0) + + // ensure the mock expects the query + _mock.ExpectQuery(`SELECT * FROM "workers" WHERE status = $1`).WithArgs("available").WillReturnRows(_rows) + + _sqlite := testSqlite(t) + defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }() + + err := _sqlite.CreateWorker(_workerOne) + if err != nil { + t.Errorf("unable to create test worker one for sqlite: %v", err) + } + + err = _sqlite.CreateWorker(_workerTwo) + if err != nil { + t.Errorf("unable to create test worker two for sqlite: %v", err) + } + + err = _sqlite.CreateWorker(_workerThree) + if err != nil { + t.Errorf("unable to create test worker three for sqlite: %v", err) + } + + // setup tests + tests := []struct { + failure bool + name string + database *engine + want []*library.Worker + }{ + { + failure: false, + name: "postgres", + database: _postgres, + want: []*library.Worker{_workerOne, _workerThree}, + }, + { + failure: false, + name: "sqlite3", + database: _sqlite, + want: []*library.Worker{_workerOne, _workerThree}, + }, + } + + // run tests + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := test.database.ListWorkersByStatus("available") + + if test.failure { + if err == nil { + t.Errorf("ListWorkersByStatus for %s should have returned err", test.name) + } + + return + } + + if err != nil { + t.Errorf("ListWorkersByStatus for %s returned err: %v", test.name, err) + } + + if !reflect.DeepEqual(got, test.want) { + t.Errorf("ListWorkersByStatus for %s is %v, want %v", test.name, got, test.want) + } + }) + } +} diff --git a/database/worker/service.go b/database/worker/service.go index b02c6f854..1dafd2210 100644 --- a/database/worker/service.go +++ b/database/worker/service.go @@ -38,6 +38,8 @@ type WorkerService interface { GetWorkerForHostname(string) (*library.Worker, error) // ListWorkers defines a function that gets a list of all workers. ListWorkers() ([]*library.Worker, error) + // ListWorkersByStatus defines a function that gets a list of all workers by specified status. + ListWorkersByStatus(string) ([]*library.Worker, error) // UpdateWorker defines a function that updates an existing worker. UpdateWorker(*library.Worker) error } diff --git a/database/worker/table.go b/database/worker/table.go index 5cecf109d..7dd2983c8 100644 --- a/database/worker/table.go +++ b/database/worker/table.go @@ -14,13 +14,17 @@ const ( CREATE TABLE IF NOT EXISTS workers ( - id SERIAL PRIMARY KEY, - hostname VARCHAR(250), - address VARCHAR(250), - routes VARCHAR(1000), - active BOOLEAN, - last_checked_in INTEGER, - build_limit INTEGER, + id SERIAL PRIMARY KEY, + hostname VARCHAR(250), + address VARCHAR(250), + routes VARCHAR(1000), + active BOOLEAN, + status VARCHAR(50), + last_status_update_at INTEGER, + running_build_ids VARCHAR(1000), + last_build_finished_at INTEGER, + last_checked_in INTEGER, + build_limit INTEGER, UNIQUE(hostname) ); ` @@ -30,13 +34,17 @@ workers ( CREATE TABLE IF NOT EXISTS workers ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - hostname TEXT, - address TEXT, - routes TEXT, - active BOOLEAN, - last_checked_in INTEGER, - build_limit INTEGER, + id INTEGER PRIMARY KEY AUTOINCREMENT, + hostname TEXT, + address TEXT, + routes TEXT, + active BOOLEAN, + status VARCHAR(50), + last_status_update_at INTEGER, + running_build_ids VARCHAR(1000), + last_build_finished_at INTEGER, + last_checked_in INTEGER, + build_limit INTEGER, UNIQUE(hostname) ); ` diff --git a/database/worker/update_test.go b/database/worker/update_test.go index 88644678a..3e15cafa1 100644 --- a/database/worker/update_test.go +++ b/database/worker/update_test.go @@ -23,9 +23,9 @@ func TestWorker_Engine_UpdateWorker(t *testing.T) { // ensure the mock expects the query _mock.ExpectExec(`UPDATE "workers" -SET "hostname"=$1,"address"=$2,"routes"=$3,"active"=$4,"last_checked_in"=$5,"build_limit"=$6 -WHERE "id" = $7`). - WithArgs("worker_0", "localhost", nil, true, nil, nil, 1). +SET "hostname"=$1,"address"=$2,"routes"=$3,"active"=$4,"status"=$5,"last_status_update_at"=$6,"running_build_ids"=$7,"last_build_finished_at"=$8,"last_checked_in"=$9,"build_limit"=$10 +WHERE "id" = $11`). + WithArgs("worker_0", "localhost", nil, true, nil, nil, nil, nil, nil, nil, 1). WillReturnResult(sqlmock.NewResult(1, 1)) _sqlite := testSqlite(t) diff --git a/database/worker/worker_test.go b/database/worker/worker_test.go index 48deabe54..3e88cc26b 100644 --- a/database/worker/worker_test.go +++ b/database/worker/worker_test.go @@ -170,12 +170,16 @@ func testSqlite(t *testing.T) *engine { // Worker type with all fields set to their zero values. func testWorker() *library.Worker { return &library.Worker{ - ID: new(int64), - Hostname: new(string), - Address: new(string), - Routes: new([]string), - Active: new(bool), - BuildLimit: new(int64), - LastCheckedIn: new(int64), + ID: new(int64), + Hostname: new(string), + Address: new(string), + Routes: new([]string), + Active: new(bool), + Status: new(string), + LastStatusUpdateAt: new(int64), + RunningBuildIDs: new([]string), + LastBuildFinishedAt: new(int64), + LastCheckedIn: new(int64), + BuildLimit: new(int64), } } diff --git a/mock/server/worker.go b/mock/server/worker.go index ed79507bb..e5e8db4e8 100644 --- a/mock/server/worker.go +++ b/mock/server/worker.go @@ -28,7 +28,12 @@ const ( "large:docker" ], "active": true, - "last_checked_in": 1602612590 + "status": "available", + "last_status_update_at": 1602612590, + "running_build_ids": [], + "last_build_finished_at": 1602612590, + "last_checked_in": 1602612590, + "build_limit": 1 }` // WorkersResp represents a JSON return for one to many workers. @@ -43,7 +48,12 @@ const ( "large:docker" ], "active": true, - "last_checked_in": 1602612590 + "status": "available", + "last_status_update_at: 1602612590 + "running_build_ids": [], + "last_build_finished_at": 1602612590, + "last_checked_in": 1602612590, + "build_limit": 1 }, { "id": 2, @@ -55,7 +65,12 @@ const ( "large:docker" ], "active": true, - "last_checked_in": 1602612590 + "status": "available", + "last_status_update_at: 1602612590 + "running_build_ids": [], + "last_build_finished_at": 1602612590, + "last_checked_in": 1602612590, + "build_limit": 1 } ]` ) diff --git a/router/middleware/worker/worker_test.go b/router/middleware/worker/worker_test.go index cad19f15e..d65102189 100644 --- a/router/middleware/worker/worker_test.go +++ b/router/middleware/worker/worker_test.go @@ -42,6 +42,10 @@ func TestWorker_Establish(t *testing.T) { want.SetAddress("localhost") want.SetRoutes([]string{"foo", "bar", "baz"}) want.SetActive(true) + want.SetStatus("available") + want.SetLastStatusUpdateAt(12345) + want.SetRunningBuildIDs([]string{}) + want.SetLastBuildFinishedAt(12345) want.SetLastCheckedIn(12345) want.SetBuildLimit(0) diff --git a/router/worker.go b/router/worker.go index 7853c2a82..992d7cba6 100644 --- a/router/worker.go +++ b/router/worker.go @@ -15,8 +15,9 @@ import ( // WorkerHandlers is a function that extends the provided base router group // with the API handlers for worker functionality. // -// POST /api/v1/users +// POST /api/v1/workers // GET /api/v1/workers +// GET /api/v1/status/:status // GET /api/v1/workers/:worker // PUT /api/v1/workers/:worker // DELETE /api/v1/workers/:worker . @@ -26,6 +27,7 @@ func WorkerHandlers(base *gin.RouterGroup) { { workers.POST("", perm.MustWorker(), middleware.Payload(), api.CreateWorker) workers.GET("", api.GetWorkers) + workers.GET("/status/:status", api.GetWorkersByStatus) // Worker endpoints w := workers.Group("/:worker")