Skip to content

Commit

Permalink
ScanForNewShards should iterate (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 20, 2023
1 parent e815a3d commit b98fad5
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,29 @@ func (s *Store) Run(ctx context.Context) {
}

func (s *Store) scanForNewShards(ctx context.Context) {
input := &dynamodbstreams.DescribeStreamInput{StreamArn: aws.String(s.streamArn)}
result, err := s.streams.DescribeStream(input)
if err != nil {
logger.FromContext(ctx).Fatalf("Failed to describe stream: %v", err)
}
var exclusiveStartShardId *string
for {
input := &dynamodbstreams.DescribeStreamInput{
StreamArn: aws.String(s.streamArn),
ExclusiveStartShardId: exclusiveStartShardId,
}

result, err := s.streams.DescribeStream(input)
if err != nil {
logger.FromContext(ctx).Fatalf("Failed to describe stream: %v", err)
}

for _, shard := range result.StreamDescription.Shards {
s.shardChan <- shard
}

if result.StreamDescription.LastEvaluatedShardId == nil {
logger.FromContext(ctx).Info("Finished reading all the shards")
// If LastEvaluatedShardId is null, we've read all the shards.
break
}

for _, shard := range result.StreamDescription.Shards {
s.shardChan <- shard
// Set up the next page query with the LastEvaluatedShardId
exclusiveStartShardId = result.StreamDescription.LastEvaluatedShardId
}
}

0 comments on commit b98fad5

Please sign in to comment.