diff --git a/recommendation/src/it/java/com/yas/recommendation/kafka/ProductCdcConsumerTest.java b/recommendation/src/it/java/com/yas/recommendation/kafka/ProductCdcConsumerTest.java index 4ff3860e31..1f2a967a96 100644 --- a/recommendation/src/it/java/com/yas/recommendation/kafka/ProductCdcConsumerTest.java +++ b/recommendation/src/it/java/com/yas/recommendation/kafka/ProductCdcConsumerTest.java @@ -15,6 +15,7 @@ import com.yas.commonlibrary.kafka.cdc.message.Product; import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage; +import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey; import com.yas.recommendation.configuration.EmbeddingSearchConfiguration; import com.yas.recommendation.configuration.KafkaConfiguration; import com.yas.recommendation.configuration.RecommendationConfig; @@ -53,7 +54,7 @@ @Import(KafkaConfiguration.class) @PropertySource("classpath:application.properties") @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) -public class ProductCdcConsumerTest extends CdcConsumerTest { +public class ProductCdcConsumerTest extends CdcConsumerTest { public static final String STOREFRONT_PRODUCTS_PATH = "/storefront/products/detail/{id}"; private static final String PRODUCT_NAME_UPDATE = "IPhone 14 Pro New"; @Autowired @@ -75,7 +76,7 @@ public class ProductCdcConsumerTest extends CdcConsumerTest { private RelatedProductQuery relatedProductQuery; public ProductCdcConsumerTest() { - super(ProductCdcMessage.class, "dbproduct.public.product"); + super(ProductMsgKey.class, ProductCdcMessage.class, "dbproduct.public.product"); } @BeforeEach @@ -123,10 +124,13 @@ public void test_whenHavingCreateEvent_thenProcessFailed_shouldPerformRetry() simulateHttpRequestWithError(url, new RuntimeException("Missing ResponseSpec.toEntity"), ProductDetailVm.class); // Sending CDC Event - sendMsg(ProductCdcMessage.builder() + sendMsg( + ProductMsgKey.builder().id(productId).build(), + ProductCdcMessage.builder() .op(CREATE) .after(Product.builder().id(productId).isPublished(true).build()) - .build()); + .build() + ); // Then waitForConsumer(2, 1, 4, 6); @@ -154,10 +158,13 @@ public void test_whenHavingCreateEvent_shouldSyncAsCreate_andSimiliarProduct() simulateHttpRequestWithResponseToEntity(url, response, ProductDetailVm.class); // Sending CDC Event - sendMsg(ProductCdcMessage.builder() + sendMsg( + ProductMsgKey.builder().id(productId).build(), + ProductCdcMessage.builder() .op(CREATE) .after(Product.builder().id(productId).isPublished(true).build()) - .build()); + .build() + ); // Then waitForConsumer(2, 1, 0, 0); @@ -175,10 +182,13 @@ public void test_whenHavingCreateEvent_shouldSyncAsCreate_andSimiliarProduct() simulateHttpRequestWithResponseToEntity(url2, response2, ProductDetailVm.class); // Sending CDC Event - sendMsg(ProductCdcMessage.builder() + sendMsg( + ProductMsgKey.builder().id(productId).build(), + ProductCdcMessage.builder() .op(CREATE) .after(Product.builder().id(productId2).isPublished(true).build()) - .build()); + .build() + ); // Verify consumer waitForConsumer(2, 1, 0, 0); @@ -224,10 +234,13 @@ public void test_whenHavingUpdateEvent_shouldSyncAsUpdate() simulateHttpRequestWithResponseToEntity(url, response, ProductDetailVm.class); // Sending CDC Event - sendMsg(ProductCdcMessage.builder() + sendMsg( + ProductMsgKey.builder().id(productId).build(), + ProductCdcMessage.builder() .op(UPDATE) .after(Product.builder().id(productId).isPublished(true).build()) - .build()); + .build() + ); // Verify consumer waitForConsumer(2, 2, 0, 0); @@ -240,7 +253,6 @@ public void test_whenHavingUpdateEvent_shouldSyncAsUpdate() assertTrue(firstRow.get("content").toString().contains(PRODUCT_NAME_UPDATE), "Content is not correct."); } - @Disabled @DisplayName("When having product delete event, data must sync as delete") @Test public void test_whenHavingDeleteEvent_shouldSyncAsDelete() @@ -258,10 +270,13 @@ public void test_whenHavingDeleteEvent_shouldSyncAsDelete() assertThat(results).isNotEmpty(); // Sending CDC Event - sendMsg(ProductCdcMessage.builder() + sendMsg( + ProductMsgKey.builder().id(productId).build(), + ProductCdcMessage.builder() .op(DELETE) .after(Product.builder().id(productId).isPublished(true).build()) - .build()); + .build() + ); // Verify consumer waitForConsumer(2, 2, 0, 0); @@ -288,10 +303,13 @@ private void sendEventCreateProduct(long productId) simulateHttpRequestWithResponseToEntity(url, response, ProductDetailVm.class); // Sending CDC Event - sendMsg(ProductCdcMessage.builder() + sendMsg( + ProductMsgKey.builder().id(productId).build(), + ProductCdcMessage.builder() .op(CREATE) .after(Product.builder().id(productId).isPublished(true).build()) - .build()); + .build() + ); } private List> findAll() { diff --git a/recommendation/src/it/resources/application.properties b/recommendation/src/it/resources/application.properties index aae44d956f..b2f7c6e730 100644 --- a/recommendation/src/it/resources/application.properties +++ b/recommendation/src/it/resources/application.properties @@ -62,7 +62,7 @@ spring.aop.proxy-target-class=true # Kafka Producer spring.kafka.producer.bootstrap-servers=kafka:9092 -spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer # Similarity Search Config