Skip to content

Commit

Permalink
chore: add context to schedule functions (#898)
Browse files Browse the repository at this point in the history
  • Loading branch information
JordanSussman authored Aug 9, 2023
1 parent b3084f9 commit 516d4fe
Show file tree
Hide file tree
Showing 42 changed files with 167 additions and 92 deletions.
11 changes: 6 additions & 5 deletions api/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func CreateSchedule(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
r := repo.Retrieve(c)
ctx := c.Request.Context()
allowlist := c.Value("allowlistschedule").([]string)
minimumFrequency := c.Value("scheduleminimumfrequency").(time.Duration)

Expand Down Expand Up @@ -145,7 +146,7 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the schedule from the database
dbSchedule, err := database.FromContext(c).GetScheduleForRepo(r, input.GetName())
dbSchedule, err := database.FromContext(c).GetScheduleForRepo(ctx, r, input.GetName())
if err == nil && dbSchedule.GetActive() {
retErr := fmt.Errorf("unable to create schedule: %s is already active", input.GetName())

Expand All @@ -170,7 +171,7 @@ func CreateSchedule(c *gin.Context) {
dbSchedule.SetActive(true)

// send API call to update the schedule
err = database.FromContext(c).UpdateSchedule(dbSchedule, true)
err = database.FromContext(c).UpdateSchedule(ctx, dbSchedule, true)
if err != nil {
retErr := fmt.Errorf("unable to set schedule %s to active: %w", dbSchedule.GetName(), err)

Expand All @@ -180,10 +181,10 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the updated schedule
s, _ = database.FromContext(c).GetScheduleForRepo(r, dbSchedule.GetName())
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, dbSchedule.GetName())
} else {
// send API call to create the schedule
err = database.FromContext(c).CreateSchedule(s)
err = database.FromContext(c).CreateSchedule(ctx, s)
if err != nil {
retErr := fmt.Errorf("unable to create new schedule %s: %w", r.GetName(), err)

Expand All @@ -193,7 +194,7 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the created schedule
s, _ = database.FromContext(c).GetScheduleForRepo(r, input.GetName())
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, input.GetName())
}

c.JSON(http.StatusCreated, s)
Expand Down
3 changes: 2 additions & 1 deletion api/schedule/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func DeleteSchedule(c *gin.Context) {
r := repo.Retrieve(c)
u := user.Retrieve(c)
s := schedule.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -75,7 +76,7 @@ func DeleteSchedule(c *gin.Context) {
"user": u.GetName(),
}).Infof("deleting schedule %s", s.GetName())

err := database.FromContext(c).DeleteSchedule(s)
err := database.FromContext(c).DeleteSchedule(ctx, s)
if err != nil {
retErr := fmt.Errorf("unable to delete schedule %s: %w", s.GetName(), err)

Expand Down
3 changes: 2 additions & 1 deletion api/schedule/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
func ListSchedules(c *gin.Context) {
// capture middleware values
r := repo.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand Down Expand Up @@ -109,7 +110,7 @@ func ListSchedules(c *gin.Context) {
perPage = util.MaxInt(1, util.MinInt(100, perPage))

// send API call to capture the list of schedules for the repo
s, t, err := database.FromContext(c).ListSchedulesForRepo(r, page, perPage)
s, t, err := database.FromContext(c).ListSchedulesForRepo(ctx, r, page, perPage)
if err != nil {
retErr := fmt.Errorf("unable to get schedules for repo %s: %w", r.GetFullName(), err)

Expand Down
5 changes: 3 additions & 2 deletions api/schedule/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func UpdateSchedule(c *gin.Context) {
// capture middleware values
r := repo.Retrieve(c)
s := schedule.Retrieve(c)
ctx := c.Request.Context()
u := user.Retrieve(c)
scheduleName := util.PathParameter(c, "schedule")
minimumFrequency := c.Value("scheduleminimumfrequency").(time.Duration)
Expand Down Expand Up @@ -128,7 +129,7 @@ func UpdateSchedule(c *gin.Context) {
s.SetUpdatedBy(u.GetName())

// update the schedule within the database
err = database.FromContext(c).UpdateSchedule(s, true)
err = database.FromContext(c).UpdateSchedule(ctx, s, true)
if err != nil {
retErr := fmt.Errorf("unable to update scheduled %s: %w", scheduleName, err)

Expand All @@ -138,7 +139,7 @@ func UpdateSchedule(c *gin.Context) {
}

// capture the updated scheduled
s, _ = database.FromContext(c).GetScheduleForRepo(r, scheduleName)
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, scheduleName)

c.JSON(http.StatusOK, s)
}
13 changes: 7 additions & 6 deletions cmd/vela-server/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -30,11 +31,11 @@ const (
scheduleWait = "waiting to trigger build for schedule"
)

func processSchedules(start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedules(ctx context.Context, start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
logrus.Infof("processing active schedules to create builds")

// send API call to capture the list of active schedules
schedules, err := database.ListActiveSchedules()
schedules, err := database.ListActiveSchedules(ctx)
if err != nil {
return err
}
Expand All @@ -53,7 +54,7 @@ func processSchedules(start time.Time, compiler compiler.Engine, database databa
// This is needed to ensure we are not dealing with a stale schedule since we fetch
// all schedules once and iterate through that list which can take a significant
// amount of time to get to the end of the list.
schedule, err := database.GetSchedule(s.GetID())
schedule, err := database.GetSchedule(ctx, s.GetID())
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

Expand Down Expand Up @@ -115,15 +116,15 @@ func processSchedules(start time.Time, compiler compiler.Engine, database databa
schedule.SetScheduledAt(time.Now().UTC().Unix())

// send API call to update schedule for ensuring scheduled_at field is set
err = database.UpdateSchedule(schedule, false)
err = database.UpdateSchedule(ctx, schedule, false)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// process the schedule and trigger a new build
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
err = processSchedule(ctx, schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

Expand All @@ -135,7 +136,7 @@ func processSchedules(start time.Time, compiler compiler.Engine, database databa
}

//nolint:funlen // ignore function length and number of statements
func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedule(ctx context.Context, s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
// send API call to capture the repo for the schedule
r, err := database.GetRepo(s.GetRepoID())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/vela-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func server(c *cli.Context) error {
// sleep for a duration of time before processing schedules
time.Sleep(jitter)

err = processSchedules(start, compiler, database, metadata, queue, scm)
err = processSchedules(ctx, start, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warn("unable to process schedules")
} else {
Expand Down
6 changes: 5 additions & 1 deletion database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -54,6 +55,8 @@ type (
client *gorm.DB
// engine configuration settings used in database functions
config *config
// engine context used in database functions
ctx context.Context
// sirupsen/logrus logger used in database functions
logger *logrus.Entry

Expand Down Expand Up @@ -85,6 +88,7 @@ func New(opts ...EngineOpt) (Interface, error) {
e.client = new(gorm.DB)
e.config = new(config)
e.logger = new(logrus.Entry)
e.ctx = context.TODO()

// apply all provided configuration options
for _, opt := range opts {
Expand Down Expand Up @@ -143,7 +147,7 @@ func New(opts ...EngineOpt) (Interface, error) {
}

// create database agnostic engines for resources
err = e.NewResources()
err = e.NewResources(e.ctx)
if err != nil {
return nil, err
}
Expand Down
23 changes: 13 additions & 10 deletions database/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"os"
"reflect"
"strings"
Expand Down Expand Up @@ -923,17 +924,19 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods[element.Method(i).Name] = false
}

ctx := context.TODO()

// create the schedules
for _, schedule := range resources.Schedules {
err := db.CreateSchedule(schedule)
err := db.CreateSchedule(ctx, schedule)
if err != nil {
t.Errorf("unable to create schedule %d: %v", schedule.GetID(), err)
}
}
methods["CreateSchedule"] = true

// count the schedules
count, err := db.CountSchedules()
count, err := db.CountSchedules(ctx)
if err != nil {
t.Errorf("unable to count schedules: %v", err)
}
Expand All @@ -943,7 +946,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["CountSchedules"] = true

// count the schedules for a repo
count, err = db.CountSchedulesForRepo(resources.Repos[0])
count, err = db.CountSchedulesForRepo(ctx, resources.Repos[0])
if err != nil {
t.Errorf("unable to count schedules for repo %d: %v", resources.Repos[0].GetID(), err)
}
Expand All @@ -953,7 +956,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["CountSchedulesForRepo"] = true

// list the schedules
list, err := db.ListSchedules()
list, err := db.ListSchedules(ctx)
if err != nil {
t.Errorf("unable to list schedules: %v", err)
}
Expand All @@ -963,7 +966,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["ListSchedules"] = true

// list the active schedules
list, err = db.ListActiveSchedules()
list, err = db.ListActiveSchedules(ctx)
if err != nil {
t.Errorf("unable to list schedules: %v", err)
}
Expand All @@ -973,7 +976,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["ListActiveSchedules"] = true

// list the schedules for a repo
list, count, err = db.ListSchedulesForRepo(resources.Repos[0], 1, 10)
list, count, err = db.ListSchedulesForRepo(ctx, resources.Repos[0], 1, 10)
if err != nil {
t.Errorf("unable to count schedules for repo %d: %v", resources.Repos[0].GetID(), err)
}
Expand All @@ -988,7 +991,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
// lookup the schedules by name
for _, schedule := range resources.Schedules {
repo := resources.Repos[schedule.GetRepoID()-1]
got, err := db.GetScheduleForRepo(repo, schedule.GetName())
got, err := db.GetScheduleForRepo(ctx, repo, schedule.GetName())
if err != nil {
t.Errorf("unable to get schedule %d for repo %d: %v", schedule.GetID(), repo.GetID(), err)
}
Expand All @@ -1001,13 +1004,13 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
// update the schedules
for _, schedule := range resources.Schedules {
schedule.SetUpdatedAt(time.Now().UTC().Unix())
err = db.UpdateSchedule(schedule, true)
err = db.UpdateSchedule(ctx, schedule, true)
if err != nil {
t.Errorf("unable to update schedule %d: %v", schedule.GetID(), err)
}

// lookup the schedule by ID
got, err := db.GetSchedule(schedule.GetID())
got, err := db.GetSchedule(ctx, schedule.GetID())
if err != nil {
t.Errorf("unable to get schedule %d by ID: %v", schedule.GetID(), err)
}
Expand All @@ -1020,7 +1023,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {

// delete the schedules
for _, schedule := range resources.Schedules {
err = db.DeleteSchedule(schedule)
err = db.DeleteSchedule(ctx, schedule)
if err != nil {
t.Errorf("unable to delete schedule %d: %v", schedule.GetID(), err)
}
Expand Down
14 changes: 13 additions & 1 deletion database/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package database

import "time"
import (
"context"
"time"
)

// EngineOpt represents a configuration option to initialize the database engine.
type EngineOpt func(*engine) error
Expand Down Expand Up @@ -88,3 +91,12 @@ func WithSkipCreation(skipCreation bool) EngineOpt {
return nil
}
}

// WithContext sets the context in the database engine.
func WithContext(ctx context.Context) EngineOpt {
return func(e *engine) error {
e.ctx = ctx

return nil
}
}
4 changes: 3 additions & 1 deletion database/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"github.com/go-vela/server/database/build"
"github.com/go-vela/server/database/hook"
"github.com/go-vela/server/database/log"
Expand All @@ -19,7 +20,7 @@ import (
)

// NewResources creates and returns the database agnostic engines for resources.
func (e *engine) NewResources() error {
func (e *engine) NewResources(ctx context.Context) error {
var err error

// create the database agnostic engine for builds
Expand Down Expand Up @@ -77,6 +78,7 @@ func (e *engine) NewResources() error {

// create the database agnostic engine for schedules
e.ScheduleInterface, err = schedule.New(
schedule.WithContext(e.ctx),
schedule.WithClient(e.client),
schedule.WithLogger(e.logger),
schedule.WithSkipCreation(e.config.SkipCreation),
Expand Down
3 changes: 2 additions & 1 deletion database/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestDatabase_Engine_NewResources(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := test.database.NewResources()
err := test.database.NewResources(context.TODO())

if test.failure {
if err == nil {
Expand Down
3 changes: 2 additions & 1 deletion database/schedule/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package schedule

import (
"context"
"github.com/go-vela/types/constants"
)

// CountSchedules gets the count of all schedules from the database.
func (e *engine) CountSchedules() (int64, error) {
func (e *engine) CountSchedules(ctx context.Context) (int64, error) {
e.logger.Tracef("getting count of all schedules from the database")

// variable to store query results
Expand Down
Loading

0 comments on commit 516d4fe

Please sign in to comment.