Skip to content

Commit

Permalink
feat: change retention
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Dec 1, 2023
1 parent b13a4df commit a55fd75
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 5 deletions.
11 changes: 11 additions & 0 deletions api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ var AllScraperConfigs = map[string]any{
"trivy": Trivy{},
}

type ChangeRetentionSpec struct {
Name string `json:"name,omitempty"`
Age string `json:"age,omitempty"`
Count int `json:"count,omitempty"`
}

type RetentionSpec struct {
Changes []ChangeRetentionSpec `json:"changes,omitempty"`
}

// ScraperSpec defines the desired state of Config scraper
type ScraperSpec struct {
LogLevel string `json:"logLevel,omitempty"`
Expand All @@ -35,6 +45,7 @@ type ScraperSpec struct {
Azure []Azure `json:"azure,omitempty" yaml:"azure,omitempty"`
SQL []SQL `json:"sql,omitempty" yaml:"sql,omitempty"`
Trivy []Trivy `json:"trivy,omitempty" yaml:"trivy,omitempty"`
Retention RetentionSpec `json:"retention,omitempty"`

// Full flag when set will try to extract out changes from the scraped config.
Full bool `json:"full,omitempty"`
Expand Down
36 changes: 36 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions chart/crds/configs.flanksource.com_scrapeconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,20 @@ spec:
type: array
logLevel:
type: string
retention:
properties:
changes:
items:
properties:
age:
type: string
count:
type: integer
name:
type: string
type: object
type: array
type: object
schedule:
type: string
sql:
Expand Down
2 changes: 1 addition & 1 deletion config/schemas/scrape_config.schema.json

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions scrapers/retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package scrapers

import (
"fmt"

"github.com/flanksource/commons/duration"
"github.com/flanksource/commons/logger"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty/context"
"github.com/google/uuid"
)

func ProcessChangeRetention(ctx context.Context, scraperID uuid.UUID, spec v1.ChangeRetentionSpec) error {
age, err := duration.ParseDuration(spec.Age)
if err != nil {
return fmt.Errorf("error parsing age %s as duration: %w", spec.Age, err)
}
ageMinutes := int(age.Minutes())

query := `
WITH latest_config_changes AS (
SELECT id, change_type, created_at, ROW_NUMBER() OVER(ORDER BY created_at DESC) AS seq
FROM config_changes
WHERE
change_type = ? AND
config_id IN (SELECT id FROM config_items WHERE scraper_id = ?) AND
((now()- created_at) < interval '1 minute' * ?)
)
DELETE FROM config_changes
WHERE id IN (
SELECT id from latest_config_changes WHERE seq > ?
)
`

//query = `
//UPDATE config_changes
//SET deleted_at = NOW()
//WHERE
//change_type = ? AND
//config_id IN (SELECT id FROM config_item WHERE scraper_id = ?) AND
//((NOW() - created_at > INTERVAL '1 minute' * ?)) AND
//deleted_at IS NULL
//`

result := ctx.DB().Exec(query, spec.Name, scraperID, ageMinutes, spec.Count)
if err := result.Error; err != nil {
return fmt.Errorf("error retaining config changes: %w", err)
}

if result.RowsAffected > 0 {
logger.Infof("Marked %d config_changes as deleted", result.RowsAffected)
}

return nil
}
16 changes: 12 additions & 4 deletions scrapers/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scrapers
import (
"fmt"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
Expand All @@ -20,14 +21,21 @@ func RunScraper(ctx api.ScrapeContext) (v1.ScrapeResults, error) {
return nil, fmt.Errorf("failed to update db: %w", dbErr)
}

// If error in any of the scrape results, don't delete old items
if len(results) > 0 && !v1.ScrapeResults(results).HasErr() {
persistedID := ctx.ScrapeConfig().GetPersistedID()
if persistedID != nil {
persistedID := ctx.ScrapeConfig().GetPersistedID()
if persistedID != nil {
// If error in any of the scrape results, don't delete old items
if len(results) > 0 && !v1.ScrapeResults(results).HasErr() {
if err := DeleteStaleConfigItems(ctx.DutyContext(), *persistedID); err != nil {
return nil, fmt.Errorf("error deleting stale config items: %w", err)
}
}

// Process change retention
for _, change := range ctx.ScrapeConfig().Spec.Retention.Changes {
if err := ProcessChangeRetention(ctx.DutyContext(), *persistedID, change); err != nil {
logger.Errorf("Error processing change retention rules: %v", err)
}
}
}

return results, nil
Expand Down
66 changes: 66 additions & 0 deletions scrapers/runscrapers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/db/models"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
dutymodels "github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -218,6 +222,68 @@ var _ = Describe("Scrapers test", Ordered, func() {

Expect(configItem, storedConfigItem)
})

It("should retain config changes as per the spec", func() {

dummyScraper := dutymodels.ConfigScraper{
Name: "Test",
Spec: `{"foo":"bar"}`,
Source: dutymodels.SourceConfigFile,
}
err := db.DefaultDB().Create(&dummyScraper).Error
Expect(err).To(BeNil())

ciID := uuid.New()
dummyCI := models.ConfigItem{
ID: ciID.String(),
ConfigClass: "Test",
ScraperID: &dummyScraper.ID,
}
err = db.DefaultDB().Create(&dummyCI).Error
Expect(err).To(BeNil())

configItemID := dummyCI.ID

twoDaysAgo := time.Now().Add(-2 * 24 * time.Hour)
fiveDaysAgo := time.Now().Add(-5 * 24 * time.Hour)
configChanges := []models.ConfigChange{
{ConfigID: configItemID, ChangeType: "TestDiff"},
{ConfigID: configItemID, ChangeType: "TestDiff"},
{ConfigID: configItemID, ChangeType: "TestDiff"},
{ConfigID: configItemID, ChangeType: "TestDiff"},
{ConfigID: configItemID, ChangeType: "TestDiff"},
{ConfigID: configItemID, ChangeType: "TestDiff"},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo},
{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo},
}
err = db.DefaultDB().Create(&configChanges).Error
Expect(err).To(BeNil())

var currentCount int
err = db.DefaultDB().
Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
Scan(&currentCount).
Error
Expect(err).To(BeNil())
Expect(currentCount, len(configChanges))

ctx := context.NewContext(gocontext.Background()).WithDB(db.DefaultDB(), db.Pool)
err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff", Age: "3d", Count: 10})
Expect(err).To(BeNil())

var newCount int
err = db.DefaultDB().
Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
Scan(&newCount).
Error
Expect(err).To(BeNil())
Expect(newCount, 8)
})
})
})

Expand Down

0 comments on commit a55fd75

Please sign in to comment.