#1148 - Enhance Error Handler, Serialize, Deserialize
Duy Le Van committed Oct 21, 2024
1 parent cfcfd40 commit 9f3a36f
Showing 22 changed files with 519 additions and 200 deletions.
6 changes: 6 additions & 0 deletions common-library/pom.xml
@@ -42,6 +42,12 @@
<artifactId>opencsv</artifactId>
<version>${opencsv.version}</version>
</dependency>

<!-- Kafka Dependencies -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

<build>
@@ -0,0 +1,36 @@
package com.yas.commonlibrary.kafka.cdc;

import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
* Base class for CDC (Change Data Capture) Kafka consumers.
* Provides common methods for processing messages and handling Dead Letter Topic (DLT) events.
*
* @param <T> Type of the message payload.
*/
public abstract class BaseCdcConsumer<T> {

public static final Logger LOGGER = LoggerFactory.getLogger(BaseCdcConsumer.class);

    /**
     * Logs incoming headers and, for non-null payloads, hands the record to the given handler.
     * Null payloads (e.g. tombstone events) are logged and skipped.
     */
    protected void processMessage(T record, MessageHeaders headers, Consumer<T> consumer) {
        LOGGER.info("## Received message - headers: {}", headers);
        if (record == null) {
            LOGGER.warn("## Null payload received");
        } else {
            LOGGER.info("## Processing record - Key: {} | Value: {}", headers.get(KafkaHeaders.RECEIVED_KEY), record);
            consumer.accept(record);
            LOGGER.info("## Record processed successfully - Key: {}", headers.get(KafkaHeaders.RECEIVED_KEY));
        }
    }

    /**
     * Receives events that exhausted their retries and were routed to the dead letter topic.
     */
    @DltHandler
    public void dlt(T data, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        LOGGER.error("### Event from topic {} is dead lettered - event: {}", topic, data);
    }
}
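A subclass pairs a @KafkaListener method with the protected processMessage above; the ProductSyncDataConsumer later in this commit is the concrete case. As a minimal sketch, assuming a hypothetical CustomerCdcMessage type and a matching container factory bean name:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class CustomerSyncDataConsumer extends BaseCdcConsumer<CustomerCdcMessage> {

    @KafkaListener(topics = "${customer.topic.name}", containerFactory = "customerCdcListenerContainerFactory")
    public void listen(@Payload(required = false) CustomerCdcMessage message, @Headers MessageHeaders headers) {
        // The base class logs, skips null payloads (tombstones), then invokes the handler.
        processMessage(message, headers, this::handle);
    }

    private void handle(CustomerCdcMessage message) {
        // Domain-specific synchronization would go here.
    }
}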
@@ -0,0 +1,43 @@
package com.yas.commonlibrary.kafka.cdc;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.retry.annotation.Backoff;

/**
 * Composed annotation that meta-annotates Spring's {@link RetryableTopic} to
 * add retry and dead letter topic (DLT) support to Kafka listeners, with
 * overridable defaults for retry backoff, number of attempts, topic
 * auto-creation, and exclusion of non-retryable exceptions.
 */
@Documented
@RetryableTopic
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RetrySupportDql {

@AliasFor(annotation = RetryableTopic.class, attribute = "backoff")
Backoff backoff() default @Backoff(value = 6000);

@AliasFor(annotation = RetryableTopic.class, attribute = "attempts")
String attempts() default "4";

@AliasFor(annotation = RetryableTopic.class, attribute = "autoCreateTopics")
String autoCreateTopics() default "true";

@AliasFor(annotation = RetryableTopic.class, attribute = "listenerContainerFactory")
String listenerContainerFactory() default "";

@AliasFor(annotation = RetryableTopic.class, attribute = "exclude")
Class<? extends Throwable>[] exclude() default {};

@AliasFor(annotation = RetryableTopic.class, attribute = "sameIntervalTopicReuseStrategy")
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.SINGLE_TOPIC;

}
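On a listener method the annotation is stacked with @KafkaListener, and each default above can be overridden per consumer. A hedged sketch (the topic, factory name, and excluded exception are illustrative, not part of this commit):

// Inside a BaseCdcConsumer subclass:
@KafkaListener(topics = "customer-cdc", containerFactory = "customerCdcListenerContainerFactory")
@RetrySupportDql(
    backoff = @Backoff(value = 3000),           // retry every 3s instead of the 6s default
    attempts = "2",                             // two total attempts before dead-lettering
    exclude = {IllegalArgumentException.class}, // never retry errors that cannot succeed
    listenerContainerFactory = "customerCdcListenerContainerFactory"
)
public void listen(CustomerCdcMessage message) {
    // ...
}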
@@ -0,0 +1,54 @@
package com.yas.commonlibrary.kafka.cdc.config;

import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
* Base configuration class for setting up Kafka consumers with typed deserialization.
*
* @param <T> The type of messages consumed.
*/
public abstract class BaseKafkaListenerConfig<T> {

private final Class<T> type;
private final KafkaProperties kafkaProperties;

public BaseKafkaListenerConfig(Class<T> type, KafkaProperties kafkaProperties) {
this.type = type;
this.kafkaProperties = kafkaProperties;
}

public abstract ConcurrentKafkaListenerContainerFactory<String, T> listenerContainerFactory();

    /**
     * Builds the common {@link ConcurrentKafkaListenerContainerFactory} wired to the
     * typed consumer factory; subclasses expose it as a uniquely named bean.
     *
     * @return a container factory for messages of type {@code T}.
     */
public ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, T>();
factory.setConsumerFactory(typeConsumerFactory(type));
return factory;
}

    private ConsumerFactory<String, T> typeConsumerFactory(Class<T> clazz) {
        Map<String, Object> props = buildConsumerProperties();
        var keyDeserializer = new StringDeserializer();
        var jsonDeserializer = new JsonDeserializer<>(clazz);
        jsonDeserializer.addTrustedPackages("*");
        // Wrap the JSON deserializer so records that fail deserialization are handed
        // to the error handler instead of crashing the consumer in a retry loop.
        var valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
        return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
    }

private Map<String, Object> buildConsumerProperties() {
return kafkaProperties.buildConsumerProperties(null);
}

}
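Because the base class is generic, each message type gets a small subclass that pins T and publishes the factory under its own bean name; the ProductCdcKafkaListenerConfig later in this commit does exactly this. A hypothetical sketch for a second message type:

@Configuration
public class OrderCdcKafkaListenerConfig extends BaseKafkaListenerConfig<OrderCdcMessage> {

    public OrderCdcKafkaListenerConfig(KafkaProperties kafkaProperties) {
        super(OrderCdcMessage.class, kafkaProperties);
    }

    @Bean(name = "orderCdcListenerContainerFactory")
    @Override
    public ConcurrentKafkaListenerContainerFactory<String, OrderCdcMessage> listenerContainerFactory() {
        return super.kafkaListenerContainerFactory();
    }
}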
@@ -0,0 +1,22 @@
package com.yas.commonlibrary.kafka.cdc.message;

import com.fasterxml.jackson.annotation.JsonValue;

public enum Operation {

READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");

private final String name;

Operation(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
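Because of @JsonValue, Jackson reads and writes these constants as Debezium's single-letter op codes. A minimal sketch, assuming Jackson Databind on the classpath:

import com.fasterxml.jackson.databind.ObjectMapper;

class OperationCodecDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // A CDC envelope's "op": "u" deserializes to Operation.UPDATE...
        Operation op = mapper.readValue("\"u\"", Operation.class);
        System.out.println(op);                                          // UPDATE
        // ...and constants serialize back to their single-letter codes.
        System.out.println(mapper.writeValueAsString(Operation.DELETE)); // "d"
    }
}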
@@ -0,0 +1,20 @@
package com.yas.commonlibrary.kafka.cdc.message;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Product {

private long id;

@JsonProperty("is_published")
private boolean isPublished;

}
@@ -0,0 +1,23 @@
package com.yas.commonlibrary.kafka.cdc.message;

import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProductCdcMessage {

private Product after;

private Product before;

@NotNull
private Operation op;

}
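For reference, the kind of trimmed Debezium change event this class maps (a sketch assuming Java text blocks and Jackson Databind; real envelopes carry more fields, depending on connector configuration):

import com.fasterxml.jackson.databind.ObjectMapper;

class ProductCdcMessageDemo {
    public static void main(String[] args) throws Exception {
        // A create event: no prior row image, "after" carries the new state.
        String json = """
            {"before": null, "after": {"id": 1001, "is_published": true}, "op": "c"}
            """;
        ProductCdcMessage msg = new ObjectMapper().readValue(json, ProductCdcMessage.class);
        System.out.println(msg.getOp());                  // CREATE
        System.out.println(msg.getAfter().isPublished()); // true
    }
}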

5 changes: 5 additions & 0 deletions recommendation/pom.xml
@@ -40,6 +40,11 @@
</dependency>

<!-- Common Library -->
<dependency>
<groupId>com.yas</groupId>
<artifactId>common-library</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>

This file was deleted.

@@ -0,0 +1,24 @@
package com.yas.recommendation.kafka.config.consumer;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

/**
 * Registers the application's validator with the Kafka listener endpoint registrar,
 * so {@code @Valid} constraints on listener payloads are enforced.
 */
@EnableKafka
@Configuration
public class AppKafkaListenerConfigurer implements KafkaListenerConfigurer {

    private final LocalValidatorFactoryBean validator;

    public AppKafkaListenerConfigurer(LocalValidatorFactoryBean validator) {
        this.validator = validator;
    }

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
// Enable message validation
registrar.setValidator(this.validator);
}
}
@@ -0,0 +1,30 @@
package com.yas.recommendation.kafka.config.consumer;

import com.yas.commonlibrary.kafka.cdc.config.BaseKafkaListenerConfig;
import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Kafka listener configuration for product CDC events; exposes a container
 * factory that deserializes change messages into {@link ProductCdcMessage}.
 */
@EnableKafka
@Configuration
@EnableScheduling
public class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig<ProductCdcMessage> {

public ProductCdcKafkaListenerConfig(KafkaProperties kafkaProperties) {
super(ProductCdcMessage.class, kafkaProperties);
}

@Bean(name = "productCdcListenerContainerFactory")
@Override
public ConcurrentKafkaListenerContainerFactory<String, ProductCdcMessage> listenerContainerFactory() {
return super.kafkaListenerContainerFactory();
}

}
@@ -0,0 +1,38 @@
package com.yas.recommendation.kafka.consumer;

import com.yas.commonlibrary.kafka.cdc.BaseCdcConsumer;
import com.yas.commonlibrary.kafka.cdc.RetrySupportDql;
import com.yas.commonlibrary.kafka.cdc.message.ProductCdcMessage;
import jakarta.validation.Valid;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Consumes product CDC events and synchronizes product data into pgvector.
 */
@Component
public class ProductSyncDataConsumer extends BaseCdcConsumer<ProductCdcMessage> {

private final ProductSyncService productSyncService;

public ProductSyncDataConsumer(ProductSyncService productSyncService) {
this.productSyncService = productSyncService;
}

@KafkaListener(
id = "product-sync-recommendation",
groupId = "product-sync",
topics = "${product.topic.name}",
containerFactory = "productCdcListenerContainerFactory"
)
@RetrySupportDql(listenerContainerFactory = "productCdcListenerContainerFactory")
public void processMessage(
@Payload(required = false) @Valid ProductCdcMessage productCdcMessage,
@Headers MessageHeaders headers
) {
processMessage(productCdcMessage, headers, productSyncService::sync);
}
}
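ProductSyncService ships elsewhere in this commit and is not shown in this view; since productSyncService::sync is passed where a Consumer<ProductCdcMessage> is expected, its handler must match this shape (body assumed):

import org.springframework.stereotype.Service;

@Service
public class ProductSyncService {

    public void sync(ProductCdcMessage message) {
        // Assumed: upsert or remove the product's vector data in pgvector,
        // based on message.getOp() and the before/after images.
    }
}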