Skip to content

Commit

Permalink
DynamoDB Snapshot (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 27, 2023
1 parent 7f61e63 commit 22f28ec
Show file tree
Hide file tree
Showing 11 changed files with 549 additions and 85 deletions.
22 changes: 0 additions & 22 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package config
import (
"context"
"fmt"
"github.com/artie-labs/transfer/lib/stringutil"
"gopkg.in/yaml.v2"
"log"
"os"
Expand Down Expand Up @@ -40,27 +39,6 @@ func (k *Kafka) Validate() error {
return nil
}

// DynamoDB is the YAML-backed configuration block for the DynamoDB source.
// Every field below is checked by Validate, which reports an error if any of
// them is empty.
type DynamoDB struct {
// OffsetFile is read from the `offsetFile` YAML key.
// NOTE(review): presumably the path where stream offsets are persisted — confirm against the consumer.
OffsetFile string `yaml:"offsetFile"`
// AwsRegion, AwsAccessKeyID and AwsSecretAccessKey are read from the
// `awsRegion`, `awsAccessKeyId` and `awsSecretAccessKey` YAML keys.
AwsRegion string `yaml:"awsRegion"`
AwsAccessKeyID string `yaml:"awsAccessKeyId"`
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
// StreamArn is read from the `streamArn` YAML key.
StreamArn string `yaml:"streamArn"`
// TableName is read from the `tableName` YAML key.
TableName string `yaml:"tableName"`
}

// Validate checks that the DynamoDB configuration is usable.
//
// It returns an error when the receiver is nil, or when stringutil.Empty
// flags any of the required string settings; otherwise it returns nil.
func (d *DynamoDB) Validate() error {
	switch {
	case d == nil:
		return fmt.Errorf("dynamodb config is nil")
	case stringutil.Empty(d.OffsetFile, d.AwsRegion, d.AwsAccessKeyID, d.AwsSecretAccessKey, d.StreamArn, d.TableName):
		return fmt.Errorf("one of the dynamoDB configs is empty: offsetFile, awsRegion, awsAccessKeyID, awsSecretAccessKey, streamArn or tableName")
	default:
		return nil
	}
}

type Reporting struct {
Sentry *Sentry `yaml:"sentry"`
}
Expand Down
56 changes: 56 additions & 0 deletions config/dynamodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package config

import (
"fmt"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/transfer/lib/stringutil"
)

// DynamoDB is the YAML-backed configuration block for the DynamoDB source.
// Validate requires the six string fields to be non-empty and, when Snapshot
// is true, additionally validates SnapshotSettings.
type DynamoDB struct {
// OffsetFile is read from the `offsetFile` YAML key.
// NOTE(review): presumably the path where stream offsets are persisted — confirm against the consumer.
OffsetFile string `yaml:"offsetFile"`
// AwsRegion, AwsAccessKeyID and AwsSecretAccessKey are read from the
// `awsRegion`, `awsAccessKeyId` and `awsSecretAccessKey` YAML keys.
AwsRegion string `yaml:"awsRegion"`
AwsAccessKeyID string `yaml:"awsAccessKeyId"`
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
// StreamArn is read from the `streamArn` YAML key.
StreamArn string `yaml:"streamArn"`
// TableName is read from the `tableName` YAML key.
TableName string `yaml:"tableName"`

// Snapshot, when true, makes Validate also validate SnapshotSettings.
Snapshot bool `yaml:"snapshot"`
// SnapshotSettings is only inspected by Validate when Snapshot is true.
SnapshotSettings *SnapshotSettings `yaml:"snapshotSettings"`
}

// Validate checks that the DynamoDB configuration is usable.
//
// It returns an error when the receiver is nil, when any of the required
// string settings is reported empty by stringutil.Empty, or, if Snapshot is
// enabled, when the snapshot settings fail their own validation. In the last
// case the underlying error is wrapped so callers can inspect it with
// errors.Is / errors.As.
func (d *DynamoDB) Validate() error {
	if d == nil {
		return fmt.Errorf("dynamodb config is nil")
	}

	if stringutil.Empty(d.OffsetFile, d.AwsRegion, d.AwsAccessKeyID, d.AwsSecretAccessKey, d.StreamArn, d.TableName) {
		return fmt.Errorf("one of the dynamoDB configs is empty: offsetFile, awsRegion, awsAccessKeyID, awsSecretAccessKey, streamArn or tableName")
	}

	if d.Snapshot {
		// SnapshotSettings may be nil here; its Validate is called on the
		// pointer and is responsible for reporting that.
		if err := d.SnapshotSettings.Validate(); err != nil {
			// %w keeps the rendered message identical to the previous %v
			// while preserving the error chain.
			return fmt.Errorf("snapshot validation failed - err: %w", err)
		}
	}

	return nil
}

// SnapshotSettings holds the YAML-configurable options for snapshot mode.
// Validate requires Folder to be non-empty.
type SnapshotSettings struct {
// Folder is read from the `folder` YAML key and must not be empty.
// NOTE(review): presumably an S3 folder/prefix, given the s3lib.S3File type below — confirm.
Folder string `yaml:"folder"`
// SpecifiedFiles is optional. When no files are specified, the folder is
// scanned and the discovered files are loaded into `specifiedFiles`.
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
}

// Validate checks the snapshot settings.
//
// It returns an error when the receiver is nil or when Folder is the empty
// string, and nil otherwise. SpecifiedFiles is not checked.
func (s *SnapshotSettings) Validate() error {
	switch {
	case s == nil:
		return fmt.Errorf("settings is nil")
	case len(s.Folder) == 0:
		return fmt.Errorf("folder is empty")
	}

	return nil
}
31 changes: 25 additions & 6 deletions examples/dynamodb/service_account.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ resource "aws_iam_role" "dynamodb_streams_role" {
name = "DynamoDBStreamsRole"

assume_role_policy = jsonencode({
Version = "2012-10-17",
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Action = "sts:AssumeRole",
Principal = {
Service = "ec2.amazonaws.com"
},
Expand All @@ -35,19 +35,37 @@ resource "aws_iam_policy" "dynamodb_streams_access" {
description = "My policy that grants access to DynamoDB streams"

policy = jsonencode({
Version = "2012-10-17",
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
Effect = "Allow",
Action = [
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:ListStreams"
"dynamodb:ListStreams",

// Stuff only required for export (snapshot)
"dynamodb:DescribeTable"
],
// Don't want to use "*"? You can specify like this:
// Resource = [ TABLE_ARN, TABLE_ARN + "/stream/*" ]
Resource = "*" # Modify this to restrict access to specific streams or resources
},
// Export (snapshot) requires access to S3
{
"Effect" : "Allow",
"Action" : [
"s3:ListBucket"
],
"Resource" : "arn:aws:s3:::artie-transfer-test"
},
{
"Effect" : "Allow",
"Action" : [
"s3:GetObject"
],
"Resource" : "arn:aws:s3:::artie-transfer-test/AWSDynamoDB/*"
}
]
})
Expand Down Expand Up @@ -79,6 +97,7 @@ resource "aws_iam_access_key" "dynamodb_streams_user_key" {
user = aws_iam_user.dynamodb_streams_user.name
}


# Output AWS credentials
output "aws_access_key_id" {
value = aws_iam_access_key.dynamodb_streams_user_key.id
Expand Down
36 changes: 0 additions & 36 deletions lib/dynamo/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
"github.com/segmentio/kafka-go"
"strconv"
"time"
Expand Down Expand Up @@ -87,41 +86,6 @@ func transformNewImage(data map[string]*dynamodb.AttributeValue) map[string]inte
return transformed
}

// NewMessage converts a DynamoDB Streams record into a Message for tableName.
//
// It returns an error when record or record.Dynamodb is nil, or when the
// stream record carries no keys. The operation defaults to "r" and is set to
// "c", "u" or "d" for INSERT, MODIFY and REMOVE events respectively.
func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, error) {
	if record == nil || record.Dynamodb == nil {
		return nil, fmt.Errorf("record is nil or dynamodb does not exist in this event payload")
	}

	change := record.Dynamodb
	if len(change.Keys) == 0 {
		return nil, fmt.Errorf("keys is nil")
	}

	// Use the current time unless the stream supplies a creation timestamp.
	occurredAt := time.Now()
	if ts := change.ApproximateCreationDateTime; ts != nil {
		occurredAt = *ts
	}

	// A missing or unrecognised event name keeps the default operation.
	operation := "r"
	if eventName := record.EventName; eventName != nil {
		if *eventName == "INSERT" {
			operation = "c"
		} else if *eventName == "MODIFY" {
			operation = "u"
		} else if *eventName == "REMOVE" {
			operation = "d"
		}
	}

	message := &Message{
		op:            operation,
		tableName:     tableName,
		executionTime: occurredAt,
		rowData:       transformNewImage(change.NewImage),
		primaryKey:    transformNewImage(change.Keys),
	}
	return message, nil
}

func (m *Message) artieMessage() (util.SchemaEventPayload, error) {
return util.SchemaEventPayload{
Payload: util.Payload{
Expand Down
76 changes: 76 additions & 0 deletions lib/dynamo/parse_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package dynamo

import (
"fmt"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
"time"
)

// NewMessageFromExport builds a snapshot ("r") Message for tableName from a
// single exported item.
//
// keys lists the attribute names that make up the primary key; each must be
// present in the transformed item. An error is returned when the item has no
// attributes, when keys is empty, or when a listed key is missing from the
// item (the error names the missing key).
func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName string) (*Message, error) {
	if len(item.Item) == 0 {
		return nil, fmt.Errorf("item is nil or keys do not exist in this item payload")
	}

	if len(keys) == 0 {
		return nil, fmt.Errorf("keys is nil")
	}

	// Snapshot time does not exist on the row.
	// Perhaps we can have it inferred from the manifest file in the future.
	executionTime := time.Now()

	rowData := transformNewImage(item.Item)
	// One entry per key is added below, so the final size is known up front.
	primaryKeys := make(map[string]interface{}, len(keys))
	for _, key := range keys {
		val, isOk := rowData[key]
		if !isOk {
			// Name the key so a misconfigured (e.g. composite) key is easy to spot.
			return nil, fmt.Errorf("key %q does not exist in the item payload", key)
		}

		primaryKeys[key] = val
	}

	return &Message{
		op:            "r",
		tableName:     tableName,
		executionTime: executionTime,
		rowData:       rowData,
		primaryKey:    primaryKeys,
	}, nil
}

// NewMessage turns a DynamoDB Streams record into a Message for tableName.
//
// An error is returned when record or its Dynamodb payload is nil, or when
// the payload has no keys. INSERT, MODIFY and REMOVE events map to the
// operations "c", "u" and "d"; any other (or absent) event name yields "r".
func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, error) {
	if record == nil || record.Dynamodb == nil {
		return nil, fmt.Errorf("record is nil or dynamodb does not exist in this event payload")
	}

	payload := record.Dynamodb
	if len(payload.Keys) == 0 {
		return nil, fmt.Errorf("keys is nil")
	}

	// Prefer the stream's creation timestamp; otherwise stamp with "now".
	stamp := time.Now()
	if payload.ApproximateCreationDateTime != nil {
		stamp = *payload.ApproximateCreationDateTime
	}

	msg := &Message{
		op:            "r",
		tableName:     tableName,
		executionTime: stamp,
		rowData:       transformNewImage(payload.NewImage),
		primaryKey:    transformNewImage(payload.Keys),
	}

	// Override the default operation only for the recognised event names.
	if record.EventName != nil {
		switch *record.EventName {
		case "INSERT":
			msg.op = "c"
		case "MODIFY":
			msg.op = "u"
		case "REMOVE":
			msg.op = "d"
		}
	}

	return msg, nil
}
65 changes: 65 additions & 0 deletions lib/dynamo/parse_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package dynamo

import (
"github.com/artie-labs/transfer/lib/ptr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/stretchr/testify/assert"
)

// Test_NewMessageFromExport exercises NewMessageFromExport with an empty item,
// an empty key list and a valid item, checking either the exact error message
// or the resulting message's table name and operation.
func (d *DynamoDBTestSuite) Test_NewMessageFromExport() {
	type _tc struct {
		name          string
		item          dynamodb.ItemResponse
		keys          []string
		tableName     string
		expectedError string
	}

	tcs := []_tc{
		{
			name: "Test with empty item",
			item: dynamodb.ItemResponse{
				Item: map[string]*dynamodb.AttributeValue{},
			},
			keys:          []string{"id"},
			tableName:     "test",
			expectedError: "item is nil or keys do not exist in this item payload",
		},
		{
			name: "Test with empty keys",
			item: dynamodb.ItemResponse{
				Item: map[string]*dynamodb.AttributeValue{
					"id": {
						S: ptr.ToString("1"),
					},
				},
			},
			keys:          []string{},
			tableName:     "test",
			expectedError: "keys is nil",
		},
		{
			name: "Test with valid item and keys",
			item: dynamodb.ItemResponse{
				Item: map[string]*dynamodb.AttributeValue{
					"id": {
						S: ptr.ToString("1"),
					},
				},
			},
			keys:      []string{"id"},
			tableName: "test",
		},
	}

	for _, tc := range tcs {
		msg, err := NewMessageFromExport(tc.item, tc.keys, tc.tableName)
		if tc.expectedError != "" {
			// EqualError fails cleanly on a nil error instead of panicking
			// on err.Error().
			assert.EqualError(d.T(), err, tc.expectedError, tc.name)
			continue
		}

		// msg is nil whenever err is non-nil, so do not dereference it
		// after a failed NoError assertion.
		if !assert.NoError(d.T(), err, tc.name) {
			continue
		}

		assert.Equal(d.T(), tc.tableName, msg.tableName, tc.name)
		assert.Equal(d.T(), "r", msg.op, tc.name)
	}
}
Loading

0 comments on commit 22f28ec

Please sign in to comment.