Skip to content

Commit

Permalink
Improve Kafka messages handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jaguililla committed Sep 11, 2024
1 parent 1d751a0 commit 57920fe
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 40 deletions.
File renamed without changes.
2 changes: 0 additions & 2 deletions .mvn/parent.xml
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.5.0</version>
<configuration>
<encoding>UTF-8</encoding>
<failsOnError>true</failsOnError>
<linkXRef>false</linkXRef>
</configuration>
<executions>
<execution>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Example application to create appointments (REST API). Appointments are stored i
* No input ports: they don't need to be decoupled, they just use the domain (and that's acceptable).

## 📖 Architecture
![Architecture Diagram](doc/architecture.svg)
![Architecture Diagram](https://raw.githubusercontent.com/jaguililla/hexagonal_spring/main/doc/architecture.svg)
* **Port**: interface to set a boundary between application logic and implementation details.
* **Adapter**: port implementation to connect the application domain with the system's context.
* **Domain**: application logic and model entities.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,65 +5,53 @@
import com.github.jaguililla.appointments.domain.AppointmentsService;
import com.github.jaguililla.appointments.domain.UsersRepository;
import com.github.jaguililla.appointments.output.notifiers.KafkaTemplateAppointmentsNotifier;
import com.github.jaguililla.appointments.output.repositories.JdbcTemplateAppointmentsRepository;
import com.github.jaguililla.appointments.output.repositories.JdbcTemplateUsersRepository;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import javax.sql.DataSource;
import java.util.Map;

@Configuration
class ApplicationConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConfiguration.class);

@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${notifierTopic}")
private String notifierTopic;
@Value(value = "${createMessage}")
private String createMessage;
@Value(value = "${deleteMessage}")
private String deleteMessage;

@Bean
public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress));
}
// @Bean
// public KafkaAdmin kafkaAdmin() {
// return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress));
// }

@Bean
public NewTopic appointmentsTopic() {
return new NewTopic("appointments", 1, (short) 1);
}
// @Bean
// public NewTopic appointmentsTopic() {
// return new NewTopic("appointments", 1, (short) 1);
// }

@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));
}
// @Bean
// public ProducerFactory<String, String> producerFactory() {
// return new DefaultKafkaProducerFactory<>(Map.of(
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
// ));
// }

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
ConsumerConfig.GROUP_ID_CONFIG, "group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
));
}
// @Bean
// public ConsumerFactory<String, String> consumerFactory() {
// return new DefaultKafkaConsumerFactory<>(Map.of(
// ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
// ConsumerConfig.GROUP_ID_CONFIG, "group",
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
// ));
// }

@Bean
public KafkaTemplate<String, String> kafkaTemplate(
Expand Down
7 changes: 7 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,10 @@ spring:

kafka:
bootstrap-servers: ${KAFKA_SERVER:localhost:9092}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: group
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

import com.github.jaguililla.appointments.http.controllers.messages.AppointmentRequest;
import com.github.jaguililla.appointments.http.controllers.messages.AppointmentResponse;
import java.time.Duration;
import java.util.List;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
Expand All @@ -34,6 +37,8 @@ class ApplicationIT {
private final TestTemplate client;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ConsumerFactory<String, String> consumerFactory;

ApplicationIT(@LocalServerPort final int portTest) {
client = new TestTemplate("http://localhost:" + portTest);
Expand Down Expand Up @@ -83,6 +88,13 @@ void existing_appointments_can_be_fetched() {

@Test
void appointments_can_be_created_read_and_deleted() {
try (var consumer = consumerFactory.createConsumer()) {
consumer.subscribe(List.of("appointments"));
for (var r : consumer.poll(Duration.ZERO)) {

}
}

client.post("/appointments", new AppointmentRequest()
.id(UUID.randomUUID())
.startTimestamp(LocalDateTime.now())
Expand Down

0 comments on commit 57920fe

Please sign in to comment.