Skip to content

Commit

Permalink
#1185 - Integration Test for Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Duy Le Van committed Oct 16, 2024
1 parent b44442f commit c50716e
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 74 deletions.
14 changes: 14 additions & 0 deletions common-library/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<sonar.organization>nashtech-garage</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<sonar.projectKey>nashtech-garage_yas-common-library</sonar.projectKey>
<kafka-it.version>1.19.7</kafka-it.version>
</properties>
<dependencies>
<dependency>
Expand All @@ -43,6 +44,19 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- Test -->
<!-- TODO: check duplicate -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${kafka-it.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
43 changes: 43 additions & 0 deletions common-library/src/it/java/common/container/ContainerFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package common.container;

import dasniko.testcontainers.keycloak.KeycloakContainer;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public final class ContainerFactory {

private ContainerFactory() {}

public static KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
KeycloakContainer keycloak = new KeycloakContainer()
.withRealmImportFiles("/test-realm.json")
.withReuse(true);

registry.add(
"spring.security.oauth2.resourceserver.jwt.issuer-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus")
);
registry.add(
"spring.security.oauth2.resourceserver.jwt.jwk-set-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus/protocol/openid-connect/certs")
);
return keycloak;
}

public static KafkaContainer kafkaContainer(DynamicPropertyRegistry registry, String version) {
var kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(version))
);
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);

// Consumer properties
registry.add("auto.offset.reset", () -> "earliest");
registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);

// Producer properties
registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@

public class ElasticTestContainer extends ElasticsearchContainer {

private static final String DOCKER_ELASTIC = "docker.elastic.co/elasticsearch/elasticsearch:7.17.6";
private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:%s";

private static final String CLUSTER_NAME = "cluster.name";

private static final String ELASTIC_SEARCH = "elasticsearch";

public ElasticTestContainer() {
super(DOCKER_ELASTIC);
public ElasticTestContainer(String version) {
super(IMAGE_NAME.formatted(version));
this.addFixedExposedPort(9200, 9200);
this.addFixedExposedPort(9300, 9300);
this.addEnv(CLUSTER_NAME, ELASTIC_SEARCH);
this.withEnv("xpack.security.transport.ssl.enabled", "false");
this.withEnv("xpack.security.http.ssl.enabled", "false");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.yas.search.config;

import dasniko.testcontainers.keycloak.KeycloakContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
Expand All @@ -9,6 +10,9 @@
@TestConfiguration
public class IntegrationTestConfiguration {

@Value("${elasticsearch.version}")
private String elasticSearchVersion;

@Bean(destroyMethod = "stop")
public KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
KeycloakContainer keycloak = new KeycloakContainer()
Expand All @@ -25,6 +29,6 @@ public KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
@Bean(destroyMethod = "stop")
@ServiceConnection
public ElasticTestContainer elasticTestContainer() {
return new ElasticTestContainer();
return new ElasticTestContainer(elasticSearchVersion);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.yas.search.config;

import common.container.ContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;

@TestConfiguration
public class KafkaIntegrationTestConfiguration {

@Value("${kafka.version}")
private String kafkaVersion;

@Value("${elasticsearch.version}")
private String elasticSearchVersion;

@Bean
@ServiceConnection
public KafkaContainer kafkaContainer(DynamicPropertyRegistry registry) {
return ContainerFactory.kafkaContainer(registry, kafkaVersion);
}

@Bean
@ServiceConnection
public ElasticTestContainer elasticTestContainer() {
return new ElasticTestContainer(elasticSearchVersion);
}

}
134 changes: 67 additions & 67 deletions search/src/it/java/com/yas/search/controller/ProductControllerIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,78 +25,78 @@
@PropertySource("classpath:application.properties")
public class ProductControllerIT extends AbstractControllerIT {

@Autowired
ProductRepository productRepository;
@Autowired
ProductRepository productRepository;

@BeforeEach
public void init() {
@BeforeEach
public void init() {

Product product = new Product();
product.setId(1L);
product.setName("Macbook M1");
product.setBrand("Apple");
product.setCategories(List.of("Laptop", "Macbook"));
product.setAttributes(List.of("CPU", "RAM", "SSD"));
Product product = new Product();
product.setId(1L);
product.setName("Macbook M1");
product.setBrand("Apple");
product.setCategories(List.of("Laptop", "Macbook"));
product.setAttributes(List.of("CPU", "RAM", "SSD"));

productRepository.save(product, RefreshPolicy.IMMEDIATE);
}
productRepository.save(product, RefreshPolicy.IMMEDIATE);
}

@AfterEach
public void destroy() {
productRepository.deleteAll();
}
@AfterEach
public void destroy() {
productRepository.deleteAll();
}

@Test
public void test_findProductAdvance_shouldReturnSuccessfully() {
given(getRequestSpecification())
.auth().oauth2(getAccessToken("admin", "admin"))
.contentType(ContentType.JSON)
.queryParam("keyword", "Macbook")
.queryParam("category", "Laptop")
.queryParam("attribute", "CPU")
.queryParam("page", 0)
.queryParam("size", 12)
.get("/search/storefront/catalog-search")
.then()
.statusCode(HttpStatus.OK.value())
.body("pageNo", equalTo(0))
.body("pageSize", equalTo(12))
.body("totalElements", equalTo(1))
.log()
.ifValidationFails();
}
@Test
public void test_findProductAdvance_shouldReturnSuccessfully() {
given(getRequestSpecification())
.auth().oauth2(getAccessToken("admin", "admin"))
.contentType(ContentType.JSON)
.queryParam("keyword", "Macbook")
.queryParam("category", "Laptop")
.queryParam("attribute", "CPU")
.queryParam("page", 0)
.queryParam("size", 12)
.get("/search/storefront/catalog-search")
.then()
.statusCode(HttpStatus.OK.value())
.body("pageNo", equalTo(0))
.body("pageSize", equalTo(12))
.body("totalElements", equalTo(1))
.log()
.ifValidationFails();
}

@Test
public void test_findProductAdvance_shouldNotReturnAnyProduct() {
given(getRequestSpecification())
.auth().oauth2(getAccessToken("admin", "admin"))
.contentType(ContentType.JSON)
.queryParam("keyword", "Macbook")
.queryParam("category", "Laptop")
.queryParam("brand", "Samsung")
.queryParam("page", 0)
.queryParam("size", 12)
.get("/search/storefront/catalog-search")
.then()
.statusCode(HttpStatus.OK.value())
.body("pageNo", equalTo(0))
.body("pageSize", equalTo(12))
.body("totalElements", equalTo(0))
.log()
.ifValidationFails();
}
@Test
public void test_findProductAdvance_shouldNotReturnAnyProduct() {
given(getRequestSpecification())
.auth().oauth2(getAccessToken("admin", "admin"))
.contentType(ContentType.JSON)
.queryParam("keyword", "Macbook")
.queryParam("category", "Laptop")
.queryParam("brand", "Samsung")
.queryParam("page", 0)
.queryParam("size", 12)
.get("/search/storefront/catalog-search")
.then()
.statusCode(HttpStatus.OK.value())
.body("pageNo", equalTo(0))
.body("pageSize", equalTo(12))
.body("totalElements", equalTo(0))
.log()
.ifValidationFails();
}

@Test
public void test_productSearchAutoComplete_shouldReturnSuccessfully() {
given(getRequestSpecification())
.auth().oauth2(getAccessToken("admin", "admin"))
.contentType(ContentType.JSON)
.queryParam("keyword", "Macbook")
.get("/search/storefront/search_suggest")
.then()
.statusCode(HttpStatus.OK.value())
.body("productNames", hasSize(1))
.log()
.ifValidationFails();
}
@Test
public void test_productSearchAutoComplete_shouldReturnSuccessfully() {
given(getRequestSpecification())
.auth().oauth2(getAccessToken("admin", "admin"))
.contentType(ContentType.JSON)
.queryParam("keyword", "Macbook")
.get("/search/storefront/search_suggest")
.then()
.statusCode(HttpStatus.OK.value())
.body("productNames", hasSize(1))
.log()
.ifValidationFails();
}
}
70 changes: 70 additions & 0 deletions search/src/it/java/com/yas/search/kafka/CdcConsumerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.yas.search.kafka;

import com.yas.search.config.KafkaIntegrationTestConfiguration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.testcontainers.containers.KafkaContainer;

@Getter
@Import(KafkaIntegrationTestConfiguration.class)
public abstract class CdcConsumerTest<M> {

private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class);

private final Class<M> messageType;

@Autowired
private KafkaContainer kafkaContainer;

private KafkaTemplate<String, M> kafkaTemplate;

public abstract String getCdcTopic();

public CdcConsumerTest(Class<M> messageType) {
this.messageType = messageType;
}

public synchronized KafkaTemplate<String, M> getKafkaTemplate() {
// Now, we haven't had any process need Kafka Producer, then just temporary config producer like this for testing
if (kafkaTemplate == null) {
synchronized (this) {
final Map<String, Object> props = getProducerProps();
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, M>(props));
}
}
return kafkaTemplate;
}

protected void sendMsg(M message)
throws InterruptedException, ExecutionException, TimeoutException {
var rs = getKafkaTemplate()
.send(getCdcTopic(), message)
.get(10, TimeUnit.SECONDS);
logger.info("Sent message completed: {}", rs);
}

private @NotNull Map<String, Object> getProducerProps() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}

Loading

0 comments on commit c50716e

Please sign in to comment.