From c6a96711d87f7751817735936edf92e3f2c2323c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 24 Jan 2024 10:16:51 -0800 Subject: [PATCH] [Config] Standing up the PostgreSQL config (#28) --- config/config.go | 47 +++++++++++++++------ config/postgres.go | 58 +++++++++++++++++++++++++ config/postgres_test.go | 82 ++++++++++++++++++++++++++++++++++++ constants/constants.go | 6 ++- lib/kafkalib/batch.go | 16 +++---- lib/kafkalib/batch_test.go | 11 +---- lib/kafkalib/kafka.go | 2 +- sources/dynamodb/dynamodb.go | 4 +- 8 files changed, 190 insertions(+), 36 deletions(-) create mode 100644 config/postgres.go create mode 100644 config/postgres_test.go diff --git a/config/config.go b/config/config.go index 90d2b314..c111f67d 100644 --- a/config/config.go +++ b/config/config.go @@ -15,14 +15,16 @@ type Kafka struct { BootstrapServers string `yaml:"bootstrapServers"` TopicPrefix string `yaml:"topicPrefix"` AwsEnabled bool `yaml:"awsEnabled"` - PublishSize int `yaml:"publishSize,omitempty"` - MaxRequestSize int64 `yaml:"maxRequestSize,omitempty"` + PublishSize uint `yaml:"publishSize,omitempty"` + MaxRequestSize uint64 `yaml:"maxRequestSize,omitempty"` } -func (k *Kafka) GenerateDefault() { +func (k *Kafka) GetPublishSize() uint { if k.PublishSize == 0 { - k.PublishSize = 2500 + return constants.DefaultPublishSize } + + return k.PublishSize } func (k *Kafka) Validate() error { @@ -54,8 +56,18 @@ type Metrics struct { Tags []string `yaml:"tags"` } +type Source string + +const ( + SourceDynamo Source = "dynamodb" + SourcePostgreSQL Source = "postgresql" +) + type Settings struct { - DynamoDB *DynamoDB `yaml:"dynamodb"` + Source Source `yaml:"source"` + PostgreSQL *PostgreSQL `yaml:"postgresql"` + DynamoDB *DynamoDB `yaml:"dynamodb"` + Reporting *Reporting `yaml:"reporting"` Metrics *Metrics `yaml:"metrics"` Kafka *Kafka `yaml:"kafka"` @@ -74,12 +86,24 @@ func (s *Settings) Validate() error { return fmt.Errorf("kafka validation failed: %v", err) } - if s.DynamoDB == nil { - return fmt.Errorf("dynamodb config is nil") - } - - if err := s.DynamoDB.Validate(); err != nil { - return fmt.Errorf("dynamodb validation failed: %v", err) + switch s.Source { + // By default, if you don't pass in a source -- it will be dynamodb for backwards compatibility + case SourceDynamo, "": + if s.DynamoDB == nil { + return fmt.Errorf("dynamodb config is nil") + } + + if err := s.DynamoDB.Validate(); err != nil { + return fmt.Errorf("dynamodb validation failed: %v", err) + } + case SourcePostgreSQL: + if s.PostgreSQL == nil { + return fmt.Errorf("postgres config is nil") + } + + if err := s.PostgreSQL.Validate(); err != nil { + return fmt.Errorf("postgres validation failed: %v", err) + } } return nil @@ -101,7 +125,6 @@ func ReadConfig(fp string) (*Settings, error) { log.Fatalf("Failed to validate config file, err: %v", err) } - settings.Kafka.GenerateDefault() return &settings, nil } diff --git a/config/postgres.go b/config/postgres.go new file mode 100644 index 00000000..e896ae7f --- /dev/null +++ b/config/postgres.go @@ -0,0 +1,58 @@ +package config + +import ( + "fmt" + "github.com/artie-labs/reader/constants" + "github.com/artie-labs/transfer/lib/stringutil" +) + +type PostgreSQL struct { + Host string `yaml:"host"` + Port string `yaml:"port"` + Username string `yaml:"userName"` + Password string `yaml:"password"` + Database string `yaml:"database"` + Tables []*PostgreSQLTable `yaml:"tables"` +} + +type PostgreSQLTable struct { + Name string `yaml:"name"` + Schema string `yaml:"schema"` + Limit uint `yaml:"limit"` + OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart"` + OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd"` +} + +func (p *PostgreSQLTable) GetLimit() uint { + if p.Limit == 0 { + return constants.DefaultLimit + } + + return p.Limit +} + +func (p *PostgreSQL) Validate() error { + if p == nil { + return fmt.Errorf("postgres config is nil") + } + + if stringutil.Empty(p.Host, p.Port, p.Username, p.Password, p.Database) { + return fmt.Errorf("one of the postgresql settings is empty: host, port, username, password, database") + } + + if len(p.Tables) == 0 { + return fmt.Errorf("no tables passed in") + } + + for _, table := range p.Tables { + if table.Name == "" { + return fmt.Errorf("table name must be passed in") + } + + if table.Schema == "" { + return fmt.Errorf("schema must be passed in") + } + } + + return nil +} diff --git a/config/postgres_test.go b/config/postgres_test.go new file mode 100644 index 00000000..82dbd9a4 --- /dev/null +++ b/config/postgres_test.go @@ -0,0 +1,82 @@ +package config + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestPostgresValidate(t *testing.T) { + { + // Config is empty + var p *PostgreSQL + assert.ErrorContains(t, p.Validate(), "postgres config is nil") + } + { + // Host, port, username, password, database are empty + p := &PostgreSQL{} + assert.ErrorContains(t, p.Validate(), "one of the postgresql settings is empty: host, port, username, password, database") + } + { + // Tables are empty + p := &PostgreSQL{ + Host: "host", + Port: "port", + Username: "username", + Password: "password", + Database: "database", + } + + assert.ErrorContains(t, p.Validate(), "no tables passed in") + } + { + // No table name + p := &PostgreSQL{ + Host: "host", + Port: "port", + Username: "username", + Password: "password", + Database: "database", + Tables: []*PostgreSQLTable{ + { + Schema: "schema", + }, + }, + } + + assert.ErrorContains(t, p.Validate(), "table name must be passed in") + } + { + // No schema name + p := &PostgreSQL{ + Host: "host", + Port: "port", + Username: "username", + Password: "password", + Database: "database", + Tables: []*PostgreSQLTable{ + { + Name: "name", + }, + }, + } + + assert.ErrorContains(t, p.Validate(), "schema must be passed in") + } + { + // Valid + p := &PostgreSQL{ + Host: "host", + Port: "port", + Username: "username", + Password: "password", + Database: "database", + Tables: []*PostgreSQLTable{ + { + Name: "name", + Schema: "schema", + }, + }, + } + assert.NoError(t, p.Validate()) + } +} diff --git a/constants/constants.go b/constants/constants.go index 033f4e03..f02b88a8 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -5,6 +5,10 @@ type contextKey string const ( ConfigKey contextKey = "__cfg" KafkaKey contextKey = "__kafka" - LoggerKey contextKey = "__logger" MtrKey contextKey = "__mtr" ) + +const ( + DefaultLimit = 5_000 + DefaultPublishSize = 5_000 +) diff --git a/lib/kafkalib/batch.go b/lib/kafkalib/batch.go index dea18c94..1ba851c4 100644 --- a/lib/kafkalib/batch.go +++ b/lib/kafkalib/batch.go @@ -20,8 +20,8 @@ var ErrEmptyBatch = fmt.Errorf("batch is empty") type Batch struct { msgs []kafka.Message - chunkSize int - iteratorIdx int + chunkSize uint + iteratorIdx uint } func (b *Batch) IsValid() error { @@ -33,14 +33,10 @@ func (b *Batch) IsValid() error { return fmt.Errorf("chunk size is too small") } - if b.iteratorIdx < 0 { - return fmt.Errorf("iterator cannot be less than 0") - } - return nil } -func NewBatch(messages []kafka.Message, chunkSize int) *Batch { +func NewBatch(messages []kafka.Message, chunkSize uint) *Batch { return &Batch{ msgs: messages, chunkSize: chunkSize, @@ -48,7 +44,7 @@ func NewBatch(messages []kafka.Message, chunkSize int) *Batch { } func (b *Batch) HasNext() bool { - return len(b.msgs) > b.iteratorIdx + return uint(len(b.msgs)) > b.iteratorIdx } func (b *Batch) NextChunk() []kafka.Message { @@ -56,8 +52,8 @@ func (b *Batch) NextChunk() []kafka.Message { b.iteratorIdx += b.chunkSize end := b.iteratorIdx - if end > len(b.msgs) { - end = len(b.msgs) + if end > uint(len(b.msgs)) { + end = uint(len(b.msgs)) } if start > end { diff --git a/lib/kafkalib/batch_test.go b/lib/kafkalib/batch_test.go index 286f1e88..8205a76b 100644 --- a/lib/kafkalib/batch_test.go +++ b/lib/kafkalib/batch_test.go @@ -10,7 +10,7 @@ func TestBatch_IsValid(t *testing.T) { type _testCase struct { name string msgs []kafka.Message - chunkSize int + chunkSize uint expectError bool } @@ -31,15 +31,6 @@ func TestBatch_IsValid(t *testing.T) { }, expectError: true, }, - { - name: "happy path (chunkSize = -5)", - msgs: []kafka.Message{ - {Value: []byte("message1")}, - {Value: []byte("message2")}, - }, - chunkSize: -5, - expectError: true, - }, { name: "batch is empty", chunkSize: 2, diff --git a/lib/kafkalib/kafka.go b/lib/kafkalib/kafka.go index 05720fe2..b47a0327 100644 --- a/lib/kafkalib/kafka.go +++ b/lib/kafkalib/kafka.go @@ -48,7 +48,7 @@ func InjectIntoContext(ctx context.Context) context.Context { } if cfg.Kafka.MaxRequestSize > 0 { - writer.BatchBytes = cfg.Kafka.MaxRequestSize + writer.BatchBytes = int64(cfg.Kafka.MaxRequestSize) } if cfg.Kafka.AwsEnabled { diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index 602b4784..2ab73257 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -23,7 +23,7 @@ type Store struct { tableName string streamArn string - batchSize int + batchSize uint streams *dynamodbstreams.DynamoDBStreams storage *offsets.OffsetStorage shardChan chan *dynamodbstreams.Shard @@ -49,7 +49,7 @@ func Load(ctx context.Context) *Store { store := &Store{ tableName: cfg.DynamoDB.TableName, streamArn: cfg.DynamoDB.StreamArn, - batchSize: cfg.Kafka.PublishSize, + batchSize: cfg.Kafka.GetPublishSize(), cfg: cfg.DynamoDB, }