From bcece41d7c52b999088cc264b58af7558820b704 Mon Sep 17 00:00:00 2001 From: eminano Date: Wed, 28 Aug 2024 12:31:56 +0200 Subject: [PATCH] Use the same kafka reader for fetching and committing messages --- cmd/config.go | 30 ++++++++----------- pkg/stream/config.go | 4 +-- pkg/stream/integration/helper_test.go | 14 +++------ .../integration/pg_kafka_integration_test.go | 11 +++++-- pkg/stream/stream_run.go | 17 +++++++++-- .../kafka/wal_kafka_checkpointer.go | 20 +++++-------- pkg/wal/listener/kafka/wal_kafka_reader.go | 24 ++------------- 7 files changed, 52 insertions(+), 68 deletions(-) diff --git a/cmd/config.go b/cmd/config.go index a2a030c..66a3ab1 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -12,7 +12,6 @@ import ( "github.com/xataio/pgstream/pkg/stream" "github.com/xataio/pgstream/pkg/tls" kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka" - kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka" kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka" "github.com/xataio/pgstream/pkg/wal/processor/search" "github.com/xataio/pgstream/pkg/wal/processor/search/opensearch" @@ -79,33 +78,28 @@ func parseKafkaListenerConfig() *stream.KafkaListenerConfig { return nil } - readerCfg := parseKafkaReaderConfig(kafkaServers, kafkaTopic, consumerGroupID) - return &stream.KafkaListenerConfig{ - Reader: readerCfg, - Checkpointer: parseKafkaCheckpointConfig(&readerCfg), + Reader: parseKafkaReaderConfig(kafkaServers, kafkaTopic, consumerGroupID), + Checkpointer: parseKafkaCheckpointConfig(), } } -func parseKafkaReaderConfig(kafkaServers []string, kafkaTopic, consumerGroupID string) kafkalistener.ReaderConfig { - return kafkalistener.ReaderConfig{ - Kafka: kafka.ReaderConfig{ - Conn: kafka.ConnConfig{ - Servers: kafkaServers, - Topic: kafka.TopicConfig{ - Name: kafkaTopic, - }, - TLS: parseTLSConfig("PGSTREAM_KAFKA"), +func parseKafkaReaderConfig(kafkaServers []string, kafkaTopic, consumerGroupID string) kafka.ReaderConfig { + return kafka.ReaderConfig{ + Conn: kafka.ConnConfig{ + Servers: kafkaServers, + Topic: kafka.TopicConfig{ + Name: kafkaTopic, }, - ConsumerGroupID: consumerGroupID, - ConsumerGroupStartOffset: viper.GetString("PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET"), + TLS: parseTLSConfig("PGSTREAM_KAFKA"), }, + ConsumerGroupID: consumerGroupID, + ConsumerGroupStartOffset: viper.GetString("PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET"), } } -func parseKafkaCheckpointConfig(readerCfg *kafkalistener.ReaderConfig) kafkacheckpoint.Config { +func parseKafkaCheckpointConfig() kafkacheckpoint.Config { return kafkacheckpoint.Config{ - Reader: readerCfg.Kafka, CommitBackoff: parseBackoffConfig("PGSTREAM_KAFKA_COMMIT"), } } diff --git a/pkg/stream/config.go b/pkg/stream/config.go index 653fad1..12a9a1c 100644 --- a/pkg/stream/config.go +++ b/pkg/stream/config.go @@ -6,8 +6,8 @@ import ( "errors" "time" + "github.com/xataio/pgstream/pkg/kafka" kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka" - kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka" kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka" "github.com/xataio/pgstream/pkg/wal/processor/search" "github.com/xataio/pgstream/pkg/wal/processor/search/opensearch" @@ -32,7 +32,7 @@ type PostgresListenerConfig struct { } type KafkaListenerConfig struct { - Reader kafkalistener.ReaderConfig + Reader kafka.ReaderConfig Checkpointer kafkacheckpoint.Config } diff --git a/pkg/stream/integration/helper_test.go b/pkg/stream/integration/helper_test.go index 586ce4c..22fa624 100644 --- a/pkg/stream/integration/helper_test.go +++ b/pkg/stream/integration/helper_test.go @@ -19,7 +19,6 @@ import ( "github.com/xataio/pgstream/pkg/tls" "github.com/xataio/pgstream/pkg/wal" kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka" - kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka" kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka" "github.com/xataio/pgstream/pkg/wal/processor/search/opensearch" "github.com/xataio/pgstream/pkg/wal/processor/translator" @@ -105,18 +104,13 @@ func testPostgresListenerCfg() stream.ListenerConfig { } func testKafkaListenerCfg() stream.ListenerConfig { - readerCfg := kafkalistener.ReaderConfig{ - Kafka: kafkalib.ReaderConfig{ - Conn: testKafkaCfg(), - ConsumerGroupID: "integration-test-group", - }, - } return stream.ListenerConfig{ Kafka: &stream.KafkaListenerConfig{ - Reader: readerCfg, - Checkpointer: kafkacheckpoint.Config{ - Reader: readerCfg.Kafka, + Reader: kafkalib.ReaderConfig{ + Conn: testKafkaCfg(), + ConsumerGroupID: "integration-test-group", }, + Checkpointer: kafkacheckpoint.Config{}, }, } } diff --git a/pkg/stream/integration/pg_kafka_integration_test.go b/pkg/stream/integration/pg_kafka_integration_test.go index 4db7b47..3e8a9a7 100644 --- a/pkg/stream/integration/pg_kafka_integration_test.go +++ b/pkg/stream/integration/pg_kafka_integration_test.go @@ -10,6 +10,8 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/xataio/pgstream/pkg/kafka" + "github.com/xataio/pgstream/pkg/log" "github.com/xataio/pgstream/pkg/schemalog" "github.com/xataio/pgstream/pkg/stream" "github.com/xataio/pgstream/pkg/wal" @@ -98,10 +100,15 @@ func Test_PostgresToKafka(t *testing.T) { } func startKafkaReader(t *testing.T, ctx context.Context, processor func(context.Context, *wal.Event) error) { - reader, err := kafkalistener.NewReader(testKafkaListenerCfg().Kafka.Reader, processor) + kafkaReader, err := kafka.NewReader(testKafkaListenerCfg().Kafka.Reader, log.NewNoopLogger()) + require.NoError(t, err) + reader, err := kafkalistener.NewWALReader(kafkaReader, processor) require.NoError(t, err) go func() { - defer reader.Close() + defer func() { + reader.Close() + kafkaReader.Close() + }() reader.Listen(ctx) }() } diff --git a/pkg/stream/stream_run.go b/pkg/stream/stream_run.go index 71c376f..ce95401 100644 --- a/pkg/stream/stream_run.go +++ b/pkg/stream/stream_run.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" + "github.com/xataio/pgstream/pkg/kafka" loglib "github.com/xataio/pgstream/pkg/log" "github.com/xataio/pgstream/pkg/wal/checkpointer" kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka" @@ -47,7 +48,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric config.Listener.Postgres.Replication, pgreplication.WithLogger(logger)) if err != nil { - return fmt.Errorf("error setting up postgres replication handler") + return fmt.Errorf("error setting up postgres replication handler: %w", err) } defer replicationHandler.Close() } @@ -60,6 +61,16 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric } } + var kafkaReader *kafka.Reader + if config.Listener.Kafka != nil { + var err error + kafkaReader, err = kafka.NewReader(config.Listener.Kafka.Reader, logger) + if err != nil { + return fmt.Errorf("error setting up kafka reader: %w", err) + } + defer kafkaReader.Close() + } + // Checkpointer var checkpoint checkpointer.Checkpoint @@ -67,6 +78,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric case config.Listener.Kafka != nil: kafkaCheckpointer, err := kafkacheckpoint.New(ctx, config.Listener.Kafka.Checkpointer, + kafkaReader, kafkacheckpoint.WithLogger(logger)) if err != nil { return fmt.Errorf("error setting up kafka checkpointer:%w", err) @@ -215,7 +227,8 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric }) case config.Listener.Kafka != nil: var err error - listener, err := kafkalistener.NewReader(config.Listener.Kafka.Reader, + listener, err := kafkalistener.NewWALReader( + kafkaReader, processor.ProcessWALEvent, kafkalistener.WithLogger(logger)) if err != nil { diff --git a/pkg/wal/checkpointer/kafka/wal_kafka_checkpointer.go b/pkg/wal/checkpointer/kafka/wal_kafka_checkpointer.go index 1a9e5e8..5dc599c 100644 --- a/pkg/wal/checkpointer/kafka/wal_kafka_checkpointer.go +++ b/pkg/wal/checkpointer/kafka/wal_kafka_checkpointer.go @@ -16,18 +16,17 @@ import ( // Checkpointer is a kafka implementation of the wal checkpointer. It commits // the message offsets to kafka. type Checkpointer struct { - committer msgCommitter + committer kafkaCommitter backoffProvider backoff.Provider logger loglib.Logger offsetParser kafka.OffsetParser } type Config struct { - Reader kafka.ReaderConfig CommitBackoff backoff.Config } -type msgCommitter interface { +type kafkaCommitter interface { CommitOffsets(ctx context.Context, offsets ...*kafka.Offset) error Close() error } @@ -36,29 +35,26 @@ type Option func(c *Checkpointer) // New returns a kafka checkpointer that commits the message offsets to kafka by // partition/topic on demand. -func New(ctx context.Context, cfg Config, opts ...Option) (*Checkpointer, error) { +func New(ctx context.Context, cfg Config, committer kafkaCommitter, opts ...Option) (*Checkpointer, error) { c := &Checkpointer{ logger: loglib.NewNoopLogger(), backoffProvider: backoff.NewProvider(&cfg.CommitBackoff), offsetParser: kafka.NewOffsetParser(), + committer: committer, } for _, opt := range opts { opt(c) } - var err error - c.committer, err = kafka.NewReader(cfg.Reader, c.logger) - if err != nil { - return nil, err - } - return c, nil } func WithLogger(l loglib.Logger) Option { return func(c *Checkpointer) { - c.logger = loglib.NewLogger(l) + c.logger = loglib.NewLogger(l).WithFields(loglib.Fields{ + loglib.ServiceField: "wal_kafka_checkpointer", + }) } } @@ -99,7 +95,7 @@ func (c *Checkpointer) CommitOffsets(ctx context.Context, positions []wal.Commit } func (c *Checkpointer) Close() error { - return c.committer.Close() + return nil } func (c *Checkpointer) commitOffsetsWithRetry(ctx context.Context, offsets []*kafka.Offset) error { diff --git a/pkg/wal/listener/kafka/wal_kafka_reader.go b/pkg/wal/listener/kafka/wal_kafka_reader.go index 1c8ff90..4126179 100644 --- a/pkg/wal/listener/kafka/wal_kafka_reader.go +++ b/pkg/wal/listener/kafka/wal_kafka_reader.go @@ -7,7 +7,6 @@ import ( "encoding/json" "errors" "fmt" - "runtime/debug" "github.com/xataio/pgstream/pkg/kafka" loglib "github.com/xataio/pgstream/pkg/log" @@ -25,13 +24,8 @@ type Reader struct { processRecord payloadProcessor } -type ReaderConfig struct { - Kafka kafka.ReaderConfig -} - type kafkaReader interface { FetchMessage(context.Context) (*kafka.Message, error) - Close() error } type payloadProcessor func(context.Context, *wal.Event) error @@ -40,24 +34,19 @@ type Option func(*Reader) // NewReader returns a kafka reader that listens to wal events and calls the // processor on input. -func NewReader(config ReaderConfig, processRecord payloadProcessor, opts ...Option) (*Reader, error) { +func NewWALReader(kafkaReader kafkaReader, processRecord payloadProcessor, opts ...Option) (*Reader, error) { r := &Reader{ logger: loglib.NewNoopLogger(), processRecord: processRecord, unmarshaler: json.Unmarshal, offsetParser: kafka.NewOffsetParser(), + reader: kafkaReader, } for _, opt := range opts { opt(r) } - var err error - r.reader, err = kafka.NewReader(config.Kafka, r.logger) - if err != nil { - return nil, err - } - return r, nil } @@ -117,14 +106,5 @@ func (r *Reader) Listen(ctx context.Context) error { } func (r *Reader) Close() error { - // Cleanly closing the connection to Kafka is important - // in order for the consumer's partitions to be re-allocated - // quickly. - if err := r.reader.Close(); err != nil { - r.logger.Error(err, "error closing connection to kafka", loglib.Fields{ - "stack_trace": debug.Stack(), - }) - return err - } return nil }