#326 - polish configuration to work in testcontainers

rajadilipkolli committed Aug 23, 2023
1 parent b9882cb commit 8474ff1

Showing 6 changed files with 70 additions and 85 deletions.
@@ -1,4 +1,4 @@
-/* Licensed under Apache-2.0 2022 */
+/* Licensed under Apache-2.0 2022-2023 */
 package com.example.orderservice.config.kafka;
 
 import static com.example.orderservice.utils.AppConstants.ORDERS_TOPIC;
@@ -8,14 +8,12 @@
 import com.example.common.dtos.OrderDto;
 import com.example.orderservice.services.OrderManageService;
 import java.time.Duration;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -27,24 +25,24 @@
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.Environment;
 import org.springframework.kafka.annotation.EnableKafkaStreams;
 import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
 import org.springframework.kafka.config.KafkaStreamsConfiguration;
 import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
 import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.CollectionUtils;
 
 @Configuration
 @EnableKafkaStreams
@@ -53,57 +51,51 @@
 public class KafkaStreamsConfig {
 
     private final OrderManageService orderManageService;
-    private final KafkaProperties kafkaProperties;
 
     @Bean
-    public StreamsBuilderFactoryBeanConfigurer configurer() {
+    StreamsBuilderFactoryBeanConfigurer configurer() {
         return factoryBean -> {
             factoryBean.setStateListener(
                     (newState, oldState) ->
                             log.info("State transition from {} to {} ", oldState, newState));
         };
     }
 
-    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
-    public KafkaStreamsConfiguration kStreamsConfigs() {
-
-        Map<String, Object> streamProperties = kafkaProperties.getStreams().buildProperties();
-        Map<String, Object> props = new HashMap<>(streamProperties);
-        props.putIfAbsent(
-                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
-                CollectionUtils.isEmpty(kafkaProperties.getStreams().getBootstrapServers())
-                        ? kafkaProperties.getBootstrapServers()
-                        : kafkaProperties.getStreams().getBootstrapServers());
-        props.put(
-                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                WallclockTimestampExtractor.class.getName());
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
-        props.put(
+    @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+    KafkaStreamsConfiguration defaultKafkaStreamsConfig(
+            Environment environment,
+            KafkaConnectionDetails connectionDetails,
+            KafkaProperties kafkaProperties,
+            DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
+        Map<String, Object> properties = kafkaProperties.buildStreamsProperties();
+        properties.put(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                connectionDetails.getStreamsBootstrapServers());
+        if (kafkaProperties.getStreams().getApplicationId() == null) {
+            String applicationName = environment.getProperty("spring.application.name");
+            if (applicationName == null) {
+                throw new InvalidConfigurationPropertyValueException(
+                        "spring.kafka.streams.application-id",
+                        null,
+                        "This property is mandatory and fallback 'spring.application.name' is not set either.");
+            }
+            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
+        }
+        properties.put(
                 StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                 RecoveringDeserializationExceptionHandler.class);
-        props.put(
+        properties.put(
                 RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
-                deadLetterPublishingRecoverer());
-        return new KafkaStreamsConfiguration(props);
+                deadLetterPublishingRecoverer);
+        return new KafkaStreamsConfiguration(properties);
     }
 
     @Bean
-    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
+    DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+            ProducerFactory<byte[], byte[]> producerFactory) {
         return new DeadLetterPublishingRecoverer(
-                byteKafkaTemplate(), (record, ex) -> new TopicPartition("recovererDLQ", -1));
-    }
-
-    @Bean
-    public KafkaOperations<byte[], byte[]> byteKafkaTemplate() {
-        Map<String, Object> senderProps = new HashMap<>(3);
-        senderProps.put(
-                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                CollectionUtils.isEmpty(kafkaProperties.getProducer().getBootstrapServers())
-                        ? kafkaProperties.getBootstrapServers()
-                        : kafkaProperties.getProducer().getBootstrapServers());
-        senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(senderProps), true);
+                new KafkaTemplate<>(producerFactory),
+                (record, ex) -> new TopicPartition("recovererDLQ", -1));
     }
 
     @Bean
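The rewritten defaultKafkaStreamsConfig bean closely mirrors Spring Boot's own auto-configured Kafka Streams setup: it resolves bootstrap servers through the KafkaConnectionDetails abstraction (new in Boot 3.1) instead of reading bootstrap-server properties by hand, and it falls back to spring.application.name when no application-id is configured. Because Boot populates KafkaConnectionDetails either from spring.kafka.* properties or from a @ServiceConnection-annotated container, the same bean now works unchanged under Testcontainers. A minimal sketch of the contract being consumed, assuming Boot 3.1.x and a hypothetical fixed broker address:

    import java.util.List;
    import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;

    // Sketch only: Spring Boot normally supplies this bean itself, either from
    // spring.kafka.bootstrap-servers or from a @ServiceConnection container.
    class FixedKafkaConnectionDetails implements KafkaConnectionDetails {

        @Override
        public List<String> getBootstrapServers() {
            return List.of("localhost:9092"); // hypothetical placeholder address
        }

        // getStreamsBootstrapServers() is a default method that falls back to
        // getBootstrapServers(), which is what defaultKafkaStreamsConfig reads.
    }

The dead-letter recoverer likewise stops assembling its own byte[] KafkaTemplate and instead accepts the auto-configured ProducerFactory, so its producer also picks up whatever connection details are in effect.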
order-service/src/main/resources/application.yml (7 changes: 6 additions & 1 deletion)
@@ -24,9 +24,14 @@ spring:
       clientId: order-service-stream-client
       replicationFactor: 1
       producer.acks: all
-      applicationId: ${spring.application.name}
+      application-id: ${spring.application.name}
       properties:
+        commit:
+          interval:
+            ms: 100
+        default:
+          timestamp:
+            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
         key:
           serde: org.apache.kafka.common.serialization.Serdes$LongSerde
         value:
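Renaming applicationId to application-id only normalizes the key to kebab-case (relaxed binding should accept either spelling), while the new entries move the commit interval and timestamp extractor out of the deleted Java code and into configuration. Everything under spring.kafka.streams.properties is handed to Kafka Streams verbatim; roughly, the YAML above flattens to the same map the old kStreamsConfigs() built by hand, sketched here for illustration only:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    class StreamsPropertiesSketch {

        // Sketch: the flattened property keys produced by the YAML above.
        static Map<String, Object> yamlEquivalent() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"); // commit.interval.ms
            props.put(
                    StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, // default.timestamp.extractor
                    WallclockTimestampExtractor.class.getName());
            return props;
        }
    }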
@@ -1,24 +1,30 @@
 /* Licensed under Apache-2.0 2021-2023 */
 package com.example.orderservice;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.hamcrest.CoreMatchers.is;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 import com.example.orderservice.common.AbstractIntegrationTest;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 class OrderServiceApplicationIntegrationTest extends AbstractIntegrationTest {
 
     @Test
     void shouldFetchAllOrdersFromStream() throws Exception {
         // waiting till the kafka stream state changes from PARTITIONS_ASSIGNED to RUNNING
-        TimeUnit.SECONDS.sleep(5);
-        this.mockMvc
-                .perform(get("/api/orders/all"))
-                .andExpect(status().isOk())
-                .andExpect(jsonPath("$.size()", is(0)));
+        Awaitility.await()
+                .atMost(10, SECONDS)
+                .pollInterval(Duration.ofSeconds(1))
+                .untilAsserted(
+                        () ->
+                                this.mockMvc
+                                        .perform(get("/api/orders/all"))
+                                        .andExpect(status().isOk())
+                                        .andExpect(jsonPath("$.size()", is(0))));
     }
 }
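Replacing the fixed TimeUnit.SECONDS.sleep(5) with Awaitility lets the assertion pass as soon as the stream finishes rebalancing, while tolerating starts of up to ten seconds; polling every second keeps the wait cheap. If the HTTP endpoint ever proves too indirect a signal, an alternative is to wait on the stream state itself. A sketch, assuming the test can inject the application's StreamsBuilderFactoryBean:

    import static org.awaitility.Awaitility.await;

    import java.time.Duration;
    import org.apache.kafka.streams.KafkaStreams;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.config.StreamsBuilderFactoryBean;

    class StreamStateProbe {

        @Autowired private StreamsBuilderFactoryBean streamsBuilderFactoryBean;

        // Sketch: wait until the topology itself reports RUNNING rather than
        // polling an HTTP side effect; getKafkaStreams() returns null until
        // the factory bean has started the streams instance.
        void awaitRunning() {
            await().atMost(Duration.ofSeconds(10))
                    .until(
                            () -> {
                                KafkaStreams streams = streamsBuilderFactoryBean.getKafkaStreams();
                                return streams != null
                                        && streams.state() == KafkaStreams.State.RUNNING;
                            });
        }
    }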
@@ -1,35 +1,37 @@
 /* Licensed under Apache-2.0 2023 */
 package com.example.orderservice;
 
-import com.example.orderservice.config.MyTestContainers;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.devtools.restart.RestartScope;
 import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.testcontainers.context.ImportTestcontainers;
 import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
 import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.DynamicPropertyRegistry;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.utility.DockerImageName;
 
 @TestConfiguration(proxyBeanMethods = false)
-@ImportTestcontainers(MyTestContainers.class)
 public class TestOrderServiceApplication {
 
     @Bean
     @ServiceConnection
     @RestartScope
-    KafkaContainer kafkaContainer(DynamicPropertyRegistry propertyRegistry) {
-        KafkaContainer kafkaContainer =
-                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1"))
-                        .withKraft();
-        propertyRegistry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
-        propertyRegistry.add(
-                "spring.kafka.streams.consumer.bootstrap-servers",
-                kafkaContainer::getBootstrapServers);
-        propertyRegistry.add(
-                "spring.kafka.streams.bootstrap-servers", kafkaContainer::getBootstrapServers);
-        return kafkaContainer;
+    KafkaContainer kafkaContainer() {
+        return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1"))
+                .withKraft();
     }
 
+    @ServiceConnection
+    @Bean
+    PostgreSQLContainer<?> postgreSQLContainer() {
+        return new PostgreSQLContainer<>(DockerImageName.parse("postgres").withTag("15.4-alpine"));
+    }
+
+    @Bean
+    @ServiceConnection(name = "openzipkin/zipkin")
+    GenericContainer<?> zipkinContainer() {
+        return new GenericContainer(DockerImageName.parse("openzipkin/zipkin"));
+    }
+
     public static void main(String[] args) {
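With Boot 3.1's @ServiceConnection, connection details for Kafka and PostgreSQL (and, matched by the image name openzipkin/zipkin, the tracing endpoint) are derived directly from the running containers, which is what makes the deleted DynamicPropertyRegistry wiring and the MyTestContainers import unnecessary. The main method is collapsed above; as an assumption rather than the file's actual body, such test launchers conventionally delegate to the production application with the container beans attached:

    // Sketch of a conventional Testcontainers-at-development-time launcher;
    // OrderServiceApplication is assumed to be the production entry point.
    public static void main(String[] args) {
        SpringApplication.from(OrderServiceApplication::main)
                .with(TestOrderServiceApplication.class)
                .run(args);
    }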

This file was deleted.
