
Pull Kafka writer out of DynamoDB Source (#90)
nathan-artie authored Feb 12, 2024
1 parent 0da275b commit 1a90792
Showing 4 changed files with 14 additions and 14 deletions.

main.go (2 additions, 2 deletions)

@@ -74,11 +74,11 @@ func main() {
 
     switch cfg.Source {
     case "", config.SourceDynamo:
-        ddb, err := dynamodb.Load(*cfg.DynamoDB, *writer)
+        ddb, err := dynamodb.Load(*cfg.DynamoDB)
         if err != nil {
             logger.Fatal("Failed to load dynamodb", slog.Any("err", err))
         }
-        if err = ddb.Run(ctx); err != nil {
+        if err = ddb.Run(ctx, *writer); err != nil {
             logger.Fatal("Failed to run dynamodb snapshot", slog.Any("err", err))
         }
     case config.SourcePostgreSQL:

sources/dynamodb/dynamodb.go (4 additions, 6 deletions)

@@ -22,7 +22,6 @@ import (
 type Store struct {
     s3Client *s3lib.S3Client
     dynamoDBClient *dynamodb.DynamoDB
-    writer kafkalib.BatchWriter
 
     tableName string
     streamArn string
@@ -37,7 +36,7 @@ type Store struct {
 const jitterSleepBaseMs = 50
 const shardScannerInterval = 5 * time.Minute
 
-func Load(cfg config.DynamoDB, writer kafkalib.BatchWriter) (*Store, error) {
+func Load(cfg config.DynamoDB) (*Store, error) {
     sess, err := session.NewSession(&aws.Config{
         Region: ptr.ToString(cfg.AwsRegion),
         Credentials: credentials.NewStaticCredentials(cfg.AwsAccessKeyID, cfg.AwsSecretAccessKey, ""),
@@ -50,7 +49,6 @@ func Load(cfg config.DynamoDB, writer kafkalib.BatchWriter) (*Store, error) {
         tableName: cfg.TableName,
         streamArn: cfg.StreamArn,
         cfg: &cfg,
-        writer: writer,
     }
 
     if cfg.Snapshot {
@@ -67,13 +65,13 @@ func Load(cfg config.DynamoDB, writer kafkalib.BatchWriter) (*Store, error) {
     return store, nil
 }
 
-func (s *Store) Run(ctx context.Context) error {
+func (s *Store) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
     if s.cfg.Snapshot {
         if err := s.scanFilesOverBucket(); err != nil {
             return fmt.Errorf("scanning files over bucket failed: %w", err)
         }
 
-        if err := s.streamAndPublish(ctx); err != nil {
+        if err := s.streamAndPublish(ctx, writer); err != nil {
             return fmt.Errorf("stream and publish failed: %w", err)
         }
 
@@ -82,7 +80,7 @@ func (s *Store) Run(ctx context.Context) error {
     ticker := time.NewTicker(shardScannerInterval)
 
     // Start to subscribe to the channel
-    go s.ListenToChannel(ctx)
+    go s.ListenToChannel(ctx, writer)
 
     // Scan it for the first time manually, so we don't have to wait 5 mins
     if err := s.scanForNewShards(); err != nil {

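
For orientation, here is the refactor in miniature: after this commit the DynamoDB Store no longer owns a Kafka writer, and callers hand one to Run instead. This is a hedged sketch rather than the repository's actual code; RawMessage, the one-method BatchWriter interface (inferred from the single WriteRawMessages call visible in this diff), and stdoutWriter are stand-ins.

package main

import (
    "context"
    "fmt"
)

// RawMessage is a placeholder for the repository's message type; the diff
// only shows values produced by msg.RawMessage(), so the real type differs.
type RawMessage struct {
    Key   string
    Value string
}

// BatchWriter mirrors the one method this diff exercises. The actual
// kafkalib.BatchWriter interface may carry more methods.
type BatchWriter interface {
    WriteRawMessages(ctx context.Context, msgs []RawMessage) error
}

// Store no longer holds a writer field after this commit.
type Store struct {
    tableName string
}

// Run receives the writer as an argument instead of reading it from the
// struct, mirroring the new signature in sources/dynamodb/dynamodb.go.
func (s *Store) Run(ctx context.Context, writer BatchWriter) error {
    msgs := []RawMessage{{Key: "pk", Value: "{}"}}
    if err := writer.WriteRawMessages(ctx, msgs); err != nil {
        return fmt.Errorf("failed to publish messages: %w", err)
    }
    return nil
}

// stdoutWriter is a toy writer used only to make this sketch runnable.
type stdoutWriter struct{}

func (stdoutWriter) WriteRawMessages(_ context.Context, msgs []RawMessage) error {
    for _, m := range msgs {
        fmt.Printf("write %s=%s\n", m.Key, m.Value)
    }
    return nil
}

func main() {
    store := &Store{tableName: "example-table"}
    if err := store.Run(context.Background(), stdoutWriter{}); err != nil {
        panic(err)
    }
}

Run depends only on the interface, so the call site decides which writer implementation to supply.
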
sources/dynamodb/shard.go (5 additions, 4 deletions)

@@ -11,16 +11,17 @@ import (
 
     "github.com/artie-labs/reader/lib"
     "github.com/artie-labs/reader/lib/dynamo"
+    "github.com/artie-labs/reader/lib/kafkalib"
     "github.com/artie-labs/reader/lib/logger"
 )
 
-func (s *Store) ListenToChannel(ctx context.Context) {
+func (s *Store) ListenToChannel(ctx context.Context, writer kafkalib.BatchWriter) {
     for shard := range s.shardChan {
-        go s.processShard(ctx, shard)
+        go s.processShard(ctx, shard, writer)
     }
 }
 
-func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard) {
+func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer kafkalib.BatchWriter) {
     var attempts int
 
     // Is there another go-routine processing this shard?
@@ -96,7 +97,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
         messages = append(messages, msg.RawMessage())
     }
 
-    if err = s.writer.WriteRawMessages(ctx, messages); err != nil {
+    if err = writer.WriteRawMessages(ctx, messages); err != nil {
         logger.Panic("Failed to publish messages, exiting...", slog.Any("err", err))
     }
 

sources/dynamodb/snapshot.go (3 additions, 2 deletions)

@@ -9,6 +9,7 @@ import (
 
     "github.com/artie-labs/reader/lib"
     "github.com/artie-labs/reader/lib/dynamo"
+    "github.com/artie-labs/reader/lib/kafkalib"
     "github.com/artie-labs/reader/lib/logger"
 )
 
@@ -35,7 +36,7 @@ func (s *Store) scanFilesOverBucket() error {
     return nil
 }
 
-func (s *Store) streamAndPublish(ctx context.Context) error {
+func (s *Store) streamAndPublish(ctx context.Context, writer kafkalib.BatchWriter) error {
     keys, err := s.retrievePrimaryKeys()
     if err != nil {
         return fmt.Errorf("failed to retrieve primary keys: %w", err)
     }
@@ -63,7 +64,7 @@ func (s *Store) streamAndPublish(ctx context.Context) error {
         messages = append(messages, dynamoMsg.RawMessage())
     }
 
-    if err = s.writer.WriteRawMessages(ctx, messages); err != nil {
+    if err = writer.WriteRawMessages(ctx, messages); err != nil {
         return fmt.Errorf("failed to publish messages: %w", err)
     }
 
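
One practical consequence of injecting the writer into Run (a likely motivation, though the commit message does not state it) is that the DynamoDB source can now be driven by any BatchWriter implementation, for example an in-memory double in tests rather than a Kafka-backed producer. A minimal sketch, reusing the same assumed interface shape as above; the real types live in github.com/artie-labs/reader/lib/kafkalib and are not shown in this diff.

package main

import (
    "context"
    "fmt"
)

// Assumed shapes matching the earlier sketch, not the repository's real types.
type RawMessage struct{ Key, Value string }

type BatchWriter interface {
    WriteRawMessages(ctx context.Context, msgs []RawMessage) error
}

// recordingWriter captures everything a source publishes; with the writer
// passed into Run, a test can supply this instead of a real Kafka writer.
type recordingWriter struct {
    got []RawMessage
}

func (r *recordingWriter) WriteRawMessages(_ context.Context, msgs []RawMessage) error {
    r.got = append(r.got, msgs...)
    return nil
}

func main() {
    w := &recordingWriter{}
    var bw BatchWriter = w // compile-time check that the double satisfies the interface
    _ = bw.WriteRawMessages(context.Background(), []RawMessage{{Key: "pk", Value: "{}"}})
    fmt.Println("captured", len(w.got), "message(s)")
}
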
