Skip to content

Commit

Permalink
feat: change retention (#324)
Browse files Browse the repository at this point in the history
* feat: change retention

* chore: clean up

* fix: config retention tests and flow

* chore: change config retention to a job
  • Loading branch information
yashmehrotra authored Dec 5, 2023
1 parent 72e711a commit 7fd0755
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 5 deletions.
11 changes: 11 additions & 0 deletions api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ var AllScraperConfigs = map[string]any{
"trivy": Trivy{},
}

type ChangeRetentionSpec struct {
Name string `json:"name,omitempty"`
Age string `json:"age,omitempty"`
Count int `json:"count,omitempty"`
}

type RetentionSpec struct {
Changes []ChangeRetentionSpec `json:"changes,omitempty"`
}

// ScraperSpec defines the desired state of Config scraper
type ScraperSpec struct {
LogLevel string `json:"logLevel,omitempty"`
Expand All @@ -35,6 +45,7 @@ type ScraperSpec struct {
Azure []Azure `json:"azure,omitempty" yaml:"azure,omitempty"`
SQL []SQL `json:"sql,omitempty" yaml:"sql,omitempty"`
Trivy []Trivy `json:"trivy,omitempty" yaml:"trivy,omitempty"`
Retention RetentionSpec `json:"retention,omitempty"`

// Full flag when set will try to extract out changes from the scraped config.
Full bool `json:"full,omitempty"`
Expand Down
36 changes: 36 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions chart/crds/configs.flanksource.com_scrapeconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,20 @@ spec:
type: array
logLevel:
type: string
retention:
properties:
changes:
items:
properties:
age:
type: string
count:
type: integer
name:
type: string
type: object
type: array
type: object
schedule:
type: string
sql:
Expand Down
2 changes: 1 addition & 1 deletion config/schemas/scrape_config.schema.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func ScheduleJobs() {
scheduleFunc("@every 24h", DeleteOldConfigChanges)
scheduleFunc("@every 24h", DeleteOldConfigAnalysis)
scheduleFunc("@every 24h", CleanupConfigItems)
scheduleFunc("@every 1h", ProcessChangeRetentionRules)

if api.UpstreamConfig.Valid() {
pullJob := &UpstreamPullJob{}
Expand Down
48 changes: 48 additions & 0 deletions jobs/retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package jobs

import (
gocontext "context"
"encoding/json"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/scrapers"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
)

func ProcessChangeRetentionRules() {
ctx := context.NewContext(gocontext.Background()).WithDB(db.DefaultDB(), db.Pool)
jobHistory := models.NewJobHistory("ProcessChangeRetentionRules", "", "").Start()
_ = db.PersistJobHistory(jobHistory)
defer func() {
_ = db.PersistJobHistory(jobHistory.End())
}()

var activeScrapers []models.ConfigScraper
if err := ctx.DB().Where("deleted_at IS NULL").Find(&activeScrapers).Error; err != nil {
logger.Errorf("Error querying config scrapers from db: %v", err)
jobHistory.AddError(err.Error())
return
}

for _, s := range activeScrapers {
var spec v1.ScraperSpec
if err := json.Unmarshal([]byte(s.Spec), &spec); err != nil {
logger.Errorf("Error unmarshaling config scraper[%s] into json: %v", s.ID, err)
jobHistory.AddError(err.Error())
continue
}

for _, changeSpec := range spec.Retention.Changes {
err := scrapers.ProcessChangeRetention(ctx, s.ID, changeSpec)
if err != nil {
logger.Errorf("Error processing change retention for scraper[%s] config analysis: %v", s.ID, err)
jobHistory.AddError(err.Error())
} else {
jobHistory.IncrSuccess()
}
}
}
}
66 changes: 66 additions & 0 deletions scrapers/retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package scrapers

import (
"database/sql"
"fmt"
"strings"

"github.com/flanksource/commons/duration"
"github.com/flanksource/commons/logger"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty/context"
"github.com/google/uuid"
)

func ProcessChangeRetention(ctx context.Context, scraperID uuid.UUID, spec v1.ChangeRetentionSpec) error {
var whereClauses []string

var ageMinutes int
if spec.Age != "" {
age, err := duration.ParseDuration(spec.Age)
if err != nil {
return fmt.Errorf("error parsing age %s as duration: %w", spec.Age, err)
}
ageMinutes = int(age.Minutes())

whereClauses = append(whereClauses, `((now()- created_at) > interval '1 minute' * @ageMinutes)`)
}

if spec.Count > 0 {
whereClauses = append(whereClauses, `seq > @count`)
}

if len(whereClauses) == 0 {
return fmt.Errorf("both age and count cannot be empty")
}

query := fmt.Sprintf(`
WITH latest_config_changes AS (
SELECT id, change_type, created_at, ROW_NUMBER() OVER(ORDER BY created_at DESC) AS seq
FROM config_changes
WHERE
change_type = @changeType AND
config_id IN (SELECT id FROM config_items WHERE scraper_id = @scraperID)
)
DELETE FROM config_changes
WHERE id IN (
SELECT id from latest_config_changes
WHERE %s
)
`, strings.Join(whereClauses, " OR "))

result := ctx.DB().Exec(query,
sql.Named("changeType", spec.Name),
sql.Named("scraperID", scraperID),
sql.Named("ageMinutes", ageMinutes),
sql.Named("count", spec.Count),
)
if err := result.Error; err != nil {
return fmt.Errorf("error retaining config changes: %w", err)
}

if result.RowsAffected > 0 {
logger.Infof("Deleted %d config_changes as per ChangeRetentionSpec[%s]", result.RowsAffected, spec.Name)
}
return nil
}
8 changes: 4 additions & 4 deletions scrapers/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func RunScraper(ctx api.ScrapeContext) (v1.ScrapeResults, error) {
return nil, fmt.Errorf("failed to update db: %w", dbErr)
}

// If error in any of the scrape results, don't delete old items
if len(results) > 0 && !v1.ScrapeResults(results).HasErr() {
persistedID := ctx.ScrapeConfig().GetPersistedID()
if persistedID != nil {
persistedID := ctx.ScrapeConfig().GetPersistedID()
if persistedID != nil {
// If error in any of the scrape results, don't delete old items
if len(results) > 0 && !v1.ScrapeResults(results).HasErr() {
if err := DeleteStaleConfigItems(ctx.DutyContext(), *persistedID); err != nil {
return nil, fmt.Errorf("error deleting stale config items: %w", err)
}
Expand Down
96 changes: 96 additions & 0 deletions scrapers/runscrapers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/db/models"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
dutymodels "github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -218,6 +222,98 @@ var _ = Describe("Scrapers test", Ordered, func() {

Expect(configItem, storedConfigItem)
})

It("should retain config changes as per the spec", func() {
dummyScraper := dutymodels.ConfigScraper{
Name: "Test",
Spec: `{"foo":"bar"}`,
Source: dutymodels.SourceConfigFile,
}
err := db.DefaultDB().Create(&dummyScraper).Error
Expect(err).To(BeNil())

configItemID := uuid.New().String()
dummyCI := models.ConfigItem{
ID: configItemID,
ConfigClass: "Test",
ScraperID: &dummyScraper.ID,
}
err = db.DefaultDB().Create(&dummyCI).Error
Expect(err).To(BeNil())

twoDaysAgo := time.Now().Add(-2 * 24 * time.Hour)
fiveDaysAgo := time.Now().Add(-5 * 24 * time.Hour)
tenDaysAgo := time.Now().Add(-10 * 24 * time.Hour)
configChanges := []models.ConfigChange{
{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &tenDaysAgo, ExternalChangeId: uuid.New().String()},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &tenDaysAgo, ExternalChangeId: uuid.New().String()},
}

err = db.DefaultDB().Table("config_changes").Create(&configChanges).Error
Expect(err).To(BeNil())

var currentCount int
err = db.DefaultDB().
Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
Scan(&currentCount).
Error
Expect(err).To(BeNil())
Expect(currentCount).To(Equal(len(configChanges)))

ctx := context.NewContext(gocontext.Background()).WithDB(db.DefaultDB(), db.Pool)

// Everything older than 8 days should be removed
err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff", Age: "8d"})
Expect(err).To(BeNil())
var count1 int
err = db.DefaultDB().
Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
Scan(&count1).
Error
Expect(err).To(BeNil())
Expect(count1).To(Equal(15))

// Only keep latest 12 config changes
err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff", Count: 12})
Expect(err).To(BeNil())
var count2 int
err = db.DefaultDB().
Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
Scan(&count2).
Error
Expect(err).To(BeNil())
Expect(count2).To(Equal(12))

// Keep config changes which are newer than 3 days and max count can be 10
err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff", Age: "3d", Count: 10})
Expect(err).To(BeNil())
var count3 int
err = db.DefaultDB().
Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
Scan(&count3).
Error
Expect(err).To(BeNil())
Expect(count3).To(Equal(9))

// No params in ChangeRetentionSpec should fail
err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff"})
Expect(err).ToNot(BeNil())
})
})
})

Expand Down

0 comments on commit 7fd0755

Please sign in to comment.