diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java index afa3878c..706c5888 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java @@ -76,7 +76,7 @@ public List listShards(String streamArn, ListShardsStartingPosition start listShardsResponse = kinesisClient.listShards( ListShardsRequest.builder() - .streamARN(streamArn) + .streamARN(nextToken == null ? streamArn : null) .shardFilter( nextToken == null ? startingPosition.getShardFilter() diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java index 19d8bd67..cab4599f 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java @@ -151,14 +151,14 @@ void testListShardsMultipleCalls() { ListShardItem.builder() .validation( getListShardRequestValidation( - STREAM_ARN, null, "next-token-1")) + null, null, "next-token-1")) .shards(expectedShards.subList(1, 2)) .nextToken("next-token-2") .build(), ListShardItem.builder() .validation( getListShardRequestValidation( - STREAM_ARN, null, "next-token-2")) + null, null, "next-token-2")) .shards(expectedShards.subList(2, 4)) .nextToken(null) .build()) @@ -461,6 +461,9 @@ private Consumer getListShardRequestValidation( .nextToken(nextToken) .build(); assertThat(req).isEqualTo(expectedReq); + if (nextToken != null) { + assertThat(streamArn).isNull(); + } }; }