Skip to content

Commit

Permalink
Merge pull request #28 from utilitywarehouse/limit-local-watcher-ns
Browse files Browse the repository at this point in the history
Limit local watchers to semaphore namespace
  • Loading branch information
ffilippopoulos authored Aug 2, 2021
2 parents 0a64554 + 20a0f83 commit 0b0ce79
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 6 deletions.
2 changes: 2 additions & 0 deletions deploy/example/local/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ rules:
- endpoints
verbs:
- get
- list
- watch
- create
- update
- delete
Expand Down
8 changes: 5 additions & 3 deletions kube/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ type EndpointsWatcher struct {
eventHandler EndpointsEventHandler
labelSelector string
name string
namespace string
ListHealthy bool
WatchHealthy bool
}

func NewEndpointsWatcher(name string, client kubernetes.Interface, resyncPeriod time.Duration, handler EndpointsEventHandler, labelSelector string) *EndpointsWatcher {
func NewEndpointsWatcher(name string, client kubernetes.Interface, resyncPeriod time.Duration, handler EndpointsEventHandler, labelSelector, namespace string) *EndpointsWatcher {
return &EndpointsWatcher{
ctx: context.Background(),
client: client,
Expand All @@ -42,14 +43,15 @@ func NewEndpointsWatcher(name string, client kubernetes.Interface, resyncPeriod
eventHandler: handler,
labelSelector: labelSelector,
name: name,
namespace: namespace,
}
}

func (ew *EndpointsWatcher) Init() {
listWatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = ew.labelSelector
l, err := ew.client.CoreV1().Endpoints(metav1.NamespaceAll).List(ew.ctx, options)
l, err := ew.client.CoreV1().Endpoints(ew.namespace).List(ew.ctx, options)
if err != nil {
log.Logger.Error("endpoints list error", "watcher", ew.name, "err", err)
ew.ListHealthy = false
Expand All @@ -60,7 +62,7 @@ func (ew *EndpointsWatcher) Init() {
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = ew.labelSelector
w, err := ew.client.CoreV1().Endpoints(metav1.NamespaceAll).Watch(ew.ctx, options)
w, err := ew.client.CoreV1().Endpoints(ew.namespace).Watch(ew.ctx, options)
if err != nil {
log.Logger.Error("endpoints watch error", "watcher", ew.name, "err", err)
ew.WatchHealthy = false
Expand Down
8 changes: 5 additions & 3 deletions kube/service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ type ServiceWatcher struct {
eventHandler ServiceEventHandler
labelSelector string
name string
namespace string
ListHealthy bool
WatchHealthy bool
}

func NewServiceWatcher(name string, client kubernetes.Interface, resyncPeriod time.Duration, handler ServiceEventHandler, labelSelector string) *ServiceWatcher {
func NewServiceWatcher(name string, client kubernetes.Interface, resyncPeriod time.Duration, handler ServiceEventHandler, labelSelector, namespace string) *ServiceWatcher {
return &ServiceWatcher{
ctx: context.Background(),
client: client,
Expand All @@ -42,14 +43,15 @@ func NewServiceWatcher(name string, client kubernetes.Interface, resyncPeriod ti
eventHandler: handler,
labelSelector: labelSelector,
name: name,
namespace: namespace,
}
}

func (sw *ServiceWatcher) Init() {
listWatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = sw.labelSelector
l, err := sw.client.CoreV1().Services(metav1.NamespaceAll).List(sw.ctx, options)
l, err := sw.client.CoreV1().Services(sw.namespace).List(sw.ctx, options)
if err != nil {
log.Logger.Error("service list error", "watcher", sw.name, "err", err)
sw.ListHealthy = false
Expand All @@ -60,7 +62,7 @@ func (sw *ServiceWatcher) Init() {
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = sw.labelSelector
w, err := sw.client.CoreV1().Services(metav1.NamespaceAll).Watch(sw.ctx, options)
w, err := sw.client.CoreV1().Services(sw.namespace).Watch(sw.ctx, options)
if err != nil {
log.Logger.Error("service watch error", "watcher", sw.name, "err", err)
sw.WatchHealthy = false
Expand Down
4 changes: 4 additions & 0 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewRunner(client, watchClient kubernetes.Interface, namespace, prefix, labe
resyncPeriod,
runner.ServiceEventHandler,
labelselector,
metav1.NamespaceAll,
)
runner.serviceWatcher = serviceWatcher
runner.serviceWatcher.Init()
Expand All @@ -77,6 +78,7 @@ func NewRunner(client, watchClient kubernetes.Interface, namespace, prefix, labe
resyncPeriod,
nil,
labels.Set(MirrorLabels).String(),
namespace,
)
runner.mirrorServiceWatcher = mirrorServiceWatcher
runner.mirrorServiceWatcher.Init()
Expand All @@ -88,6 +90,7 @@ func NewRunner(client, watchClient kubernetes.Interface, namespace, prefix, labe
resyncPeriod,
runner.EndpointsEventHandler,
labelselector,
metav1.NamespaceAll,
)
runner.endpointsWatcher = endpointsWatcher
runner.endpointsWatcher.Init()
Expand All @@ -99,6 +102,7 @@ func NewRunner(client, watchClient kubernetes.Interface, namespace, prefix, labe
resyncPeriod,
nil,
labels.Set(MirrorLabels).String(),
namespace,
)
runner.mirrorEndpointsWatcher = mirrorEndpointsWatcher
runner.mirrorEndpointsWatcher.Init()
Expand Down

0 comments on commit 0b0ce79

Please sign in to comment.