Skip to content

Commit

Permalink
[FLINK-32846][runtime][JUnit5 Migration] The minicluster and net pack…
Browse files Browse the repository at this point in the history
…age of flink-runtime module (#23244)
  • Loading branch information
wangzzu authored Aug 27, 2023
1 parent ac61e83 commit 42f170b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
Expand All @@ -45,28 +44,22 @@
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Integration test cases for the {@link MiniCluster}. */
public class MiniClusterITCase extends TestLogger {
class MiniClusterITCase {

@Test
public void runJobWithSingleRpcService() throws Exception {
void runJobWithSingleRpcService() throws Exception {
final int numOfTMs = 3;
final int slotsPerTM = 7;

Expand All @@ -86,7 +79,7 @@ public void runJobWithSingleRpcService() throws Exception {
}

@Test
public void runJobWithMultipleRpcServices() throws Exception {
void runJobWithMultipleRpcServices() throws Exception {
final int numOfTMs = 3;
final int slotsPerTM = 7;

Expand All @@ -106,30 +99,26 @@ public void runJobWithMultipleRpcServices() throws Exception {
}

@Test
public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
try {
final JobVertex vertex1 = new JobVertex("Test Vertex1");
vertex1.setParallelism(1);
vertex1.setMaxParallelism(1);
vertex1.setInvokableClass(BlockingNoOpInvokable.class);

final JobVertex vertex2 = new JobVertex("Test Vertex2");
vertex2.setParallelism(1);
vertex2.setMaxParallelism(1);
vertex2.setInvokableClass(BlockingNoOpInvokable.class);

vertex2.connectNewDataSetAsInput(
vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, vertex2);

runHandleJobsWhenNotEnoughSlots(jobGraph);

fail("Job should fail.");
} catch (JobExecutionException e) {
assertThat(e, FlinkMatchers.containsMessage("Job execution failed"));
assertThat(e, FlinkMatchers.containsCause(NoResourceAvailableException.class));
}
void testHandleStreamingJobsWhenNotEnoughSlot() {
final JobVertex vertex1 = new JobVertex("Test Vertex1");
vertex1.setParallelism(1);
vertex1.setMaxParallelism(1);
vertex1.setInvokableClass(BlockingNoOpInvokable.class);

final JobVertex vertex2 = new JobVertex("Test Vertex2");
vertex2.setParallelism(1);
vertex2.setMaxParallelism(1);
vertex2.setInvokableClass(BlockingNoOpInvokable.class);

vertex2.connectNewDataSetAsInput(
vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, vertex2);

assertThatThrownBy(() -> runHandleJobsWhenNotEnoughSlots(jobGraph))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(NoResourceAvailableException.class)
.hasMessageContaining("Job execution failed");
}

private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exception {
Expand All @@ -155,7 +144,7 @@ private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exc
}

@Test
public void testForwardJob() throws Exception {
void testForwardJob() throws Exception {
final int parallelism = 31;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -186,7 +175,7 @@ public void testForwardJob() throws Exception {
}

@Test
public void testBipartiteJob() throws Exception {
void testBipartiteJob() throws Exception {
final int parallelism = 31;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -217,7 +206,7 @@ public void testBipartiteJob() throws Exception {
}

@Test
public void testTwoInputJobFailingEdgeMismatch() throws Exception {
void testTwoInputJobFailingEdgeMismatch() throws Exception {
final int parallelism = 1;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -250,19 +239,16 @@ public void testTwoInputJobFailingEdgeMismatch() throws Exception {
final JobGraph jobGraph =
JobGraphTestUtils.streamingJobGraph(sender1, receiver, sender2);

try {
miniCluster.executeJobBlocking(jobGraph);

fail("Job should fail.");
} catch (JobExecutionException e) {
assertTrue(findThrowable(e, ArrayIndexOutOfBoundsException.class).isPresent());
assertTrue(findThrowableWithMessage(e, "2").isPresent());
}
assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class)
.rootCause()
.hasMessageContaining("2");
}
}

@Test
public void testTwoInputJob() throws Exception {
void testTwoInputJob() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -300,7 +286,7 @@ public void testTwoInputJob() throws Exception {
}

@Test
public void testSchedulingAllAtOnce() throws Exception {
void testSchedulingAllAtOnce() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -343,7 +329,7 @@ public void testSchedulingAllAtOnce() throws Exception {
}

@Test
public void testJobWithAFailingSenderVertex() throws Exception {
void testJobWithAFailingSenderVertex() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand All @@ -369,19 +355,16 @@ public void testJobWithAFailingSenderVertex() throws Exception {

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

try {
miniCluster.executeJobBlocking(jobGraph);

fail("Job should fail.");
} catch (JobExecutionException e) {
assertTrue(findThrowable(e, Exception.class).isPresent());
assertTrue(findThrowableWithMessage(e, "Test exception").isPresent());
}
assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(Exception.class)
.rootCause()
.hasMessageContaining("Test exception");
}
}

@Test
public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {
void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -417,19 +400,16 @@ public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

try {
miniCluster.executeJobBlocking(jobGraph);

fail("Job should fail.");
} catch (JobExecutionException e) {
assertTrue(findThrowable(e, Exception.class).isPresent());
assertTrue(findThrowableWithMessage(e, "Test exception").isPresent());
}
assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(Exception.class)
.rootCause()
.hasMessageContaining("Test exception");
}
}

@Test
public void testJobWithAFailingReceiverVertex() throws Exception {
void testJobWithAFailingReceiverVertex() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand All @@ -455,19 +435,16 @@ public void testJobWithAFailingReceiverVertex() throws Exception {

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

try {
miniCluster.executeJobBlocking(jobGraph);

fail("Job should fail.");
} catch (JobExecutionException e) {
assertTrue(findThrowable(e, Exception.class).isPresent());
assertTrue(findThrowableWithMessage(e, "Test exception").isPresent());
}
assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(Exception.class)
.rootCause()
.hasMessageContaining("Test exception");
}
}

@Test
public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception {
void testJobWithAllVerticesFailingDuringInstantiation() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand All @@ -493,20 +470,16 @@ public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

try {
miniCluster.executeJobBlocking(jobGraph);

fail("Job should fail.");
} catch (JobExecutionException e) {
assertTrue(findThrowable(e, Exception.class).isPresent());
assertTrue(
findThrowableWithMessage(e, "Test exception in constructor").isPresent());
}
assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(Exception.class)
.rootCause()
.hasMessageContaining("Test exception in constructor");
}
}

@Test
public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception {
void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -542,20 +515,16 @@ public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

try {
miniCluster.executeJobBlocking(jobGraph);

fail("Job should fail.");
} catch (JobExecutionException e) {
assertTrue(findThrowable(e, Exception.class).isPresent());
assertTrue(
findThrowableWithMessage(e, "Test exception in constructor").isPresent());
}
assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
.isInstanceOf(JobExecutionException.class)
.hasCauseInstanceOf(Exception.class)
.rootCause()
.hasMessageContaining("Test exception in constructor");
}
}

@Test
public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg =
Expand Down Expand Up @@ -593,12 +562,12 @@ public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {

jobResultFuture.get().toJobExecutionResult(getClass().getClassLoader());

assertTrue(WaitOnFinalizeJobVertex.finalizedOnMaster.get());
assertThat(WaitOnFinalizeJobVertex.finalizedOnMaster).isTrue();
}
}

@Test
public void testOutOfMemoryErrorMessageEnrichmentInJobVertexFinalization() throws Exception {
void testOutOfMemoryErrorMessageEnrichmentInJobVertexFinalization() throws Exception {
final int parallelism = 1;

final MiniClusterConfiguration cfg =
Expand All @@ -625,22 +594,21 @@ public void testOutOfMemoryErrorMessageEnrichmentInJobVertexFinalization() throw
(JobSubmissionResult ignored) ->
miniCluster.requestJobResult(jobGraph.getJobID()));

try {
jobResultFuture.get().toJobExecutionResult(getClass().getClassLoader());
} catch (JobExecutionException e) {
assertThat(e, FlinkMatchers.containsCause(OutOfMemoryError.class));
assertThat(
findThrowable(e, OutOfMemoryError.class)
.map(OutOfMemoryError::getMessage)
.get(),
startsWith(
"Java heap space. A heap space-related out-of-memory error has occurred."));
}
assertThatThrownBy(
() ->
jobResultFuture
.get()
.toJobExecutionResult(getClass().getClassLoader()))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(OutOfMemoryError.class)
.rootCause()
.hasMessageContaining(
"Java heap space. A heap space-related out-of-memory error has occurred.");
}
}

@Test
public void testOutOfMemoryErrorMessageEnrichmentInJobVertexInitialization() throws Exception {
void testOutOfMemoryErrorMessageEnrichmentInJobVertexInitialization() throws Exception {
final int parallelism = 1;

final MiniClusterConfiguration cfg =
Expand All @@ -667,15 +635,15 @@ public void testOutOfMemoryErrorMessageEnrichmentInJobVertexInitialization() thr
(JobSubmissionResult ignored) ->
miniCluster.requestJobResult(jobGraph.getJobID()));

try {
jobResultFuture.get();
} catch (ExecutionException e) {
assertThat(e, FlinkMatchers.containsCause(OutOfMemoryError.class));
assertThat(
e,
FlinkMatchers.containsMessage(
"Java heap space. A heap space-related out-of-memory error has occurred."));
}
assertThatThrownBy(
() ->
jobResultFuture
.get()
.toJobExecutionResult(getClass().getClassLoader()))
.isInstanceOf(JobExecutionException.class)
.hasRootCauseInstanceOf(OutOfMemoryError.class)
.rootCause()
.hasMessageContaining("Java heap space");
}
}

Expand Down
Loading

0 comments on commit 42f170b

Please sign in to comment.