Skip to content

Commit

Permalink
Pull context out of dynamo.KafkaMessage (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Jan 24, 2024
1 parent e1caa9a commit d8bb83d
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 25 deletions.
25 changes: 9 additions & 16 deletions lib/dynamo/message.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package dynamo

import (
"context"
"encoding/json"
"fmt"
"github.com/artie-labs/reader/config"
"strconv"
"time"

"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/segmentio/kafka-go"
"strconv"
"time"

"github.com/artie-labs/reader/config"
)

type Message struct {
Expand Down Expand Up @@ -99,16 +100,11 @@ func (m *Message) artieMessage() (util.SchemaEventPayload, error) {
}, nil
}

func (m *Message) TopicName(ctx context.Context) (string, error) {
cfg := config.FromContext(ctx)
if cfg.Kafka == nil {
return "", fmt.Errorf("kafka config is nil")
}

return fmt.Sprintf("%s.%s", cfg.Kafka.TopicPrefix, m.tableName), nil
// TopicName returns the destination Kafka topic for this message,
// formed by joining the configured topic prefix and the source
// DynamoDB table name with a dot separator.
func (m *Message) TopicName(kafkaCfg config.Kafka) string {
	return kafkaCfg.TopicPrefix + "." + m.tableName
}

func (m *Message) KafkaMessage(ctx context.Context) (kafka.Message, error) {
func (m *Message) KafkaMessage(kafkaCfg config.Kafka) (kafka.Message, error) {
msg, err := m.artieMessage()
if err != nil {
return kafka.Message{}, fmt.Errorf("failed to generate artie message, err: %v", err)
Expand All @@ -124,10 +120,7 @@ func (m *Message) KafkaMessage(ctx context.Context) (kafka.Message, error) {
return kafka.Message{}, err
}

topic, err := m.TopicName(ctx)
if err != nil {
return kafka.Message{}, err
}
topic := m.TopicName(kafkaCfg)

return kafka.Message{
Topic: topic,
Expand Down
15 changes: 8 additions & 7 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"log/slog"
"time"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
)

type Store struct {
Expand Down Expand Up @@ -84,7 +85,7 @@ func (s *Store) Run(ctx context.Context) {
go s.ListenToChannel(ctx)

// Scan it for the first time manually, so we don't have to wait 5 mins
s.scanForNewShards(ctx)
s.scanForNewShards()
for {
select {
case <-ctx.Done():
Expand All @@ -93,13 +94,13 @@ func (s *Store) Run(ctx context.Context) {
return
case <-ticker.C:
slog.Info("Scanning for new shards...")
s.scanForNewShards(ctx)
s.scanForNewShards()
}
}
}
}

func (s *Store) scanForNewShards(ctx context.Context) {
func (s *Store) scanForNewShards() {
var exclusiveStartShardId *string
for {
input := &dynamodbstreams.DescribeStreamInput{
Expand Down
8 changes: 7 additions & 1 deletion sources/dynamodb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"time"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
Expand Down Expand Up @@ -64,6 +65,11 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
return
}

kafkaCfg := config.FromContext(ctx).Kafka
if kafkaCfg == nil {
logger.Fatal("Kafka config is nil")
}

shardIterator := iteratorOutput.ShardIterator
// Get records from shard iterator
for shardIterator != nil {
Expand Down Expand Up @@ -94,7 +100,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
)
}

message, err := msg.KafkaMessage(ctx)
message, err := msg.KafkaMessage(*kafkaCfg)
if err != nil {
logger.Fatal("Failed to cast message from DynamoDB",
slog.Any("err", err),
Expand Down
8 changes: 7 additions & 1 deletion sources/dynamodb/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
Expand Down Expand Up @@ -41,6 +42,11 @@ func (s *Store) streamAndPublish(ctx context.Context) error {
return fmt.Errorf("failed to retrieve primary keys, err: %v", err)
}

kafkaCfg := config.FromContext(ctx).Kafka
if kafkaCfg == nil {
return fmt.Errorf("kafka config is nil")
}

for _, file := range s.cfg.SnapshotSettings.SpecifiedFiles {
logFields := []any{
slog.String("fileName", *file.Key),
Expand All @@ -61,7 +67,7 @@ func (s *Store) streamAndPublish(ctx context.Context) error {
logger.Fatal("Failed to cast message from DynamoDB", slog.Any("err", err), slog.Any("msg", msg))
}

kafkaMsg, err := dynamoMsg.KafkaMessage(ctx)
kafkaMsg, err := dynamoMsg.KafkaMessage(*kafkaCfg)
if err != nil {
logger.Fatal("Failed to cast message from DynamoDB", slog.Any("err", err))
}
Expand Down

0 comments on commit d8bb83d

Please sign in to comment.