Merge branch 'master' into nv/upgrade-go
nathan-artie authored Aug 19, 2024
2 parents 654d8fb + ac67568 commit fe1cfce
Showing 20 changed files with 119 additions and 100 deletions.
7 changes: 7 additions & 0 deletions config/dynamodb.go
@@ -1,7 +1,9 @@
package config

import (
"cmp"
"fmt"
"github.com/artie-labs/reader/constants"

"github.com/artie-labs/transfer/lib/stringutil"

@@ -42,6 +44,11 @@ type SnapshotSettings struct {
// If the files are not specified, that's okay.
// We will scan the folder and then load into `specifiedFiles`
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
BatchSize int32 `yaml:"batchSize"`
}

func (s *SnapshotSettings) GetBatchSize() int32 {
return cmp.Or(s.BatchSize, constants.DefaultBatchSize)
}

func (s *SnapshotSettings) Validate() error {
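The new GetBatchSize helper leans on cmp.Or, added in Go 1.22 (fitting, given the nv/upgrade-go branch): it returns the first non-zero value among its arguments, so a config field left at its zero value falls back to a default in one line. A minimal sketch of the pattern, with an illustrative default standing in for constants.DefaultBatchSize:

```go
package main

import (
	"cmp"
	"fmt"
)

// defaultBatchSize is illustrative; the repo's real default lives in constants.DefaultBatchSize.
const defaultBatchSize int32 = 1_000

// batchSizeOrDefault returns the configured value, falling back to the
// default when the config field was left at its zero value.
func batchSizeOrDefault(configured int32) int32 {
	return cmp.Or(configured, defaultBatchSize)
}

func main() {
	fmt.Println(batchSizeOrDefault(0))   // 1000: zero value falls back to the default
	fmt.Println(batchSizeOrDefault(250)) // 250
}
```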
4 changes: 4 additions & 0 deletions config/mongodb.go
@@ -34,6 +34,10 @@ type MongoDB struct {
Collections []Collection `yaml:"collections"`
StreamingSettings StreamingSettings `yaml:"streamingSettings,omitempty"`
DisableTLS bool `yaml:"disableTLS,omitempty"`

// DisableFullDocumentBeforeChange - Relevant if you're connecting to DocumentDB, which rejects
// the option with: BSON field '$changeStream.fullDocumentBeforeChange' is an unknown field.
DisableFullDocumentBeforeChange bool `yaml:"disableFullDocumentBeforeChange,omitempty"`
}

type Collection struct {
4 files renamed without changes.
24 changes: 13 additions & 11 deletions lib/s3lib/s3lib.go
@@ -49,11 +49,7 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
}

var files []S3File
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
Bucket: bucket,
Prefix: prefix,
})

paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
@@ -69,13 +65,19 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
return files, nil
}

// StreamJsonGzipFile - will take an S3 file that is in `json.gz` format from DynamoDB's export to S3.
// It's not a typical JSON file: it's compressed and newline-delimited rather than wrapped in an array,
// which means we can stream the file row by row without OOMing.
func (s *S3Client) StreamJsonGzipFile(ctx context.Context, file S3File, ch chan<- map[string]types.AttributeValue) error {
const maxBufferSize = 1024 * 1024 // 1 MB or adjust as needed

func (s *S3Client) StreamJsonGzipFiles(ctx context.Context, files []S3File, ch chan<- map[string]types.AttributeValue) error {
defer close(ch)
for _, file := range files {
if err := s.streamJsonGzipFile(ctx, file, ch); err != nil {
return fmt.Errorf("failed to read s3: %w", err)
}
}

return nil
}

func (s *S3Client) streamJsonGzipFile(ctx context.Context, file S3File, ch chan<- map[string]types.AttributeValue) error {
const maxBufferSize = 1024 * 1024
result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: s.bucketName,
Key: file.Key,
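The comment above is the key design point: because DynamoDB's S3 export is gzipped and newline-delimited, the reader can decompress and scan one row at a time instead of buffering whole files in memory. A self-contained sketch of that pattern, with a hypothetical handle callback standing in for the real per-row work (decoding into a map[string]types.AttributeValue and sending it down the channel):

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// streamGzipLines decompresses r and invokes handle once per newline-delimited row.
// handle is a hypothetical callback; the real code decodes each row and sends it
// on a channel instead of printing it.
func streamGzipLines(r io.Reader, handle func(line []byte) error) error {
	gz, err := gzip.NewReader(r)
	if err != nil {
		return fmt.Errorf("failed to create gzip reader: %w", err)
	}
	defer gz.Close()

	scanner := bufio.NewScanner(gz)
	// Cap each row at 1 MB, mirroring maxBufferSize in the diff above;
	// an oversized row makes the scanner error out rather than truncate.
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
	for scanner.Scan() {
		if err := handle(scanner.Bytes()); err != nil {
			return err
		}
	}
	return scanner.Err()
}

func main() {
	// Tiny demo: gzip two newline-delimited rows in memory, then stream them back.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, _ = zw.Write([]byte(`{"Item":{"id":{"S":"1"}}}` + "\n" + `{"Item":{"id":{"S":"2"}}}` + "\n"))
	_ = zw.Close()

	_ = streamGzipLines(&buf, func(line []byte) error {
		fmt.Println(string(line))
		return nil
	})
}
```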
4 files renamed without changes.
2 changes: 1 addition & 1 deletion main.go
@@ -42,7 +42,7 @@ func buildSource(ctx context.Context, cfg *config.Settings) (sources.Source, boo
case config.SourceDynamo:
source, isStreamingMode, err = dynamodb.Load(ctx, *cfg.DynamoDB)
case config.SourceMongoDB:
return mongo.Load(*cfg.MongoDB)
return mongo.Load(ctx, *cfg.MongoDB)
case config.SourceMSSQL:
source, err = mssql.Load(*cfg.MSSQL)
case config.SourceMySQL:
2 changes: 1 addition & 1 deletion sources/dynamodb/offsets/offsets.go
@@ -4,7 +4,7 @@ import (
"fmt"
"time"

"github.com/artie-labs/reader/lib/ttlmap"
"github.com/artie-labs/reader/lib/storage/ttlmap"
)

// ShardExpirationAndBuffer - Buffer for when a shard is closed as the records have a TTL of 24h. However, garbage collection is async.
86 changes: 25 additions & 61 deletions sources/dynamodb/snapshot.go
@@ -4,24 +4,21 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/iterator"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/writers"
)

type SnapshotStore struct {
tableName string
streamArn string
cfg *config.DynamoDB

tableName string
streamArn string
cfg *config.DynamoDB
s3Client *s3lib.S3Client
dynamoDBClient *dynamodb.Client
}
@@ -31,15 +28,33 @@ func (s *SnapshotStore) Close() error {
}

func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error {
start := time.Now()
if err := s.scanFilesOverBucket(ctx); err != nil {
return fmt.Errorf("scanning files over bucket failed: %w", err)
}

if err := s.streamAndPublish(ctx, writer); err != nil {
return fmt.Errorf("stream and publish failed: %w", err)
keys, err := s.retrievePrimaryKeys(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve primary keys: %w", err)
}

ch := make(chan map[string]types.AttributeValue)
go func() {
if err = s.s3Client.StreamJsonGzipFiles(ctx, s.cfg.SnapshotSettings.SpecifiedFiles, ch); err != nil {
logger.Panic("Failed to read file", slog.Any("err", err))
}
}()

count, err := writer.Write(ctx, NewSnapshotIterator(ch, keys, s.tableName, s.cfg.SnapshotSettings.GetBatchSize()))
if err != nil {
return fmt.Errorf("failed to snapshot: %w", err)
}

slog.Info("Finished snapshotting all the files")
slog.Info("Finished snapshotting",
slog.String("tableName", s.tableName),
slog.Int("scannedTotal", count),
slog.Duration("totalDuration", time.Since(start)),
)
return nil
}

@@ -65,54 +80,3 @@ func (s *SnapshotStore) scanFilesOverBucket(ctx context.Context) error {
s.cfg.SnapshotSettings.SpecifiedFiles = files
return nil
}

func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer writers.Writer) error {
keys, err := s.retrievePrimaryKeys(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve primary keys: %w", err)
}

writer.SetRunOnComplete(false)
for _, file := range s.cfg.SnapshotSettings.SpecifiedFiles {
logFields := []any{
slog.String("fileName", *file.Key),
}

slog.Info("Processing file...", logFields...)
ch := make(chan map[string]types.AttributeValue)
go func() {
if err := s.s3Client.StreamJsonGzipFile(ctx, file, ch); err != nil {
logger.Panic("Failed to read file", slog.Any("err", err))
}
}()

var messages []lib.RawMessage
for msg := range ch {
dynamoMsg, err := dynamo.NewMessageFromExport(msg, keys, s.tableName)
if err != nil {
return fmt.Errorf("failed to cast message from DynamoDB, msg: %v, err: %w", msg, err)
}

messages = append(messages, dynamoMsg.RawMessage())
// If there are more than 500k messages, we don't need to wait until the whole file is read.
// We can write what we have and continue reading the file. This is done to prevent OOM errors.
if len(messages) > 500_000 {
if _, err = writer.Write(ctx, iterator.Once(messages)); err != nil {
return fmt.Errorf("failed to write messages: %w", err)
}

// Clear messages
messages = []lib.RawMessage{}
}
}

// TODO: Create an actual iterator over the files that is passed to the writer.
if _, err = writer.Write(ctx, iterator.Once(messages)); err != nil {
return fmt.Errorf("failed to write messages: %w", err)
}

slog.Info("Successfully processed file...", logFields...)
}

return writer.OnComplete()
}
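This refactor drops streamAndPublish, which buffered up to 500k rows per file and flushed manually with SetRunOnComplete(false), in favor of a single channel feeding a batched iterator (shown next). The handoff hinges on the producer closing the channel, exactly what StreamJsonGzipFiles does via its `defer close(ch)`. A toy sketch of the pattern:

```go
package main

import "fmt"

// produce closes the channel once all rows are sent; in the diff,
// StreamJsonGzipFiles plays this role via its `defer close(ch)`.
func produce(rows []string, ch chan<- string) {
	defer close(ch)
	for _, r := range rows {
		ch <- r
	}
}

func main() {
	ch := make(chan string)
	go produce([]string{"row-1", "row-2", "row-3"}, ch)

	// The range loop ends naturally when the producer closes the channel,
	// which is how SnapshotIterator.Next knows the snapshot is done.
	for row := range ch {
		fmt.Println(row)
	}
}
```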
47 changes: 47 additions & 0 deletions sources/dynamodb/snapshot_iterator.go
@@ -0,0 +1,47 @@
package dynamodb

import (
"fmt"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
)

type SnapshotIterator struct {
ch chan map[string]types.AttributeValue
keys []string
tableName string
batchSize int32
done bool
}

func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *SnapshotIterator {
return &SnapshotIterator{
ch: ch,
keys: keys,
tableName: tblName,
batchSize: batchSize,
}
}

func (s *SnapshotIterator) HasNext() bool {
return !s.done
}

func (s *SnapshotIterator) Next() ([]lib.RawMessage, error) {
var msgs []lib.RawMessage
for msg := range s.ch {
dynamoMsg, err := dynamo.NewMessageFromExport(msg, s.keys, s.tableName)
if err != nil {
return nil, fmt.Errorf("failed to cast message from DynamoDB, msg: %v, err: %w", msg, err)
}

msgs = append(msgs, dynamoMsg.RawMessage())
if int32(len(msgs)) >= s.batchSize {
return msgs, nil
}
}

s.done = true
return msgs, nil
}
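For context, a minimal sketch of how a consumer might drive an iterator with this HasNext/Next shape; in this repo, writer.Write plays that role, flushing each batch to the destination as it arrives:

```go
package sketch

// batchIterator mirrors the HasNext/Next shape of SnapshotIterator above;
// T stands in for []lib.RawMessage.
type batchIterator[T any] interface {
	HasNext() bool
	Next() (T, error)
}

// drain is a hypothetical consumer loop; the real writer.Write fills this role.
func drain[T any](iter batchIterator[T], flush func(T) error) error {
	for iter.HasNext() {
		batch, err := iter.Next()
		if err != nil {
			return err
		}
		if err := flush(batch); err != nil {
			return err
		}
	}
	return nil
}
```

One subtlety: HasNext stays true until Next has observed the closed channel, so the final call can return a short or even empty batch, and consumers should tolerate that.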
9 changes: 2 additions & 7 deletions sources/mongo/mongo.go
@@ -19,8 +19,7 @@ type Source struct {
db *mongo.Database
}

func Load(cfg config.MongoDB) (*Source, bool, error) {
ctx := context.Background()
func Load(ctx context.Context, cfg config.MongoDB) (*Source, bool, error) {
client, err := mongo.Connect(ctx, mongoLib.OptsFromConfig(cfg))
if err != nil {
return nil, false, fmt.Errorf("failed to connect to MongoDB: %w", err)
@@ -30,11 +29,7 @@ func Load(cfg config.MongoDB) (*Source, bool, error) {
return nil, false, fmt.Errorf("failed to ping MongoDB: %w", err)
}

db := client.Database(cfg.Database)
return &Source{
cfg: cfg,
db: db,
}, cfg.StreamingSettings.Enabled, nil
return &Source{cfg: cfg, db: client.Database(cfg.Database)}, cfg.StreamingSettings.Enabled, nil
}

func (s *Source) Close() error {
11 changes: 7 additions & 4 deletions sources/mongo/snapshot.go
@@ -3,13 +3,16 @@ package mongo
import (
"context"
"fmt"
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
mongoLib "github.com/artie-labs/reader/lib/mongo"
)

type snapshotIterator struct {
@@ -79,7 +82,7 @@ func (s *snapshotIterator) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to decode document: %w", err)
}

mgoMsg, err := ParseMessage(result, nil, "r")
mgoMsg, err := mongoLib.ParseMessage(result, nil, "r")
if err != nil {
return nil, fmt.Errorf("failed to parse message: %w", err)
}
12 changes: 8 additions & 4 deletions sources/mongo/streaming.go
@@ -14,7 +14,8 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
"github.com/artie-labs/reader/lib/persistedmap"
mongoLib "github.com/artie-labs/reader/lib/mongo"
"github.com/artie-labs/reader/lib/storage/persistedmap"
)

const offsetKey = "offset"
@@ -47,9 +48,12 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo
opts := options.ChangeStream().
// Setting `updateLookup` will emit the whole document for updates
// Ref: https://www.mongodb.com/docs/manual/reference/change-events/update/#description
SetFullDocument(options.UpdateLookup).
SetFullDocument(options.UpdateLookup)

if !cfg.DisableFullDocumentBeforeChange {
// FullDocumentBeforeChange will kick in if the db + collection enabled `changeStreamPreAndPostImages`
SetFullDocumentBeforeChange(options.WhenAvailable)
opts = opts.SetFullDocumentBeforeChange(options.WhenAvailable)
}

storage := persistedmap.NewPersistedMap(filePath)
if encodedResumeToken, exists := storage.Get(offsetKey); exists {
@@ -105,7 +109,7 @@ func (s *streaming) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to decode change event: %w", err)
}

changeEvent, err := NewChangeEvent(rawChangeEvent)
changeEvent, err := mongoLib.NewChangeEvent(rawChangeEvent)
if err != nil {
return nil, fmt.Errorf("failed to parse change event: %w", err)
}
2 changes: 1 addition & 1 deletion writers/transfer/writer_test.go
@@ -10,7 +10,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/sources/mongo"
"github.com/artie-labs/reader/lib/mongo"
)

func TestWriter_MessageToEvent(t *testing.T) {
13 changes: 3 additions & 10 deletions writers/writer.go
@@ -18,15 +18,10 @@ type DestinationWriter interface {
type Writer struct {
destinationWriter DestinationWriter
logProgress bool
runOnComplete bool
}

func (w *Writer) SetRunOnComplete(runOnComplete bool) {
w.runOnComplete = runOnComplete
}

func New(destinationWriter DestinationWriter, logProgress bool) Writer {
return Writer{destinationWriter: destinationWriter, logProgress: logProgress, runOnComplete: true}
return Writer{destinationWriter: destinationWriter, logProgress: logProgress}
}

// Write writes all the messages from an iterator to the destination.
@@ -60,10 +55,8 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess
}
}

if w.runOnComplete {
if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}
if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}

return count, nil
