#326 - Implements Kafka DLQ in streams
rajadilipkolli committed Aug 20, 2023
1 parent 85708f6 commit 0309e06
Showing 10 changed files with 210 additions and 106 deletions.
6 changes: 5 additions & 1 deletion order-service/README.md
@@ -4,11 +4,15 @@
`$ ./mvnw clean verify`

### Run locally
`$ ./mvnw docker:start spring-boot:run`
```shell
./mvnw docker:start spring-boot:run
```


### Useful Links
* Swagger UI: http://localhost:18282/order-service/swagger-ui.html
* Actuator Endpoint: http://localhost:18282/order-service/actuator
* Catalog Service: http://localhost:18080/catalog-service/swagger-ui.html

### Notes
* Kafka Streams dead-letter queue (DLQ) handling is configured in `KafkaStreamsConfig.java`
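* Records that fail deserialization in the streams topology are published to the `recovererDLQ` topic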

This file was deleted.

34 changes: 34 additions & 0 deletions order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
@@ -0,0 +1,34 @@
/* Licensed under Apache-2.0 2022 */
package com.example.orderservice.config.kafka;

import static com.example.orderservice.utils.AppConstants.ORDERS_TOPIC;
import static com.example.orderservice.utils.AppConstants.PAYMENT_ORDERS_TOPIC;
import static com.example.orderservice.utils.AppConstants.STOCK_ORDERS_TOPIC;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
@EnableKafka
@Slf4j
@RequiredArgsConstructor
public class KafkaConfig {

@Bean
KafkaAdmin.NewTopics topics() {
log.info(
"Inside creating topics :{}, {}, {}",
ORDERS_TOPIC,
PAYMENT_ORDERS_TOPIC,
STOCK_ORDERS_TOPIC);
return new KafkaAdmin.NewTopics(
TopicBuilder.name(ORDERS_TOPIC).partitions(3).replicas(1).build(),
TopicBuilder.name(PAYMENT_ORDERS_TOPIC).partitions(3).replicas(1).build(),
TopicBuilder.name(STOCK_ORDERS_TOPIC).partitions(3).replicas(1).build());
}
}
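Spring Boot's `KafkaAdmin` detects the `NewTopics` bean above at startup and creates any topics that do not yet exist on the broker. A throwaway way to confirm the topics landed is a plain `AdminClient` check; the sketch below is not part of the commit, and the literal topic names and bootstrap address are assumptions standing in for the values in `AppConstants` and `application.yml`.

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicSmokeCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed bootstrap address; matches the local docker-compose default.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Set<String> existing = admin.listTopics().names().get();
            // Hypothetical literal names; the real ones live in AppConstants.
            for (String topic : Set.of("orders", "payment-orders", "stock-orders")) {
                System.out.printf("%s present: %s%n", topic, existing.contains(topic));
            }
        }
    }
}
```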
148 changes: 148 additions & 0 deletions order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
@@ -0,0 +1,148 @@
/* Licensed under Apache-2.0 2023 */
package com.example.orderservice.config.kafka;

import static com.example.orderservice.utils.AppConstants.ORDERS_TOPIC;
import static com.example.orderservice.utils.AppConstants.PAYMENT_ORDERS_TOPIC;
import static com.example.orderservice.utils.AppConstants.STOCK_ORDERS_TOPIC;

import com.example.common.dtos.OrderDto;
import com.example.orderservice.services.OrderManageService;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;

@Configuration
@EnableKafkaStreams
@Slf4j
@RequiredArgsConstructor
public class KafkaStreamsConfig {

private final OrderManageService orderManageService;
private final KafkaProperties kafkaProperties;

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return factoryBean -> {
factoryBean.setStateListener(
(newState, oldState) ->
log.info("State transition from {} to {} ", oldState, newState));
};
}

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

Map<String, Object> streamProperties = kafkaProperties.getStreams().buildProperties();
Map<String, Object> props = new HashMap<>(streamProperties);
props.putIfAbsent(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CollectionUtils.isEmpty(kafkaProperties.getStreams().getBootstrapServers())
? kafkaProperties.getBootstrapServers()
: kafkaProperties.getStreams().getBootstrapServers());
props.put(
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
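        // Records that fail deserialization are handed to the recoverer
        // configured below instead of stopping the stream thread.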
props.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(
RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
deadLetterPublishingRecoverer());
return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
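        // A negative partition leaves partition selection to the producer, so
        // recovererDLQ does not have to mirror the source topic's partitioning.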
return new DeadLetterPublishingRecoverer(
byteKafkaTemplate(), (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

@Bean
public KafkaOperations<byte[], byte[]> byteKafkaTemplate() {
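        // byte[] serializers forward the failed payload to the DLQ verbatim;
        // a record that could not be deserialized cannot be re-serialized as JSON.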
Map<String, Object> senderProps = new HashMap<>(3);
senderProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CollectionUtils.isEmpty(kafkaProperties.getProducer().getBootstrapServers())
? kafkaProperties.getBootstrapServers()
: kafkaProperties.getProducer().getBootstrapServers());
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(senderProps), true);
}

@Bean
KStream<Long, OrderDto> stream(StreamsBuilder kStreamBuilder) {
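        // Joins payment and stock events sharing the same order id (the record
        // key) within a 10-second window; matched pairs are resolved by
        // OrderManageService#confirm and the result lands on the orders topic.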
Serde<OrderDto> orderSerde = new JsonSerde<>(OrderDto.class);
KStream<Long, OrderDto> stream =
kStreamBuilder.stream(
PAYMENT_ORDERS_TOPIC, Consumed.with(Serdes.Long(), orderSerde));
stream.join(
kStreamBuilder.stream(STOCK_ORDERS_TOPIC),
orderManageService::confirm,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)),
StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
.peek((k, o) -> log.info("Stream output: {} for key: {}", o, k))
.to(ORDERS_TOPIC);
stream.print(Printed.toSysOut());
return stream;
}

@Bean
KTable<Long, OrderDto> table(StreamsBuilder builder) {
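        // Re-reads the orders topic into a persistent key-value store so the
        // latest state of each order is queryable by key.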
log.info("Materializing the orders topic into a KTable");
KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(ORDERS_TOPIC);
JsonSerde<OrderDto> orderSerde = new JsonSerde<>(OrderDto.class);
KStream<Long, OrderDto> stream =
builder.stream(ORDERS_TOPIC, Consumed.with(Serdes.Long(), orderSerde));
return stream.toTable(
Materialized.<Long, OrderDto>as(store)
.withKeySerde(Serdes.Long())
.withValueSerde(orderSerde));
}

@Bean
Executor taskExecutor() {
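        // Fixed-size pool (core == max == 5); the kafkaSender- prefix makes
        // these threads easy to spot in thread dumps.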
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("kafkaSender-");
executor.initialize();
return executor;
}
}
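With this configuration, any record the streams topology cannot deserialize is published to `recovererDLQ`, and `DeadLetterPublishingRecoverer` attaches the original topic, partition, offset, and exception details as `kafka_dlt-*` record headers. The standalone consumer below is a hedged sketch for inspecting those records; the group id and bootstrap address are assumptions.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector"); // assumption
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("recovererDLQ"));
            while (true) {
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.printf("offset=%d payload=%s%n", rec.offset(), new String(rec.value()));
                    // Note: some kafka_dlt-* headers hold raw big-endian numbers, not text.
                    for (Header h : rec.headers()) {
                        System.out.printf("  %s = %s%n", h.key(), new String(h.value()));
                    }
                }
            }
        }
    }
}
```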
18 changes: 10 additions & 8 deletions order-service/src/main/resources/application.yml
@@ -11,27 +11,29 @@ logging:
spring:
application:
name: order-service
mvc.problemdetails.enabled: true
config:
import: "optional:configserver:${CONFIG_SERVER:http://localhost:8888}/"
kafka:
bootstrap-servers:
- localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.LongSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
'[spring.json.add.type.headers]': true
spring.json.add.type.headers: true
streams:
clientId: order-service-stream-client
replicationFactor: 1
producer.acks: all
applicationId: ${spring.application.name}
properties:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$LongSerde
value:
serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: 'com.example.common.dtos'
# state-dir: /tmp/kafka-streams/
mvc:
problemdetails:
enabled: true
spring.json.trusted.packages: 'com.example.common.dtos'
# state-dir: /tmp/kafka-streams/

spring.output.ansi.enabled: ALWAYS

logging.pattern.console: "%clr(%d{HH:mm:ss.SSS}){blue} %clr(---){faint} %clr([%15.15t]){yellow} %clr(:){red} %clr(%m){faint}%n"
@@ -19,7 +19,7 @@
public class OrderManageService {

private final CustomerRepository customerRepository;
private final KafkaTemplate<String, OrderDto> kafkaTemplate;
private final KafkaTemplate<Long, OrderDto> kafkaTemplate;

public void reserve(OrderDto orderDto) {
log.debug("Reserving Order in payment Service {}", orderDto);
@@ -43,8 +43,7 @@ public void reserve(OrderDto orderDto) {
orderDto.setSource(AppConstants.SOURCE);
log.info("Saving customer after reserving:{}", customer.getId());
customerRepository.save(customer);
kafkaTemplate.send(
AppConstants.PAYMENT_ORDERS_TOPIC, String.valueOf(orderDto.getOrderId()), orderDto);
kafkaTemplate.send(AppConstants.PAYMENT_ORDERS_TOPIC, orderDto.getOrderId(), orderDto);
log.info("Sent Reserved Order: {}", orderDto);
}


This file was deleted.

6 changes: 2 additions & 4 deletions payment-service/src/main/resources/application.properties
@@ -5,13 +5,11 @@ server.servlet.contextPath= /${spring.application.name}
spring.liquibase.change-log=classpath:/db/migration/liquibase-changelog.xml

######## Kafka Configuration #########
KAFKA_BROKER=localhost:9092
spring.kafka.bootstrap-servers=${KAFKA_BROKER}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=true

Expand Down
8 changes: 4 additions & 4 deletions run.sh
@@ -22,8 +22,8 @@ function stop() {
}

function start_infra() {
echo "Starting zipkin-server postgresql kafka config-server ...."
docker-compose -f ${dc_main} up -d zipkin-server postgresql kafka config-server
echo "Starting zipkin-server postgresql kafka config-server naming-server ...."
docker-compose -f ${dc_main} up -d zipkin-server postgresql kafka config-server naming-server
docker-compose -f ${dc_main} logs -f
}

@@ -34,8 +34,8 @@ function start_infra_full() {
}

function start_services() {
echo "Starting naming-server api-gateway catalog-service inventory-service order-service payment-service ...."
docker compose -f ${dc_main} up -d naming-server api-gateway catalog-service inventory-service order-service payment-service
echo "Starting api-gateway catalog-service inventory-service order-service payment-service ...."
docker compose -f ${dc_main} up -d api-gateway catalog-service inventory-service order-service payment-service
docker compose -f ${dc_main} logs -f
}
