diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java index 1375d9f60..2f13f5bd2 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java @@ -35,17 +35,23 @@ public class KafkaSourceEnumState { * this flag will be marked as true if inital partitions are discovered after enumerator starts. */ private final boolean initialDiscoveryFinished; + /** this flag will be marked as true if all partition splits are already assigned. */ + private final boolean noMoreNewPartitionSplits; public KafkaSourceEnumState( - Set partitions, boolean initialDiscoveryFinished) { + Set partitions, + boolean initialDiscoveryFinished, + boolean noMoreNewPartitionSplits) { this.partitions = partitions; this.initialDiscoveryFinished = initialDiscoveryFinished; + this.noMoreNewPartitionSplits = noMoreNewPartitionSplits; } KafkaSourceEnumState( Set assignPartitions, Set unassignedInitialPartitions, - boolean initialDiscoveryFinished) { + boolean initialDiscoveryFinished, + boolean noMoreNewPartitionSplits) { this.partitions = new HashSet<>(); partitions.addAll( assignPartitions.stream() @@ -63,6 +69,7 @@ public KafkaSourceEnumState( AssignmentStatus.UNASSIGNED_INITIAL)) .collect(Collectors.toSet())); this.initialDiscoveryFinished = initialDiscoveryFinished; + this.noMoreNewPartitionSplits = noMoreNewPartitionSplits; } public Set partitions() { @@ -81,6 +88,10 @@ public boolean initialDiscoveryFinished() { return initialDiscoveryFinished; } + public boolean noMoreNewPartitionSplits() { + return noMoreNewPartitionSplits; + } + private Set filterPartitionsByAssignmentStatus( AssignmentStatus assignmentStatus) { return partitions.stream() diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java index 0ea4d9f65..28579457f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java @@ -54,7 +54,7 @@ public class KafkaSourceEnumStateSerializer private static final int VERSION_1 = 1; /** * state of VERSION_2 contains initialDiscoveryFinished and partitions with different assignment - * status. + * status, as well as noMoreNewPartitionSplits. */ private static final int VERSION_2 = 2; @@ -69,6 +69,7 @@ public int getVersion() { public byte[] serialize(KafkaSourceEnumState enumState) throws IOException { Set partitions = enumState.partitions(); boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished(); + boolean noMoreNewPartitionSplits = enumState.noMoreNewPartitionSplits(); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(baos)) { out.writeInt(partitions.size()); @@ -78,6 +79,7 @@ public byte[] serialize(KafkaSourceEnumState enumState) throws IOException { out.writeInt(topicPartitionAndAssignmentStatus.assignmentStatus().getStatusCode()); } out.writeBoolean(initialDiscoveryFinished); + out.writeBoolean(noMoreNewPartitionSplits); out.flush(); return baos.toByteArray(); } @@ -91,7 +93,7 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I case VERSION_1: final Set assignedPartitions = deserializeTopicPartitions(serialized); - return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true); + return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true, false); case VERSION_0: Map> currentPartitionAssignment = SerdeUtils.deserializeSplitAssignments( @@ -103,7 +105,8 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I split -> currentAssignedSplits.add( split.getTopicPartition()))); - return new KafkaSourceEnumState(currentAssignedSplits, new HashSet<>(), true); + return new KafkaSourceEnumState( + currentAssignedSplits, new HashSet<>(), true, false); default: throw new IOException( String.format( @@ -152,11 +155,13 @@ private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus AssignmentStatus.ofStatusCode(statusCode))); } final boolean initialDiscoveryFinished = in.readBoolean(); + final boolean noMoreNewPartitionSplits = in.readBoolean(); if (in.available() > 0) { throw new IOException("Unexpected trailing bytes in serialized topic partitions"); } - return new KafkaSourceEnumState(partitions, initialDiscoveryFinished); + return new KafkaSourceEnumState( + partitions, initialDiscoveryFinished, noMoreNewPartitionSplits); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 02323a74f..9fee9e269 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -112,7 +112,8 @@ public KafkaSourceEnumerator( properties, context, boundedness, - new KafkaSourceEnumState(Collections.emptySet(), Collections.emptySet(), false)); + new KafkaSourceEnumState( + Collections.emptySet(), Collections.emptySet(), false, false)); } public KafkaSourceEnumerator( @@ -142,6 +143,7 @@ public KafkaSourceEnumerator( this.unassignedInitialPartitions = new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions()); this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished(); + this.noMoreNewPartitionSplits = kafkaSourceEnumState.noMoreNewPartitionSplits(); } /** @@ -209,7 +211,10 @@ public void addReader(int subtaskId) { @Override public KafkaSourceEnumState snapshotState(long checkpointId) throws Exception { return new KafkaSourceEnumState( - assignedPartitions, unassignedInitialPartitions, initialDiscoveryFinished); + assignedPartitions, + unassignedInitialPartitions, + initialDiscoveryFinished, + noMoreNewPartitionSplits); } @Override diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index 8b308af16..0d7e60464 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -363,6 +363,7 @@ public void testWorkWithPreexistingAssignments() throws Throwable { preexistingAssignments, Collections.emptySet(), true, + false, new Properties())) { enumerator.start(); runPeriodicPartitionDiscovery(context2); @@ -394,6 +395,7 @@ public void testKafkaClientProperties() throws Exception { Collections.emptySet(), Collections.emptySet(), false, + false, properties)) { enumerator.start(); @@ -589,6 +591,7 @@ private KafkaSourceEnumerator createEnumerator( Collections.emptySet(), Collections.emptySet(), false, + false, props); } @@ -608,6 +611,7 @@ private KafkaSourceEnumerator createEnumerator( Collections.emptySet(), Collections.emptySet(), false, + false, props); } @@ -622,6 +626,7 @@ private KafkaSourceEnumerator createEnumerator( Set assignedPartitions, Set unassignedInitialPartitions, boolean initialDiscoveryFinished, + boolean noMoreNewPartitionSplits, Properties overrideProperties) { // Use a TopicPatternSubscriber so that no exception if a subscribed topic hasn't been // created yet. @@ -644,7 +649,10 @@ private KafkaSourceEnumerator createEnumerator( enumContext, Boundedness.CONTINUOUS_UNBOUNDED, new KafkaSourceEnumState( - assignedPartitions, unassignedInitialPartitions, initialDiscoveryFinished)); + assignedPartitions, + unassignedInitialPartitions, + initialDiscoveryFinished, + noMoreNewPartitionSplits)); } // --------------------- diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java index 6c172e4a2..71b428daf 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java @@ -48,6 +48,7 @@ public void testEnumStateSerde() throws IOException { new KafkaSourceEnumState( constructTopicPartitions(0), constructTopicPartitions(NUM_PARTITIONS_PER_TOPIC), + true, true); final KafkaSourceEnumStateSerializer serializer = new KafkaSourceEnumStateSerializer(); @@ -60,6 +61,7 @@ public void testEnumStateSerde() throws IOException { assertThat(restoredState.unassignedInitialPartitions()) .isEqualTo(state.unassignedInitialPartitions()); assertThat(restoredState.initialDiscoveryFinished()).isTrue(); + assertThat(restoredState.noMoreNewPartitionSplits()).isTrue(); } @Test @@ -87,10 +89,12 @@ public void testBackwardCompatibility() throws IOException { assertThat(kafkaSourceEnumStateV0.assignedPartitions()).isEqualTo(topicPartitions); assertThat(kafkaSourceEnumStateV0.unassignedInitialPartitions()).isEmpty(); assertThat(kafkaSourceEnumStateV0.initialDiscoveryFinished()).isTrue(); + assertThat(kafkaSourceEnumStateV0.noMoreNewPartitionSplits()).isFalse(); assertThat(kafkaSourceEnumStateV1.assignedPartitions()).isEqualTo(topicPartitions); assertThat(kafkaSourceEnumStateV1.unassignedInitialPartitions()).isEmpty(); assertThat(kafkaSourceEnumStateV1.initialDiscoveryFinished()).isTrue(); + assertThat(kafkaSourceEnumStateV1.noMoreNewPartitionSplits()).isFalse(); } private Set constructTopicPartitions(int startPartition) {