Skip to content

Commit

Permalink
Implemented Deadletter queue for order topic #326
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Sep 5, 2023
1 parent b539556 commit 2e2d118
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,40 @@
package com.example.paymentservice.services.listener;

import com.example.common.dtos.OrderDto;
import com.example.paymentservice.exception.CustomerNotFoundException;
import com.example.paymentservice.services.PaymentOrderManageService;
import com.example.paymentservice.utils.AppConstants;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.CountDownLatch;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
@EnableKafka
public class KafkaListenerConfig {

private final PaymentOrderManageService paymentOrderManageService;

@Getter private final CountDownLatch deadLetterLatch = new CountDownLatch(1);

public KafkaListenerConfig(PaymentOrderManageService paymentOrderManageService) {
this.paymentOrderManageService = paymentOrderManageService;
}

// retries if processing of event fails
@RetryableTopic(
backoff = @Backoff(delay = 1000, multiplier = 2.0),
exclude = {CustomerNotFoundException.class},
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(id = "orders", topics = AppConstants.ORDERS_TOPIC, groupId = "payment")
public void onEvent(OrderDto orderDto) {
log.info(
Expand All @@ -31,4 +49,10 @@ public void onEvent(OrderDto orderDto) {
paymentOrderManageService.confirm(orderDto);
}
}

@DltHandler
public void dlt(OrderDto orderDto, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.error("Received dead-letter message : {} from topic {}", orderDto, topic);
deadLetterLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package com.example.paymentservice.services.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.await;

import com.example.common.dtos.OrderDto;
import com.example.common.dtos.OrderItemDto;
Expand All @@ -24,6 +24,9 @@ class KafkaListenerConfigIntegrationTest extends AbstractIntegrationTest {
@Autowired private KafkaTemplate<Long, OrderDto> kafkaTemplate;

@Autowired private CustomerRepository customerRepository;

@Autowired private KafkaListenerConfig kafkaListenerConfig;

private Customer customer;

@BeforeEach
Expand Down Expand Up @@ -60,6 +63,26 @@ void onEventReserveOrder() {
});
}

@Test
void onEventReserveOrderDlt() {
OrderDto orderDto = getOrderDto("NEW");
long customerId = orderDto.getCustomerId() + 10_000;
orderDto.setCustomerId(customerId);

// When
kafkaTemplate.send("orders", orderDto.getOrderId(), orderDto);

assertThat(kafkaListenerConfig.getDeadLetterLatch().getCount()).isEqualTo(1);
// Then
await().pollDelay(3, TimeUnit.SECONDS)
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(5))
.untilAsserted(
() ->
assertThat(kafkaListenerConfig.getDeadLetterLatch().getCount())
.isZero());
}

@Test
void onEventConfirmOrder() {

Expand Down
2 changes: 2 additions & 0 deletions payment-service/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@

<logger name="org.testcontainers" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>
<logger name="com.example.paymentservice" level="DEBUG"/>

</configuration>

0 comments on commit 2e2d118

Please sign in to comment.