[FLINK-36287] Disallow UC for inner sink channels
Between the sink writer, the committer, and any pre/post-commit topology (including the global committer), we don't send actual payload, only committables. These committables must be committed on notifyCheckpointComplete. However, if a barrier overtakes these committables, they may only be read after the RPC call has been made, leading to violations of that contract. In particular, we could hit these issues even during a final checkpoint.

This commit generalizes the way we disable UC for broadcast and pointwise connections, so that the SinkTransformationTranslator can also disable it for other distribution patterns.

This commit also simplifies the committer when UC is disabled: without unaligned checkpoints, we receive all committables of a given upstream task before the respective barrier. Thus, when the barrier reaches the committer, all committables of a specific checkpoint must have been received. Committing happens even later, on notifyCheckpointComplete.

Added an assertion that verifies that all committables are indeed collected on commit. Note that this change also works when we recover a sink with channel state, as the assertion will only be checked on the next (partially aligned) checkpoint.
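
As a minimal, self-contained sketch of the invariant this commit relies on (a hypothetical SimplifiedCommitter, not Flink's actual CommitterOperator): with UC disabled on the committer's input channels, every committable of checkpoint N arrives before barrier N, so the collection must already be complete when notifyCheckpointComplete(N) fires.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SimplifiedCommitter<CommT> {
    // committables collected per checkpoint id
    private final Map<Long, List<CommT>> byCheckpoint = new HashMap<>();
    // number of committables announced by the CommittableSummary per checkpoint id
    private final Map<Long, Integer> expected = new HashMap<>();

    void processSummary(long checkpointId, int numCommittables) {
        expected.put(checkpointId, numCommittables);
    }

    void processCommittable(long checkpointId, CommT committable) {
        byCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
    }

    void notifyCheckpointComplete(long checkpointId) {
        List<CommT> collected = byCheckpoint.getOrDefault(checkpointId, List.of());
        // the new assertion: everything announced by the summary must have arrived,
        // which only holds if no barrier can overtake in-flight committables
        if (collected.size() != expected.getOrDefault(checkpointId, 0)) {
            throw new IllegalStateException(
                    "Committables overtaken by barrier for checkpoint " + checkpointId);
        }
        collected.forEach(this::commit);
    }

    private void commit(CommT committable) {
        // side effect, e.g. finalizing a staged file
    }
}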

(cherry picked from commit 9d21878)
AHeise committed Nov 14, 2024
1 parent 244e955 commit 5dea94b
Showing 8 changed files with 124 additions and 60 deletions.
@@ -70,7 +70,6 @@
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.streaming.api.transformations.WithBoundedness;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator;
 import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
 import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator;
@@ -287,7 +286,8 @@ public StreamGraph generate() {
         setFineGrainedGlobalStreamExchangeMode(streamGraph);
 
         for (StreamNode node : streamGraph.getStreamNodes()) {
-            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
+            if (node.getInEdges().stream()
+                    .anyMatch(e -> !e.getPartitioner().isSupportsUnalignedCheckpoint())) {
                 for (StreamEdge edge : node.getInEdges()) {
                     edge.setSupportsUnalignedCheckpoints(false);
                 }
@@ -303,11 +303,6 @@
         return builtStreamGraph;
     }
 
-    private boolean shouldDisableUnalignedCheckpointing(StreamEdge edge) {
-        StreamPartitioner<?> partitioner = edge.getPartitioner();
-        return partitioner.isPointwise() || partitioner.isBroadcast();
-    }
-
     private void setDynamic(final StreamGraph graph) {
         Optional<JobManagerOptions.SchedulerType> schedulerTypeOptional =
                 executionConfig.getSchedulerType();
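As a hedged usage sketch of the refactored check above (ForwardUcDemo is hypothetical; the StreamGraph/StreamEdge accessors are the same ones the tests below use): a pointwise forward() exchange still ends up with unaligned checkpoints disabled, the decision now simply flows through the partitioner's isSupportsUnalignedCheckpoint().

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class ForwardUcDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // forward() creates a pointwise connection to the print sink
        env.fromElements(1, 2, 3).forward().print();

        StreamGraph graph = env.getStreamGraph();
        graph.getStreamNodes().forEach(node ->
                node.getOutEdges().forEach(edge ->
                        // expected: false for the forward edge
                        System.out.println(edge + " UC=" + edge.supportsUnalignedCheckpoints())));
    }
}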
@@ -101,6 +101,11 @@ public boolean isPointwise() {
         return true;
     }
 
+    @Override
+    public void disableUnalignedCheckpoints() {
+        hashPartitioner.disableUnalignedCheckpoints();
+    }
+
     @Override
     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
         throw new RuntimeException(
@@ -34,6 +34,13 @@ public abstract class StreamPartitioner<T>
 
     protected int numberOfChannels;
 
+    /**
+     * By default, all partitioners except {@link #isBroadcast()} or {@link #isPointwise()} support
+     * unaligned checkpoints. However, transformations may disable unaligned checkpoints for
+     * specific cases.
+     */
+    private boolean supportsUnalignedCheckpoint = true;
+
     @Override
     public void setup(int numberOfChannels) {
         this.numberOfChannels = numberOfChannels;
@@ -78,4 +85,12 @@ public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
     public abstract SubtaskStateMapper getDownstreamSubtaskStateMapper();
 
     public abstract boolean isPointwise();
+
+    public boolean isSupportsUnalignedCheckpoint() {
+        return supportsUnalignedCheckpoint && !isPointwise() && !isBroadcast();
+    }
+
+    public void disableUnalignedCheckpoints() {
+        this.supportsUnalignedCheckpoint = false;
+    }
 }
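A quick illustration of how the new flag composes with the existing pointwise/broadcast rules (UcFlagDemo is hypothetical; RebalancePartitioner and BroadcastPartitioner are existing Flink partitioners, and the printed values follow from isSupportsUnalignedCheckpoint() above):

import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

public class UcFlagDemo {
    public static void main(String[] args) {
        StreamPartitioner<String> rebalance = new RebalancePartitioner<>();
        // neither pointwise nor broadcast, flag untouched -> UC allowed
        System.out.println(rebalance.isSupportsUnalignedCheckpoint()); // true

        // a translator (e.g. for a sink expansion) opts this exchange out
        rebalance.disableUnalignedCheckpoints();
        System.out.println(rebalance.isSupportsUnalignedCheckpoint()); // false

        // broadcast connections never support UC, regardless of the flag
        StreamPartitioner<String> broadcast = new BroadcastPartitioner<>();
        System.out.println(broadcast.isSupportsUnalignedCheckpoint()); // false
    }
}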
@@ -37,6 +37,7 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
@@ -48,10 +49,14 @@
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
@@ -170,11 +175,9 @@ private void expand() {
                             sink instanceof SupportsConcurrentExecutionAttempts);
         }
 
-        final List<Transformation<?>> sinkTransformations =
-                executionEnvironment
-                        .getTransformations()
-                        .subList(sizeBefore, executionEnvironment.getTransformations().size());
-        sinkTransformations.forEach(context::transform);
+        getSinkTransformations(sizeBefore).forEach(context::transform);
+
+        disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore));
 
         // Remove all added sink subtransformations to avoid duplications and allow additional
         // expansions
@@ -185,6 +188,58 @@
         }
     }
 
+    private List<Transformation<?>> getSinkTransformations(int sizeBefore) {
+        return executionEnvironment
+                .getTransformations()
+                .subList(sizeBefore, executionEnvironment.getTransformations().size());
+    }
+
+    /**
+     * Disables UC for all connections of operators within the sink expansion. This is necessary
+     * because committables need to be at the respective operators on notifyCheckpointComplete,
+     * or else we can't commit all side effects, which would violate the contract of
+     * notifyCheckpointComplete.
+     */
+    private void disallowUnalignedCheckpoint(List<Transformation<?>> sinkTransformations) {
+        Optional<Transformation<?>> writerOpt =
+                sinkTransformations.stream().filter(SinkExpander::isWriter).findFirst();
+        Preconditions.checkState(writerOpt.isPresent(), "Writer transformation not found.");
+        Transformation<?> writer = writerOpt.get();
+        int indexOfWriter = sinkTransformations.indexOf(writer);
+
+        // check all transformations after the writer and recursively disable UC for all inputs
+        // up to the writer
+        Set<Integer> seen = new HashSet<>(Collections.singleton(writer.getId()));
+        Queue<Transformation<?>> pending =
+                new ArrayDeque<>(
+                        sinkTransformations.subList(
+                                indexOfWriter + 1, sinkTransformations.size()));
+
+        while (!pending.isEmpty()) {
+            Transformation<?> current = pending.poll();
+            seen.add(current.getId());
+
+            for (Transformation<?> input : current.getInputs()) {
+                if (input instanceof PartitionTransformation) {
+                    ((PartitionTransformation<?>) input)
+                            .getPartitioner()
+                            .disableUnalignedCheckpoints();
+                }
+                if (seen.add(input.getId())) {
+                    pending.add(input);
+                }
+            }
+        }
+    }
+
+    private static boolean isWriter(Transformation<?> t) {
+        if (!(t instanceof OneInputTransformation)) {
+            return false;
+        }
+        return ((OneInputTransformation<?, ?>) t).getOperatorFactory()
+                instanceof SinkWriterOperatorFactory;
+    }
+
     private <CommT, WriteResultT> void addCommittingTopology(
             Sink<T> sink, DataStream<T> inputStream) {
         SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;
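The traversal in disallowUnalignedCheckpoint above is a reverse BFS over transformation inputs. A standalone sketch of the same pattern, with a hypothetical Node standing in for Transformation and ucAllowed standing in for the partitioner's flag:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

class Node {
    final int id;
    final List<Node> inputs = new ArrayList<>();
    boolean isExchange;       // stands in for PartitionTransformation
    boolean ucAllowed = true; // stands in for the partitioner's UC flag

    Node(int id) {
        this.id = id;
    }
}

class DisableUcDemo {
    static void disallowUc(Node writer, List<Node> downstreamOfWriter) {
        // seeding 'seen' with the writer stops the walk from crossing it
        Set<Integer> seen = new HashSet<>(Collections.singleton(writer.id));
        Queue<Node> pending = new ArrayDeque<>(downstreamOfWriter);

        while (!pending.isEmpty()) {
            Node current = pending.poll();
            seen.add(current.id);
            for (Node input : current.inputs) {
                if (input.isExchange) {
                    input.ucAllowed = false; // disableUnalignedCheckpoints()
                }
                if (seen.add(input.id)) { // only enqueue unvisited inputs
                    pending.add(input);
                }
            }
        }
    }
}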
@@ -109,6 +109,7 @@ void generateWriterCommitterTopology() {
                 findNodeName(streamGraph, name -> name.contains("Committer"));
 
         assertThat(streamGraph.getStreamNodes()).hasSize(3);
+        assertNoUnalignedOutput(writerNode);
 
         validateTopology(
                 writerNode,
@@ -213,6 +214,10 @@ void validateTopology(
         assertThat(dest.getSlotSharingGroup()).isEqualTo(SLOT_SHARE_GROUP);
     }
 
+    protected static void assertNoUnalignedOutput(StreamNode src) {
+        assertThat(src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints());
+    }
+
     StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) {
         return buildGraph(sink, runtimeExecutionMode, true);
     }
@@ -84,8 +84,8 @@ void generateWriterCommitterGlobalCommitterTopology() {
                 SinkWriterOperatorFactory.class,
                 PARALLELISM,
                 -1);
+        assertNoUnalignedOutput(writerNode);
 
-        StreamNode lastNode;
         if (runtimeExecutionMode == RuntimeExecutionMode.STREAMING) {
             // in streaming writer and committer are merged into one operator
 
@@ -99,12 +99,12 @@
                     CommitterOperatorFactory.class,
                     PARALLELISM,
                     -1);
+            assertNoUnalignedOutput(committerNode);
         }
-        lastNode = committerNode;
 
         final StreamNode globalCommitterNode = findGlobalCommitter(streamGraph);
         validateTopology(
-                lastNode,
+                committerNode,
                 SimpleVersionedSerializerTypeSerializerProxy.class,
                 globalCommitterNode,
                 SimpleOperatorFactory.class,
@@ -138,6 +138,7 @@ void generateWriterGlobalCommitterTopology() {
         final StreamNode committerNode = findCommitter(streamGraph);
         final StreamNode globalCommitterNode = findGlobalCommitter(streamGraph);
 
+        assertNoUnalignedOutput(writerNode);
         validateTopology(
                 writerNode,
                 SimpleVersionedSerializerTypeSerializerProxy.class,
@@ -146,6 +147,7 @@
                 PARALLELISM,
                 -1);
 
+        assertNoUnalignedOutput(committerNode);
         validateTopology(
                 committerNode,
                 SimpleVersionedSerializerTypeSerializerProxy.class,
@@ -40,6 +40,7 @@
 import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableSummary;
 import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableWithLinage;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 
 abstract class CommitterOperatorTestBase {
 
@@ -86,7 +87,7 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
     }
 
     @Test
-    void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
+    void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
         SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
@@ -109,8 +110,7 @@ void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
         final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1);
         testHarness.processElement(new StreamRecord<>(second));
 
-        // Trigger commit Retry
-        testHarness.getProcessingTimeService().setCurrentTime(2000);
+        assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException();
 
         final List<StreamElement> output = fromOutput(testHarness.getOutput());
         assertThat(output).hasSize(3);
@@ -126,42 +126,6 @@ void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
         testHarness.close();
     }
 
-    @Test
-    void testImmediatelyCommitLateCommittables() throws Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-
-        // Receive notify checkpoint completed before the last data. This might happen for unaligned
-        // checkpoints.
-        testHarness.notifyOfCompletedCheckpoint(1);
-
-        assertThat(testHarness.getOutput()).isEmpty();
-
-        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
-
-        // Commit elements with lower or equal the latest checkpoint id immediately
-        testHarness.processElement(new StreamRecord<>(first));
-
-        final List<StreamElement> output = fromOutput(testHarness.getOutput());
-        assertThat(output).hasSize(2);
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
-        SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
-                .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
-                .hasOverallCommittables(committableSummary.getNumberOfCommittables())
-                .hasPendingCommittables(0);
-        SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
-                .isEqualTo(copyCommittableWithDifferentOrigin(first, 0));
-        testHarness.close();
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
@@ -29,6 +29,8 @@
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.runtime.operators.sink.TestSink;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -148,7 +150,7 @@ void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
                                 (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE)
                         .build());
 
-        env.execute();
+        executeAndVerifyStreamGraph(env);
 
         // TODO: At present, for a bounded scenario, the occurrence of final checkpoint is not a
         // deterministic event, so
@@ -178,7 +180,7 @@ void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
                                 () -> GLOBAL_COMMIT_QUEUE)
                         .build());
 
-        env.execute();
+        executeAndVerifyStreamGraph(env);
 
         assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
 
@@ -202,7 +204,9 @@ void writerAndCommitterExecuteInStreamingMode() throws Exception {
                         .setDefaultCommitter(
                                 (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
                         .build());
-        env.execute();
+
+        executeAndVerifyStreamGraph(env);
+
         assertThat(COMMIT_QUEUE)
                 .containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
     }
@@ -254,7 +258,9 @@ void writerAndCommitterExecuteInBatchMode() throws Exception {
                         .setDefaultCommitter(
                                 (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
                         .build());
-        env.execute();
+
+        executeAndVerifyStreamGraph(env);
+
         assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
     }
 
@@ -275,7 +281,7 @@ void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
                                 (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE)
                         .build());
 
-        env.execute();
+        executeAndVerifyStreamGraph(env);
 
         // TODO: At present, for a bounded scenario, the occurrence of final checkpoint is not a
        // deterministic event, so
@@ -299,12 +305,29 @@ void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
                                 (Supplier<Queue<String>> & Serializable)
                                         () -> GLOBAL_COMMIT_QUEUE)
                         .build());
-        env.execute();
+
+        executeAndVerifyStreamGraph(env);
 
         assertThat(GLOBAL_COMMIT_QUEUE)
                 .containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE);
     }
 
+    private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) throws Exception {
+        StreamGraph streamGraph = env.getStreamGraph();
+        assertNoUnalignedCheckpointInSink(streamGraph);
+        env.execute(streamGraph);
+    }
+
+    private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) {
+        // all edges out of the sink nodes should not support unaligned checkpoints;
+        // we rely on other tests to verify that this property is correctly used
+        assertThat(streamGraph.getStreamNodes())
+                .filteredOn(t -> t.getOperatorName().contains("Sink"))
+                .flatMap(StreamNode::getOutEdges)
+                .allMatch(e -> !e.supportsUnalignedCheckpoints())
+                .isNotEmpty();
+    }
+
     private static List<String> getSplitGlobalCommittedData() {
         return GLOBAL_COMMIT_QUEUE.stream()
                 .flatMap(x -> Arrays.stream(x.split("\\+")))
