Skip to content

Commit

Permalink
Merge branch 'reconnect-watch'
Browse files Browse the repository at this point in the history
  • Loading branch information
Florianisme committed Dec 15, 2024
2 parents 4fcc7f7 + d5d3c7f commit 651b9c5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
31 changes: 20 additions & 11 deletions events/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watch2 "k8s.io/client-go/tools/watch"
"time"
)

type Watcher struct {
events *watch.Interface
watcher *watch2.RetryWatcher
persister *persistence.TimestampPersister
logger *logging.Logger
}
Expand All @@ -23,17 +25,17 @@ func Init() *Watcher {

logger := logging.Init()
persister := persistence.Init(kubeClient, logger)
events := startEventWatch(kubeClient)
retryWatcher := startEventWatch(kubeClient)

return &Watcher{
events: events,
watcher: retryWatcher,
persister: persister,
logger: logger,
}
}

func (s *Watcher) StartWatching() {
for watchedEvent := range (*s.events).ResultChan() {
for watchedEvent := range (*s.watcher).ResultChan() {
event, ok := watchedEvent.Object.(*v1.Event)

if !ok {
Expand All @@ -58,13 +60,14 @@ func (s *Watcher) StartWatching() {

s.persister.UpdateCurrentTimestamp(timestamp.Time)
}
s.logger.Logger.Debug().Msg("end of channel reached, no more events will be processed")
s.logger.Logger.Debug().Msg("end of channel reached, no more watcher will be processed")
}

func eventAlreadyProcessed(timestamp metav1.Time, s *Watcher) bool {
// We allow up to 3 seconds of buffer here. In case loads of events are being created at once, we might miss them otherwise.
// We allow up to 3 seconds of buffer here. In case loads of watcher are being created at once, we might miss them otherwise.
// The chance of processing an event twice after restart is relatively low compared to missing one otherwise.
return s.persister.GetCurrentTimestamp().Sub(timestamp.Time) > (3 * time.Second)
eventAge := s.persister.GetCurrentTimestamp().Sub(timestamp.Time)
return eventAge > (3 * time.Second)
}

func mapLoggableEvent(event *v1.Event) *logging.LoggableEvent {
Expand Down Expand Up @@ -109,15 +112,21 @@ func getComparableTimestamp(event *v1.Event) metav1.Time {
return event.CreationTimestamp
}

func startEventWatch(client *kubernetes.Clientset) *watch.Interface {
events, err := client.CoreV1().Events("").Watch(context.TODO(), metav1.ListOptions{})
func startEventWatch(client *kubernetes.Clientset) *watch2.RetryWatcher {
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
// Return watcher with 30m timeout
timeoutSeconds := int64(30 * 60 * 60)
return client.CoreV1().Events("").Watch(context.TODO(), metav1.ListOptions{TimeoutSeconds: &timeoutSeconds})
}

watcher, err := watch2.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc})
if err != nil {
panic(err)
}
return &events
return watcher
}

func (s *Watcher) StopWatching() {
(*s.events).Stop()
(*s.watcher).Stop()
s.persister.Flush()
}
1 change: 1 addition & 0 deletions persistence/timestamp_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (s *TimestampPersister) runScheduledUpdates() {
case <-s.done:
return
case <-s.ticker.C:
s.UpdateCurrentTimestamp(time.Now())
s.updateConfigMap()
}
}
Expand Down

0 comments on commit 651b9c5

Please sign in to comment.