Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add context to schedule functions #898

Merged
merged 8 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions api/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func CreateSchedule(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
r := repo.Retrieve(c)
ctx := c.Request.Context()
allowlist := c.Value("allowlistschedule").([]string)
minimumFrequency := c.Value("scheduleminimumfrequency").(time.Duration)

Expand Down Expand Up @@ -145,7 +146,7 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the schedule from the database
dbSchedule, err := database.FromContext(c).GetScheduleForRepo(r, input.GetName())
dbSchedule, err := database.FromContext(c).GetScheduleForRepo(ctx, r, input.GetName())
if err == nil && dbSchedule.GetActive() {
retErr := fmt.Errorf("unable to create schedule: %s is already active", input.GetName())

Expand All @@ -170,7 +171,7 @@ func CreateSchedule(c *gin.Context) {
dbSchedule.SetActive(true)

// send API call to update the schedule
err = database.FromContext(c).UpdateSchedule(dbSchedule, true)
err = database.FromContext(c).UpdateSchedule(ctx, dbSchedule, true)
if err != nil {
retErr := fmt.Errorf("unable to set schedule %s to active: %w", dbSchedule.GetName(), err)

Expand All @@ -180,10 +181,10 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the updated schedule
s, _ = database.FromContext(c).GetScheduleForRepo(r, dbSchedule.GetName())
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, dbSchedule.GetName())
} else {
// send API call to create the schedule
err = database.FromContext(c).CreateSchedule(s)
err = database.FromContext(c).CreateSchedule(ctx, s)
if err != nil {
retErr := fmt.Errorf("unable to create new schedule %s: %w", r.GetName(), err)

Expand All @@ -193,7 +194,7 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the created schedule
s, _ = database.FromContext(c).GetScheduleForRepo(r, input.GetName())
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, input.GetName())
}

c.JSON(http.StatusCreated, s)
Expand Down
3 changes: 2 additions & 1 deletion api/schedule/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func DeleteSchedule(c *gin.Context) {
r := repo.Retrieve(c)
u := user.Retrieve(c)
s := schedule.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -75,7 +76,7 @@ func DeleteSchedule(c *gin.Context) {
"user": u.GetName(),
}).Infof("deleting schedule %s", s.GetName())

err := database.FromContext(c).DeleteSchedule(s)
err := database.FromContext(c).DeleteSchedule(ctx, s)
if err != nil {
retErr := fmt.Errorf("unable to delete schedule %s: %w", s.GetName(), err)

Expand Down
3 changes: 2 additions & 1 deletion api/schedule/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
func ListSchedules(c *gin.Context) {
// capture middleware values
r := repo.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand Down Expand Up @@ -109,7 +110,7 @@ func ListSchedules(c *gin.Context) {
perPage = util.MaxInt(1, util.MinInt(100, perPage))

// send API call to capture the list of schedules for the repo
s, t, err := database.FromContext(c).ListSchedulesForRepo(r, page, perPage)
s, t, err := database.FromContext(c).ListSchedulesForRepo(ctx, r, page, perPage)
if err != nil {
retErr := fmt.Errorf("unable to get schedules for repo %s: %w", r.GetFullName(), err)

Expand Down
5 changes: 3 additions & 2 deletions api/schedule/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func UpdateSchedule(c *gin.Context) {
// capture middleware values
r := repo.Retrieve(c)
s := schedule.Retrieve(c)
ctx := c.Request.Context()
u := user.Retrieve(c)
scheduleName := util.PathParameter(c, "schedule")
minimumFrequency := c.Value("scheduleminimumfrequency").(time.Duration)
Expand Down Expand Up @@ -128,7 +129,7 @@ func UpdateSchedule(c *gin.Context) {
s.SetUpdatedBy(u.GetName())

// update the schedule within the database
err = database.FromContext(c).UpdateSchedule(s, true)
err = database.FromContext(c).UpdateSchedule(ctx, s, true)
if err != nil {
retErr := fmt.Errorf("unable to update scheduled %s: %w", scheduleName, err)

Expand All @@ -138,7 +139,7 @@ func UpdateSchedule(c *gin.Context) {
}

// capture the updated scheduled
s, _ = database.FromContext(c).GetScheduleForRepo(r, scheduleName)
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, scheduleName)

c.JSON(http.StatusOK, s)
}
13 changes: 7 additions & 6 deletions cmd/vela-server/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -30,11 +31,11 @@ const (
scheduleWait = "waiting to trigger build for schedule"
)

func processSchedules(start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedules(ctx context.Context, start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
logrus.Infof("processing active schedules to create builds")

// send API call to capture the list of active schedules
schedules, err := database.ListActiveSchedules()
schedules, err := database.ListActiveSchedules(ctx)
if err != nil {
return err
}
Expand All @@ -53,7 +54,7 @@ func processSchedules(start time.Time, compiler compiler.Engine, database databa
// This is needed to ensure we are not dealing with a stale schedule since we fetch
// all schedules once and iterate through that list which can take a significant
// amount of time to get to the end of the list.
schedule, err := database.GetSchedule(s.GetID())
schedule, err := database.GetSchedule(ctx, s.GetID())
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

Expand Down Expand Up @@ -115,15 +116,15 @@ func processSchedules(start time.Time, compiler compiler.Engine, database databa
schedule.SetScheduledAt(time.Now().UTC().Unix())

// send API call to update schedule for ensuring scheduled_at field is set
err = database.UpdateSchedule(schedule, false)
err = database.UpdateSchedule(ctx, schedule, false)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// process the schedule and trigger a new build
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
err = processSchedule(ctx, schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

Expand All @@ -135,7 +136,7 @@ func processSchedules(start time.Time, compiler compiler.Engine, database databa
}

//nolint:funlen // ignore function length and number of statements
func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedule(ctx context.Context, s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
// send API call to capture the repo for the schedule
r, err := database.GetRepo(s.GetRepoID())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/vela-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func server(c *cli.Context) error {
// sleep for a duration of time before processing schedules
time.Sleep(jitter)

err = processSchedules(start, compiler, database, metadata, queue, scm)
err = processSchedules(ctx, start, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warn("unable to process schedules")
} else {
Expand Down
6 changes: 5 additions & 1 deletion database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -54,6 +55,8 @@ type (
client *gorm.DB
// engine configuration settings used in database functions
config *config
// engine context used in database functions
ctx context.Context
// sirupsen/logrus logger used in database functions
logger *logrus.Entry

Expand Down Expand Up @@ -85,6 +88,7 @@ func New(opts ...EngineOpt) (Interface, error) {
e.client = new(gorm.DB)
e.config = new(config)
e.logger = new(logrus.Entry)
e.ctx = context.TODO()

// apply all provided configuration options
for _, opt := range opts {
Expand Down Expand Up @@ -143,7 +147,7 @@ func New(opts ...EngineOpt) (Interface, error) {
}

// create database agnostic engines for resources
err = e.NewResources()
err = e.NewResources(e.ctx)
if err != nil {
return nil, err
}
Expand Down
23 changes: 13 additions & 10 deletions database/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"os"
"reflect"
"strings"
Expand Down Expand Up @@ -923,17 +924,19 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods[element.Method(i).Name] = false
}

ctx := context.TODO()

// create the schedules
for _, schedule := range resources.Schedules {
err := db.CreateSchedule(schedule)
err := db.CreateSchedule(ctx, schedule)
if err != nil {
t.Errorf("unable to create schedule %d: %v", schedule.GetID(), err)
}
}
methods["CreateSchedule"] = true

// count the schedules
count, err := db.CountSchedules()
count, err := db.CountSchedules(ctx)
if err != nil {
t.Errorf("unable to count schedules: %v", err)
}
Expand All @@ -943,7 +946,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["CountSchedules"] = true

// count the schedules for a repo
count, err = db.CountSchedulesForRepo(resources.Repos[0])
count, err = db.CountSchedulesForRepo(ctx, resources.Repos[0])
if err != nil {
t.Errorf("unable to count schedules for repo %d: %v", resources.Repos[0].GetID(), err)
}
Expand All @@ -953,7 +956,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["CountSchedulesForRepo"] = true

// list the schedules
list, err := db.ListSchedules()
list, err := db.ListSchedules(ctx)
if err != nil {
t.Errorf("unable to list schedules: %v", err)
}
Expand All @@ -963,7 +966,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["ListSchedules"] = true

// list the active schedules
list, err = db.ListActiveSchedules()
list, err = db.ListActiveSchedules(ctx)
if err != nil {
t.Errorf("unable to list schedules: %v", err)
}
Expand All @@ -973,7 +976,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
methods["ListActiveSchedules"] = true

// list the schedules for a repo
list, count, err = db.ListSchedulesForRepo(resources.Repos[0], 1, 10)
list, count, err = db.ListSchedulesForRepo(ctx, resources.Repos[0], 1, 10)
if err != nil {
t.Errorf("unable to count schedules for repo %d: %v", resources.Repos[0].GetID(), err)
}
Expand All @@ -988,7 +991,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
// lookup the schedules by name
for _, schedule := range resources.Schedules {
repo := resources.Repos[schedule.GetRepoID()-1]
got, err := db.GetScheduleForRepo(repo, schedule.GetName())
got, err := db.GetScheduleForRepo(ctx, repo, schedule.GetName())
if err != nil {
t.Errorf("unable to get schedule %d for repo %d: %v", schedule.GetID(), repo.GetID(), err)
}
Expand All @@ -1001,13 +1004,13 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {
// update the schedules
for _, schedule := range resources.Schedules {
schedule.SetUpdatedAt(time.Now().UTC().Unix())
err = db.UpdateSchedule(schedule, true)
err = db.UpdateSchedule(ctx, schedule, true)
if err != nil {
t.Errorf("unable to update schedule %d: %v", schedule.GetID(), err)
}

// lookup the schedule by ID
got, err := db.GetSchedule(schedule.GetID())
got, err := db.GetSchedule(ctx, schedule.GetID())
if err != nil {
t.Errorf("unable to get schedule %d by ID: %v", schedule.GetID(), err)
}
Expand All @@ -1020,7 +1023,7 @@ func testSchedules(t *testing.T, db Interface, resources *Resources) {

// delete the schedules
for _, schedule := range resources.Schedules {
err = db.DeleteSchedule(schedule)
err = db.DeleteSchedule(ctx, schedule)
if err != nil {
t.Errorf("unable to delete schedule %d: %v", schedule.GetID(), err)
}
Expand Down
14 changes: 13 additions & 1 deletion database/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package database

import "time"
import (
"context"
"time"
)

// EngineOpt represents a configuration option to initialize the database engine.
type EngineOpt func(*engine) error
Expand Down Expand Up @@ -88,3 +91,12 @@ func WithSkipCreation(skipCreation bool) EngineOpt {
return nil
}
}

// WithContext sets the context in the database engine.
func WithContext(ctx context.Context) EngineOpt {
return func(e *engine) error {
e.ctx = ctx

return nil
}
}
4 changes: 3 additions & 1 deletion database/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"github.com/go-vela/server/database/build"
"github.com/go-vela/server/database/hook"
"github.com/go-vela/server/database/log"
Expand All @@ -19,7 +20,7 @@ import (
)

// NewResources creates and returns the database agnostic engines for resources.
func (e *engine) NewResources() error {
func (e *engine) NewResources(ctx context.Context) error {
var err error

// create the database agnostic engine for builds
Expand Down Expand Up @@ -77,6 +78,7 @@ func (e *engine) NewResources() error {

// create the database agnostic engine for schedules
e.ScheduleInterface, err = schedule.New(
schedule.WithContext(e.ctx),
schedule.WithClient(e.client),
schedule.WithLogger(e.logger),
schedule.WithSkipCreation(e.config.SkipCreation),
Expand Down
3 changes: 2 additions & 1 deletion database/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestDatabase_Engine_NewResources(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := test.database.NewResources()
err := test.database.NewResources(context.TODO())

if test.failure {
if err == nil {
Expand Down
3 changes: 2 additions & 1 deletion database/schedule/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package schedule

import (
"context"
"github.com/go-vela/types/constants"
)

// CountSchedules gets the count of all schedules from the database.
func (e *engine) CountSchedules() (int64, error) {
func (e *engine) CountSchedules(ctx context.Context) (int64, error) {
e.logger.Tracef("getting count of all schedules from the database")

// variable to store query results
Expand Down
Loading