diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplier.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplier.java index 4378dac1ef..51918afd76 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplier.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplier.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import java.util.List; +import java.util.Objects; public class DefaultKafkaClusterConfigSupplier implements KafkaClusterConfigSupplier { private final KafkaClusterConfig kafkaClusterConfig; @@ -19,21 +20,21 @@ public DefaultKafkaClusterConfigSupplier(KafkaClusterConfig kafkaClusterConfig) @Override public List getBootStrapServers() { - return kafkaClusterConfig.getBootStrapServers(); + return Objects.nonNull(kafkaClusterConfig) ? kafkaClusterConfig.getBootStrapServers() : null; } @Override public AuthConfig getAuthConfig() { - return kafkaClusterConfig.getAuthConfig(); + return Objects.nonNull(kafkaClusterConfig) ? kafkaClusterConfig.getAuthConfig() : null; } @Override public AwsConfig getAwsConfig() { - return kafkaClusterConfig.getAwsConfig(); + return Objects.nonNull(kafkaClusterConfig) ? kafkaClusterConfig.getAwsConfig() : null; } @Override public EncryptionConfig getEncryptionConfig() { - return kafkaClusterConfig.getEncryptionConfig(); + return Objects.nonNull(kafkaClusterConfig) ? kafkaClusterConfig.getEncryptionConfig() : null; } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplierTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplierTest.java index ba9ff9cbe4..b11c431108 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplierTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/extension/DefaultKafkaClusterConfigSupplierTest.java @@ -30,7 +30,7 @@ private DefaultKafkaClusterConfigSupplier createObjectUnderTest() { } @Test - void test_getters() { + void testGetters() { final List bootstrapServers = List.of("localhost:9092"); final AuthConfig authConfig = mock(AuthConfig.class); final AwsConfig awsConfig = mock(AwsConfig.class); @@ -45,4 +45,14 @@ void test_getters() { assertThat(defaultKafkaClusterConfigSupplier.getAwsConfig(), equalTo(awsConfig)); assertThat(defaultKafkaClusterConfigSupplier.getEncryptionConfig(), equalTo(encryptionConfig)); } + + @Test + void testGettersWithNullClusterConfig() { + DefaultKafkaClusterConfigSupplier defaultKafkaClusterConfigSupplier = + new DefaultKafkaClusterConfigSupplier(null); + assertThat(defaultKafkaClusterConfigSupplier.getBootStrapServers(), equalTo(null)); + assertThat(defaultKafkaClusterConfigSupplier.getAuthConfig(), equalTo(null)); + assertThat(defaultKafkaClusterConfigSupplier.getAwsConfig(), equalTo(null)); + assertThat(defaultKafkaClusterConfigSupplier.getEncryptionConfig(), equalTo(null)); + } }