diff --git a/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
index d5b882aa..188c44aa 100644
--- a/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
+++ b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
@@ -1,4 +1,4 @@
-/* Licensed under Apache-2.0 2022 */
+/* Licensed under Apache-2.0 2022-2023 */
 package com.example.orderservice.config.kafka;
 
 import static com.example.orderservice.utils.AppConstants.ORDERS_TOPIC;
diff --git a/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
index e6b671a0..72d6bb59 100644
--- a/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
+++ b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
@@ -8,14 +8,12 @@ import com.example.common.dtos.OrderDto;
 import com.example.orderservice.services.OrderManageService;
 import java.time.Duration;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -27,24 +25,24 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.Environment;
 import org.springframework.kafka.annotation.EnableKafkaStreams;
 import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
 import org.springframework.kafka.config.KafkaStreamsConfiguration;
 import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
 import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.CollectionUtils;
 
 @Configuration
 @EnableKafkaStreams
@@ -53,10 +51,9 @@ public class KafkaStreamsConfig {
 
     private final OrderManageService orderManageService;
 
-    private final KafkaProperties kafkaProperties;
 
     @Bean
-    public StreamsBuilderFactoryBeanConfigurer configurer() {
+    StreamsBuilderFactoryBeanConfigurer configurer() {
         return factoryBean -> {
             factoryBean.setStateListener(
                     (newState, oldState) ->
@@ -64,46 +61,41 @@ public StreamsBuilderFactoryBeanConfigurer configurer() {
         };
     }
 
-    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
-    public KafkaStreamsConfiguration kStreamsConfigs() {
-
-        Map<String, Object> streamProperties = kafkaProperties.getStreams().buildProperties();
-        Map<String, Object> props = new HashMap<>(streamProperties);
-        props.putIfAbsent(
-                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
-                CollectionUtils.isEmpty(kafkaProperties.getStreams().getBootstrapServers())
-                        ? kafkaProperties.getBootstrapServers()
-                        : kafkaProperties.getStreams().getBootstrapServers());
-        props.put(
-                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                WallclockTimestampExtractor.class.getName());
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
-        props.put(
+    @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+    KafkaStreamsConfiguration defaultKafkaStreamsConfig(
+            Environment environment,
+            KafkaConnectionDetails connectionDetails,
+            KafkaProperties kafkaProperties,
+            DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
+        Map<String, Object> properties = kafkaProperties.buildStreamsProperties();
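+        // "bootstrap.servers" is the same key for consumer, producer and streams clients;
+        // KafkaConnectionDetails supplies it from configuration or a @ServiceConnection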
+        properties.put(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                connectionDetails.getStreamsBootstrapServers());
+        if (kafkaProperties.getStreams().getApplicationId() == null) {
+            String applicationName = environment.getProperty("spring.application.name");
+            if (applicationName == null) {
+                throw new InvalidConfigurationPropertyValueException(
+                        "spring.kafka.streams.application-id",
+                        null,
+                        "This property is mandatory and fallback 'spring.application.name' is not set either.");
+            }
+            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
+        }
+        properties.put(
                 StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                 RecoveringDeserializationExceptionHandler.class);
-        props.put(
+        properties.put(
                 RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
-                deadLetterPublishingRecoverer());
-        return new KafkaStreamsConfiguration(props);
+                deadLetterPublishingRecoverer);
+        return new KafkaStreamsConfiguration(properties);
     }
 
     @Bean
-    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
+    DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+            ProducerFactory<byte[], byte[]> producerFactory) {
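+        // a negative partition in the returned TopicPartition lets the producer choose
+        // the partition of the "recovererDLQ" topic for each dead-lettered record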
         return new DeadLetterPublishingRecoverer(
-                byteKafkaTemplate(), (record, ex) -> new TopicPartition("recovererDLQ", -1));
-    }
-
-    @Bean
-    public KafkaOperations<byte[], byte[]> byteKafkaTemplate() {
-        Map<String, Object> senderProps = new HashMap<>(3);
-        senderProps.put(
-                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                CollectionUtils.isEmpty(kafkaProperties.getProducer().getBootstrapServers())
-                        ? kafkaProperties.getBootstrapServers()
-                        : kafkaProperties.getProducer().getBootstrapServers());
-        senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(senderProps), true);
+                new KafkaTemplate<>(producerFactory),
+                (record, ex) -> new TopicPartition("recovererDLQ", -1));
     }
 
     @Bean
diff --git a/order-service/src/main/resources/application.yml b/order-service/src/main/resources/application.yml
index 774b968e..e2671854 100644
--- a/order-service/src/main/resources/application.yml
+++ b/order-service/src/main/resources/application.yml
@@ -24,9 +24,14 @@ spring:
       clientId: order-service-stream-client
       replicationFactor: 1
       producer.acks: all
-      applicationId: ${spring.application.name}
+      application-id: ${spring.application.name}
       properties:
+        commit:
+          interval:
+            ms: 100
         default:
+          timestamp:
+            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
           key:
             serde: org.apache.kafka.common.serialization.Serdes$LongSerde
           value:
diff --git a/order-service/src/test/java/com/example/orderservice/OrderServiceApplicationIntegrationTest.java b/order-service/src/test/java/com/example/orderservice/OrderServiceApplicationIntegrationTest.java
index 24db2964..c659088b 100644
--- a/order-service/src/test/java/com/example/orderservice/OrderServiceApplicationIntegrationTest.java
+++ b/order-service/src/test/java/com/example/orderservice/OrderServiceApplicationIntegrationTest.java
@@ -1,13 +1,15 @@
 /* Licensed under Apache-2.0 2021-2023 */
 package com.example.orderservice;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.hamcrest.CoreMatchers.is;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 import com.example.orderservice.common.AbstractIntegrationTest;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 class OrderServiceApplicationIntegrationTest extends AbstractIntegrationTest {
@@ -15,10 +17,14 @@ class OrderServiceApplicationIntegrationTest extends AbstractIntegrationTest {
     @Test
     void shouldFetchAllOrdersFromStream() throws Exception {
         // wait until the Kafka Streams state changes from PARTITIONS_ASSIGNED to RUNNING
-        TimeUnit.SECONDS.sleep(5);
-        this.mockMvc
-                .perform(get("/api/orders/all"))
-                .andExpect(status().isOk())
-                .andExpect(jsonPath("$.size()", is(0)));
+        Awaitility.await()
+                .atMost(10, SECONDS)
+                .pollInterval(Duration.ofSeconds(1))
+                .untilAsserted(
+                        () ->
+                                this.mockMvc
+                                        .perform(get("/api/orders/all"))
+                                        .andExpect(status().isOk())
+                                        .andExpect(jsonPath("$.size()", is(0))));
     }
 }
diff --git a/order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java b/order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java
index dd739282..e22306d1 100644
--- a/order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java
+++ b/order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java
@@ -1,35 +1,37 @@
 /* Licensed under Apache-2.0 2023 */
 package com.example.orderservice;
 
-import com.example.orderservice.config.MyTestContainers;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.devtools.restart.RestartScope;
 import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.testcontainers.context.ImportTestcontainers;
 import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
 import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.DynamicPropertyRegistry;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.utility.DockerImageName;
 
 @TestConfiguration(proxyBeanMethods = false)
-@ImportTestcontainers(MyTestContainers.class)
 public class TestOrderServiceApplication {
 
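+    // @ServiceConnection derives the connection properties (broker address, JDBC URL, ...)
+    // from each container, replacing the manual DynamicPropertyRegistry registrations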
     @Bean
     @ServiceConnection
     @RestartScope
-    KafkaContainer kafkaContainer(DynamicPropertyRegistry propertyRegistry) {
-        KafkaContainer kafkaContainer =
-                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1"))
-                        .withKraft();
-        propertyRegistry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
-        propertyRegistry.add(
-                "spring.kafka.streams.consumer.bootstrap-servers",
-                kafkaContainer::getBootstrapServers);
-        propertyRegistry.add(
-                "spring.kafka.streams.bootstrap-servers", kafkaContainer::getBootstrapServers);
-        return kafkaContainer;
+    KafkaContainer kafkaContainer() {
+        return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1"))
+                .withKraft();
+    }
+
+    @ServiceConnection
+    @Bean
+    PostgreSQLContainer<?> postgreSQLContainer() {
+        return new PostgreSQLContainer<>(DockerImageName.parse("postgres").withTag("15.4-alpine"));
+    }
+
+    @Bean
+    @ServiceConnection(name = "openzipkin/zipkin")
+    GenericContainer<?> zipkinContainer() {
+        return new GenericContainer(DockerImageName.parse("openzipkin/zipkin"));
     }
 
     public static void main(String[] args) {
diff --git a/order-service/src/test/java/com/example/orderservice/config/MyTestContainers.java b/order-service/src/test/java/com/example/orderservice/config/MyTestContainers.java
deleted file mode 100644
index 618d9a81..00000000
--- a/order-service/src/test/java/com/example/orderservice/config/MyTestContainers.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/* Licensed under Apache-2.0 2023 */
-package com.example.orderservice.config;
-
-import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.utility.DockerImageName;
-
-public interface MyTestContainers {
-
-    @ServiceConnection @Container
-    PostgreSQLContainer<?> POSTGRE_SQL_CONTAINER =
-            new PostgreSQLContainer<>(DockerImageName.parse("postgres").withTag("15.4-alpine"));
-
-    @Container
-    @ServiceConnection(name = "openzipkin/zipkin")
-    GenericContainer<?> zipkinContainer =
-            new GenericContainer(DockerImageName.parse("openzipkin/zipkin"));
-}
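
--
Note: records that fail deserialization are re-published, bytes unchanged, to the
"recovererDLQ" topic configured above. A minimal sketch of a listener to inspect
them; the topic name comes from this patch, while the class name, group id and the
byte[] consumer configuration it assumes are illustrative only:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    class RecovererDlqListener {

        private static final Logger log = LoggerFactory.getLogger(RecovererDlqListener.class);

        // DeadLetterPublishingRecoverer forwards the original key and value bytes untouched
        @KafkaListener(topics = "recovererDLQ", groupId = "recoverer-dlq-inspector")
        void onDeadLetter(ConsumerRecord<byte[], byte[]> record) {
            log.error(
                    "dead-lettered record {}-{}@{} ({} value bytes)",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.value() == null ? 0 : record.value().length);
        }
    }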