From 824988c369c870b32584692b684828a1bb574142 Mon Sep 17 00:00:00 2001 From: DDonghyeo Date: Tue, 18 Jun 2024 23:25:56 +0900 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8Frefactor=20:=20Kafka=20Consum?= =?UTF-8?q?erConfig=20&=20=EB=A1=9C=EC=A7=81=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/KafkaConsumerConfig.java | 15 ++ .../notiservice/dto/kafka/KafkaDto.java | 7 +- .../notiservice/service/KafkaConsumer.java | 182 ++++++++++++++---- 3 files changed, 161 insertions(+), 43 deletions(-) diff --git a/noti-service/src/main/java/com/waither/notiservice/config/KafkaConsumerConfig.java b/noti-service/src/main/java/com/waither/notiservice/config/KafkaConsumerConfig.java index bfb71eae..35a685c1 100644 --- a/noti-service/src/main/java/com/waither/notiservice/config/KafkaConsumerConfig.java +++ b/noti-service/src/main/java/com/waither/notiservice/config/KafkaConsumerConfig.java @@ -143,6 +143,21 @@ private ConsumerFactory initialDataConsumerFact return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(KafkaDto.InitialDataDto.class)); } + @Bean("weatherKafkaListenerContainerFactory") + public ConcurrentKafkaListenerContainerFactory weatherConcurrentKafkaListenerContainerFactory(){ + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(weatherConsumerFactory()); + factory.setConcurrency(3); + factory.setBatchListener(true); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); + return factory; + } + + private ConsumerFactory weatherConsumerFactory() { + Map props = dtoSettings(); + return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(KafkaDto.WeatherDto.class)); + } + diff --git a/noti-service/src/main/java/com/waither/notiservice/dto/kafka/KafkaDto.java b/noti-service/src/main/java/com/waither/notiservice/dto/kafka/KafkaDto.java index 5af47db1..e752b2a9 100644 --- a/noti-service/src/main/java/com/waither/notiservice/dto/kafka/KafkaDto.java +++ b/noti-service/src/main/java/com/waither/notiservice/dto/kafka/KafkaDto.java @@ -3,7 +3,6 @@ import com.waither.notiservice.domain.UserData; import com.waither.notiservice.domain.UserMedian; import com.waither.notiservice.enums.Season; -import com.waither.notiservice.utils.TemperatureUtils; import lombok.Builder; import java.util.List; @@ -77,4 +76,10 @@ public record TokenDto( String token ){} + @Builder + public record WeatherDto( + String region, + String message + ) {} + } diff --git a/noti-service/src/main/java/com/waither/notiservice/service/KafkaConsumer.java b/noti-service/src/main/java/com/waither/notiservice/service/KafkaConsumer.java index 9bd9545e..88fbae82 100644 --- a/noti-service/src/main/java/com/waither/notiservice/service/KafkaConsumer.java +++ b/noti-service/src/main/java/com/waither/notiservice/service/KafkaConsumer.java @@ -2,22 +2,21 @@ import com.waither.notiservice.domain.UserData; import com.waither.notiservice.domain.UserMedian; +import com.waither.notiservice.domain.redis.NotificationRecord; import com.waither.notiservice.enums.Season; import com.waither.notiservice.dto.kafka.KafkaDto; -import com.waither.notiservice.global.exception.CustomException; -import com.waither.notiservice.global.response.ErrorCode; import com.waither.notiservice.repository.jpa.UserDataRepository; import com.waither.notiservice.repository.jpa.UserMedianRepository; +import com.waither.notiservice.repository.redis.NotificationRecordRepository; import com.waither.notiservice.utils.RedisUtils; import com.waither.notiservice.utils.TemperatureUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.util.ArrayList; +import java.time.LocalDateTime; import java.util.List; import java.util.Optional; @@ -26,8 +25,10 @@ @Component public class KafkaConsumer { + private final AlarmService alarmService; private final UserDataRepository userDataRepository; private final UserMedianRepository userMedianRepository; + private final NotificationRecordRepository notificationRecordRepository; private final RedisUtils redisUtils; /** @@ -122,82 +123,179 @@ public void consumeUserInit(KafkaDto.InitialDataDto initialDataDto) { /** * 바람 세기 알림 Listener - * */ + * @Query : 0200, 0500, 0800, 1100, 1400, 1700, 2000, 2300 */ @Transactional - @KafkaListener(topics = "alarm-wind") - public void consumeWindAlarm(@Payload String message) { + @KafkaListener(topics = "alarm-wind", containerFactory = "weatherKafkaListenerContainerFactory") + public void consumeWindAlarm(KafkaDto.WeatherDto weatherDto) { + + int currentHour = LocalDateTime.now().getHour(); + // 22:00 ~ 07:00 는 알림을 전송하지 않음 + if (currentHour >= 22 || currentHour <= 7) { + return; + } + + String title = "Waither 바람 세기 알림"; StringBuilder sb = new StringBuilder(); - Long windStrength = Long.valueOf(message); //바람세기 + String region = weatherDto.region(); + Double windStrength = Double.valueOf(weatherDto.message()); //바람세기 log.info("[ Kafka Listener ] 바람 세기"); log.info("[ Kafka Listener ] Wind Strength : --> {}", windStrength); - //TODO : 알림 보낼 사용자 정보 가져오기 (Redis) - List userIds = new ArrayList<>(); + // Wind Alert를 True로 설정한 User Query + List userData = userDataRepository.findAllByWindAlertIsTrue(); + + // 1. 현재 windStrength 보다 작게 설정한 유저 필터 + // 2. 이메일 추출 + // 3. 지역 일치 & 마지막 알림 3시간 경과했는지 필터 + List userEmails = userData.stream() + .filter(data -> data.getWindDegree() <= windStrength) + .map(UserData::getEmail) + .filter(email -> { + Optional notiRecord = notificationRecordRepository + .findByEmail(email); + if (notiRecord.isPresent()) { + //지역 일치 검사 + if (!notiRecord.get().getRegion().equals(region)) { + return false; + } + //경과 시간 검사 + int diff = notiRecord.get().getLastWindAlarmReceived().getHour() - currentHour; + if (Math.abs(diff) >= 3) { //3시간 이상 경과했는지? + return true; + } + } + log.warn(" [ Kafka Listener ] Notification Record 존재하지 않습니다. Email ---> {}", email); + return false; + }) + .toList(); //TODO : 바람 세기 알림 멘트 정리 sb.append("현재 바람 세기가 ").append(windStrength).append("m/s 이상입니다. 강풍에 주의하세요."); System.out.println("[ 푸시알림 ] 바람 세기 알림"); - sendAlarms(userIds, sb.toString()); + + alarmService.sendAlarms(userEmails,title, sb.toString()); + //TODO : Notification Record 저장 } /** - * 강설 정보 알림 Listener - * */ + * 강설 정보 알림 Listener
+ * 기상청 기준
+ * 약한 비 1~3mm
+ * 보통 비 3~15mm
+ * 강한 비 15~30mm
+ * 매우 강한 비 30mm 이상
+ * 참고 + */ @Transactional - @KafkaListener(topics = "alarm-snow") - public void consumeSnow(@Payload String message) { - String resultMessage = ""; - Double snow = Double.valueOf(message); //강수량 + @KafkaListener(topics = "alarm-snow", containerFactory = "weatherKafkaListenerContainerFactory") + public void consumeSnow(KafkaDto.WeatherDto weatherDto) { + + int currentHour = LocalDateTime.now().getHour(); + // 22:00 ~ 07:00 는 알림을 전송하지 않음 + if (currentHour >= 22 || currentHour <= 7) { + return; + } + + String title = "Waither 강수 정보 알림"; + StringBuilder sb = new StringBuilder(); + + String region = weatherDto.region(); + Double prediction = Double.valueOf(weatherDto.message()); //강수량 + + log.info("[ Kafka Listener ] 강수량 지역 --> {}", region); + log.info("[ Kafka Listener ] 걍수량 --> {}", prediction); - log.info("[ Kafka Listener ] 강수량"); - log.info("[ Kafka Listener ] Snow : --> {}", snow); + List userData = userDataRepository.findAllBySnowAlertIsTrue(); - //TODO : 알림 보낼 사용자 정보 가져오기 (Redis) - List userIds = new ArrayList<>(); + //예시 : 현재 서울특별시 지역에 2mm의 약한 비가 내릴 예정입니다. + //TODO: 언제 내리는지? 확인 필요 + sb.append("현재 ").append(region).append(" 지역에 ").append(prediction).append("mm의 ") + .append(getExpression(prediction)).append("가 내릴 예정입니다."); + + //알림 보낼 사용자 이메일 + List userEmails = filterRegionAndAlarm(region, userData, currentHour); - //TODO : 알림 멘트 정리 - resultMessage += "현재 강수량 " + snow + "m/s 이상입니다."; System.out.println("[ 푸시알림 ] 강수량 알림"); - sendAlarms(userIds, resultMessage); + alarmService.sendAlarms(userEmails, title, sb.toString()); + } /** * 기상 특보 알림 Listener * */ @Transactional - @KafkaListener(topics = "alarm-climate") - public void consumeClimateAlarm(@Payload String message) { - String resultMessage = ""; + @KafkaListener(topics = "alarm-climate", containerFactory = "weatherKafkaListenerContainerFactory") + public void consumeClimateAlarm(KafkaDto.WeatherDto weatherDto) { + int currentHour = LocalDateTime.now().getHour(); + // 22:00 ~ 07:00 는 알림을 전송하지 않음 + if (currentHour >= 22 || currentHour <= 7) { + return; + } + + String title = "Waither 기상 특보 알림"; + StringBuilder sb = new StringBuilder(); log.info("[ Kafka Listener ] 기상 특보"); - //TODO : 알림 보낼 사용자 정보 가져오기 (Redis) - List userIds = new ArrayList<>(); + String region = weatherDto.region(); + String message = weatherDto.message(); - resultMessage += "[기상청 기상 특보] " + message; + // Wind Climate를 True로 설정한 User Query + List userData = userDataRepository.findAllByClimateAlertIsTrue(); + + // 알림 보낼 사용자 이메일 + List userEmails = filterRegion(region, userData); + + sb.append("[기상청 기상 특보] ").append(message); - //TODO : 푸시알림 전송 System.out.println("[ 푸시알림 ] 기상 특보 알림"); - sendAlarms(userIds, resultMessage); + alarmService.sendAlarms(userEmails, title, sb.toString()); } - private void sendAlarms(List userEmails, String message) { - userEmails.forEach(email ->{ - String token = String.valueOf(redisUtils.get(String.valueOf(email))); - if (token == null) { //token을 찾지 못했을 경우 - throw new CustomException(ErrorCode.FIREBASE_TOKEN_NOT_FOUND); - } - + //지역 필터링 & 알림 규칙 검사 + private List filterRegionAndAlarm(String region, List userData, int currentHour) { + return userData.stream() + .filter(data -> { + Optional notiRecord = notificationRecordRepository.findByEmail(data.getEmail()); + return notiRecord.map(notificationRecord -> + (Math.abs(notiRecord.get().getLastWindAlarmReceived().getHour() - currentHour) >=3 + && notificationRecord.getRegion().equals(region) ) + ).orElse(false); + }) + .map(UserData::getEmail) + .toList(); + } - System.out.printf("[ 푸시알림 ] Email ---> {%d}", email); - System.out.printf("[ 푸시알림 ] message ---> {%s}", message); - }); + private List filterRegion(String region, List userData) { + return userData.stream() + .filter(data -> { + Optional notiRecord = notificationRecordRepository.findByEmail(data.getEmail()); + return notiRecord.map(notificationRecord -> notificationRecord.getRegion().equals(region)).orElse(false); + }) + .map(UserData::getEmail) + .toList(); } + //강수 표현 + private String getExpression(double prediction) { + //1~3mm : 약한 비 + if (prediction > 1 && prediction < 3) { + return "약한 비"; + //3~15mm : 비 + } else if (prediction >=3 && prediction < 15) { + return "비"; + //15~30mm 강한 비 + } else if (prediction >= 15 &&prediction <= 30) { + return "강한 비"; + //30mm~ 매우 강한 비 + } else if (prediction >= 30) { + return "매우 강한 비"; + } else return "비"; + } }