diff --git a/jobs/jobs.go b/jobs/jobs.go
index 36ad071c..aae6e134 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -27,6 +27,7 @@ func ScheduleJobs() {
 	scheduleFunc("@every 24h", DeleteOldConfigChanges)
 	scheduleFunc("@every 24h", DeleteOldConfigAnalysis)
 	scheduleFunc("@every 24h", CleanupConfigItems)
+	scheduleFunc("@every 1h", ProcessChangeRetentionRules)
 
 	if api.UpstreamConfig.Valid() {
 		pullJob := &UpstreamPullJob{}
diff --git a/jobs/retention.go b/jobs/retention.go
new file mode 100644
index 00000000..8593a3db
--- /dev/null
+++ b/jobs/retention.go
@@ -0,0 +1,48 @@
+package jobs
+
+import (
+	gocontext "context"
+	"encoding/json"
+
+	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/config-db/api/v1"
+	"github.com/flanksource/config-db/db"
+	"github.com/flanksource/config-db/scrapers"
+	"github.com/flanksource/duty/context"
+	"github.com/flanksource/duty/models"
+)
+
+func ProcessChangeRetentionRules() {
+	ctx := context.NewContext(gocontext.Background()).WithDB(db.DefaultDB(), db.Pool)
+	jobHistory := models.NewJobHistory("ProcessChangeRetentionRules", "", "").Start()
+	_ = db.PersistJobHistory(jobHistory)
+	defer func() {
+		_ = db.PersistJobHistory(jobHistory.End())
+	}()
+
+	var activeScrapers []models.ConfigScraper
+	if err := ctx.DB().Where("deleted_at IS NULL").Find(&activeScrapers).Error; err != nil {
+		logger.Errorf("Error querying config scrapers from db: %v", err)
+		jobHistory.AddError(err.Error())
+		return
+	}
+
+	for _, s := range activeScrapers {
+		var spec v1.ScraperSpec
+		if err := json.Unmarshal([]byte(s.Spec), &spec); err != nil {
+			logger.Errorf("Error unmarshaling config scraper[%s] spec: %v", s.ID, err)
+			jobHistory.AddError(err.Error())
+			continue
+		}
+
+		for _, changeSpec := range spec.Retention.Changes {
+			err := scrapers.ProcessChangeRetention(ctx, s.ID, changeSpec)
+			if err != nil {
+				logger.Errorf("Error processing change retention for scraper[%s]: %v", s.ID, err)
+				jobHistory.AddError(err.Error())
+			} else {
+				jobHistory.IncrSuccess()
+			}
+		}
+	}
+}
diff --git a/scrapers/run.go b/scrapers/run.go
index bd1df9b4..e27b733c 100644
--- a/scrapers/run.go
+++ b/scrapers/run.go
@@ -3,7 +3,6 @@ package scrapers
 import (
 	"fmt"
 
-	"github.com/flanksource/commons/logger"
 	"github.com/flanksource/config-db/api"
 	v1 "github.com/flanksource/config-db/api/v1"
 	"github.com/flanksource/config-db/db"
@@ -29,13 +28,6 @@ func RunScraper(ctx api.ScrapeContext) (v1.ScrapeResults, error) {
 				return nil, fmt.Errorf("error deleting stale config items: %w", err)
 			}
 		}
-
-		// Process change retention
-		for _, change := range ctx.ScrapeConfig().Spec.Retention.Changes {
-			if err := ProcessChangeRetention(ctx.DutyContext(), *persistedID, change); err != nil {
-				logger.Errorf("Error processing change retention rules: %v", err)
-			}
-		}
 	}
 
 	return results, nil
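
Note: scrapers.ProcessChangeRetention itself is untouched by this diff; only its call site moves from the per-scrape path in RunScraper to the new hourly job. For readers unfamiliar with it, the following is a minimal, hypothetical sketch of what such a retention pass could look like, inferred from the call sites above. The changeRetentionRule type, its fields (Name, AgeDays), and the config_changes/config_items table and column names are illustrative assumptions, not the actual config-db implementation.

// Hypothetical sketch only -- not the real scrapers.ProcessChangeRetention.
// The signature is inferred from the call sites in this diff; the rule fields
// and schema names below are assumptions for illustration.
package scrapers

import (
	"fmt"

	"github.com/flanksource/duty/context"
	"github.com/google/uuid"
)

// changeRetentionRule stands in for the rule type iterated as
// spec.Retention.Changes in the new job (field names assumed).
type changeRetentionRule struct {
	Name    string // change type the rule applies to
	AgeDays int    // delete matching changes older than this many days; 0 disables the check
}

// processChangeRetentionSketch deletes old config changes belonging to one
// scraper according to a single retention rule.
func processChangeRetentionSketch(ctx context.Context, scraperID uuid.UUID, rule changeRetentionRule) error {
	if rule.AgeDays <= 0 {
		return nil // nothing to enforce for this rule
	}

	// Age-based retention: drop changes of this type, older than the cutoff,
	// for config items owned by the given scraper (table/column names assumed).
	res := ctx.DB().Exec(`
		DELETE FROM config_changes
		WHERE change_type = ?
		  AND created_at < NOW() - ?::interval
		  AND config_id IN (SELECT id FROM config_items WHERE scraper_id = ?)`,
		rule.Name, fmt.Sprintf("%d days", rule.AgeDays), scraperID)
	if res.Error != nil {
		return fmt.Errorf("age-based change retention for %q: %w", rule.Name, res.Error)
	}
	return nil
}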