From 0309e06d135e10bfff9451908055a692aa0c6618 Mon Sep 17 00:00:00 2001
From: Raja Kolli
Date: Sun, 20 Aug 2023 20:26:23 +0000
Subject: [PATCH] #326 - Implements Kafka DLQ in streams

---
 order-service/README.md                       |   6 +-
 .../orderservice/config/KafkaConfig.java      |  80 ----------
 .../config/kafka/KafkaConfig.java             |  34 ++++
 .../config/kafka/KafkaStreamsConfig.java      | 148 ++++++++++++++++++
 .../src/main/resources/application.yml        |  18 ++-
 .../services/OrderManageService.java          |   5 +-
 .../resources/application-heroku.properties   |   2 -
 .../src/main/resources/application.properties |   6 +-
 run.sh                                        |   8 +-
 test-em-all.sh                                |   9 +-
 10 files changed, 210 insertions(+), 106 deletions(-)
 delete mode 100644 order-service/src/main/java/com/example/orderservice/config/KafkaConfig.java
 create mode 100644 order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
 create mode 100644 order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
 delete mode 100644 payment-service/src/main/resources/application-heroku.properties

diff --git a/order-service/README.md b/order-service/README.md
index 28b2d464..4f1b3285 100644
--- a/order-service/README.md
+++ b/order-service/README.md
@@ -4,7 +4,9 @@
 `$ ./mvnw clean verify`
 
 ### Run locally
-`$ ./mvnw docker:start spring-boot:run`
+```shell
+./mvnw docker:start spring-boot:run
+```
 
 ### Useful Links
 
@@ -12,3 +14,5 @@
 * Actuator Endpoint: http://localhost:18282/order-service/actuator
 * Catalog Service : http://localhost:18080/catalog-service/swagger-ui.html
 
+### Notes
+* Kafka Streams dead-letter (DLQ) recovery is configured in `KafkaStreamsConfig.java`
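Note: the `KafkaStreamsConfig.java` referenced above (added later in this patch) routes records that fail deserialization to a `recovererDLQ` topic. A minimal sketch — hypothetical, not part of this patch — of a listener that watches that topic; it assumes a listener container factory configured with `ByteArrayDeserializer` for key and value, since the recoverer republishes the original raw bytes:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical helper for inspecting dead-lettered records.
@Component
class RecovererDlqListener {

    @KafkaListener(topics = "recovererDLQ", groupId = "dlq-inspector")
    void onDeadLetter(ConsumerRecord<byte[], byte[]> record) {
        // DeadLetterPublishingRecoverer attaches exception details as headers.
        System.out.printf(
                "DLQ record: partition=%d offset=%d headers=%s%n",
                record.partition(), record.offset(), record.headers());
    }
}
```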
diff --git a/order-service/src/main/java/com/example/orderservice/config/KafkaConfig.java b/order-service/src/main/java/com/example/orderservice/config/KafkaConfig.java
deleted file mode 100644
index c2fa29c3..00000000
--- a/order-service/src/main/java/com/example/orderservice/config/KafkaConfig.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/* Licensed under Apache-2.0 2022 */
-package com.example.orderservice.config;
-
-import static com.example.orderservice.utils.AppConstants.*;
-
-import com.example.common.dtos.OrderDto;
-import com.example.orderservice.services.OrderManageService;
-import java.time.Duration;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafkaStreams;
-import org.springframework.kafka.config.TopicBuilder;
-import org.springframework.kafka.core.KafkaAdmin;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-@Configuration
-@EnableKafkaStreams
-@Slf4j
-@RequiredArgsConstructor
-public class KafkaConfig {
-
-    private final OrderManageService orderManageService;
-
-    @Bean
-    KafkaAdmin.NewTopics topics() {
-        log.info(
-                "Inside creating topics :{}, {}, {}",
-                ORDERS_TOPIC,
-                PAYMENT_ORDERS_TOPIC,
-                STOCK_ORDERS_TOPIC);
-        return new KafkaAdmin.NewTopics(
-                TopicBuilder.name(ORDERS_TOPIC).partitions(3).replicas(1).compact().build(),
-                TopicBuilder.name(PAYMENT_ORDERS_TOPIC).partitions(3).replicas(1).compact().build(),
-                TopicBuilder.name(STOCK_ORDERS_TOPIC).partitions(3).replicas(1).compact().build());
-    }
-
-    @Bean
-    KStream<Long, OrderDto> stream(StreamsBuilder kStreamBuilder) {
-        log.info("Inside stream Processing");
-        JsonSerde<OrderDto> orderSerde = new JsonSerde<>(OrderDto.class);
-        KStream<Long, OrderDto> stream =
-                kStreamBuilder.stream(
-                        PAYMENT_ORDERS_TOPIC, Consumed.with(Serdes.Long(), orderSerde));
-
-        stream.join(
-                        kStreamBuilder.stream(STOCK_ORDERS_TOPIC),
-                        orderManageService::confirm,
-                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)),
-                        StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
-                .peek((k, o) -> log.info("Output of Stream : {} for key :{}", o, k))
-                .to(ORDERS_TOPIC);
-
-        return stream;
-    }
-
-    @Bean
-    KTable<Long, OrderDto> table(StreamsBuilder builder) {
-        log.info("Inside fetching KTable values");
-        KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(ORDERS_TOPIC);
-        JsonSerde<OrderDto> orderSerde = new JsonSerde<>(OrderDto.class);
-        KStream<Long, OrderDto> stream =
-                builder.stream(ORDERS_TOPIC, Consumed.with(Serdes.Long(), orderSerde));
-        return stream.toTable(
-                Materialized.<Long, OrderDto>as(store)
-                        .withKeySerde(Serdes.Long())
-                        .withValueSerde(orderSerde));
-    }
-}

diff --git a/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
new file mode 100644
index 00000000..d5b882aa
--- /dev/null
+++ b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
@@ -0,0 +1,34 @@
+/* Licensed under Apache-2.0 2022 */
+package com.example.orderservice.config.kafka;
+
+import static com.example.orderservice.utils.AppConstants.ORDERS_TOPIC;
+import static com.example.orderservice.utils.AppConstants.PAYMENT_ORDERS_TOPIC;
+import static com.example.orderservice.utils.AppConstants.STOCK_ORDERS_TOPIC;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.kafka.core.KafkaAdmin;
+
+@Configuration
+@EnableKafka
+@Slf4j
+@RequiredArgsConstructor
+public class KafkaConfig {
+
+    @Bean
+    KafkaAdmin.NewTopics topics() {
+        log.info(
+                "Inside creating topics :{}, {}, {}",
+                ORDERS_TOPIC,
+                PAYMENT_ORDERS_TOPIC,
+                STOCK_ORDERS_TOPIC);
+        return new KafkaAdmin.NewTopics(
+                TopicBuilder.name(ORDERS_TOPIC).partitions(3).replicas(1).build(),
+                TopicBuilder.name(PAYMENT_ORDERS_TOPIC).partitions(3).replicas(1).build(),
+                TopicBuilder.name(STOCK_ORDERS_TOPIC).partitions(3).replicas(1).build());
+    }
+}
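The rewritten `KafkaConfig` above declares only the three business topics (note that `.compact()` was dropped, so they now default to `delete` cleanup). The `recovererDLQ` topic used by the dead-letter recoverer below is not pre-created anywhere in this patch; if broker auto-topic-creation is disabled, a bean along these lines — a sketch, with assumed partition/replica counts — would be needed:

```java
@Bean
KafkaAdmin.NewTopics dlqTopics() {
    // Hypothetical: pre-create the DLQ topic so the recoverer does not rely
    // on broker-side auto-creation; the counts here are assumptions.
    return new KafkaAdmin.NewTopics(
            TopicBuilder.name("recovererDLQ").partitions(3).replicas(1).build());
}
```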
diff --git a/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
new file mode 100644
index 00000000..e6b671a0
--- /dev/null
+++ b/order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
@@ -0,0 +1,148 @@
+/* Licensed under Apache-2.0 2023 */
+package com.example.orderservice.config.kafka;
+
+import static com.example.orderservice.utils.AppConstants.ORDERS_TOPIC;
+import static com.example.orderservice.utils.AppConstants.PAYMENT_ORDERS_TOPIC;
+import static com.example.orderservice.utils.AppConstants.STOCK_ORDERS_TOPIC;
+
+import com.example.common.dtos.OrderDto;
+import com.example.orderservice.services.OrderManageService;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Printed;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafkaStreams;
+import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
+import org.springframework.kafka.config.KafkaStreamsConfiguration;
+import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+@Configuration
+@EnableKafkaStreams
+@Slf4j
+@RequiredArgsConstructor
+public class KafkaStreamsConfig {
+
+    private final OrderManageService orderManageService;
+    private final KafkaProperties kafkaProperties;
+
+    @Bean
+    public StreamsBuilderFactoryBeanConfigurer configurer() {
+        return factoryBean -> {
+            factoryBean.setStateListener(
+                    (newState, oldState) ->
+                            log.info("State transition from {} to {} ", oldState, newState));
+        };
+    }
+
+    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+    public KafkaStreamsConfiguration kStreamsConfigs() {
+
+        Map<String, String> streamProperties = kafkaProperties.getStreams().buildProperties();
+        Map<String, Object> props = new HashMap<>(streamProperties);
+        props.putIfAbsent(
+                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
+                CollectionUtils.isEmpty(kafkaProperties.getStreams().getBootstrapServers())
+                        ? kafkaProperties.getBootstrapServers()
+                        : kafkaProperties.getStreams().getBootstrapServers());
+        props.put(
+                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                WallclockTimestampExtractor.class.getName());
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
+        props.put(
+                StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                RecoveringDeserializationExceptionHandler.class);
+        props.put(
+                RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
+                deadLetterPublishingRecoverer());
+        return new KafkaStreamsConfiguration(props);
+    }
+
+    @Bean
+    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
+        return new DeadLetterPublishingRecoverer(
+                byteKafkaTemplate(), (record, ex) -> new TopicPartition("recovererDLQ", -1));
+    }
+
+    @Bean
+    public KafkaOperations<byte[], byte[]> byteKafkaTemplate() {
+        Map<String, Object> senderProps = new HashMap<>(3);
+        senderProps.put(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                CollectionUtils.isEmpty(kafkaProperties.getProducer().getBootstrapServers())
+                        ? kafkaProperties.getBootstrapServers()
+                        : kafkaProperties.getProducer().getBootstrapServers());
+        senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(senderProps), true);
+    }
+
+    @Bean
+    KStream<Long, OrderDto> stream(StreamsBuilder kStreamBuilder) {
+        Serde<OrderDto> orderSerde = new JsonSerde<>(OrderDto.class);
+        KStream<Long, OrderDto> stream =
+                kStreamBuilder.stream(
+                        PAYMENT_ORDERS_TOPIC, Consumed.with(Serdes.Long(), orderSerde));
+        stream.join(
+                        kStreamBuilder.stream(STOCK_ORDERS_TOPIC),
+                        orderManageService::confirm,
+                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)),
+                        StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
+                .peek((k, o) -> log.info("Output of Stream : {} for key :{}", o, k))
+                .to(ORDERS_TOPIC);
+        stream.print(Printed.toSysOut());
+        return stream;
+    }
+
+    @Bean
+    KTable<Long, OrderDto> table(StreamsBuilder builder) {
+        log.info("Inside fetching KTable values");
+        KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(ORDERS_TOPIC);
+        JsonSerde<OrderDto> orderSerde = new JsonSerde<>(OrderDto.class);
+        KStream<Long, OrderDto> stream =
+                builder.stream(ORDERS_TOPIC, Consumed.with(Serdes.Long(), orderSerde));
+        return stream.toTable(
+                Materialized.<Long, OrderDto>as(store)
+                        .withKeySerde(Serdes.Long())
+                        .withValueSerde(orderSerde));
+    }
+
+    @Bean
+    Executor taskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.setMaxPoolSize(5);
+        executor.setThreadNamePrefix("kafkaSender-");
+        executor.initialize();
+        return executor;
+    }
+}
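In `deadLetterPublishingRecoverer()` above, the resolver returns `new TopicPartition("recovererDLQ", -1)`; `DeadLetterPublishingRecoverer` treats a negative partition as "let the producer pick the partition". A variant that instead pins each failed record to its original partition — a sketch, valid only if `recovererDLQ` has at least as many partitions as the source topics — would be:

```java
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
    // Sketch: preserve the failed record's source partition in the DLQ.
    return new DeadLetterPublishingRecoverer(
            byteKafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", record.partition()));
}
```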
diff --git a/order-service/src/main/resources/application.yml b/order-service/src/main/resources/application.yml
index 67ea9ef9..774b968e 100644
--- a/order-service/src/main/resources/application.yml
+++ b/order-service/src/main/resources/application.yml
@@ -11,27 +11,29 @@ logging:
 spring:
   application:
     name: order-service
+  mvc.problemdetails.enabled: true
   config:
     import: "optional:configserver:${CONFIG_SERVER:http://localhost:8888}/"
   kafka:
-    bootstrap-servers:
-      - localhost:9092
     producer:
       key-serializer: org.apache.kafka.common.serialization.LongSerializer
       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
       properties:
-        '[spring.json.add.type.headers]': true
+        spring.json.add.type.headers: true
     streams:
       clientId: order-service-stream-client
       replicationFactor: 1
+      producer.acks: all
+      applicationId: ${spring.application.name}
       properties:
         default:
           key:
             serde: org.apache.kafka.common.serialization.Serdes$LongSerde
           value:
             serde: org.springframework.kafka.support.serializer.JsonSerde
-        spring.json.trusted.packages: 'com.example.common.dtos'
-# state-dir: /tmp/kafka-streams/
-  mvc:
-    problemdetails:
-      enabled: true
\ No newline at end of file
+        spring.json.trusted.packages: 'com.example.common.dtos'
+# state-dir: /tmp/kafka-streams/
+
+spring.output.ansi.enabled: ALWAYS
+
+logging.pattern.console: "%clr(%d{HH:mm:ss.SSS}){blue} %clr(---){faint} %clr([%15.15t]){yellow} %clr(:){red} %clr(%m){faint}%n"
\ No newline at end of file
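The new `streams` keys above arrive in `kStreamsConfigs()` through `kafkaProperties.getStreams().buildProperties()`, so yml and code end up merged into one `StreamsConfig` map. Roughly — a sketch of the intent, with the literal values taken from the yml above — they correspond to these entries:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

class StreamsYamlEquivalent {
    // Sketch: the StreamsConfig entries the yml keys above are meant to produce.
    static Map<String, Object> props() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-service");            // applicationId
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "order-service-stream-client");   // clientId
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);                      // replicationFactor
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); // producer.acks
        return props;
    }
}
```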
diff --git a/payment-service/src/main/java/com/example/paymentservice/services/OrderManageService.java b/payment-service/src/main/java/com/example/paymentservice/services/OrderManageService.java
index 837216f4..0b6905bc 100644
--- a/payment-service/src/main/java/com/example/paymentservice/services/OrderManageService.java
+++ b/payment-service/src/main/java/com/example/paymentservice/services/OrderManageService.java
@@ -19,7 +19,7 @@ public class OrderManageService {
 
     private final CustomerRepository customerRepository;
 
-    private final KafkaTemplate<String, OrderDto> kafkaTemplate;
+    private final KafkaTemplate<Long, OrderDto> kafkaTemplate;
 
     public void reserve(OrderDto orderDto) {
         log.debug("Reserving Order in payment Service {}", orderDto);
@@ -43,8 +43,7 @@ public void reserve(OrderDto orderDto) {
         orderDto.setSource(AppConstants.SOURCE);
         log.info("Saving customer after reserving:{}", customer.getId());
         customerRepository.save(customer);
-        kafkaTemplate.send(
-                AppConstants.PAYMENT_ORDERS_TOPIC, String.valueOf(orderDto.getOrderId()), orderDto);
+        kafkaTemplate.send(AppConstants.PAYMENT_ORDERS_TOPIC, orderDto.getOrderId(), orderDto);
         log.info("Sent Reserved Order: {}", orderDto);
     }
 

diff --git a/payment-service/src/main/resources/application-heroku.properties b/payment-service/src/main/resources/application-heroku.properties
deleted file mode 100644
index 6a400b90..00000000
--- a/payment-service/src/main/resources/application-heroku.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-spring.datasource.driver-class-name=org.postgresql.Driver
-spring.datasource.url=${JDBC_DATABASE_URL}

diff --git a/payment-service/src/main/resources/application.properties b/payment-service/src/main/resources/application.properties
index 6b416fcb..9f0d9acb 100644
--- a/payment-service/src/main/resources/application.properties
+++ b/payment-service/src/main/resources/application.properties
@@ -5,13 +5,11 @@ server.servlet.contextPath= /${spring.application.name}
 spring.liquibase.change-log=classpath:/db/migration/liquibase-changelog.xml
 
 ######## Kafka Configuration #########
-KAFKA_BROKER=localhost:9092
-spring.kafka.bootstrap-servers=${KAFKA_BROKER}
 spring.kafka.consumer.auto-offset-reset=earliest
-spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
 spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
 spring.kafka.consumer.properties.spring.json.trusted.packages=*
-spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
 spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
 spring.kafka.producer.properties.spring.json.add.type.headers=true
 
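The payment-service changes above switch record keys from String to Long end-to-end (field type, serializers, and the `send` call), matching the `Serdes.Long()` keys the order-service topology consumes; a key-type mismatch here is exactly the kind of deserialization failure the new DLQ catches. A self-contained round-trip check of the pair now in play (the topic name is illustrative):

```java
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;

class KeyRoundTrip {
    public static void main(String[] args) {
        LongSerializer serializer = new LongSerializer();
        LongDeserializer deserializer = new LongDeserializer();
        byte[] bytes = serializer.serialize("payment-orders", 42L);
        // LongDeserializer requires exactly 8 bytes; a String-keyed record
        // would typically fail here and end up in the DLQ.
        System.out.println(deserializer.deserialize("payment-orders", bytes)); // 42
    }
}
```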
diff --git a/run.sh b/run.sh
index 84cffacd..1894caeb 100755
--- a/run.sh
+++ b/run.sh
@@ -22,8 +22,8 @@ function stop() {
 }
 
 function start_infra() {
-    echo "Starting zipkin-server postgresql kafka config-server ...."
-    docker-compose -f ${dc_main} up -d zipkin-server postgresql kafka config-server
+    echo "Starting zipkin-server postgresql kafka config-server naming-server ...."
+    docker-compose -f ${dc_main} up -d zipkin-server postgresql kafka config-server naming-server
     docker-compose -f ${dc_main} logs -f
 }
 
@@ -34,8 +34,8 @@ function start_infra_full() {
 }
 
 function start_services() {
-    echo "Starting naming-server api-gateway catalog-service inventory-service order-service payment-service ...."
-    docker compose -f ${dc_main} up -d naming-server api-gateway catalog-service inventory-service order-service payment-service
+    echo "Starting api-gateway catalog-service inventory-service order-service payment-service ...."
+    docker compose -f ${dc_main} up -d api-gateway catalog-service inventory-service order-service payment-service
     docker compose -f ${dc_main} logs -f
 }
 

diff --git a/test-em-all.sh b/test-em-all.sh
index 3bf48be2..2aa4257d 100755
--- a/test-em-all.sh
+++ b/test-em-all.sh
@@ -101,10 +101,11 @@ function setupTestData() {
         '","productName":"product name A","price":100, "description": "A Beautiful Product"}'
 
     # Creating Product
+    echo "Creating product"
     recreateComposite "$PROD_CODE" "$body" "catalog-service/api/catalog" "POST"
 
-    # waiting for kafka to process the catalog creation request
-    sleep 3
+    # wait for Kafka to process the catalog creation request; the first run is slower while Kafka initializes
+    sleep 10
     # Verify that a normal request works, expect record exists with product code
     assertCurl 200 "curl -k http://$HOST:$PORT/inventory-service/api/inventory/$PROD_CODE"
     assertEqual \"${PROD_CODE}\" $(echo ${RESPONSE} | jq .productCode)
@@ -142,8 +143,8 @@ function verifyAPIs() {
 
     local ORDER_ID=$(echo ${COMPOSITE_RESPONSE} | jq .orderId)
 
-    echo "Sleeping for 3 sec for order processing"
-    sleep 3
+    echo "Sleeping for 10 sec for order processing (first run takes longer)"
+    sleep 10
 
     # Verify that order processing is completed and status is CONFIRMED
     assertCurl 200 "curl -k http://$HOST:$PORT/order-service/api/orders/$ORDER_ID"
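The longer `sleep 10` calls above absorb first-run Kafka initialization, but polling until the order reaches CONFIRMED would make the test less timing-sensitive. A sketch in Java of the same idea the script implements with curl — the host, port, order id, and timeout are all assumptions:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class AwaitOrderConfirmed {
    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        // Hypothetical endpoint mirroring the curl call in the script.
        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://localhost:8765/order-service/api/orders/1"))
                .timeout(Duration.ofSeconds(2))
                .build();
        long deadline = System.currentTimeMillis() + 30_000;
        while (System.currentTimeMillis() < deadline) {
            try {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() == 200 && response.body().contains("\"CONFIRMED\"")) {
                    System.out.println("Order confirmed");
                    return;
                }
            } catch (IOException ignored) {
                // service not reachable yet; keep polling
            }
            Thread.sleep(1_000); // short poll interval beats one long fixed sleep
        }
        throw new IllegalStateException("Order not CONFIRMED within 30s");
    }
}
```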