diff --git a/fixtures/k8s/cronjob_monitor_pass.yaml b/fixtures/k8s/cronjob_monitor.yaml similarity index 100% rename from fixtures/k8s/cronjob_monitor_pass.yaml rename to fixtures/k8s/cronjob_monitor.yaml diff --git a/fixtures/k8s/kustomization.yaml b/fixtures/k8s/kustomization.yaml index fa6a0d094..f9e1224a2 100644 --- a/fixtures/k8s/kustomization.yaml +++ b/fixtures/k8s/kustomization.yaml @@ -9,7 +9,7 @@ resources: - pod_fail.yaml - pod_pass.yaml - cronjob_monitor_fail.yaml - - cronjob_monitor_pass.yaml + - cronjob_monitor.yaml - kubernetes_bundle.yaml - kubernetes_resource_ingress_pass.yaml - kubernetes_resource_namespace_pass.yaml diff --git a/pkg/api/push.go b/pkg/api/push.go index 605fa40d4..a877c3cd8 100644 --- a/pkg/api/push.go +++ b/pkg/api/push.go @@ -72,7 +72,11 @@ func PushHandler(c echo.Context) error { } } } - cache.PostgresCache.Add(data.Check, data.Status) + + if _, err := cache.PostgresCache.Add(ctx, data.Check, data.Status); err != nil { + return errorResponse(c, err, http.StatusInternalServerError) + } + c.Response().WriteHeader(http.StatusCreated) return nil } diff --git a/pkg/api/run_now.go b/pkg/api/run_now.go index 087540c0f..63306a1ea 100644 --- a/pkg/api/run_now.go +++ b/pkg/api/run_now.go @@ -59,7 +59,10 @@ func RunCanaryHandler(c echo.Context) error { } for _, result := range results { - _ = cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result)) + if _, err := cache.PostgresCache.Add(ctx.Context, pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result)); err != nil { + return errorResponse(c, err, http.StatusInternalServerError) + } + if err := canaryJobs.FormCheckRelationships(ctx.Context, result); err != nil { ctx.Logger.Named(result.Name).Errorf("error forming check relationships: %v", err) } diff --git a/pkg/api/webhook.go b/pkg/api/webhook.go index 87d855828..9d806fc62 100644 --- a/pkg/api/webhook.go +++ b/pkg/api/webhook.go @@ -113,7 +113,10 @@ func webhookHandler(ctx dutyContext.Context, id, authToken string, data CheckDat checks.ExportCheckMetrics(scrapeCtx, transformedResults) for _, result := range transformedResults { - _ = cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result)) + _, err := cache.PostgresCache.Add(ctx, pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result)) + if err != nil { + return err + } } return nil diff --git a/pkg/cache/postgres.go b/pkg/cache/postgres.go index 70ec5f73b..933ff434b 100644 --- a/pkg/cache/postgres.go +++ b/pkg/cache/postgres.go @@ -3,6 +3,7 @@ package cache import ( gocontext "context" "encoding/json" + "fmt" "time" "github.com/flanksource/canary-checker/pkg" @@ -11,6 +12,7 @@ import ( "github.com/flanksource/duty/context" "github.com/flanksource/duty/query" "github.com/google/uuid" + "github.com/samber/lo" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -29,30 +31,21 @@ func NewPostgresCache(context context.Context) *postgresCache { } } -func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) []string { - checkIDs := make([]string, 0, len(statii)) - - db := c.DB() - for _, status := range statii { - if status.Status { - check.Status = "healthy" - } else { - check.Status = "unhealthy" - } - - checkID, err := c.AddCheckFromStatus(check, status) - if err != nil { - logger.Errorf("error persisting check with canary %s: %v", check.CanaryID, err) - } else { - checkIDs = append(checkIDs, checkID.String()) - } - c.AddCheckStatus(db, check, status) +func (c *postgresCache) Add(ctx context.Context, check pkg.Check, status pkg.CheckStatus) (string, error) { + check.Status = lo.Ternary(status.Status, "healthy", "unhealthy") + checkID, err := AddCheckFromStatus(ctx, check, status) + if err != nil { + return "", fmt.Errorf("error persisting check with canary %s: %w", check.CanaryID, err) + } + + if err := c.AddCheckStatus(ctx.DB(), check, status); err != nil { + return "", fmt.Errorf("error persisting check status with canary %s: %w", check.CanaryID, err) } - return checkIDs + return checkID.String(), nil } -func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStatus) (uuid.UUID, error) { +func AddCheckFromStatus(ctx context.Context, check pkg.Check, status pkg.CheckStatus) (uuid.UUID, error) { if status.Check == nil { return uuid.Nil, nil } @@ -61,13 +54,13 @@ func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStat return check.ID, nil } - return db.PersistCheck(c.DB(), check, check.CanaryID) + return db.PersistCheck(ctx.DB(), check, check.CanaryID) } -func (c *postgresCache) AddCheckStatus(db *gorm.DB, check pkg.Check, status pkg.CheckStatus) { +func (c *postgresCache) AddCheckStatus(conn *gorm.DB, check pkg.Check, status pkg.CheckStatus) error { jsonDetails, err := json.Marshal(status.Detail) if err != nil { - logger.Errorf("error marshalling details: %v", err) + return fmt.Errorf("error marshalling details: %w", err) } checks := pkg.Checks{} @@ -75,19 +68,20 @@ func (c *postgresCache) AddCheckStatus(db *gorm.DB, check pkg.Check, status pkg. if check.Canary != nil { nextRuntime, _ = check.Canary.NextRuntime(time.Now()) } - if c.DB().Model(&checks). + + if conn.Model(&checks). Clauses(clause.Returning{Columns: []clause.Column{{Name: "id"}}}). Where("canary_id = ? AND type = ? AND name = ?", check.CanaryID, check.Type, check.GetName()). Updates(map[string]any{"status": check.Status, "labels": check.Labels, "last_runtime": status.Time, "next_runtime": nextRuntime}).Error != nil { - logger.Errorf("error updating check: %v", err) - return + return fmt.Errorf("error updating check: %w", err) } if len(checks) == 0 || checks[0].ID == uuid.Nil { logger.Tracef("%s check not found, skipping status insert", check) - return + return nil } - err = db.Exec(`INSERT INTO check_statuses( + + err = conn.Exec(`INSERT INTO check_statuses( check_id, details, duration, @@ -112,8 +106,10 @@ func (c *postgresCache) AddCheckStatus(db *gorm.DB, check pkg.Check, status pkg. ).Error if err != nil { - logger.Errorf("error adding check status to postgres: %v", err) + return fmt.Errorf("error adding check status to postgres: %w", err) } + + return nil } func (c *postgresCache) GetDetails(checkkey string, time string) interface{} { diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index fd12afb28..deee65c8f 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -101,11 +101,6 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error { ctx.Error(err, "error getting transformed checks") } - var transformedChecksCreated []string - // Transformed checks have a delete strategy - // On deletion they can either be marked healthy, unhealthy or left as is - checkIDDeleteStrategyMap := make(map[string]string) - // TODO: Use ctx with object here logPass := j.Canary.IsTrace() || j.Canary.IsDebug() || LogPass logFail := j.Canary.IsTrace() || j.Canary.IsDebug() || LogFail @@ -113,16 +108,11 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error { if logPass && result.Pass || logFail && !result.Pass { ctx.Logger.Named(result.GetName()).Infof(result.String()) } - transformedChecksAdded := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result)) - transformedChecksCreated = append(transformedChecksCreated, transformedChecksAdded...) - for _, checkID := range transformedChecksAdded { - checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy() - } + } - // Establish relationship with components & configs - if err := FormCheckRelationships(ctx.Context, result); err != nil { - ctx.Logger.Named(result.Name).Errorf("error forming check relationships: %v", err) - } + transformedChecksCreated, checkIDDeleteStrategyMap, err := SaveResults(ctx.Context, results) + if err != nil { + return fmt.Errorf("failed to save results: %w", err) } UpdateCanaryStatusAndEvent(ctx.Context, j.Canary, results) @@ -155,6 +145,40 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error { return nil } +func SaveResults(ctx context.Context, results []*pkg.CheckResult) ([]string, map[string]string, error) { + var transformedChecksCreated []string + // Transformed checks have a delete strategy + // On deletion they can either be marked healthy, unhealthy or left as is + checkIDDeleteStrategyMap := make(map[string]string) + + if len(results) == 0 { + return transformedChecksCreated, checkIDDeleteStrategyMap, nil + } + + tx := ctx.DB().Begin() + if tx.Error != nil { + return nil, nil, fmt.Errorf("error starting transaction: %w", tx.Error) + } + defer tx.Rollback() + + for _, result := range results { + transformedChecksAdded, err := cache.PostgresCache.Add(ctx.WithDB(tx, ctx.Pool()), pkg.FromV1(result.Canary, result.Check), pkg.CheckStatusFromResult(*result)) + if err != nil { + return nil, nil, fmt.Errorf("error adding check to cache: %w", err) + } + + transformedChecksCreated = append(transformedChecksCreated, transformedChecksAdded) + checkIDDeleteStrategyMap[transformedChecksAdded] = result.Check.GetTransformDeleteStrategy() + + // Establish relationship with components & configs + if err := FormCheckRelationships(ctx.WithDB(tx, ctx.Pool()), result); err != nil { + ctx.Logger.Named(result.Name).Errorf("error forming check relationships: %v", err) + } + } + + return transformedChecksCreated, checkIDDeleteStrategyMap, tx.Commit().Error +} + func logIfError(err error, description string) { if err != nil { logger.Errorf("%s: %v", description, err) diff --git a/pkg/jobs/canary/sync.go b/pkg/jobs/canary/sync.go index 08339961e..d4f8799fd 100644 --- a/pkg/jobs/canary/sync.go +++ b/pkg/jobs/canary/sync.go @@ -78,7 +78,9 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error { if canary.Spec.Webhook != nil { // Webhook checks can be persisted immediately as they do not require scheduling & running. result := pkg.Success(canary.Spec.Webhook, *canary) - _ = cache.PostgresCache.Add(pkg.FromV1(*canary, canary.Spec.Webhook), pkg.CheckStatusFromResult(*result)) + if _, err := cache.PostgresCache.Add(ctx, pkg.FromV1(*canary, canary.Spec.Webhook), pkg.CheckStatusFromResult(*result)); err != nil { + return err + } } var existingJob *job.Job diff --git a/test/run_test.go b/test/run_test.go index 42e7d98c5..1fe9aec6e 100644 --- a/test/run_test.go +++ b/test/run_test.go @@ -59,6 +59,15 @@ var _ = ginkgo.Describe("Canary Checks/"+testFolder, func() { if strings.HasPrefix(name, "_") || !strings.HasSuffix(name, ".yaml") || name == "kustomization.yaml" { continue } + + // Only run those fixtures that end in _pass.yaml or _fail.yaml + if !strings.HasSuffix(name, "_pass.yaml") && + !strings.HasSuffix(name, "_pass.yml") && + !strings.HasSuffix(name, "_fail.yaml") && + !strings.HasSuffix(name, "_fail.yml") { + continue + } + wg.Add(1) go func() { defer ginkgo.GinkgoRecover()