Skip to content

Commit

Permalink
[FLINK-25544][streaming][JUnit5 Migration] The api package of module …
Browse files Browse the repository at this point in the history
…flink-stream-java (#24429)
  • Loading branch information
Jiabao-Sun authored Mar 7, 2024
1 parent 5cc49c4 commit 6f7b248
Show file tree
Hide file tree
Showing 121 changed files with 3,979 additions and 4,411 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link AggregationFunction}. */
public class AggregationFunctionTest {
class AggregationFunctionTest {

@Test
public void groupSumIntegerTest() throws Exception {
void groupSumIntegerTest() throws Exception {

// preparing expected outputs
List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
Expand Down Expand Up @@ -124,13 +124,13 @@ public void groupSumIntegerTest() throws Exception {
keySelector,
keyType);

assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
assertThat(groupedSumList).isEqualTo(expectedGroupSumList);
assertThat(groupedMinList).isEqualTo(expectedGroupMinList);
assertThat(groupedMaxList).isEqualTo(expectedGroupMaxList);
}

@Test
public void pojoGroupSumIntegerTest() throws Exception {
void pojoGroupSumIntegerTest() throws Exception {

// preparing expected outputs
List<MyPojo> expectedGroupSumList = new ArrayList<>();
Expand Down Expand Up @@ -204,13 +204,13 @@ public void pojoGroupSumIntegerTest() throws Exception {
keySelector,
keyType);

assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
assertThat(groupedSumList).isEqualTo(expectedGroupSumList);
assertThat(groupedMinList).isEqualTo(expectedGroupMinList);
assertThat(groupedMaxList).isEqualTo(expectedGroupMaxList);
}

@Test
public void minMaxByTest() throws Exception {
void minMaxByTest() throws Exception {
// Tuples are grouped on field 0, aggregated on field 1

// preparing expected outputs
Expand Down Expand Up @@ -283,49 +283,49 @@ public void minMaxByTest() throws Exception {
ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);

assertEquals(
maxByFirstExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType));

assertEquals(
maxByLastExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType));

assertEquals(
minByLastExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType));

assertEquals(
minByFirstExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType));
assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType))
.isEqualTo(maxByFirstExpected);

assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType))
.isEqualTo(maxByLastExpected);

assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType))
.isEqualTo(minByLastExpected);

assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByList(),
keySelector,
keyType))
.isEqualTo(minByFirstExpected);
}

@Test
public void pojoMinMaxByTest() throws Exception {
void pojoMinMaxByTest() throws Exception {
// Pojos are grouped on field 0, aggregated on field 1

// preparing expected outputs
Expand Down Expand Up @@ -397,45 +397,45 @@ public void pojoMinMaxByTest() throws Exception {
ReduceFunction<MyPojo3> minByFunctionLast =
new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);

assertEquals(
maxByFirstExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType));

assertEquals(
maxByLastExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType));

assertEquals(
minByLastExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType));

assertEquals(
minByFirstExpected,
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType));
assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType))
.isEqualTo(maxByFirstExpected);

assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
maxByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType))
.isEqualTo(maxByLastExpected);

assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionLast,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType))
.isEqualTo(minByLastExpected);

assertThat(
MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduceOperator<>(
minByFunctionFirst,
typeInfo.createSerializer(config.getSerializerConfig())),
getInputByPojoList(),
keySelector,
keyType))
.isEqualTo(minByFirstExpected);
}

// *************************************************************************
Expand Down
Loading

0 comments on commit 6f7b248

Please sign in to comment.