Skip to content

Commit

Permalink
config: add ignored fields
Browse files Browse the repository at this point in the history
Specify fields to skip sending object update. Will be applied to all objects.
If after removal of these fields from k8s object all remaining fields will be equal,
handler won't trigger sending update. Removing array elements is not supported.
For example,
```yaml
ignorefields:
  status:
  metadata:
    resourceVersion:
    managedFields:
```
will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields"
from k8s object before comparing old & new k8s objects.
  • Loading branch information
ingodwerust committed Oct 4, 2024
1 parent a97d5dd commit 0c03e0c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 27 deletions.
13 changes: 13 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ type Config struct {
// For watching specific namespace, leave it empty for watching all.
// this config is ignored when watching namespaces
Namespace string `json:"namespace,omitempty"`

// Specify fields to skip sending object update. Will be applied to all objects.
// If after removal of these fields from k8s object all remaining fields will be equal,
// handler won't trigger sending update. Removing array elements is not supported.
// For example,
// ignorefields:
// status:
// metadata:
// resourceVersion:
// managedFields:
// will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields"
// from k8s object before comparing old & new k8s objects.
IgnoredFields map[string]interface{} `json:"ignoredfields,omitempty"`
}

// Slack contains slack configuration
Expand Down
86 changes: 59 additions & 27 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics)
allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopAllCoreEventsCh := make(chan struct{})
defer close(stopAllCoreEventsCh)

Expand All @@ -155,7 +155,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics)
allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopAllEventsCh := make(chan struct{})
defer close(stopAllEventsCh)

Expand All @@ -177,7 +177,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -199,7 +199,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -222,7 +222,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -244,7 +244,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -266,7 +266,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -288,7 +288,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -310,7 +310,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -332,7 +332,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -354,7 +354,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -376,7 +376,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -398,7 +398,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -420,7 +420,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -442,7 +442,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -464,7 +464,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -486,7 +486,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -508,7 +508,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -530,7 +530,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -552,7 +552,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -583,7 +583,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -596,7 +596,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
<-sigterm
}

func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec) *Controller {
func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec, ignoredFields map[string]interface{}) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var newEvent Event
var err error
Expand All @@ -620,20 +620,34 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
kubewatchEventsMetrics.WithLabelValues(resourceType, "create").Inc()
},
UpdateFunc: func(old, new interface{}) {
var ok bool
var okOld, okNew bool
newEvent.namespace = "" // namespace retrived in processItem incase namespace value is empty
newEvent.key, err = cache.MetaNamespaceKeyFunc(old)
newEvent.eventType = "update"
newEvent.resourceType = resourceType
newEvent.apiVersion = apiVersion
newEvent.obj, ok = new.(runtime.Object)
if !ok {
newEvent.obj, okNew = new.(runtime.Object)
if !okNew {
logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot convert to runtime.Object for update on %v", new)
}
newEvent.oldObj, ok = old.(runtime.Object)
if !ok {
newEvent.oldObj, okOld = old.(runtime.Object)
if !okOld {
logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot convert old to runtime.Object for update on %v", old)
}
if okOld && okNew && len(ignoredFields) > 0 {
oldUnstructured, okOld := newEvent.oldObj.DeepCopyObject().(runtime.Unstructured)
newUnstructured, okNew := newEvent.obj.DeepCopyObject().(runtime.Unstructured)
if okOld && okNew {
oldContent := oldUnstructured.UnstructuredContent()
newContent := newUnstructured.UnstructuredContent()
recursiveDelete(oldContent, ignoredFields)
recursiveDelete(newContent, ignoredFields)
if reflect.DeepEqual(oldContent, newContent) {
logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Ignoring update to %v: %s", resourceType, newEvent.key)
return
}
}
}
logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Processing update to %v: %s", resourceType, newEvent.key)
if err == nil {
queue.Add(newEvent)
Expand Down Expand Up @@ -670,6 +684,24 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
}
}

// recursiveDelete recursively removes key from object
// value of key should be either nil or nested map[string]interface{}
// value of object to delete from should be nested map[string]interface{}
func recursiveDelete(object map[string]interface{}, key map[string]interface{}) {
for k, v := range key {
if v == nil {
delete(object, k)
continue
}
if recursiveKey, ok := v.(map[string]interface{}); ok {
if recursiveObj, ok := object[k].(map[string]interface{}); ok {
recursiveDelete(recursiveObj, recursiveKey)
}
}
}
return
}

// Run starts the kubewatch controller
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
Expand Down

0 comments on commit 0c03e0c

Please sign in to comment.