Skip to content

Commit

Permalink
♻️ refactor: Kafka 설정 변경
Browse files Browse the repository at this point in the history
  • Loading branch information
seheonnn committed May 11, 2024
1 parent e8e6663 commit dbd87fb
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
@RequiredArgsConstructor
public class Consumer {

@KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "dailyWeatherConcurrentKafkaListenerContainerFactory")
public void dailyWeatherConsume(DailyWeatherKafkaMessage message) {
@KafkaListener(topics = "${spring.kafka.template.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void dailyWeatherConsume(String message) {

log.info("Consumer Test ========================== ");
log.info("[*] Consumer Message {} ", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import lombok.Builder;

// 우선 문자열로 바람 세기만. 추후 변경 가능성
@Builder
public record DailyWeatherKafkaMessage(
String pop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -35,36 +32,37 @@ public class KafkaConfig {
private String groupId;

@Bean
public ProducerFactory<String, DailyWeatherKafkaMessage> dailyWeatherProducerFactory() {
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, DailyWeatherKafkaMessage> dailyWeatherKafkaTemplate() {
return new KafkaTemplate<>(dailyWeatherProducerFactory());
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ConsumerFactory<String, DailyWeatherKafkaMessage> dailyWeatherConsumerFactory() {
public ConsumerFactory<String, String> consumerStringFactory() {
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// Consumer Group
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(DailyWeatherKafkaMessage.class));
new JsonDeserializer<>(String.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> dailyWeatherConcurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(dailyWeatherConsumerFactory());
factory.setConcurrency(3);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
// @Bean
// public ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(dailyWeatherConsumerFactory());
// factory.setConcurrency(3);
// factory.setBatchListener(true);
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
// return factory;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
@RequiredArgsConstructor
public class Producer {

private final KafkaTemplate<String, DailyWeatherKafkaMessage> DailyWeatherKafkaTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.template.default-topic}")
@Value("${spring.kafka.template.topic}")
private String topic;

public void dailyWeatherProduceMessage(DailyWeatherKafkaMessage message) {
log.info("=================== Topic : {} ===================", topic);
public void dailyWeatherProduceMessage(String message) {
log.info("[*] Producer Message : {}", message);
DailyWeatherKafkaTemplate.send(topic, message);
kafkaTemplate.send(topic, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.waither.weatherservice.entity.DailyWeather;
import com.waither.weatherservice.entity.DisasterMessage;
import com.waither.weatherservice.entity.ExpectedWeather;
import com.waither.weatherservice.kafka.DailyWeatherKafkaMessage;
import com.waither.weatherservice.kafka.Producer;
import com.waither.weatherservice.openapi.ForeCastOpenApiResponse;
import com.waither.weatherservice.openapi.MsgOpenApiResponse;
Expand Down Expand Up @@ -96,9 +95,9 @@ public void createDailyWeather(int nx,
.windDegree(wsd)
.build();

DailyWeatherKafkaMessage kafkaMessage = DailyWeatherKafkaMessage.from(dailyWeather);
// DailyWeatherKafkaMessage kafkaMessage = DailyWeatherKafkaMessage.from(dailyWeather);

producer.dailyWeatherProduceMessage(kafkaMessage);
producer.dailyWeatherProduceMessage(wsd);

// DailyWeather save = dailyWeatherRepository.save(dailyWeather);
log.info("[*] 하루 온도 : {}", dailyWeather);
Expand Down

0 comments on commit dbd87fb

Please sign in to comment.