#1227 - Handling hard delete data from Sample Data #1252

Merged · 3 commits · Nov 11, 2024
23 changes: 17 additions & 6 deletions common-library/src/it/java/common/kafka/CdcConsumerTest.java
@@ -36,10 +36,11 @@
* @param <M> Message Type
*/
@Getter
public abstract class CdcConsumerTest<M> {
public abstract class CdcConsumerTest<K, M> {

private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class);

private final Class<K> keyType;
private final Class<M> messageType;

private final String cdcEvent;
@@ -56,22 +57,24 @@ public abstract class CdcConsumerTest<M> {
@Mock
RestClient.RequestHeadersUriSpec requestHeadersUriSpec;

private KafkaTemplate<String, M> kafkaTemplate;
private KafkaTemplate<K, M> kafkaTemplate;

public CdcConsumerTest(Class<M> messageType, String topicEvent) {
public CdcConsumerTest(Class<K> keyType, Class<M> messageType, String topicEvent) {
Assert.notNull(topicEvent, "CDC topic must not be null");
Assert.notNull(messageType, "Message type must not be null");
Assert.notNull(keyType, "Key type must not be null");
this.cdcEvent = topicEvent;
this.keyType = keyType;
this.messageType = messageType;
}

public synchronized KafkaTemplate<String, M> getKafkaTemplate() {
public synchronized KafkaTemplate<K, M> getKafkaTemplate() {
// No production code needs a Kafka producer yet, so we just
// configure a temporary producer here for testing.
if (kafkaTemplate == null) {
synchronized (this) {
final Map<String, Object> props = getProducerProps();
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, M>(props));
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<K, M>(props));
}
}
return kafkaTemplate;
@@ -85,6 +88,14 @@ protected void sendMsg(M message)
logger.info("Sent message completed: {}", rs);
}

protected void sendMsg(K key, M message)
throws InterruptedException, ExecutionException, TimeoutException {
var rs = getKafkaTemplate()
.send(this.cdcEvent, key, message)
.get(10, TimeUnit.SECONDS);
logger.info("Sent message completed: {}", rs);
}

protected <R> void simulateHttpRequestWithResponse(URI url, R response, Class<R> responseType) {
setupMockGetRequest(url);
when(responseSpec.body(responseType)).thenReturn(response);
@@ -130,7 +141,7 @@ public void waitForConsumer(
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
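A quick sketch of how a concrete integration test can exercise the new keyed API. Only `CdcConsumerTest`, `ProductMsgKey`, `ProductCdcMessage`, and `Operation` come from this PR; the test class, topic name, and builder usage are illustrative assumptions:

```java
import com.yas.commonlibrary.kafka.cdc.message.Operation;
import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey;
import common.kafka.CdcConsumerTest;
import org.junit.jupiter.api.Test;

class ProductCdcConsumerIT extends CdcConsumerTest<ProductMsgKey, ProductCdcMessage> {

    ProductCdcConsumerIT() {
        // Hypothetical CDC topic name — use whatever the connector publishes to.
        super(ProductMsgKey.class, ProductCdcMessage.class, "product.cdc");
    }

    @Test
    void whenHardDelete_thenKeyedMessagesAreProduced() throws Exception {
        var key = ProductMsgKey.builder().id(1L).build();

        // Debezium first emits a DELETE event for the row...
        sendMsg(key, ProductCdcMessage.builder().op(Operation.DELETE).build());

        // ...then typically a tombstone (null payload) for log compaction;
        // the keyed overload makes both cases expressible in tests.
        sendMsg(key, null);

        // Assert consumer side effects here (e.g. via waitForConsumer,
        // whose full signature is collapsed in this diff).
    }
}
```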
common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java
@@ -1,5 +1,6 @@
package com.yas.commonlibrary.kafka.cdc;

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -10,20 +11,26 @@
* Base class for CDC (Change Data Capture) Kafka consumers.
* Provides common methods for processing messages and handling Dead Letter Topic (DLT) events.
*
* @param <T> Type of the message payload.
* @param <K> Type of the message key.
* @param <V> Type of the message payload.
*/
public abstract class BaseCdcConsumer<T> {
public abstract class BaseCdcConsumer<K, V> {

public static final Logger LOGGER = LoggerFactory.getLogger(BaseCdcConsumer.class);
public static final String RECEIVED_MESSAGE_HEADERS = "## Received message - headers: {}";
public static final String PROCESSING_RECORD_KEY_VALUE = "## Processing record - Key: {} | Value: {}";
public static final String RECORD_PROCESSED_SUCCESSFULLY_KEY = "## Record processed successfully - Key: {} \n";

protected void processMessage(T record, MessageHeaders headers, Consumer<T> consumer) {
LOGGER.debug("## Received message - headers: {}", headers);
if (record == null) {
LOGGER.warn("## Null payload received");
} else {
LOGGER.debug("## Processing record - Key: {} | Value: {}", headers.get(KafkaHeaders.RECEIVED_KEY), record);
consumer.accept(record);
LOGGER.debug("## Record processed successfully - Key: {} \n", headers.get(KafkaHeaders.RECEIVED_KEY));
}
protected void processMessage(V record, MessageHeaders headers, Consumer<V> consumer) {
LOGGER.debug(RECEIVED_MESSAGE_HEADERS, headers);
LOGGER.debug(PROCESSING_RECORD_KEY_VALUE, headers.get(KafkaHeaders.RECEIVED_KEY), record);
consumer.accept(record);
LOGGER.debug(RECORD_PROCESSED_SUCCESSFULLY_KEY, headers.get(KafkaHeaders.RECEIVED_KEY));
}

protected void processMessage(K key, V value, MessageHeaders headers, BiConsumer<K, V> consumer) {
LOGGER.debug(RECEIVED_MESSAGE_HEADERS, headers);
LOGGER.debug(PROCESSING_RECORD_KEY_VALUE, key, value);
consumer.accept(key, value);
LOGGER.debug(RECORD_PROCESSED_SUCCESSFULLY_KEY, key);
}
}
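Worth noting: the value-only overload no longer guards against `null`, and the new key-aware overload exists precisely so hard deletes — where only the key survives — can reach business code. A minimal hypothetical subclass (the real wiring is `ProductSyncDataConsumer` further down):

```java
import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey;
import org.springframework.messaging.MessageHeaders;

class ExampleCdcConsumer extends BaseCdcConsumer<ProductMsgKey, ProductCdcMessage> {

    void onRecord(ProductMsgKey key, ProductCdcMessage value, MessageHeaders headers) {
        // value may legitimately be null (tombstone after a hard delete);
        // the BiConsumer decides how to react.
        processMessage(key, value, headers, (k, v) -> {
            if (v == null) {
                // hard delete: only the key is available
            }
        });
    }
}
```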
common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java
@@ -1,7 +1,6 @@
package com.yas.commonlibrary.kafka.cdc.config;

import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
@@ -12,15 +11,17 @@
/**
* Base configuration class for setting up Kafka consumers with typed deserialization.
*
* @param <T> The type of messages consumed.
* @param <K> The type of message keys consumed.
* @param <V> The type of messages consumed.
*/
public abstract class BaseKafkaListenerConfig<T> {
public abstract class BaseKafkaListenerConfig<K, V> {

private final Class<T> type;
private final Class<K> keyType;
private final Class<V> valueType;
private final KafkaProperties kafkaProperties;

public BaseKafkaListenerConfig(Class<T> type, KafkaProperties kafkaProperties) {
this.type = type;
public BaseKafkaListenerConfig(Class<K> keyType, Class<V> type, KafkaProperties kafkaProperties) {
this.valueType = type;
this.keyType = keyType;
this.kafkaProperties = kafkaProperties;
}

@@ -31,27 +32,31 @@ public BaseKafkaListenerConfig(Class<T> type, KafkaProperties kafkaProperties) {
*
* @return a configured instance of {@link ConcurrentKafkaListenerContainerFactory}.
*/
public abstract ConcurrentKafkaListenerContainerFactory<String, T> listenerContainerFactory();
public abstract ConcurrentKafkaListenerContainerFactory<K, V> listenerContainerFactory();

/**
* Common instance type ConcurrentKafkaListenerContainerFactory.
*
* @return concurrentKafkaListenerContainerFactory {@link ConcurrentKafkaListenerContainerFactory}.
*/
public ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, T>();
factory.setConsumerFactory(typeConsumerFactory(type));
public ConcurrentKafkaListenerContainerFactory<K, V> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<K, V>();
factory.setConsumerFactory(typeConsumerFactory(keyType, valueType));
return factory;
}

private ConsumerFactory<String, T> typeConsumerFactory(Class<T> clazz) {
private ConsumerFactory<K, V> typeConsumerFactory(Class<K> keyClazz, Class<V> valueClazz) {
Map<String, Object> props = buildConsumerProperties();
var serialize = new StringDeserializer();
// wrap in ErrorHandlingDeserializer so deserialization failures
// don't kill the listener container
var keyDeserialize = new ErrorHandlingDeserializer<>(getJsonDeserializer(keyClazz));
var valueDeserialize = new ErrorHandlingDeserializer<>(getJsonDeserializer(valueClazz));
return new DefaultKafkaConsumerFactory<>(props, keyDeserialize, valueDeserialize);
}

private static <T> JsonDeserializer<T> getJsonDeserializer(Class<T> clazz) {
var jsonDeserializer = new JsonDeserializer<>(clazz);
jsonDeserializer.addTrustedPackages("*");
var deserialize = new ErrorHandlingDeserializer<>(jsonDeserializer);
return new DefaultKafkaConsumerFactory<>(props, serialize, deserialize);
return jsonDeserializer;
}

private Map<String, Object> buildConsumerProperties() {
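For reference, the programmatic factory above is roughly what the following standard Spring Kafka properties would configure declaratively — a sketch, not taken from this PR:

```properties
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
```

Doing it in code instead keeps the key/value types checked per listener factory.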
common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductCdcMessage.java
@@ -1,6 +1,5 @@
package com.yas.commonlibrary.kafka.cdc.message;

import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
@@ -12,12 +11,10 @@
@AllArgsConstructor
public class ProductCdcMessage {

@NotNull
private Product after;

private Product before;

@NotNull
private Operation op;

}
common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/ProductMsgKey.java (new file)
@@ -0,0 +1,14 @@
package com.yas.commonlibrary.kafka.cdc.message;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;

@lombok.Getter
@lombok.Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProductMsgKey {
private Long id;
}
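With the `JsonSerializer`/`JsonDeserializer` pair configured above, this key travels as plain JSON; assuming default Jackson settings, the key for product 1 would serialize as:

```json
{"id": 1}
```

Whether Debezium's record key arrives in exactly this flattened shape depends on the connector's key converter and transforms — an assumption to verify against the connector config.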
16 changes: 5 additions & 11 deletions docker-compose.yml
@@ -605,18 +605,12 @@ services:
- OFFSET_STORAGE_TOPIC=kafka_connect_offsets
networks:
- yas-network
akhq:
image: tchiotludo/akhq:0.25.1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: 'kafka:9092'
connect:
- name: "kafka-connect"
url: "http://kafka-connect:8083/"
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
ports:
- 8089:8080
depends_on:
EmbeddingSearchConfiguration.java (recommendation service)
@@ -3,4 +3,4 @@
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "yas.recommendation.embedding-based.search")
public record EmbeddingSearchConfiguration(Double similarityThreshold, int topK, boolean initDefaultData) {}
public record EmbeddingSearchConfiguration(Double similarityThreshold, int topK) {}
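With `initDefaultData` gone, only two properties bind to this record. Illustrative values (Spring's relaxed binding maps `topK` to `top-k`):

```properties
yas.recommendation.embedding-based.search.similarity-threshold=0.75
yas.recommendation.embedding-based.search.top-k=10
```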
recommendation/src/main/java/com/yas/recommendation/kafka/config/consumer/ProductCdcKafkaListenerConfig.java
@@ -1,7 +1,8 @@
package com.yas.recommendation.kafka.config.consumer;

import com.yas.commonlibrary.kafka.cdc.config.BaseKafkaListenerConfig;
import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey;
import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;

(GitHub Actions / Checkstyle warning on line 5: wrong lexicographical order — 'com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage' should be imported before 'com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey'.)
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -13,17 +14,17 @@
*/
@EnableKafka
@Configuration
public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig<ProductCdcMessage> {
public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig<ProductMsgKey, ProductCdcMessage> {

public static final String PRODUCT_CDC_LISTENER_CONTAINER_FACTORY = "productCdcListenerContainerFactory";

public ProductCdcKafkaListenerConfig(KafkaProperties kafkaProperties) {
super(ProductCdcMessage.class, kafkaProperties);
super(ProductMsgKey.class, ProductCdcMessage.class, kafkaProperties);
}

@Bean(name = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY)
@Override
public ConcurrentKafkaListenerContainerFactory<String, ProductCdcMessage> listenerContainerFactory() {
public ConcurrentKafkaListenerContainerFactory<ProductMsgKey, ProductCdcMessage> listenerContainerFactory() {
return super.kafkaListenerContainerFactory();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
import com.yas.commonlibrary.kafka.cdc.BaseCdcConsumer;
import com.yas.commonlibrary.kafka.cdc.RetrySupportDql;
import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey;
import jakarta.validation.Valid;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@@ -16,7 +19,7 @@
* Product synchronize data consumer for pgvector.
*/
@Component
public class ProductSyncDataConsumer extends BaseCdcConsumer<ProductCdcMessage> {
public class ProductSyncDataConsumer extends BaseCdcConsumer<ProductMsgKey, ProductCdcMessage> {

private final ProductSyncService productSyncService;

@@ -32,9 +35,10 @@ public ProductSyncDataConsumer(ProductSyncService productSyncService) {
)
@RetrySupportDql(listenerContainerFactory = PRODUCT_CDC_LISTENER_CONTAINER_FACTORY)
public void processMessage(
@Header(KafkaHeaders.RECEIVED_KEY) ProductMsgKey key,
@Payload(required = false) @Valid ProductCdcMessage productCdcMessage,
@Headers MessageHeaders headers
) {
processMessage(productCdcMessage, headers, productSyncService::sync);
processMessage(key, productCdcMessage, headers, productSyncService::sync);
}
}
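The key-aware path is easy to pin down with a plain unit test; a sketch assuming Mockito and the types from this PR:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.MessageHeaders;

class ProductSyncDataConsumerTest {

    @Test
    void tombstonePayloadStillReachesTheService() {
        var service = mock(ProductSyncService.class);
        var consumer = new ProductSyncDataConsumer(service);
        var key = ProductMsgKey.builder().id(1L).build();

        // Hard-delete tombstone: key present, payload null.
        consumer.processMessage(key, null, new MessageHeaders(Map.of()));

        // The null payload must be forwarded so the service can delete the vector.
        verify(service).sync(key, null);
    }
}
```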
recommendation/src/main/java/com/yas/recommendation/kafka/consumer/ProductSyncService.java
@@ -1,6 +1,9 @@
package com.yas.recommendation.kafka.consumer;

import static com.yas.commonlibrary.kafka.cdc.message.Operation.DELETE;

import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
import com.yas.commonlibrary.kafka.cdc.message.ProductMsgKey;
import com.yas.recommendation.vector.product.service.ProductVectorSyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -23,20 +26,18 @@ public ProductSyncService(ProductVectorSyncService productVectorSyncService) {
*
* @param key               {@link ProductMsgKey} CDC message key.
* @param productCdcMessage {@link ProductCdcMessage} CDC message.
*/
public void sync(ProductCdcMessage productCdcMessage) {
if (productCdcMessage.getAfter() != null) {
public void sync(ProductMsgKey key, ProductCdcMessage productCdcMessage) {
boolean isHardDeleteEvent = productCdcMessage == null || DELETE.equals(productCdcMessage.getOp());
if (isHardDeleteEvent) {
log.warn("Having hard delete event for product: '{}'", key.getId());
productVectorSyncService.deleteProductVector(key.getId());
} else if (productCdcMessage.getAfter() != null) {
var operation = productCdcMessage.getOp();
var product = productCdcMessage.getAfter();
switch (operation) {
case CREATE, READ:
productVectorSyncService.createProductVector(product);
break;
case UPDATE:
productVectorSyncService.updateProductVector(product);
break;
default:
log.warn("Unsupported operation '{}' for product: '{}'", operation, product.getId());
break;
case CREATE, READ -> productVectorSyncService.createProductVector(product);
case UPDATE -> productVectorSyncService.updateProductVector(product);
default -> log.warn("Unsupported operation '{}' for product: '{}'", operation, product.getId());
}
}
}
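For context, the two delete shapes this branch handles (assuming a Debezium-style envelope; exact field names depend on connector and transform config): a delete event where `op` is `d` and `after` is null, followed by a tombstone whose payload is null entirely —

```json
{
  "before": { "id": 1, "name": "Example product" },
  "after": null,
  "op": "d"
}
```

Both now route to `deleteProductVector(key.getId())`, which is why the null check comes before any dereference of `productCdcMessage`.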
recommendation/src/main/java/com/yas/recommendation/vector/product/service/ProductVectorSyncService.java
@@ -48,5 +48,4 @@ public void deleteProductVector(Long productId) {
productVectorRepository.delete(productId);
}


}
8 changes: 4 additions & 4 deletions recommendation/src/main/resources/application.properties
@@ -27,9 +27,9 @@ spring.ai.vectorstore.pgvector.index-type=HNSW
spring.ai.vectorstore.pgvector.distance-type=COSINE_DISTANCE

# Azure AI configuration
spring.ai.azure.openai.api-key=
spring.ai.azure.openai.endpoint=
spring.ai.azure.openai.embedding.options.model=
spring.ai.azure.openai.api-key=${SPRING_AI_AZURE_OPENAI_API_KEY}
spring.ai.azure.openai.endpoint=${SPRING_AI_AZURE_OPENAI_ENDPOINT}
spring.ai.azure.openai.embedding.options.model=${SPRING_AI_AZURE_OPENAI_EMBEDDING_OPTIONS_MODEL}

# swagger-ui custom path
springdoc.swagger-ui.path=/swagger-ui
Expand All @@ -52,7 +52,7 @@ spring.aop.proxy-target-class=true

# Kafka Producer
spring.kafka.producer.bootstrap-servers=kafka:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Similarity Search Config
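The three placeholders resolve from the environment at startup; a hypothetical `.env` (the model name is only an example):

```bash
SPRING_AI_AZURE_OPENAI_API_KEY=<your-api-key>
SPRING_AI_AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com
SPRING_AI_AZURE_OPENAI_EMBEDDING_OPTIONS_MODEL=text-embedding-ada-002
```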