Skip to content

Commit

Permalink
[DynamoDB] Final touches + adding a load test (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 23, 2023
1 parent 266220e commit 8b320ce
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 25 deletions.
115 changes: 115 additions & 0 deletions scripts/dynamo/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package main

import (
"fmt"
"log"
"math/rand"
"os"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
)

const (
region = "us-east-1"
table = "ddb-test"
maxBatchSize = 25 // DynamoDB's limit for batch write
)

func main() {
if len(os.Args) != 2 {
log.Fatalf("Usage: %s <number_of_rows>", os.Args[0])
}

numRows, err := strconv.Atoi(os.Args[1])
if err != nil || numRows < 1 {
log.Fatalf("Please provide a valid number for rows")
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
if err != nil {
log.Fatalf("Failed to create session: %v", err)
}

svc := dynamodb.New(sess)
rand.Seed(time.Now().UnixNano())

// Splitting the items into batches
for i := 0; i < numRows; i += maxBatchSize {
var writeRequests []*dynamodb.WriteRequest
accountID := fmt.Sprintf("account-%d", i)
// For each batch, prepare the items
for j := 0; j < maxBatchSize && (i+j) < numRows; j++ {
userID := fmt.Sprintf("user_id_%v", j)
item := map[string]*dynamodb.AttributeValue{
"account_id": {
S: aws.String(accountID),
},
"user_id": {
S: aws.String(userID),
},
"random_number": {
N: aws.String(fmt.Sprintf("%v", rand.Int63())), // Example number
},
"flag": {
BOOL: aws.Bool(rand.Intn(2) == 0), // Randomly true or false
},
"is_null": {
NULL: aws.Bool(true), // Will always be Null
},
"string_set": {
SS: []*string{aws.String("value1"), aws.String("value2"), aws.String("value44"), aws.String("value55"), aws.String("value66")},
},
"number_set": {
NS: []*string{aws.String("1"), aws.String("2"), aws.String("3")},
},
"sample_list": {
L: []*dynamodb.AttributeValue{
{
S: aws.String("item1"),
},
{
N: aws.String("2"),
},
},
},
"sample_map": {
M: map[string]*dynamodb.AttributeValue{
"key1": {
S: aws.String("value1"),
},
"key2": {
N: aws.String("2"),
},
},
},
}

writeRequest := &dynamodb.WriteRequest{
PutRequest: &dynamodb.PutRequest{
Item: item,
},
}
writeRequests = append(writeRequests, writeRequest)
}

input := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
table: writeRequests,
},
}

_, err := svc.BatchWriteItem(input)
if err != nil {
log.Printf("Failed to write batch starting at index %d. Error: %v", i, err)
continue
}

log.Printf("Inserted batch of items starting from index %d", i)
}
}
33 changes: 8 additions & 25 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type Store struct {

const (
flushOffsetInterval = 30 * time.Second
// jitterSleepBaseMs - sleep for 5s as the base.
jitterSleepBaseMs = 5000
// jitterSleepBaseMs - sleep for 50 ms as the base.
jitterSleepBaseMs = 50
)

func Load(ctx context.Context) *Store {
Expand Down Expand Up @@ -63,20 +63,17 @@ func (s *Store) Run(ctx context.Context) {
}()

log := logger.FromContext(ctx)
var retrievedMessages bool
var attempts int

for {
input := &dynamodbstreams.DescribeStreamInput{StreamArn: aws.String(s.streamArn)}
result, err := s.streams.DescribeStream(input)
if err != nil {
log.Fatalf("Failed to describe stream: %v", err)
}

for shardCount, shard := range result.StreamDescription.Shards {
for _, shard := range result.StreamDescription.Shards {
iteratorType := "TRIM_HORIZON"
var startingSequenceNumber string

if seqNumber, exists := s.storage.ReadOnlyLastProcessedSequenceNumbers(*shard.ShardId); exists {
iteratorType = "AFTER_SEQUENCE_NUMBER"
startingSequenceNumber = seqNumber
Expand Down Expand Up @@ -106,6 +103,7 @@ func (s *Store) Run(ctx context.Context) {
for shardIterator != nil {
getRecordsInput := &dynamodbstreams.GetRecordsInput{
ShardIterator: shardIterator,
Limit: ptr.ToInt64(1000),
}

getRecordsOutput, err := s.streams.GetRecords(getRecordsInput)
Expand Down Expand Up @@ -145,32 +143,17 @@ func (s *Store) Run(ctx context.Context) {
}

if len(getRecordsOutput.Records) > 0 {
retrievedMessages = true
attempts = 0
lastRecord := getRecordsOutput.Records[len(getRecordsOutput.Records)-1]
s.storage.SetLastProcessedSequenceNumber(*shard.ShardId, *lastRecord.Dynamodb.SequenceNumber)
} else {
// Don't break if it's not the last shard because then we'll skip over the iteration.
if shardCount == len(result.StreamDescription.Shards)-1 {
break
}
attempts += 1
sleepDuration := time.Duration(jitter.JitterMs(jitterSleepBaseMs, attempts)) * time.Millisecond
time.Sleep(sleepDuration)
}

shardIterator = getRecordsOutput.NextShardIterator
}
}

if !retrievedMessages {
attempts += 1
sleepDuration := time.Duration(jitter.JitterMs(jitterSleepBaseMs, attempts)) * time.Millisecond
log.WithFields(map[string]interface{}{
"streamArn": s.streamArn,
"sleepDuration": sleepDuration,
"attempts": attempts,
}).Info("No messages retrieved this iteration, sleeping and will retry again")

time.Sleep(sleepDuration)
} else {
attempts = 0
}
}
}

0 comments on commit 8b320ce

Please sign in to comment.