diff --git a/common-library/pom.xml b/common-library/pom.xml
index a2d3fa5806..978509214f 100644
--- a/common-library/pom.xml
+++ b/common-library/pom.xml
@@ -37,11 +37,24 @@
org.springframework
spring-tx
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
com.opencsv
opencsv
${opencsv.version}
+
+
+
+ org.testcontainers
+ kafka
+ test
+
diff --git a/common-library/src/it/java/common/container/ContainerFactory.java b/common-library/src/it/java/common/container/ContainerFactory.java
new file mode 100644
index 0000000000..b10677d7f6
--- /dev/null
+++ b/common-library/src/it/java/common/container/ContainerFactory.java
@@ -0,0 +1,46 @@
+package common.container;
+
+import dasniko.testcontainers.keycloak.KeycloakContainer;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Factory class that holds and provides containers used for testing with Testcontainers.
+ */
+public final class ContainerFactory {
+
+ private ContainerFactory() {}
+
+ public static KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
+ KeycloakContainer keycloak = new KeycloakContainer()
+ .withRealmImportFiles("/test-realm.json")
+ .withReuse(true);
+
+ registry.add(
+ "spring.security.oauth2.resourceserver.jwt.issuer-uri",
+ () -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus")
+ );
+ registry.add(
+ "spring.security.oauth2.resourceserver.jwt.jwk-set-uri",
+ () -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus/protocol/openid-connect/certs")
+ );
+ return keycloak;
+ }
+
+ public static KafkaContainer kafkaContainer(DynamicPropertyRegistry registry, String version) {
+ var kafkaContainer = new KafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(version))
+ );
+ registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
+
+ // Consumer properties
+ registry.add("auto.offset.reset", () -> "earliest");
+ registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
+
+ // Producer properties
+ registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
+ return kafkaContainer;
+ }
+
+}
diff --git a/common-library/src/it/java/common/kafka/CdcConsumerTest.java b/common-library/src/it/java/common/kafka/CdcConsumerTest.java
new file mode 100644
index 0000000000..d192a15b06
--- /dev/null
+++ b/common-library/src/it/java/common/kafka/CdcConsumerTest.java
@@ -0,0 +1,138 @@
+package common.kafka;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.Getter;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.Assert;
+import org.springframework.web.client.RestClient;
+import org.testcontainers.containers.KafkaContainer;
+
+/**
+ * Base class providing support for testing a CDC (Change Data Capture) consumer.
+ * This class requires Kafka containers to be running (e.g., {@link Import}), and it provides support
+ * for a Kafka producer to send messages, as well as simulating a REST client
+ * for microservice integration.
+ *
+ * @param Message Type
+ */
+@Getter
+public abstract class CdcConsumerTest {
+
+ private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class);
+
+ private final Class messageType;
+
+ private final String cdcEvent;
+
+ @Autowired
+ private KafkaContainer kafkaContainer;
+
+ @MockBean
+ private RestClient restClient;
+
+ @Mock
+ RestClient.ResponseSpec responseSpec;
+
+ @Mock
+ RestClient.RequestHeadersUriSpec requestHeadersUriSpec;
+
+ private KafkaTemplate kafkaTemplate;
+
+ public CdcConsumerTest(Class messageType, String topicEvent) {
+ Assert.notNull(topicEvent, "CDC topic must not be null");
+ Assert.notNull(messageType, "Message type must not be null");
+ this.cdcEvent = topicEvent;
+ this.messageType = messageType;
+ }
+
+ public synchronized KafkaTemplate getKafkaTemplate() {
+ // Now, we haven't had any process need Kafka Producer,
+ // then just temporary config producer like this for testing
+ if (kafkaTemplate == null) {
+ synchronized (this) {
+ final Map props = getProducerProps();
+ kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(props));
+ }
+ }
+ return kafkaTemplate;
+ }
+
+ protected void sendMsg(M message)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ var rs = getKafkaTemplate()
+ .send(this.cdcEvent, message)
+ .get(10, TimeUnit.SECONDS);
+ logger.info("Sent message completed: {}", rs);
+ }
+
+ protected void simulateHttpRequestWithResponse(URI url, R response, Class responseType) {
+ setupMockGetRequest(url);
+ when(responseSpec.body(responseType)).thenReturn(response);
+ }
+
+ protected void simulateHttpRequestWithError(URI url, Throwable exception, Class responseType) {
+ setupMockGetRequest(url);
+ when(responseSpec.body(responseType)).thenThrow(exception);
+ }
+
+ private void setupMockGetRequest(URI url) {
+ when(restClient.get()).thenReturn(requestHeadersUriSpec);
+ when(requestHeadersUriSpec.uri(url)).thenReturn(requestHeadersUriSpec);
+ when(requestHeadersUriSpec.headers(any())).thenReturn(requestHeadersUriSpec);
+ when(requestHeadersUriSpec.retrieve()).thenReturn(responseSpec);
+ }
+
+ /**
+ * Pauses the current thread until the consumer has finished processing. The wait time is dynamic and
+ * depends on the specific scenario (e.g., retry cases or normal processing).
+ *
+ * @param processTime The time (in seconds) the consumer takes to process each record.
+ * @param numOfRecords The number of records sent to the consumer.
+ * @param attempts The number of retry attempts. If no retries are needed, set this to '0'.
+ * @param backOff The backoff time (in seconds) between retries.
+ * @throws InterruptedException If the thread is interrupted while sleeping.
+ */
+ public void waitForConsumer(
+ long processTime,
+ int numOfRecords,
+ int attempts,
+ long backOff
+ ) throws InterruptedException {
+ // retryTime = (1st run) + (total run when retrying) + (total back off time)
+ long retryTime = processTime + (attempts * processTime) + (backOff * attempts);
+ long waitTime = (attempts > 0 ? retryTime : processTime) * numOfRecords;
+ logger.info("Consumer Processing expected time: {}s", waitTime);
+ Thread.sleep(Duration.ofSeconds(waitTime));
+ }
+
+ private @NotNull Map getProducerProps() {
+ final Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.RETRIES_CONFIG, 0);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ return props;
+ }
+}
+
diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java
new file mode 100644
index 0000000000..1e446735b7
--- /dev/null
+++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java
@@ -0,0 +1,29 @@
+package com.yas.commonlibrary.kafka.cdc;
+
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.MessageHeaders;
+
+/**
+ * Base class for CDC (Change Data Capture) Kafka consumers.
+ * Provides common methods for processing messages and handling Dead Letter Topic (DLT) events.
+ *
+ * @param Type of the message payload.
+ */
+public abstract class BaseCdcConsumer {
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(BaseCdcConsumer.class);
+
+ protected void processMessage(T record, MessageHeaders headers, Consumer consumer) {
+ LOGGER.info("## Received message - headers: {}", headers);
+ if (record == null) {
+ LOGGER.warn("## Null payload received");
+ } else {
+ LOGGER.debug("## Processing record - Key: {} | Value: {}", headers.get(KafkaHeaders.RECEIVED_KEY), record);
+ consumer.accept(record);
+ LOGGER.debug("## Record processed successfully - Key: {} \n", headers.get(KafkaHeaders.RECEIVED_KEY));
+ }
+ }
+}
diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/RetrySupportDql.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/RetrySupportDql.java
new file mode 100644
index 0000000000..d7f45e8a4c
--- /dev/null
+++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/RetrySupportDql.java
@@ -0,0 +1,43 @@
+package com.yas.commonlibrary.kafka.cdc;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.springframework.core.annotation.AliasFor;
+import org.springframework.kafka.annotation.RetryableTopic;
+import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
+import org.springframework.retry.annotation.Backoff;
+
+/**
+ * Custom annotation that extends Spring's {@link RetryableTopic} to
+ * add retry and dead letter queue (DLQ) support for Kafka listeners.
+ * Provides additional configuration for retry backoff, number of attempts,
+ * topic creation, and exclusion of certain exceptions.
+ */
+@Documented
+@RetryableTopic
+@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RetrySupportDql {
+
+ @AliasFor(annotation = RetryableTopic.class, attribute = "backoff")
+ Backoff backoff() default @Backoff(value = 6000);
+
+ @AliasFor(annotation = RetryableTopic.class, attribute = "attempts")
+ String attempts() default "4";
+
+ @AliasFor(annotation = RetryableTopic.class, attribute = "autoCreateTopics")
+ String autoCreateTopics() default "true";
+
+ @AliasFor(annotation = RetryableTopic.class, attribute = "listenerContainerFactory")
+ String listenerContainerFactory() default "";
+
+ @AliasFor(annotation = RetryableTopic.class, attribute = "exclude")
+ Class extends Throwable>[] exclude() default {};
+
+ @AliasFor(annotation = RetryableTopic.class, attribute = "sameIntervalTopicReuseStrategy")
+ SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.SINGLE_TOPIC;
+
+}
diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java
new file mode 100644
index 0000000000..78169271ec
--- /dev/null
+++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java
@@ -0,0 +1,61 @@
+package com.yas.commonlibrary.kafka.cdc.config;
+
+import java.util.Map;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+/**
+ * Base configuration class for setting up Kafka consumers with typed deserialization.
+ *
+ * @param The type of messages consumed.
+ */
+public abstract class BaseKafkaListenerConfig {
+
+ private final Class type;
+ private final KafkaProperties kafkaProperties;
+
+ public BaseKafkaListenerConfig(Class type, KafkaProperties kafkaProperties) {
+ this.type = type;
+ this.kafkaProperties = kafkaProperties;
+ }
+
+ /**
+ * Abstract method to provide a custom instance of {@link ConcurrentKafkaListenerContainerFactory}.
+ * (override method must be recognized as bean)
+ * In case, using default want, let's get the default from {@link #kafkaListenerContainerFactory()}
+ *
+ * @return a configured instance of {@link ConcurrentKafkaListenerContainerFactory}.
+ */
+ public abstract ConcurrentKafkaListenerContainerFactory listenerContainerFactory();
+
+ /**
+ * Common instance type ConcurrentKafkaListenerContainerFactory.
+ *
+ * @return concurrentKafkaListenerContainerFactory {@link ConcurrentKafkaListenerContainerFactory}.
+ */
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+ var factory = new ConcurrentKafkaListenerContainerFactory();
+ factory.setConsumerFactory(typeConsumerFactory(type));
+ return factory;
+ }
+
+ private ConsumerFactory typeConsumerFactory(Class clazz) {
+ Map props = buildConsumerProperties();
+ var serialize = new StringDeserializer();
+ // wrapper in case serialization/deserialization occur
+ var jsonDeserializer = new JsonDeserializer<>(clazz);
+ jsonDeserializer.addTrustedPackages("*");
+ var deserialize = new ErrorHandlingDeserializer<>(jsonDeserializer);
+ return new DefaultKafkaConsumerFactory<>(props, serialize, deserialize);
+ }
+
+ private Map buildConsumerProperties() {
+ return kafkaProperties.buildConsumerProperties(null);
+ }
+
+}
diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Operation.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Operation.java
new file mode 100644
index 0000000000..9fb999cdce
--- /dev/null
+++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Operation.java
@@ -0,0 +1,22 @@
+package com.yas.commonlibrary.kafka.cdc.message;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+public enum Operation {
+
+ READ("r"),
+ CREATE("c"),
+ UPDATE("u"),
+ DELETE("d");
+
+ private final String name;
+
+ Operation(String name) {
+ this.name = name;
+ }
+
+ @JsonValue
+ public String getName() {
+ return name;
+ }
+}
diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Product.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Product.java
new file mode 100644
index 0000000000..68338de313
--- /dev/null
+++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Product.java
@@ -0,0 +1,20 @@
+package com.yas.commonlibrary.kafka.cdc.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.NoArgsConstructor;
+
+@lombok.Getter
+@lombok.Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Product {
+
+ private long id;
+
+ @JsonProperty("is_published")
+ private boolean isPublished;
+
+}
diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java
new file mode 100644
index 0000000000..e73ed2cfa6
--- /dev/null
+++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java
@@ -0,0 +1,24 @@
+package com.yas.commonlibrary.kafka.cdc.message;
+
+import jakarta.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.NoArgsConstructor;
+
+@lombok.Getter
+@lombok.Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ProductCdcMessage {
+
+ @NotNull
+ private Product after;
+
+ private Product before;
+
+ @NotNull
+ private Operation op;
+
+}
+
diff --git a/recommendation/pom.xml b/recommendation/pom.xml
index 63e18caa1f..2de4957c93 100644
--- a/recommendation/pom.xml
+++ b/recommendation/pom.xml
@@ -40,6 +40,11 @@
+
+ com.yas
+ common-library
+ ${revision}
+
org.apache.commons
commons-text
diff --git a/recommendation/src/main/java/com/yas/recommendation/consumer/ProductSyncDataConsumer.java b/recommendation/src/main/java/com/yas/recommendation/consumer/ProductSyncDataConsumer.java
deleted file mode 100644
index eb3beff4d7..0000000000
--- a/recommendation/src/main/java/com/yas/recommendation/consumer/ProductSyncDataConsumer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.yas.recommendation.consumer;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.yas.recommendation.constant.Action;
-import com.yas.recommendation.vector.product.service.ProductVectorSyncService;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-/**
- * Service class that listens to Kafka topics related to product data.
- * The ProductSyncDataConsumer processes records for various actions
- * (CREATE, READ, UPDATE, DELETE) on product data and synchronizes the
- * product vectors accordingly through the ProductVectorSyncService.
- */
-@Service
-public class ProductSyncDataConsumer {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
-
- /**
- * Service to handle synchronization of product vectors.
- */
- @Autowired
- private ProductVectorSyncService productVectorSyncService;
-
- /**
- * Listens to the specified Kafka topic for product data changes.
- * Processes the incoming Kafka records to determine the action type
- * and synchronizes product vector data accordingly.
- *
- * @param consumerRecord The incoming record from the Kafka topic.
- * The key contains product identifier, and the
- * value contains the action type and product data.
- */
- @KafkaListener(topics = "${product.topic.name}")
- public void listen(ConsumerRecord, ?> consumerRecord) {
-
- if (consumerRecord != null) {
- JsonObject keyObject = new Gson().fromJson((String) consumerRecord.key(), JsonObject.class);
- if (keyObject != null) {
- JsonObject valueObject = new Gson().fromJson((String) consumerRecord.value(), JsonObject.class);
- if (valueObject != null) {
- String action = String.valueOf(valueObject.get("op")).replaceAll("\"", "");
- Long id = keyObject.get("id").getAsLong();
- JsonObject productObject = valueObject.get("after").getAsJsonObject();
- boolean isPublished = productObject != null && productObject.get("is_published").getAsBoolean();
-
- switch (action) {
- case Action.CREATE, Action.READ:
- productVectorSyncService.createProductVector(id, isPublished);
- break;
- case Action.UPDATE:
- productVectorSyncService.updateProductVector(id, isPublished);
- break;
- case Action.DELETE:
- productVectorSyncService.deleteProductVector(id);
- break;
- default:
- break;
- }
- }
- }
- }
- }
-}
diff --git a/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/AppKafkaListenerConfigurer.java b/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/AppKafkaListenerConfigurer.java
new file mode 100644
index 0000000000..56a04ae6ce
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/AppKafkaListenerConfigurer.java
@@ -0,0 +1,24 @@
+package com.yas.recommendation.kafka.config.consumer;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListenerConfigurer;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
+import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
+
+@EnableKafka
+@Configuration
+public class AppKafkaListenerConfigurer implements KafkaListenerConfigurer {
+
+ private LocalValidatorFactoryBean validator;
+
+ public AppKafkaListenerConfigurer(LocalValidatorFactoryBean validator) {
+ this.validator = validator;
+ }
+
+ @Override
+ public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+ // Enable message validation
+ registrar.setValidator(this.validator);
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java b/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java
new file mode 100644
index 0000000000..53f0050471
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java
@@ -0,0 +1,30 @@
+package com.yas.recommendation.kafka.config.consumer;
+
+import com.yas.commonlibrary.kafka.cdc.config.BaseKafkaListenerConfig;
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+
+/**
+ * Product CDC kafka listener, support convert product cdc message to java object.
+ */
+@EnableKafka
+@Configuration
+public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig {
+
+ public static final String PRODUCT_CDC_LISTENER_CONTAINER_FACTORY = "productCdcListenerContainerFactory";
+
+ public ProductCdcKafkaListenerConfig(KafkaProperties kafkaProperties) {
+ super(ProductCdcMessage.class, kafkaProperties);
+ }
+
+ @Bean(name = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY)
+ @Override
+ public ConcurrentKafkaListenerContainerFactory listenerContainerFactory() {
+ return super.kafkaListenerContainerFactory();
+ }
+
+}
\ No newline at end of file
diff --git a/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncDataConsumer.java b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncDataConsumer.java
new file mode 100644
index 0000000000..617e7e7f9a
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncDataConsumer.java
@@ -0,0 +1,40 @@
+package com.yas.recommendation.kafka.consumer;
+
+import static com.yas.recommendation.kafka.config.consumer.ProductCdcKafkaListenerConfig.PRODUCT_CDC_LISTENER_CONTAINER_FACTORY;
+
+import com.yas.commonlibrary.kafka.cdc.BaseCdcConsumer;
+import com.yas.commonlibrary.kafka.cdc.RetrySupportDql;
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import jakarta.validation.Valid;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Headers;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Component;
+
+/**
+ * Product synchronize data consumer for pgvector.
+ */
+@Component
+public class ProductSyncDataConsumer extends BaseCdcConsumer {
+
+ private final ProductSyncService productSyncService;
+
+ public ProductSyncDataConsumer(ProductSyncService productSyncService) {
+ this.productSyncService = productSyncService;
+ }
+
+ @KafkaListener(
+ id = "product-sync-recommendation",
+ groupId = "product-sync",
+ topics = "${product.topic.name}",
+ containerFactory = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY
+ )
+ @RetrySupportDql(listenerContainerFactory = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY)
+ public void processMessage(
+ @Payload(required = false) @Valid ProductCdcMessage productCdcMessage,
+ @Headers MessageHeaders headers
+ ) {
+ processMessage(productCdcMessage, headers, productSyncService::sync);
+ }
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java
new file mode 100644
index 0000000000..21a5a7c07d
--- /dev/null
+++ b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java
@@ -0,0 +1,47 @@
+package com.yas.recommendation.kafka.consumer;
+
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import com.yas.recommendation.vector.product.service.ProductVectorSyncService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * Product Sync Data service, support sync product data based on Product CDC message.
+ */
+@Slf4j
+@Service
+public class ProductSyncService {
+
+ private final ProductVectorSyncService productVectorSyncService;
+
+ public ProductSyncService(ProductVectorSyncService productVectorSyncService) {
+ this.productVectorSyncService = productVectorSyncService;
+ }
+
+ /**
+ * Synchronize Product Data to VectorDb based on Product CDC message.
+ *
+ * @param productCdcMessage {@link ProductCdcMessage} CDC message.
+ */
+ public void sync(ProductCdcMessage productCdcMessage) {
+ if (productCdcMessage.getAfter() != null) {
+ var operation = productCdcMessage.getOp();
+ var product = productCdcMessage.getAfter();
+ switch (operation) {
+ case CREATE, READ:
+ productVectorSyncService.createProductVector(product);
+ break;
+ case UPDATE:
+ productVectorSyncService.updateProductVector(product);
+ break;
+ case DELETE:
+ productVectorSyncService.deleteProductVector(product.getId());
+ break;
+ default:
+ log.info("Unsupported operation '{}' for product: '{}'", operation, product.getId());
+ break;
+ }
+ }
+ }
+
+}
diff --git a/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java b/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java
index df918095e6..e9c8605264 100644
--- a/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java
+++ b/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java
@@ -1,5 +1,6 @@
package com.yas.recommendation.vector.product.service;
+import com.yas.commonlibrary.kafka.cdc.message.Product;
import com.yas.recommendation.vector.product.store.ProductVectorRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@@ -17,29 +18,25 @@ public class ProductVectorSyncService {
/**
* Creates a product vector if the product is published.
*
- * @param productId The unique identifier of the product to be synchronized.
- * @param isPublished Indicates if the product is published.
- * The product vector is created only if this value is true.
+ * @param product {@link Product} the product to be synchronized.
*/
- public void createProductVector(Long productId, boolean isPublished) {
- if (isPublished) {
- productVectorRepository.add(productId);
+ public void createProductVector(Product product) {
+ if (product.isPublished()) {
+ productVectorRepository.add(product.getId());
}
}
/**
* Updates a product vector if the product is published; deletes it otherwise.
*
- * @param productId The unique identifier of the product to be updated.
- * @param isPublished Indicates if the product is published.
- * The product vector is updated if true, or deleted if false.
+ * @param product {@link Product} the product to be synchronized.
*/
- public void updateProductVector(Long productId, boolean isPublished) {
- if (!isPublished) {
- productVectorRepository.delete(productId);
- return;
+ public void updateProductVector(Product product) {
+ if (product.isPublished()) {
+ productVectorRepository.update(product.getId());
+ } else {
+ productVectorRepository.delete(product.getId());
}
- productVectorRepository.update(productId);
}
/**
diff --git a/recommendation/src/main/resources/application.properties b/recommendation/src/main/resources/application.properties
index 405a4390fb..f725c576da 100644
--- a/recommendation/src/main/resources/application.properties
+++ b/recommendation/src/main/resources/application.properties
@@ -42,10 +42,18 @@ spring.security.oauth2.resourceserver.jwt.issuer-uri=http://identity/realms/Yas
springdoc.oauthflow.authorization-url=http://identity/realms/Yas/protocol/openid-connect/auth
springdoc.oauthflow.token-url=http://identity/realms/Yas/protocol/openid-connect/token
-# Kafka Config
-product.topic.name = dbproduct.public.product
+# Kafka CDC Topic config
+product.topic.name=dbproduct.public.product
+
+# Kafka Consumer
spring.kafka.consumer.bootstrap-servers=kafka:9092
spring.kafka.consumer.group-id=recommendation
+spring.aop.proxy-target-class=true
+
+# Kafka Producer
+spring.kafka.producer.bootstrap-servers=kafka:9092
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Similarity Search Config
yas.recommendation.embedding-based.search.topK=10
diff --git a/search/src/it/java/com/yas/search/config/ElasticTestContainer.java b/search/src/it/java/com/yas/search/config/ElasticTestContainer.java
index 548a5f71d6..8a21186521 100644
--- a/search/src/it/java/com/yas/search/config/ElasticTestContainer.java
+++ b/search/src/it/java/com/yas/search/config/ElasticTestContainer.java
@@ -4,16 +4,15 @@
public class ElasticTestContainer extends ElasticsearchContainer {
- private static final String DOCKER_ELASTIC = "docker.elastic.co/elasticsearch/elasticsearch:7.17.6";
+ private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:%s";
private static final String CLUSTER_NAME = "cluster.name";
private static final String ELASTIC_SEARCH = "elasticsearch";
- public ElasticTestContainer() {
- super(DOCKER_ELASTIC);
+ public ElasticTestContainer(String version) {
+ super(IMAGE_NAME.formatted(version));
this.addFixedExposedPort(9200, 9200);
- this.addFixedExposedPort(9300, 9300);
this.addEnv(CLUSTER_NAME, ELASTIC_SEARCH);
this.withEnv("xpack.security.transport.ssl.enabled", "false");
this.withEnv("xpack.security.http.ssl.enabled", "false");
diff --git a/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java b/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java
new file mode 100644
index 0000000000..16908dd32b
--- /dev/null
+++ b/search/src/it/java/com/yas/search/config/KafkaIntegrationTestConfiguration.java
@@ -0,0 +1,32 @@
+package com.yas.search.config;
+
+import common.container.ContainerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.testcontainers.containers.KafkaContainer;
+
+@TestConfiguration
+public class KafkaIntegrationTestConfiguration {
+
+ @Value("${kafka.version}")
+ private String kafkaVersion;
+
+ @Value("${elasticsearch.version}")
+ private String elasticSearchVersion;
+
+ @Bean
+ @ServiceConnection
+ public KafkaContainer kafkaContainer(DynamicPropertyRegistry registry) {
+ return ContainerFactory.kafkaContainer(registry, kafkaVersion);
+ }
+
+ @Bean
+ @ServiceConnection
+ public ElasticTestContainer elasticTestContainer() {
+ return new ElasticTestContainer(elasticSearchVersion);
+ }
+
+}
diff --git a/search/src/it/java/com/yas/search/config/SearchTestConfig.java b/search/src/it/java/com/yas/search/config/SearchTestConfig.java
index 3d4621d4a7..bf92312283 100644
--- a/search/src/it/java/com/yas/search/config/SearchTestConfig.java
+++ b/search/src/it/java/com/yas/search/config/SearchTestConfig.java
@@ -1,6 +1,7 @@
package com.yas.search.config;
import com.yas.commonlibrary.IntegrationTestConfiguration;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
@@ -8,9 +9,12 @@
@TestConfiguration
public class SearchTestConfig extends IntegrationTestConfiguration {
+ @Value("${elasticsearch.version}")
+ private String elasticSearchVersion;
+
@Bean(destroyMethod = "stop")
@ServiceConnection
public ElasticTestContainer elasticTestContainer() {
- return new ElasticTestContainer();
+ return new ElasticTestContainer(elasticSearchVersion);
}
}
diff --git a/search/src/it/java/com/yas/search/controller/ProductControllerIT.java b/search/src/it/java/com/yas/search/controller/ProductControllerIT.java
index 33371751fa..93b10a37f8 100644
--- a/search/src/it/java/com/yas/search/controller/ProductControllerIT.java
+++ b/search/src/it/java/com/yas/search/controller/ProductControllerIT.java
@@ -25,78 +25,78 @@
@PropertySource("classpath:application.properties")
public class ProductControllerIT extends AbstractControllerIT {
- @Autowired
- ProductRepository productRepository;
+ @Autowired
+ ProductRepository productRepository;
- @BeforeEach
- public void init() {
+ @BeforeEach
+ public void init() {
- Product product = new Product();
- product.setId(1L);
- product.setName("Macbook M1");
- product.setBrand("Apple");
- product.setCategories(List.of("Laptop", "Macbook"));
- product.setAttributes(List.of("CPU", "RAM", "SSD"));
+ Product product = new Product();
+ product.setId(1L);
+ product.setName("Macbook M1");
+ product.setBrand("Apple");
+ product.setCategories(List.of("Laptop", "Macbook"));
+ product.setAttributes(List.of("CPU", "RAM", "SSD"));
- productRepository.save(product, RefreshPolicy.IMMEDIATE);
- }
+ productRepository.save(product, RefreshPolicy.IMMEDIATE);
+ }
- @AfterEach
- public void destroy() {
- productRepository.deleteAll();
- }
+ @AfterEach
+ public void destroy() {
+ productRepository.deleteAll();
+ }
- @Test
- public void test_findProductAdvance_shouldReturnSuccessfully() {
- given(getRequestSpecification())
- .auth().oauth2(getAccessToken("admin", "admin"))
- .contentType(ContentType.JSON)
- .queryParam("keyword", "Macbook")
- .queryParam("category", "Laptop")
- .queryParam("attribute", "CPU")
- .queryParam("page", 0)
- .queryParam("size", 12)
- .get("/search/storefront/catalog-search")
- .then()
- .statusCode(HttpStatus.OK.value())
- .body("pageNo", equalTo(0))
- .body("pageSize", equalTo(12))
- .body("totalElements", equalTo(1))
- .log()
- .ifValidationFails();
- }
+ @Test
+ public void test_findProductAdvance_shouldReturnSuccessfully() {
+ given(getRequestSpecification())
+ .auth().oauth2(getAccessToken("admin", "admin"))
+ .contentType(ContentType.JSON)
+ .queryParam("keyword", "Macbook")
+ .queryParam("category", "Laptop")
+ .queryParam("attribute", "CPU")
+ .queryParam("page", 0)
+ .queryParam("size", 12)
+ .get("/search/storefront/catalog-search")
+ .then()
+ .statusCode(HttpStatus.OK.value())
+ .body("pageNo", equalTo(0))
+ .body("pageSize", equalTo(12))
+ .body("totalElements", equalTo(1))
+ .log()
+ .ifValidationFails();
+ }
- @Test
- public void test_findProductAdvance_shouldNotReturnAnyProduct() {
- given(getRequestSpecification())
- .auth().oauth2(getAccessToken("admin", "admin"))
- .contentType(ContentType.JSON)
- .queryParam("keyword", "Macbook")
- .queryParam("category", "Laptop")
- .queryParam("brand", "Samsung")
- .queryParam("page", 0)
- .queryParam("size", 12)
- .get("/search/storefront/catalog-search")
- .then()
- .statusCode(HttpStatus.OK.value())
- .body("pageNo", equalTo(0))
- .body("pageSize", equalTo(12))
- .body("totalElements", equalTo(0))
- .log()
- .ifValidationFails();
- }
+ @Test
+ public void test_findProductAdvance_shouldNotReturnAnyProduct() {
+ given(getRequestSpecification())
+ .auth().oauth2(getAccessToken("admin", "admin"))
+ .contentType(ContentType.JSON)
+ .queryParam("keyword", "Macbook")
+ .queryParam("category", "Laptop")
+ .queryParam("brand", "Samsung")
+ .queryParam("page", 0)
+ .queryParam("size", 12)
+ .get("/search/storefront/catalog-search")
+ .then()
+ .statusCode(HttpStatus.OK.value())
+ .body("pageNo", equalTo(0))
+ .body("pageSize", equalTo(12))
+ .body("totalElements", equalTo(0))
+ .log()
+ .ifValidationFails();
+ }
- @Test
- public void test_productSearchAutoComplete_shouldReturnSuccessfully() {
- given(getRequestSpecification())
- .auth().oauth2(getAccessToken("admin", "admin"))
- .contentType(ContentType.JSON)
- .queryParam("keyword", "Macbook")
- .get("/search/storefront/search_suggest")
- .then()
- .statusCode(HttpStatus.OK.value())
- .body("productNames", hasSize(1))
- .log()
- .ifValidationFails();
- }
+ @Test
+ public void test_productSearchAutoComplete_shouldReturnSuccessfully() {
+ given(getRequestSpecification())
+ .auth().oauth2(getAccessToken("admin", "admin"))
+ .contentType(ContentType.JSON)
+ .queryParam("keyword", "Macbook")
+ .get("/search/storefront/search_suggest")
+ .then()
+ .statusCode(HttpStatus.OK.value())
+ .body("productNames", hasSize(1))
+ .log()
+ .ifValidationFails();
+ }
}
diff --git a/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java
new file mode 100644
index 0000000000..d530ee765d
--- /dev/null
+++ b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java
@@ -0,0 +1,227 @@
+package com.yas.search.kafka;
+
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.CREATE;
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.DELETE;
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yas.commonlibrary.kafka.cdc.message.Product;
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import com.yas.search.config.KafkaIntegrationTestConfiguration;
+import com.yas.search.config.ServiceUrlConfig;
+import com.yas.search.repository.ProductRepository;
+import com.yas.search.service.ProductSyncDataService;
+import com.yas.search.viewmodel.ProductEsDetailVm;
+import common.kafka.CdcConsumerTest;
+import java.net.URI;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.SpyBean;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.web.util.UriComponentsBuilder;
+
+@Import(KafkaIntegrationTestConfiguration.class)
+@PropertySource("classpath:application.properties")
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+public class ProductCdcConsumerTest extends CdcConsumerTest {
+
+ public static final String STOREFRONT_PRODUCTS_ES_PATH = "/storefront/products-es/{id}";
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Autowired
+ private ServiceUrlConfig serviceUrlConfig;
+
+ @Autowired
+ private ProductRepository productRepository;
+
+ @SpyBean
+ private ProductSyncDataService productSyncDataService;
+
+ public ProductCdcConsumerTest() {
+ super(ProductCdcMessage.class, "dbproduct.public.product");
+ }
+
+ @AfterEach
+ public void tearDown() {
+ productRepository.deleteAll();
+ }
+
+ @DisplayName("When having product create event, data must sync as create")
+ @Test
+ public void test_whenHavingCreateEvent_shouldSyncAsCreate()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+ ProductEsDetailVm response = getSampleProduct();
+
+ // When
+ // Simulate Product Detail API response
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(CREATE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ // Verify consumer
+ waitForConsumer(2, 1, 0, 0);
+ verify(productSyncDataService, times(1)).createProduct(productId);
+
+ // Verify ES Sync data
+ Optional product = productRepository.findById(productId);
+ assertTrue(product.isPresent(), "ElasticSearch must create data accordingly to CDC event.");
+ }
+
+ @DisplayName("When having product create event, but consumer process failed, consumer must perform retry.")
+ @Test
+ public void test_whenHavingCreateEvent_thenProcessFailed_shouldPerformRetry()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+
+ // When
+ // Simulate Product Detail API throw errors
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithError(url, new RuntimeException("Invalid Request"), ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(CREATE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ waitForConsumer(2, 1, 4, 6);
+ verify(productSyncDataService, times(4)).createProduct(productId);
+ }
+
+ @DisplayName("When having product update event, data must sync as update")
+ @Test
+ public void test_whenHavingUpdateEvent_shouldSyncAsUpdate()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+ ProductEsDetailVm response = getSampleProduct();
+
+ // Create existing product
+ com.yas.search.model.Product product = getSampleEsProduct(productId);
+ productRepository.save(product);
+
+ // When
+ // Simulate Product Detail API response
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(UPDATE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ // Verify Consumer
+ waitForConsumer(2, 1, 0, 0);
+ verify(productSyncDataService, times(1)).updateProduct(productId);
+ Optional updated = productRepository.findById(productId);
+
+ // Verify ES sync data
+ assertTrue(updated.isPresent(), "ElasticSearch must have product data.");
+ assertEquals(updated.get().getName(), response.name(), "Product name must be updated.");
+ }
+
+ @DisplayName("When having product delete event, data must sync as delete")
+ @Test
+ public void test_whenHavingDeleteEvent_shouldSyncAsDelete()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+ long productId = 1L;
+ ProductEsDetailVm response = getSampleProduct();
+
+ // Create existing product
+ com.yas.search.model.Product product = getSampleEsProduct(productId);
+ productRepository.save(product);
+
+ // When
+ // Simulate Product Detail API response
+ final URI url = UriComponentsBuilder.fromHttpUrl(serviceUrlConfig.product())
+ .path(STOREFRONT_PRODUCTS_ES_PATH)
+ .buildAndExpand(productId)
+ .toUri();
+ simulateHttpRequestWithResponse(url, response, ProductEsDetailVm.class);
+
+ // Sending CDC Event
+ sendMsg(
+ ProductCdcMessage.builder()
+ .op(DELETE)
+ .after(Product.builder().id(productId).isPublished(true).build())
+ .build()
+ );
+
+ // Then
+ // Verify Consumer
+ waitForConsumer(2, 1, 0, 0);
+ verify(productSyncDataService, times(1)).deleteProduct(productId);
+ Optional updated = productRepository.findById(productId);
+
+ // Verify ES sync data
+ assertTrue(updated.isEmpty(), "ElasticSearch must remove product data.");
+ }
+
+ private static com.yas.search.model.@NotNull Product getSampleEsProduct(long productId) {
+ com.yas.search.model.Product product = new com.yas.search.model.Product();
+ product.setId(productId);
+ product.setName("Modern Speaker");
+ return product;
+ }
+
+ private static @NotNull ProductEsDetailVm getSampleProduct() {
+ return new ProductEsDetailVm(
+ 1001L,
+ "Wireless Bluetooth Speaker",
+ "wireless-bluetooth-speaker",
+ 79.99,
+ true,
+ true,
+ true,
+ false,
+ 501L,
+ "SoundWave",
+ List.of("Electronics", "Audio"),
+ List.of("Bluetooth 5.0", "10-hour battery life")
+ );
+ }
+
+}
diff --git a/search/src/it/resources/application.properties b/search/src/it/resources/application.properties
index a521186f4b..1adc98f409 100644
--- a/search/src/it/resources/application.properties
+++ b/search/src/it/resources/application.properties
@@ -28,6 +28,18 @@ elasticsearch.password=
yas.services.product=http://api.yas.local/product
spring.kafka.bootstrap-servers=localhost:9092
-spring.kafka.consumer.bootstrap-servers=localhost:9092
-spring.kafka.consumer.group-id=search
+
+# CDC Kafka Config
product.topic.name=dbproduct.public.product
+
+# Kafka Consumer Config
+spring.kafka.consumer.group-id=search
+spring.kafka.consumer.auto-offset-reset=earliest
+
+# Kafka Producer Config
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
+
+# TestContainers version
+kafka.version=7.0.9
+elasticsearch.version=7.17.6
diff --git a/search/src/it/resources/logback-spring.xml b/search/src/it/resources/logback-spring.xml
index e7d52d7271..33770030c4 100644
--- a/search/src/it/resources/logback-spring.xml
+++ b/search/src/it/resources/logback-spring.xml
@@ -9,6 +9,9 @@
+
+
+
diff --git a/search/src/main/java/com/yas/search/ElasticsearchApplication.java b/search/src/main/java/com/yas/search/ElasticsearchApplication.java
index 8c0ab5faa8..e0dcacc07b 100644
--- a/search/src/main/java/com/yas/search/ElasticsearchApplication.java
+++ b/search/src/main/java/com/yas/search/ElasticsearchApplication.java
@@ -6,7 +6,6 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-
@EnableConfigurationProperties(ServiceUrlConfig.class)
@SpringBootApplication(scanBasePackages = {"com.yas.search", "com.yas.commonlibrary"})
@Configuration
diff --git a/search/src/main/java/com/yas/search/consumer/ProductSyncDataConsumer.java b/search/src/main/java/com/yas/search/consumer/ProductSyncDataConsumer.java
deleted file mode 100644
index 090168a842..0000000000
--- a/search/src/main/java/com/yas/search/consumer/ProductSyncDataConsumer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.yas.search.consumer;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.yas.search.constant.Action;
-import com.yas.search.service.ProductSyncDataService;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class ProductSyncDataConsumer {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
- @Autowired
- private ProductSyncDataService productSyncDataService;
-
- @KafkaListener(topics = "${product.topic.name}")
- public void listen(ConsumerRecord, ?> consumerRecord) {
-
- if (consumerRecord != null) {
- JsonObject keyObject = new Gson().fromJson((String) consumerRecord.key(), JsonObject.class);
- if (keyObject != null) {
- JsonObject valueObject = new Gson().fromJson((String) consumerRecord.value(), JsonObject.class);
- if (valueObject != null) {
- String action = String.valueOf(valueObject.get("op")).replaceAll("\"", "");
- Long id = keyObject.get("id").getAsLong();
-
- switch (action) {
- case Action.CREATE, Action.READ:
- productSyncDataService.createProduct(id);
- break;
- case Action.UPDATE:
- productSyncDataService.updateProduct(id);
- break;
- case Action.DELETE:
- productSyncDataService.deleteProduct(id);
- break;
- default:
- break;
- }
- }
- }
- }
- }
-}
diff --git a/search/src/main/java/com/yas/search/kafka/config/consumer/AppKafkaListenerConfigurer.java b/search/src/main/java/com/yas/search/kafka/config/consumer/AppKafkaListenerConfigurer.java
new file mode 100644
index 0000000000..bbd1515b89
--- /dev/null
+++ b/search/src/main/java/com/yas/search/kafka/config/consumer/AppKafkaListenerConfigurer.java
@@ -0,0 +1,24 @@
+package com.yas.search.kafka.config.consumer;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListenerConfigurer;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
+import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
+
+@EnableKafka
+@Configuration
+public class AppKafkaListenerConfigurer implements KafkaListenerConfigurer {
+
+ private LocalValidatorFactoryBean validator;
+
+ public AppKafkaListenerConfigurer(LocalValidatorFactoryBean validator) {
+ this.validator = validator;
+ }
+
+ @Override
+ public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+ // Enable message validation
+ registrar.setValidator(this.validator);
+ }
+}
diff --git a/search/src/main/java/com/yas/search/kafka/config/consumer/ProductCdcKafkaListenerConfig.java b/search/src/main/java/com/yas/search/kafka/config/consumer/ProductCdcKafkaListenerConfig.java
new file mode 100644
index 0000000000..2e217f40e3
--- /dev/null
+++ b/search/src/main/java/com/yas/search/kafka/config/consumer/ProductCdcKafkaListenerConfig.java
@@ -0,0 +1,30 @@
+package com.yas.search.kafka.config.consumer;
+
+import com.yas.commonlibrary.kafka.cdc.config.BaseKafkaListenerConfig;
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+
+/**
+ * Product CDC kafka listener, support convert product cdc message to java object.
+ */
+@EnableKafka
+@Configuration
+public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig {
+
+ public static final String PRODUCT_CDC_LISTENER_CONTAINER_FACTORY = "productCdcListenerContainerFactory";
+
+ public ProductCdcKafkaListenerConfig(KafkaProperties kafkaProperties) {
+ super(ProductCdcMessage.class, kafkaProperties);
+ }
+
+ @Bean(name = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY)
+ @Override
+ public ConcurrentKafkaListenerContainerFactory listenerContainerFactory() {
+ return super.kafkaListenerContainerFactory();
+ }
+
+}
\ No newline at end of file
diff --git a/search/src/main/java/com/yas/search/kafka/consumer/ProductSyncDataConsumer.java b/search/src/main/java/com/yas/search/kafka/consumer/ProductSyncDataConsumer.java
new file mode 100644
index 0000000000..6249994424
--- /dev/null
+++ b/search/src/main/java/com/yas/search/kafka/consumer/ProductSyncDataConsumer.java
@@ -0,0 +1,64 @@
+package com.yas.search.kafka.consumer;
+
+import static com.yas.search.kafka.config.consumer.ProductCdcKafkaListenerConfig.PRODUCT_CDC_LISTENER_CONTAINER_FACTORY;
+
+import com.yas.commonlibrary.kafka.cdc.BaseCdcConsumer;
+import com.yas.commonlibrary.kafka.cdc.RetrySupportDql;
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import com.yas.search.service.ProductSyncDataService;
+import jakarta.validation.Valid;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Headers;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Service;
+
+/**
+ * Product synchronize data consumer for elasticsearch.
+ */
+@Slf4j
+@Service
+public class ProductSyncDataConsumer extends BaseCdcConsumer {
+
+ private final ProductSyncDataService productSyncDataService;
+
+ public ProductSyncDataConsumer(ProductSyncDataService productSyncDataService) {
+ this.productSyncDataService = productSyncDataService;
+ }
+
+ @KafkaListener(
+ id = "product-sync-es",
+ groupId = "product-sync-search",
+ topics = "${product.topic.name}",
+ containerFactory = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY
+ )
+ @RetrySupportDql(listenerContainerFactory = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY)
+ public void processMessage(
+ @Payload(required = false) @Valid ProductCdcMessage productCdcMessage,
+ @Headers MessageHeaders headers
+ ) {
+ processMessage(productCdcMessage, headers, this::sync);
+ }
+
+ public void sync(ProductCdcMessage productCdcMessage) {
+ if (productCdcMessage.getAfter() != null) {
+ var operation = productCdcMessage.getOp();
+ var productId = productCdcMessage.getAfter().getId();
+ switch (operation) {
+ case CREATE, READ:
+ productSyncDataService.createProduct(productId);
+ break;
+ case UPDATE:
+ productSyncDataService.updateProduct(productId);
+ break;
+ case DELETE:
+ productSyncDataService.deleteProduct(productId);
+ break;
+ default:
+ log.info("Unsupported operation '{}' for product: '{}'", operation, productId);
+ break;
+ }
+ }
+ }
+}
diff --git a/search/src/main/resources/application.properties b/search/src/main/resources/application.properties
index 48f2a2256b..8d8229e9a6 100644
--- a/search/src/main/resources/application.properties
+++ b/search/src/main/resources/application.properties
@@ -29,3 +29,7 @@ spring.kafka.consumer.bootstrap-servers=kafka:9092
spring.kafka.consumer.group-id=search
product.topic.name=dbproduct.public.product
+spring.kafka.producer.bootstrap-servers=kafka:9092
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
+
diff --git a/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java b/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java
index 9605a0a566..65f706341f 100644
--- a/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java
+++ b/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java
@@ -1,13 +1,15 @@
package com.yas.search.consumer;
-import static org.mockito.ArgumentMatchers.any;
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.CREATE;
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.DELETE;
+import static com.yas.commonlibrary.kafka.cdc.message.Operation.UPDATE;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import com.google.gson.JsonObject;
-import com.yas.search.constant.Action;
+import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
+import com.yas.search.kafka.consumer.ProductSyncDataConsumer;
+import com.yas.commonlibrary.kafka.cdc.message.Product;
import com.yas.search.service.ProductSyncDataService;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
@@ -28,77 +30,47 @@ void setUp() {
}
@Test
- void testListen_whenCreateAction_createProduct() {
-
- JsonObject keyObject = new JsonObject();
- keyObject.addProperty("id", 1L);
-
- JsonObject valueObject = new JsonObject();
- valueObject.addProperty("op", Action.CREATE);
-
- ConsumerRecord consumerRecord = new ConsumerRecord<>(
- "test-topic", 0, 0, keyObject.toString(), valueObject.toString()
- );
-
- productSyncDataConsumer.listen(consumerRecord);
-
- verify(productSyncDataService, times(1)).createProduct(1L);
- }
-
- @Test
- void testListen_whenUpdateAction_updateProduct() {
-
- JsonObject keyObject = new JsonObject();
- keyObject.addProperty("id", 2L);
-
- JsonObject valueObject = new JsonObject();
- valueObject.addProperty("op", Action.UPDATE);
-
- ConsumerRecord consumerRecord = new ConsumerRecord<>(
- "test-topic", 0, 0, keyObject.toString(), valueObject.toString()
+ void testSync_whenCreateAction_createProduct() {
+ // When
+ long productId = 1L;
+ productSyncDataConsumer.sync(
+ ProductCdcMessage.builder()
+ .after(Product.builder().id(productId).build())
+ .op(CREATE)
+ .build()
);
- productSyncDataConsumer.listen(consumerRecord);
-
- verify(productSyncDataService, times(1)).updateProduct(2L);
+ // Then
+ verify(productSyncDataService, times(1)).createProduct(productId);
}
@Test
- void testListen_whenDeleteAction_deleteProduct() {
-
- JsonObject keyObject = new JsonObject();
- keyObject.addProperty("id", 3L);
-
- JsonObject valueObject = new JsonObject();
- valueObject.addProperty("op", Action.DELETE);
-
- ConsumerRecord consumerRecord = new ConsumerRecord<>(
- "test-topic", 0, 0, keyObject.toString(), valueObject.toString()
+ void testSync_whenUpdateAction_updateProduct() {
+ // When
+ long productId = 2L;
+ productSyncDataConsumer.sync(
+ ProductCdcMessage.builder()
+ .after(Product.builder().id(productId).build())
+ .op(UPDATE)
+ .build()
);
- productSyncDataConsumer.listen(consumerRecord);
-
- verify(productSyncDataService, times(1)).deleteProduct(3L);
+ // Then
+ verify(productSyncDataService, times(1)).updateProduct(productId);
}
@Test
- void testListen_whenInvalidAction_noAction() {
-
- JsonObject keyObject = new JsonObject();
- keyObject.addProperty("id", 4L);
-
- JsonObject valueObject = new JsonObject();
- valueObject.addProperty("op", "INVALID_ACTION");
-
- ConsumerRecord consumerRecord = new ConsumerRecord<>(
- "test-topic", 0, 0, keyObject.toString(), valueObject.toString()
+ void testSync_whenDeleteAction_deleteProduct() {
+ // When
+ final long productId = 3L;
+ productSyncDataConsumer.sync(
+ ProductCdcMessage.builder()
+ .after(Product.builder().id(productId).build())
+ .op(DELETE)
+ .build()
);
-
- productSyncDataConsumer.listen(consumerRecord);
-
- verify(productSyncDataService, times(0)).createProduct(any());
- verify(productSyncDataService, times(0)).updateProduct(any());
- verify(productSyncDataService, times(0)).deleteProduct(any());
+ // Then
+ verify(productSyncDataService, times(1)).deleteProduct(productId);
}
}