Skip to content

Commit

Permalink
[#11705] Cleanup kafka test
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Nov 19, 2024
1 parent b5c4d41 commit 505c0ac
Show file tree
Hide file tree
Showing 28 changed files with 172 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,24 @@ public class KafkaClient_0_11_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
final TestProducer producer = new TestProducer(brokerUrl);
System.out.println("##brokerUrl=" + brokerUrl);
producer.sendMessage(brokerUrl, messageCount);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,23 @@ public class KafkaClient_1_0_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_1_1_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_2_0_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_2_2_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ public class KafkaClient_2_3_0_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ public class KafkaClient_2_3_1_to_max_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class KafkaClient_2_4_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class KafkaClient_2_5_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class KafkaClient_2_6_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ public class KafkaClient_2_7_x_IT extends KafkaClient2ITBase {
@Test
public void producerSendTest() throws NoSuchMethodException {
int messageCount = random.nextInt(5) + 1;
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, messageCount);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(messageCount);
KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Disabled
@Test
public void recordEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_RECORD);
KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset);
}

@Test
public void recordMultiEntryPointTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
final TestProducer producer = new TestProducer(brokerUrl);
producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS);
KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,31 @@ public class TestConsumer {

private final Logger logger = LogManager.getLogger(getClass());

private final Properties props;
private final Thread consumerThread;
private final Poller poller;

public TestConsumer(OffsetStore offsetStore, String brokerUrl) {
String testClassName = System.getProperty(SharedPluginTestConstants.TEST_CLAZZ_NAME);
String testSimpleClassName = testClassName != null ? testClassName.substring(testClassName.lastIndexOf(".") + 1) : "UNKNOWN";
String threadName = testSimpleClassName + "-test-poller";
poller = new Poller(offsetStore, brokerUrl);
this.props = getConsumerConfig(brokerUrl);
poller = new Poller(offsetStore, props);
consumerThread = new Thread(poller, threadName);
consumerThread.setDaemon(true);
}

private Properties getConsumerConfig(String brokerUrl) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true));
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
return props;
}

public void shutdown() throws InterruptedException {
poller.shutdown();
consumerThread.join(100L);
Expand All @@ -73,14 +86,7 @@ private static class Poller implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final OffsetStore offsetStore;

private Poller(OffsetStore offsetStore, String brokerUrl) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true));
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
private Poller(OffsetStore offsetStore, Properties props) {
consumer = new KafkaConsumer<>(props);
this.offsetStore = offsetStore;
}
Expand Down
Loading

0 comments on commit 505c0ac

Please sign in to comment.