Skip to content

Commit

Permalink
[FLINK-32847][flink-runtime][JUnit5 Migration] Module: The operators …
Browse files Browse the repository at this point in the history
…package of flink-runtime (#23233)
  • Loading branch information
Jiabao-Sun authored Aug 27, 2023
1 parent c1fba73 commit ac61e83
Show file tree
Hide file tree
Showing 86 changed files with 2,678 additions and 2,787 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.util.Collector;

import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.TestTemplate;

public abstract class AbstractOuterJoinTaskExternalITCase
import static org.assertj.core.api.Assertions.assertThat;

abstract class AbstractOuterJoinTaskExternalITCase
extends BinaryOperatorTestBase<
FlatJoinFunction<
Tuple2<Integer, Integer>,
Expand Down Expand Up @@ -75,13 +76,13 @@ public abstract class AbstractOuterJoinTaskExternalITCase
protected final CountingOutputCollector<Tuple2<Integer, Integer>> output =
new CountingOutputCollector<>();

public AbstractOuterJoinTaskExternalITCase(ExecutionConfig config) {
AbstractOuterJoinTaskExternalITCase(ExecutionConfig config) {
super(config, HASH_MEM, 2, SORT_MEM);
bnljn_frac = (double) BNLJN_MEM / this.getMemoryManager().getMemorySize();
}

@Test
public void testExternalSortOuterJoinTask() throws Exception {
@TestTemplate
void testExternalSortOuterJoinTask() throws Exception {
final int keyCnt1 = 16384 * 4;
final int valCnt1 = 2;

Expand Down Expand Up @@ -114,7 +115,9 @@ public void testExternalSortOuterJoinTask() throws Exception {
this.comparator2.duplicate());
testDriver(testTask, MockJoinStub.class);

Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
assertThat(this.output.getNumberOfRecords())
.withFailMessage("Wrong result set size.")
.isEqualTo(expCnt);
}

protected abstract int calculateExpectedCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,16 @@
import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.util.Collector;

import org.apache.flink.shaded.guava31.com.google.common.base.Throwables;

import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.TestTemplate;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public abstract class AbstractOuterJoinTaskTest
abstract class AbstractOuterJoinTaskTest
extends BinaryOperatorTestBase<
FlatJoinFunction<
Tuple2<Integer, Integer>,
Expand Down Expand Up @@ -89,13 +86,13 @@ public abstract class AbstractOuterJoinTaskTest
(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
new TypeSerializer<?>[] {IntSerializer.INSTANCE, IntSerializer.INSTANCE});

public AbstractOuterJoinTaskTest(ExecutionConfig config) {
AbstractOuterJoinTaskTest(ExecutionConfig config) {
super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
bnljn_frac = (double) BNLJN_MEM / this.getMemoryManager().getMemorySize();
}

@Test
public void testSortBoth1OuterJoinTask() throws Exception {
@TestTemplate
void testSortBoth1OuterJoinTask() throws Exception {
final int keyCnt1 = 20;
final int valCnt1 = 1;

Expand All @@ -105,8 +102,8 @@ public void testSortBoth1OuterJoinTask() throws Exception {
testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
}

@Test
public void testSortBoth2OuterJoinTask() throws Exception {
@TestTemplate
void testSortBoth2OuterJoinTask() throws Exception {
final int keyCnt1 = 20;
final int valCnt1 = 1;

Expand All @@ -116,8 +113,8 @@ public void testSortBoth2OuterJoinTask() throws Exception {
testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
}

@Test
public void testSortBoth3OuterJoinTask() throws Exception {
@TestTemplate
void testSortBoth3OuterJoinTask() throws Exception {
int keyCnt1 = 20;
int valCnt1 = 1;

Expand All @@ -127,8 +124,8 @@ public void testSortBoth3OuterJoinTask() throws Exception {
testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
}

@Test
public void testSortBoth4OuterJoinTask() throws Exception {
@TestTemplate
void testSortBoth4OuterJoinTask() throws Exception {
int keyCnt1 = 20;
int valCnt1 = 20;

Expand All @@ -138,8 +135,8 @@ public void testSortBoth4OuterJoinTask() throws Exception {
testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
}

@Test
public void testSortBoth5OuterJoinTask() throws Exception {
@TestTemplate
void testSortBoth5OuterJoinTask() throws Exception {
int keyCnt1 = 20;
int valCnt1 = 20;

Expand All @@ -149,8 +146,8 @@ public void testSortBoth5OuterJoinTask() throws Exception {
testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
}

@Test
public void testSortBoth6OuterJoinTask() throws Exception {
@TestTemplate
void testSortBoth6OuterJoinTask() throws Exception {
int keyCnt1 = 10;
int valCnt1 = 1;

Expand Down Expand Up @@ -188,15 +185,15 @@ private void testSortBothOuterJoinTask(int keyCnt1, int valCnt1, int keyCnt2, in

final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);

Assert.assertTrue(
"Result set size was " + this.outList.size() + ". Expected was " + expCnt,
this.outList.size() == expCnt);
assertThat(this.outList)
.withFailMessage("Result set size was %d. Expected was %d", outList.size(), expCnt)
.hasSize(expCnt);

this.outList.clear();
}

@Test
public void testSortFirstOuterJoinTask() throws Exception {
@TestTemplate
void testSortFirstOuterJoinTask() throws Exception {
int keyCnt1 = 20;
int valCnt1 = 20;

Expand Down Expand Up @@ -226,15 +223,15 @@ public void testSortFirstOuterJoinTask() throws Exception {

final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);

Assert.assertTrue(
"Result set size was " + this.outList.size() + ". Expected was " + expCnt,
this.outList.size() == expCnt);
assertThat(this.outList)
.withFailMessage("Result set size was %d. Expected was %d", outList.size(), expCnt)
.hasSize(expCnt);

this.outList.clear();
}

@Test
public void testSortSecondOuterJoinTask() throws Exception {
@TestTemplate
void testSortSecondOuterJoinTask() throws Exception {
int keyCnt1 = 20;
int valCnt1 = 20;

Expand Down Expand Up @@ -264,15 +261,15 @@ public void testSortSecondOuterJoinTask() throws Exception {

final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);

Assert.assertTrue(
"Result set size was " + this.outList.size() + ". Expected was " + expCnt,
this.outList.size() == expCnt);
assertThat(this.outList)
.withFailMessage("Result set size was %d. Expected was %d", outList.size(), expCnt)
.hasSize(expCnt);

this.outList.clear();
}

@Test
public void testMergeOuterJoinTask() throws Exception {
@TestTemplate
void testMergeOuterJoinTask() throws Exception {
int keyCnt1 = 20;
int valCnt1 = 20;

Expand Down Expand Up @@ -300,15 +297,15 @@ public void testMergeOuterJoinTask() throws Exception {

final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);

Assert.assertTrue(
"Result set size was " + this.outList.size() + ". Expected was " + expCnt,
this.outList.size() == expCnt);
assertThat(this.outList)
.withFailMessage("Result set size was %d. Expected was %d", outList.size(), expCnt)
.hasSize(expCnt);

this.outList.clear();
}

@Test(expected = ExpectedTestException.class)
public void testFailingOuterJoinTask() throws Exception {
@TestTemplate
void testFailingOuterJoinTask() {
int keyCnt1 = 20;
int valCnt1 = 20;

Expand All @@ -332,11 +329,12 @@ public void testFailingOuterJoinTask() throws Exception {
addInput(new UniformIntTupleGenerator(keyCnt1, valCnt1, true), this.serializer);
addInput(new UniformIntTupleGenerator(keyCnt2, valCnt2, true), this.serializer);

testDriver(testTask, MockFailingJoinStub.class);
assertThatThrownBy(() -> testDriver(testTask, MockFailingJoinStub.class))
.isInstanceOf(ExpectedTestException.class);
}

@Test
public void testCancelOuterJoinTaskWhileSort1() throws Exception {
@TestTemplate
void testCancelOuterJoinTaskWhileSort1() throws Exception {
setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
addDriverComparator(this.comparator1);
addDriverComparator(this.comparator2);
Expand Down Expand Up @@ -379,16 +377,15 @@ public void run() {

taskRunner.join(60000);

assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
assertThat(taskRunner.isAlive())
.withFailMessage("Task thread did not finish within 60 seconds")
.isFalse();

final Throwable taskError = error.get();
if (taskError != null) {
fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(taskError));
}
assertThat(error.get()).isNull();
}

@Test
public void testCancelOuterJoinTaskWhileSort2() throws Exception {
@TestTemplate
void testCancelOuterJoinTaskWhileSort2() throws Exception {
setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
addDriverComparator(this.comparator1);
addDriverComparator(this.comparator2);
Expand Down Expand Up @@ -431,16 +428,15 @@ public void run() {

taskRunner.join(60000);

assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
assertThat(taskRunner.isAlive())
.withFailMessage("Task thread did not finish within 60 seconds")
.isFalse();

final Throwable taskError = error.get();
if (taskError != null) {
fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(taskError));
}
assertThat(error.get()).isNull();
}

@Test
public void testCancelOuterJoinTaskWhileRunning() throws Exception {
@TestTemplate
void testCancelOuterJoinTaskWhileRunning() throws Exception {
setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
addDriverComparator(this.comparator1);
addDriverComparator(this.comparator2);
Expand Down Expand Up @@ -480,12 +476,11 @@ public void run() {

taskRunner.join(60000);

assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
assertThat(taskRunner.isAlive())
.withFailMessage("Task thread did not finish within 60 seconds")
.isFalse();

final Throwable taskError = error.get();
if (taskError != null) {
fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(taskError));
}
assertThat(error.get()).isNull();
}

protected abstract AbstractOuterJoinDriver<
Expand Down
Loading

0 comments on commit ac61e83

Please sign in to comment.