Enhancements for the observer code
- Implement the custom KafkaTopic interface. This
allows us to operate on Topics and on Consumer Group
offsets. It also uses goroutines and channels to
detect changes by comparing offsets. We no longer
commit or read topic messages (see the sketch
after this list).
- Improve logging
- Add missing function comments
- Fixes for job_invoker
- Improve handling of env vars for both consumers
and observers
- Move base images from golang to alpine
- Introduce IDLE_TIMEOUT env var
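
A rough sketch of the goroutine-and-channel offset comparison described above (illustrative only; `watchOffsets`, `fetch`, and the channels are assumptions, not the actual KafkaTopic interface):

```go
package observer

import (
	"log"
	"time"
)

// watchOffsets polls the latest offset via fetch and signals on changed
// whenever it moves, without reading or committing any messages.
func watchOffsets(fetch func() (int64, error), changed chan<- int64, stop <-chan struct{}) {
	last := int64(-1)
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			current, err := fetch()
			if err != nil {
				log.Printf("fetching offsets failed: %v", err)
				continue
			}
			if current != last { // offsets moved: new messages arrived
				last = current
				changed <- current
			}
		case <-stop:
			return
		}
	}
}
```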

Signed-off-by: encalada <[email protected]>
qu1queee committed Apr 12, 2024
1 parent b03cc32 commit 46e04ef
Showing 11 changed files with 659 additions and 380 deletions.
2 changes: 1 addition & 1 deletion kafka-observer/Dockerfile.consumer
@@ -12,7 +12,7 @@ COPY go.sum .

RUN CGO_ENABLED=0 GOOS=linux go build -o consumer ./consumer

-FROM icr.io/codeengine/golang:latest
+FROM icr.io/codeengine/alpine

WORKDIR /app/src

2 changes: 1 addition & 1 deletion kafka-observer/Dockerfile.observer
@@ -12,7 +12,7 @@ COPY go.sum .

RUN CGO_ENABLED=0 GOOS=linux go build -o observer ./observer

-FROM icr.io/codeengine/golang:latest
+FROM icr.io/codeengine/alpine

WORKDIR /app/src

4 changes: 2 additions & 2 deletions kafka-observer/README.md
@@ -12,14 +12,14 @@ See the following diagram:

### Observer

-The **observer** is a Code Engine Job operating in Daemonset mode. At runtime, it dynamically creates a new consumer group based on provided Kafka Broker addresses and configurations (_including Kafka Topics_), persistently waiting for incoming messages to be claimed from a Kafka Broker.
+The **observer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). At runtime, it dynamically creates a new consumer group based on provided Kafka Broker addresses and configurations (_including Kafka Topics_), persistently waiting for incoming messages to be claimed from a Kafka Broker.

When a new message is claimed from a specific Kafka Topic, the **observer** wakes up the corresponding **consumer** Job by submitting a JobRun. The decision on which **consumer** Job to wake up depends on the Topic the **consumer** Job is using. This wake-up mechanism allows **consumer** Jobs to run only when needed, optimizing resource consumption in a serverless fashion, as sketched below.
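
A minimal sketch of that dispatch step, reusing the same Code Engine SDK calls as `job_invoker.go` further below (the `wakeUpConsumer` function and the `jobByTopic` map are illustrative assumptions, not the actual implementation):

```go
package observer

import (
	"fmt"

	"github.com/IBM/code-engine-go-sdk/codeenginev2"
)

// wakeUpConsumer submits a JobRun for the consumer Job that owns the topic.
// jobByTopic is an assumed topic-to-job mapping; the real observer derives
// it from its configuration.
func wakeUpConsumer(ce *codeenginev2.CodeEngineV2, projectID, topic string, jobByTopic map[string]string) error {
	jobName, ok := jobByTopic[topic]
	if !ok {
		return fmt.Errorf("no consumer job configured for topic %s", topic)
	}
	opts := ce.NewCreateJobRunOptions(projectID)
	opts.SetJobName(jobName)
	opts.SetScaleArraySpec("0") // one instance; the real code sizes this from pending work
	_, _, err := ce.CreateJobRun(opts)
	return err
}
```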


### Consumer

-The **consumer** is a Code Engine Job operating in Daemonset mode. Unlike the observer, the **consumer** runs only in response to incoming messages within the desired Kafka Topics. Once running, it will gracefully shutdown within one minute, if none further messages are claimed.
+The **consumer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). Unlike the observer, the **consumer** runs only in response to incoming messages within the desired Kafka Topics. Once running, it will gracefully shut down within one minute if no further messages are claimed.

In this sample, we provide a native Kafka client implementation written in Go. Code Engine users can opt in to other native clients using different runtimes, such as Java, when implementing their **consumer** logic.

44 changes: 32 additions & 12 deletions kafka-observer/cmd/consumer/main.go
@@ -7,6 +7,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
@@ -27,13 +28,18 @@ var idleTimer *time.Timer
// Consumer implements the ConsumerGroupHandler
// interface
type Consumer struct {
-	ready chan bool
+	ready   chan bool
+	timeout int
}

func main() {
fmt.Println("retrieving config")

-	config := cmd.GetConfig()
+	config := cmd.GetConfigConsumer([]string{
+		cmd.ENV_MESSAGEHUB_BROKERS,
+		cmd.ENV_MESSAGEHUB_USER,
+		cmd.ENV_MESSAGEHUB_PWD,
+	})

keepRunning := true
log.Println("Starting a new Sarama consumer")
@@ -64,29 +70,43 @@ func main()
saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}

consumer := Consumer{
-		ready: make(chan bool),
+		ready:   make(chan bool),
+		timeout: cmd.DEFAULT_IDLE_TIMEOUT,
}

+	// Handle idle timeout init
+	if idleTimeOut, exists := os.LookupEnv(cmd.IDLE_TIMEOUT); exists {
+		timeOut, err := strconv.Atoi(idleTimeOut)
+		if err != nil {
+			log.Panicf("error parsing %s duration: %v", cmd.IDLE_TIMEOUT, err)
+		}
+		consumer.timeout = timeOut
+	}
+
+	log.Printf("idle timeout of consumer set to %v seconds", consumer.timeout)

brokers := config.Kafka.Brokers
-	topics := strings.Split(os.Getenv("KAFKA_TOPIC"), ",")
+	topics := strings.Split(os.Getenv(cmd.KAFKA_TOPIC), ",")

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(brokers, "consuming-group", saramaConfig)

consumerGroup := os.Getenv(cmd.CONSUMER_GROUP)

client, err := sarama.NewConsumerGroup(brokers, consumerGroup, saramaConfig)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
log.Panicf("error creating consumer group client: %v", err)
}

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// TODO: fix topics, cannot use all
if err := client.Consume(ctx, topics, &consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
log.Panicf("Error from consumer: %v", err)
log.Panicf("error from consumer: %v", err)
}
if ctx.Err() != nil {
return
Expand All @@ -96,7 +116,7 @@ func main() {
}()

<-consumer.ready
log.Println("Sarama consumer up and running!...")
log.Println("sarama consumer up and running!...")

sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)
@@ -123,7 +143,7 @@ func main()
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
log.Panicf("error closing client: %v", err)
}
}

@@ -138,14 +158,14 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
-		idleTimer.Reset(time.Second * 60)
+		idleTimer.Reset(time.Second * time.Duration(consumer.timeout))
select {
case message, ok := <-claim.Messages():
if !ok {
log.Printf("message channel was closed")
return nil
}
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partition = %v, offset = %v", string(message.Value), message.Timestamp, message.Topic, message.Partition, message.Offset)
log.Printf("message claimed: key= %s, value = %s, timestamp = %v, latency = %s, topic = %s, partition = %v, offset = %v", string(message.Key), string(message.Value), message.Timestamp, time.Since(message.Timestamp), message.Topic, message.Partition, message.Offset)
session.MarkMessage(message, "")
case <-session.Context().Done():
log.Printf("completed")
187 changes: 187 additions & 0 deletions kafka-observer/cmd/observer/job_invoker.go
@@ -0,0 +1,187 @@
package main

import (
"errors"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"

"github.com/IBM/CodeEngine/kafka-observer/internal/cmd"
"github.com/IBM/code-engine-go-sdk/codeenginev2"
"github.com/IBM/go-sdk-core/v5/core"
)

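// KeyedMutex provides a lazily created mutex per key; its zero value is ready to use.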
type KeyedMutex struct {
mutexes sync.Map // Zero value is empty and ready for use
}

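// Lock locks the mutex for the given key and returns a function that unlocks it.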
func (m *KeyedMutex) Lock(key string) func() {
value, _ := m.mutexes.LoadOrStore(key, &sync.Mutex{})
mtx := value.(*sync.Mutex)
mtx.Lock()

return func() { mtx.Unlock() }
}

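// createJobInvoker builds a JobInvoker for the given project, backed by a Code Engine service client.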
func createJobInvoker(ceClient cmd.CEClient, projectID string, km *KeyedMutex) (*JobInvoker, error) {
ceService, err := GetCodeEngineService(ceClient)
if err != nil {
return nil, err
}

return &JobInvoker{
ProjectID: projectID,
CodeEngine: ceService,
JobMutexes: km,
}, nil
}

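// InvokeJobs creates a JobRun for jobName with enough array indices to reach the desired count of instances.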
func (ji *JobInvoker) InvokeJobs(count int64, jobName string) error {
	indices, err := ji.getArrayDesiredIndices(jobName, count)
	if err != nil {
		return err
	}
	if indices != "" {
		if err := ji.createJobRun(jobName, indices); err != nil {
			log.Printf("creating jobrun %s failed: %v", jobName, err)
			return err
		}
	}
	return nil
}

// GetCodeEngineService returns a Code Engine client
func GetCodeEngineService(ceConfig cmd.CEClient) (*codeenginev2.CodeEngineV2, error) {
authenticator := &core.IamAuthenticator{
ApiKey: ceConfig.IamApiKey,
ClientId: "bx",
ClientSecret: "bx",
URL: "https://iam.cloud.ibm.com",
}

baseUrl := os.Getenv("CE_API_BASE_URL")
if baseUrl == "" {
log.Println("fetching base url failed, falling back to a default one")
baseUrl = codeenginev2.DefaultServiceURL
} else {
baseUrl = baseUrl + "/v2"
}

codeEngineService, err := codeenginev2.NewCodeEngineV2(&codeenginev2.CodeEngineV2Options{
Authenticator: authenticator,
URL: baseUrl,
})
if err != nil {
log.Printf("error getting CE api client: %s\n", err.Error())
return nil, err
}
return codeEngineService, nil
}

// getArrayDesiredIndices returns the arrayspec for a jobrun based on JobRun array indices
func (ji *JobInvoker) getArrayDesiredIndices(jobName string, desired int64) (string, error) {
var alreadyCreated int64
listJobRunsOptions := &codeenginev2.ListJobRunsOptions{
ProjectID: core.StringPtr(ji.ProjectID),
JobName: core.StringPtr(jobName),
Limit: core.Int64Ptr(int64(100)),
}

pager, err := ji.CodeEngine.NewJobRunsPager(listJobRunsOptions)
if err != nil {
return "", err
}

var allResults []codeenginev2.JobRun
for pager.HasNext() {
nextPage, err := pager.GetNext()
if err != nil {
return "", err
}
allResults = append(allResults, nextPage...)
}

if len(allResults) == 0 {
log.Printf("new desired array indices value: %v", strconv.Itoa(int(desired)))
return fmt.Sprintf("0-%v", desired-1), nil
}

for _, jr := range allResults {
if jr.StatusDetails != nil {
if jr.StatusDetails.Requested != nil {
alreadyCreated += *jr.StatusDetails.Requested
}
if jr.StatusDetails.Running != nil {
alreadyCreated += *jr.StatusDetails.Running
}
if jr.StatusDetails.Pending != nil {
alreadyCreated += *jr.StatusDetails.Pending
}
}
}
newpods := desired - alreadyCreated
if newpods <= 0 {
log.Printf("already created JobRuns instances are sufficient, nothing to do.")
return "", nil
}
reqArraySpec := fmt.Sprintf("0-%v", newpods-1)
log.Printf("new desired array indices value: %v", reqArraySpec)
return reqArraySpec, nil
}

// createJobRun creates the jobrun for a job
func (ji *JobInvoker) createJobRun(jobName string, arraySpec string) error {
createJobRunOptions := ji.CodeEngine.NewCreateJobRunOptions(ji.ProjectID)
createJobRunOptions.SetJobName(jobName)
createJobRunOptions.SetScaleArraySpec(arraySpec)

log.Printf("creating jobrun for job %s with arrayspec %s", jobName, arraySpec)
result, _, err := ji.CodeEngine.CreateJobRun(createJobRunOptions)
if err != nil {
return err
}

log.Printf("jobrun %s created for job %s ", *result.Name, *result.JobName)

retryTimes := 0

for {
getJobRunOptions := ji.CodeEngine.NewGetJobRunOptions(ji.ProjectID, *result.Name)

// For now we ignore the error; a nil JobRun is handled below
jr, _, _ := ji.CodeEngine.GetJobRun(getJobRunOptions)
var podsActive int64
if jr != nil {
if jr.StatusDetails != nil {
if jr.StatusDetails.Requested != nil {
podsActive += *jr.StatusDetails.Requested
}
if jr.StatusDetails.Running != nil {
podsActive += *jr.StatusDetails.Running
}
if jr.StatusDetails.Pending != nil {
podsActive += *jr.StatusDetails.Pending
}
}
}
if podsActive > 0 {
log.Printf("jobrun %s has been scheduled", *result.Name)
break
}

if retryTimes > 4 {
log.Printf("couldn't get the jobrun %s in 5 retries", *result.Name)
return errors.New("couldn't get the jobrun " + *result.Name + " in 5 retries")
}

time.Sleep(2 * time.Second)
retryTimes++
}
return nil
}
