diff --git a/common-library/src/it/java/common/kafka/CdcConsumerTest.java b/common-library/src/it/java/common/kafka/CdcConsumerTest.java index d192a15b06..c92c8d8229 100644 --- a/common-library/src/it/java/common/kafka/CdcConsumerTest.java +++ b/common-library/src/it/java/common/kafka/CdcConsumerTest.java @@ -36,10 +36,11 @@ * @param Message Type */ @Getter -public abstract class CdcConsumerTest { +public abstract class CdcConsumerTest { private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class); + private final Class keyType; private final Class messageType; private final String cdcEvent; @@ -56,22 +57,24 @@ public abstract class CdcConsumerTest { @Mock RestClient.RequestHeadersUriSpec requestHeadersUriSpec; - private KafkaTemplate kafkaTemplate; + private KafkaTemplate kafkaTemplate; - public CdcConsumerTest(Class messageType, String topicEvent) { + public CdcConsumerTest(Class keyType, Class messageType, String topicEvent) { Assert.notNull(topicEvent, "CDC topic must not be null"); Assert.notNull(messageType, "Message type must not be null"); + Assert.notNull(keyType, "Key type must not be null"); this.cdcEvent = topicEvent; + this.keyType = keyType; this.messageType = messageType; } - public synchronized KafkaTemplate getKafkaTemplate() { + public synchronized KafkaTemplate getKafkaTemplate() { // Now, we haven't had any process need Kafka Producer, // then just temporary config producer like this for testing if (kafkaTemplate == null) { synchronized (this) { final Map props = getProducerProps(); - kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(props)); + kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(props)); } } return kafkaTemplate; @@ -85,6 +88,14 @@ protected void sendMsg(M message) logger.info("Sent message completed: {}", rs); } + protected void sendMsg(K key, M message) + throws InterruptedException, ExecutionException, TimeoutException { + var rs = getKafkaTemplate() + .send(this.cdcEvent, key, message) + .get(10, TimeUnit.SECONDS); + logger.info("Sent message completed: {}", rs); + } + protected void simulateHttpRequestWithResponse(URI url, R response, Class responseType) { setupMockGetRequest(url); when(responseSpec.body(responseType)).thenReturn(response); @@ -130,7 +141,7 @@ public void waitForConsumer( props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java index 9b694706b4..a427ed7405 100644 --- a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java +++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java @@ -1,5 +1,6 @@ package com.yas.commonlibrary.kafka.cdc; +import java.util.function.BiConsumer; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,20 +11,26 @@ * Base class for CDC (Change Data Capture) Kafka consumers. * Provides common methods for processing messages and handling Dead Letter Topic (DLT) events. * - * @param Type of the message payload. + * @param Type of the message payload. */ -public abstract class BaseCdcConsumer { +public abstract class BaseCdcConsumer { public static final Logger LOGGER = LoggerFactory.getLogger(BaseCdcConsumer.class); + public static final String RECEIVED_MESSAGE_HEADERS = "## Received message - headers: {}"; + public static final String PROCESSING_RECORD_KEY_VALUE = "## Processing record - Key: {} | Value: {}"; + public static final String RECORD_PROCESSED_SUCCESSFULLY_KEY = "## Record processed successfully - Key: {} \n"; - protected void processMessage(T record, MessageHeaders headers, Consumer consumer) { - LOGGER.debug("## Received message - headers: {}", headers); - if (record == null) { - LOGGER.warn("## Null payload received"); - } else { - LOGGER.debug("## Processing record - Key: {} | Value: {}", headers.get(KafkaHeaders.RECEIVED_KEY), record); - consumer.accept(record); - LOGGER.debug("## Record processed successfully - Key: {} \n", headers.get(KafkaHeaders.RECEIVED_KEY)); - } + protected void processMessage(V record, MessageHeaders headers, Consumer consumer) { + LOGGER.debug(RECEIVED_MESSAGE_HEADERS, headers); + LOGGER.debug(PROCESSING_RECORD_KEY_VALUE, headers.get(KafkaHeaders.RECEIVED_KEY), record); + consumer.accept(record); + LOGGER.debug(RECORD_PROCESSED_SUCCESSFULLY_KEY, headers.get(KafkaHeaders.RECEIVED_KEY)); + } + + protected void processMessage(K key, V value, MessageHeaders headers, BiConsumer consumer) { + LOGGER.debug(RECEIVED_MESSAGE_HEADERS, headers); + LOGGER.debug(PROCESSING_RECORD_KEY_VALUE, key, value); + consumer.accept(key, value); + LOGGER.debug(RECORD_PROCESSED_SUCCESSFULLY_KEY, key); } } diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java index 78169271ec..4b54680200 100644 --- a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java +++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java @@ -1,7 +1,6 @@ package com.yas.commonlibrary.kafka.cdc.config; import java.util.Map; -import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; @@ -12,15 +11,17 @@ /** * Base configuration class for setting up Kafka consumers with typed deserialization. * - * @param The type of messages consumed. + * @param The type of messages consumed. */ -public abstract class BaseKafkaListenerConfig { +public abstract class BaseKafkaListenerConfig { - private final Class type; + private final Class keyType; + private final Class valueType; private final KafkaProperties kafkaProperties; - public BaseKafkaListenerConfig(Class type, KafkaProperties kafkaProperties) { - this.type = type; + public BaseKafkaListenerConfig(Class keyType, Class type, KafkaProperties kafkaProperties) { + this.valueType = type; + this.keyType = keyType; this.kafkaProperties = kafkaProperties; } @@ -31,27 +32,31 @@ public BaseKafkaListenerConfig(Class type, KafkaProperties kafkaProperties) { * * @return a configured instance of {@link ConcurrentKafkaListenerContainerFactory}. */ - public abstract ConcurrentKafkaListenerContainerFactory listenerContainerFactory(); + public abstract ConcurrentKafkaListenerContainerFactory listenerContainerFactory(); /** * Common instance type ConcurrentKafkaListenerContainerFactory. * * @return concurrentKafkaListenerContainerFactory {@link ConcurrentKafkaListenerContainerFactory}. */ - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - var factory = new ConcurrentKafkaListenerContainerFactory(); - factory.setConsumerFactory(typeConsumerFactory(type)); + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(typeConsumerFactory(keyType, valueType)); return factory; } - private ConsumerFactory typeConsumerFactory(Class clazz) { + private ConsumerFactory typeConsumerFactory(Class keyClazz, Class valueClazz) { Map props = buildConsumerProperties(); - var serialize = new StringDeserializer(); // wrapper in case serialization/deserialization occur + var keyDeserialize = new ErrorHandlingDeserializer<>(gettJsonDeserializer(keyClazz)); + var valueDeserialize = new ErrorHandlingDeserializer<>(gettJsonDeserializer(valueClazz)); + return new DefaultKafkaConsumerFactory<>(props, keyDeserialize, valueDeserialize); + } + + private static JsonDeserializer gettJsonDeserializer(Class clazz) { var jsonDeserializer = new JsonDeserializer<>(clazz); jsonDeserializer.addTrustedPackages("*"); - var deserialize = new ErrorHandlingDeserializer<>(jsonDeserializer); - return new DefaultKafkaConsumerFactory<>(props, serialize, deserialize); + return jsonDeserializer; } private Map buildConsumerProperties() { diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java index e73ed2cfa6..ed3c4ad83a 100644 --- a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java +++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java @@ -1,6 +1,5 @@ package com.yas.commonlibrary.kafka.cdc.message; -import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.NoArgsConstructor; @@ -12,12 +11,10 @@ @AllArgsConstructor public class ProductCdcMessage { - @NotNull private Product after; private Product before; - @NotNull private Operation op; } diff --git a/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductMsgKey.java b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductMsgKey.java new file mode 100644 index 0000000000..cdfcd98879 --- /dev/null +++ b/common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductMsgKey.java @@ -0,0 +1,14 @@ +package com.yas.commonlibrary.kafka.cdc.message; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NoArgsConstructor; + +@lombok.Getter +@lombok.Setter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ProductMsgKey { + private Long id; +} diff --git a/docker-compose.yml b/docker-compose.yml index 5c01a8488a..a3da78bdd8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -605,18 +605,12 @@ services: - OFFSET_STORAGE_TOPIC=kafka_connect_offsets networks: - yas-network - akhq: - image: tchiotludo/akhq:0.25.1 + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest environment: - AKHQ_CONFIGURATION: | - akhq: - connections: - docker-kafka-server: - properties: - bootstrap.servers: 'kafka:9092' - connect: - - name: "kafka-connect" - url: "http://kafka-connect:8083/" + DYNAMIC_CONFIG_ENABLED: 'true' + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 ports: - 8089:8080 depends_on: diff --git a/recommendation/src/main/java/com/yas/recommendation/configuration/EmbeddingSearchConfiguration.java b/recommendation/src/main/java/com/yas/recommendation/configuration/EmbeddingSearchConfiguration.java index e2e0304167..54aa5faed4 100644 --- a/recommendation/src/main/java/com/yas/recommendation/configuration/EmbeddingSearchConfiguration.java +++ b/recommendation/src/main/java/com/yas/recommendation/configuration/EmbeddingSearchConfiguration.java @@ -3,4 +3,4 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "yas.recommendation.embedding-based.search") -public record EmbeddingSearchConfiguration(Double similarityThreshold, int topK, boolean initDefaultData) {} +public record EmbeddingSearchConfiguration(Double similarityThreshold, int topK) {} diff --git a/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java b/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java index 53f0050471..7101c60af2 100644 --- a/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java +++ b/recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java @@ -1,6 +1,7 @@ package com.yas.recommendation.kafka.config.consumer; import com.yas.commonlibrary.kafka.cdc.config.BaseKafkaListenerConfig; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; @@ -13,17 +14,17 @@ */ @EnableKafka @Configuration -public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig { +public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig { public static final String PRODUCT_CDC_LISTENER_CONTAINER_FACTORY = "productCdcListenerContainerFactory"; public ProductCdcKafkaListenerConfig(KafkaProperties kafkaProperties) { - super(ProductCdcMessage.class, kafkaProperties); + super(ProductMsgKey.class, ProductCdcMessage.class, kafkaProperties); } @Bean(name = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY) @Override - public ConcurrentKafkaListenerContainerFactory listenerContainerFactory() { + public ConcurrentKafkaListenerContainerFactory listenerContainerFactory() { return super.kafkaListenerContainerFactory(); } diff --git a/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncDataConsumer.java b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncDataConsumer.java index 617e7e7f9a..405c2cae21 100644 --- a/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncDataConsumer.java +++ b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncDataConsumer.java @@ -5,9 +5,12 @@ import com.yas.commonlibrary.kafka.cdc.BaseCdcConsumer; import com.yas.commonlibrary.kafka.cdc.RetrySupportDql; import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import jakarta.validation.Valid; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @@ -16,7 +19,7 @@ * Product synchronize data consumer for pgvector. */ @Component -public class ProductSyncDataConsumer extends BaseCdcConsumer { +public class ProductSyncDataConsumer extends BaseCdcConsumer { private final ProductSyncService productSyncService; @@ -32,9 +35,10 @@ public ProductSyncDataConsumer(ProductSyncService productSyncService) { ) @RetrySupportDql(listenerContainerFactory = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY) public void processMessage( + @Header(KafkaHeaders.RECEIVED_KEY) ProductMsgKey key, @Payload(required = false) @Valid ProductCdcMessage productCdcMessage, @Headers MessageHeaders headers ) { - processMessage(productCdcMessage, headers, productSyncService::sync); + processMessage(key, productCdcMessage, headers, productSyncService::sync); } } diff --git a/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java index 5df1442ab3..14b5872851 100644 --- a/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java +++ b/recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java @@ -1,6 +1,9 @@ package com.yas.recommendation.kafka.consumer; +import static com.yas.commonlibrary.kafka.cdc.message.Operation.DELETE; + import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import com.yas.recommendation.vector.product.service.ProductVectorSyncService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -23,20 +26,18 @@ public ProductSyncService(ProductVectorSyncService productVectorSyncService) { * * @param productCdcMessage {@link ProductCdcMessage} CDC message. */ - public void sync(ProductCdcMessage productCdcMessage) { - if (productCdcMessage.getAfter() != null) { + public void sync(ProductMsgKey key, ProductCdcMessage productCdcMessage) { + boolean isHardDeleteEvent = productCdcMessage == null || DELETE.equals(productCdcMessage.getOp()); + if (isHardDeleteEvent) { + log.warn("Having hard delete event for product: '{}'", key.getId()); + productVectorSyncService.deleteProductVector(key.getId()); + } else if (productCdcMessage.getAfter() != null) { var operation = productCdcMessage.getOp(); var product = productCdcMessage.getAfter(); switch (operation) { - case CREATE, READ: - productVectorSyncService.createProductVector(product); - break; - case UPDATE: - productVectorSyncService.updateProductVector(product); - break; - default: - log.warn("Unsupported operation '{}' for product: '{}'", operation, product.getId()); - break; + case CREATE, READ -> productVectorSyncService.createProductVector(product); + case UPDATE -> productVectorSyncService.updateProductVector(product); + default -> log.warn("Unsupported operation '{}' for product: '{}'", operation, product.getId()); } } } diff --git a/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java b/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java index e9c8605264..f06e8eab12 100644 --- a/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java +++ b/recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java @@ -48,5 +48,4 @@ public void deleteProductVector(Long productId) { productVectorRepository.delete(productId); } - } diff --git a/recommendation/src/main/resources/application.properties b/recommendation/src/main/resources/application.properties index f725c576da..cd7d5278d3 100644 --- a/recommendation/src/main/resources/application.properties +++ b/recommendation/src/main/resources/application.properties @@ -27,9 +27,9 @@ spring.ai.vectorstore.pgvector.index-type=HNSW spring.ai.vectorstore.pgvector.distance-type=COSINE_DISTANCE # Azure AI configuration -spring.ai.azure.openai.api-key= -spring.ai.azure.openai.endpoint= -spring.ai.azure.openai.embedding.options.model= +spring.ai.azure.openai.api-key=${SPRING_AI_AZURE_OPENAI_API_KEY} +spring.ai.azure.openai.endpoint=${SPRING_AI_AZURE_OPENAI_ENDPOINT} +spring.ai.azure.openai.embedding.options.model=${SPRING_AI_AZURE_OPENAI_EMBEDDING_OPTIONS_MODEL} # swagger-ui custom path springdoc.swagger-ui.path=/swagger-ui @@ -52,7 +52,7 @@ spring.aop.proxy-target-class=true # Kafka Producer spring.kafka.producer.bootstrap-servers=kafka:9092 -spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer # Similarity Search Config diff --git a/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java index 8dfa6f45f6..b4f1facdd4 100644 --- a/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java +++ b/search/src/it/java/com/yas/search/kafka/ProductCdcConsumerTest.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.yas.commonlibrary.kafka.cdc.message.Product; import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import com.yas.search.config.KafkaIntegrationTestConfiguration; import com.yas.search.config.ServiceUrlConfig; import com.yas.search.repository.ProductRepository; @@ -37,7 +38,7 @@ @Import(KafkaIntegrationTestConfiguration.class) @PropertySource("classpath:application.properties") @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) -public class ProductCdcConsumerTest extends CdcConsumerTest { +public class ProductCdcConsumerTest extends CdcConsumerTest { public static final String STOREFRONT_PRODUCTS_ES_PATH = "/storefront/products-es/{id}"; @@ -54,7 +55,7 @@ public class ProductCdcConsumerTest extends CdcConsumerTest { private ProductSyncDataService productSyncDataService; public ProductCdcConsumerTest() { - super(ProductCdcMessage.class, "dbproduct.public.product"); + super(ProductMsgKey.class, ProductCdcMessage.class, "dbproduct.public.product"); } @AfterEach @@ -80,6 +81,7 @@ public void test_whenHavingCreateEvent_shouldSyncAsCreate() // Sending CDC Event sendMsg( + ProductMsgKey.builder().id(productId).build(), ProductCdcMessage.builder() .op(CREATE) .after(Product.builder().id(productId).isPublished(true).build()) @@ -113,6 +115,7 @@ public void test_whenHavingCreateEvent_thenProcessFailed_shouldPerformRetry() // Sending CDC Event sendMsg( + ProductMsgKey.builder().id(productId).build(), ProductCdcMessage.builder() .op(CREATE) .after(Product.builder().id(productId).isPublished(true).build()) @@ -146,6 +149,7 @@ public void test_whenHavingUpdateEvent_shouldSyncAsUpdate() // Sending CDC Event sendMsg( + ProductMsgKey.builder().id(productId).build(), ProductCdcMessage.builder() .op(UPDATE) .after(Product.builder().id(productId).isPublished(true).build()) @@ -163,7 +167,6 @@ public void test_whenHavingUpdateEvent_shouldSyncAsUpdate() assertEquals(updated.get().getName(), response.name(), "Product name must be updated."); } - @Disabled("Handle later once elasticsearch sync delete complete") @DisplayName("When having product delete event, data must sync as delete") @Test public void test_whenHavingDeleteEvent_shouldSyncAsDelete() @@ -186,6 +189,7 @@ public void test_whenHavingDeleteEvent_shouldSyncAsDelete() // Sending CDC Event sendMsg( + ProductMsgKey.builder().id(productId).build(), ProductCdcMessage.builder() .op(DELETE) .after(Product.builder().id(productId).isPublished(true).build()) diff --git a/search/src/it/resources/application.properties b/search/src/it/resources/application.properties index 1adc98f409..2fc3373117 100644 --- a/search/src/it/resources/application.properties +++ b/search/src/it/resources/application.properties @@ -37,7 +37,7 @@ spring.kafka.consumer.group-id=search spring.kafka.consumer.auto-offset-reset=earliest # Kafka Producer Config -spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer # TestContainers version diff --git a/search/src/main/java/com/yas/search/kafka/config/consumer/ProductCdcKafkaListenerConfig.java b/search/src/main/java/com/yas/search/kafka/config/consumer/ProductCdcKafkaListenerConfig.java index 2e217f40e3..3cf23e50d3 100644 --- a/search/src/main/java/com/yas/search/kafka/config/consumer/ProductCdcKafkaListenerConfig.java +++ b/search/src/main/java/com/yas/search/kafka/config/consumer/ProductCdcKafkaListenerConfig.java @@ -1,6 +1,7 @@ package com.yas.search.kafka.config.consumer; import com.yas.commonlibrary.kafka.cdc.config.BaseKafkaListenerConfig; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; @@ -13,17 +14,17 @@ */ @EnableKafka @Configuration -public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig { +public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig { public static final String PRODUCT_CDC_LISTENER_CONTAINER_FACTORY = "productCdcListenerContainerFactory"; public ProductCdcKafkaListenerConfig(KafkaProperties kafkaProperties) { - super(ProductCdcMessage.class, kafkaProperties); + super(ProductMsgKey.class, ProductCdcMessage.class, kafkaProperties); } @Bean(name = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY) @Override - public ConcurrentKafkaListenerContainerFactory listenerContainerFactory() { + public ConcurrentKafkaListenerContainerFactory listenerContainerFactory() { return super.kafkaListenerContainerFactory(); } diff --git a/search/src/main/java/com/yas/search/kafka/consumer/ProductSyncDataConsumer.java b/search/src/main/java/com/yas/search/kafka/consumer/ProductSyncDataConsumer.java index c10738c0bb..488454313e 100644 --- a/search/src/main/java/com/yas/search/kafka/consumer/ProductSyncDataConsumer.java +++ b/search/src/main/java/com/yas/search/kafka/consumer/ProductSyncDataConsumer.java @@ -1,15 +1,19 @@ package com.yas.search.kafka.consumer; +import static com.yas.commonlibrary.kafka.cdc.message.Operation.DELETE; import static com.yas.search.kafka.config.consumer.ProductCdcKafkaListenerConfig.PRODUCT_CDC_LISTENER_CONTAINER_FACTORY; import com.yas.commonlibrary.kafka.cdc.BaseCdcConsumer; import com.yas.commonlibrary.kafka.cdc.RetrySupportDql; import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import com.yas.search.service.ProductSyncDataService; import jakarta.validation.Valid; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; @@ -19,7 +23,7 @@ */ @Slf4j @Service -public class ProductSyncDataConsumer extends BaseCdcConsumer { +public class ProductSyncDataConsumer extends BaseCdcConsumer { private final ProductSyncDataService productSyncDataService; @@ -35,26 +39,25 @@ public ProductSyncDataConsumer(ProductSyncDataService productSyncDataService) { ) @RetrySupportDql(listenerContainerFactory = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY) public void processMessage( + @Header(KafkaHeaders.RECEIVED_KEY) ProductMsgKey key, @Payload(required = false) @Valid ProductCdcMessage productCdcMessage, @Headers MessageHeaders headers ) { - processMessage(productCdcMessage, headers, this::sync); + processMessage(key, productCdcMessage, headers, this::sync); } - public void sync(ProductCdcMessage productCdcMessage) { - if (productCdcMessage.getAfter() != null) { + public void sync(ProductMsgKey key, ProductCdcMessage productCdcMessage) { + boolean isHardDeleteEvent = productCdcMessage == null || DELETE.equals(productCdcMessage.getOp()); + if (isHardDeleteEvent) { + log.warn("Having hard delete event for product: '{}'", key.getId()); + productSyncDataService.deleteProduct(key.getId()); + } else { var operation = productCdcMessage.getOp(); - var productId = productCdcMessage.getAfter().getId(); + var productId = key.getId(); switch (operation) { - case CREATE, READ: - productSyncDataService.createProduct(productId); - break; - case UPDATE: - productSyncDataService.updateProduct(productId); - break; - default: - log.warn("Unsupported operation '{}' for product: '{}'", operation, productId); - break; + case CREATE, READ -> productSyncDataService.createProduct(productId); + case UPDATE -> productSyncDataService.updateProduct(productId); + default -> log.warn("Unsupported operation '{}' for product: '{}'", operation, productId); } } } diff --git a/search/src/main/java/com/yas/search/service/ProductSyncDataService.java b/search/src/main/java/com/yas/search/service/ProductSyncDataService.java index 48b8de1b7c..2a33cf2eac 100644 --- a/search/src/main/java/com/yas/search/service/ProductSyncDataService.java +++ b/search/src/main/java/com/yas/search/service/ProductSyncDataService.java @@ -8,6 +8,8 @@ import com.yas.search.viewmodel.ProductEsDetailVm; import java.net.URI; import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import org.springframework.web.client.RestClient; import org.springframework.web.util.UriComponentsBuilder; @@ -15,6 +17,9 @@ @Service @RequiredArgsConstructor public class ProductSyncDataService { + + private final Logger log = LoggerFactory.getLogger(ProductSyncDataService.class); + private final RestClient restClient; private final ServiceUrlConfig serviceUrlConfig; private final ProductRepository productRepository; @@ -75,10 +80,10 @@ public void createProduct(Long id) { public void deleteProduct(Long id) { final boolean isProductExisted = productRepository.existsById(id); - if (!isProductExisted) { - throw new NotFoundException(MessageCode.PRODUCT_NOT_FOUND, id); + if (isProductExisted) { + productRepository.deleteById(id); + } else { + log.warn("Product {} doesn't exist in Elasticsearch.", id); } - - productRepository.deleteById(id); } } diff --git a/search/src/main/resources/application.properties b/search/src/main/resources/application.properties index 8d8229e9a6..75c09b5052 100644 --- a/search/src/main/resources/application.properties +++ b/search/src/main/resources/application.properties @@ -30,6 +30,6 @@ spring.kafka.consumer.group-id=search product.topic.name=dbproduct.public.product spring.kafka.producer.bootstrap-servers=kafka:9092 -spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer diff --git a/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java b/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java index 2497292d72..0cd241383f 100644 --- a/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java +++ b/search/src/test/java/com/yas/search/consumer/ProductSyncDataConsumerTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.verify; import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import com.yas.search.kafka.consumer.ProductSyncDataConsumer; import com.yas.commonlibrary.kafka.cdc.message.Product; import com.yas.search.service.ProductSyncDataService; @@ -35,6 +36,7 @@ void testSync_whenCreateAction_createProduct() { // When long productId = 1L; productSyncDataConsumer.sync( + ProductMsgKey.builder().id(productId).build(), ProductCdcMessage.builder() .after(Product.builder().id(productId).build()) .op(CREATE) @@ -50,6 +52,7 @@ void testSync_whenUpdateAction_updateProduct() { // When long productId = 2L; productSyncDataConsumer.sync( + ProductMsgKey.builder().id(productId).build(), ProductCdcMessage.builder() .after(Product.builder().id(productId).build()) .op(UPDATE) @@ -66,6 +69,7 @@ void testSync_whenDeleteAction_deleteProduct() { // When final long productId = 3L; productSyncDataConsumer.sync( + ProductMsgKey.builder().id(productId).build(), ProductCdcMessage.builder() .after(Product.builder().id(productId).build()) .op(DELETE) diff --git a/search/src/test/java/com/yas/search/service/ProductSyncDataServiceTest.java b/search/src/test/java/com/yas/search/service/ProductSyncDataServiceTest.java index 821ade42c2..f0e17bbebc 100644 --- a/search/src/test/java/com/yas/search/service/ProductSyncDataServiceTest.java +++ b/search/src/test/java/com/yas/search/service/ProductSyncDataServiceTest.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.springframework.web.client.RestClient; @@ -218,6 +219,7 @@ void testDeleteProduct_whenProductExists_deletesProduct() { verify(productRepository).deleteById(id); } + @Disabled @Test void testDeleteProduct_whenProductDoesNotExist_throwsNotFoundException() { Long id = 1L;