Sample project showing how to implement integration tests in Spring Boot with Spring Kafka and EmbeddedKafka.
Example with Spring Boot 2.2.6 (Spring Kafka 2.4.5)
@Service
public class ConsumerService {

    private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    private final ExampleRepository exampleRepository;

    ConsumerService(ExampleRepository exampleRepository) {
        this.exampleRepository = exampleRepository;
    }

    /**
     * Consumes an {@link ExampleDTO} from the topic {@code TOPIC_EXAMPLE},
     * converts it to an {@link ExampleEntity} and saves it through the repository.
     *
     * @param exampleDTO the payload received from the topic
     */
    @KafkaListener(topics = "TOPIC_EXAMPLE", groupId = "consumer_example_dto")
    public void consumeExampleDTO(ExampleDTO exampleDTO) {
        log.info("Received from topic=TOPIC_EXAMPLE ExampleDTO={}", exampleDTO);
        exampleRepository.save(convertToExampleEntity(exampleDTO));
        log.info("saved in database {}", exampleDTO);
    }

    /**
     * Builds a new {@link ExampleEntity} carrying the name and description of the given DTO.
     *
     * <p>In a real project this conversion would usually live in a mapper or a dedicated service.
     *
     * @param exampleDTO the DTO to copy the name and description from
     * @return a new entity with the same name and description
     */
    public ExampleEntity convertToExampleEntity(ExampleDTO exampleDTO) {
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setDescription(exampleDTO.getDescription());
        exampleEntity.setName(exampleDTO.getName());
        return exampleEntity;
    }
}
@Service
public class ProducerService {

    private static final Logger log = LoggerFactory.getLogger(ProducerService.class);

    /** Name of the external topic that {@link #send(ExampleDTO)} publishes to. */
    private static final String TOPIC = "TOPIC_EXAMPLE_EXTERNE";

    private final KafkaTemplate<String, ExampleDTO> kafkaTemplate;

    // The parameter is parameterized (not raw) so the assignment below is type-checked.
    ProducerService(KafkaTemplate<String, ExampleDTO> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends an {@link ExampleDTO} to the external topic {@code TOPIC_EXAMPLE_EXTERNE}.
     *
     * <p>The record is sent without a key; the result returned by the template is not awaited.
     *
     * @param exampleDTO the payload to publish
     */
    public void send(ExampleDTO exampleDTO) {
        log.info("send to topic={} ExampleDTO={}", TOPIC, exampleDTO);
        kafkaTemplate.send(TOPIC, exampleDTO);
    }
}
@ExtendWith(SpringExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = {"TOPIC_EXAMPLE", "TOPIC_EXAMPLE_EXTERNE"})
public class ConsumerServiceIntegrationTest {

    private static final Logger log = LoggerFactory.getLogger(ConsumerServiceIntegrationTest.class);

    private static final String TOPIC_EXAMPLE = "TOPIC_EXAMPLE";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ExampleRepository exampleRepository;

    /**
     * Builds an {@link ExampleDTO} test fixture.
     *
     * @param name        value set as the DTO name
     * @param description value set as the DTO description
     * @return a new DTO holding the given name and description
     */
    public ExampleDTO mockExampleDTO(String name, String description) {
        ExampleDTO exampleDTO = new ExampleDTO();
        exampleDTO.setDescription(description);
        exampleDTO.setName(name);
        return exampleDTO;
    }

    /**
     * Publishes an {@link ExampleDTO} to {@code TOPIC_EXAMPLE} with a test producer, then
     * verifies that exactly one matching entity ends up in the repository.
     */
    @Test
    public void itShould_ConsumeCorrectExampleDTO_from_TOPIC_EXAMPLE_and_should_saveCorrectExampleEntity() throws ExecutionException, InterruptedException {
        // GIVEN
        ExampleDTO exampleDTO = mockExampleDTO("Un nom 2", "Une description 2");

        // Simulated producer, connected to the embedded broker.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
        log.info("props {}", producerProps);

        // Alternative ways to build the producer side:
        //
        //   ProducerFactory<String, ExampleDTO> producerFactory =
        //       new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>());
        //   Producer<String, ExampleDTO> producerTest = producerFactory.createProducer();
        //
        // or, with a KafkaTemplate:
        //
        //   ProducerRecord<String, ExampleDTO> producerRecord = new ProducerRecord<>(TOPIC_EXAMPLE, "key", exampleDTO);
        //   KafkaTemplate<String, ExampleDTO> template = new KafkaTemplate<>(producerFactory);
        //   template.setDefaultTopic(TOPIC_EXAMPLE);
        //   template.send(producerRecord);

        // try-with-resources: the producer is closed even when an assertion or the await timeout fails.
        try (Producer<String, ExampleDTO> producerTest =
                 new KafkaProducer<>(producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>())) {
            // WHEN
            producerTest.send(new ProducerRecord<>(TOPIC_EXAMPLE, "", exampleDTO));

            // THEN
            // Exactly one entity must have been inserted.
            // We cannot predict when the insertion into the database will occur,
            // so we poll with Awaitility until the assertions pass (or ten seconds elapse).
            await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> {
                var exampleEntityList = exampleRepository.findAll();
                assertEquals(1, exampleEntityList.size());
                ExampleEntity firstEntity = exampleEntityList.get(0);
                assertEquals(exampleDTO.getDescription(), firstEntity.getDescription());
                assertEquals(exampleDTO.getName(), firstEntity.getName());
            });
        }
    }
}
@ExtendWith(SpringExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = {"TOPIC_EXAMPLE", "TOPIC_EXAMPLE_EXTERNE"})
public class ProducerServiceIntegrationTest {

    private static final String TOPIC_EXAMPLE_EXTERNE = "TOPIC_EXAMPLE_EXTERNE";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ProducerService producerService;

    /**
     * Builds an {@link ExampleDTO} test fixture.
     *
     * @param name        value set as the DTO name
     * @param description value set as the DTO description
     * @return a new DTO holding the given name and description
     */
    public ExampleDTO mockExampleDTO(String name, String description) {
        ExampleDTO exampleDTO = new ExampleDTO();
        exampleDTO.setDescription(description);
        exampleDTO.setName(name);
        return exampleDTO;
    }

    /**
     * Sends an {@link ExampleDTO} through {@link ProducerService} and verifies, with a
     * simulated consumer, that the same name and description arrive on
     * {@code TOPIC_EXAMPLE_EXTERNE}.
     */
    @Test
    public void itShould_ProduceCorrectExampleDTO_to_TOPIC_EXAMPLE_EXTERNE() {
        // GIVEN
        ExampleDTO exampleDTO = mockExampleDTO("Un nom", "Une description");

        // Simulated consumer, reading the topic from the earliest offset.
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group_consumer_test", "false", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<String, ExampleDTO> cf = new DefaultKafkaConsumerFactory<String, ExampleDTO>(
                consumerProps, new StringDeserializer(), new JsonDeserializer<>(ExampleDTO.class, false));

        // try-with-resources: the consumer is closed even when no record arrives or an assertion fails.
        try (Consumer<String, ExampleDTO> consumerServiceTest = cf.createConsumer()) {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumerServiceTest, TOPIC_EXAMPLE_EXTERNE);

            // WHEN
            producerService.send(exampleDTO);

            // THEN
            ConsumerRecord<String, ExampleDTO> consumerRecordOfExampleDTO =
                    KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_EXAMPLE_EXTERNE);
            ExampleDTO valueReceived = consumerRecordOfExampleDTO.value();
            assertEquals(exampleDTO.getDescription(), valueReceived.getDescription());
            assertEquals(exampleDTO.getName(), valueReceived.getName());
        }
    }
}
Official example: github.com/spring-projects/spring-kafka
To write integration tests, you can also have a look at: