Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33466][Connectors/Kafka]: Bounded Kafka source never finishes after restore from savepoint #71

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,23 @@ public class KafkaSourceEnumState {
* This flag will be marked as true if initial partitions are discovered after the enumerator starts.
*/
private final boolean initialDiscoveryFinished;
/** This flag will be marked as true if all partition splits are already assigned. */
private final boolean noMoreNewPartitionSplits;

public KafkaSourceEnumState(
Set<TopicPartitionAndAssignmentStatus> partitions, boolean initialDiscoveryFinished) {
Set<TopicPartitionAndAssignmentStatus> partitions,
boolean initialDiscoveryFinished,
boolean noMoreNewPartitionSplits) {
this.partitions = partitions;
this.initialDiscoveryFinished = initialDiscoveryFinished;
this.noMoreNewPartitionSplits = noMoreNewPartitionSplits;
}

KafkaSourceEnumState(
Set<TopicPartition> assignPartitions,
Set<TopicPartition> unassignedInitialPartitions,
boolean initialDiscoveryFinished) {
boolean initialDiscoveryFinished,
boolean noMoreNewPartitionSplits) {
this.partitions = new HashSet<>();
partitions.addAll(
assignPartitions.stream()
Expand All @@ -63,6 +69,7 @@ public KafkaSourceEnumState(
AssignmentStatus.UNASSIGNED_INITIAL))
.collect(Collectors.toSet()));
this.initialDiscoveryFinished = initialDiscoveryFinished;
this.noMoreNewPartitionSplits = noMoreNewPartitionSplits;
}

public Set<TopicPartitionAndAssignmentStatus> partitions() {
Expand All @@ -81,6 +88,10 @@ public boolean initialDiscoveryFinished() {
return initialDiscoveryFinished;
}

/**
 * Returns whether all partition splits have already been assigned, i.e. no new partition
 * splits are expected to be handed out by the enumerator.
 *
 * @return {@code true} if no more new partition splits will be assigned.
 */
public boolean noMoreNewPartitionSplits() {
return noMoreNewPartitionSplits;
}

private Set<TopicPartition> filterPartitionsByAssignmentStatus(
AssignmentStatus assignmentStatus) {
return partitions.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class KafkaSourceEnumStateSerializer
private static final int VERSION_1 = 1;
/**
* state of VERSION_2 contains initialDiscoveryFinished and partitions with different assignment
* status.
* status, as well as noMoreNewPartitionSplits.
*/
private static final int VERSION_2 = 2;

Expand All @@ -69,6 +69,7 @@ public int getVersion() {
public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
Set<TopicPartitionAndAssignmentStatus> partitions = enumState.partitions();
boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
boolean noMoreNewPartitionSplits = enumState.noMoreNewPartitionSplits();
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
out.writeInt(partitions.size());
Expand All @@ -78,6 +79,7 @@ public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
out.writeInt(topicPartitionAndAssignmentStatus.assignmentStatus().getStatusCode());
}
out.writeBoolean(initialDiscoveryFinished);
out.writeBoolean(noMoreNewPartitionSplits);
out.flush();
return baos.toByteArray();
}
Expand All @@ -91,7 +93,7 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I
case VERSION_1:
final Set<TopicPartition> assignedPartitions =
deserializeTopicPartitions(serialized);
return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true);
return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true, false);
case VERSION_0:
Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
SerdeUtils.deserializeSplitAssignments(
Expand All @@ -103,7 +105,8 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I
split ->
currentAssignedSplits.add(
split.getTopicPartition())));
return new KafkaSourceEnumState(currentAssignedSplits, new HashSet<>(), true);
return new KafkaSourceEnumState(
currentAssignedSplits, new HashSet<>(), true, false);
default:
throw new IOException(
String.format(
Expand Down Expand Up @@ -152,11 +155,13 @@ private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus
AssignmentStatus.ofStatusCode(statusCode)));
}
final boolean initialDiscoveryFinished = in.readBoolean();
final boolean noMoreNewPartitionSplits = in.readBoolean();
if (in.available() > 0) {
throw new IOException("Unexpected trailing bytes in serialized topic partitions");
}

return new KafkaSourceEnumState(partitions, initialDiscoveryFinished);
return new KafkaSourceEnumState(
partitions, initialDiscoveryFinished, noMoreNewPartitionSplits);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public KafkaSourceEnumerator(
properties,
context,
boundedness,
new KafkaSourceEnumState(Collections.emptySet(), Collections.emptySet(), false));
new KafkaSourceEnumState(
Collections.emptySet(), Collections.emptySet(), false, false));
}

public KafkaSourceEnumerator(
Expand Down Expand Up @@ -142,6 +143,7 @@ public KafkaSourceEnumerator(
this.unassignedInitialPartitions =
new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions());
this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
this.noMoreNewPartitionSplits = kafkaSourceEnumState.noMoreNewPartitionSplits();
}

/**
Expand Down Expand Up @@ -209,7 +211,10 @@ public void addReader(int subtaskId) {
@Override
public KafkaSourceEnumState snapshotState(long checkpointId) throws Exception {
return new KafkaSourceEnumState(
assignedPartitions, unassignedInitialPartitions, initialDiscoveryFinished);
assignedPartitions,
unassignedInitialPartitions,
initialDiscoveryFinished,
noMoreNewPartitionSplits);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ public void testWorkWithPreexistingAssignments() throws Throwable {
preexistingAssignments,
Collections.emptySet(),
true,
false,
new Properties())) {
enumerator.start();
runPeriodicPartitionDiscovery(context2);
Expand Down Expand Up @@ -394,6 +395,7 @@ public void testKafkaClientProperties() throws Exception {
Collections.emptySet(),
Collections.emptySet(),
false,
false,
properties)) {
enumerator.start();

Expand Down Expand Up @@ -589,6 +591,7 @@ private KafkaSourceEnumerator createEnumerator(
Collections.emptySet(),
Collections.emptySet(),
false,
false,
props);
}

Expand All @@ -608,6 +611,7 @@ private KafkaSourceEnumerator createEnumerator(
Collections.emptySet(),
Collections.emptySet(),
false,
false,
props);
}

Expand All @@ -622,6 +626,7 @@ private KafkaSourceEnumerator createEnumerator(
Set<TopicPartition> assignedPartitions,
Set<TopicPartition> unassignedInitialPartitions,
boolean initialDiscoveryFinished,
boolean noMoreNewPartitionSplits,
Properties overrideProperties) {
// Use a TopicPatternSubscriber so that no exception if a subscribed topic hasn't been
// created yet.
Expand All @@ -644,7 +649,10 @@ private KafkaSourceEnumerator createEnumerator(
enumContext,
Boundedness.CONTINUOUS_UNBOUNDED,
new KafkaSourceEnumState(
assignedPartitions, unassignedInitialPartitions, initialDiscoveryFinished));
assignedPartitions,
unassignedInitialPartitions,
initialDiscoveryFinished,
noMoreNewPartitionSplits));
}

// ---------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void testEnumStateSerde() throws IOException {
new KafkaSourceEnumState(
constructTopicPartitions(0),
constructTopicPartitions(NUM_PARTITIONS_PER_TOPIC),
true,
true);
final KafkaSourceEnumStateSerializer serializer = new KafkaSourceEnumStateSerializer();

Expand All @@ -60,6 +61,7 @@ public void testEnumStateSerde() throws IOException {
assertThat(restoredState.unassignedInitialPartitions())
.isEqualTo(state.unassignedInitialPartitions());
assertThat(restoredState.initialDiscoveryFinished()).isTrue();
assertThat(restoredState.noMoreNewPartitionSplits()).isTrue();
}

@Test
Expand Down Expand Up @@ -87,10 +89,12 @@ public void testBackwardCompatibility() throws IOException {
assertThat(kafkaSourceEnumStateV0.assignedPartitions()).isEqualTo(topicPartitions);
assertThat(kafkaSourceEnumStateV0.unassignedInitialPartitions()).isEmpty();
assertThat(kafkaSourceEnumStateV0.initialDiscoveryFinished()).isTrue();
assertThat(kafkaSourceEnumStateV0.noMoreNewPartitionSplits()).isFalse();

assertThat(kafkaSourceEnumStateV1.assignedPartitions()).isEqualTo(topicPartitions);
assertThat(kafkaSourceEnumStateV1.unassignedInitialPartitions()).isEmpty();
assertThat(kafkaSourceEnumStateV1.initialDiscoveryFinished()).isTrue();
assertThat(kafkaSourceEnumStateV1.noMoreNewPartitionSplits()).isFalse();
}

private Set<TopicPartition> constructTopicPartitions(int startPartition) {
Expand Down