Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#noissue] Cleanup kafka test #11743

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,24 @@ public class KafkaClient_0_11_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
final TestProducer producer = new TestProducer(brokerUrl);
System.out.println("##brokerUrl=" + brokerUrl);
producer.sendMessage(brokerUrl, messageCount);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,23 @@ public class KafkaClient_1_0_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_1_1_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_2_0_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_2_2_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ public class KafkaClient_2_3_0_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ public class KafkaClient_2_3_1_to_max_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class KafkaClient_2_4_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class KafkaClient_2_5_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class KafkaClient_2_6_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_2_7_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,31 @@ public class TestConsumer {

private final Logger logger = LogManager.getLogger(getClass());

private final Properties props;
private final Thread consumerThread;
private final Poller poller;

public TestConsumer(OffsetStore offsetStore, String brokerUrl) {
String testClassName = System.getProperty(SharedPluginTestConstants.TEST_CLAZZ_NAME);
String testSimpleClassName = testClassName != null ? testClassName.substring(testClassName.lastIndexOf(".") + 1) : "UNKNOWN";
String threadName = testSimpleClassName + "-test-poller";
poller = new Poller(offsetStore, brokerUrl);
this.props = getConsumerConfig(brokerUrl);
poller = new Poller(offsetStore, props);
consumerThread = new Thread(poller, threadName);
consumerThread.setDaemon(true);
}

private Properties getConsumerConfig(String brokerUrl) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true));
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
return props;
}

public void shutdown() throws InterruptedException {
poller.shutdown();
consumerThread.join(100L);
Expand All @@ -73,14 +86,7 @@ private static class Poller implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final OffsetStore offsetStore;

private Poller(OffsetStore offsetStore, String brokerUrl) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true));
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
private Poller(OffsetStore offsetStore, Properties props) {
consumer = new KafkaConsumer<>(props);
this.offsetStore = offsetStore;
}
Expand Down
Loading