Skip to content

Commit

Permalink
Merge pull request #618 from sotoon/supported-namespaces
Browse files Browse the repository at this point in the history
Supported namespaces
  • Loading branch information
ese authored Aug 16, 2023
2 parents e2ba7ff + a3d23de commit f32faf0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 18 deletions.
31 changes: 20 additions & 11 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package utils

import (
"flag"
"fmt"
"path/filepath"
"regexp"

"github.com/spotahome/redis-operator/operator/redisfailover"
"k8s.io/client-go/util/homedir"
Expand All @@ -11,21 +13,23 @@ import (
// CMDFlags are the flags used by the cmd
// TODO: improve flags.
type CMDFlags struct {
KubeConfig string
Development bool
ListenAddr string
MetricsPath string
K8sQueriesPerSecond int
K8sQueriesBurstable int
Concurrency int
LogLevel string
KubeConfig string
SupportedNamespacesRegex string
Development bool
ListenAddr string
MetricsPath string
K8sQueriesPerSecond int
K8sQueriesBurstable int
Concurrency int
LogLevel string
}

// Init initializes and parse the flags
func (c *CMDFlags) Init() {
kubehome := filepath.Join(homedir.HomeDir(), ".kube", "config")
// register flags
flag.StringVar(&c.KubeConfig, "kubeconfig", kubehome, "kubernetes configuration path, only used when development mode enabled")
flag.StringVar(&c.SupportedNamespacesRegex, "supported-namespaces-regex", ".*", "To limit the namespaces this operator looks into")
flag.BoolVar(&c.Development, "development", false, "development flag will allow to run the operator outside a kubernetes cluster")
flag.StringVar(&c.ListenAddr, "listen-address", ":9710", "Address to listen on for metrics.")
flag.StringVar(&c.MetricsPath, "metrics-path", "/metrics", "Path to serve the metrics.")
Expand All @@ -37,13 +41,18 @@ func (c *CMDFlags) Init() {
flag.StringVar(&c.LogLevel, "log-level", "info", "set log level")
// Parse flags
flag.Parse()

if _, err := regexp.Compile(c.SupportedNamespacesRegex); err != nil {
panic(fmt.Errorf("supported namespaces Regex is not valid: %w", err))
}
}

// ToRedisOperatorConfig convert the flags to redisfailover config
func (c *CMDFlags) ToRedisOperatorConfig() redisfailover.Config {
return redisfailover.Config{
ListenAddress: c.ListenAddr,
MetricsPath: c.MetricsPath,
Concurrency: c.Concurrency,
ListenAddress: c.ListenAddr,
MetricsPath: c.MetricsPath,
Concurrency: c.Concurrency,
SupportedNamespacesRegex: c.SupportedNamespacesRegex,
}
}
7 changes: 4 additions & 3 deletions operator/redisfailover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package redisfailover

// Config is the configuration for the redis operator.
type Config struct {
ListenAddress string
MetricsPath string
Concurrency int
ListenAddress string
MetricsPath string
Concurrency int
SupportedNamespacesRegex string
}
37 changes: 33 additions & 4 deletions operator/redisfailover/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redisfailover

import (
"context"
"regexp"
"time"

"github.com/spotahome/kooper/v2/controller"
Expand All @@ -13,6 +14,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

redisfailoverv1 "github.com/spotahome/redis-operator/api/redisfailover/v1"
"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
rfservice "github.com/spotahome/redis-operator/operator/redisfailover/service"
Expand All @@ -36,7 +38,7 @@ func New(cfg Config, k8sService k8s.Services, k8sClient kubernetes.Interface, lo

// Create the handlers.
rfHandler := NewRedisFailoverHandler(cfg, rfService, rfChecker, rfHealer, k8sService, kooperMetricsRecorder, logger)
rfRetriever := NewRedisFailoverRetriever(k8sService)
rfRetriever := NewRedisFailoverRetriever(cfg, k8sService)

kooperLogger := kooperlogger{Logger: logger.WithField("operator", "redisfailover")}
// Leader election service.
Expand All @@ -58,13 +60,40 @@ func New(cfg Config, k8sService k8s.Services, k8sClient kubernetes.Interface, lo
})
}

func NewRedisFailoverRetriever(cli k8s.Services) controller.Retriever {
func NewRedisFailoverRetriever(cfg Config, cli k8s.Services) controller.Retriever {
isNamespaceSupported := func(rf redisfailoverv1.RedisFailover) bool {
match, _ := regexp.Match(cfg.SupportedNamespacesRegex, []byte(rf.Namespace))
return match
}
// check in the startup whether the regex compiles

return controller.MustRetrieverFromListerWatcher(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cli.ListRedisFailovers(context.Background(), "", options)
rfList, err := cli.ListRedisFailovers(context.Background(), "", options)
if err != nil {
return rfList, err
}

targetRFList := make([]redisfailoverv1.RedisFailover, 0)
for _, rf := range rfList.Items {
if isNamespaceSupported(rf) {
targetRFList = append(targetRFList, rf)
}
}
rfList.Items = targetRFList

return rfList, err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cli.WatchRedisFailovers(context.Background(), "", options)
watcher, err := cli.WatchRedisFailovers(context.Background(), "", options)
watcher = watch.Filter(watcher, func(event watch.Event) (watch.Event, bool) {
rf, ok := event.Object.(*redisfailoverv1.RedisFailover)
if !ok {
return event, false
}
return event, isNamespaceSupported(*rf)
})
return watcher, err
},
})
}
Expand Down

0 comments on commit f32faf0

Please sign in to comment.