[Config] Standing up the PostgreSQL config (#28)
Tang8330 authored Jan 24, 2024
1 parent 933fa28 commit c6a9671
Showing 8 changed files with 190 additions and 36 deletions.
47 changes: 35 additions & 12 deletions config/config.go
@@ -15,14 +15,16 @@ type Kafka struct {
 	BootstrapServers string `yaml:"bootstrapServers"`
 	TopicPrefix      string `yaml:"topicPrefix"`
 	AwsEnabled       bool   `yaml:"awsEnabled"`
-	PublishSize      int    `yaml:"publishSize,omitempty"`
-	MaxRequestSize   int64  `yaml:"maxRequestSize,omitempty"`
+	PublishSize      uint   `yaml:"publishSize,omitempty"`
+	MaxRequestSize   uint64 `yaml:"maxRequestSize,omitempty"`
 }
 
-func (k *Kafka) GenerateDefault() {
+func (k *Kafka) GetPublishSize() uint {
 	if k.PublishSize == 0 {
-		k.PublishSize = 2500
+		return constants.DefaultPublishSize
 	}
+
+	return k.PublishSize
 }
 
 func (k *Kafka) Validate() error {
@@ -54,8 +56,18 @@ type Metrics struct {
 	Tags []string `yaml:"tags"`
 }
 
+type Source string
+
+const (
+	SourceDynamo     Source = "dynamodb"
+	SourcePostgreSQL Source = "postgresql"
+)
+
 type Settings struct {
-	DynamoDB *DynamoDB `yaml:"dynamodb"`
+	Source     Source      `yaml:"source"`
+	PostgreSQL *PostgreSQL `yaml:"postgresql"`
+	DynamoDB   *DynamoDB   `yaml:"dynamodb"`
+
 	Reporting *Reporting `yaml:"reporting"`
 	Metrics   *Metrics   `yaml:"metrics"`
 	Kafka     *Kafka     `yaml:"kafka"`
@@ -74,12 +86,24 @@ func (s *Settings) Validate() error {
 		return fmt.Errorf("kafka validation failed: %v", err)
 	}
 
-	if s.DynamoDB == nil {
-		return fmt.Errorf("dynamodb config is nil")
-	}
-
-	if err := s.DynamoDB.Validate(); err != nil {
-		return fmt.Errorf("dynamodb validation failed: %v", err)
+	switch s.Source {
+	// By default, if you don't pass in a source -- it will be dynamodb for backwards compatibility
+	case SourceDynamo, "":
+		if s.DynamoDB == nil {
+			return fmt.Errorf("dynamodb config is nil")
+		}
+
+		if err := s.DynamoDB.Validate(); err != nil {
+			return fmt.Errorf("dynamodb validation failed: %v", err)
+		}
+	case SourcePostgreSQL:
+		if s.PostgreSQL == nil {
+			return fmt.Errorf("postgres config is nil")
+		}
+
+		if err := s.PostgreSQL.Validate(); err != nil {
+			return fmt.Errorf("postgres validation failed: %v", err)
+		}
 	}
 
 	return nil
@@ -101,7 +125,6 @@ func ReadConfig(fp string) (*Settings, error) {
 		log.Fatalf("Failed to validate config file, err: %v", err)
 	}
 
-	settings.Kafka.GenerateDefault()
 	return &settings, nil
 }
 
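Worth noting: GetPublishSize() now resolves the default lazily, which is why the old settings.Kafka.GenerateDefault() call in ReadConfig goes away. A minimal sketch of the getter's behavior; the github.com/artie-labs/reader/config import path is inferred from the repo layout and the values are illustrative:

package main

import (
	"fmt"

	"github.com/artie-labs/reader/config"
)

func main() {
	// publishSize omitted from the YAML: the getter falls back to
	// constants.DefaultPublishSize (5,000).
	var k config.Kafka
	fmt.Println(k.GetPublishSize()) // 5000

	// An explicit, non-zero value is returned as-is.
	k.PublishSize = 250
	fmt.Println(k.GetPublishSize()) // 250
}
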
58 changes: 58 additions & 0 deletions config/postgres.go
@@ -0,0 +1,58 @@
package config

import (
	"fmt"
	"github.com/artie-labs/reader/constants"
	"github.com/artie-labs/transfer/lib/stringutil"
)

type PostgreSQL struct {
	Host     string             `yaml:"host"`
	Port     string             `yaml:"port"`
	Username string             `yaml:"userName"`
	Password string             `yaml:"password"`
	Database string             `yaml:"database"`
	Tables   []*PostgreSQLTable `yaml:"tables"`
}

type PostgreSQLTable struct {
	Name                       string `yaml:"name"`
	Schema                     string `yaml:"schema"`
	Limit                      uint   `yaml:"limit"`
	OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart"`
	OptionalPrimaryKeyValEnd   string `yaml:"optionalPrimaryKeyValEnd"`
}

func (p *PostgreSQLTable) GetLimit() uint {
	if p.Limit == 0 {
		return constants.DefaultLimit
	}

	return p.Limit
}

func (p *PostgreSQL) Validate() error {
	if p == nil {
		return fmt.Errorf("postgres config is nil")
	}

	if stringutil.Empty(p.Host, p.Port, p.Username, p.Password, p.Database) {
		return fmt.Errorf("one of the postgresql settings is empty: host, port, username, password, database")
	}

	if len(p.Tables) == 0 {
		return fmt.Errorf("no tables passed in")
	}

	for _, table := range p.Tables {
		if table.Name == "" {
			return fmt.Errorf("table name must be passed in")
		}

		if table.Schema == "" {
			return fmt.Errorf("schema must be passed in")
		}
	}

	return nil
}
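For orientation, here is a minimal sketch of how the new PostgreSQL block might be built and validated in code. The github.com/artie-labs/reader/config import path is inferred from the repo layout and the host/table values are made up; in YAML the keys follow the struct tags above (host, port, userName, password, database, tables).

package main

import (
	"fmt"

	"github.com/artie-labs/reader/config"
)

func main() {
	pg := &config.PostgreSQL{
		Host:     "localhost",
		Port:     "5432",
		Username: "reader",
		Password: "password",
		Database: "app",
		Tables: []*config.PostgreSQLTable{
			{
				Name:   "orders",
				Schema: "public",
				// Limit omitted: GetLimit() falls back to constants.DefaultLimit (5,000).
			},
		},
	}

	if err := pg.Validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}

	fmt.Println(pg.Tables[0].GetLimit()) // 5000
}
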
82 changes: 82 additions & 0 deletions config/postgres_test.go
@@ -0,0 +1,82 @@
package config

import (
	"github.com/stretchr/testify/assert"
	"testing"
)

func TestPostgresValidate(t *testing.T) {
	{
		// Config is empty
		var p *PostgreSQL
		assert.ErrorContains(t, p.Validate(), "postgres config is nil")
	}
	{
		// Host, port, username, password, database are empty
		p := &PostgreSQL{}
		assert.ErrorContains(t, p.Validate(), "one of the postgresql settings is empty: host, port, username, password, database")
	}
	{
		// Tables are empty
		p := &PostgreSQL{
			Host:     "host",
			Port:     "port",
			Username: "username",
			Password: "password",
			Database: "database",
		}

		assert.ErrorContains(t, p.Validate(), "no tables passed in")
	}
	{
		// No table name
		p := &PostgreSQL{
			Host:     "host",
			Port:     "port",
			Username: "username",
			Password: "password",
			Database: "database",
			Tables: []*PostgreSQLTable{
				{
					Schema: "schema",
				},
			},
		}

		assert.ErrorContains(t, p.Validate(), "table name must be passed in")
	}
	{
		// No schema name
		p := &PostgreSQL{
			Host:     "host",
			Port:     "port",
			Username: "username",
			Password: "password",
			Database: "database",
			Tables: []*PostgreSQLTable{
				{
					Name: "name",
				},
			},
		}

		assert.ErrorContains(t, p.Validate(), "schema must be passed in")
	}
	{
		// Valid
		p := &PostgreSQL{
			Host:     "host",
			Port:     "port",
			Username: "username",
			Password: "password",
			Database: "database",
			Tables: []*PostgreSQLTable{
				{
					Name:   "name",
					Schema: "schema",
				},
			},
		}
		assert.NoError(t, p.Validate())
	}
}
6 changes: 5 additions & 1 deletion constants/constants.go
@@ -5,6 +5,10 @@ type contextKey string
const (
	ConfigKey contextKey = "__cfg"
	KafkaKey  contextKey = "__kafka"
	LoggerKey contextKey = "__logger"
	MtrKey    contextKey = "__mtr"
)

const (
	DefaultLimit       = 5_000
	DefaultPublishSize = 5_000
)
16 changes: 6 additions & 10 deletions lib/kafkalib/batch.go
@@ -20,8 +20,8 @@ var ErrEmptyBatch = fmt.Errorf("batch is empty")
 
 type Batch struct {
 	msgs        []kafka.Message
-	chunkSize   int
-	iteratorIdx int
+	chunkSize   uint
+	iteratorIdx uint
 }
 
 func (b *Batch) IsValid() error {
@@ -33,31 +33,27 @@ func (b *Batch) IsValid() error {
 		return fmt.Errorf("chunk size is too small")
 	}
 
-	if b.iteratorIdx < 0 {
-		return fmt.Errorf("iterator cannot be less than 0")
-	}
-
 	return nil
 }
 
-func NewBatch(messages []kafka.Message, chunkSize int) *Batch {
+func NewBatch(messages []kafka.Message, chunkSize uint) *Batch {
 	return &Batch{
 		msgs:      messages,
 		chunkSize: chunkSize,
 	}
 }
 
 func (b *Batch) HasNext() bool {
-	return len(b.msgs) > b.iteratorIdx
+	return uint(len(b.msgs)) > b.iteratorIdx
 }
 
 func (b *Batch) NextChunk() []kafka.Message {
 	start := b.iteratorIdx
 	b.iteratorIdx += b.chunkSize
 	end := b.iteratorIdx
 
-	if end > len(b.msgs) {
-		end = len(b.msgs)
+	if end > uint(len(b.msgs)) {
+		end = uint(len(b.msgs))
 	}
 
 	if start > end {
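A small usage sketch for the updated Batch API, assuming the segmentio/kafka-go Message type (consistent with the writer.BatchBytes assignment in lib/kafkalib/kafka.go below) and a github.com/artie-labs/reader/lib/kafkalib import path. With chunkSize now a uint, a negative size can no longer be constructed, which is why the iteratorIdx < 0 guard and its test case are removed.

package main

import (
	"fmt"

	"github.com/segmentio/kafka-go"

	"github.com/artie-labs/reader/lib/kafkalib"
)

func main() {
	msgs := []kafka.Message{
		{Value: []byte("message1")},
		{Value: []byte("message2")},
		{Value: []byte("message3")},
	}

	// Chunk three messages two at a time: NextChunk clamps the final chunk
	// to the end of the slice.
	batch := kafkalib.NewBatch(msgs, 2)
	for batch.HasNext() {
		chunk := batch.NextChunk()
		fmt.Println(len(chunk)) // prints 2, then 1
	}
}
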
11 changes: 1 addition & 10 deletions lib/kafkalib/batch_test.go
@@ -10,7 +10,7 @@ func TestBatch_IsValid(t *testing.T) {
 	type _testCase struct {
 		name        string
 		msgs        []kafka.Message
-		chunkSize   int
+		chunkSize   uint
 		expectError bool
 	}
 
@@ -31,15 +31,6 @@ func TestBatch_IsValid(t *testing.T) {
 			},
 			expectError: true,
 		},
-		{
-			name: "happy path (chunkSize = -5)",
-			msgs: []kafka.Message{
-				{Value: []byte("message1")},
-				{Value: []byte("message2")},
-			},
-			chunkSize:   -5,
-			expectError: true,
-		},
 		{
 			name:      "batch is empty",
 			chunkSize: 2,
2 changes: 1 addition & 1 deletion lib/kafkalib/kafka.go
@@ -48,7 +48,7 @@ func InjectIntoContext(ctx context.Context) context.Context {
 	}
 
 	if cfg.Kafka.MaxRequestSize > 0 {
-		writer.BatchBytes = cfg.Kafka.MaxRequestSize
+		writer.BatchBytes = int64(cfg.Kafka.MaxRequestSize)
 	}
 
 	if cfg.Kafka.AwsEnabled {
4 changes: 2 additions & 2 deletions sources/dynamodb/dynamodb.go
@@ -23,7 +23,7 @@ type Store struct {
 
 	tableName string
 	streamArn string
-	batchSize int
+	batchSize uint
 	streams   *dynamodbstreams.DynamoDBStreams
 	storage   *offsets.OffsetStorage
 	shardChan chan *dynamodbstreams.Shard
@@ -49,7 +49,7 @@ func Load(ctx context.Context) *Store {
 	store := &Store{
 		tableName: cfg.DynamoDB.TableName,
 		streamArn: cfg.DynamoDB.StreamArn,
-		batchSize: cfg.Kafka.PublishSize,
+		batchSize: cfg.Kafka.GetPublishSize(),
 		cfg:       cfg.DynamoDB,
 	}
 
