From 67fdee8c2a81d7cb70d36c1c2d71543b851c3d2a Mon Sep 17 00:00:00 2001 From: "shubhang.balkundi" Date: Fri, 28 Jan 2022 09:51:16 +0530 Subject: [PATCH] adds an option to modify the kafka consumer parition assignment --- CHANGELOG.md | 16 +++++++++ README.md | 23 +++++++++---- cmd/ziggurat/main.go | 2 +- example/sampleapp/main.go~ | 66 ++++++++++++++++++++++++++++++++++++++ go.mod | 1 - go.sum | 1 - kafka/config.go | 41 +++++++++++++---------- kafka/orchestrator.go | 18 +++++------ kafka/worker.go | 5 +++ router/router.go | 1 - server/run.go | 2 +- zigg.go | 20 ++++++------ 12 files changed, 148 insertions(+), 48 deletions(-) create mode 100644 example/sampleapp/main.go~ diff --git a/CHANGELOG.md b/CHANGELOG.md index ecd34e5..0b59007 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.5.1] 2022-01-28 + +# Added +- consumer config now takes in a new option called `PartititionAssignment` which is used to configure how kafka partitions are assigned to a consumer + +## [1.5.0] 2022-01-07 + +# Changes +- Kafka consumer default `PollTimeout` is `100 ms` +- `ConsumerGroupID` is renamed to `GroupID` and `OriginTopics` is renamed to `Topics` to keep it consistent with kafka terminology +- The run loop for kafka consumer polling has been restructured. + +# Removed +- The server package only exposes a Run method which orchestrates an HTTP Server rather than providing a full web server implementation +- `httprouter` dependency has been removed + ## [1.4.5] 2021-12-16 # Changes diff --git a/README.md b/README.md index c0a7eaf..048ca9b 100644 --- a/README.md +++ b/README.md @@ -55,8 +55,8 @@ func main() { StreamConfig: kafka.StreamConfig{ { BootstrapServers: "localhost:9092", - OriginTopics: "plain-text-log", - ConsumerGroupID: "pt_consumer", + Topics: "plain-text-log", + GroupID: "pt_consumer", ConsumerCount: 2, RouteGroup: "plain-text-group", }, @@ -124,7 +124,8 @@ Stream A ----> Handler --Retry--> RabbitMQ
                           |_____ Stream B _____| -- The rabbitmq auto retry implements the streamer interface. +- The rabbitmq auto retry implements the streamer interface. This means ziggurat will push the messages from RabbitMQ to + and execute you handlers for every mesasge. - The rabbitmq auto retry exposes a Wrap method in which the handlerFunc can be wrapped and provide the queue name to retry with. @@ -169,10 +170,20 @@ return ziggurat.Retry Once the messages are consumed they are processed by the handler. - queue_name_dlq_queue : messages move here when the retry count is exhausted, `RetryCount` config. - You can have as many consumers as you wish, this value can be tweaked based on you throughput and your machine's - capacity. This can be tweaked using the `ConsumerCount` config. + capacity. This can be tweaked using the `ConsumerCount` config. ### I have a lot of messages in my dlq queue what do I do with them ? -- The AutoRetry struct exposes two http handlers, the `DSViewHandler` and the `DSReplayHandler`. +- The AutoRetry struct exposes two http handlers, the `DSViewHandler` and the `DSReplayHandler`. - The above handler conform to the `http.Handler` interface and can be used with any router of your choice. -- The `DSViewHandler` allows you to peek into messages without consuming them, whereas the `DSReplay` moves messages from `dlq` to `instant` queue ready to be consumed. \ No newline at end of file +- The `DSViewHandler` allows you to peek into messages without consuming them, whereas the `DSReplay` moves messages + from `dlq` to `instant` queue ready to be consumed. + +```golang +ar := rabbitmq.AutoRetry(...) +ctx := context.Background() +router := someRouter.New() +router.POST("/rabbitmq/dead_set/view",ar.DSViewHandler(ctx)) +router.POST("/rabbitmq/dead_set/replay", ar.DSReplayHandler(ctx)) +// pass this on to your HTTP server +``` diff --git a/cmd/ziggurat/main.go b/cmd/ziggurat/main.go index 8965d72..207a1bd 100644 --- a/cmd/ziggurat/main.go +++ b/cmd/ziggurat/main.go @@ -13,7 +13,7 @@ func die(err error) { if err != nil { fmt.Println("command failed with error(s):") fmt.Println(err.Error()) - os.Exit(127) + os.Exit(1) } } diff --git a/example/sampleapp/main.go~ b/example/sampleapp/main.go~ new file mode 100644 index 0000000..ec34563 --- /dev/null +++ b/example/sampleapp/main.go~ @@ -0,0 +1,66 @@ +//go:build ignore +// +build ignore + +package main + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/gojekfarm/ziggurat/mw/prometheus" + + "github.com/gojekfarm/ziggurat" + "github.com/gojekfarm/ziggurat/kafka" + "github.com/gojekfarm/ziggurat/logger" +) + +func main() { + var zig ziggurat.Ziggurat + var r kafka.Router + + ctx := context.Background() + l := logger.NewLogger(logger.LevelInfo) + + ks := kafka.Streams{ + StreamConfig: kafka.StreamConfig{{ + BootstrapServers: "localhost:9092", + Topics: "plain-text-log", + GroupID: "pt_consumer", + ConsumerCount: 2, + RouteGroup: "plain-text-messages", + }}, + Logger: l, + } + + r.HandleFunc("plain-text-messages/", func(ctx context.Context, event *ziggurat.Event) error { + val := string(event.Value) + s := strings.Split(val, "_") + num, err := strconv.Atoi(s[1]) + if err != nil { + return err + } + if num%2 == 0 { + return ziggurat.Retry + } + return nil + }) + + wait := make(chan struct{}) + zig.StartFunc(func(ctx context.Context) { + go func() { + err := prometheus.StartMonitoringServer(ctx) + l.Error("error running prom server", err) + wait <- struct{}{} + }() + }) + + h := ziggurat.Use(&r, prometheus.PublishHandlerMetrics) + + if runErr := zig.RunAll(ctx, h, &ks); runErr != nil { + l.Error("error running streams", runErr) + } + + <-wait +} diff --git a/go.mod b/go.mod index 3554de9..05d9d9b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.17 require ( github.com/cactus/go-statsd-client/v5 v5.0.0 github.com/confluentinc/confluent-kafka-go v1.7.0 - github.com/julienschmidt/httprouter v1.3.0 github.com/makasim/amqpextra v0.16.4 github.com/prometheus/client_golang v1.10.0 github.com/rs/zerolog v1.26.0 diff --git a/go.sum b/go.sum index 8f82678..f1c20b5 100644 --- a/go.sum +++ b/go.sum @@ -145,7 +145,6 @@ github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= diff --git a/kafka/config.go b/kafka/config.go index 6f2962a..cbe96f5 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -3,21 +3,23 @@ package kafka import "github.com/confluentinc/confluent-kafka-go/kafka" type ConsumerConfig struct { - BootstrapServers string - DebugLevel string - GroupID string - Topics string - AutoCommitInterval int - ConsumerCount int - PollTimeout int - RouteGroup string - AutoOffsetReset string + BootstrapServers string + DebugLevel string + GroupID string + Topics string + AutoCommitInterval int + ConsumerCount int + PollTimeout int + RouteGroup string + AutoOffsetReset string + PartitionAssignment string } func (c ConsumerConfig) toConfigMap() kafka.ConfigMap { autoCommitInterval := 15000 debugLevel := "consumer" autoOffsetReset := "earliest" + partitionAssignment := "range,roundrobin" if c.AutoCommitInterval > 0 { autoCommitInterval = c.AutoCommitInterval @@ -30,15 +32,20 @@ func (c ConsumerConfig) toConfigMap() kafka.ConfigMap { autoOffsetReset = c.AutoOffsetReset } + if c.PartitionAssignment != "" { + partitionAssignment = c.PartitionAssignment + } + kafkaConfMap := kafka.ConfigMap{ - "bootstrap.servers": c.BootstrapServers, - "group.id": c.GroupID, - "auto.offset.reset": autoOffsetReset, - "enable.auto.commit": true, - "auto.commit.interval.ms": autoCommitInterval, - "debug": debugLevel, - "go.logs.channel.enable": true, - "enable.auto.offset.store": false, + "bootstrap.servers": c.BootstrapServers, + "group.id": c.GroupID, + "auto.offset.reset": autoOffsetReset, + "enable.auto.commit": true, + "auto.commit.interval.ms": autoCommitInterval, + "debug": debugLevel, + "go.logs.channel.enable": true, + "enable.auto.offset.store": false, + "partition.assignment.strategy": partitionAssignment, } return kafkaConfMap } diff --git a/kafka/orchestrator.go b/kafka/orchestrator.go index ca2d077..15f1191 100644 --- a/kafka/orchestrator.go +++ b/kafka/orchestrator.go @@ -31,21 +31,21 @@ func (s *Streams) Stream(ctx context.Context, handler ziggurat.Handler) error { for _, consConf := range s.StreamConfig { groupID := consConf.GroupID topics := strings.Split(consConf.Topics, ",") + confMap := consConf.toConfigMap() + // sets default pollTimeout of 100ms + pollTimeout := 100 + // allow a PollTimeout of -1 + if consConf.PollTimeout > 0 || consConf.PollTimeout == -1 { + pollTimeout = consConf.PollTimeout + } s.workers[groupID] = make([]*worker, consConf.ConsumerCount) for i := 0; i < consConf.ConsumerCount; i++ { - confMap := consConf.toConfigMap() - consumer := createConsumer(&confMap, s.Logger, topics) workerID := fmt.Sprintf("%s_%d", groupID, i) - // sets default pollTimeout of 100ms - pollTimeout := 100 - // allow a PollTimeout of -1 - if consConf.PollTimeout > 0 || consConf.PollTimeout == -1 { - pollTimeout = consConf.PollTimeout - } w := worker{ handler: handler, logger: s.Logger, - consumer: consumer, + topics: topics, + confMap: &confMap, routeGroup: consConf.RouteGroup, pollTimeout: pollTimeout, killSig: make(chan struct{}), diff --git a/kafka/worker.go b/kafka/worker.go index efab9b5..da2201f 100644 --- a/kafka/worker.go +++ b/kafka/worker.go @@ -23,11 +23,16 @@ type worker struct { routeGroup string pollTimeout int killSig chan struct{} + topics []string + confMap *kafka.ConfigMap id string err error } func (w *worker) run(ctx context.Context) { + + w.consumer = createConsumer(w.confMap, w.logger, w.topics) + defer func() { err := closeConsumer(w.consumer) w.logger.Error("error closing kafka consumer", err, map[string]interface{}{"Worker-ID": w.id}) diff --git a/router/router.go b/router/router.go index 1f1e20b..d0d05b8 100644 --- a/router/router.go +++ b/router/router.go @@ -36,7 +36,6 @@ func (dr *defaultRouter) Handle(ctx context.Context, event *ziggurat.Event) erro } // New creates a new router -// router stores handlers in a map[string]ziggurat.Handler // the route received in the event header is matched against the entries in the map // and the corresponding handler is executed func New(opts ...func(dr *defaultRouter)) *defaultRouter { diff --git a/server/run.go b/server/run.go index 952587f..e692039 100644 --- a/server/run.go +++ b/server/run.go @@ -6,7 +6,7 @@ import ( ) func Run(ctx context.Context, s *http.Server) error { - errChan := make(chan error) + errChan := make(chan error, 1) go func() { err := s.ListenAndServe() if err != nil { diff --git a/zigg.go b/zigg.go index 0549c9d..e227ea5 100644 --- a/zigg.go +++ b/zigg.go @@ -25,12 +25,11 @@ var signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2, sysc // var z ziggurat.Ziggurat // z.run(ctx context.Context,s ziggurat.Streamer,h ziggurat.Handler) type Ziggurat struct { - handler Handler - Logger StructuredLogger - startFunc StartFunction - stopFunc StopFunction - streams Streamer - multiStreams []Streamer + handler Handler + Logger StructuredLogger + startFunc StartFunction + stopFunc StopFunction + streams Streamer } // Run method runs the provided streams and blocks on it until an error is encountered @@ -98,6 +97,9 @@ func (z *Ziggurat) StopFunc(function StopFunction) { // The streams are started concurrently and the handler is executed for // all the streams. func (z *Ziggurat) RunAll(ctx context.Context, handler Handler, streams ...Streamer) error { + parentCtx, cancelFunc := signal.NotifyContext(ctx, signals...) + defer cancelFunc() + if z.Logger == nil { z.Logger = logger.NOOP } @@ -109,18 +111,14 @@ func (z *Ziggurat) RunAll(ctx context.Context, handler Handler, streams ...Strea panic("error: handler cannot be nil") } - parentCtx, cancelFunc := signal.NotifyContext(ctx, signals...) - if z.startFunc != nil { z.startFunc(parentCtx) } - defer cancelFunc() - var wg sync.WaitGroup wg.Add(len(streams)) errChan := make(chan error, len(streams)) - for i, _ := range streams { + for i := range streams { go func(i int) { err := streams[i].Stream(parentCtx, handler) errChan <- err