Skip to content

Commit

Permalink
Merge pull request #353 from maxekman/fix/kafka-reconnect-issue
Browse files Browse the repository at this point in the history
Fix / Kafka reconnect issue
  • Loading branch information
maxekman authored Nov 8, 2021
2 parents 7d5b216 + 8596076 commit 3602de4
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 27 deletions.
49 changes: 31 additions & 18 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"log"
"strings"
"sync"
"time"

Expand All @@ -36,6 +35,7 @@ type EventBus struct {
addr string
appID string
topic string
client *kafka.Client
writer *kafka.Writer
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
Expand Down Expand Up @@ -73,7 +73,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
}

// Get or create the topic.
client := &kafka.Client{
b.client = &kafka.Client{
Addr: kafka.TCP(addr),
}

Expand All @@ -82,7 +82,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
var err error

for i := 0; i < 10; i++ {
resp, err = client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
resp, err = b.client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
Topics: []kafka.TopicConfig{{
Topic: b.topic,
NumPartitions: 5,
Expand Down Expand Up @@ -190,31 +190,44 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
}

// Get or create the subscription.
joined := make(chan struct{})
groupID := b.appID + "_" + h.HandlerType().String()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{b.addr},
Topic: b.topic,
GroupID: groupID, // Send messages to only one subscriber per group.
MaxBytes: 100e3, // 100KB
MaxWait: time.Second, // Allow to exit readloop in max 1s.
WatchPartitionChanges: true,
StartOffset: kafka.LastOffset, // Don't read old messages.
Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
// NOTE: Hacky way to use logger to find out when the reader is ready.
if strings.HasPrefix(msg, "Joined group") {
select {
case <-joined:
default:
close(joined) // Close once.
}
}
}),
})

select {
case <-joined:
case <-time.After(10 * time.Second):
req := &kafka.ListGroupsRequest{
Addr: b.client.Addr,
}

exist := false

for i := 0; i < 20; i++ {
resp, err := b.client.ListGroups(ctx, req)
if err != nil || resp.Error != nil {
return fmt.Errorf("could not list Kafka groups: %w", err)
}

for _, grp := range resp.Groups {
if grp.GroupID == groupID {
exist = true

break
}
}

if exist {
break
}

time.Sleep(500 * time.Millisecond)
}

if !exist {
return fmt.Errorf("did not join group in time")
}

Expand Down
17 changes: 16 additions & 1 deletion eventbus/kafka/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,27 @@ func TestAddHandlerIntegration(t *testing.T) {
t.Skip("skipping integration test")
}

bus1, _, err := newTestEventBus("")
bus1, appID, err := newTestEventBus("")
if err != nil {
t.Fatal("there should be no error:", err)
}

eventbus.TestAddHandler(t, bus1)

if err := bus1.Close(); err != nil {
t.Error("there should be no error:", err)
}

bus2, _, err := newTestEventBus(appID)
if err != nil {
t.Fatal("there should be no error:", err)
}

eventbus.TestAddHandler(t, bus2)

if err := bus2.Close(); err != nil {
t.Error("there should be no error:", err)
}
}

func TestEventBusIntegration(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/nats-io/nats-server/v2 v2.4.0 // indirect
github.com/nats-io/nats.go v1.12.1
github.com/opentracing/opentracing-go v1.2.0
github.com/segmentio/kafka-go v0.4.17
github.com/segmentio/kafka-go v0.4.23
github.com/uber/jaeger-client-go v2.29.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
go.mongodb.org/mongo-driver v1.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY=
github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/segmentio/kafka-go v0.4.23 h1:jjacNjmn1fPvkVGFs6dej98fa7UT/bYF8wZBFMMIld4=
github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
22 changes: 17 additions & 5 deletions repo/mongodb/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ var ErrNoCursor = errors.New("no cursor")

// Repo implements an MongoDB repository for entities.
type Repo struct {
client *mongo.Client
entities *mongo.Collection
newEntity func() eh.Entity
client *mongo.Client
entities *mongo.Collection
newEntity func() eh.Entity
connectionCheck bool
}

// NewRepo creates a new Repo.
Expand Down Expand Up @@ -79,8 +80,10 @@ func NewRepoWithClient(client *mongo.Client, db, collection string, options ...O
}
}

if err := r.client.Ping(context.Background(), readpref.Primary()); err != nil {
return nil, fmt.Errorf("could not connect to MongoDB: %w", err)
if r.connectionCheck {
if err := r.client.Ping(context.Background(), readpref.Primary()); err != nil {
return nil, fmt.Errorf("could not connect to MongoDB: %w", err)
}
}

return r, nil
Expand All @@ -89,6 +92,15 @@ func NewRepoWithClient(client *mongo.Client, db, collection string, options ...O
// Option is an option setter used to configure creation.
type Option func(*Repo) error

// WithConnectionCheck adds an optional DB connection check when calling New().
func WithConnectionCheck(h eh.EventHandler) Option {
return func(r *Repo) error {
r.connectionCheck = true

return nil
}
}

// InnerRepo implements the InnerRepo method of the eventhorizon.ReadRepo interface.
func (r *Repo) InnerRepo(ctx context.Context) eh.ReadRepo {
return nil
Expand Down

0 comments on commit 3602de4

Please sign in to comment.