From 6763f0a557dcdddf721688a7dde54afa86c93737 Mon Sep 17 00:00:00 2001 From: Elphas Toringepi Date: Thu, 31 Oct 2024 15:38:46 +0000 Subject: [PATCH] [FLINK-36637] Fix ListShards for Kinesis streams with large number of shards --- .../connector/kinesis/source/proxy/KinesisStreamProxy.java | 2 +- .../kinesis/source/proxy/KinesisStreamProxyTest.java | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java index afa3878c..706c5888 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java @@ -76,7 +76,7 @@ public List listShards(String streamArn, ListShardsStartingPosition start listShardsResponse = kinesisClient.listShards( ListShardsRequest.builder() - .streamARN(streamArn) + .streamARN(nextToken == null ? streamArn : null) .shardFilter( nextToken == null ? startingPosition.getShardFilter() diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java index 19d8bd67..cab4599f 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java @@ -151,14 +151,14 @@ void testListShardsMultipleCalls() { ListShardItem.builder() .validation( getListShardRequestValidation( - STREAM_ARN, null, "next-token-1")) + null, null, "next-token-1")) .shards(expectedShards.subList(1, 2)) .nextToken("next-token-2") .build(), ListShardItem.builder() .validation( getListShardRequestValidation( - STREAM_ARN, null, "next-token-2")) + null, null, "next-token-2")) .shards(expectedShards.subList(2, 4)) .nextToken(null) .build()) @@ -461,6 +461,9 @@ private Consumer getListShardRequestValidation( .nextToken(nextToken) .build(); assertThat(req).isEqualTo(expectedReq); + if (nextToken != null) { + assertThat(streamArn).isNull(); + } }; }