Skip to content

Commit

Permalink
handle edge case
Browse files Browse the repository at this point in the history
- when new topic and consumer group
- make ticker configurable

Co-authored-by: qu1queee <[email protected]>
Signed-off-by: encalada <[email protected]>
  • Loading branch information
2 people authored and reggeenr committed Apr 22, 2024
1 parent 2b7a514 commit 2cb280a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
20 changes: 16 additions & 4 deletions kafka-observer/cmd/observer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"

Expand Down Expand Up @@ -46,6 +47,17 @@ func main() {
log.Panicf("%s is not set", cmd.CE_PROJECT_ID)
}

tickerTime := cmd.DEFAULT_OBSERVER_TICKER

// Handle idle timeout init
if observerTicker, exists := os.LookupEnv(cmd.OBSERVER_TICKER); exists {
tickerValue, err := strconv.Atoi(observerTicker)
if err != nil {
log.Panicf("error parsing %s duration: %v", cmd.OBSERVER_TICKER, err)
}
tickerTime = tickerValue
}

observer, err := NewObserver(config)
if err != nil {
log.Panic(err)
Expand Down Expand Up @@ -87,7 +99,7 @@ func main() {

for topicName, topic := range observer.Topics {
go func(topic string) {
ticker := time.Tick(500 * time.Millisecond) // should be configurable
ticker := time.Tick(time.Duration(tickerTime) * time.Millisecond) // should be configurable
for range ticker {
offset, err := observer.GetTopicPartitionsOffset(topic)
if err != nil {
Expand Down Expand Up @@ -118,12 +130,12 @@ func main() {
go func(name string, consumerGroups []ConsumerGroup) {
for i, consumerGroup := range consumerGroups {
go func(topicName string, cg ConsumerGroup, index int) {
ticker := time.Tick(500 * time.Millisecond) // should be configurable
ticker := time.Tick(time.Duration(tickerTime) * time.Millisecond) // should be configurable
for range ticker {
cgOffset, err := observer.GetConsumerGroupTopicPartitionsOffset(topicName, cg.Name)
if err != nil {
errCh <- err
break
continue
}

if observer.IsConsumerGroupTopicOffsetModified(topicName, cg.Name, cgOffset) || len(observer.Topics[topicName].ConsumerGroups[index].PartitionsOffset) == 0 {
Expand Down Expand Up @@ -151,7 +163,7 @@ func main() {
go func(t string, jobName string, podsCount int) {
// Lock is required, as there well be cases where
// multiple goroutines attempt to create jobruns for
// the same job. This avoids creating unneeded pods.
// the same job. This prevents the creation of unnecessary pods.
unlock := jobInvoker.JobMutexes.Lock(jobName)
defer unlock()
if err := jobInvoker.InvokeJobs(int64(podsCount), jobName); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions kafka-observer/cmd/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,12 @@ func (o *Observer) IsAnyConsumerGroupOffsetModified(topic string) JobRunInfo {
for _, c := range topicObject.ConsumerGroups {
count := 0
for partition, offset := range c.PartitionsOffset {
if topicObject.PartitionsOffset[partition] > offset {
if topicObject.PartitionsOffset[partition] > offset && topicObject.PartitionsOffset[partition]!=0 {
log.Printf("topicOffset: %v/%v, consumerGroupOffset(%v): %v/%v", partition, topicObject.PartitionsOffset[partition], c.Name, partition, offset)
count++
}
jobRunInfo.JobRunToCreate[c.JobName] = count
}

}

return jobRunInfo
Expand Down
3 changes: 3 additions & 0 deletions kafka-observer/internal/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const (

IDLE_TIMEOUT = "IDLE_TIMEOUT"
DEFAULT_IDLE_TIMEOUT = 60

OBSERVER_TICKER = "OBSERVER_TICKER" // in milliseconds
DEFAULT_OBSERVER_TICKER = 500
)

// KafkaAuth holds required data
Expand Down

0 comments on commit 2cb280a

Please sign in to comment.