diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_0_11_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_0_11_x_IT.java index eab8a53d9202..2677f14fc05d 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_0_11_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_0_11_x_IT.java @@ -53,24 +53,24 @@ public class KafkaClient_0_11_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); System.out.println("##brokerUrl=" + brokerUrl); - producer.sendMessage(brokerUrl, messageCount); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_0_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_0_x_IT.java index 7ca4a40b2e3a..1ef3f02a9fbe 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_0_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_0_x_IT.java @@ -55,23 +55,23 @@ public class KafkaClient_1_0_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_1_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_1_x_IT.java index a7ea4bb0f473..672f5871c367 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_1_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_1_1_x_IT.java @@ -53,23 +53,23 @@ public class KafkaClient_1_1_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_0_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_0_x_IT.java index 7fe20fc81fce..30941dc67dfa 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_0_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_0_x_IT.java @@ -53,23 +53,23 @@ public class KafkaClient_2_0_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_2_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_2_x_IT.java index 40d6bfd0f172..89c23d8d1856 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_2_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_2_x_IT.java @@ -53,23 +53,23 @@ public class KafkaClient_2_2_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_0_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_0_IT.java index a4962a7ad689..d2195b705df0 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_0_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_0_IT.java @@ -38,23 +38,23 @@ public class KafkaClient_2_3_0_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_1_to_max_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_1_to_max_IT.java index 00a67bbdb3ef..c16b5a8806c1 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_1_to_max_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_3_1_to_max_IT.java @@ -38,23 +38,23 @@ public class KafkaClient_2_3_1_to_max_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_4_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_4_x_IT.java index c159ec219e10..31752f29cf97 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_4_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_4_x_IT.java @@ -37,23 +37,23 @@ public class KafkaClient_2_4_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_5_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_5_x_IT.java index 3e8e2647e2e7..72bcbece7c3e 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_5_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_5_x_IT.java @@ -37,23 +37,23 @@ public class KafkaClient_2_5_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_6_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_6_x_IT.java index 0fec6f671aa8..235e5c14d1a4 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_6_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_6_x_IT.java @@ -37,23 +37,23 @@ public class KafkaClient_2_6_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_7_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_7_x_IT.java index b68aeff53eac..1895e795a005 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_7_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_7_x_IT.java @@ -53,23 +53,23 @@ public class KafkaClient_2_7_x_IT extends KafkaClient2ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, messageCount); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_RECORD); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_RECORD); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestConsumer.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestConsumer.java index 142b9b7ec8ad..acfd71b46f26 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestConsumer.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestConsumer.java @@ -45,6 +45,7 @@ public class TestConsumer { private final Logger logger = LogManager.getLogger(getClass()); + private final Properties props; private final Thread consumerThread; private final Poller poller; @@ -52,11 +53,23 @@ public TestConsumer(OffsetStore offsetStore, String brokerUrl) { String testClassName = System.getProperty(SharedPluginTestConstants.TEST_CLAZZ_NAME); String testSimpleClassName = testClassName != null ? testClassName.substring(testClassName.lastIndexOf(".") + 1) : "UNKNOWN"; String threadName = testSimpleClassName + "-test-poller"; - poller = new Poller(offsetStore, brokerUrl); + this.props = getConsumerConfig(brokerUrl); + poller = new Poller(offsetStore, props); consumerThread = new Thread(poller, threadName); consumerThread.setDaemon(true); } + private Properties getConsumerConfig(String brokerUrl) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true)); + props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); + return props; + } + public void shutdown() throws InterruptedException { poller.shutdown(); consumerThread.join(100L); @@ -73,14 +86,7 @@ private static class Poller implements Runnable { private final KafkaConsumer consumer; private final OffsetStore offsetStore; - private Poller(OffsetStore offsetStore, String brokerUrl) { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true)); - props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); + private Poller(OffsetStore offsetStore, Properties props) { consumer = new KafkaConsumer<>(props); this.offsetStore = offsetStore; } diff --git a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java index 23e1a3ece6b4..21fbdf42b2b0 100644 --- a/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java +++ b/agent-module/plugins-it/kafka-it/kafka-2-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java @@ -32,16 +32,26 @@ */ public class TestProducer { - public void sendMessage(String brokerUrl, int messageCount) { - sendMessage(brokerUrl, messageCount, TRACE_TYPE_RECORD); + private final Properties props; + + public TestProducer(String brokerUrl) { + this.props = getProducerConfig(brokerUrl); } - public void sendMessage(String brokerUrl, int messageCount, String traceType) { + private Properties getProducerConfig(String brokerUrl) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return props; + } + + public void sendMessage(int messageCount) { + sendMessage(messageCount, TRACE_TYPE_RECORD); + } + + public void sendMessage(int messageCount, String traceType) { KafkaProducer producer = new KafkaProducer<>(props); for (int i = 0; i < messageCount; i++) { ProducerRecord record = new ProducerRecord<>(TOPIC, traceType, MESSAGE); @@ -50,7 +60,4 @@ public void sendMessage(String brokerUrl, int messageCount, String traceType) { producer.flush(); producer.close(); } - - - } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_8_x_3_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_8_x_3_IT.java index 1f0e45f1f505..175f57e14f26 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_8_x_3_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_2_8_x_3_IT.java @@ -48,24 +48,24 @@ public class KafkaClient_2_8_x_3_IT extends KafkaClient3ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessage(brokerUrl, messageCount); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_0_x_3_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_0_x_3_IT.java index eb8ed95acfff..95a6bf40fde2 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_0_x_3_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_0_x_3_IT.java @@ -48,24 +48,24 @@ public class KafkaClient_3_0_x_3_IT extends KafkaClient3ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessage(brokerUrl, messageCount); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_1_x_3_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_1_x_3_IT.java index 21d1c101bbdf..68a6b4d18577 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_1_x_3_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_1_x_3_IT.java @@ -48,24 +48,24 @@ public class KafkaClient_3_1_x_3_IT extends KafkaClient3ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessage(brokerUrl, messageCount); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_2_x_3_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_2_x_3_IT.java index ee8b1a02057d..dd21db4d0224 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_2_x_3_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaClient_3_2_x_3_IT.java @@ -48,24 +48,24 @@ public class KafkaClient_3_2_x_3_IT extends KafkaClient3ITBase { @Test public void producerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessage(brokerUrl, messageCount); + producer.sendMessage(messageCount); KafkaClientITBase.verifyProducerSend(brokerUrl, messageCount); } @Disabled @Test public void recordEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1); KafkaClientITBase.verifySingleConsumerEntryPoint(brokerUrl, offset); } @Test public void recordMultiEntryPointTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessage(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessage(1, TRACE_TYPE_MULTI_RECORDS); KafkaClientITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_5_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_5_x_IT.java index 3b3583971970..aa0f8351a586 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_5_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_5_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_2_5_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_2_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_2_x_IT.java index 88a175a87ab8..7977089cdc1a 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_2_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_2_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_2_6_2_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_x_IT.java index 6aeca8fbfe31..484920a45027 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_6_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_2_6_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_7_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_7_x_IT.java index 87fc6fabeed1..fd8b954770ac 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_7_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_7_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_2_7_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_8_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_8_x_IT.java index 625eba8df313..adc85a777f90 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_8_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_2_8_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_2_8_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_0_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_0_x_IT.java index c06ecc8641ca..42f46fadfe77 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_0_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_0_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_3_0_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_1_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_1_x_IT.java index b2cae2822c9f..3b469190b5ca 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_1_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_1_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_3_1_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_2_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_2_x_IT.java index 893befd85634..3557f61ac6aa 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_2_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_2_x_IT.java @@ -48,16 +48,16 @@ public class KafkaStreams_3_2_x_IT extends KafkaStreamsIT { @Test public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_3_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_3_x_IT.java index b46a53595d27..d38ddc3041fe 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_3_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_3_x_IT.java @@ -50,17 +50,17 @@ public class KafkaStreams_3_3_x_IT extends KafkaStreamsIT { @Disabled public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test @Disabled public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_4_x_IT.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_4_x_IT.java index 6dd654911638..22e6a9a2bbfd 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_4_x_IT.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/com/navercorp/pinpoint/it/plugin/kafka/KafkaStreams_3_4_x_IT.java @@ -50,17 +50,17 @@ public class KafkaStreams_3_4_x_IT extends KafkaStreamsIT { @Disabled public void streamsProducerSendTest() throws NoSuchMethodException { int messageCount = random.nextInt(5) + 1; - final TestProducer producer = new TestProducer(); + final TestProducer producer = new TestProducer(brokerUrl); - producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD); + producer.sendMessageForStream(messageCount, TRACE_TYPE_RECORD); KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount); } @Test @Disabled public void streamsConsumeTest() throws NoSuchMethodException { - final TestProducer producer = new TestProducer(); - producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS); + final TestProducer producer = new TestProducer(brokerUrl); + producer.sendMessageForStream(1, TRACE_TYPE_MULTI_RECORDS); KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl); } } diff --git a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java index 9da4f910840f..1f4f3f17732f 100644 --- a/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java +++ b/agent-module/plugins-it/kafka-it/kafka-3-it/src/test/java/test/pinpoint/plugin/kafka/TestProducer.java @@ -33,16 +33,26 @@ */ public class TestProducer { - public void sendMessage(String brokerUrl, int messageCount) { - sendMessage(brokerUrl, messageCount, TRACE_TYPE_RECORD); + private final Properties props; + + public TestProducer(String brokerUrl) { + props = getProducerConfig(brokerUrl); } - public void sendMessage(String brokerUrl, int messageCount, String traceType) { + private Properties getProducerConfig(String brokerUrl) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return props; + } + + public void sendMessage(int messageCount) { + sendMessage(messageCount, TRACE_TYPE_RECORD); + } + + public void sendMessage(int messageCount, String traceType) { KafkaProducer producer = new KafkaProducer<>(props); for (int i = 0; i < messageCount; i++) { ProducerRecord record = new ProducerRecord<>(TOPIC, traceType, MESSAGE); @@ -52,12 +62,7 @@ public void sendMessage(String brokerUrl, int messageCount, String traceType) { producer.close(); } - public void sendMessageForStream(String brokerUrl, int messageCount, String traceType) { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); - props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + public void sendMessageForStream(int messageCount, String traceType) { KafkaProducer producer = new KafkaProducer<>(props); for (int i = 0; i < messageCount; i++) { ProducerRecord record = new ProducerRecord<>(INPUT_TOPIC, traceType, MESSAGE);