Skip to content

Commit

Permalink
♻️refactor : Kafka ConsumerConfig & 로직 변경
Browse files Browse the repository at this point in the history
  • Loading branch information
DDonghyeo committed Jun 18, 2024
1 parent 110cb35 commit 824988c
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ private ConsumerFactory<String, KafkaDto.InitialDataDto> initialDataConsumerFact
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(KafkaDto.InitialDataDto.class));
}

@Bean("weatherKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, KafkaDto.WeatherDto> weatherConcurrentKafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, KafkaDto.WeatherDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(weatherConsumerFactory());
factory.setConcurrency(3);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}

private ConsumerFactory<String, KafkaDto.WeatherDto> weatherConsumerFactory() {
Map<String, Object> props = dtoSettings();
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(KafkaDto.WeatherDto.class));
}




Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.waither.notiservice.domain.UserData;
import com.waither.notiservice.domain.UserMedian;
import com.waither.notiservice.enums.Season;
import com.waither.notiservice.utils.TemperatureUtils;
import lombok.Builder;

import java.util.List;
Expand Down Expand Up @@ -77,4 +76,10 @@ public record TokenDto(
String token
){}

@Builder
public record WeatherDto(
String region,
String message
) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@

import com.waither.notiservice.domain.UserData;
import com.waither.notiservice.domain.UserMedian;
import com.waither.notiservice.domain.redis.NotificationRecord;
import com.waither.notiservice.enums.Season;
import com.waither.notiservice.dto.kafka.KafkaDto;
import com.waither.notiservice.global.exception.CustomException;
import com.waither.notiservice.global.response.ErrorCode;
import com.waither.notiservice.repository.jpa.UserDataRepository;
import com.waither.notiservice.repository.jpa.UserMedianRepository;
import com.waither.notiservice.repository.redis.NotificationRecordRepository;
import com.waither.notiservice.utils.RedisUtils;
import com.waither.notiservice.utils.TemperatureUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;

Expand All @@ -26,8 +25,10 @@
@Component
public class KafkaConsumer {

private final AlarmService alarmService;
private final UserDataRepository userDataRepository;
private final UserMedianRepository userMedianRepository;
private final NotificationRecordRepository notificationRecordRepository;
private final RedisUtils redisUtils;

/**
Expand Down Expand Up @@ -122,82 +123,179 @@ public void consumeUserInit(KafkaDto.InitialDataDto initialDataDto) {

/**
* 바람 세기 알림 Listener
* */
* @Query : 0200, 0500, 0800, 1100, 1400, 1700, 2000, 2300 */
@Transactional
@KafkaListener(topics = "alarm-wind")
public void consumeWindAlarm(@Payload String message) {
@KafkaListener(topics = "alarm-wind", containerFactory = "weatherKafkaListenerContainerFactory")
public void consumeWindAlarm(KafkaDto.WeatherDto weatherDto) {

int currentHour = LocalDateTime.now().getHour();
// 22:00 ~ 07:00 는 알림을 전송하지 않음
if (currentHour >= 22 || currentHour <= 7) {
return;
}

String title = "Waither 바람 세기 알림";
StringBuilder sb = new StringBuilder();

Long windStrength = Long.valueOf(message); //바람세기
String region = weatherDto.region();
Double windStrength = Double.valueOf(weatherDto.message()); //바람세기

log.info("[ Kafka Listener ] 바람 세기");
log.info("[ Kafka Listener ] Wind Strength : --> {}", windStrength);

//TODO : 알림 보낼 사용자 정보 가져오기 (Redis)
List<Long> userIds = new ArrayList<>();
// Wind Alert를 True로 설정한 User Query
List<UserData> userData = userDataRepository.findAllByWindAlertIsTrue();

// 1. 현재 windStrength 보다 작게 설정한 유저 필터
// 2. 이메일 추출
// 3. 지역 일치 & 마지막 알림 3시간 경과했는지 필터
List<String> userEmails = userData.stream()
.filter(data -> data.getWindDegree() <= windStrength)
.map(UserData::getEmail)
.filter(email -> {
Optional<NotificationRecord> notiRecord = notificationRecordRepository
.findByEmail(email);
if (notiRecord.isPresent()) {
//지역 일치 검사
if (!notiRecord.get().getRegion().equals(region)) {
return false;
}
//경과 시간 검사
int diff = notiRecord.get().getLastWindAlarmReceived().getHour() - currentHour;
if (Math.abs(diff) >= 3) { //3시간 이상 경과했는지?
return true;
}
}
log.warn(" [ Kafka Listener ] Notification Record 존재하지 않습니다. Email ---> {}", email);
return false;
})
.toList();

//TODO : 바람 세기 알림 멘트 정리
sb.append("현재 바람 세기가 ").append(windStrength).append("m/s 이상입니다. 강풍에 주의하세요.");

System.out.println("[ 푸시알림 ] 바람 세기 알림");
sendAlarms(userIds, sb.toString());

alarmService.sendAlarms(userEmails,title, sb.toString());
//TODO : Notification Record 저장
}


/**
* 강설 정보 알림 Listener
* */
* 강설 정보 알림 Listener <br>
* 기상청 기준 <br>
* 약한 비 1~3mm <br>
* 보통 비 3~15mm <br>
* 강한 비 15~30mm <br>
* 매우 강한 비 30mm 이상 <br>
* <a href="https://www.kma.go.kr/kma/biz/forecast05.jsp">참고</a>
*/
@Transactional
@KafkaListener(topics = "alarm-snow")
public void consumeSnow(@Payload String message) {
String resultMessage = "";
Double snow = Double.valueOf(message); //강수량
@KafkaListener(topics = "alarm-snow", containerFactory = "weatherKafkaListenerContainerFactory")
public void consumeSnow(KafkaDto.WeatherDto weatherDto) {

int currentHour = LocalDateTime.now().getHour();
// 22:00 ~ 07:00 는 알림을 전송하지 않음
if (currentHour >= 22 || currentHour <= 7) {
return;
}

String title = "Waither 강수 정보 알림";
StringBuilder sb = new StringBuilder();

String region = weatherDto.region();
Double prediction = Double.valueOf(weatherDto.message()); //강수량

log.info("[ Kafka Listener ] 강수량 지역 --> {}", region);
log.info("[ Kafka Listener ] 걍수량 --> {}", prediction);

log.info("[ Kafka Listener ] 강수량");
log.info("[ Kafka Listener ] Snow : --> {}", snow);
List<UserData> userData = userDataRepository.findAllBySnowAlertIsTrue();

//TODO : 알림 보낼 사용자 정보 가져오기 (Redis)
List<Long> userIds = new ArrayList<>();
//예시 : 현재 서울특별시 지역에 2mm의 약한 비가 내릴 예정입니다.
//TODO: 언제 내리는지? 확인 필요
sb.append("현재 ").append(region).append(" 지역에 ").append(prediction).append("mm의 ")
.append(getExpression(prediction)).append("가 내릴 예정입니다.");

//알림 보낼 사용자 이메일
List<String> userEmails = filterRegionAndAlarm(region, userData, currentHour);

//TODO : 알림 멘트 정리
resultMessage += "현재 강수량 " + snow + "m/s 이상입니다.";

System.out.println("[ 푸시알림 ] 강수량 알림");
sendAlarms(userIds, resultMessage);
alarmService.sendAlarms(userEmails, title, sb.toString());

}

/**
* 기상 특보 알림 Listener
* */
@Transactional
@KafkaListener(topics = "alarm-climate")
public void consumeClimateAlarm(@Payload String message) {
String resultMessage = "";
@KafkaListener(topics = "alarm-climate", containerFactory = "weatherKafkaListenerContainerFactory")
public void consumeClimateAlarm(KafkaDto.WeatherDto weatherDto) {
int currentHour = LocalDateTime.now().getHour();
// 22:00 ~ 07:00 는 알림을 전송하지 않음
if (currentHour >= 22 || currentHour <= 7) {
return;
}

String title = "Waither 기상 특보 알림";
StringBuilder sb = new StringBuilder();

log.info("[ Kafka Listener ] 기상 특보");

//TODO : 알림 보낼 사용자 정보 가져오기 (Redis)
List<Long> userIds = new ArrayList<>();
String region = weatherDto.region();
String message = weatherDto.message();

resultMessage += "[기상청 기상 특보] " + message;
// Wind Climate를 True로 설정한 User Query
List<UserData> userData = userDataRepository.findAllByClimateAlertIsTrue();

// 알림 보낼 사용자 이메일
List<String> userEmails = filterRegion(region, userData);

sb.append("[기상청 기상 특보] ").append(message);

//TODO : 푸시알림 전송
System.out.println("[ 푸시알림 ] 기상 특보 알림");
sendAlarms(userIds, resultMessage);
alarmService.sendAlarms(userEmails, title, sb.toString());
}


private void sendAlarms(List<Long> userEmails, String message) {
userEmails.forEach(email ->{
String token = String.valueOf(redisUtils.get(String.valueOf(email)));
if (token == null) { //token을 찾지 못했을 경우
throw new CustomException(ErrorCode.FIREBASE_TOKEN_NOT_FOUND);
}

//지역 필터링 & 알림 규칙 검사
private List<String> filterRegionAndAlarm(String region, List<UserData> userData, int currentHour) {
return userData.stream()
.filter(data -> {
Optional<NotificationRecord> notiRecord = notificationRecordRepository.findByEmail(data.getEmail());
return notiRecord.map(notificationRecord ->
(Math.abs(notiRecord.get().getLastWindAlarmReceived().getHour() - currentHour) >=3
&& notificationRecord.getRegion().equals(region) )
).orElse(false);
})
.map(UserData::getEmail)
.toList();
}

System.out.printf("[ 푸시알림 ] Email ---> {%d}", email);
System.out.printf("[ 푸시알림 ] message ---> {%s}", message);
});
private List<String> filterRegion(String region, List<UserData> userData) {
return userData.stream()
.filter(data -> {
Optional<NotificationRecord> notiRecord = notificationRecordRepository.findByEmail(data.getEmail());
return notiRecord.map(notificationRecord -> notificationRecord.getRegion().equals(region)).orElse(false);
})
.map(UserData::getEmail)
.toList();
}

//강수 표현
private String getExpression(double prediction) {
//1~3mm : 약한 비
if (prediction > 1 && prediction < 3) {
return "약한 비";
//3~15mm : 비
} else if (prediction >=3 && prediction < 15) {
return "비";
//15~30mm 강한 비
} else if (prediction >= 15 &&prediction <= 30) {
return "강한 비";
//30mm~ 매우 강한 비
} else if (prediction >= 30) {
return "매우 강한 비";
} else return "비";
}
}

0 comments on commit 824988c

Please sign in to comment.