#1185 - Integration Test for Kafka
Duy Le Van committed Oct 21, 2024
1 parent 9f3a36f commit 1ac4576
Showing 10 changed files with 545 additions and 77 deletions.
13 changes: 10 additions & 3 deletions common-library/pom.xml
@@ -37,16 +37,23 @@
         <groupId>org.springframework</groupId>
         <artifactId>spring-tx</artifactId>
     </dependency>
+
+    <!-- Kafka Dependencies -->
+    <dependency>
+        <groupId>org.springframework.kafka</groupId>
+        <artifactId>spring-kafka</artifactId>
+    </dependency>
     <dependency>
         <groupId>com.opencsv</groupId>
         <artifactId>opencsv</artifactId>
         <version>${opencsv.version}</version>
     </dependency>

-    <!-- Kafka Dependencies -->
+    <!-- Test Dependency -->
     <dependency>
-        <groupId>org.springframework.kafka</groupId>
-        <artifactId>spring-kafka</artifactId>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>kafka</artifactId>
+        <scope>test</scope>
     </dependency>
 </dependencies>

46 changes: 46 additions & 0 deletions common-library/src/it/java/common/container/ContainerFactory.java
@@ -0,0 +1,46 @@
package common.container;

import dasniko.testcontainers.keycloak.KeycloakContainer;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/**
* Factory class that creates and configures the containers used for integration testing with Testcontainers.
*/
public final class ContainerFactory {

private ContainerFactory() {}

public static KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
KeycloakContainer keycloak = new KeycloakContainer()
.withRealmImportFiles("/test-realm.json")
.withReuse(true);

registry.add(
"spring.security.oauth2.resourceserver.jwt.issuer-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus")
);
registry.add(
"spring.security.oauth2.resourceserver.jwt.jwk-set-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus/protocol/openid-connect/certs")
);
return keycloak;
}

public static KafkaContainer kafkaContainer(DynamicPropertyRegistry registry, String version) {
var kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(version))
);
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);

// Consumer properties
registry.add("auto.offset.reset", () -> "earliest");
registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);

// Producer properties
registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

}
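The factory methods take a DynamicPropertyRegistry so they can be called wherever one is available. A minimal sketch of calling the Keycloak factory from a test class via @DynamicPropertySource follows (the test class name is hypothetical and not part of this commit; the Kafka and Elasticsearch containers are wired through a @TestConfiguration further below):

import common.container.ContainerFactory;
import dasniko.testcontainers.keycloak.KeycloakContainer;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;

// Hypothetical test class, for illustration only.
@SpringBootTest
class SampleKeycloakIT {

    static KeycloakContainer keycloak;

    @DynamicPropertySource
    static void registerContainers(DynamicPropertyRegistry registry) {
        // Register the issuer-uri / jwk-set-uri properties and start the container
        // before the application context is refreshed.
        keycloak = ContainerFactory.keycloakContainer(registry);
        keycloak.start();
    }
}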
138 changes: 138 additions & 0 deletions common-library/src/it/java/common/kafka/CdcConsumerTest.java
@@ -0,0 +1,138 @@
package common.kafka;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.Assert;
import org.springframework.web.client.RestClient;
import org.testcontainers.containers.KafkaContainer;

/**
* Base class providing support for testing a CDC (Change Data Capture) consumer.
* This class requires a running Kafka container (e.g., provided via an {@link Import}-ed test
* configuration). It supplies a Kafka producer for sending test messages and mocks the
* REST client used to call other microservices.
*
* @param <M> Message Type
*/
@Getter
public abstract class CdcConsumerTest<M> {

private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class);

private final Class<M> messageType;

private final String cdcEvent;

@Autowired
private KafkaContainer kafkaContainer;

@MockBean
private RestClient restClient;

@Mock
RestClient.ResponseSpec responseSpec;

@Mock
RestClient.RequestHeadersUriSpec requestHeadersUriSpec;

private KafkaTemplate<String, M> kafkaTemplate;

public CdcConsumerTest(Class<M> messageType, String topicEvent) {
Assert.notNull(topicEvent, "CDC topic must not be null");
Assert.notNull(messageType, "Message type must not be null");
this.cdcEvent = topicEvent;
this.messageType = messageType;
}

public synchronized KafkaTemplate<String, M> getKafkaTemplate() {
// No production flow needs a Kafka producer yet, so a producer is configured
// here temporarily, for testing purposes only.
if (kafkaTemplate == null) {
synchronized (this) {
final Map<String, Object> props = getProducerProps();
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, M>(props));
}
}
return kafkaTemplate;
}

protected void sendMsg(M message)
throws InterruptedException, ExecutionException, TimeoutException {
var rs = getKafkaTemplate()
.send(this.cdcEvent, message)
.get(10, TimeUnit.SECONDS);
logger.info("Sent message completed: {}", rs);
}

protected <R> void simulateHttpRequestWithResponse(URI url, R response, Class<R> responseType) {
setupMockGetRequest(url);
when(responseSpec.body(responseType)).thenReturn(response);
}

protected <R> void simulateHttpRequestWithError(URI url, Throwable exception, Class<R> responseType) {
setupMockGetRequest(url);
when(responseSpec.body(responseType)).thenThrow(exception);
}

private void setupMockGetRequest(URI url) {
when(restClient.get()).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.uri(url)).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.headers(any())).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.retrieve()).thenReturn(responseSpec);
}

/**
* Pauses the current thread for long enough that the consumer should have finished processing.
* The wait time is computed per scenario (e.g., retry cases or normal processing).
*
* @param processTime The time (in seconds) the consumer takes to process each record.
* @param numOfRecords The number of records sent to the consumer.
* @param attempts The number of retry attempts. If no retries are needed, set this to '0'.
* @param backOff The backoff time (in seconds) between retries.
* @throws InterruptedException If the thread is interrupted while sleeping.
*/
public void waitForConsumer(
long processTime,
int numOfRecords,
int attempts,
long backOff
) throws InterruptedException {
// retryTime = (1st run) + (total run when retrying) + (total back off time)
long retryTime = processTime + (attempts * processTime) + (backOff * attempts);
long waitTime = (attempts > 0 ? retryTime : processTime) * numOfRecords;
logger.info("Consumer Processing expected time: {}s", waitTime);
Thread.sleep(Duration.ofSeconds(waitTime));
}
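// Worked example (hypothetical numbers, not from this commit): with processTime = 2s,
// numOfRecords = 3, attempts = 2 and backOff = 1s, retryTime = 2 + (2 * 2) + (1 * 2) = 8s,
// so the total wait is 8 * 3 = 24 seconds.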

private @NotNull Map<String, Object> getProducerProps() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}
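A minimal sketch of a concrete test extending this base class (the subclass name, topic, endpoint, and record types below are hypothetical and not part of this commit; it assumes the consumer under test calls the mocked RestClient while processing):

import java.net.URI;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import com.yas.search.config.KafkaIntegrationTestConfiguration;
import common.kafka.CdcConsumerTest;

// Hypothetical message and DTO shapes, only to keep the sketch self-contained.
record ProductCdcMessage(Long id, String op) {}
record ProductDto(Long id, String name) {}

@Import(KafkaIntegrationTestConfiguration.class)
@SpringBootTest
class ProductCdcConsumerIT extends CdcConsumerTest<ProductCdcMessage> {

    ProductCdcConsumerIT() {
        super(ProductCdcMessage.class, "dbproduct.public.product"); // hypothetical CDC topic
    }

    @Test
    void whenProductCreated_thenConsumerProcessesIt() throws Exception {
        // Stub the downstream REST call the consumer is expected to make.
        simulateHttpRequestWithResponse(
            URI.create("http://product/api/products/1"),  // hypothetical endpoint
            new ProductDto(1L, "Sample product"),
            ProductDto.class);

        sendMsg(new ProductCdcMessage(1L, "CREATE"));

        // processTime = 1s, 1 record, no retries, no backoff.
        waitForConsumer(1, 1, 0, 0);

        // ...assert on the consumer's side effects (e.g., a document indexed in Elasticsearch).
    }
}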

ElasticTestContainer.java
@@ -4,16 +4,15 @@

 public class ElasticTestContainer extends ElasticsearchContainer {

-    private static final String DOCKER_ELASTIC = "docker.elastic.co/elasticsearch/elasticsearch:7.17.6";
+    private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:%s";

     private static final String CLUSTER_NAME = "cluster.name";

     private static final String ELASTIC_SEARCH = "elasticsearch";

-    public ElasticTestContainer() {
-        super(DOCKER_ELASTIC);
+    public ElasticTestContainer(String version) {
+        super(IMAGE_NAME.formatted(version));
         this.addFixedExposedPort(9200, 9200);
         this.addFixedExposedPort(9300, 9300);
         this.addEnv(CLUSTER_NAME, ELASTIC_SEARCH);
         this.withEnv("xpack.security.transport.ssl.enabled", "false");
         this.withEnv("xpack.security.http.ssl.enabled", "false");
KafkaIntegrationTestConfiguration.java
@@ -0,0 +1,32 @@
package com.yas.search.config;

import common.container.ContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;

@TestConfiguration
public class KafkaIntegrationTestConfiguration {

@Value("${kafka.version}")
private String kafkaVersion;

@Value("${elasticsearch.version}")
private String elasticSearchVersion;

@Bean
@ServiceConnection
public KafkaContainer kafkaContainer(DynamicPropertyRegistry registry) {
return ContainerFactory.kafkaContainer(registry, kafkaVersion);
}

@Bean
@ServiceConnection
public ElasticTestContainer elasticTestContainer() {
return new ElasticTestContainer(elasticSearchVersion);
}

}
SearchTestConfig.java
@@ -1,16 +1,20 @@
 package com.yas.search.config;

 import com.yas.commonlibrary.IntegrationTestConfiguration;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
 import org.springframework.context.annotation.Bean;

 @TestConfiguration
 public class SearchTestConfig extends IntegrationTestConfiguration {

+    @Value("${elasticsearch.version}")
+    private String elasticSearchVersion;
+
     @Bean(destroyMethod = "stop")
     @ServiceConnection
     public ElasticTestContainer elasticTestContainer() {
-        return new ElasticTestContainer();
+        return new ElasticTestContainer(elasticSearchVersion);
     }
 }
