From b98fad548d4d4676d182358d3d02b0cc9a34c80e Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 20 Sep 2023 09:37:08 -0700 Subject: [PATCH] ScanForNewShards should iterate (#15) --- sources/dynamodb/dynamodb.go | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index efb43869..7805e63b 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -75,13 +75,29 @@ func (s *Store) Run(ctx context.Context) { } func (s *Store) scanForNewShards(ctx context.Context) { - input := &dynamodbstreams.DescribeStreamInput{StreamArn: aws.String(s.streamArn)} - result, err := s.streams.DescribeStream(input) - if err != nil { - logger.FromContext(ctx).Fatalf("Failed to describe stream: %v", err) - } + var exclusiveStartShardId *string + for { + input := &dynamodbstreams.DescribeStreamInput{ + StreamArn: aws.String(s.streamArn), + ExclusiveStartShardId: exclusiveStartShardId, + } + + result, err := s.streams.DescribeStream(input) + if err != nil { + logger.FromContext(ctx).Fatalf("Failed to describe stream: %v", err) + } + + for _, shard := range result.StreamDescription.Shards { + s.shardChan <- shard + } + + if result.StreamDescription.LastEvaluatedShardId == nil { + logger.FromContext(ctx).Info("Finished reading all the shards") + // If LastEvaluatedShardId is null, we've read all the shards. + break + } - for _, shard := range result.StreamDescription.Shards { - s.shardChan <- shard + // Set up the next page query with the LastEvaluatedShardId + exclusiveStartShardId = result.StreamDescription.LastEvaluatedShardId } }