From 5b89d5e1a5e8dfb38dc8363108b4a0e6f0da1bd9 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Fri, 16 Aug 2024 18:03:09 -0700
Subject: [PATCH 1/4] Support disabling `SetFullDocumentBeforeChange` (#469)

---
 config/mongodb.go          | 4 ++++
 sources/mongo/streaming.go | 7 +++++--
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/config/mongodb.go b/config/mongodb.go
index 96a758db..91a87ea1 100644
--- a/config/mongodb.go
+++ b/config/mongodb.go
@@ -34,6 +34,10 @@ type MongoDB struct {
 	Collections       []Collection      `yaml:"collections"`
 	StreamingSettings StreamingSettings `yaml:"streamingSettings,omitempty"`
 	DisableTLS        bool              `yaml:"disableTLS,omitempty"`
+
+	// DisableFullDocumentBeforeChange - This is relevant if you're connecting to Document DB.
+	// BSON field '$changeStream.fullDocumentBeforeChange' is an unknown field.
+	DisableFullDocumentBeforeChange bool `yaml:"disableFullDocumentBeforeChange,omitempty"`
 }
 
 type Collection struct {
diff --git a/sources/mongo/streaming.go b/sources/mongo/streaming.go
index 1a8dc453..deb9834f 100644
--- a/sources/mongo/streaming.go
+++ b/sources/mongo/streaming.go
@@ -47,9 +47,12 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo
 	opts := options.ChangeStream().
 		// Setting `updateLookup` will emit the whole document for updates
 		// Ref: https://www.mongodb.com/docs/manual/reference/change-events/update/#description
-		SetFullDocument(options.UpdateLookup).
+		SetFullDocument(options.UpdateLookup)
+
+	if !cfg.DisableFullDocumentBeforeChange {
 		// FullDocumentBeforeChange will kick in if the db + collection enabled `changeStreamPreAndPostImages`
-		SetFullDocumentBeforeChange(options.WhenAvailable)
+		opts = opts.SetFullDocumentBeforeChange(options.WhenAvailable)
+	}
 
 	storage := persistedmap.NewPersistedMap(filePath)
 	if encodedResumeToken, exists := storage.Get(offsetKey); exists {

From 815a5165ee6ff971da5e7e8b606534a4a3b885ea Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Sun, 18 Aug 2024 22:36:34 -0700
Subject: [PATCH 2/4] Moving TTLMap and PersistedMap into `/lib/storage` (#472)

---
 lib/{ => storage}/persistedmap/persistedmap.go      | 0
 lib/{ => storage}/persistedmap/persistedmap_test.go | 0
 lib/{ => storage}/ttlmap/ttlmap.go                  | 0
 lib/{ => storage}/ttlmap/ttlmap_test.go             | 0
 sources/dynamodb/offsets/offsets.go                 | 2 +-
 sources/mongo/streaming.go                          | 2 +-
 6 files changed, 2 insertions(+), 2 deletions(-)
 rename lib/{ => storage}/persistedmap/persistedmap.go (100%)
 rename lib/{ => storage}/persistedmap/persistedmap_test.go (100%)
 rename lib/{ => storage}/ttlmap/ttlmap.go (100%)
 rename lib/{ => storage}/ttlmap/ttlmap_test.go (100%)

diff --git a/lib/persistedmap/persistedmap.go b/lib/storage/persistedmap/persistedmap.go
similarity index 100%
rename from lib/persistedmap/persistedmap.go
rename to lib/storage/persistedmap/persistedmap.go
diff --git a/lib/persistedmap/persistedmap_test.go b/lib/storage/persistedmap/persistedmap_test.go
similarity index 100%
rename from lib/persistedmap/persistedmap_test.go
rename to lib/storage/persistedmap/persistedmap_test.go
diff --git a/lib/ttlmap/ttlmap.go b/lib/storage/ttlmap/ttlmap.go
similarity index 100%
rename from lib/ttlmap/ttlmap.go
rename to lib/storage/ttlmap/ttlmap.go
diff --git a/lib/ttlmap/ttlmap_test.go b/lib/storage/ttlmap/ttlmap_test.go
similarity index 100%
rename from lib/ttlmap/ttlmap_test.go
rename to lib/storage/ttlmap/ttlmap_test.go
diff --git a/sources/dynamodb/offsets/offsets.go b/sources/dynamodb/offsets/offsets.go
index 721a38f9..7aef91c1 100644
--- a/sources/dynamodb/offsets/offsets.go
+++ b/sources/dynamodb/offsets/offsets.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/artie-labs/reader/lib/ttlmap"
+	"github.com/artie-labs/reader/lib/storage/ttlmap"
 )
 
 // ShardExpirationAndBuffer - Buffer for when a shard is closed as the records have a TTL of 24h. However, garbage collection is async.
diff --git a/sources/mongo/streaming.go b/sources/mongo/streaming.go
index deb9834f..32b8fe03 100644
--- a/sources/mongo/streaming.go
+++ b/sources/mongo/streaming.go
@@ -14,7 +14,7 @@ import (
 	"github.com/artie-labs/reader/config"
 	"github.com/artie-labs/reader/lib"
 	"github.com/artie-labs/reader/lib/iterator"
-	"github.com/artie-labs/reader/lib/persistedmap"
+	"github.com/artie-labs/reader/lib/storage/persistedmap"
 )
 
 const offsetKey = "offset"

From d71d8a5ae87e4585a775df32e0cb0336a8471af0 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Mon, 19 Aug 2024 08:01:54 -0700
Subject: [PATCH 3/4] Refactoring DDB snapshot (#470)

---
 config/dynamodb.go                    |  7 +++
 lib/s3lib/s3lib.go                    | 24 ++++----
 sources/dynamodb/snapshot.go          | 86 ++++++++-------------------
 sources/dynamodb/snapshot_iterator.go | 47 +++++++++++++++
 writers/writer.go                     | 13 +----
 5 files changed, 95 insertions(+), 82 deletions(-)
 create mode 100644 sources/dynamodb/snapshot_iterator.go

diff --git a/config/dynamodb.go b/config/dynamodb.go
index 64ce1668..98db5ddd 100644
--- a/config/dynamodb.go
+++ b/config/dynamodb.go
@@ -1,7 +1,9 @@
 package config
 
 import (
+	"cmp"
 	"fmt"
+
 	"github.com/artie-labs/reader/constants"
 	"github.com/artie-labs/transfer/lib/stringutil"
@@ -42,6 +44,11 @@ type SnapshotSettings struct {
 	// If the files are not specified, that's okay.
 	// We will scan the folder and then load into `specifiedFiles`
 	SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
+	BatchSize      int32          `yaml:"batchSize"`
+}
+
+func (s *SnapshotSettings) GetBatchSize() int32 {
+	return cmp.Or(s.BatchSize, constants.DefaultBatchSize)
 }
 
 func (s *SnapshotSettings) Validate() error {
diff --git a/lib/s3lib/s3lib.go b/lib/s3lib/s3lib.go
index 6797b0cc..d8f7f746 100644
--- a/lib/s3lib/s3lib.go
+++ b/lib/s3lib/s3lib.go
@@ -49,11 +49,7 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
 	}
 
 	var files []S3File
-	paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
-		Bucket: bucket,
-		Prefix: prefix,
-	})
-
+	paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix})
 	for paginator.HasMorePages() {
 		page, err := paginator.NextPage(ctx)
 		if err != nil {
@@ -69,13 +65,19 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
 	return files, nil
 }
 
-// StreamJsonGzipFile - will take a S3 File that is in `json.gz` format from DynamoDB's export to S3
-// It's not a typical JSON file in that it is compressed and it's new line delimited via separated via an array
-// Which means we can stream this file row by row to not OOM.
-func (s *S3Client) StreamJsonGzipFile(ctx context.Context, file S3File, ch chan<- map[string]types.AttributeValue) error {
-	const maxBufferSize = 1024 * 1024 // 1 MB or adjust as needed
-
+func (s *S3Client) StreamJsonGzipFiles(ctx context.Context, files []S3File, ch chan<- map[string]types.AttributeValue) error {
 	defer close(ch)
+	for _, file := range files {
+		if err := s.streamJsonGzipFile(ctx, file, ch); err != nil {
+			return fmt.Errorf("failed to read s3: %w", err)
+		}
+	}
+
+	return nil
+}
+
+func (s *S3Client) streamJsonGzipFile(ctx context.Context, file S3File, ch chan<- map[string]types.AttributeValue) error {
+	const maxBufferSize = 1024 * 1024
 	result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
 		Bucket: s.bucketName,
 		Key:    file.Key,
diff --git a/sources/dynamodb/snapshot.go b/sources/dynamodb/snapshot.go
index 47a57f85..e66fe935 100644
--- a/sources/dynamodb/snapshot.go
+++ b/sources/dynamodb/snapshot.go
@@ -4,24 +4,21 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"time"
 
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
 
 	"github.com/artie-labs/reader/config"
-	"github.com/artie-labs/reader/lib"
-	"github.com/artie-labs/reader/lib/dynamo"
-	"github.com/artie-labs/reader/lib/iterator"
 	"github.com/artie-labs/reader/lib/logger"
 	"github.com/artie-labs/reader/lib/s3lib"
 	"github.com/artie-labs/reader/writers"
 )
 
 type SnapshotStore struct {
-	tableName string
-	streamArn string
-	cfg       *config.DynamoDB
-
+	tableName      string
+	streamArn      string
+	cfg            *config.DynamoDB
 	s3Client       *s3lib.S3Client
 	dynamoDBClient *dynamodb.Client
 }
@@ -31,15 +28,33 @@ func (s *SnapshotStore) Close() error {
 }
 
 func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error {
+	start := time.Now()
 	if err := s.scanFilesOverBucket(ctx); err != nil {
 		return fmt.Errorf("scanning files over bucket failed: %w", err)
 	}
 
-	if err := s.streamAndPublish(ctx, writer); err != nil {
-		return fmt.Errorf("stream and publish failed: %w", err)
+	keys, err := s.retrievePrimaryKeys(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to retrieve primary keys: %w", err)
+	}
+
+	ch := make(chan map[string]types.AttributeValue)
+	go func() {
+		if err = s.s3Client.StreamJsonGzipFiles(ctx, s.cfg.SnapshotSettings.SpecifiedFiles, ch); err != nil {
+			logger.Panic("Failed to read file", slog.Any("err", err))
+		}
+	}()
+
+	count, err := writer.Write(ctx, NewSnapshotIterator(ch, keys, s.tableName, s.cfg.SnapshotSettings.GetBatchSize()))
+	if err != nil {
+		return fmt.Errorf("failed to snapshot: %w", err)
 	}
 
-	slog.Info("Finished snapshotting all the files")
+	slog.Info("Finished snapshotting",
+		slog.String("tableName", s.tableName),
+		slog.Int("scannedTotal", count),
+		slog.Duration("totalDuration", time.Since(start)),
+	)
 	return nil
 }
 
@@ -65,54 +80,3 @@ func (s *SnapshotStore) scanFilesOverBucket(ctx context.Context) error {
 	s.cfg.SnapshotSettings.SpecifiedFiles = files
 	return nil
 }
-
-func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer writers.Writer) error {
-	keys, err := s.retrievePrimaryKeys(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to retrieve primary keys: %w", err)
-	}
-
-	writer.SetRunOnComplete(false)
-	for _, file := range s.cfg.SnapshotSettings.SpecifiedFiles {
-		logFields := []any{
-			slog.String("fileName", *file.Key),
-		}
-
-		slog.Info("Processing file...", logFields...)
-		ch := make(chan map[string]types.AttributeValue)
-		go func() {
-			if err := s.s3Client.StreamJsonGzipFile(ctx, file, ch); err != nil {
-				logger.Panic("Failed to read file", slog.Any("err", err))
-			}
-		}()
-
-		var messages []lib.RawMessage
-		for msg := range ch {
-			dynamoMsg, err := dynamo.NewMessageFromExport(msg, keys, s.tableName)
-			if err != nil {
-				return fmt.Errorf("failed to cast message from DynamoDB, msg: %v, err: %w", msg, err)
-			}
-
-			messages = append(messages, dynamoMsg.RawMessage())
-			// If there are more than 500k messages, we don't need to wait until the whole file is read.
-			// We can write what we have and continue reading the file. This is done to prevent OOM errors.
-			if len(messages) > 500_000 {
-				if _, err = writer.Write(ctx, iterator.Once(messages)); err != nil {
-					return fmt.Errorf("failed to write messages: %w", err)
-				}
-
-				// Clear messages
-				messages = []lib.RawMessage{}
-			}
-		}
-
-		// TODO: Create an actual iterator over the files that is passed to the writer.
-		if _, err = writer.Write(ctx, iterator.Once(messages)); err != nil {
-			return fmt.Errorf("failed to write messages: %w", err)
-		}
-
-		slog.Info("Successfully processed file...", logFields...)
-	}
-
-	return writer.OnComplete()
-}
diff --git a/sources/dynamodb/snapshot_iterator.go b/sources/dynamodb/snapshot_iterator.go
new file mode 100644
index 00000000..50ae95de
--- /dev/null
+++ b/sources/dynamodb/snapshot_iterator.go
@@ -0,0 +1,47 @@
+package dynamodb
+
+import (
+	"fmt"
+	"github.com/artie-labs/reader/lib"
+	"github.com/artie-labs/reader/lib/dynamo"
+	"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
+)
+
+type SnapshotIterator struct {
+	ch        chan map[string]types.AttributeValue
+	keys      []string
+	tableName string
+	batchSize int32
+	done      bool
+}
+
+func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *SnapshotIterator {
+	return &SnapshotIterator{
+		ch:        ch,
+		keys:      keys,
+		tableName: tblName,
+		batchSize: batchSize,
+	}
+}
+
+func (s *SnapshotIterator) HasNext() bool {
+	return !s.done
+}
+
+func (s *SnapshotIterator) Next() ([]lib.RawMessage, error) {
+	var msgs []lib.RawMessage
+	for msg := range s.ch {
+		dynamoMsg, err := dynamo.NewMessageFromExport(msg, s.keys, s.tableName)
+		if err != nil {
+			return nil, fmt.Errorf("failed to cast message from DynamoDB, msg: %v, err: %w", msg, err)
+		}
+
+		msgs = append(msgs, dynamoMsg.RawMessage())
+		if int32(len(msgs)) >= s.batchSize {
+			return msgs, nil
+		}
+	}
+
+	s.done = true
+	return msgs, nil
+}
diff --git a/writers/writer.go b/writers/writer.go
index d6c9089e..f740ca99 100644
--- a/writers/writer.go
+++ b/writers/writer.go
@@ -18,15 +18,10 @@ type DestinationWriter interface {
 type Writer struct {
 	destinationWriter DestinationWriter
 	logProgress       bool
-	runOnComplete     bool
-}
-
-func (w *Writer) SetRunOnComplete(runOnComplete bool) {
-	w.runOnComplete = runOnComplete
 }
 
 func New(destinationWriter DestinationWriter, logProgress bool) Writer {
-	return Writer{destinationWriter: destinationWriter, logProgress: logProgress, runOnComplete: true}
+	return Writer{destinationWriter: destinationWriter, logProgress: logProgress}
 }
 
 // Write writes all the messages from an iterator to the destination.
@@ -60,10 +55,8 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess
 		}
 	}
 
-	if w.runOnComplete {
-		if err := w.destinationWriter.OnComplete(); err != nil {
-			return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
-		}
+	if err := w.destinationWriter.OnComplete(); err != nil {
+		return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
 	}
 
 	return count, nil

From ac67568f6fb4077bc12f1128fdb1aa4c3b001c16 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Mon, 19 Aug 2024 08:07:26 -0700
Subject: [PATCH 4/4] [Mongo] Moving files around (#471)

---
 {sources => lib}/mongo/change_event.go      |  0
 {sources => lib}/mongo/change_event_test.go |  0
 {sources => lib}/mongo/message.go           |  0
 {sources => lib}/mongo/message_test.go      |  0
 main.go                                     |  2 +-
 sources/mongo/mongo.go                      |  9 ++-------
 sources/mongo/snapshot.go                   | 11 +++++++----
 sources/mongo/streaming.go                  |  3 ++-
 writers/transfer/writer_test.go             |  2 +-
 9 files changed, 13 insertions(+), 14 deletions(-)
 rename {sources => lib}/mongo/change_event.go (100%)
 rename {sources => lib}/mongo/change_event_test.go (100%)
 rename {sources => lib}/mongo/message.go (100%)
 rename {sources => lib}/mongo/message_test.go (100%)

diff --git a/sources/mongo/change_event.go b/lib/mongo/change_event.go
similarity index 100%
rename from sources/mongo/change_event.go
rename to lib/mongo/change_event.go
diff --git a/sources/mongo/change_event_test.go b/lib/mongo/change_event_test.go
similarity index 100%
rename from sources/mongo/change_event_test.go
rename to lib/mongo/change_event_test.go
diff --git a/sources/mongo/message.go b/lib/mongo/message.go
similarity index 100%
rename from sources/mongo/message.go
rename to lib/mongo/message.go
diff --git a/sources/mongo/message_test.go b/lib/mongo/message_test.go
similarity index 100%
rename from sources/mongo/message_test.go
rename to lib/mongo/message_test.go
diff --git a/main.go b/main.go
index e078b28b..30e54eb7 100644
--- a/main.go
+++ b/main.go
@@ -42,7 +42,7 @@ func buildSource(ctx context.Context, cfg *config.Settings) (sources.Source, boo
 	case config.SourceDynamo:
 		source, isStreamingMode, err = dynamodb.Load(ctx, *cfg.DynamoDB)
 	case config.SourceMongoDB:
-		return mongo.Load(*cfg.MongoDB)
+		return mongo.Load(ctx, *cfg.MongoDB)
 	case config.SourceMSSQL:
 		source, err = mssql.Load(*cfg.MSSQL)
 	case config.SourceMySQL:
diff --git a/sources/mongo/mongo.go b/sources/mongo/mongo.go
index de63476d..21fbc070 100644
--- a/sources/mongo/mongo.go
+++ b/sources/mongo/mongo.go
@@ -19,8 +19,7 @@ type Source struct {
 	db  *mongo.Database
 }
 
-func Load(cfg config.MongoDB) (*Source, bool, error) {
-	ctx := context.Background()
+func Load(ctx context.Context, cfg config.MongoDB) (*Source, bool, error) {
 	client, err := mongo.Connect(ctx, mongoLib.OptsFromConfig(cfg))
 	if err != nil {
 		return nil, false, fmt.Errorf("failed to connect to MongoDB: %w", err)
@@ -30,11 +29,7 @@ func Load(cfg config.MongoDB) (*Source, bool, error) {
 		return nil, false, fmt.Errorf("failed to ping MongoDB: %w", err)
 	}
 
-	db := client.Database(cfg.Database)
-	return &Source{
-		cfg: cfg,
-		db:  db,
-	}, cfg.StreamingSettings.Enabled, nil
+	return &Source{cfg: cfg, db: client.Database(cfg.Database)}, cfg.StreamingSettings.Enabled, nil
 }
 
 func (s *Source) Close() error {
diff --git a/sources/mongo/snapshot.go b/sources/mongo/snapshot.go
index 9dc1d90e..7149c925 100644
--- a/sources/mongo/snapshot.go
+++ b/sources/mongo/snapshot.go
@@ -3,13 +3,16 @@ package mongo
 import (
 	"context"
 	"fmt"
-	"github.com/artie-labs/reader/config"
-
-	"github.com/artie-labs/reader/lib"
-	"github.com/artie-labs/reader/lib/iterator"
+
+	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
+
+	"github.com/artie-labs/reader/config"
+	"github.com/artie-labs/reader/lib"
+	"github.com/artie-labs/reader/lib/iterator"
+	mongoLib "github.com/artie-labs/reader/lib/mongo"
 )
 
 type snapshotIterator struct {
@@ -79,7 +82,7 @@ func (s *snapshotIterator) Next() ([]lib.RawMessage, error) {
 			return nil, fmt.Errorf("failed to decode document: %w", err)
 		}
 
-		mgoMsg, err := ParseMessage(result, nil, "r")
+		mgoMsg, err := mongoLib.ParseMessage(result, nil, "r")
 		if err != nil {
 			return nil, fmt.Errorf("failed to parse message: %w", err)
 		}
diff --git a/sources/mongo/streaming.go b/sources/mongo/streaming.go
index 32b8fe03..5f4fc74d 100644
--- a/sources/mongo/streaming.go
+++ b/sources/mongo/streaming.go
@@ -14,6 +14,7 @@ import (
 	"github.com/artie-labs/reader/config"
 	"github.com/artie-labs/reader/lib"
 	"github.com/artie-labs/reader/lib/iterator"
+	mongoLib "github.com/artie-labs/reader/lib/mongo"
 	"github.com/artie-labs/reader/lib/storage/persistedmap"
 )
 
 const offsetKey = "offset"
@@ -108,7 +109,7 @@ func (s *streaming) Next() ([]lib.RawMessage, error) {
 			return nil, fmt.Errorf("failed to decode change event: %w", err)
 		}
 
-		changeEvent, err := NewChangeEvent(rawChangeEvent)
+		changeEvent, err := mongoLib.NewChangeEvent(rawChangeEvent)
 		if err != nil {
 			return nil, fmt.Errorf("failed to parse change event: %w", err)
 		}
diff --git a/writers/transfer/writer_test.go b/writers/transfer/writer_test.go
index ea40fd83..0618a2c1 100644
--- a/writers/transfer/writer_test.go
+++ b/writers/transfer/writer_test.go
@@ -10,7 +10,7 @@ import (
 	"go.mongodb.org/mongo-driver/bson/primitive"
 
 	"github.com/artie-labs/reader/config"
-	"github.com/artie-labs/reader/sources/mongo"
+	"github.com/artie-labs/reader/lib/mongo"
 )
 
 func TestWriter_MessageToEvent(t *testing.T) {