forked from content-services/content-sources-backend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
factory.go
32 lines (26 loc) · 952 Bytes
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package event
import (
"context"
"github.com/confluentinc/confluent-kafka-go/kafka"
m "github.com/content-services/content-sources-backend/pkg/instrumentation"
"github.com/rs/zerolog/log"
)
// Adapted from: https://github.com/RedHatInsights/playbook-dispatcher/blob/master/internal/response-consumer/main.go#L21
// Start initiate a kafka run loop consumer given the
// configuration and the event handler for the received
// messages.
// config a reference to an initialized KafkaConfig. It cannot be nil.
// handler is the event handler which receive the read messages.
func Start(ctx context.Context, config *KafkaConfig, handler Eventable, m *m.Metrics) {
var (
err error
consumer *kafka.Consumer
)
if consumer, err = NewConsumer(config); err != nil {
log.Logger.Panic().Msgf("error creating consumer: %s", err.Error())
return
}
defer consumer.Close()
start := NewConsumerEventLoop(ctx, consumer, handler, m)
start()
}