From dbd87fb9c0004efe85cc160dbb8b896da1da0293 Mon Sep 17 00:00:00 2001 From: seheonnn Date: Sun, 12 May 2024 00:06:13 +0900 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20Kafka=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../weatherservice/kafka/Consumer.java | 4 +-- .../kafka/DailyWeatherKafkaMessage.java | 1 + .../weatherservice/kafka/KafkaConfig.java | 34 +++++++++---------- .../weatherservice/kafka/Producer.java | 9 +++-- .../service/WeatherService.java | 5 ++- 5 files changed, 25 insertions(+), 28 deletions(-) diff --git a/weather-service/src/main/java/com/waither/weatherservice/kafka/Consumer.java b/weather-service/src/main/java/com/waither/weatherservice/kafka/Consumer.java index 247cc81a..b846262f 100644 --- a/weather-service/src/main/java/com/waither/weatherservice/kafka/Consumer.java +++ b/weather-service/src/main/java/com/waither/weatherservice/kafka/Consumer.java @@ -11,8 +11,8 @@ @RequiredArgsConstructor public class Consumer { - @KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "dailyWeatherConcurrentKafkaListenerContainerFactory") - public void dailyWeatherConsume(DailyWeatherKafkaMessage message) { + @KafkaListener(topics = "${spring.kafka.template.topic}", groupId = "${spring.kafka.consumer.group-id}") + public void dailyWeatherConsume(String message) { log.info("Consumer Test ========================== "); log.info("[*] Consumer Message {} ", message); diff --git a/weather-service/src/main/java/com/waither/weatherservice/kafka/DailyWeatherKafkaMessage.java b/weather-service/src/main/java/com/waither/weatherservice/kafka/DailyWeatherKafkaMessage.java index 59e2a524..cb942b12 100644 --- a/weather-service/src/main/java/com/waither/weatherservice/kafka/DailyWeatherKafkaMessage.java +++ b/weather-service/src/main/java/com/waither/weatherservice/kafka/DailyWeatherKafkaMessage.java @@ -4,6 +4,7 @@ import lombok.Builder; +// 우선 문자열로 바람 세기만. 추후 변경 가능성 @Builder public record DailyWeatherKafkaMessage( String pop, diff --git a/weather-service/src/main/java/com/waither/weatherservice/kafka/KafkaConfig.java b/weather-service/src/main/java/com/waither/weatherservice/kafka/KafkaConfig.java index 28afe049..d1a1a2c0 100644 --- a/weather-service/src/main/java/com/waither/weatherservice/kafka/KafkaConfig.java +++ b/weather-service/src/main/java/com/waither/weatherservice/kafka/KafkaConfig.java @@ -11,15 +11,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; import lombok.extern.slf4j.Slf4j; @@ -35,36 +32,37 @@ public class KafkaConfig { private String groupId; @Bean - public ProducerFactory dailyWeatherProducerFactory() { + public ProducerFactory producerFactory() { Map config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(config); } @Bean - public KafkaTemplate dailyWeatherKafkaTemplate() { - return new KafkaTemplate<>(dailyWeatherProducerFactory()); + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); } @Bean - public ConsumerFactory dailyWeatherConsumerFactory() { + public ConsumerFactory consumerStringFactory() { HashMap config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + // Consumer Group config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), - new JsonDeserializer<>(DailyWeatherKafkaMessage.class)); + new JsonDeserializer<>(String.class)); } - @Bean - public ConcurrentKafkaListenerContainerFactory dailyWeatherConcurrentKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(dailyWeatherConsumerFactory()); - factory.setConcurrency(3); - factory.setBatchListener(true); - factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); - return factory; - } + // @Bean + // public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + // ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + // factory.setConsumerFactory(dailyWeatherConsumerFactory()); + // factory.setConcurrency(3); + // factory.setBatchListener(true); + // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); + // return factory; + // } } diff --git a/weather-service/src/main/java/com/waither/weatherservice/kafka/Producer.java b/weather-service/src/main/java/com/waither/weatherservice/kafka/Producer.java index 3a701c8c..057fa514 100644 --- a/weather-service/src/main/java/com/waither/weatherservice/kafka/Producer.java +++ b/weather-service/src/main/java/com/waither/weatherservice/kafka/Producer.java @@ -12,14 +12,13 @@ @RequiredArgsConstructor public class Producer { - private final KafkaTemplate DailyWeatherKafkaTemplate; + private final KafkaTemplate kafkaTemplate; - @Value("${spring.kafka.template.default-topic}") + @Value("${spring.kafka.template.topic}") private String topic; - public void dailyWeatherProduceMessage(DailyWeatherKafkaMessage message) { - log.info("=================== Topic : {} ===================", topic); + public void dailyWeatherProduceMessage(String message) { log.info("[*] Producer Message : {}", message); - DailyWeatherKafkaTemplate.send(topic, message); + kafkaTemplate.send(topic, message); } } diff --git a/weather-service/src/main/java/com/waither/weatherservice/service/WeatherService.java b/weather-service/src/main/java/com/waither/weatherservice/service/WeatherService.java index 3d19623b..fa7603b6 100644 --- a/weather-service/src/main/java/com/waither/weatherservice/service/WeatherService.java +++ b/weather-service/src/main/java/com/waither/weatherservice/service/WeatherService.java @@ -11,7 +11,6 @@ import com.waither.weatherservice.entity.DailyWeather; import com.waither.weatherservice.entity.DisasterMessage; import com.waither.weatherservice.entity.ExpectedWeather; -import com.waither.weatherservice.kafka.DailyWeatherKafkaMessage; import com.waither.weatherservice.kafka.Producer; import com.waither.weatherservice.openapi.ForeCastOpenApiResponse; import com.waither.weatherservice.openapi.MsgOpenApiResponse; @@ -96,9 +95,9 @@ public void createDailyWeather(int nx, .windDegree(wsd) .build(); - DailyWeatherKafkaMessage kafkaMessage = DailyWeatherKafkaMessage.from(dailyWeather); + // DailyWeatherKafkaMessage kafkaMessage = DailyWeatherKafkaMessage.from(dailyWeather); - producer.dailyWeatherProduceMessage(kafkaMessage); + producer.dailyWeatherProduceMessage(wsd); // DailyWeather save = dailyWeatherRepository.save(dailyWeather); log.info("[*] 하루 온도 : {}", dailyWeather);