Skip to content

Commit

Permalink
♻️refactor : Kafka Consumer 수정
Browse files Browse the repository at this point in the history
  • Loading branch information
DDonghyeo committed Jun 19, 2024
1 parent 824988c commit b0005ab
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.springframework.data.redis.core.RedisHash;

import java.time.LocalDateTime;
import java.util.Date;

@Getter
@Builder
Expand All @@ -26,6 +27,14 @@ public class NotificationRecord {
//사용자 마지막 위치 (지역)
private String region;

public void initializeWindTime() {
lastWindAlarmReceived = LocalDateTime.now();
}

public void initializeRainTime() {
lastRainAlarmReceived = LocalDateTime.now();
}

public void setLastRainAlarmReceived(LocalDateTime lastRainAlarmReceived) {
this.lastRainAlarmReceived = lastRainAlarmReceived;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,38 +146,21 @@ public void consumeWindAlarm(KafkaDto.WeatherDto weatherDto) {
// Wind Alert를 True로 설정한 User Query
List<UserData> userData = userDataRepository.findAllByWindAlertIsTrue();

// 1. 현재 windStrength 보다 작게 설정한 유저 필터
// 2. 이메일 추출
// 3. 지역 일치 & 마지막 알림 3시간 경과했는지 필터
List<String> userEmails = userData.stream()
.filter(data -> data.getWindDegree() <= windStrength)
.map(UserData::getEmail)
.filter(email -> {
Optional<NotificationRecord> notiRecord = notificationRecordRepository
.findByEmail(email);
if (notiRecord.isPresent()) {
//지역 일치 검사
if (!notiRecord.get().getRegion().equals(region)) {
return false;
}
//경과 시간 검사
int diff = notiRecord.get().getLastWindAlarmReceived().getHour() - currentHour;
if (Math.abs(diff) >= 3) { //3시간 이상 경과했는지?
return true;
}
}
log.warn(" [ Kafka Listener ] Notification Record 존재하지 않습니다. Email ---> {}", email);
return false;
})
.toList();
//알림 보낼 사용자 이메일
List<String> userEmails = filterRegionAndWindAlarm(region, userData, currentHour);

//TODO : 바람 세기 알림 멘트 정리
sb.append("현재 바람 세기가 ").append(windStrength).append("m/s 이상입니다. 강풍에 주의하세요.");
sb.append("현재 바람 세기가 ").append(windStrength).append("m/s 이상입니다.");

System.out.println("[ 푸시알림 ] 바람 세기 알림");

alarmService.sendAlarms(userEmails,title, sb.toString());
//TODO : Notification Record 저장

//Record 알림 시간 초기화
userEmails
.forEach(email -> {
Optional<NotificationRecord> notificationRecord = notificationRecordRepository.findByEmail(email);
notificationRecord.ifPresent(NotificationRecord::initializeWindTime);
});
}


Expand Down Expand Up @@ -217,12 +200,19 @@ public void consumeSnow(KafkaDto.WeatherDto weatherDto) {
.append(getExpression(prediction)).append("가 내릴 예정입니다.");

//알림 보낼 사용자 이메일
List<String> userEmails = filterRegionAndAlarm(region, userData, currentHour);
List<String> userEmails = filterRegionAndRainAlarm(region, userData, currentHour);


System.out.println("[ 푸시알림 ] 강수량 알림");
alarmService.sendAlarms(userEmails, title, sb.toString());

//Record 알림 시간 초기화
userEmails
.forEach(email -> {
Optional<NotificationRecord> notificationRecord = notificationRecordRepository.findByEmail(email);
notificationRecord.ifPresent(NotificationRecord::initializeRainTime);
});

}

/**
Expand Down Expand Up @@ -259,7 +249,7 @@ public void consumeClimateAlarm(KafkaDto.WeatherDto weatherDto) {


//지역 필터링 & 알림 규칙 검사
private List<String> filterRegionAndAlarm(String region, List<UserData> userData, int currentHour) {
private List<String> filterRegionAndWindAlarm(String region, List<UserData> userData, int currentHour) {
return userData.stream()
.filter(data -> {
Optional<NotificationRecord> notiRecord = notificationRecordRepository.findByEmail(data.getEmail());
Expand All @@ -272,6 +262,20 @@ private List<String> filterRegionAndAlarm(String region, List<UserData> userData
.toList();
}

//지역 필터링 & 알림 규칙 검사
private List<String> filterRegionAndRainAlarm(String region, List<UserData> userData, int currentHour) {
return userData.stream()
.filter(data -> {
Optional<NotificationRecord> notiRecord = notificationRecordRepository.findByEmail(data.getEmail());
return notiRecord.map(notificationRecord ->
(Math.abs(notiRecord.get().getLastRainAlarmReceived().getHour() - currentHour) >=3
&& notificationRecord.getRegion().equals(region) )
).orElse(false);
})
.map(UserData::getEmail)
.toList();
}

private List<String> filterRegion(String region, List<UserData> userData) {
return userData.stream()
.filter(data -> {
Expand Down

0 comments on commit b0005ab

Please sign in to comment.