diff --git a/config/config.go b/config/config.go
index c176dafa..e54281cc 100644
--- a/config/config.go
+++ b/config/config.go
@@ -3,7 +3,6 @@ package config
 import (
 	"context"
 	"fmt"
-	"github.com/artie-labs/transfer/lib/stringutil"
 	"gopkg.in/yaml.v2"
 	"log"
 	"os"
@@ -40,27 +39,6 @@ func (k *Kafka) Validate() error {
 	return nil
 }
 
-type DynamoDB struct {
-	OffsetFile         string `yaml:"offsetFile"`
-	AwsRegion          string `yaml:"awsRegion"`
-	AwsAccessKeyID     string `yaml:"awsAccessKeyId"`
-	AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
-	StreamArn          string `yaml:"streamArn"`
-	TableName          string `yaml:"tableName"`
-}
-
-func (d *DynamoDB) Validate() error {
-	if d == nil {
-		return fmt.Errorf("dynamodb config is nil")
-	}
-
-	if stringutil.Empty(d.OffsetFile, d.AwsRegion, d.AwsAccessKeyID, d.AwsSecretAccessKey, d.StreamArn, d.TableName) {
-		return fmt.Errorf("one of the dynamoDB configs is empty: offsetFile, awsRegion, awsAccessKeyID, awsSecretAccessKey, streamArn or tableName")
-	}
-
-	return nil
-}
-
 type Reporting struct {
 	Sentry *Sentry `yaml:"sentry"`
 }
diff --git a/config/dynamodb.go b/config/dynamodb.go
new file mode 100644
index 00000000..0007a7e1
--- /dev/null
+++ b/config/dynamodb.go
@@ -0,0 +1,56 @@
+package config
+
+import (
+	"fmt"
+	"github.com/artie-labs/reader/lib/s3lib"
+	"github.com/artie-labs/transfer/lib/stringutil"
+)
+
+type DynamoDB struct {
+	OffsetFile         string `yaml:"offsetFile"`
+	AwsRegion          string `yaml:"awsRegion"`
+	AwsAccessKeyID     string `yaml:"awsAccessKeyId"`
+	AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
+	StreamArn          string `yaml:"streamArn"`
+	TableName          string `yaml:"tableName"`
+
+	Snapshot         bool              `yaml:"snapshot"`
+	SnapshotSettings *SnapshotSettings `yaml:"snapshotSettings"`
+}
+
+func (d *DynamoDB) Validate() error {
+	if d == nil {
+		return fmt.Errorf("dynamodb config is nil")
+	}
+
+	if stringutil.Empty(d.OffsetFile, d.AwsRegion, d.AwsAccessKeyID, d.AwsSecretAccessKey, d.StreamArn, d.TableName) {
+		return fmt.Errorf("one of the dynamoDB configs is empty: offsetFile, awsRegion, awsAccessKeyID, awsSecretAccessKey, streamArn or tableName")
+	}
+
+	if d.Snapshot {
+		if err := d.SnapshotSettings.Validate(); err != nil {
+			return fmt.Errorf("snapshot validation failed - err: %v", err)
+		}
+	}
+
+	return nil
+}
+
+type SnapshotSettings struct {
+	Folder string `yaml:"folder"`
+	// If no files are specified, that's okay.
+	// We will scan the folder and load the results into `specifiedFiles`.
+	SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
+}
+
+func (s *SnapshotSettings) Validate() error {
+	if s == nil {
+		return fmt.Errorf("settings is nil")
+	}
+
+	if s.Folder == "" {
+		return fmt.Errorf("folder is empty")
+	}
+
+	return nil
+}
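For reference, the struct tags above imply a YAML shape like the following — a minimal sketch, assuming the config is nested under a top-level `dynamodb` key (all values are placeholders):

```yaml
dynamodb:
  offsetFile: /tmp/dynamodb.offset
  awsRegion: us-east-1
  awsAccessKeyId: AKIA-placeholder
  awsSecretAccessKey: placeholder
  streamArn: arn:aws:dynamodb:us-east-1:123456789012:table/test/stream/2023-01-01T00:00:00.000
  tableName: test
  snapshot: true
  snapshotSettings:
    folder: s3://artie-transfer-test/AWSDynamoDB/01234567890123-abcdefgh
```

Note that `Validate` still requires the stream-related fields (`offsetFile`, `streamArn`, etc.) to be non-empty even when `snapshot` is enabled, since the `stringutil.Empty` check runs unconditionally.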
diff --git a/examples/dynamodb/service_account.tf b/examples/dynamodb/service_account.tf
index 453cc801..b3709fc9 100644
--- a/examples/dynamodb/service_account.tf
+++ b/examples/dynamodb/service_account.tf
@@ -16,10 +16,10 @@ resource "aws_iam_role" "dynamodb_streams_role" {
   name = "DynamoDBStreamsRole"
 
   assume_role_policy = jsonencode({
-    Version = "2012-10-17",
+    Version   = "2012-10-17",
     Statement = [
       {
-        Action = "sts:AssumeRole",
+        Action    = "sts:AssumeRole",
         Principal = {
           Service = "ec2.amazonaws.com"
         },
@@ -35,19 +35,37 @@ resource "aws_iam_policy" "dynamodb_streams_access" {
   description = "My policy that grants access to DynamoDB streams"
 
   policy = jsonencode({
-    Version = "2012-10-17",
+    Version   = "2012-10-17",
     Statement = [
       {
-        Effect = "Allow",
-        Action = [
+        Effect = "Allow",
+        Action = [
           "dynamodb:GetShardIterator",
           "dynamodb:DescribeStream",
           "dynamodb:GetRecords",
-          "dynamodb:ListStreams"
+          "dynamodb:ListStreams",
+
+          // Only required for export (snapshot)
+          "dynamodb:DescribeTable"
         ],
         // Don't want to use "*"? You can specify like this:
         // Resource = [ TABLE_ARN, TABLE_ARN + "/stream/*" ]
         Resource = "*" # Modify this to restrict access to specific streams or resources
+      },
+      // Export (snapshot) requires access to S3
+      {
+        Effect = "Allow",
+        Action = [
+          "s3:ListBucket"
+        ],
+        Resource = "arn:aws:s3:::artie-transfer-test"
+      },
+      {
+        Effect = "Allow",
+        Action = [
+          "s3:GetObject"
+        ],
+        Resource = "arn:aws:s3:::artie-transfer-test/AWSDynamoDB/*"
+      }
     ]
   })
@@ -79,6 +97,7 @@ resource "aws_iam_access_key" "dynamodb_streams_user_key" {
   user = aws_iam_user.dynamodb_streams_user.name
 }
 
+# Output AWS credentials
 output "aws_access_key_id" {
   value = aws_iam_access_key.dynamodb_streams_user_key.id
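The S3 statements above are scoped to the `AWSDynamoDB/` prefix, which is where DynamoDB's export-to-S3 feature writes its output. Not part of this PR, but for context, an export like the one this reader consumes can be produced with the AWS CLI (point-in-time recovery must be enabled on the table; the ARN and bucket are placeholders):

```sh
aws dynamodb export-table-to-point-in-time \
  --table-arn arn:aws:dynamodb:us-east-1:123456789012:table/test \
  --s3-bucket artie-transfer-test \
  --export-format DYNAMODB_JSON
```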
diff --git a/lib/dynamo/message.go b/lib/dynamo/message.go
index 7118dd9b..7866a243 100644
--- a/lib/dynamo/message.go
+++ b/lib/dynamo/message.go
@@ -7,7 +7,6 @@ import (
 	"github.com/artie-labs/reader/config"
 	"github.com/artie-labs/transfer/lib/cdc/util"
 	"github.com/aws/aws-sdk-go/service/dynamodb"
-	"github.com/aws/aws-sdk-go/service/dynamodbstreams"
 	"github.com/segmentio/kafka-go"
 	"strconv"
 	"time"
@@ -87,41 +86,6 @@ func transformNewImage(data map[string]*dynamodb.AttributeValue) map[string]interface{} {
 	return transformed
 }
 
-func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, error) {
-	if record == nil || record.Dynamodb == nil {
-		return nil, fmt.Errorf("record is nil or dynamodb does not exist in this event payload")
-	}
-
-	if len(record.Dynamodb.Keys) == 0 {
-		return nil, fmt.Errorf("keys is nil")
-	}
-
-	executionTime := time.Now()
-	if record.Dynamodb.ApproximateCreationDateTime != nil {
-		executionTime = *record.Dynamodb.ApproximateCreationDateTime
-	}
-
-	op := "r"
-	if record.EventName != nil {
-		switch *record.EventName {
-		case "INSERT":
-			op = "c"
-		case "MODIFY":
-			op = "u"
-		case "REMOVE":
-			op = "d"
-		}
-	}
-
-	return &Message{
-		op:            op,
-		tableName:     tableName,
-		executionTime: executionTime,
-		rowData:       transformNewImage(record.Dynamodb.NewImage),
-		primaryKey:    transformNewImage(record.Dynamodb.Keys),
-	}, nil
-}
-
 func (m *Message) artieMessage() (util.SchemaEventPayload, error) {
 	return util.SchemaEventPayload{
 		Payload: util.Payload{
diff --git a/lib/dynamo/parse_message.go b/lib/dynamo/parse_message.go
new file mode 100644
index 00000000..a7750220
--- /dev/null
+++ b/lib/dynamo/parse_message.go
@@ -0,0 +1,76 @@
+package dynamo
+
+import (
+	"fmt"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+	"github.com/aws/aws-sdk-go/service/dynamodbstreams"
+	"time"
+)
+
+func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName string) (*Message, error) {
+	if len(item.Item) == 0 {
+		return nil, fmt.Errorf("item is nil or keys do not exist in this item payload")
+	}
+
+	if len(keys) == 0 {
+		return nil, fmt.Errorf("keys is nil")
+	}
+
+	// Snapshot time does not exist on the row.
+	// Perhaps we can infer it from the manifest file in the future.
+	executionTime := time.Now()
+
+	rowData := transformNewImage(item.Item)
+	primaryKeys := make(map[string]interface{})
+	for _, key := range keys {
+		val, isOk := rowData[key]
+		if !isOk {
+			return nil, fmt.Errorf("key: %s does not exist in the item payload", key)
+		}
+
+		primaryKeys[key] = val
+	}
+
+	return &Message{
+		op:            "r",
+		tableName:     tableName,
+		executionTime: executionTime,
+		rowData:       rowData,
+		primaryKey:    primaryKeys,
+	}, nil
+}
+
+func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, error) {
+	if record == nil || record.Dynamodb == nil {
+		return nil, fmt.Errorf("record is nil or dynamodb does not exist in this event payload")
+	}
+
+	if len(record.Dynamodb.Keys) == 0 {
+		return nil, fmt.Errorf("keys is nil")
+	}
+
+	executionTime := time.Now()
+	if record.Dynamodb.ApproximateCreationDateTime != nil {
+		executionTime = *record.Dynamodb.ApproximateCreationDateTime
+	}
+
+	op := "r"
+	if record.EventName != nil {
+		switch *record.EventName {
+		case "INSERT":
+			op = "c"
+		case "MODIFY":
+			op = "u"
+		case "REMOVE":
+			op = "d"
+		}
+	}
+
+	return &Message{
+		op:            op,
+		tableName:     tableName,
+		executionTime: executionTime,
+		rowData:       transformNewImage(record.Dynamodb.NewImage),
+		primaryKey:    transformNewImage(record.Dynamodb.Keys),
+	}, nil
+}
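The test file that follows only exercises `NewMessageFromExport`, so here is a minimal sketch (all values made up) of how a streams record maps onto a `Message` via `NewMessage`; `EventName` drives the op code, and snapshot reads default to `r`:

```go
package main

import (
	"fmt"
	"time"

	"github.com/artie-labs/reader/lib/dynamo"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodbstreams"
)

func main() {
	record := &dynamodbstreams.Record{
		EventName: aws.String("INSERT"), // INSERT -> "c", MODIFY -> "u", REMOVE -> "d"
		Dynamodb: &dynamodbstreams.StreamRecord{
			ApproximateCreationDateTime: aws.Time(time.Now()),
			Keys: map[string]*dynamodb.AttributeValue{
				"id": {S: aws.String("1")},
			},
			NewImage: map[string]*dynamodb.AttributeValue{
				"id":   {S: aws.String("1")},
				"name": {S: aws.String("foo")},
			},
		},
	}

	msg, err := dynamo.NewMessage(record, "test")
	if err != nil {
		panic(err)
	}

	// op "c", rowData with id + name, primaryKey with id
	fmt.Printf("%+v\n", msg)
}
```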
diff --git a/lib/dynamo/parse_message_test.go b/lib/dynamo/parse_message_test.go
new file mode 100644
index 00000000..5707f3a7
--- /dev/null
+++ b/lib/dynamo/parse_message_test.go
@@ -0,0 +1,65 @@
+package dynamo
+
+import (
+	"github.com/artie-labs/transfer/lib/ptr"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+	"github.com/stretchr/testify/assert"
+)
+
+func (d *DynamoDBTestSuite) Test_NewMessageFromExport() {
+	type _tc struct {
+		name          string
+		item          dynamodb.ItemResponse
+		keys          []string
+		tableName     string
+		expectedError string
+	}
+
+	tcs := []_tc{
+		{
+			name: "Test with empty item",
+			item: dynamodb.ItemResponse{
+				Item: map[string]*dynamodb.AttributeValue{},
+			},
+			keys:          []string{"id"},
+			tableName:     "test",
+			expectedError: "item is nil or keys do not exist in this item payload",
+		},
+		{
+			name: "Test with empty keys",
+			item: dynamodb.ItemResponse{
+				Item: map[string]*dynamodb.AttributeValue{
+					"id": {
+						S: ptr.ToString("1"),
+					},
+				},
+			},
+			keys:          []string{},
+			tableName:     "test",
+			expectedError: "keys is nil",
+		},
+		{
+			name: "Test with valid item and keys",
+			item: dynamodb.ItemResponse{
+				Item: map[string]*dynamodb.AttributeValue{
+					"id": {
+						S: ptr.ToString("1"),
+					},
+				},
+			},
+			keys:      []string{"id"},
+			tableName: "test",
+		},
+	}
+
+	for _, tc := range tcs {
+		msg, err := NewMessageFromExport(tc.item, tc.keys, tc.tableName)
+		if tc.expectedError != "" {
+			assert.Equal(d.T(), tc.expectedError, err.Error(), tc.name)
+		} else {
+			assert.NoError(d.T(), err, tc.name)
+			assert.Equal(d.T(), tc.tableName, msg.tableName, tc.name)
+			assert.Equal(d.T(), "r", msg.op, tc.name)
+		}
+	}
+}
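For context on what the next file parses: each data file in a DynamoDB export is gzipped and newline-delimited, with every line being a standalone wrapped item that unmarshals into `dynamodb.ItemResponse`. An illustrative line (made-up attributes):

```json
{"Item":{"id":{"S":"1"},"name":{"S":"foo"},"count":{"N":"2"}}}
```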
diff --git a/lib/s3lib/s3lib.go b/lib/s3lib/s3lib.go
new file mode 100644
index 00000000..62c57883
--- /dev/null
+++ b/lib/s3lib/s3lib.go
@@ -0,0 +1,110 @@
+package s3lib
+
+import (
+	"bufio"
+	"compress/gzip"
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+	"github.com/aws/aws-sdk-go/service/s3"
+)
+
+type S3Client struct {
+	client *s3.S3
+}
+
+func NewClient(session *session.Session) *S3Client {
+	return &S3Client{client: s3.New(session)}
+}
+
+func bucketAndPrefixFromFilePath(fp string) (*string, *string, error) {
+	// Remove the s3:// prefix if it's there
+	fp = strings.TrimPrefix(fp, "s3://")
+
+	parts := strings.SplitN(fp, "/", 2)
+	if len(parts) < 2 {
+		return nil, nil, fmt.Errorf("invalid S3 path, missing prefix")
+	}
+
+	bucket := parts[0]
+	prefix := parts[1]
+	return &bucket, &prefix, nil
+}
+
+type S3File struct {
+	Bucket *string `yaml:"bucket"`
+	Key    *string `yaml:"key"`
+}
+
+func (s *S3Client) ListFiles(fp string) ([]S3File, error) {
+	bucket, prefix, err := bucketAndPrefixFromFilePath(fp)
+	if err != nil {
+		return nil, err
+	}
+
+	var files []S3File
+	err = s.client.ListObjectsPages(&s3.ListObjectsInput{Bucket: bucket, Prefix: prefix},
+		func(page *s3.ListObjectsOutput, lastPage bool) bool {
+			for _, object := range page.Contents {
+				files = append(files, S3File{
+					Key:    object.Key,
+					Bucket: bucket,
+				})
+			}
+
+			return true
+		})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return files, nil
+}
+
+// StreamJsonGzipFile - takes an S3 file in the `json.gz` format produced by DynamoDB's export to S3.
+// It's not a typical JSON file: the contents are gzip-compressed and newline-delimited rather than wrapped in an array,
+// which means we can stream the file row by row without OOMing.
+func (s *S3Client) StreamJsonGzipFile(file S3File, ch chan<- dynamodb.ItemResponse) error {
+	defer close(ch)
+	result, err := s.client.GetObject(&s3.GetObjectInput{
+		Bucket: file.Bucket,
+		Key:    file.Key,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to get object from S3, err: %v", err)
+	}
+
+	defer result.Body.Close()
+
+	// Create a gzip reader
+	gz, err := gzip.NewReader(result.Body)
+	if err != nil {
+		return fmt.Errorf("failed to create a GZIP reader for object, err: %v", err)
+	}
+	defer gz.Close()
+
+	scanner := bufio.NewScanner(gz)
+	// DynamoDB items can be up to 400 KB, which exceeds bufio.Scanner's default 64 KB token limit,
+	// so grow the buffer to 1 MB per line.
+	buf := make([]byte, 1024*1024)
+	scanner.Buffer(buf, len(buf))
+	for scanner.Scan() {
+		line := scanner.Bytes()
+		var content dynamodb.ItemResponse
+		if err = json.Unmarshal(line, &content); err != nil {
+			return fmt.Errorf("failed to unmarshal, err: %v", err)
+		}
+
+		ch <- content
+	}
+
+	if err = scanner.Err(); err != nil {
+		return fmt.Errorf("error reading from S3 object, err: %v", err)
+	}
+
+	return nil
+}
diff --git a/lib/s3lib/s3lib_test.go b/lib/s3lib/s3lib_test.go
new file mode 100644
index 00000000..04dcdce3
--- /dev/null
+++ b/lib/s3lib/s3lib_test.go
@@ -0,0 +1,75 @@
+package s3lib
+
+import (
+	"github.com/artie-labs/transfer/lib/ptr"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestBucketAndPrefixFromFilePath(t *testing.T) {
+	type _tc struct {
+		name           string
+		fp             string
+		expectedBucket *string
+		expectedPrefix *string
+		expectedErr    bool
+	}
+
+	tcs := []_tc{
+		{
+			name:           "valid path (w/ S3 prefix)",
+			fp:             "s3://bucket/prefix",
+			expectedBucket: ptr.ToString("bucket"),
+			expectedPrefix: ptr.ToString("prefix"),
+		},
+		{
+			name:           "valid path (w/ S3 prefix) with trailing slash",
+			fp:             "s3://bucket/prefix/",
+			expectedBucket: ptr.ToString("bucket"),
+			expectedPrefix: ptr.ToString("prefix/"),
+		},
+		{
+			name:           "valid path (w/ S3 prefix) with multiple slashes",
+			fp:             "s3://bucket/prefix/with/multiple/slashes",
+			expectedBucket: ptr.ToString("bucket"),
+			expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"),
+		},
+		// Without S3 prefix
+		{
+			name:           "valid path (w/o S3 prefix)",
+			fp:             "bucket/prefix",
+			expectedBucket: ptr.ToString("bucket"),
+			expectedPrefix: ptr.ToString("prefix"),
+		},
+		{
+			name:           "valid path (w/o S3 prefix) with trailing slash",
+			fp:             "bucket/prefix/",
+			expectedBucket: ptr.ToString("bucket"),
+			expectedPrefix: ptr.ToString("prefix/"),
+		},
+		{
+			name:           "valid path (w/o S3 prefix) with multiple slashes",
+			fp:             "bucket/prefix/with/multiple/slashes",
+			expectedBucket: ptr.ToString("bucket"),
+			expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"),
+		},
+		{
+			name:        "invalid path",
+			fp:          "s3://bucket",
+			expectedErr: true,
+		},
+	}
+
+	for _, tc := range tcs {
+		actualBucket, actualPrefix, actualErr := bucketAndPrefixFromFilePath(tc.fp)
+		if tc.expectedErr {
+			assert.Error(t, actualErr, tc.name)
+		} else {
+			assert.NoError(t, actualErr, tc.name)
+
+			// Now check the actual bucket and prefix
+			assert.Equal(t, *tc.expectedBucket, *actualBucket, tc.name)
+			assert.Equal(t, *tc.expectedPrefix, *actualPrefix, tc.name)
+		}
+	}
+}
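A minimal consumer sketch for `S3Client` (region, bucket, and prefix are placeholders): the producer goroutine closes the channel when the file is exhausted, which terminates the range loop — the same pattern `snapshot.go` uses below.

```go
package main

import (
	"fmt"

	"github.com/artie-labs/reader/lib/s3lib"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
	client := s3lib.NewClient(sess)

	files, err := client.ListFiles("s3://artie-transfer-test/AWSDynamoDB/01234-abcd/data")
	if err != nil {
		panic(err)
	}

	for _, file := range files {
		ch := make(chan dynamodb.ItemResponse)
		go func(f s3lib.S3File) {
			// StreamJsonGzipFile closes ch on return, ending the range below.
			if err := client.StreamJsonGzipFile(f, ch); err != nil {
				fmt.Println("stream failed:", err)
			}
		}(file)

		for item := range ch {
			fmt.Println(item.Item)
		}
	}
}
```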
+ "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodbstreams" "time" ) type Store struct { + s3Client *s3lib.S3Client + dynamoDBClient *dynamodb.DynamoDB + tableName string streamArn string batchSize int streams *dynamodbstreams.DynamoDBStreams storage *offsets.OffsetStorage shardChan chan *dynamodbstreams.Shard + + cfg *config.DynamoDB } // jitterSleepBaseMs - sleep for 50 ms as the base. @@ -28,7 +35,6 @@ const shardScannerInterval = 5 * time.Minute func Load(ctx context.Context) *Store { cfg := config.FromContext(ctx) - sess, err := session.NewSession(&aws.Config{ Region: ptr.ToString(cfg.DynamoDB.AwsRegion), Credentials: credentials.NewStaticCredentials(cfg.DynamoDB.AwsAccessKeyID, cfg.DynamoDB.AwsSecretAccessKey, ""), @@ -42,36 +48,55 @@ func Load(ctx context.Context) *Store { tableName: cfg.DynamoDB.TableName, streamArn: cfg.DynamoDB.StreamArn, batchSize: cfg.Kafka.PublishSize, - storage: offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile, nil, nil), - streams: dynamodbstreams.New(sess), - shardChan: make(chan *dynamodbstreams.Shard), + cfg: cfg.DynamoDB, + } + + if cfg.DynamoDB.Snapshot { + // Snapshot needs the DynamoDB client to describe table and S3 library to read from the files. + store.dynamoDBClient = dynamodb.New(sess) + store.s3Client = s3lib.NewClient(sess) + } else { + // If it's not snapshotting, then we'll need to create offset storage, streams client and a channel. + store.storage = offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile, nil, nil) + store.streams = dynamodbstreams.New(sess) + store.shardChan = make(chan *dynamodbstreams.Shard) } return store } func (s *Store) Run(ctx context.Context) { - ticker := time.NewTicker(shardScannerInterval) - - // Start to subscribe to the channel - go s.ListenToChannel(ctx) + if s.cfg.Snapshot { + if err := s.scanFilesOverBucket(); err != nil { + logger.FromContext(ctx).WithError(err).Fatalf("scanning files over bucket failed") + } - // Scan it for the first time manually, so we don't have to wait 5 mins - s.scanForNewShards(ctx) + if err := s.streamAndPublish(ctx); err != nil { + logger.FromContext(ctx).WithError(err).Fatalf("scanning files over bucket failed") + } - log := logger.FromContext(ctx) - for { - select { - case <-ctx.Done(): - close(s.shardChan) - log.Info("Terminating process...") - return - case <-ticker.C: - log.Info("Scanning for new shards...") - s.scanForNewShards(ctx) + logger.FromContext(ctx).Info("Finished snapshotting all the files") + } else { + ticker := time.NewTicker(shardScannerInterval) + + // Start to subscribe to the channel + go s.ListenToChannel(ctx) + + // Scan it for the first time manually, so we don't have to wait 5 mins + s.scanForNewShards(ctx) + log := logger.FromContext(ctx) + for { + select { + case <-ctx.Done(): + close(s.shardChan) + log.Info("Terminating process...") + return + case <-ticker.C: + log.Info("Scanning for new shards...") + s.scanForNewShards(ctx) + } } } - } func (s *Store) scanForNewShards(ctx context.Context) { diff --git a/sources/dynamodb/primary_keys.go b/sources/dynamodb/primary_keys.go new file mode 100644 index 00000000..2e57ce43 --- /dev/null +++ b/sources/dynamodb/primary_keys.go @@ -0,0 +1,29 @@ +package dynamodb + +import ( + "fmt" + "github.com/aws/aws-sdk-go/service/dynamodb" +) + +// retrievePrimaryKeys - This function is called when we process the DynamoDB table snapshot. +// This is because the snapshot is a JSON file and it does not contain which are the partition and sort keys. 
diff --git a/sources/dynamodb/primary_keys.go b/sources/dynamodb/primary_keys.go
new file mode 100644
index 00000000..2e57ce43
--- /dev/null
+++ b/sources/dynamodb/primary_keys.go
@@ -0,0 +1,29 @@
+package dynamodb
+
+import (
+	"fmt"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+)
+
+// retrievePrimaryKeys - This function is called when we process a DynamoDB table snapshot.
+// This is because the snapshot is a JSON file and it does not indicate which attributes are the partition and sort keys.
+func (s *Store) retrievePrimaryKeys() ([]string, error) {
+	output, err := s.dynamoDBClient.DescribeTable(&dynamodb.DescribeTableInput{
+		TableName: &s.tableName,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	var keys []string
+	for _, key := range output.Table.KeySchema {
+		if key.AttributeName != nil {
+			keys = append(keys, *key.AttributeName)
+		} else {
+			return nil, fmt.Errorf("key does not have an attribute name, key: %v", key.String())
+		}
+	}
+
+	return keys, nil
+}
diff --git a/sources/dynamodb/snapshot.go b/sources/dynamodb/snapshot.go
new file mode 100644
index 00000000..96a69ec1
--- /dev/null
+++ b/sources/dynamodb/snapshot.go
@@ -0,0 +1,71 @@
+package dynamodb
+
+import (
+	"context"
+	"fmt"
+	"github.com/artie-labs/reader/lib/dynamo"
+	"github.com/artie-labs/reader/lib/kafkalib"
+	"github.com/artie-labs/reader/lib/logger"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+	"github.com/segmentio/kafka-go"
+)
+
+func (s *Store) scanFilesOverBucket() error {
+	if len(s.cfg.SnapshotSettings.SpecifiedFiles) > 0 {
+		// Don't scan the folder because the files to read have already been specified.
+		return nil
+	}
+
+	files, err := s.s3Client.ListFiles(s.cfg.SnapshotSettings.Folder)
+	if err != nil {
+		return fmt.Errorf("failed to list files, err: %v", err)
+	}
+
+	if len(files) == 0 {
+		return fmt.Errorf("no files found in the folder: %v", s.cfg.SnapshotSettings.Folder)
+	}
+
+	s.cfg.SnapshotSettings.SpecifiedFiles = files
+	return nil
+}
+
+func (s *Store) streamAndPublish(ctx context.Context) error {
+	log := logger.FromContext(ctx)
+
+	keys, err := s.retrievePrimaryKeys()
+	if err != nil {
+		return fmt.Errorf("failed to retrieve primary keys, err: %v", err)
+	}
+
+	for _, file := range s.cfg.SnapshotSettings.SpecifiedFiles {
+		ch := make(chan dynamodb.ItemResponse)
+		go func() {
+			if err := s.s3Client.StreamJsonGzipFile(file, ch); err != nil {
+				log.Fatalf("Failed to read file: %v", err)
+			}
+		}()
+
+		var kafkaMsgs []kafka.Message
+		for msg := range ch {
+			dynamoMsg, err := dynamo.NewMessageFromExport(msg, keys, s.tableName)
+			if err != nil {
+				log.WithError(err).WithFields(map[string]interface{}{
+					"msg": msg,
+				}).Fatal("failed to cast message from DynamoDB")
+			}
+
+			kafkaMsg, err := dynamoMsg.KafkaMessage(ctx)
+			if err != nil {
+				log.WithError(err).Fatal("failed to create Kafka message")
+			}
+
+			kafkaMsgs = append(kafkaMsgs, kafkaMsg)
+		}
+
+		if err = kafkalib.NewBatch(kafkaMsgs, s.batchSize).Publish(ctx); err != nil {
+			log.WithError(err).Fatalf("failed to publish messages, exiting...")
+		}
+	}
+
+	return nil
+}
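One usage note on `scanFilesOverBucket` above: pinning `specifiedFiles` in the config skips the folder scan entirely, which can be useful for re-running a single file. A hypothetical snippet (bucket and key are placeholders), matching the `yaml` tags on `s3lib.S3File`:

```yaml
snapshotSettings:
  folder: s3://artie-transfer-test/AWSDynamoDB/01234-abcd
  specifiedFiles:
    - bucket: artie-transfer-test
      key: AWSDynamoDB/01234-abcd/data/file-1.json.gz
```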