Skip to content

Commit

Permalink
refactor: save changes immediately and then consume the collected in
Browse files Browse the repository at this point in the history
interval
  • Loading branch information
adityathebe committed Sep 6, 2023
1 parent 7fd6d12 commit f1f76f1
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 45 deletions.
34 changes: 32 additions & 2 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/url"
"strings"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/flanksource/config-db/jobs"
"github.com/flanksource/config-db/query"
"github.com/flanksource/config-db/scrapers/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flanksource/config-db/scrapers"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -143,6 +145,34 @@ func exitOnError(err error, description string) {
}
}

func kubernetesChangeEventConsumer(ctx *v1.ScrapeContext, changes []*v1.ChangeResult) {
return
func kubernetesChangeEventConsumer(ctx *v1.ScrapeContext, changes []*kubernetes.InvolvedObject) {
resourcesPerKind := make(map[string]map[string]*kubernetes.InvolvedObject)
for _, c := range changes {
if resourcesPerKind[c.Kind] == nil {
resourcesPerKind[c.Kind] = make(map[string]*kubernetes.InvolvedObject)
}

resourcesPerKind[c.Kind][c.UID] = c
}

for kind, resources := range resourcesPerKind {
if kind != "Pod" {
continue
}

for _, resource := range resources {
if strings.Contains(resource.Name, "junit") { // Temporary
continue
}

logger.Infof("Getting pod (name=%s, namespace=%s)", resource.Name, resource.Namespace)
pod, err := ctx.Kubernetes.CoreV1().Pods(resource.Namespace).Get(ctx, resource.Name, metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get pod (name=%s): %v", resource.Name, err)
continue
}

logger.Infof("Got pod %s", pod.Name)
}
}
}
97 changes: 54 additions & 43 deletions scrapers/kubernetes/events_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,43 @@ package kubernetes
import (
"context"
"fmt"
"sync"
"time"

"github.com/flanksource/commons/logger"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
// eventWatchTimeout is the timeout for watching events
eventWatchTimeout = time.Second * 10
// eventWatchInterval is the schedule on which new K8s resources are scraped
// from the events
eventWatchInterval = time.Second * 10
)

// maxBufferSize is the maximum number of events that can be buffered before consuming.
maxBufferSize = 50
type consumerFunc func(ctx *v1.ScrapeContext, involvedObjects []*InvolvedObject)

changesBuffer []*v1.ChangeResult
)
type eventWatcher struct {
involvedObjects []*InvolvedObject

lock *sync.Mutex
}

func WatchEvents(ctx *v1.ScrapeContext, config v1.Kubernetes, consume consumerFunc) error {
watcher := &eventWatcher{
lock: &sync.Mutex{},
}

go watcher.consumeChangeEvents(ctx, consume)

type consumerFunc func(ctx *v1.ScrapeContext, changesBuffer []*v1.ChangeResult)
return watcher.Watch(ctx, config)
}

// WatchEvents watches Kubernetes events for any config changes & fetches
// the referenced config items in batches.
func WatchEvents(ctx *v1.ScrapeContext, config v1.Kubernetes, consume consumerFunc) error {
func (t *eventWatcher) Watch(ctx *v1.ScrapeContext, config v1.Kubernetes) error {
logger.Infof("Watching kubernetes events: %v", config)

watcher, err := ctx.Kubernetes.CoreV1().Events(config.Namespace).Watch(context.TODO(), metav1.ListOptions{})
Expand All @@ -34,51 +48,48 @@ func WatchEvents(ctx *v1.ScrapeContext, config v1.Kubernetes, consume consumerFu
}
defer watcher.Stop()

for {
select {
case <-ctx.Done():
consumeChangeEvents(ctx, consume)
return nil

case <-time.After(eventWatchTimeout):
consumeChangeEvents(ctx, consume)

case watchEvent := <-watcher.ResultChan():
var event Event
if err := event.FromObjMap(watchEvent.Object); err != nil {
logger.Errorf("failed to unmarshal event (id=%s): %v", event.GetUID(), err)
continue
}

logger.Infof("New Event: reason=%s source=%s", event.Reason, event.Source)
for watchEvent := range watcher.ResultChan() {
var event Event
if err := event.FromObjMap(watchEvent.Object); err != nil {
logger.Errorf("failed to unmarshal event (id=%s): %v", event.GetUID(), err)
continue
}

if utils.MatchItems(event.Reason, config.Event.Exclusions...) {
continue
}
if event.InvolvedObject == nil {
continue
}

if !utils.MatchItems(event.Reason, config.Event.Exclusions...) {
change := getChangeFromEvent(event, config.Event.SeverityKeywords)
if change == nil {
logger.Debugf("No change detected")
continue
}

changesBuffer = append(changesBuffer, change)
if len(changesBuffer) >= maxBufferSize {
consumeChangeEvents(ctx, consume)
if change != nil {
if err := db.SaveResults(ctx, []v1.ScrapeResult{{Changes: []v1.ChangeResult{*change}}}); err != nil {
logger.Errorf("error saving config change (event=%s): %v", event.Reason, err)
}
}
}

t.lock.Lock()
t.involvedObjects = append(t.involvedObjects, event.InvolvedObject)
t.lock.Unlock()
}

return nil
}

// consumeChangeEvents fetches the configs referenced by the changes and saves them.
// It clears the buffer after.
func consumeChangeEvents(ctx *v1.ScrapeContext, consume consumerFunc) {
logger.Infof("Consuming buffer. Len: %d", len(changesBuffer))
if len(changesBuffer) == 0 {
return
}
func (t *eventWatcher) consumeChangeEvents(ctx *v1.ScrapeContext, consume consumerFunc) {
for {
time.Sleep(eventWatchInterval)

consume(ctx, changesBuffer)
logger.Infof("Consuming buffer. Len: %d", len(t.involvedObjects))
if len(t.involvedObjects) == 0 {
return
}

changesBuffer = nil
t.lock.Lock()
consume(ctx, t.involvedObjects)
t.involvedObjects = nil
t.lock.Unlock()
}
}
1 change: 1 addition & 0 deletions scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func extractDeployNameFromReplicaSet(rs string) string {
func getResourceIDsFromObjs(objs []*unstructured.Unstructured) map[string]map[string]map[string]string {
// {Namespace: {Kind: {Name: ID}}}
resourceIDMap := make(map[string]map[string]map[string]string)
resourceIDMap[""] = make(map[string]map[string]string)

for _, obj := range objs {
if collections.Contains([]string{"Namespace", "Deployment", "Node"}, obj.GetKind()) {
Expand Down
2 changes: 2 additions & 0 deletions scrapers/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scrapers
import (
"fmt"

"github.com/flanksource/commons/logger"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
)
Expand All @@ -26,5 +27,6 @@ func RunScraper(ctx *v1.ScrapeContext) (v1.ScrapeResults, error) {
}
}

logger.Infof("Saved scrape results: %s", ctx.ScrapeConfig.Name)
return results, nil
}

0 comments on commit f1f76f1

Please sign in to comment.