Skip to content

Commit

Permalink
πŸ› fix: Kafka Consumer Type Mapping μ„€μ • μΆ”κ°€
Browse files Browse the repository at this point in the history
  • Loading branch information
DDonghyeo committed Jul 5, 2024
1 parent 1a14ca9 commit 286f4af
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ public ConcurrentKafkaListenerContainerFactory<String, KafkaDto.WeatherDto> weat

private ConsumerFactory<String, KafkaDto.WeatherDto> weatherConsumerFactory() {
Map<String, Object> props = dtoSettings();
props.put(JsonDeserializer.TYPE_MAPPINGS, "KafkaMessage:com.waither.notiservice.dto.kafka.KafkaDto.WeatherDto");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.waither.*");
props.put(JsonDeserializer.TYPE_MAPPINGS, "com.waither.weatherservice.kafka.KafkaMessage:com.waither.notiservice.dto.kafka.KafkaDto$WeatherDto");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.waither.*,com.waither.weatherservice.kafka");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.waither.notiservice.dto.kafka.KafkaDto$WeatherDto");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), createJsonDeserializer(KafkaDto.WeatherDto.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -108,7 +109,7 @@ public void consumeUserInit(KafkaDto.InitialDataDto initialDataDto) {
* @Query : 0200, 0500, 0800, 1100, 1400, 1700, 2000, 2300 */
@Transactional
@KafkaListener(topics = "alarm-wind", containerFactory = "weatherKafkaListenerContainerFactory")
public void consumeWindAlarm(KafkaDto.WeatherDto weatherDto) {
public void consumeWindAlarm(@Payload KafkaDto.WeatherDto weatherDto) {

int currentHour = LocalDateTime.now().getHour();
// 22:00 ~ 07:00 λŠ” μ•Œλ¦Όμ„ μ „μ†‘ν•˜μ§€ μ•ŠμŒ
Expand Down Expand Up @@ -165,7 +166,7 @@ public void consumeWindAlarm(KafkaDto.WeatherDto weatherDto) {
*/
@Transactional
@KafkaListener(topics = "alarm-rain", containerFactory = "weatherKafkaListenerContainerFactory")
public void consumeRain(KafkaDto.WeatherDto weatherDto) {
public void consumeRain(@Payload KafkaDto.WeatherDto weatherDto) {

int currentHour = LocalDateTime.now().getHour();
// 22:00 ~ 07:00 λŠ” μ•Œλ¦Όμ„ μ „μ†‘ν•˜μ§€ μ•ŠμŒ
Expand Down Expand Up @@ -230,7 +231,7 @@ public void consumeRain(KafkaDto.WeatherDto weatherDto) {
* */
@Transactional
@KafkaListener(topics = "alarm-climate", containerFactory = "weatherKafkaListenerContainerFactory")
public void consumeClimateAlarm(KafkaDto.WeatherDto weatherDto) {
public void consumeClimateAlarm(@Payload KafkaDto.WeatherDto weatherDto) {
int currentHour = LocalDateTime.now().getHour();
// 22:00 ~ 07:00 λŠ” μ•Œλ¦Όμ„ μ „μ†‘ν•˜μ§€ μ•ŠμŒ
if (currentHour >= 22 || currentHour <= 7) {
Expand Down

0 comments on commit 286f4af

Please sign in to comment.