#1185 - Integration Test for Kafka/ElasticSearch #1197

Merged 3 commits on Oct 22, 2024
13 changes: 13 additions & 0 deletions common-library/pom.xml
@@ -37,11 +37,24 @@
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>

<!-- Kafka Dependencies -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>${opencsv.version}</version>
</dependency>

<!-- Test Dependency -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
46 changes: 46 additions & 0 deletions common-library/src/it/java/common/container/ContainerFactory.java
@@ -0,0 +1,46 @@
package common.container;

import dasniko.testcontainers.keycloak.KeycloakContainer;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/**
* Factory class that holds and provides containers used for testing with Testcontainers.
*/
public final class ContainerFactory {

private ContainerFactory() {}

public static KeycloakContainer keycloakContainer(DynamicPropertyRegistry registry) {
KeycloakContainer keycloak = new KeycloakContainer()
.withRealmImportFiles("/test-realm.json")
.withReuse(true);

registry.add(
"spring.security.oauth2.resourceserver.jwt.issuer-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus")
);
registry.add(
"spring.security.oauth2.resourceserver.jwt.jwk-set-uri",
() -> "%s%s".formatted(keycloak.getAuthServerUrl(), "/realms/quarkus/protocol/openid-connect/certs")
);
return keycloak;
}

public static KafkaContainer kafkaContainer(DynamicPropertyRegistry registry, String version) {
var kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(version))
);
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);

// Consumer properties
registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");

// Producer properties
registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

}
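
For context, here is a minimal sketch (not part of this diff) of one way to wire the factory into a test context. It assumes Spring Boot 3.1+ with the spring-boot-testcontainers module, which starts container beans automatically and lets a @Bean method receive the DynamicPropertyRegistry; the class name and image version are illustrative.

import common.container.ContainerFactory;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;

// Hypothetical wiring: spring-boot-testcontainers manages the lifecycle
// of Container beans, so no explicit start() call is needed here.
@TestConfiguration(proxyBeanMethods = false)
class TestContainersConfig {

    @Bean
    KafkaContainer kafkaContainer(DynamicPropertyRegistry registry) {
        // The factory registers the bootstrap-server properties against the
        // registry before the application context binds them.
        return ContainerFactory.kafkaContainer(registry, "7.4.0");
    }
}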
138 changes: 138 additions & 0 deletions common-library/src/it/java/common/kafka/CdcConsumerTest.java
@@ -0,0 +1,138 @@
package common.kafka;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.Assert;
import org.springframework.web.client.RestClient;
import org.testcontainers.containers.KafkaContainer;

/**
* Base class for testing a CDC (Change Data Capture) consumer.
* It requires a running Kafka container (typically provided via {@link Import} of a
* test configuration), supplies a Kafka producer for sending test messages, and mocks
* a REST client to simulate calls to other microservices.
*
* @param <M> Message Type
*/
@Getter
public abstract class CdcConsumerTest<M> {

private final Logger logger = LoggerFactory.getLogger(CdcConsumerTest.class);

private final Class<M> messageType;

private final String cdcEvent;

@Autowired
private KafkaContainer kafkaContainer;

@MockBean
private RestClient restClient;

@Mock
RestClient.ResponseSpec responseSpec;

@Mock
RestClient.RequestHeadersUriSpec requestHeadersUriSpec;

private KafkaTemplate<String, M> kafkaTemplate;

public CdcConsumerTest(Class<M> messageType, String topicEvent) {
Assert.notNull(topicEvent, "CDC topic must not be null");
Assert.notNull(messageType, "Message type must not be null");
this.cdcEvent = topicEvent;
this.messageType = messageType;
}

public synchronized KafkaTemplate<String, M> getKafkaTemplate() {
// No production code needs a Kafka producer yet, so we lazily build a
// minimal one here just for tests. The method itself is synchronized,
// so no inner lock is needed.
if (kafkaTemplate == null) {
final Map<String, Object> props = getProducerProps();
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, M>(props));
}
return kafkaTemplate;
}

protected void sendMsg(M message)
throws InterruptedException, ExecutionException, TimeoutException {
var rs = getKafkaTemplate()
.send(this.cdcEvent, message)
.get(10, TimeUnit.SECONDS);
logger.info("Sent message completed: {}", rs);
}

protected <R> void simulateHttpRequestWithResponse(URI url, R response, Class<R> responseType) {
setupMockGetRequest(url);
when(responseSpec.body(responseType)).thenReturn(response);
}

protected <R> void simulateHttpRequestWithError(URI url, Throwable exception, Class<R> responseType) {
setupMockGetRequest(url);
when(responseSpec.body(responseType)).thenThrow(exception);
}

private void setupMockGetRequest(URI url) {
when(restClient.get()).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.uri(url)).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.headers(any())).thenReturn(requestHeadersUriSpec);
when(requestHeadersUriSpec.retrieve()).thenReturn(responseSpec);
}

/**
* Sleeps the current thread for the estimated time the consumer needs to finish processing.
* The wait time depends on the scenario (normal processing vs. retries); for example,
* processTime=2, numOfRecords=1, attempts=3, backOff=5 waits (2 + 3*2 + 3*5) * 1 = 23 seconds.
*
* @param processTime The time (in seconds) the consumer takes to process each record.
* @param numOfRecords The number of records sent to the consumer.
* @param attempts The number of retry attempts. If no retries are needed, set this to '0'.
* @param backOff The backoff time (in seconds) between retries.
* @throws InterruptedException If the thread is interrupted while sleeping.
*/
public void waitForConsumer(
long processTime,
int numOfRecords,
int attempts,
long backOff
) throws InterruptedException {
// retryTime = (1st run) + (total run when retrying) + (total back off time)
long retryTime = processTime + (attempts * processTime) + (backOff * attempts);
long waitTime = (attempts > 0 ? retryTime : processTime) * numOfRecords;
logger.info("Consumer Processing expected time: {}s", waitTime);
Thread.sleep(Duration.ofSeconds(waitTime));
}

private @NotNull Map<String, Object> getProducerProps() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}
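
A sketch of how a service test might extend this base class, reusing the Product payload added later in this PR. The topic name, URI, and timings are illustrative, and the sketch assumes the Spring context provides the Kafka container bean and a consumer listening on that topic.

import java.net.URI;
import org.junit.jupiter.api.Test;
import com.yas.commonlibrary.kafka.cdc.message.Product;

class ProductCdcConsumerIT extends CdcConsumerTest<Product> {

    ProductCdcConsumerIT() {
        super(Product.class, "dbserver1.public.product"); // hypothetical topic
    }

    @Test
    void whenCdcEventSent_thenConsumerProcessesIt() throws Exception {
        Product payload = Product.builder().id(1L).isPublished(true).build();
        // Stub the downstream GET call the consumer is expected to make.
        simulateHttpRequestWithResponse(
                URI.create("http://products/api/products/1"), payload, Product.class);
        sendMsg(payload);
        waitForConsumer(1, 1, 0, 0); // 1s per record, 1 record, no retries
        // verify effects via the mocked RestClient or a test repository here
    }
}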

29 changes: 29 additions & 0 deletions common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/BaseCdcConsumer.java
@@ -0,0 +1,29 @@
package com.yas.commonlibrary.kafka.cdc;

import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;

/**
* Base class for CDC (Change Data Capture) Kafka consumers.
* Provides common methods for processing messages and handling Dead Letter Topic (DLT) events.
*
* @param <T> Type of the message payload.
*/
public abstract class BaseCdcConsumer<T> {

public static final Logger LOGGER = LoggerFactory.getLogger(BaseCdcConsumer.class);

protected void processMessage(T record, MessageHeaders headers, Consumer<T> consumer) {
LOGGER.info("## Received message - headers: {}", headers);
if (record == null) {
LOGGER.warn("## Null payload received");
} else {
LOGGER.debug("## Processing record - Key: {} | Value: {}", headers.get(KafkaHeaders.RECEIVED_KEY), record);
consumer.accept(record);
LOGGER.debug("## Record processed successfully - Key: {} \n", headers.get(KafkaHeaders.RECEIVED_KEY));
}
}
}
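
A concrete consumer would subclass this and hand its business logic to processMessage. The sketch below is an assumed shape, not code from this PR; the topic and container factory names are placeholders.

import com.yas.commonlibrary.kafka.cdc.message.Product;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
class ProductCdcConsumer extends BaseCdcConsumer<Product> {

    @KafkaListener(topics = "dbserver1.public.product", // hypothetical topic
            containerFactory = "productListenerContainerFactory")
    public void listen(@Payload(required = false) Product record,
                       @Headers MessageHeaders headers) {
        // Delegate so logging and null-payload handling stay in the base class.
        processMessage(record, headers, this::sync);
    }

    private void sync(Product product) {
        // real handler: update the search index, call another service, etc.
    }
}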
43 changes: 43 additions & 0 deletions common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/RetrySupportDql.java
@@ -0,0 +1,43 @@
package com.yas.commonlibrary.kafka.cdc;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.retry.annotation.Backoff;

/**
* Custom annotation that extends Spring's {@link RetryableTopic} to
* add retry and dead letter queue (DLQ) support for Kafka listeners.
* Provides additional configuration for retry backoff, number of attempts,
* topic creation, and exclusion of certain exceptions.
*/
@Documented
@RetryableTopic
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RetrySupportDql {

@AliasFor(annotation = RetryableTopic.class, attribute = "backoff")
Backoff backoff() default @Backoff(value = 6000);

@AliasFor(annotation = RetryableTopic.class, attribute = "attempts")
String attempts() default "4";

@AliasFor(annotation = RetryableTopic.class, attribute = "autoCreateTopics")
String autoCreateTopics() default "true";

@AliasFor(annotation = RetryableTopic.class, attribute = "listenerContainerFactory")
String listenerContainerFactory() default "";

@AliasFor(annotation = RetryableTopic.class, attribute = "exclude")
Class<? extends Throwable>[] exclude() default {};

@AliasFor(annotation = RetryableTopic.class, attribute = "sameIntervalTopicReuseStrategy")
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.SINGLE_TOPIC;

}
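
Because the meta-annotation forwards to @RetryableTopic, records that exhaust their retries land on a dead letter topic, which a @DltHandler method can pick up. The pairing below is illustrative only; the attempt and backoff values are arbitrary.

import com.yas.commonlibrary.kafka.cdc.RetrySupportDql;
import com.yas.commonlibrary.kafka.cdc.message.Product;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
class ProductSyncConsumer {

    @RetrySupportDql(backoff = @Backoff(3000), attempts = "2") // illustrative values
    @KafkaListener(topics = "dbserver1.public.product")
    void listen(Product record) {
        // an exception thrown here routes the record through the retry
        // topics and, once attempts are exhausted, onto the DLT
    }

    @DltHandler
    void handleDlt(Product record) {
        // inspect, log, or persist the poisoned record
    }
}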
61 changes: 61 additions & 0 deletions common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/config/BaseKafkaListenerConfig.java
@@ -0,0 +1,61 @@
package com.yas.commonlibrary.kafka.cdc.config;

import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
* Base configuration class for setting up Kafka consumers with typed deserialization.
*
* @param <T> The type of messages consumed.
*/
public abstract class BaseKafkaListenerConfig<T> {

private final Class<T> type;
private final KafkaProperties kafkaProperties;

public BaseKafkaListenerConfig(Class<T> type, KafkaProperties kafkaProperties) {
this.type = type;
this.kafkaProperties = kafkaProperties;
}

/**
* Provides a custom instance of {@link ConcurrentKafkaListenerContainerFactory}.
* The overriding method must be declared as a Spring bean. To use the default
* setup, delegate to {@link #kafkaListenerContainerFactory()}.
*
* @return a configured instance of {@link ConcurrentKafkaListenerContainerFactory}.
*/
public abstract ConcurrentKafkaListenerContainerFactory<String, T> listenerContainerFactory();

/**
* Builds the default {@link ConcurrentKafkaListenerContainerFactory}, wired with a typed JSON consumer factory.
*
* @return concurrentKafkaListenerContainerFactory {@link ConcurrentKafkaListenerContainerFactory}.
*/
public ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, T>();
factory.setConsumerFactory(typeConsumerFactory(type));
return factory;
}

private ConsumerFactory<String, T> typeConsumerFactory(Class<T> clazz) {
Map<String, Object> props = buildConsumerProperties();
var keyDeserializer = new StringDeserializer();
// wrap the JSON deserializer so deserialization failures are handled
// gracefully instead of crashing the listener container
var jsonDeserializer = new JsonDeserializer<>(clazz);
jsonDeserializer.addTrustedPackages("*");
var valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
}

private Map<String, Object> buildConsumerProperties() {
return kafkaProperties.buildConsumerProperties(null);
}

}
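
A concrete configuration, one per consumed type, would look roughly like this; the class and bean names are assumed, not taken from this PR.

import com.yas.commonlibrary.kafka.cdc.message.Product;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

@Configuration
class ProductCdcKafkaListenerConfig extends BaseKafkaListenerConfig<Product> {

    ProductCdcKafkaListenerConfig(KafkaProperties kafkaProperties) {
        super(Product.class, kafkaProperties);
    }

    @Bean(name = "productListenerContainerFactory")
    @Override
    public ConcurrentKafkaListenerContainerFactory<String, Product> listenerContainerFactory() {
        // reuse the base factory wired with the typed JSON deserializer
        return super.kafkaListenerContainerFactory();
    }
}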
22 changes: 22 additions & 0 deletions common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Operation.java
@@ -0,0 +1,22 @@
package com.yas.commonlibrary.kafka.cdc.message;

import com.fasterxml.jackson.annotation.JsonValue;

public enum Operation {

READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");

private final String name;

Operation(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
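
With @JsonValue, Jackson writes (and, since Jackson 2.9, also reads) the enum as its single-letter Debezium op code rather than the constant name. A quick illustrative check:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.yas.commonlibrary.kafka.cdc.message.Operation;

class OperationJsonDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        System.out.println(mapper.writeValueAsString(Operation.CREATE)); // prints "c" (quoted)
        System.out.println(mapper.readValue("\"u\"", Operation.class));  // prints UPDATE
    }
}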
20 changes: 20 additions & 0 deletions common-library/src/main/java/com/yas/commonlibrary/kafka/cdc/message/Product.java
@@ -0,0 +1,20 @@
package com.yas.commonlibrary.kafka.cdc.message;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Product {

private long id;

@JsonProperty("is_published")
private boolean isPublished;

}