Skip to content

Commit

Permalink
feat: save check & status in a single transaction (#1886)
Browse files Browse the repository at this point in the history
* feat: save check & status in a single transaction

* feat: save all checks in one tx instead of one tx per check status

* feat: Only run those fixtures that end in _pass.yaml or _fail.yaml
  • Loading branch information
adityathebe authored May 29, 2024
1 parent a2d81cd commit ff1019a
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 48 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion fixtures/k8s/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ resources:
- pod_fail.yaml
- pod_pass.yaml
- cronjob_monitor_fail.yaml
- cronjob_monitor_pass.yaml
- cronjob_monitor.yaml
- kubernetes_bundle.yaml
- kubernetes_resource_ingress_pass.yaml
- kubernetes_resource_namespace_pass.yaml
Expand Down
6 changes: 5 additions & 1 deletion pkg/api/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ func PushHandler(c echo.Context) error {
}
}
}
cache.PostgresCache.Add(data.Check, data.Status)

if _, err := cache.PostgresCache.Add(ctx, data.Check, data.Status); err != nil {
return errorResponse(c, err, http.StatusInternalServerError)
}

c.Response().WriteHeader(http.StatusCreated)
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/run_now.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func RunCanaryHandler(c echo.Context) error {
}

for _, result := range results {
_ = cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result))
if _, err := cache.PostgresCache.Add(ctx.Context, pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result)); err != nil {
return errorResponse(c, err, http.StatusInternalServerError)
}

if err := canaryJobs.FormCheckRelationships(ctx.Context, result); err != nil {
ctx.Logger.Named(result.Name).Errorf("error forming check relationships: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ func webhookHandler(ctx dutyContext.Context, id, authToken string, data CheckDat

checks.ExportCheckMetrics(scrapeCtx, transformedResults)
for _, result := range transformedResults {
_ = cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result))
_, err := cache.PostgresCache.Add(ctx, pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result))
if err != nil {
return err
}
}

return nil
Expand Down
54 changes: 25 additions & 29 deletions pkg/cache/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
gocontext "context"
"encoding/json"
"fmt"
"time"

"github.com/flanksource/canary-checker/pkg"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/query"
"github.com/google/uuid"
"github.com/samber/lo"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
Expand All @@ -29,30 +31,21 @@ func NewPostgresCache(context context.Context) *postgresCache {
}
}

func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) []string {
checkIDs := make([]string, 0, len(statii))

db := c.DB()
for _, status := range statii {
if status.Status {
check.Status = "healthy"
} else {
check.Status = "unhealthy"
}

checkID, err := c.AddCheckFromStatus(check, status)
if err != nil {
logger.Errorf("error persisting check with canary %s: %v", check.CanaryID, err)
} else {
checkIDs = append(checkIDs, checkID.String())
}
c.AddCheckStatus(db, check, status)
func (c *postgresCache) Add(ctx context.Context, check pkg.Check, status pkg.CheckStatus) (string, error) {
check.Status = lo.Ternary(status.Status, "healthy", "unhealthy")
checkID, err := AddCheckFromStatus(ctx, check, status)
if err != nil {
return "", fmt.Errorf("error persisting check with canary %s: %w", check.CanaryID, err)
}

if err := c.AddCheckStatus(ctx.DB(), check, status); err != nil {
return "", fmt.Errorf("error persisting check status with canary %s: %w", check.CanaryID, err)
}

return checkIDs
return checkID.String(), nil
}

func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStatus) (uuid.UUID, error) {
func AddCheckFromStatus(ctx context.Context, check pkg.Check, status pkg.CheckStatus) (uuid.UUID, error) {
if status.Check == nil {
return uuid.Nil, nil
}
Expand All @@ -61,33 +54,34 @@ func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStat
return check.ID, nil
}

return db.PersistCheck(c.DB(), check, check.CanaryID)
return db.PersistCheck(ctx.DB(), check, check.CanaryID)
}

func (c *postgresCache) AddCheckStatus(db *gorm.DB, check pkg.Check, status pkg.CheckStatus) {
func (c *postgresCache) AddCheckStatus(conn *gorm.DB, check pkg.Check, status pkg.CheckStatus) error {
jsonDetails, err := json.Marshal(status.Detail)
if err != nil {
logger.Errorf("error marshalling details: %v", err)
return fmt.Errorf("error marshalling details: %w", err)
}

checks := pkg.Checks{}
var nextRuntime *time.Time
if check.Canary != nil {
nextRuntime, _ = check.Canary.NextRuntime(time.Now())
}
if c.DB().Model(&checks).

if conn.Model(&checks).
Clauses(clause.Returning{Columns: []clause.Column{{Name: "id"}}}).
Where("canary_id = ? AND type = ? AND name = ?", check.CanaryID, check.Type, check.GetName()).
Updates(map[string]any{"status": check.Status, "labels": check.Labels, "last_runtime": status.Time, "next_runtime": nextRuntime}).Error != nil {
logger.Errorf("error updating check: %v", err)
return
return fmt.Errorf("error updating check: %w", err)
}

if len(checks) == 0 || checks[0].ID == uuid.Nil {
logger.Tracef("%s check not found, skipping status insert", check)
return
return nil
}
err = db.Exec(`INSERT INTO check_statuses(

err = conn.Exec(`INSERT INTO check_statuses(
check_id,
details,
duration,
Expand All @@ -112,8 +106,10 @@ func (c *postgresCache) AddCheckStatus(db *gorm.DB, check pkg.Check, status pkg.
).Error

if err != nil {
logger.Errorf("error adding check status to postgres: %v", err)
return fmt.Errorf("error adding check status to postgres: %w", err)
}

return nil
}

func (c *postgresCache) GetDetails(checkkey string, time string) interface{} {
Expand Down
52 changes: 38 additions & 14 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,18 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error {
ctx.Error(err, "error getting transformed checks")
}

var transformedChecksCreated []string
// Transformed checks have a delete strategy
// On deletion they can either be marked healthy, unhealthy or left as is
checkIDDeleteStrategyMap := make(map[string]string)

// TODO: Use ctx with object here
logPass := j.Canary.IsTrace() || j.Canary.IsDebug() || LogPass
logFail := j.Canary.IsTrace() || j.Canary.IsDebug() || LogFail
for _, result := range results {
if logPass && result.Pass || logFail && !result.Pass {
ctx.Logger.Named(result.GetName()).Infof(result.String())
}
transformedChecksAdded := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result))
transformedChecksCreated = append(transformedChecksCreated, transformedChecksAdded...)
for _, checkID := range transformedChecksAdded {
checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy()
}
}

// Establish relationship with components & configs
if err := FormCheckRelationships(ctx.Context, result); err != nil {
ctx.Logger.Named(result.Name).Errorf("error forming check relationships: %v", err)
}
transformedChecksCreated, checkIDDeleteStrategyMap, err := SaveResults(ctx.Context, results)
if err != nil {
return fmt.Errorf("failed to save results: %w", err)
}

UpdateCanaryStatusAndEvent(ctx.Context, j.Canary, results)
Expand Down Expand Up @@ -155,6 +145,40 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error {
return nil
}

func SaveResults(ctx context.Context, results []*pkg.CheckResult) ([]string, map[string]string, error) {
var transformedChecksCreated []string
// Transformed checks have a delete strategy
// On deletion they can either be marked healthy, unhealthy or left as is
checkIDDeleteStrategyMap := make(map[string]string)

if len(results) == 0 {
return transformedChecksCreated, checkIDDeleteStrategyMap, nil
}

tx := ctx.DB().Begin()
if tx.Error != nil {
return nil, nil, fmt.Errorf("error starting transaction: %w", tx.Error)
}
defer tx.Rollback()

for _, result := range results {
transformedChecksAdded, err := cache.PostgresCache.Add(ctx.WithDB(tx, ctx.Pool()), pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result))
if err != nil {
return nil, nil, fmt.Errorf("error adding check to cache: %w", err)
}

transformedChecksCreated = append(transformedChecksCreated, transformedChecksAdded)
checkIDDeleteStrategyMap[transformedChecksAdded] = result.Check.GetTransformDeleteStrategy()

// Establish relationship with components & configs
if err := FormCheckRelationships(ctx.WithDB(tx, ctx.Pool()), result); err != nil {
ctx.Logger.Named(result.Name).Errorf("error forming check relationships: %v", err)
}
}

return transformedChecksCreated, checkIDDeleteStrategyMap, tx.Commit().Error
}

func logIfError(err error, description string) {
if err != nil {
logger.Errorf("%s: %v", description, err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/jobs/canary/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error {
if canary.Spec.Webhook != nil {
// Webhook checks can be persisted immediately as they do not require scheduling & running.
result := pkg.Success(canary.Spec.Webhook, *canary)
_ = cache.PostgresCache.Add(pkg.FromV1(*canary, canary.Spec.Webhook), pkg.CheckStatusFromResult(*result))
if _, err := cache.PostgresCache.Add(ctx, pkg.FromV1(*canary, canary.Spec.Webhook), pkg.CheckStatusFromResult(*result)); err != nil {
return err
}
}

var existingJob *job.Job
Expand Down
9 changes: 9 additions & 0 deletions test/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ var _ = ginkgo.Describe("Canary Checks/"+testFolder, func() {
if strings.HasPrefix(name, "_") || !strings.HasSuffix(name, ".yaml") || name == "kustomization.yaml" {
continue
}

// Only run those fixtures that end in _pass.yaml or _fail.yaml
if !strings.HasSuffix(name, "_pass.yaml") &&
!strings.HasSuffix(name, "_pass.yml") &&
!strings.HasSuffix(name, "_fail.yaml") &&
!strings.HasSuffix(name, "_fail.yml") {
continue
}

wg.Add(1)
go func() {
defer ginkgo.GinkgoRecover()
Expand Down

0 comments on commit ff1019a

Please sign in to comment.