Use the same kafka reader for fetching and committing messages #65

Merged on Aug 28, 2024 (1 commit)
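Before this change, the WAL listener and the Kafka checkpointer each constructed their own kafka.Reader internally: one to fetch messages, a second one to commit offsets. The diff below makes the caller build a single reader and inject it into both, so fetching and committing go through the same Kafka reader. A minimal sketch of the new wiring, based only on the signatures visible in this diff (wireKafkaListener and its parameters are illustrative, not part of the PR):

```go
package example

import (
	"context"

	"github.com/xataio/pgstream/pkg/kafka"
	loglib "github.com/xataio/pgstream/pkg/log"
	"github.com/xataio/pgstream/pkg/wal"
	kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
	kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
)

// wireKafkaListener builds one kafka.Reader and shares it between the WAL
// listener (fetching) and the checkpointer (committing offsets).
func wireKafkaListener(
	ctx context.Context,
	readerCfg kafka.ReaderConfig,
	checkpointCfg kafkacheckpoint.Config,
	logger loglib.Logger,
	process func(context.Context, *wal.Event) error,
) error {
	// Single reader, owned by the caller, used for both fetch and commit.
	reader, err := kafka.NewReader(readerCfg, logger)
	if err != nil {
		return err
	}
	defer reader.Close()

	// The checkpointer commits offsets through the shared reader. In the
	// real stream.Run it is handed to the processor as its checkpoint.
	checkpointer, err := kafkacheckpoint.New(ctx, checkpointCfg, reader,
		kafkacheckpoint.WithLogger(logger))
	if err != nil {
		return err
	}
	defer checkpointer.Close()

	// The WAL listener fetches messages through the same reader.
	listener, err := kafkalistener.NewWALReader(reader, process,
		kafkalistener.WithLogger(logger))
	if err != nil {
		return err
	}
	defer listener.Close()

	return listener.Listen(ctx)
}
```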
cmd/config.go (12 additions, 18 deletions)

@@ -12,7 +12,6 @@ import (
     "github.com/xataio/pgstream/pkg/stream"
     "github.com/xataio/pgstream/pkg/tls"
     kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
-    kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
     kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
     "github.com/xataio/pgstream/pkg/wal/processor/search"
     "github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
@@ -79,33 +78,28 @@ func parseKafkaListenerConfig() *stream.KafkaListenerConfig {
         return nil
     }
 
-    readerCfg := parseKafkaReaderConfig(kafkaServers, kafkaTopic, consumerGroupID)
-
     return &stream.KafkaListenerConfig{
-        Reader: readerCfg,
-        Checkpointer: parseKafkaCheckpointConfig(&readerCfg),
+        Reader: parseKafkaReaderConfig(kafkaServers, kafkaTopic, consumerGroupID),
+        Checkpointer: parseKafkaCheckpointConfig(),
     }
 }
 
-func parseKafkaReaderConfig(kafkaServers []string, kafkaTopic, consumerGroupID string) kafkalistener.ReaderConfig {
-    return kafkalistener.ReaderConfig{
-        Kafka: kafka.ReaderConfig{
-            Conn: kafka.ConnConfig{
-                Servers: kafkaServers,
-                Topic: kafka.TopicConfig{
-                    Name: kafkaTopic,
-                },
-                TLS: parseTLSConfig("PGSTREAM_KAFKA"),
+func parseKafkaReaderConfig(kafkaServers []string, kafkaTopic, consumerGroupID string) kafka.ReaderConfig {
+    return kafka.ReaderConfig{
+        Conn: kafka.ConnConfig{
+            Servers: kafkaServers,
+            Topic: kafka.TopicConfig{
+                Name: kafkaTopic,
             },
-            ConsumerGroupID: consumerGroupID,
-            ConsumerGroupStartOffset: viper.GetString("PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET"),
+            TLS: parseTLSConfig("PGSTREAM_KAFKA"),
         },
+        ConsumerGroupID: consumerGroupID,
+        ConsumerGroupStartOffset: viper.GetString("PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET"),
     }
 }
 
-func parseKafkaCheckpointConfig(readerCfg *kafkalistener.ReaderConfig) kafkacheckpoint.Config {
+func parseKafkaCheckpointConfig() kafkacheckpoint.Config {
     return kafkacheckpoint.Config{
-        Reader: readerCfg.Kafka,
         CommitBackoff: parseBackoffConfig("PGSTREAM_KAFKA_COMMIT"),
     }
 }
pkg/stream/config.go (2 additions, 2 deletions)

@@ -6,8 +6,8 @@ import (
     "errors"
     "time"
 
+    "github.com/xataio/pgstream/pkg/kafka"
     kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
-    kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
     kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
     "github.com/xataio/pgstream/pkg/wal/processor/search"
     "github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
@@ -32,7 +32,7 @@ type PostgresListenerConfig struct {
 }
 
 type KafkaListenerConfig struct {
-    Reader kafkalistener.ReaderConfig
+    Reader kafka.ReaderConfig
     Checkpointer kafkacheckpoint.Config
 }
pkg/stream/integration/helper_test.go (4 additions, 10 deletions)

@@ -19,7 +19,6 @@ import (
     "github.com/xataio/pgstream/pkg/tls"
     "github.com/xataio/pgstream/pkg/wal"
     kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
-    kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
     kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
     "github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
     "github.com/xataio/pgstream/pkg/wal/processor/translator"
@@ -105,18 +104,13 @@ func testPostgresListenerCfg() stream.ListenerConfig {
 }
 
 func testKafkaListenerCfg() stream.ListenerConfig {
-    readerCfg := kafkalistener.ReaderConfig{
-        Kafka: kafkalib.ReaderConfig{
-            Conn: testKafkaCfg(),
-            ConsumerGroupID: "integration-test-group",
-        },
-    }
     return stream.ListenerConfig{
         Kafka: &stream.KafkaListenerConfig{
-            Reader: readerCfg,
-            Checkpointer: kafkacheckpoint.Config{
-                Reader: readerCfg.Kafka,
+            Reader: kafkalib.ReaderConfig{
+                Conn: testKafkaCfg(),
+                ConsumerGroupID: "integration-test-group",
             },
+            Checkpointer: kafkacheckpoint.Config{},
         },
     }
 }
pkg/stream/integration/pg_kafka_integration_test.go (9 additions, 2 deletions)

@@ -10,6 +10,8 @@ import (
     "time"
 
     "github.com/stretchr/testify/require"
+    "github.com/xataio/pgstream/pkg/kafka"
+    "github.com/xataio/pgstream/pkg/log"
     "github.com/xataio/pgstream/pkg/schemalog"
     "github.com/xataio/pgstream/pkg/stream"
     "github.com/xataio/pgstream/pkg/wal"
@@ -98,10 +100,15 @@ func Test_PostgresToKafka(t *testing.T) {
 }
 
 func startKafkaReader(t *testing.T, ctx context.Context, processor func(context.Context, *wal.Event) error) {
-    reader, err := kafkalistener.NewReader(testKafkaListenerCfg().Kafka.Reader, processor)
+    kafkaReader, err := kafka.NewReader(testKafkaListenerCfg().Kafka.Reader, log.NewNoopLogger())
     require.NoError(t, err)
+    reader, err := kafkalistener.NewWALReader(kafkaReader, processor)
+    require.NoError(t, err)
     go func() {
-        defer reader.Close()
+        defer func() {
+            reader.Close()
+            kafkaReader.Close()
+        }()
         reader.Listen(ctx)
     }()
 }
pkg/stream/stream_run.go (15 additions, 2 deletions)

@@ -7,6 +7,7 @@ import (
     "errors"
     "fmt"
 
+    "github.com/xataio/pgstream/pkg/kafka"
     loglib "github.com/xataio/pgstream/pkg/log"
     "github.com/xataio/pgstream/pkg/wal/checkpointer"
     kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
@@ -47,7 +48,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric
             config.Listener.Postgres.Replication,
             pgreplication.WithLogger(logger))
         if err != nil {
-            return fmt.Errorf("error setting up postgres replication handler")
+            return fmt.Errorf("error setting up postgres replication handler: %w", err)
         }
         defer replicationHandler.Close()
     }
@@ -60,13 +61,24 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric
         }
     }
 
+    var kafkaReader *kafka.Reader
+    if config.Listener.Kafka != nil {
+        var err error
+        kafkaReader, err = kafka.NewReader(config.Listener.Kafka.Reader, logger)
+        if err != nil {
+            return fmt.Errorf("error setting up kafka reader: %w", err)
+        }
+        defer kafkaReader.Close()
+    }
+
     // Checkpointer
 
     var checkpoint checkpointer.Checkpoint
     switch {
     case config.Listener.Kafka != nil:
         kafkaCheckpointer, err := kafkacheckpoint.New(ctx,
             config.Listener.Kafka.Checkpointer,
+            kafkaReader,
             kafkacheckpoint.WithLogger(logger))
         if err != nil {
             return fmt.Errorf("error setting up kafka checkpointer:%w", err)
@@ -215,7 +227,8 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric
     })
     case config.Listener.Kafka != nil:
         var err error
-        listener, err := kafkalistener.NewReader(config.Listener.Kafka.Reader,
+        listener, err := kafkalistener.NewWALReader(
+            kafkaReader,
             processor.ProcessWALEvent,
             kafkalistener.WithLogger(logger))
         if err != nil {
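Note the inverted ownership in Run: the shared kafka.Reader is created once when a Kafka listener is configured, injected into both the checkpointer and the WAL listener, and closed by Run's own defer. As the checkpointer diff below shows, Checkpointer.Close accordingly becomes a no-op.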
pkg/wal/checkpointer/kafka/wal_kafka_checkpointer.go (8 additions, 12 deletions)

@@ -16,18 +16,17 @@ import (
 // Checkpointer is a kafka implementation of the wal checkpointer. It commits
 // the message offsets to kafka.
 type Checkpointer struct {
-    committer msgCommitter
+    committer kafkaCommitter
     backoffProvider backoff.Provider
     logger loglib.Logger
     offsetParser kafka.OffsetParser
 }
 
 type Config struct {
-    Reader kafka.ReaderConfig
     CommitBackoff backoff.Config
 }
 
-type msgCommitter interface {
+type kafkaCommitter interface {
     CommitOffsets(ctx context.Context, offsets ...*kafka.Offset) error
     Close() error
 }
@@ -36,29 +35,26 @@ type Option func(c *Checkpointer)
 
 // New returns a kafka checkpointer that commits the message offsets to kafka by
 // partition/topic on demand.
-func New(ctx context.Context, cfg Config, opts ...Option) (*Checkpointer, error) {
+func New(ctx context.Context, cfg Config, committer kafkaCommitter, opts ...Option) (*Checkpointer, error) {
     c := &Checkpointer{
         logger: loglib.NewNoopLogger(),
         backoffProvider: backoff.NewProvider(&cfg.CommitBackoff),
         offsetParser: kafka.NewOffsetParser(),
+        committer: committer,
     }
 
     for _, opt := range opts {
         opt(c)
     }
 
-    var err error
-    c.committer, err = kafka.NewReader(cfg.Reader, c.logger)
-    if err != nil {
-        return nil, err
-    }
-
     return c, nil
 }
 
 func WithLogger(l loglib.Logger) Option {
     return func(c *Checkpointer) {
-        c.logger = loglib.NewLogger(l)
+        c.logger = loglib.NewLogger(l).WithFields(loglib.Fields{
+            loglib.ServiceField: "wal_kafka_checkpointer",
+        })
     }
 }
 
@@ -99,7 +95,7 @@ func (c *Checkpointer) CommitOffsets(ctx context.Context, positions []wal.Commit
 }
 
 func (c *Checkpointer) Close() error {
-    return c.committer.Close()
+    return nil
 }
 
 func (c *Checkpointer) commitOffsetsWithRetry(ctx context.Context, offsets []*kafka.Offset) error {
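Since the committer is now injected instead of being built inside New, unit tests can supply an in-memory double. A minimal sketch, assuming only the kafkaCommitter interface from the diff above (fakeCommitter is hypothetical and would live in the checkpointer package, where context and the pkg/kafka import are already available):

```go
// fakeCommitter records offsets instead of committing them to Kafka.
type fakeCommitter struct {
	committed []*kafka.Offset
}

func (f *fakeCommitter) CommitOffsets(_ context.Context, offsets ...*kafka.Offset) error {
	f.committed = append(f.committed, offsets...)
	return nil
}

func (f *fakeCommitter) Close() error { return nil }
```

A test can then call New(ctx, cfg, &fakeCommitter{}) and assert on the recorded offsets without standing up a Kafka broker.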
pkg/wal/listener/kafka/wal_kafka_reader.go (2 additions, 22 deletions)

@@ -7,7 +7,6 @@ import (
     "encoding/json"
     "errors"
     "fmt"
-    "runtime/debug"
 
     "github.com/xataio/pgstream/pkg/kafka"
     loglib "github.com/xataio/pgstream/pkg/log"
@@ -25,13 +24,8 @@ type Reader struct {
     processRecord payloadProcessor
 }
 
-type ReaderConfig struct {
-    Kafka kafka.ReaderConfig
-}
-
 type kafkaReader interface {
     FetchMessage(context.Context) (*kafka.Message, error)
-    Close() error
 }
 
 type payloadProcessor func(context.Context, *wal.Event) error
@@ -40,24 +34,19 @@ type Option func(*Reader)
 
 // NewReader returns a kafka reader that listens to wal events and calls the
 // processor on input.
-func NewReader(config ReaderConfig, processRecord payloadProcessor, opts ...Option) (*Reader, error) {
+func NewWALReader(kafkaReader kafkaReader, processRecord payloadProcessor, opts ...Option) (*Reader, error) {
     r := &Reader{
         logger: loglib.NewNoopLogger(),
         processRecord: processRecord,
         unmarshaler: json.Unmarshal,
         offsetParser: kafka.NewOffsetParser(),
+        reader: kafkaReader,
     }
 
     for _, opt := range opts {
         opt(r)
     }
 
-    var err error
-    r.reader, err = kafka.NewReader(config.Kafka, r.logger)
-    if err != nil {
-        return nil, err
-    }
-
     return r, nil
 }
 
@@ -117,14 +106,5 @@ func (r *Reader) Listen(ctx context.Context) error {
 }
 
 func (r *Reader) Close() error {
-    // Cleanly closing the connection to Kafka is important
-    // in order for the consumer's partitions to be re-allocated
-    // quickly.
-    if err := r.reader.Close(); err != nil {
-        r.logger.Error(err, "error closing connection to kafka", loglib.Fields{
-            "stack_trace": debug.Stack(),
-        })
-        return err
-    }
     return nil
 }
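With Close() dropped from the kafkaReader interface, the WAL reader only fetches, and its own Close becomes a no-op. The clean-close responsibility described in the removed comment (closing the connection promptly so the consumer group's partitions are re-allocated quickly) now sits with whoever constructed the kafka.Reader: stream.Run in production, and the test helper in pg_kafka_integration_test.go.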