diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/shard.go index 3b33d62c..3e39781e 100644 --- a/sources/dynamodb/shard.go +++ b/sources/dynamodb/shard.go @@ -28,10 +28,11 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S return } - if shard.ParentShardId != nil { - parentID := *shard.ParentShardId - if s.storage.GetShardProcessing(parentID) && !s.storage.GetShardProcessed(parentID) { - slog.Info("Parent shard is being processed, let's sleep 3s and retry", slog.String("shardId", *shard.ShardId)) + if parentID := shard.ParentShardId; parentID != nil { + // If the parent shard exists, is it still being processed? If so, let's wait a bit and then retry. + // We must process the parent shard first before processing the child shard. + if s.storage.GetShardProcessing(*parentID) && !s.storage.GetShardProcessed(*parentID) { + slog.Info("Parent shard is being processed, let's sleep 3s and retry", slog.String("shardId", *shard.ShardId), slog.String("parentShardId", *parentID)) time.Sleep(3 * time.Second) s.processShard(ctx, shard, writer) return