Skip to content

Commit

Permalink
๐Ÿ› fix: Kafka Consumer ์˜ˆ์™ธ ์ƒํ™ฉ ์ฒ˜๋ฆฌ
Browse files Browse the repository at this point in the history
  • Loading branch information
DDonghyeo committed Jul 5, 2024
1 parent 7ea6c8d commit 364d294
Showing 1 changed file with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,17 @@ public void consumeWindAlarm(KafkaDto.WeatherDto weatherDto) {

// Wind Alert๋ฅผ True๋กœ ์„ค์ •ํ•œ User Query
List<UserData> userData = userDataRepository.findAllByWindAlertIsTrue();
if (userData.isEmpty()) {
log.info("[ Kafka Listener ] ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์—†์Œ.");
return;
}

//์•Œ๋ฆผ ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์ด๋ฉ”์ผ
List<String> userEmails = filterRegionAndWindAlarm(region, userData, currentHour);
if (userEmails.isEmpty()) {
log.info("[ Kafka Listener ] ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์—†์Œ.");
return;
}

sb.append("ํ˜„์žฌ ๋ฐ”๋žŒ ์„ธ๊ธฐ๊ฐ€ ").append(windStrength).append("m/s ์ด์ƒ์ž…๋‹ˆ๋‹ค.");

Expand Down Expand Up @@ -165,15 +173,27 @@ public void consumeRain(KafkaDto.WeatherDto weatherDto) {
return;
}

String title = "Waither ๊ฐ•์ˆ˜ ์ •๋ณด ์•Œ๋ฆผ";
List<UserData> userData = userDataRepository.findAllBySnowAlertIsTrue();
StringBuilder sb = new StringBuilder();

//์ง€์—ญ
String region = weatherDto.region();
log.info("[ Kafka Listener ] ๊ฐ•์ˆ˜๋Ÿ‰ ์ง€์—ญ --> {}", region);
String message = weatherDto.message();

String title = "Waither ๊ฐ•์ˆ˜ ์ •๋ณด ์•Œ๋ฆผ";
List<UserData> userData = userDataRepository.findAllBySnowAlertIsTrue();
if (userData.isEmpty()) {
log.info("[ Kafka Listener ] ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์—†์Œ.");
return;
}

List<String> userEmails = filterRegionAndRainAlarm(region, userData, currentHour);
if (userEmails.isEmpty()) {
log.info("[ Kafka Listener ] ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์—†์Œ.");
return;
}

StringBuilder sb = new StringBuilder();


//1์‹œ๊ฐ„ ๋’ค, 2์‹œ๊ฐ„ ๋’ค, 3์‹œ๊ฐ„ ๋’ค, 4์‹œ๊ฐ„ ๋’ค, 5์‹œ๊ฐ„ ๋’ค, 6์‹œ๊ฐ„ ๋’ค
List<Double> predictions = Arrays.stream(message.split(","))
.map(String::trim) //๊ณต๋ฐฑ ์ œ๊ฑฐ
Expand All @@ -183,13 +203,14 @@ public void consumeRain(KafkaDto.WeatherDto weatherDto) {
String rainMessage = WeatherMessageUtils.getRainPredictions(predictions);

if (rainMessage == null) {
log.info("[ Kafka Listener ] 6์‹œ๊ฐ„ ๋™์•ˆ ๊ฐ•์ˆ˜ ์ •๋ณด ์—†์Œ.");
//6์‹œ๊ฐ„ ๋™์•ˆ ๊ฐ•์ˆ˜ ์ •๋ณด ์—†์Œ
return;
}

sb.append("ํ˜„์žฌ ").append(region).append(" ์ง€์—ญ์— ").append(rainMessage);
//์•Œ๋ฆผ ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์ด๋ฉ”์ผ
List<String> userEmails = filterRegionAndRainAlarm(region, userData, currentHour);


System.out.println("[ ํ‘ธ์‹œ์•Œ๋ฆผ ] ๊ฐ•์ˆ˜๋Ÿ‰ ์•Œ๋ฆผ");
alarmService.sendAlarms(userEmails, title, sb.toString());
Expand Down Expand Up @@ -226,9 +247,17 @@ public void consumeClimateAlarm(KafkaDto.WeatherDto weatherDto) {

// Wind Climate๋ฅผ True๋กœ ์„ค์ •ํ•œ User Query
List<UserData> userData = userDataRepository.findAllByClimateAlertIsTrue();
if (userData.isEmpty()) {
log.info("[ Kafka Listener ] ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์—†์Œ.");
return;
}

// ์•Œ๋ฆผ ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์ด๋ฉ”์ผ
List<String> userEmails = filterRegion(region, userData);
if (userEmails.isEmpty()) {
log.info("[ Kafka Listener ] ๋ณด๋‚ผ ์‚ฌ์šฉ์ž ์—†์Œ.");
return;
}

sb.append("[๊ธฐ์ƒ์ฒญ ๊ธฐ์ƒ ํŠน๋ณด] ").append(message);

Expand Down

0 comments on commit 364d294

Please sign in to comment.