Skip to content

Commit

Permalink
[FLINK-36287] Migrate SinkITCase to AssertJ
Browse files Browse the repository at this point in the history
(cherry picked from commit 7976880)
  • Loading branch information
AHeise committed Nov 14, 2024
1 parent 1a5e4dc commit 244e955
Showing 1 changed file with 28 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@

import static java.util.stream.Collectors.joining;
import static org.apache.flink.streaming.runtime.operators.sink.TestSink.END_OF_INPUT_STR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration test for {@link org.apache.flink.api.connector.sink.Sink} run time implementation.
Expand All @@ -70,37 +69,36 @@ class SinkITCase extends AbstractTestBase {
// source send data two times
static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;

static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
static final String[] EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
SOURCE_DATA.stream()
// source send data two times
.flatMap(
x ->
Collections.nCopies(
2, Tuple3.of(x, null, Long.MIN_VALUE).toString())
.stream())
.collect(Collectors.toList());
.toArray(String[]::new);

static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
static final String[] EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
SOURCE_DATA.stream()
.map(x -> Tuple3.of(x, null, Long.MIN_VALUE).toString())
.collect(Collectors.toList());
.toArray(String[]::new);

static final List<String> EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE =
static final String[] EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE =
SOURCE_DATA.stream()
// source send data two times
.flatMap(
x ->
Collections.nCopies(
2, Tuple3.of(x, null, Long.MIN_VALUE).toString())
.stream())
.collect(Collectors.toList());
.toArray(String[]::new);

static final List<String> EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE =
Collections.singletonList(
SOURCE_DATA.stream()
.map(x -> Tuple3.of(x, null, Long.MIN_VALUE).toString())
.sorted()
.collect(joining("+")));
static final String EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE =
SOURCE_DATA.stream()
.map(x -> Tuple3.of(x, null, Long.MIN_VALUE).toString())
.sorted()
.collect(joining("+"));

static final Queue<String> COMMIT_QUEUE = new ConcurrentLinkedQueue<>();

Expand All @@ -113,7 +111,7 @@ class SinkITCase extends AbstractTestBase {
static final BooleanSupplier GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA =
(BooleanSupplier & Serializable)
() ->
getSplittedGlobalCommittedData().size()
getSplitGlobalCommittedData().size()
== STREAMING_SOURCE_SEND_ELEMENTS_NUM;

static final BooleanSupplier BOTH_QUEUE_RECEIVE_ALL_DATA =
Expand Down Expand Up @@ -159,13 +157,11 @@ void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Excepti
// the verification of "end of input" would be restored.
GLOBAL_COMMIT_QUEUE.remove(END_OF_INPUT_STR);

assertThat(
COMMIT_QUEUE,
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
assertThat(COMMIT_QUEUE)
.containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);

assertThat(
getSplittedGlobalCommittedData(),
containsInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
assertThat(getSplitGlobalCommittedData())
.containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE);
}

@Test
Expand All @@ -184,12 +180,10 @@ void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {

env.execute();

assertThat(
COMMIT_QUEUE, containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);

assertThat(
GLOBAL_COMMIT_QUEUE,
containsInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
assertThat(GLOBAL_COMMIT_QUEUE)
.containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE);
}

@Test
Expand All @@ -209,9 +203,8 @@ void writerAndCommitterExecuteInStreamingMode() throws Exception {
(Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
.build());
env.execute();
assertThat(
COMMIT_QUEUE,
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
assertThat(COMMIT_QUEUE)
.containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
}

/**
Expand Down Expand Up @@ -262,8 +255,7 @@ void writerAndCommitterExecuteInBatchMode() throws Exception {
(Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
.build());
env.execute();
assertThat(
COMMIT_QUEUE, containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
}

@Test
Expand Down Expand Up @@ -292,9 +284,8 @@ void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
// the verification of "end of input" would be restored.
GLOBAL_COMMIT_QUEUE.remove(END_OF_INPUT_STR);

assertThat(
getSplittedGlobalCommittedData(),
containsInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
assertThat(getSplitGlobalCommittedData())
.containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE);
}

@Test
Expand All @@ -310,12 +301,11 @@ void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
.build());
env.execute();

assertThat(
GLOBAL_COMMIT_QUEUE,
containsInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
assertThat(GLOBAL_COMMIT_QUEUE)
.containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE);
}

private static List<String> getSplittedGlobalCommittedData() {
private static List<String> getSplitGlobalCommittedData() {
return GLOBAL_COMMIT_QUEUE.stream()
.flatMap(x -> Arrays.stream(x.split("\\+")))
.collect(Collectors.toList());
Expand Down

0 comments on commit 244e955

Please sign in to comment.