Skip to content

Commit

Permalink
feat: 카프카 설정 및 사용자 설정 정보값 / Median 테이블 전송 (#81)
Browse files Browse the repository at this point in the history
feat: 카프카 설정 및 사용자 설정 정보값 / Median 테이블 전송 (#81)
  • Loading branch information
jinho7 authored May 29, 2024
2 parents 4b1d058 + 74a4f92 commit bbbf993
Show file tree
Hide file tree
Showing 23 changed files with 415 additions and 60 deletions.
1 change: 1 addition & 0 deletions user-service/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
data/**

### STS ###
.apt_generated
Expand Down
2 changes: 2 additions & 0 deletions user-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ dependencies {

//Kafka
implementation 'org.springframework.kafka:spring-kafka'
//Kafka Test
testImplementation 'org.springframework.kafka:spring-kafka-test'

//Redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
Expand Down
22 changes: 22 additions & 0 deletions user-service/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3'

services:
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
expose:
- "2181"

kafka:
container_name: kafka
image: wurstmeister/kafka:latest
depends_on:
- "zookeeper"
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import java.util.ArrayList;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Configuration
public class CorsConfig implements WebMvcConfigurer {

public static CorsConfigurationSource apiConfigurationSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Excepti

http
.addFilterAt(loginFilter, UsernamePasswordAuthenticationFilter.class);
http
.addFilterBefore(new JwtAuthorizationFilter(jwtUtil, redisUtil), JwtAuthenticationFilter.class);
// http
// .addFilterBefore(new JwtAuthorizationFilter(jwtUtil, redisUtil), JwtAuthenticationFilter.class);
http
.addFilterBefore(new JwtExceptionFilter(), JwtAuthenticationFilter.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ApiResponse<String> submitTemporaryPassword(@RequestParam String email) {
}

// 닉네임 변경
@PutMapping("/update-nickname")
@PutMapping("/nickname")
public ApiResponse<String> updateNickname(@AuthUser User user,
@RequestBody UserReqDto.NicknameDto nicknameDto) {
userService.updateNickname(user, nicknameDto.nickname());
Expand All @@ -82,7 +82,7 @@ public ApiResponse<String> passwordCheckEmail(@AuthUser User user,
}

// 비밀번호 변경
@PutMapping("/update-password")
@PutMapping("/password")
public ApiResponse<String> updatePassword(@AuthUser User user,
@Valid @RequestBody UserReqDto.UpdatePasswordDto updatePasswordDto) {
userService.updatePassword(user, updatePasswordDto.password());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
import com.waither.userservice.global.response.ErrorCode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.experimental.UtilityClass;

import static com.waither.userservice.util.CalculateUtil.calculateMedian;
import static com.waither.userservice.global.util.CalculateUtil.calculateMedian;


@NoArgsConstructor(access = AccessLevel.PRIVATE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public class SurveyReqDto {

// Todo : DTO에서 날짜/시간 관련 필드는 String 타입으로 선언하고, 서비스 계층에서 원하는 형식으로 파싱하는 것이 좋은가 고민중.
public record SurveyRequestDto(

// level
Integer ans,
LocalDateTime time
) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import lombok.*;
import org.hibernate.annotations.DynamicInsert;
import org.hibernate.annotations.ColumnDefault;
import org.hibernate.annotations.DynamicUpdate;

// 코드 일부 생략
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import jakarta.persistence.*;
import lombok.*;

import static com.waither.userservice.util.CalculateUtil.calculateMedian;
import static com.waither.userservice.global.util.CalculateUtil.calculateMedian;

@Builder
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
import org.springframework.web.filter.OncePerRequestFilter;

import java.io.IOException;
@Slf4j
@RequiredArgsConstructor
public class JwtAuthorizationFilter extends OncePerRequestFilter {

private final JwtUtil jwtUtil;
private final RedisUtil redisUtil;

@Override
protected void doFilterInternal(
@NonNull HttpServletRequest request,
@NonNull HttpServletResponse response,
@NonNull FilterChain filterChain
) throws ServletException, IOException {
log.info("[*] Jwt Filter");
if (request.getServletPath().equals("/login")) {
filterChain.doFilter(request, response);
return;
}

filterChain.doFilter(request, response);

}
}
//@Slf4j
//@RequiredArgsConstructor
//public class JwtAuthorizationFilter extends OncePerRequestFilter {
//
// private final JwtUtil jwtUtil;
// private final RedisUtil redisUtil;
//
// @Override
// protected void doFilterInternal(
// @NonNull HttpServletRequest request,
// @NonNull HttpServletResponse response,
// @NonNull FilterChain filterChain
// ) throws ServletException, IOException {
// log.info("[*] Jwt Filter");
// if (request.getServletPath().equals("/login")) {
// filterChain.doFilter(request, response);
// return;
// }
//
// filterChain.doFilter(request, response);
//
// }
//}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.waither.userservice.util;
package com.waither.userservice.global.util;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.waither.userservice.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@EnableKafka
@Configuration
public class KafkaConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

// Producer 관련 설정
private <T> ProducerFactory<String, T> producerFactory(Class<T> valueClass) {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

if (valueClass == String.class) {
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
} else {
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
}

return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory(Object.class));
}

@Bean
public KafkaTemplate<String, String> kafkaStringTemplate() {
return new KafkaTemplate<>(producerFactory(String.class));
}

// Consumer 관련 설정
private <T> ConsumerFactory<String, T> consumerFactory(Class<T> valueClass) {
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(valueClass));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(Object.class));
factory.setConcurrency(3);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaStringListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(String.class));
factory.setConcurrency(3);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//package com.waither.userservice.kafka;
//
//import lombok.RequiredArgsConstructor;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.stereotype.Component;
//
//@Slf4j
//@Component
//@RequiredArgsConstructor
//public class KafkaConsumer {
//
// // @KafkaListener 통해 topic, group 구독
// @KafkaListener(topics = "${spring.kafka.template.topic}", groupId = "${spring.kafka.consumer.group-id}")
// public void consume(KafkaMessage message) {
// log.info("[*] Kafka Consumer(Object) Message : {} ", message);
// }
//
// // @KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}")
// // public void consume(String message) {
// // log.info("[*] Kafka Consumer(String) Message : {} ", message);
// // }
//}
//
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.waither.userservice.kafka;

import com.waither.userservice.entity.UserMedian;
import com.waither.userservice.entity.Setting;
import com.waither.userservice.entity.User;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class KafkaConverter {

public static KafkaDto.InitialDataDto toInitialData(User user, Setting setting, UserMedian userMedian
) {
return KafkaDto.InitialDataDto.builder()
.nickName(user.getNickname())
.climateAlert(setting.isClimateAlert())
.userAlert(setting.isUserAlert())
.snowAlert(setting.isSnowAlert())
.windAlert(setting.isWindAlert())
.windDegree(setting.getWindDegree())
.regionReport(setting.isRegionReport())
.weight(setting.getWeight())
.medianOf1And2(userMedian.getMedianOf1And2())
.medianOf2And3(userMedian.getMedianOf2And3())
.medianOf3And4(userMedian.getMedianOf3And4())
.medianOf4And5(userMedian.getMedianOf4And5())
.build();
}

public static KafkaDto.UserMedianDto toUserMedianDto(User user, UserMedian userMedian) {
List<Map<String, Double>> medians = Arrays.asList(
Map.of("medianOf1And2", userMedian.getMedianOf1And2()),
Map.of("medianOf2And3", userMedian.getMedianOf2And3()),
Map.of("medianOf3And4", userMedian.getMedianOf3And4()),
Map.of("medianOf4And5", userMedian.getMedianOf4And5())
);
return new KafkaDto.UserMedianDto(user.getId(), medians);
}

public static KafkaDto.UserSettingsDto toSettingDto(User user, String key, String value) {
return new KafkaDto.UserSettingsDto(user.getId(), key, value);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.waither.userservice.kafka;

import lombok.Builder;

import java.util.List;
import java.util.Map;

public class KafkaDto {

@Builder
public record InitialDataDto(

String nickName,
boolean climateAlert,
boolean userAlert,
boolean snowAlert,
boolean windAlert,
Integer windDegree,
boolean regionReport,
Double weight,
Double medianOf1And2,
Double medianOf2And3,
Double medianOf3And4,
Double medianOf4And5
) {}

public record UserMedianDto(
Long userId,
List<Map<String, Double>> medians

) {}

public record UserSettingsDto(
Long userId,
String key,
String value
) {}

}
Loading

0 comments on commit bbbf993

Please sign in to comment.