From b339f0e93c845714d36b759bc501fda7e4ad57b9 Mon Sep 17 00:00:00 2001 From: yansuopeng Date: Wed, 3 Apr 2024 17:57:49 +0800 Subject: [PATCH] [FLINK-34995] flink kafka connector source stuck when partition leader invalid --- .../enumerator/subscriber/PartitionSetSubscriber.java | 8 +++++--- .../source/enumerator/subscriber/TopicListSubscriber.java | 5 ++++- .../enumerator/subscriber/TopicPatternSubscriber.java | 8 +++++--- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java index 3423b0f90..355399454 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java @@ -56,7 +56,7 @@ public Set getSubscribedTopicPartitions(AdminClient adminClient) for (TopicPartition subscribedPartition : this.subscribedPartitions) { if (topicMetadata.containsKey(subscribedPartition.topic()) - && partitionExistsInTopic( + && partitionExistsInTopicAndValid( subscribedPartition, topicMetadata.get(subscribedPartition.topic()))) { existingSubscribedPartitions.add(subscribedPartition); } else { @@ -70,7 +70,9 @@ && partitionExistsInTopic( return existingSubscribedPartitions; } - private boolean partitionExistsInTopic(TopicPartition partition, TopicDescription topic) { - return topic.partitions().size() > partition.partition(); + private boolean partitionExistsInTopicAndValid( + TopicPartition partition, TopicDescription topic) { + return topic.partitions().stream() + .anyMatch(p -> p.leader() != null && p.partition() == partition.partition()); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java index b2ad844ab..319648b01 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java @@ -54,7 +54,10 @@ public Set getSubscribedTopicPartitions(AdminClient adminClient) Set subscribedPartitions = new HashSet<>(); for (TopicDescription topic : topicMetadata.values()) { for (TopicPartitionInfo partition : topic.partitions()) { - subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition())); + if (partition.leader() != null) { + subscribedPartitions.add( + new TopicPartition(topic.name(), partition.partition())); + } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java index 2a9a75331..2ff315f88 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java @@ -53,9 +53,11 @@ public Set getSubscribedTopicPartitions(AdminClient adminClient) (topicName, topicDescription) -> { if (topicPattern.matcher(topicName).matches()) { for (TopicPartitionInfo partition : topicDescription.partitions()) { - subscribedTopicPartitions.add( - new TopicPartition( - topicDescription.name(), partition.partition())); + if (partition.leader() != null) { + subscribedTopicPartitions.add( + new TopicPartition( + topicDescription.name(), partition.partition())); + } } } });