From 2cb280a4a95dfffa9db36619a9c2f1edd65f44f8 Mon Sep 17 00:00:00 2001 From: MaheshRKumawat Date: Fri, 12 Apr 2024 20:19:13 +0530 Subject: [PATCH] handle edge case - when new topic and consumer group - make ticker configurable Co-authored-by: qu1queee Signed-off-by: encalada --- kafka-observer/cmd/observer/main.go | 20 ++++++++++++++++---- kafka-observer/cmd/observer/observer.go | 3 +-- kafka-observer/internal/cmd/config.go | 3 +++ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/kafka-observer/cmd/observer/main.go b/kafka-observer/cmd/observer/main.go index 8c24a723..f64c1971 100644 --- a/kafka-observer/cmd/observer/main.go +++ b/kafka-observer/cmd/observer/main.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "strconv" "syscall" "time" @@ -46,6 +47,17 @@ func main() { log.Panicf("%s is not set", cmd.CE_PROJECT_ID) } + tickerTime := cmd.DEFAULT_OBSERVER_TICKER + + // Handle idle timeout init + if observerTicker, exists := os.LookupEnv(cmd.OBSERVER_TICKER); exists { + tickerValue, err := strconv.Atoi(observerTicker) + if err != nil { + log.Panicf("error parsing %s duration: %v", cmd.OBSERVER_TICKER, err) + } + tickerTime = tickerValue + } + observer, err := NewObserver(config) if err != nil { log.Panic(err) @@ -87,7 +99,7 @@ func main() { for topicName, topic := range observer.Topics { go func(topic string) { - ticker := time.Tick(500 * time.Millisecond) // should be configurable + ticker := time.Tick(time.Duration(tickerTime) * time.Millisecond) // should be configurable for range ticker { offset, err := observer.GetTopicPartitionsOffset(topic) if err != nil { @@ -118,12 +130,12 @@ func main() { go func(name string, consumerGroups []ConsumerGroup) { for i, consumerGroup := range consumerGroups { go func(topicName string, cg ConsumerGroup, index int) { - ticker := time.Tick(500 * time.Millisecond) // should be configurable + ticker := time.Tick(time.Duration(tickerTime) * time.Millisecond) // should be configurable for range ticker { cgOffset, err := observer.GetConsumerGroupTopicPartitionsOffset(topicName, cg.Name) if err != nil { errCh <- err - break + continue } if observer.IsConsumerGroupTopicOffsetModified(topicName, cg.Name, cgOffset) || len(observer.Topics[topicName].ConsumerGroups[index].PartitionsOffset) == 0 { @@ -151,7 +163,7 @@ func main() { go func(t string, jobName string, podsCount int) { // Lock is required, as there well be cases where // multiple goroutines attempt to create jobruns for - // the same job. This avoids creating unneeded pods. + // the same job. This prevents the creation of unnecessary pods. unlock := jobInvoker.JobMutexes.Lock(jobName) defer unlock() if err := jobInvoker.InvokeJobs(int64(podsCount), jobName); err != nil { diff --git a/kafka-observer/cmd/observer/observer.go b/kafka-observer/cmd/observer/observer.go index 1587af4a..8a805b55 100644 --- a/kafka-observer/cmd/observer/observer.go +++ b/kafka-observer/cmd/observer/observer.go @@ -128,13 +128,12 @@ func (o *Observer) IsAnyConsumerGroupOffsetModified(topic string) JobRunInfo { for _, c := range topicObject.ConsumerGroups { count := 0 for partition, offset := range c.PartitionsOffset { - if topicObject.PartitionsOffset[partition] > offset { + if topicObject.PartitionsOffset[partition] > offset && topicObject.PartitionsOffset[partition]!=0 { log.Printf("topicOffset: %v/%v, consumerGroupOffset(%v): %v/%v", partition, topicObject.PartitionsOffset[partition], c.Name, partition, offset) count++ } jobRunInfo.JobRunToCreate[c.JobName] = count } - } return jobRunInfo diff --git a/kafka-observer/internal/cmd/config.go b/kafka-observer/internal/cmd/config.go index d7090db2..ac7c59e0 100644 --- a/kafka-observer/internal/cmd/config.go +++ b/kafka-observer/internal/cmd/config.go @@ -23,6 +23,9 @@ const ( IDLE_TIMEOUT = "IDLE_TIMEOUT" DEFAULT_IDLE_TIMEOUT = 60 + + OBSERVER_TICKER = "OBSERVER_TICKER" // in milliseconds + DEFAULT_OBSERVER_TICKER = 500 ) // KafkaAuth holds required data