From 9897b721733a61cc437cd6149c402766dc41c8e7 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 15:04:49 -0700 Subject: [PATCH] Imports. --- sources/dynamodb/dynamodb.go | 8 +++++--- sources/dynamodb/snapshot/snapshot.go | 4 ++-- sources/dynamodb/snapshot/snapshot_iterator.go | 10 +++++----- sources/dynamodb/stream/stream.go | 7 ++++--- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index b41066d8..5c643c38 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -3,13 +3,15 @@ package dynamodb import ( "context" "fmt" + + "github.com/aws/aws-sdk-go-v2/aws/arn" + awsCfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/sources" "github.com/artie-labs/reader/sources/dynamodb/snapshot" "github.com/artie-labs/reader/sources/dynamodb/stream" - "github.com/aws/aws-sdk-go-v2/aws/arn" - awsCfg "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" ) func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error) { diff --git a/sources/dynamodb/snapshot/snapshot.go b/sources/dynamodb/snapshot/snapshot.go index ade9365e..72b3ae60 100644 --- a/sources/dynamodb/snapshot/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -3,15 +3,15 @@ package snapshot import ( "context" "fmt" - "github.com/artie-labs/reader/lib/dynamo" - "github.com/aws/aws-sdk-go-v2/aws" "log/slog" "time" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/lib/dynamo" "github.com/artie-labs/reader/lib/logger" "github.com/artie-labs/reader/lib/s3lib" "github.com/artie-labs/reader/writers" diff --git a/sources/dynamodb/snapshot/snapshot_iterator.go b/sources/dynamodb/snapshot/snapshot_iterator.go index a29f4723..0b0e061a 100644 --- a/sources/dynamodb/snapshot/snapshot_iterator.go +++ b/sources/dynamodb/snapshot/snapshot_iterator.go @@ -7,7 +7,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" ) -type SnapshotIterator struct { +type Iterator struct { ch chan map[string]types.AttributeValue keys []string tableName string @@ -15,8 +15,8 @@ type SnapshotIterator struct { done bool } -func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *SnapshotIterator { - return &SnapshotIterator{ +func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *Iterator { + return &Iterator{ ch: ch, keys: keys, tableName: tblName, @@ -24,11 +24,11 @@ func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, } } -func (s *SnapshotIterator) HasNext() bool { +func (s *Iterator) HasNext() bool { return !s.done } -func (s *SnapshotIterator) Next() ([]lib.RawMessage, error) { +func (s *Iterator) Next() ([]lib.RawMessage, error) { var msgs []lib.RawMessage for msg := range s.ch { dynamoMsg, err := dynamo.NewMessageFromExport(msg, s.keys, s.tableName) diff --git a/sources/dynamodb/stream/stream.go b/sources/dynamodb/stream/stream.go index 3fdd5f83..0aee6d99 100644 --- a/sources/dynamodb/stream/stream.go +++ b/sources/dynamodb/stream/stream.go @@ -6,12 +6,13 @@ import ( "log/slog" "time" - "github.com/artie-labs/reader/config" - "github.com/artie-labs/reader/sources/dynamodb/offsets" - "github.com/artie-labs/reader/writers" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" + + "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/sources/dynamodb/offsets" + "github.com/artie-labs/reader/writers" ) const (