diff --git a/common-library/pom.xml b/common-library/pom.xml
index 9d8d935202..978509214f 100644
--- a/common-library/pom.xml
+++ b/common-library/pom.xml
@@ -37,16 +37,23 @@
org.springframework
spring-tx
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
com.opencsv
opencsv
${opencsv.version}
-
+
- org.springframework.kafka
- spring-kafka
+ org.testcontainers
+ kafka
+ test
diff --git a/common-library/src/it/java/common/container/ContainerFactory.java b/common-library/src/it/java/common/container/ContainerFactory.java
new file mode 100644
index 0000000000..b10677d7f6
--- /dev/null
+++ b/common-library/src/it/java/common/container/ContainerFactory.java
@@ -0,0 +1,46 @@
+package common.container;
+
+import dasniko.testcontainers.keycloak.KeycloakContainer;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Factory class that holds and provides containers used for testing with Testcontainers.
+ */
+public final class ContainerFactory {
+
+ private ContainerFactory() {}
+
+ public static KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
+ KeycloakContainer keycloak = new KeycloakContainer()
+ .withRealmImportFiles("/test-realm.json")
+ .withReuse(true);
+
+ registry.add(
+ "spring.security.oauth2.resourceserver.jwt.issuer-uri",
+ () -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus")
+ );
+ registry.add(
+ "spring.security.oauth2.resourceserver.jwt.jwk-set-uri",
+ () -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus/protocol/openid-connect/certs")
+ );
+ return keycloak;
+ }
+
+ public static KafkaContainer kafkaContainer(DynamicPropertyRegistry registry, String version) {
+ var kafkaContainer = new KafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(version))
+ );
+ registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
+
+ // Consumer properties
+ registry.add("auto.offset.reset", () -> "earliest");
+ registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
+
+ // Producer properties
+ registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
+ return kafkaContainer;
+ }
+
+}
diff --git a/common-library/src/it/java/common/kafka/CdcConsumerTest.java b/common-library/src/it/java/common/kafka/CdcConsumerTest.java
new file mode 100644
index 0000000000..d192a15b06
--- /dev/null
+++ b/common-library/src/it/java/common/kafka/CdcConsumerTest.java
@@ -0,0 +1,138 @@
+package common.kafka;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.Getter;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.Assert;
+import org.springframework.web.client.RestClient;
+import org.testcontainers.containers.KafkaContainer;
+
+/**
+ * Base class providing support for testing a CDC (Change Data Capture) consumer.
+ * This class requires Kafka containers to be running (e.g., {@link Import}), and it provides support
+ * for a Kafka producer to send messages, as well as simulating a REST client
+ * for microservice integration.
+ *
+ * @param Message Type
+ */
+@Getter
+public abstract class CdcConsumerTest {
+
+ private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class);
+
+ private final Class messageType;
+
+ private final String cdcEvent;
+
+ @Autowired
+ private KafkaContainer kafkaContainer;
+
+ @MockBean
+ private RestClient restClient;
+
+ @Mock
+ RestClient.ResponseSpec responseSpec;
+
+ @Mock
+ RestClient.RequestHeadersUriSpec requestHeadersUriSpec;
+
+ private KafkaTemplate kafkaTemplate;
+
+ public CdcConsumerTest(Class messageType, String topicEvent) {
+ Assert.notNull(topicEvent, "CDC topic must not be null");
+ Assert.notNull(messageType, "Message type must not be null");
+ this.cdcEvent = topicEvent;
+ this.messageType = messageType;
+ }
+
+ public synchronized KafkaTemplate getKafkaTemplate() {
+ // Now, we haven't had any process need Kafka Producer,
+ // then just temporary config producer like this for testing
+ if (kafkaTemplate == null) {
+ synchronized (this) {
+ final Map props = getProducerProps();
+ kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(props));
+ }
+ }
+ return kafkaTemplate;
+ }
+
+ protected void sendMsg(M message)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ var rs = getKafkaTemplate()
+ .send(this.cdcEvent, message)
+ .get(10, TimeUnit.SECONDS);
+ logger.info("Sent message completed: {}", rs);
+ }
+
+ protected void simulateHttpRequestWithResponse(URI url, R response, Class responseType) {
+ setupMockGetRequest(url);
+ when(responseSpec.body(responseType)).thenReturn(response);
+ }
+
+ protected void simulateHttpRequestWithError(URI url, Throwable exception, Class responseType) {
+ setupMockGetRequest(url);
+ when(responseSpec.body(responseType)).thenThrow(exception);
+ }
+
+ private void setupMockGetRequest(URI url) {
+ when(restClient.get()).thenReturn(requestHeadersUriSpec);
+ when(requestHeadersUriSpec.uri(url)).thenReturn(requestHeadersUriSpec);
+ when(requestHeadersUriSpec.headers(any())).thenReturn(requestHeadersUriSpec);
+ when(requestHeadersUriSpec.retrieve()).thenReturn(responseSpec);
+ }
+
+ /**
+ * Pauses the current thread until the consumer has finished processing. The wait time is dynamic and
+ * depends on the specific scenario (e.g., retry cases or normal processing).
+ *
+ * @param processTime The time (in seconds) the consumer takes to process each record.
+ * @param numOfRecords The number of records sent to the consumer.
+ * @param attempts The number of retry attempts. If no retries are needed, set this to '0'.
+ * @param backOff The backoff time (in seconds) between retries.
+ * @throws InterruptedException If the thread is interrupted while sleeping.
+ */
+ public void waitForConsumer(
+ long processTime,
+ int numOfRecords,
+ int attempts,
+ long backOff
+ ) throws InterruptedException {
+ // retryTime = (1st run) + (total run when retrying) + (total back off time)
+ long retryTime = processTime + (attempts * processTime) + (backOff * attempts);
+ long waitTime = (attempts > 0 ? retryTime : processTime) * numOfRecords;
+ logger.info("Consumer Processing expected time: {}s", waitTime);
+ Thread.sleep(Duration.ofSeconds(waitTime));
+ }
+
+ private @NotNull Map getProducerProps() {
+ final Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.RETRIES_CONFIG, 0);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ return props;
+ }
+}
+
diff --git a/search/src/it/java/com/yas/search/config/ElasticTestContainer.java b/search/src/it/java/com/yas/search/config/ElasticTestContainer.java
index 548a5f71d6..8a21186521 100644
--- a/search/src/it/java/com/yas/search/config/ElasticTestContainer.java
+++ b/search/src/it/java/com/yas/search/config/ElasticTestContainer.java
@@ -4,16 +4,15 @@
public class ElasticTestContainer extends ElasticsearchContainer {
- private static final String DOCKER_ELASTIC = "docker.elastic.co/elasticsearch/elasticsearch:7.17.6";
+ private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:%s";
private static final String CLUSTER_NAME = "cluster.name";
private static final String ELASTIC_SEARCH = "elasticsearch";
- public ElasticTestContainer() {
- super(DOCKER_ELASTIC);
+ public ElasticTestContainer(String version) {
+ super(IMAGE_NAME.formatted(version));
this.addFixedExposedPort(9200, 9200);
- this.addFixedExposedPort(9300, 9300);
this.addEnv(CLUSTER_NAME, ELASTIC_SEARCH);
this.withEnv("xpack.security.transport.ssl.enabled", "false");
this.withEnv("xpack.security.http.ssl.enabled", "false");
diff --git a/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java b/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java
new file mode 100644
index 0000000000..16908dd32b
--- /dev/null
+++ b/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java
@@ -0,0 +1,32 @@
+package com.yas.search.config;
+
+import common.container.ContainerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.testcontainers.containers.KafkaContainer;
+
+@TestConfiguration
+public class KafkaIntegrationTestConfiguration {
+
+ @Value("${kafka.version}")
+ private String kafkaVersion;
+
+ @Value("${elasticsearch.version}")
+ private String elasticSearchVersion;
+
+ @Bean
+ @ServiceConnection
+ public KafkaContainer kafkaContainer(DynamicPropertyRegistry registry) {
+ return ContainerFactory.kafkaContainer(registry, kafkaVersion);
+ }
+
+ @Bean
+ @ServiceConnection
+ public ElasticTestContainer elasticTestContainer() {
+ return new ElasticTestContainer(elasticSearchVersion);
+ }
+
+}
diff --git a/search/src/it/java/com/yas/search/config/SearchTestConfig.java b/search/src/it/java/com/yas/search/config/SearchTestConfig.java
index 3d4621d4a7..bf92312283 100644
--- a/search/src/it/java/com/yas/search/config/SearchTestConfig.java
+++ b/search/src/it/java/com/yas/search/config/SearchTestConfig.java
@@ -1,6 +1,7 @@
package com.yas.search.config;
import com.yas.commonlibrary.IntegrationTestConfiguration;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
@@ -8,9 +9,12 @@
@TestConfiguration
public class SearchTestConfig extends IntegrationTestConfiguration {
+ @Value("${elasticsearch.version}")
+ private String elasticSearchVersion;
+
@Bean(destroyMethod = "stop")
@ServiceConnection
public ElasticTestContainer elasticTestContainer() {
- return new ElasticTestContainer();
+ return new ElasticTestContainer(elasticSearchVersion);
}
}
diff --git a/search/src/it/java/com/yas/search/controller/ProductControllerIT.java b/search/src/it/java/com/yas/search/controller/ProductControllerIT.java
index 33371751fa..93b10a37f8 100644
--- a/search/src/it/java/com/yas/search/controller/ProductControllerIT.java
+++ b/search/src/it/java/com/yas/search/controller/ProductControllerIT.java
@@ -25,78 +25,78 @@
@PropertySource("classpath:application.properties")
public class ProductControllerIT extends AbstractControllerIT {
- @Autowired
- ProductRepository productRepository;
+ @Autowired
+ ProductRepository productRepository;
- @BeforeEach
- public void init() {
+ @BeforeEach
+ public void init() {
- Product product = new Product();
- product.setId(1L);
- product.setName("Macbook M1");
- product.setBrand("Apple");
- product.setCategories(List.of("Laptop", "Macbook"));
- product.setAttributes(List.of("CPU", "RAM", "SSD"));
+ Product product = new Product();
+ product.setId(1L);
+ product.setName("Macbook M1");
+ product.setBrand("Apple");
+ product.setCategories(List.of("Laptop", "Macbook"));
+ product.setAttributes(List.of("CPU", "RAM", "SSD"));
- productRepository.save(product, RefreshPolicy.IMMEDIATE);
- }
+ productRepository.save(product, RefreshPolicy.IMMEDIATE);
+ }
- @AfterEach
- public void destroy() {
- productRepository.deleteAll();
- }
+ @AfterEach
+ public void destroy() {
+ productRepository.deleteAll();
+ }
- @Test
- public void test_findProductAdvance_shouldReturnSuccessfully() {
- given(getRequestSpecification())
- .auth().oauth2(getAccessToken("admin", "admin"))
- .contentType(ContentType.JSON)
- .queryParam("keyword", "Macbook")
- .queryParam("category", "Laptop")
- .queryParam("attribute", "CPU")
- .queryParam("page", 0)
- .queryParam("size", 12)
- .get("/search/storefront/catalog-search")
- .then()
- .statusCode(HttpStatus.OK.value())
- .body("pageNo", equalTo(0))
- .body("pageSize", equalTo(12))
- .body("totalElements", equalTo(1))
- .log()
- .ifValidationFails();
- }
+ @Test
+ public void test_findProductAdvance_shouldReturnSuccessfully() {
+ given(getRequestSpecification())
+ .auth().oauth2(getAccessToken("admin", "admin"))
+ .contentType(ContentType.JSON)
+ .queryParam("keyword", "Macbook")
+ .queryParam("category", "Laptop")
+ .queryParam("attribute", "CPU")
+ .queryParam("page", 0)
+ .queryParam("size", 12)
+ .get("/search/storefront/catalog-search")
+ .then()
+ .statusCode(HttpStatus.OK.value())
+ .body("pageNo", equalTo(0))
+ .body("pageSize", equalTo(12))
+ .body("totalElements", equalTo(1))
+ .log()
+ .ifValidationFails();
+ }
- @Test
- public void test_findProductAdvance_shouldNotReturnAnyProduct() {
- given(getRequestSpecification())
- .auth().oauth2(getAccessToken("admin", "admin"))
- .contentType(ContentType.JSON)
- .queryParam("keyword", "Macbook")
- .queryParam("category", "Laptop")
- .queryParam("brand", "Samsung")
- .queryParam("page", 0)
- .queryParam("size", 12)
- .get("/search/storefront/catalog-search")
- .then()
- .statusCode(HttpStatus.OK.value())
- .body("pageNo", equalTo(0))
- .body("pageSize", equalTo(12))
- .body("totalElements", equalTo(0))
- .log()
- .ifValidationFails();
- }
+ @Test
+ public void test_findProductAdvance_shouldNotReturnAnyProduct() {
+ given(getRequestSpecification())
+ .auth().oauth2(getAccessToken("admin", "admin"))
+ .contentType(ContentType.JSON)
+ .queryParam("keyword", "Macbook")
+ .queryParam("category", "Laptop")
+ .queryParam("brand", "Samsung")
+ .queryParam("page", 0)
+ .queryParam("size", 12)
+ .get("/search/storefront/catalog-search")
+ .then()
+ .statusCode(HttpStatus.OK.value())
+ .body("pageNo", equalTo(0))
+ .body("pageSize", equalTo(12))
+ .body("totalElements", equalTo(0))
+ .log()
+ .ifValidationFails();
+ }
- @Test
- public void test_productSearchAutoComplete_shouldReturnSuccessfully() {
- given(getRequestSpecification())
- .auth().oauth2(getAccessToken("admin", "admin"))
- .contentType(ContentType.JSON)
- .queryParam("keyword", "Macbook")
- .get("/search/storefront/search_suggest")
- .then()
- .statusCode(HttpStatus.OK.value())
- .body("productNames", hasSize(1))
- .log()
- .ifValidationFails();
- }
+ @Test
+ public void test_productSearchAutoComplete_shouldReturnSuccessfully() {
+ given(getRequestSpecification())
+ .auth().oauth2(getAccessToken("admin", "admin"))
+ .contentType(ContentType.JSON)
+ .queryParam("keyword", "Macbook")
+ .get("/search/storefront/search_suggest")
+ .then()
+ .statusCode(HttpStatus.OK.value())
+ .body("productNames", hasSize(1))
+ .log()
+ .ifValidationFails();
+ }
}
diff --git a/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java
new file mode 100644
index 0000000000..d530ee765d
--- /dev/null
+++ b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java
@@ -0,0 +1,227 @@
+package com.yas.search.kafka;
+
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.CREATE;
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.DELETE;
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yas.commonlibrary.kafka.cdc.message.Product;
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import com.yas.search.config.KafkaIntegrationTestConfiguration;
+import com.yas.search.config.ServiceUrlConfig;
+import com.yas.search.repository.ProductRepository;
+import com.yas.search.service.ProductSyncDataService;
+import com.yas.search.viewmodel.ProductEsDetailVm;
+import common.kafka.CdcConsumerTest;
+import java.net.URI;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.SpyBean;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.web.util.UriComponentsBuilder;
+
+@Import(KafkaIntegrationTestConfiguration.class)
+@PropertySource("classpath:application.properties")
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+public class ProductCdcConsumerTest extends CdcConsumerTest {
+
+ public static final String STOREFRONT_PRODUCTS_ES_PATH = "/storefront/products-es/{id}";
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Autowired
+ private ServiceUrlConfig serviceUrlConfig;
+
+ @Autowired
+ private ProductRepository productRepository;
+
+ @SpyBean
+ private ProductSyncDataService productSyncDataService;
+
+ public ProductCdcConsumerTest() {
+ super(ProductCdcMessage.class, "dbproduct.public.product");
+ }
+
+ @AfterEach
+ public void tearDown() {
+ productRepository.deleteAll();
+ }
+
+ @DisplayName("When having product create event, data must sync as create")
+ @Test
+ public void test_whenHavingCreateEvent_shouldSyncAsCreate()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+ ProductEsDetailVm response = getSampleProduct();
+
+ // When
+ // Simulate Product Detail API response
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(CREATE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ // Verify consumer
+ waitForConsumer(2, 1, 0, 0);
+ verify(productSyncDataService, times(1)).createProduct(productId);
+
+ // Verify ES Sync data
+ Optional product = productRepository.findById(productId);
+ assertTrue(product.isPresent(), "ElasticSearch must create data accordingly to CDC event.");
+ }
+
+ @DisplayName("When having product create event, but consumer process failed, consumer must perform retry.")
+ @Test
+ public void test_whenHavingCreateEvent_thenProcessFailed_shouldPerformRetry()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+
+ // When
+ // Simulate Product Detail API throw errors
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithError(url, new RuntimeException("Invalid Request"), ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(CREATE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ waitForConsumer(2, 1, 4, 6);
+ verify(productSyncDataService, times(4)).createProduct(productId);
+ }
+
+ @DisplayName("When having product update event, data must sync as update")
+ @Test
+ public void test_whenHavingUpdateEvent_shouldSyncAsUpdate()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+ ProductEsDetailVm response = getSampleProduct();
+
+ // Create existing product
+ com.yas.search.model.Product product = getSampleEsProduct(productId);
+ productRepository.save(product);
+
+ // When
+ // Simulate Product Detail API response
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(UPDATE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ // Verify Consumer
+ waitForConsumer(2, 1, 0, 0);
+ verify(productSyncDataService, times(1)).updateProduct(productId);
+ Optional updated = productRepository.findById(productId);
+
+ // Verify ES sync data
+ assertTrue(updated.isPresent(), "ElasticSearch must have product data.");
+ assertEquals(updated.get().getName(), response.name(), "Product name must be updated.");
+ }
+
+ @DisplayName("When having product delete event, data must sync as delete")
+ @Test
+ public void test_whenHavingDeleteEvent_shouldSyncAsDelete()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+ ProductEsDetailVm response = getSampleProduct();
+
+ // Create existing product
+ com.yas.search.model.Product product = getSampleEsProduct(productId);
+ productRepository.save(product);
+
+ // When
+ // Simulate Product Detail API response
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(DELETE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ // Verify Consumer
+ waitForConsumer(2, 1, 0, 0);
+ verify(productSyncDataService, times(1)).deleteProduct(productId);
+ Optional updated = productRepository.findById(productId);
+
+ // Verify ES sync data
+ assertTrue(updated.isEmpty(), "ElasticSearch must remove product data.");
+ }
+
+ private static com.yas.search.model.@NotNull Product getSampleEsProduct(long productId) {
+ com.yas.search.model.Product product = new com.yas.search.model.Product();
+ product.setId(productId);
+ product.setName("Modern Speaker");
+ return product;
+ }
+
+ private static @NotNull ProductEsDetailVm getSampleProduct() {
+ return new ProductEsDetailVm(
+ 1001L,
+ "Wireless Bluetooth Speaker",
+ "wireless-bluetooth-speaker",
+ 79.99,
+ true,
+ true,
+ true,
+ false,
+ 501L,
+ "SoundWave",
+ List.of("Electronics", "Audio"),
+ List.of("Bluetooth 5.0", "10-hour battery life")
+ );
+ }
+
+}
diff --git a/search/src/it/resources/application.properties b/search/src/it/resources/application.properties
index a521186f4b..1adc98f409 100644
--- a/search/src/it/resources/application.properties
+++ b/search/src/it/resources/application.properties
@@ -28,6 +28,18 @@ elasticsearch.password=
yas.services.product=http://api.yas.local/product
spring.kafka.bootstrap-servers=localhost:9092
-spring.kafka.consumer.bootstrap-servers=localhost:9092
-spring.kafka.consumer.group-id=search
+
+# CDC Kafka Config
product.topic.name=dbproduct.public.product
+
+# Kafka Consumer Config
+spring.kafka.consumer.group-id=search
+spring.kafka.consumer.auto-offset-reset=earliest
+
+# Kafka Producer Config
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
+
+# TestContainers version
+kafka.version=7.0.9
+elasticsearch.version=7.17.6
diff --git a/search/src/it/resources/logback-spring.xml b/search/src/it/resources/logback-spring.xml
index e7d52d7271..33770030c4 100644
--- a/search/src/it/resources/logback-spring.xml
+++ b/search/src/it/resources/logback-spring.xml
@@ -9,6 +9,9 @@
+
+
+