Skip to content

Commit

Permalink
Pull Fatal calls out of Run methods (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Jan 27, 2024
1 parent 895215c commit 96ba1e0
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
8 changes: 6 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ func main() {
if err = ddb.Validate(); err != nil {
logger.Fatal("Failed to validate dynamodb", slog.Any("err", err))
}
ddb.Run(ctx)
if err = ddb.Run(ctx); err != nil {
logger.Fatal("Failed to run dynamodb snapshot", slog.Any("err", err))
}
case config.SourcePostgreSQL:
postgres.Run(ctx, *cfg, statsD, _kafka)
if err = postgres.Run(ctx, *cfg, statsD, _kafka); err != nil {
logger.Fatal("Failed to run postgres snapshot", slog.Any("err", err))
}
}
}
9 changes: 5 additions & 4 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func (s *Store) Validate() error {
return nil
}

func (s *Store) Run(ctx context.Context) {
func (s *Store) Run(ctx context.Context) error {
if s.cfg.Snapshot {
if err := s.scanFilesOverBucket(); err != nil {
logger.Fatal("Scanning files over bucket failed", slog.Any("err", err))
return fmt.Errorf("scanning files over bucket failed, err: %w", err)
}

if err := s.streamAndPublish(ctx); err != nil {
logger.Fatal("Stream and publish failed", slog.Any("err", err))
return fmt.Errorf("stream and publish failed, err: %w", err)
}

slog.Info("Finished snapshotting all the files")
Expand All @@ -110,13 +110,14 @@ func (s *Store) Run(ctx context.Context) {
case <-ctx.Done():
close(s.shardChan)
slog.Info("Terminating process...")
return
return nil
case <-ticker.C:
slog.Info("Scanning for new shards...")
s.scanForNewShards()
}
}
}
return nil
}

func (s *Store) scanForNewShards() {
Expand Down
15 changes: 9 additions & 6 deletions sources/postgres/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package postgres
import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"

Expand All @@ -11,35 +12,35 @@ import (

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/reader/lib/postgres"
)

func Run(ctx context.Context, cfg config.Settings, statsD *mtr.Client, kafkaWriter *kafka.Writer) {
func Run(ctx context.Context, cfg config.Settings, statsD *mtr.Client, kafkaWriter *kafka.Writer) error {
batchWriter := kafkalib.NewBatchWriter(ctx, *cfg.Kafka, kafkaWriter)

db, err := sql.Open("postgres", postgres.NewConnection(cfg.PostgreSQL).String())
if err != nil {
logger.Fatal("Failed to connect to postgres", slog.Any("err", err))
return fmt.Errorf("failed to connect to postgres, err: %w", err)
}
defer db.Close()

for _, table := range cfg.PostgreSQL.Tables {
snapshotStartTime := time.Now()
iter, err := postgres.LoadTable(db, table, statsD, cfg.Kafka.MaxRequestSize)
if err != nil {
logger.Fatal("Failed to create table iterator", slog.Any("err", err), slog.String("table", table.Name))
return fmt.Errorf("failed to create table iterator, table: %s, err: %w", table.Name, err)
}

var count int
for iter.HasNext() {
msgs, err := iter.Next()
if err != nil {
logger.Fatal("Failed to iterate over table", slog.Any("err", err), slog.String("table", table.Name))
return fmt.Errorf("failed to iterate over table, table: %s, err: %w", table.Name, err)

} else if len(msgs) > 0 {
if err = batchWriter.Write(msgs); err != nil {
logger.Fatal("Failed to write messages to kafka", slog.Any("err", err), slog.String("table", table.Name))
return fmt.Errorf("failed to write messages to kafka, table: %s, err: %w", table.Name, err)
}
count += len(msgs)
slog.Info("Scanning progress", slog.Duration("timing", time.Since(snapshotStartTime)), slog.Int("count", count))
Expand All @@ -51,4 +52,6 @@ func Run(ctx context.Context, cfg config.Settings, statsD *mtr.Client, kafkaWrit
slog.Duration("totalDuration", time.Since(snapshotStartTime)),
)
}

return nil
}

0 comments on commit 96ba1e0

Please sign in to comment.