Skip to content

Commit

Permalink
#1185 - Integration Test for Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Duy Le Van committed Oct 16, 2024
1 parent f19c79c commit 4e3fd48
Show file tree
Hide file tree
Showing 10 changed files with 526 additions and 74 deletions.
14 changes: 14 additions & 0 deletions common-library/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<sonar.organization>nashtech-garage</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<sonar.projectKey>nashtech-garage_yas-common-library</sonar.projectKey>
<kafka-it.version>1.19.7</kafka-it.version>
</properties>
<dependencies>
<dependency>
Expand All @@ -43,6 +44,19 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- Test -->
<!-- TODO: check duplicate -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${kafka-it.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
43 changes: 43 additions & 0 deletions common-library/src/it/java/common/container/ContainerFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package common.container;

import dasniko.testcontainers.keycloak.KeycloakContainer;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public final class ContainerFactory {

private ContainerFactory() {}

public static KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
KeycloakContainer keycloak = new KeycloakContainer()
.withRealmImportFiles("/test-realm.json")
.withReuse(true);

registry.add(
"spring.security.oauth2.resourceserver.jwt.issuer-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus")
);
registry.add(
"spring.security.oauth2.resourceserver.jwt.jwk-set-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus/protocol/openid-connect/certs")
);
return keycloak;
}

public static KafkaContainer kafkaContainer(DynamicPropertyRegistry registry, String version) {
var kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(version))
);
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);

// Consumer properties
registry.add("auto.offset.reset", () -> "earliest");
registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);

// Producer properties
registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

}
118 changes: 118 additions & 0 deletions common-library/src/it/java/common/kafka/CdcConsumerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package common.kafka;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.Assert;
import org.springframework.web.client.RestClient;
import org.testcontainers.containers.KafkaContainer;

@Getter
public abstract class CdcConsumerTest<M> {

private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class);

private final Class<M> messageType;

private final String cdcEvent;

@Autowired
private KafkaContainer kafkaContainer;

@MockBean
private RestClient restClient;

@Mock
RestClient.ResponseSpec responseSpec;

@Mock
RestClient.RequestHeadersUriSpec requestHeadersUriSpec;

private KafkaTemplate<String, M> kafkaTemplate;

public CdcConsumerTest(Class<M> messageType, String topicEvent) {
Assert.notNull(topicEvent, "CDC topic must not be null");
Assert.notNull(messageType, "Message type must not be null");
this.cdcEvent = topicEvent;
this.messageType = messageType;
}

public synchronized KafkaTemplate<String, M> getKafkaTemplate() {
// Now, we haven't had any process need Kafka Producer,
// then just temporary config producer like this for testing
if (kafkaTemplate == null) {
synchronized (this) {
final Map<String, Object> props = getProducerProps();
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, M>(props));
}
}
return kafkaTemplate;
}

protected void sendMsg(M message)
throws InterruptedException, ExecutionException, TimeoutException {
var rs = getKafkaTemplate()
.send(this.cdcEvent, message)
.get(10, TimeUnit.SECONDS);
logger.info("Sent message completed: {}", rs);
}

protected <R> void simulateHttpRequestWithResponse(URI url, R response, Class<R> responseType) {
setupMockGetRequest(url);
when(responseSpec.body(responseType)).thenReturn(response);
}

protected <R> void simulateHttpRequestWithError(URI url, Throwable exception, Class<R> responseType) {
setupMockGetRequest(url);
when(responseSpec.body(responseType)).thenThrow(exception);
}

private void setupMockGetRequest(URI url) {
when(restClient.get()).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.uri(url)).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.headers(any())).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.retrieve()).thenReturn(responseSpec);
}

public void waitForConsumer(
long processTime,
int numOfRecords,
int attempts,
long backOff
) throws InterruptedException {
long retryTime = processTime + (attempts * processTime) + (backOff * attempts);
long waitTime = (attempts > 0 ? retryTime : processTime) * numOfRecords;
logger.info("Consumer Processing expected time: {}s", waitTime);
Thread.sleep(Duration.ofSeconds(waitTime));
}

private @NotNull Map<String, Object> getProducerProps() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@

public class ElasticTestContainer extends ElasticsearchContainer {

private static final String DOCKER_ELASTIC = "docker.elastic.co/elasticsearch/elasticsearch:7.17.6";
private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:%s";

private static final String CLUSTER_NAME = "cluster.name";

private static final String ELASTIC_SEARCH = "elasticsearch";

public ElasticTestContainer() {
super(DOCKER_ELASTIC);
public ElasticTestContainer(String version) {
super(IMAGE_NAME.formatted(version));
this.addFixedExposedPort(9200, 9200);
this.addFixedExposedPort(9300, 9300);
this.addEnv(CLUSTER_NAME, ELASTIC_SEARCH);
this.withEnv("xpack.security.transport.ssl.enabled", "false");
this.withEnv("xpack.security.http.ssl.enabled", "false");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.yas.search.config;

import common.container.ContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;

@TestConfiguration
public class KafkaIntegrationTestConfiguration {

@Value("${kafka.version}")
private String kafkaVersion;

@Value("${elasticsearch.version}")
private String elasticSearchVersion;

@Bean
@ServiceConnection
public KafkaContainer kafkaContainer(DynamicPropertyRegistry registry) {
return ContainerFactory.kafkaContainer(registry, kafkaVersion);
}

@Bean
@ServiceConnection
public ElasticTestContainer elasticTestContainer() {
return new ElasticTestContainer(elasticSearchVersion);
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package com.yas.search.config;

import com.yas.commonlibrary.IntegrationTestConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;

@TestConfiguration
public class SearchTestConfig extends IntegrationTestConfiguration {

@Value("${elasticsearch.version}")
private String elasticSearchVersion;

@Bean(destroyMethod = "stop")
@ServiceConnection
public ElasticTestContainer elasticTestContainer() {
return new ElasticTestContainer();
return new ElasticTestContainer(elasticSearchVersion);
}
}
Loading

0 comments on commit 4e3fd48

Please sign in to comment.