[FLINK-25538][Connectors/Kafka] Migrate flink-connector-kafka from Junit4 to Junit5 #106

Open. Wants to merge 15 commits into base: main.
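The diff below repeats the same JUnit 4 to JUnit 5 changes across several test classes. As a rough orientation, here is a minimal sketch of the pattern, using a hypothetical ExampleTest that is not part of this PR:

import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

// Replaces "public class ExampleTest extends TestLogger"; JUnit 5 wires the logger in as an extension.
@ExtendWith(TestLoggerExtension.class)
class ExampleTest {

    // org.junit.jupiter.api.BeforeEach instead of org.junit.Before.
    @BeforeEach
    void setUp() {
        // per-test setup
    }

    // org.junit.jupiter.api.Test instead of org.junit.Test; public visibility is no longer required.
    @Test
    void testSomething() {
        // test body
    }
}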
@@ -17,9 +17,10 @@

package org.apache.flink.connector.kafka.sink;

import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;

@@ -29,12 +30,13 @@
* Tests for serializing and deserializing {@link KafkaCommittable} with {@link
* KafkaCommittableSerializer}.
*/
public class KafkaCommittableSerializerTest extends TestLogger {
@ExtendWith({TestLoggerExtension.class})
class KafkaCommittableSerializerTest {

private static final KafkaCommittableSerializer SERIALIZER = new KafkaCommittableSerializer();

@Test
public void testCommittableSerDe() throws IOException {
void testCommittableSerDe() throws IOException {
final String transactionalId = "test-id";
final short epoch = 5;
final KafkaCommittable committable = new KafkaCommittable(1L, epoch, transactionalId, null);
@@ -36,15 +36,15 @@

/** Tests for {@link KafkaCommitter}. */
@ExtendWith({TestLoggerExtension.class})
public class KafkaCommitterTest {
class KafkaCommitterTest {

private static final int PRODUCER_ID = 0;
private static final short EPOCH = 0;
private static final String TRANSACTIONAL_ID = "transactionalId";

/** Causes a network error by inactive broker and tests that a retry will happen. */
@Test
public void testRetryCommittableOnRetriableError() throws IOException, InterruptedException {
void testRetryCommittableOnRetriableError() throws IOException, InterruptedException {
Properties properties = getProperties();
try (final KafkaCommitter committer = new KafkaCommitter(properties);
FlinkKafkaInternalProducer<Object, Object> producer =
@@ -66,7 +66,7 @@ public void testRetryCommittableOnRetriableError() throws IOException, InterruptedException {
}

@Test
public void testFailJobOnUnknownFatalError() throws IOException, InterruptedException {
void testFailJobOnUnknownFatalError() throws IOException, InterruptedException {
Properties properties = getProperties();
try (final KafkaCommitter committer = new KafkaCommitter(properties);
FlinkKafkaInternalProducer<Object, Object> producer =
@@ -87,7 +87,7 @@ public void testFailJobOnUnknownFatalError() throws IOException, InterruptedException {
}

@Test
public void testKafkaCommitterClosesProducer() throws IOException, InterruptedException {
void testKafkaCommitterClosesProducer() throws IOException, InterruptedException {
Properties properties = getProperties();
FlinkKafkaInternalProducer<Object, Object> producer =
new FlinkKafkaInternalProducer(properties, TRANSACTIONAL_ID) {
@@ -21,7 +21,7 @@
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestLoggerExtension;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
@@ -31,8 +31,9 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -47,33 +48,34 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
@ExtendWith({TestLoggerExtension.class})
Contributor comment:

IIRC we don't need to add this @ExtendWith to almost every test; instead we can register it once in a services file (a common practice in other Flink modules), e.g. here:
https://github.com/apache/flink/blob/e3b123d7d1e48e7adbb04fb3470d02bb76f5ff73/flink-formats/flink-avro-confluent-registry/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension#L16
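A minimal sketch of the suggested alternative, assuming the extension is registered once per module via JUnit 5's ServiceLoader-based auto-detection rather than per-class @ExtendWith annotations (the file paths and property below illustrate the common setup and are not part of this PR):

# src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
org.apache.flink.util.TestLoggerExtension

# src/test/resources/junit-platform.properties
junit.jupiter.extensions.autodetection.enabled = true

With this in place, TestLoggerExtension applies to every test class in the module and the per-class @ExtendWith({TestLoggerExtension.class}) lines become unnecessary.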

class KafkaRecordSerializationSchemaBuilderTest {

private static final String DEFAULT_TOPIC = "test";

private static Map<String, ?> configurableConfiguration;
private static Map<String, ?> configuration;
private static boolean isKeySerializer;

@Before
public void setUp() {
@BeforeEach
void setUp() {
configurableConfiguration = new HashMap<>();
configuration = new HashMap<>();
isKeySerializer = false;
}

@Test
public void testDoNotAllowMultipleKeySerializer() {
void testDoNotAllowMultipleKeySerializer() {
assertOnlyOneSerializerAllowed(keySerializationSetter());
}

@Test
public void testDoNotAllowMultipleValueSerializer() {
void testDoNotAllowMultipleValueSerializer() {
assertOnlyOneSerializerAllowed(valueSerializationSetter());
}

@Test
public void testDoNotAllowMultipleTopicSelector() {
void testDoNotAllowMultipleTopicSelector() {
assertThatThrownBy(
() ->
KafkaRecordSerializationSchema.builder()
@@ -89,7 +91,7 @@ public void testDoNotAllowMultipleTopicSelector() {
}

@Test
public void testExpectTopicSelector() {
void testExpectTopicSelector() {
assertThatThrownBy(
KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(new SimpleStringSchema())
@@ -98,13 +100,13 @@ public void testExpectTopicSelector() {
}

@Test
public void testExpectValueSerializer() {
void testExpectValueSerializer() {
assertThatThrownBy(KafkaRecordSerializationSchema.builder().setTopic(DEFAULT_TOPIC)::build)
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testSerializeRecordWithTopicSelector() {
void testSerializeRecordWithTopicSelector() {
final TopicSelector<String> topicSelector =
(e) -> {
if (e.equals("a")) {
@@ -129,7 +131,7 @@ public void testSerializeRecordWithTopicSelector() {
}

@Test
public void testSerializeRecordWithPartitioner() throws Exception {
void testSerializeRecordWithPartitioner() throws Exception {
AtomicBoolean opened = new AtomicBoolean(false);
final int partition = 5;
final FlinkKafkaPartitioner<Object> partitioner =
@@ -148,7 +150,7 @@ public void testSerializeRecordWithPartitioner() throws Exception {
}

@Test
public void testSerializeRecordWithHeaderProvider() throws Exception {
void testSerializeRecordWithHeaderProvider() throws Exception {
final HeaderProvider<String> headerProvider =
(ignored) ->
new RecordHeaders(
@@ -169,7 +171,7 @@ public void testSerializeRecordWithHeaderProvider() throws Exception {
}

@Test
public void testSerializeRecordWithKey() {
void testSerializeRecordWithKey() {
final SerializationSchema<String> serializationSchema = new SimpleStringSchema();
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
@@ -184,7 +186,7 @@ public void testSerializeRecordWithKey() {
}

@Test
public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception {
void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception {
final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue");
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
@@ -201,7 +203,7 @@ public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception
}

@Test
public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception {
void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception {
final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue");
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
@@ -215,7 +217,7 @@ public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception {
}

@Test
public void testSerializeRecordWithKafkaSerializer() throws Exception {
void testSerializeRecordWithKafkaSerializer() throws Exception {
final Map<String, String> config = Collections.singletonMap("configKey", "configValue");
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
@@ -231,7 +233,7 @@ public void testSerializeRecordWithKafkaSerializer() throws Exception {
}

@Test
public void testSerializeRecordWithTimestamp() {
void testSerializeRecordWithTimestamp() {
final SerializationSchema<String> serializationSchema = new SimpleStringSchema();
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
@@ -41,7 +41,7 @@ public class KafkaSinkBuilderTest extends TestLogger {
};

@Test
public void testPropertyHandling() {
void testPropertyHandling() {
validateProducerConfig(
getBasicBuilder(),
p -> {
@@ -78,7 +78,7 @@ public void testPropertyHandling() {
}

@Test
public void testBootstrapServerSetting() {
void testBootstrapServerSetting() {
Properties testConf1 = new Properties();
testConf1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "testServer");
