Kafka Observer Sample Pattern #80

Merged · merged 11 commits · Apr 22, 2024
21 changes: 21 additions & 0 deletions kafka-observer/Dockerfile.consumer
@@ -0,0 +1,21 @@
FROM icr.io/codeengine/golang:latest AS stage

WORKDIR /app/src

COPY cmd/consumer/ ./consumer/

COPY internal/ ./internal/

COPY go.mod .

COPY go.sum .

RUN CGO_ENABLED=0 GOOS=linux go build -o consumer ./consumer

FROM icr.io/codeengine/alpine

WORKDIR /app/src

COPY --from=stage /app/src/consumer/consumer .

CMD [ "./consumer" ]
21 changes: 21 additions & 0 deletions kafka-observer/Dockerfile.observer
@@ -0,0 +1,21 @@
FROM icr.io/codeengine/golang:latest AS stage

WORKDIR /app/src

COPY cmd/observer/ ./observer/

COPY internal/ ./internal/

COPY go.mod .

COPY go.sum .

RUN CGO_ENABLED=0 GOOS=linux go build -o observer ./observer

FROM icr.io/codeengine/alpine

WORKDIR /app/src

COPY --from=stage /app/src/observer/observer .

CMD [ "./observer" ]
133 changes: 133 additions & 0 deletions kafka-observer/README.md
@@ -0,0 +1,133 @@
# Kafka Observer

## Introduction

This sample demonstrates how you can use IBM Cloud Code Engine to consume streams of messages from an Apache Kafka Service, such as IBM Cloud Event Streams.

The architecture of this sample consists of an **observer** and a **consumer**, implementing a _wake-up_ mechanism that provides an efficient, serverless way to run your Kafka consumers.

See the following diagram:

![Architecture Diagram](images/Kafka-Observer-Consumer-Acrchitecture.jpg)

### Observer

The **observer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). At runtime, based on the provided Kafka broker addresses and configuration (_including Kafka Topics_), it continuously checks the latest offsets of each topic and compares them with the consumer group offsets for that topic.

The observer monitors every partition of a topic; when new messages arrive, it triggers as many **consumer** job run pods as there are partitions with new messages.

For example, consider a topic with 4 partitions. When new events arrive in partition 0 and partition 1, the observer detects this and triggers only 2 pods of the consumer JobRun, which then consume the messages.

Which **consumer** Job to wake up is determined by the Topic that the **consumer** Job reads from. This wake-up mechanism allows **consumer** Jobs to run only when needed, optimizing resource consumption in a serverless fashion.
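
As a rough illustration of this offset check (not the exact code in `cmd/observer/main.go`; the function and variable names below are hypothetical), the number of partitions with uncommitted messages could be computed with the sarama client like this:

```go
package observer

import (
	"github.com/IBM/sarama"
)

// pendingPartitions is a hypothetical helper: it counts how many partitions
// of `topic` hold messages that consumer group `group` has not yet committed.
// The observer would then start one consumer JobRun pod per pending partition.
func pendingPartitions(client sarama.Client, admin sarama.ClusterAdmin, topic, group string) (int, error) {
	partitions, err := client.Partitions(topic)
	if err != nil {
		return 0, err
	}

	// Committed offsets of the consumer group for all partitions of the topic.
	committed, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{topic: partitions})
	if err != nil {
		return 0, err
	}

	pending := 0
	for _, p := range partitions {
		// Offset that the next produced message on this partition would get.
		newest, err := client.GetOffset(topic, p, sarama.OffsetNewest)
		if err != nil {
			return 0, err
		}
		block := committed.GetBlock(topic, p)
		// Offset is -1 when the group has never committed for this partition.
		if block == nil || block.Offset < newest {
			pending++
		}
	}
	return pending, nil
}
```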


### Consumer

The **consumer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). Unlike the observer, the **consumer** runs only in response to incoming messages on the desired Kafka Topics. Once running, it gracefully shuts down after a configurable idle timeout if no further messages are claimed.

In this sample, we provide a native Kafka client implementation written in Go. Code Engine users can opt for native clients in other runtimes, such as Java, when implementing their **consumer** logic.
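
The idle-timeout behavior can be sketched as follows (a minimal, self-contained illustration of the pattern, not the sample's exact code; the real consumer, shown further down in this PR, resets its timer inside `ConsumeClaim`):

```go
package main

import (
	"log"
	"time"
)

func main() {
	// Stand-in for claim.Messages() from a Kafka consumer group session.
	messages := make(chan string)
	idleTimeout := 60 * time.Second // in the sample this is configurable via an env variable

	// Simulate a single incoming message.
	go func() { messages <- "payment-event" }()

	idleTimer := time.NewTimer(idleTimeout)
	defer idleTimer.Stop()

	for {
		select {
		case msg := <-messages:
			log.Printf("message claimed: %s", msg)
			idleTimer.Reset(idleTimeout) // every claimed message postpones the shutdown
		case <-idleTimer.C:
			log.Println("no messages within the idle timeout, shutting down")
			return
		}
	}
}
```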


## Requirements

To successfully run this sample, some resources must be created in advance and some mandatory input data must be provided.

### IBM Cloud Resources

We require a Kafka service for producing and consuming events, and a Code Engine project in which to deploy the observer pattern:

- An [IBM Cloud Event Streams](https://cloud.ibm.com/eventstreams-provisioning/6a7f4e38-f218-48ef-9dd2-df408747568e/create) instance with the following topics:
  - `payments` Topic with `4` partitions.
  - `shipping` Topic with `3` partitions.

- An [IBM Cloud Code Engine project](https://cloud.ibm.com/docs/codeengine?topic=codeengine-manage-project#create-a-project).

- An application that produces Kafka messages. You can easily create one within your Code Engine Project; see our [tutorial](https://cloud.ibm.com/docs/codeengine?topic=codeengine-subscribe-kafka-tutorial).

### Input Data

Before running the sample `run.sh` script, define the following environment variables locally for authentication and the creation of additional resources:

- `IAM_API_KEY`: Required for the **observer** to authenticate to the Code Engine Project. See the [docs](https://cloud.ibm.com/docs/account?topic=account-manapikey).
- `ES_SERVICE_INSTANCE`: Required by the `run.sh` script to identify the Event Streams instance to bind to the Code Engine **observer** and **consumer** Jobs. This allows the JobRuns to automatically obtain all authentication data for the Event Streams instance at runtime.

## Observer to Consumer Mapping

Prior to running the Code Engine sample, we must establish the relationship between the **observer**, the **consumers**, and Kafka. For this sample, this is already done for you, but here we explain the rationale.

The mapping is defined in the following [kafkadata](resources/kafkadata) file, which is later embedded into a ConfigMap and mounted into the **observer** Pod as an environment variable:

```yaml
payments:
  partitions: 4
  jobs:
    - name: payments-consumer
      consumer_group: payments-consumer-group
    - name: foobar-consumer
      consumer_group: foobar-consumer-group
shipping:
  partitions: 3
  jobs:
    - name: shipping-consumer
      consumer_group: shipping-consumer-group
```

The above is explained as follows:
- `.payments` and `.shipping` correspond to the existing Topics of interest within the same Kafka instance.
- `.payments.partitions` and `.shipping.partitions` correspond to the number of partitions of the Kafka Topics.
- `.payments.jobs` and `.shipping.jobs` correspond to the Code Engine **consumer** Jobs that consume messages from the related Kafka Topic.
- `.jobs` is a list; each entry specifies the name of the Job and the consumer group it belongs to.

As an example, if you have a single Topic `foobar` with `2` partitions and you want the Code Engine Job `foobar-consumer` to consume from it, the `kafkadata` file looks like this:

```yaml
foobar:
  partitions: 2
  jobs:
    - name: foobar-consumer
      consumer_group: foobar-consumer-group
```

You can also use the same consumer group for multiple topics, like this:

```yaml
payments:
  partitions: 2
  jobs:
    - name: foobar-consumer
      consumer_group: foobar-consumer-group
shipping:
  partitions: 3
  jobs:
    - name: foobar-consumer
      consumer_group: foobar-consumer-group
```


## Running the Sample

1. Log in to IBM Cloud:

```
ibmcloud login --apikey <IBMCLOUD_API_KEY> -r <REGION_OF_CE_PROJECT> -g <RESOURCE_GROUP>
```

2. Select the Code Engine Project.
```
ibmcloud ce project select --name <CE_PROJECT_NAME>
```

3. Execute the `run.sh` script:
```
./run.sh
```

Once you execute `run.sh`, the script creates the necessary resources in your Code Engine Project. For this sample, you will see three different Jobs: one for the **observer** and two for the **consumers**. In addition, the **observer** JobRun will be up and running.

Now you can send messages to the Kafka Topics using your producer. The **observer** watches for messages and submits the corresponding **consumer** JobRuns based on the configuration in the [kafkadata](resources/kafkadata) file.

4. Clean up the resources created in the Code Engine Project as part of this sample:

```
./run.sh clean
```
175 changes: 175 additions & 0 deletions kafka-observer/cmd/consumer/main.go
@@ -0,0 +1,175 @@
package main

import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/IBM/CodeEngine/kafka-observer/internal/cmd"
"github.com/IBM/sarama"
)

var (
version = ""
oldest = true
verbose = false
)

var idleTimer *time.Timer

// Consumer implements the ConsumerGroupHandler
// interface
type Consumer struct {
ready chan bool
timeout int
}

func main() {
fmt.Println("retrieving config")

config := cmd.GetConfigConsumer([]string{
cmd.ENV_MESSAGEHUB_BROKERS,
cmd.ENV_MESSAGEHUB_USER,
cmd.ENV_MESSAGEHUB_PWD,
})

keepRunning := true
log.Println("Starting a new Sarama consumer")

if verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}

version = sarama.DefaultVersion.String()
version, err := sarama.ParseKafkaVersion(version)
if err != nil {
log.Panicf("Error parsing Kafka version: %v", err)
}

saramaConfig := sarama.NewConfig()
saramaConfig.Version = version
saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
saramaConfig.ClientID, _ = os.Hostname()
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = config.KafkaAuth.User
saramaConfig.Net.SASL.Password = config.KafkaAuth.Token
saramaConfig.Net.TLS.Enable = true

if oldest {
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
}

saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}

consumer := Consumer{
ready: make(chan bool),
timeout: cmd.DEFAULT_IDLE_TIMEOUT,
}

// Handle idle timeout init
if idleTimeOut, exists := os.LookupEnv(cmd.IDLE_TIMEOUT); exists {
timeOut, err := strconv.Atoi(idleTimeOut)
if err != nil {
log.Panicf("error parsing %s duration: %v", cmd.IDLE_TIMEOUT, err)
}
consumer.timeout = timeOut
}

log.Printf("idle timeout of consumer set to %v seconds", consumer.timeout)

brokers := config.Kafka.Brokers
topics := strings.Split(os.Getenv(cmd.KAFKA_TOPIC), ",")

ctx, cancel := context.WithCancel(context.Background())

consumerGroup := os.Getenv(cmd.CONSUMER_GROUP)

client, err := sarama.NewConsumerGroup(brokers, consumerGroup, saramaConfig)
if err != nil {
log.Panicf("error creating consumer group client: %v", err)
}

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := client.Consume(ctx, topics, &consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
log.Panicf("error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()

<-consumer.ready
log.Println("sarama consumer up and running!...")

sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

idleTimer = time.NewTimer(time.Second * 100)
defer idleTimer.Stop()

for keepRunning {
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-idleTimer.C:
log.Println("we are done, timeout expired")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
}
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Panicf("error closing client: %v", err)
}
}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
close(consumer.ready)
return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
idleTimer.Reset(time.Second * time.Duration(consumer.timeout))
select {
case message, ok := <-claim.Messages():
if !ok {
log.Printf("message channel was closed")
return nil
}
log.Printf("message claimed: key= %s, value = %s, timestamp = %v, latency = %s, topic = %s, partition = %v, offset = %v", string(message.Key), string(message.Value), message.Timestamp, time.Since(message.Timestamp), message.Topic, message.Partition, message.Offset)
session.MarkMessage(message, "")
case <-session.Context().Done():
log.Printf("completed")
return nil
}
}
}