Skip to content

Commit

Permalink
adds an option to modify the kafka consumer parition assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang.balkundi committed Jan 28, 2022
1 parent d52019f commit 67fdee8
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 48 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.5.1] 2022-01-28

# Added
- consumer config now takes in a new option called `PartititionAssignment` which is used to configure how kafka partitions are assigned to a consumer

## [1.5.0] 2022-01-07

# Changes
- Kafka consumer default `PollTimeout` is `100 ms`
- `ConsumerGroupID` is renamed to `GroupID` and `OriginTopics` is renamed to `Topics` to keep it consistent with kafka terminology
- The run loop for kafka consumer polling has been restructured.

# Removed
- The server package only exposes a Run method which orchestrates an HTTP Server rather than providing a full web server implementation
- `httprouter` dependency has been removed

## [1.4.5] 2021-12-16

# Changes
Expand Down
23 changes: 17 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func main() {
StreamConfig: kafka.StreamConfig{
{
BootstrapServers: "localhost:9092",
OriginTopics: "plain-text-log",
ConsumerGroupID: "pt_consumer",
Topics: "plain-text-log",
GroupID: "pt_consumer",
ConsumerCount: 2,
RouteGroup: "plain-text-group",
},
Expand Down Expand Up @@ -124,7 +124,8 @@ Stream A ----> Handler --Retry--> RabbitMQ <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;|_____
Stream B _____|

- The rabbitmq auto retry implements the streamer interface.
- The rabbitmq auto retry implements the streamer interface. This means ziggurat will push the messages from RabbitMQ to
and execute you handlers for every mesasge.
- The rabbitmq auto retry exposes a Wrap method in which the handlerFunc can be wrapped and provide the queue name to
retry with.

Expand Down Expand Up @@ -169,10 +170,20 @@ return ziggurat.Retry
Once the messages are consumed they are processed by the handler.
- queue_name_dlq_queue : messages move here when the retry count is exhausted, `RetryCount` config.
- You can have as many consumers as you wish, this value can be tweaked based on you throughput and your machine's
capacity. This can be tweaked using the `ConsumerCount` config.
capacity. This can be tweaked using the `ConsumerCount` config.

### I have a lot of messages in my dlq queue what do I do with them ?

- The AutoRetry struct exposes two http handlers, the `DSViewHandler` and the `DSReplayHandler`.
- The AutoRetry struct exposes two http handlers, the `DSViewHandler` and the `DSReplayHandler`.
- The above handler conform to the `http.Handler` interface and can be used with any router of your choice.
- The `DSViewHandler` allows you to peek into messages without consuming them, whereas the `DSReplay` moves messages from `dlq` to `instant` queue ready to be consumed.
- The `DSViewHandler` allows you to peek into messages without consuming them, whereas the `DSReplay` moves messages
from `dlq` to `instant` queue ready to be consumed.

```golang
ar := rabbitmq.AutoRetry(...)
ctx := context.Background()
router := someRouter.New()
router.POST("/rabbitmq/dead_set/view",ar.DSViewHandler(ctx))
router.POST("/rabbitmq/dead_set/replay", ar.DSReplayHandler(ctx))
// pass this on to your HTTP server
```
2 changes: 1 addition & 1 deletion cmd/ziggurat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func die(err error) {
if err != nil {
fmt.Println("command failed with error(s):")
fmt.Println(err.Error())
os.Exit(127)
os.Exit(1)
}

}
Expand Down
66 changes: 66 additions & 0 deletions example/sampleapp/main.go~
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//go:build ignore
// +build ignore

package main

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/gojekfarm/ziggurat/mw/prometheus"

"github.com/gojekfarm/ziggurat"
"github.com/gojekfarm/ziggurat/kafka"
"github.com/gojekfarm/ziggurat/logger"
)

func main() {
var zig ziggurat.Ziggurat
var r kafka.Router

ctx := context.Background()
l := logger.NewLogger(logger.LevelInfo)

ks := kafka.Streams{
StreamConfig: kafka.StreamConfig{{
BootstrapServers: "localhost:9092",
Topics: "plain-text-log",
GroupID: "pt_consumer",
ConsumerCount: 2,
RouteGroup: "plain-text-messages",
}},
Logger: l,
}

r.HandleFunc("plain-text-messages/", func(ctx context.Context, event *ziggurat.Event) error {
val := string(event.Value)
s := strings.Split(val, "_")
num, err := strconv.Atoi(s[1])
if err != nil {
return err
}
if num%2 == 0 {
return ziggurat.Retry
}
return nil
})

wait := make(chan struct{})
zig.StartFunc(func(ctx context.Context) {
go func() {
err := prometheus.StartMonitoringServer(ctx)
l.Error("error running prom server", err)
wait <- struct{}{}
}()
})

h := ziggurat.Use(&r, prometheus.PublishHandlerMetrics)

if runErr := zig.RunAll(ctx, h, &ks); runErr != nil {
l.Error("error running streams", runErr)
}

<-wait
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.17
require (
github.com/cactus/go-statsd-client/v5 v5.0.0
github.com/confluentinc/confluent-kafka-go v1.7.0
github.com/julienschmidt/httprouter v1.3.0
github.com/makasim/amqpextra v0.16.4
github.com/prometheus/client_golang v1.10.0
github.com/rs/zerolog v1.26.0
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down
41 changes: 24 additions & 17 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ package kafka
import "github.com/confluentinc/confluent-kafka-go/kafka"

type ConsumerConfig struct {
BootstrapServers string
DebugLevel string
GroupID string
Topics string
AutoCommitInterval int
ConsumerCount int
PollTimeout int
RouteGroup string
AutoOffsetReset string
BootstrapServers string
DebugLevel string
GroupID string
Topics string
AutoCommitInterval int
ConsumerCount int
PollTimeout int
RouteGroup string
AutoOffsetReset string
PartitionAssignment string
}

func (c ConsumerConfig) toConfigMap() kafka.ConfigMap {
autoCommitInterval := 15000
debugLevel := "consumer"
autoOffsetReset := "earliest"
partitionAssignment := "range,roundrobin"

if c.AutoCommitInterval > 0 {
autoCommitInterval = c.AutoCommitInterval
Expand All @@ -30,15 +32,20 @@ func (c ConsumerConfig) toConfigMap() kafka.ConfigMap {
autoOffsetReset = c.AutoOffsetReset
}

if c.PartitionAssignment != "" {
partitionAssignment = c.PartitionAssignment
}

kafkaConfMap := kafka.ConfigMap{
"bootstrap.servers": c.BootstrapServers,
"group.id": c.GroupID,
"auto.offset.reset": autoOffsetReset,
"enable.auto.commit": true,
"auto.commit.interval.ms": autoCommitInterval,
"debug": debugLevel,
"go.logs.channel.enable": true,
"enable.auto.offset.store": false,
"bootstrap.servers": c.BootstrapServers,
"group.id": c.GroupID,
"auto.offset.reset": autoOffsetReset,
"enable.auto.commit": true,
"auto.commit.interval.ms": autoCommitInterval,
"debug": debugLevel,
"go.logs.channel.enable": true,
"enable.auto.offset.store": false,
"partition.assignment.strategy": partitionAssignment,
}
return kafkaConfMap
}
Expand Down
18 changes: 9 additions & 9 deletions kafka/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ func (s *Streams) Stream(ctx context.Context, handler ziggurat.Handler) error {
for _, consConf := range s.StreamConfig {
groupID := consConf.GroupID
topics := strings.Split(consConf.Topics, ",")
confMap := consConf.toConfigMap()
// sets default pollTimeout of 100ms
pollTimeout := 100
// allow a PollTimeout of -1
if consConf.PollTimeout > 0 || consConf.PollTimeout == -1 {
pollTimeout = consConf.PollTimeout
}
s.workers[groupID] = make([]*worker, consConf.ConsumerCount)
for i := 0; i < consConf.ConsumerCount; i++ {
confMap := consConf.toConfigMap()
consumer := createConsumer(&confMap, s.Logger, topics)
workerID := fmt.Sprintf("%s_%d", groupID, i)
// sets default pollTimeout of 100ms
pollTimeout := 100
// allow a PollTimeout of -1
if consConf.PollTimeout > 0 || consConf.PollTimeout == -1 {
pollTimeout = consConf.PollTimeout
}
w := worker{
handler: handler,
logger: s.Logger,
consumer: consumer,
topics: topics,
confMap: &confMap,
routeGroup: consConf.RouteGroup,
pollTimeout: pollTimeout,
killSig: make(chan struct{}),
Expand Down
5 changes: 5 additions & 0 deletions kafka/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ type worker struct {
routeGroup string
pollTimeout int
killSig chan struct{}
topics []string
confMap *kafka.ConfigMap
id string
err error
}

func (w *worker) run(ctx context.Context) {

w.consumer = createConsumer(w.confMap, w.logger, w.topics)

defer func() {
err := closeConsumer(w.consumer)
w.logger.Error("error closing kafka consumer", err, map[string]interface{}{"Worker-ID": w.id})
Expand Down
1 change: 0 additions & 1 deletion router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (dr *defaultRouter) Handle(ctx context.Context, event *ziggurat.Event) erro
}

// New creates a new router
// router stores handlers in a map[string]ziggurat.Handler
// the route received in the event header is matched against the entries in the map
// and the corresponding handler is executed
func New(opts ...func(dr *defaultRouter)) *defaultRouter {
Expand Down
2 changes: 1 addition & 1 deletion server/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func Run(ctx context.Context, s *http.Server) error {
errChan := make(chan error)
errChan := make(chan error, 1)
go func() {
err := s.ListenAndServe()
if err != nil {
Expand Down
20 changes: 9 additions & 11 deletions zigg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ var signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2, sysc
// var z ziggurat.Ziggurat
// z.run(ctx context.Context,s ziggurat.Streamer,h ziggurat.Handler)
type Ziggurat struct {
handler Handler
Logger StructuredLogger
startFunc StartFunction
stopFunc StopFunction
streams Streamer
multiStreams []Streamer
handler Handler
Logger StructuredLogger
startFunc StartFunction
stopFunc StopFunction
streams Streamer
}

// Run method runs the provided streams and blocks on it until an error is encountered
Expand Down Expand Up @@ -98,6 +97,9 @@ func (z *Ziggurat) StopFunc(function StopFunction) {
// The streams are started concurrently and the handler is executed for
// all the streams.
func (z *Ziggurat) RunAll(ctx context.Context, handler Handler, streams ...Streamer) error {
parentCtx, cancelFunc := signal.NotifyContext(ctx, signals...)
defer cancelFunc()

if z.Logger == nil {
z.Logger = logger.NOOP
}
Expand All @@ -109,18 +111,14 @@ func (z *Ziggurat) RunAll(ctx context.Context, handler Handler, streams ...Strea
panic("error: handler cannot be nil")
}

parentCtx, cancelFunc := signal.NotifyContext(ctx, signals...)

if z.startFunc != nil {
z.startFunc(parentCtx)
}

defer cancelFunc()

var wg sync.WaitGroup
wg.Add(len(streams))
errChan := make(chan error, len(streams))
for i, _ := range streams {
for i := range streams {
go func(i int) {
err := streams[i].Stream(parentCtx, handler)
errChan <- err
Expand Down

0 comments on commit 67fdee8

Please sign in to comment.