Skip to content

Commit

Permalink
Add sources.Source interface (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Feb 12, 2024
1 parent 1a90792 commit 6642af6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 19 deletions.
38 changes: 20 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/reader/sources"
"github.com/artie-labs/reader/sources/dynamodb"
"github.com/artie-labs/reader/sources/postgres"
)
Expand Down Expand Up @@ -43,6 +44,16 @@ func setUpKafka(ctx context.Context, cfg *config.Kafka, statsD *mtr.Client) (*ka
return kafkalib.NewBatchWriter(ctx, *cfg, statsD)
}

func buildSource(cfg *config.Settings) (sources.Source, error) {
switch cfg.Source {
case "", config.SourceDynamo:
return dynamodb.Load(*cfg.DynamoDB)
case config.SourcePostgreSQL:
return postgres.Load(*cfg.PostgreSQL, cfg.Kafka.MaxRequestSize)
}
panic(fmt.Sprintf("Unknown source: %s", cfg.Source)) // should never happen
}

func main() {
var configFilePath string
flag.StringVar(&configFilePath, "config", "", "path to config file")
Expand Down Expand Up @@ -72,23 +83,14 @@ func main() {
logger.Fatal("Failed to set up kafka", slog.Any("err", err))
}

switch cfg.Source {
case "", config.SourceDynamo:
ddb, err := dynamodb.Load(*cfg.DynamoDB)
if err != nil {
logger.Fatal("Failed to load dynamodb", slog.Any("err", err))
}
if err = ddb.Run(ctx, *writer); err != nil {
logger.Fatal("Failed to run dynamodb snapshot", slog.Any("err", err))
}
case config.SourcePostgreSQL:
pg, err := postgres.Load(*cfg.PostgreSQL, cfg.Kafka.MaxRequestSize)
if err != nil {
logger.Fatal("Failed to load PostgreSQL", slog.Any("err", err))
}
defer pg.Close()
if err = pg.Run(ctx, *writer, statsD); err != nil {
logger.Fatal("Failed to run postgres snapshot", slog.Any("err", err))
}
source, err := buildSource(cfg)
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to init %s", cfg.Source), slog.Any("err", err))
}
defer source.Close()

err = source.Run(ctx, *writer, statsD)
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to run %s snapshot", cfg.Source), slog.Any("err", err))
}
}
7 changes: 6 additions & 1 deletion sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
)
Expand Down Expand Up @@ -65,7 +66,11 @@ func Load(cfg config.DynamoDB) (*Store, error) {
return store, nil
}

func (s *Store) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
func (s *Store) Close() error {
return nil
}

func (s *Store) Run(ctx context.Context, writer kafkalib.BatchWriter, _ *mtr.Client) error {
if s.cfg.Snapshot {
if err := s.scanFilesOverBucket(); err != nil {
return fmt.Errorf("scanning files over bucket failed: %w", err)
Expand Down
13 changes: 13 additions & 0 deletions sources/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package sources

import (
"context"

"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/mtr"
)

type Source interface {
Close() error
Run(ctx context.Context, writer kafkalib.BatchWriter, statsD *mtr.Client) error
}

0 comments on commit 6642af6

Please sign in to comment.