From 1ac4576050ec283995b981e4ebf4fcd46c23eab9 Mon Sep 17 00:00:00 2001 From: Duy Le Van Date: Thu, 10 Oct 2024 17:07:39 +0700 Subject: [PATCH] #1185 - Integration Test for Kafka --- common-library/pom.xml | 13 +- .../common/container/ContainerFactory.java | 46 ++++ .../it/java/common/kafka/CdcConsumerTest.java | 138 +++++++++++ .../search/config/ElasticTestContainer.java | 7 +- .../KafkaIntegrationTestConfiguration.java | 32 +++ .../yas/search/config/SearchTestConfig.java | 6 +- .../controller/ProductControllerIT.java | 134 +++++------ .../search/kafka/ProductCdcConsumerTest.java | 227 ++++++++++++++++++ .../src/it/resources/application.properties | 16 +- search/src/it/resources/logback-spring.xml | 3 + 10 files changed, 545 insertions(+), 77 deletions(-) create mode 100644 common-library/src/it/java/common/container/ContainerFactory.java create mode 100644 common-library/src/it/java/common/kafka/CdcConsumerTest.java create mode 100644 search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java create mode 100644 search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java diff --git a/common-library/pom.xml b/common-library/pom.xml index 9d8d935202..978509214f 100644 --- a/common-library/pom.xml +++ b/common-library/pom.xml @@ -37,16 +37,23 @@ org.springframework spring-tx + + + + org.springframework.kafka + spring-kafka + com.opencsv opencsv ${opencsv.version} - + - org.springframework.kafka - spring-kafka + org.testcontainers + kafka + test diff --git a/common-library/src/it/java/common/container/ContainerFactory.java b/common-library/src/it/java/common/container/ContainerFactory.java new file mode 100644 index 0000000000..b10677d7f6 --- /dev/null +++ b/common-library/src/it/java/common/container/ContainerFactory.java @@ -0,0 +1,46 @@ +package common.container; + +import dasniko.testcontainers.keycloak.KeycloakContainer; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Factory class that holds and provides containers used for testing with Testcontainers. + */ +public final class ContainerFactory { + + private ContainerFactory() {} + + public static KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) { + KeycloakContainer keycloak = new KeycloakContainer() + .withRealmImportFiles("/test-realm.json") + .withReuse(true); + + registry.add( + "spring.security.oauth2.resourceserver.jwt.issuer-uri", + () -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus") + ); + registry.add( + "spring.security.oauth2.resourceserver.jwt.jwk-set-uri", + () -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus/protocol/openid-connect/certs") + ); + return keycloak; + } + + public static KafkaContainer kafkaContainer(DynamicPropertyRegistry registry, String version) { + var kafkaContainer = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(version)) + ); + registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); + + // Consumer properties + registry.add("auto.offset.reset", () -> "earliest"); + registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers); + + // Producer properties + registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers); + return kafkaContainer; + } + +} diff --git a/common-library/src/it/java/common/kafka/CdcConsumerTest.java b/common-library/src/it/java/common/kafka/CdcConsumerTest.java new file mode 100644 index 0000000000..d192a15b06 --- /dev/null +++ b/common-library/src/it/java/common/kafka/CdcConsumerTest.java @@ -0,0 +1,138 @@ +package common.kafka; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.net.URI; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.Getter; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.jetbrains.annotations.NotNull; +import org.mockito.Mock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Import; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.Assert; +import org.springframework.web.client.RestClient; +import org.testcontainers.containers.KafkaContainer; + +/** + * Base class providing support for testing a CDC (Change Data Capture) consumer. + * This class requires Kafka containers to be running (e.g., {@link Import}), and it provides support + * for a Kafka producer to send messages, as well as simulating a REST client + * for microservice integration. + * + * @param Message Type + */ +@Getter +public abstract class CdcConsumerTest { + + private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class); + + private final Class messageType; + + private final String cdcEvent; + + @Autowired + private KafkaContainer kafkaContainer; + + @MockBean + private RestClient restClient; + + @Mock + RestClient.ResponseSpec responseSpec; + + @Mock + RestClient.RequestHeadersUriSpec requestHeadersUriSpec; + + private KafkaTemplate kafkaTemplate; + + public CdcConsumerTest(Class messageType, String topicEvent) { + Assert.notNull(topicEvent, "CDC topic must not be null"); + Assert.notNull(messageType, "Message type must not be null"); + this.cdcEvent = topicEvent; + this.messageType = messageType; + } + + public synchronized KafkaTemplate getKafkaTemplate() { + // Now, we haven't had any process need Kafka Producer, + // then just temporary config producer like this for testing + if (kafkaTemplate == null) { + synchronized (this) { + final Map props = getProducerProps(); + kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(props)); + } + } + return kafkaTemplate; + } + + protected void sendMsg(M message) + throws InterruptedException, ExecutionException, TimeoutException { + var rs = getKafkaTemplate() + .send(this.cdcEvent, message) + .get(10, TimeUnit.SECONDS); + logger.info("Sent message completed: {}", rs); + } + + protected void simulateHttpRequestWithResponse(URI url, R response, Class responseType) { + setupMockGetRequest(url); + when(responseSpec.body(responseType)).thenReturn(response); + } + + protected void simulateHttpRequestWithError(URI url, Throwable exception, Class responseType) { + setupMockGetRequest(url); + when(responseSpec.body(responseType)).thenThrow(exception); + } + + private void setupMockGetRequest(URI url) { + when(restClient.get()).thenReturn(requestHeadersUriSpec); + when(requestHeadersUriSpec.uri(url)).thenReturn(requestHeadersUriSpec); + when(requestHeadersUriSpec.headers(any())).thenReturn(requestHeadersUriSpec); + when(requestHeadersUriSpec.retrieve()).thenReturn(responseSpec); + } + + /** + * Pauses the current thread until the consumer has finished processing. The wait time is dynamic and + * depends on the specific scenario (e.g., retry cases or normal processing). + * + * @param processTime The time (in seconds) the consumer takes to process each record. + * @param numOfRecords The number of records sent to the consumer. + * @param attempts The number of retry attempts. If no retries are needed, set this to '0'. + * @param backOff The backoff time (in seconds) between retries. + * @throws InterruptedException If the thread is interrupted while sleeping. + */ + public void waitForConsumer( + long processTime, + int numOfRecords, + int attempts, + long backOff + ) throws InterruptedException { + // retryTime = (1st run) + (total run when retrying) + (total back off time) + long retryTime = processTime + (attempts * processTime) + (backOff * attempts); + long waitTime = (attempts > 0 ? retryTime : processTime) * numOfRecords; + logger.info("Consumer Processing expected time: {}s", waitTime); + Thread.sleep(Duration.ofSeconds(waitTime)); + } + + private @NotNull Map getProducerProps() { + final Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return props; + } +} + diff --git a/search/src/it/java/com/yas/search/config/ElasticTestContainer.java b/search/src/it/java/com/yas/search/config/ElasticTestContainer.java index 548a5f71d6..8a21186521 100644 --- a/search/src/it/java/com/yas/search/config/ElasticTestContainer.java +++ b/search/src/it/java/com/yas/search/config/ElasticTestContainer.java @@ -4,16 +4,15 @@ public class ElasticTestContainer extends ElasticsearchContainer { - private static final String DOCKER_ELASTIC = "docker.elastic.co/elasticsearch/elasticsearch:7.17.6"; + private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:%s"; private static final String CLUSTER_NAME = "cluster.name"; private static final String ELASTIC_SEARCH = "elasticsearch"; - public ElasticTestContainer() { - super(DOCKER_ELASTIC); + public ElasticTestContainer(String version) { + super(IMAGE_NAME.formatted(version)); this.addFixedExposedPort(9200, 9200); - this.addFixedExposedPort(9300, 9300); this.addEnv(CLUSTER_NAME, ELASTIC_SEARCH); this.withEnv("xpack.security.transport.ssl.enabled", "false"); this.withEnv("xpack.security.http.ssl.enabled", "false"); diff --git a/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java b/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java new file mode 100644 index 0000000000..16908dd32b --- /dev/null +++ b/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java @@ -0,0 +1,32 @@ +package com.yas.search.config; + +import common.container.ContainerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.testcontainers.containers.KafkaContainer; + +@TestConfiguration +public class KafkaIntegrationTestConfiguration { + + @Value("${kafka.version}") + private String kafkaVersion; + + @Value("${elasticsearch.version}") + private String elasticSearchVersion; + + @Bean + @ServiceConnection + public KafkaContainer kafkaContainer(DynamicPropertyRegistry registry) { + return ContainerFactory.kafkaContainer(registry, kafkaVersion); + } + + @Bean + @ServiceConnection + public ElasticTestContainer elasticTestContainer() { + return new ElasticTestContainer(elasticSearchVersion); + } + +} diff --git a/search/src/it/java/com/yas/search/config/SearchTestConfig.java b/search/src/it/java/com/yas/search/config/SearchTestConfig.java index 3d4621d4a7..bf92312283 100644 --- a/search/src/it/java/com/yas/search/config/SearchTestConfig.java +++ b/search/src/it/java/com/yas/search/config/SearchTestConfig.java @@ -1,6 +1,7 @@ package com.yas.search.config; import com.yas.commonlibrary.IntegrationTestConfiguration; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; import org.springframework.context.annotation.Bean; @@ -8,9 +9,12 @@ @TestConfiguration public class SearchTestConfig extends IntegrationTestConfiguration { + @Value("${elasticsearch.version}") + private String elasticSearchVersion; + @Bean(destroyMethod = "stop") @ServiceConnection public ElasticTestContainer elasticTestContainer() { - return new ElasticTestContainer(); + return new ElasticTestContainer(elasticSearchVersion); } } diff --git a/search/src/it/java/com/yas/search/controller/ProductControllerIT.java b/search/src/it/java/com/yas/search/controller/ProductControllerIT.java index 33371751fa..93b10a37f8 100644 --- a/search/src/it/java/com/yas/search/controller/ProductControllerIT.java +++ b/search/src/it/java/com/yas/search/controller/ProductControllerIT.java @@ -25,78 +25,78 @@ @PropertySource("classpath:application.properties") public class ProductControllerIT extends AbstractControllerIT { - @Autowired - ProductRepository productRepository; + @Autowired + ProductRepository productRepository; - @BeforeEach - public void init() { + @BeforeEach + public void init() { - Product product = new Product(); - product.setId(1L); - product.setName("Macbook M1"); - product.setBrand("Apple"); - product.setCategories(List.of("Laptop", "Macbook")); - product.setAttributes(List.of("CPU", "RAM", "SSD")); + Product product = new Product(); + product.setId(1L); + product.setName("Macbook M1"); + product.setBrand("Apple"); + product.setCategories(List.of("Laptop", "Macbook")); + product.setAttributes(List.of("CPU", "RAM", "SSD")); - productRepository.save(product, RefreshPolicy.IMMEDIATE); - } + productRepository.save(product, RefreshPolicy.IMMEDIATE); + } - @AfterEach - public void destroy() { - productRepository.deleteAll(); - } + @AfterEach + public void destroy() { + productRepository.deleteAll(); + } - @Test - public void test_findProductAdvance_shouldReturnSuccessfully() { - given(getRequestSpecification()) - .auth().oauth2(getAccessToken("admin", "admin")) - .contentType(ContentType.JSON) - .queryParam("keyword", "Macbook") - .queryParam("category", "Laptop") - .queryParam("attribute", "CPU") - .queryParam("page", 0) - .queryParam("size", 12) - .get("/search/storefront/catalog-search") - .then() - .statusCode(HttpStatus.OK.value()) - .body("pageNo", equalTo(0)) - .body("pageSize", equalTo(12)) - .body("totalElements", equalTo(1)) - .log() - .ifValidationFails(); - } + @Test + public void test_findProductAdvance_shouldReturnSuccessfully() { + given(getRequestSpecification()) + .auth().oauth2(getAccessToken("admin", "admin")) + .contentType(ContentType.JSON) + .queryParam("keyword", "Macbook") + .queryParam("category", "Laptop") + .queryParam("attribute", "CPU") + .queryParam("page", 0) + .queryParam("size", 12) + .get("/search/storefront/catalog-search") + .then() + .statusCode(HttpStatus.OK.value()) + .body("pageNo", equalTo(0)) + .body("pageSize", equalTo(12)) + .body("totalElements", equalTo(1)) + .log() + .ifValidationFails(); + } - @Test - public void test_findProductAdvance_shouldNotReturnAnyProduct() { - given(getRequestSpecification()) - .auth().oauth2(getAccessToken("admin", "admin")) - .contentType(ContentType.JSON) - .queryParam("keyword", "Macbook") - .queryParam("category", "Laptop") - .queryParam("brand", "Samsung") - .queryParam("page", 0) - .queryParam("size", 12) - .get("/search/storefront/catalog-search") - .then() - .statusCode(HttpStatus.OK.value()) - .body("pageNo", equalTo(0)) - .body("pageSize", equalTo(12)) - .body("totalElements", equalTo(0)) - .log() - .ifValidationFails(); - } + @Test + public void test_findProductAdvance_shouldNotReturnAnyProduct() { + given(getRequestSpecification()) + .auth().oauth2(getAccessToken("admin", "admin")) + .contentType(ContentType.JSON) + .queryParam("keyword", "Macbook") + .queryParam("category", "Laptop") + .queryParam("brand", "Samsung") + .queryParam("page", 0) + .queryParam("size", 12) + .get("/search/storefront/catalog-search") + .then() + .statusCode(HttpStatus.OK.value()) + .body("pageNo", equalTo(0)) + .body("pageSize", equalTo(12)) + .body("totalElements", equalTo(0)) + .log() + .ifValidationFails(); + } - @Test - public void test_productSearchAutoComplete_shouldReturnSuccessfully() { - given(getRequestSpecification()) - .auth().oauth2(getAccessToken("admin", "admin")) - .contentType(ContentType.JSON) - .queryParam("keyword", "Macbook") - .get("/search/storefront/search_suggest") - .then() - .statusCode(HttpStatus.OK.value()) - .body("productNames", hasSize(1)) - .log() - .ifValidationFails(); - } + @Test + public void test_productSearchAutoComplete_shouldReturnSuccessfully() { + given(getRequestSpecification()) + .auth().oauth2(getAccessToken("admin", "admin")) + .contentType(ContentType.JSON) + .queryParam("keyword", "Macbook") + .get("/search/storefront/search_suggest") + .then() + .statusCode(HttpStatus.OK.value()) + .body("productNames", hasSize(1)) + .log() + .ifValidationFails(); + } } diff --git a/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java new file mode 100644 index 0000000000..d530ee765d --- /dev/null +++ b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java @@ -0,0 +1,227 @@ +package com.yas.search.kafka; + +import static com.yas.commonlibrary.kafka.cdc.message.Operation.CREATE; +import static com.yas.commonlibrary.kafka.cdc.message.Operation.DELETE; +import static com.yas.commonlibrary.kafka.cdc.message.Operation.UPDATE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yas.commonlibrary.kafka.cdc.message.Product; +import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; +import com.yas.search.config.KafkaIntegrationTestConfiguration; +import com.yas.search.config.ServiceUrlConfig; +import com.yas.search.repository.ProductRepository; +import com.yas.search.service.ProductSyncDataService; +import com.yas.search.viewmodel.ProductEsDetailVm; +import common.kafka.CdcConsumerTest; +import java.net.URI; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.PropertySource; +import org.springframework.web.util.UriComponentsBuilder; + +@Import(KafkaIntegrationTestConfiguration.class) +@PropertySource("classpath:application.properties") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class ProductCdcConsumerTest extends CdcConsumerTest { + + public static final String STOREFRONT_PRODUCTS_ES_PATH = "/storefront/products-es/{id}"; + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private ServiceUrlConfig serviceUrlConfig; + + @Autowired + private ProductRepository productRepository; + + @SpyBean + private ProductSyncDataService productSyncDataService; + + public ProductCdcConsumerTest() { + super(ProductCdcMessage.class, "dbproduct.public.product"); + } + + @AfterEach + public void tearDown() { + productRepository.deleteAll(); + } + + @DisplayName("When having product create event, data must sync as create") + @Test + public void test_whenHavingCreateEvent_shouldSyncAsCreate() + throws ExecutionException, InterruptedException, TimeoutException { + // Given + long productId = 1L; + ProductEsDetailVm response = getSampleProduct(); + + // When + // Simulate Product Detail API response + final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product()) + .path(STOREFRONT_PRODUCTS_ES_PATH) + .buildAndExpand(productId) + .toUri(); + simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class); + + // Sending CDC Event + sendMsg( + ProductCdcMessage.builder() + .op(CREATE) + .after(Product.builder().id(productId).isPublished(true).build()) + .build() + ); + + // Then + // Verify consumer + waitForConsumer(2, 1, 0, 0); + verify(productSyncDataService, times(1)).createProduct(productId); + + // Verify ES Sync data + Optional product = productRepository.findById(productId); + assertTrue(product.isPresent(), "ElasticSearch must create data accordingly to CDC event."); + } + + @DisplayName("When having product create event, but consumer process failed, consumer must perform retry.") + @Test + public void test_whenHavingCreateEvent_thenProcessFailed_shouldPerformRetry() + throws ExecutionException, InterruptedException, TimeoutException { + // Given + long productId = 1L; + + // When + // Simulate Product Detail API throw errors + final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product()) + .path(STOREFRONT_PRODUCTS_ES_PATH) + .buildAndExpand(productId) + .toUri(); + simulateHttpRequestWithError(url, new RuntimeException("Invalid Request"), ProductEsDetailVm.class); + + // Sending CDC Event + sendMsg( + ProductCdcMessage.builder() + .op(CREATE) + .after(Product.builder().id(productId).isPublished(true).build()) + .build() + ); + + // Then + waitForConsumer(2, 1, 4, 6); + verify(productSyncDataService, times(4)).createProduct(productId); + } + + @DisplayName("When having product update event, data must sync as update") + @Test + public void test_whenHavingUpdateEvent_shouldSyncAsUpdate() + throws ExecutionException, InterruptedException, TimeoutException { + // Given + long productId = 1L; + ProductEsDetailVm response = getSampleProduct(); + + // Create existing product + com.yas.search.model.Product product = getSampleEsProduct(productId); + productRepository.save(product); + + // When + // Simulate Product Detail API response + final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product()) + .path(STOREFRONT_PRODUCTS_ES_PATH) + .buildAndExpand(productId) + .toUri(); + simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class); + + // Sending CDC Event + sendMsg( + ProductCdcMessage.builder() + .op(UPDATE) + .after(Product.builder().id(productId).isPublished(true).build()) + .build() + ); + + // Then + // Verify Consumer + waitForConsumer(2, 1, 0, 0); + verify(productSyncDataService, times(1)).updateProduct(productId); + Optional updated = productRepository.findById(productId); + + // Verify ES sync data + assertTrue(updated.isPresent(), "ElasticSearch must have product data."); + assertEquals(updated.get().getName(), response.name(), "Product name must be updated."); + } + + @DisplayName("When having product delete event, data must sync as delete") + @Test + public void test_whenHavingDeleteEvent_shouldSyncAsDelete() + throws ExecutionException, InterruptedException, TimeoutException { + // Given + long productId = 1L; + ProductEsDetailVm response = getSampleProduct(); + + // Create existing product + com.yas.search.model.Product product = getSampleEsProduct(productId); + productRepository.save(product); + + // When + // Simulate Product Detail API response + final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product()) + .path(STOREFRONT_PRODUCTS_ES_PATH) + .buildAndExpand(productId) + .toUri(); + simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class); + + // Sending CDC Event + sendMsg( + ProductCdcMessage.builder() + .op(DELETE) + .after(Product.builder().id(productId).isPublished(true).build()) + .build() + ); + + // Then + // Verify Consumer + waitForConsumer(2, 1, 0, 0); + verify(productSyncDataService, times(1)).deleteProduct(productId); + Optional updated = productRepository.findById(productId); + + // Verify ES sync data + assertTrue(updated.isEmpty(), "ElasticSearch must remove product data."); + } + + private static com.yas.search.model.@NotNull Product getSampleEsProduct(long productId) { + com.yas.search.model.Product product = new com.yas.search.model.Product(); + product.setId(productId); + product.setName("Modern Speaker"); + return product; + } + + private static @NotNull ProductEsDetailVm getSampleProduct() { + return new ProductEsDetailVm( + 1001L, + "Wireless Bluetooth Speaker", + "wireless-bluetooth-speaker", + 79.99, + true, + true, + true, + false, + 501L, + "SoundWave", + List.of("Electronics", "Audio"), + List.of("Bluetooth 5.0", "10-hour battery life") + ); + } + +} diff --git a/search/src/it/resources/application.properties b/search/src/it/resources/application.properties index a521186f4b..1adc98f409 100644 --- a/search/src/it/resources/application.properties +++ b/search/src/it/resources/application.properties @@ -28,6 +28,18 @@ elasticsearch.password= yas.services.product=http://api.yas.local/product spring.kafka.bootstrap-servers=localhost:9092 -spring.kafka.consumer.bootstrap-servers=localhost:9092 -spring.kafka.consumer.group-id=search + +# CDC Kafka Config product.topic.name=dbproduct.public.product + +# Kafka Consumer Config +spring.kafka.consumer.group-id=search +spring.kafka.consumer.auto-offset-reset=earliest + +# Kafka Producer Config +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer + +# TestContainers version +kafka.version=7.0.9 +elasticsearch.version=7.17.6 diff --git a/search/src/it/resources/logback-spring.xml b/search/src/it/resources/logback-spring.xml index e7d52d7271..33770030c4 100644 --- a/search/src/it/resources/logback-spring.xml +++ b/search/src/it/resources/logback-spring.xml @@ -9,6 +9,9 @@ + + +