Skip to content

Commit

Permalink
♻️ refactor: Kafka 설정 변경 및 API Docs 적용
Browse files Browse the repository at this point in the history
  • Loading branch information
seheonnn authored May 13, 2024
2 parents f684dd0 + d69073b commit ebb4399
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 46 deletions.
15 changes: 15 additions & 0 deletions weather-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ plugins {
id 'java'
id 'org.springframework.boot' version '3.2.3'
id 'io.spring.dependency-management' version '1.1.4'
id "org.springdoc.openapi-gradle-plugin" version '1.8.0'
}

group = 'com.waither'
Expand Down Expand Up @@ -45,6 +46,20 @@ dependencies {

// Redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis'

//Springdoc
implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.1.0'
}

openApi {
apiDocsUrl.set("http://localhost:3000") // Document URL
outputDir.set(file("$rootDir/docs")) // Build Result Path
outputFileName.set("weather.json") // Build Result File Name
groupedApiMappings.set(Map.of("http://localhost:8081/weather/api-docs", "weather.json"))
waitTimeInSeconds.set(60) // Timeout
customBootRun {
args.add("--spring.profiles.active=dev")
}
}

dependencyManagement {
Expand Down
182 changes: 182 additions & 0 deletions weather-service/docs/weather.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
{
"openapi": "3.0.1",
"info": {
"title": "OpenAPI definition",
"version": "v0"
},
"servers": [
{
"url": "http://localhost:8081",
"description": "Generated server url"
}
],
"paths": {
"/api/v1/weather/short": {
"post": {
"tags": [
"weather-test-controller"
],
"operationId": "createExpectedWeatherTest",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ForeCastTestRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "OK"
}
}
}
},
"/api/v1/weather/msg": {
"post": {
"tags": [
"weather-test-controller"
],
"operationId": "createDisasterMsgTest",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/MsgTestRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "OK"
}
}
}
},
"/api/v1/weather/daily": {
"post": {
"tags": [
"weather-test-controller"
],
"operationId": "createDailyWeatherTest",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ForeCastTestRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "OK"
}
}
}
},
"/api/v1/weather/air": {
"post": {
"tags": [
"weather-test-controller"
],
"operationId": "airKoreaTest",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/AirTestRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "OK"
}
}
}
},
"/api/v1/weather/accuweather": {
"post": {
"tags": [
"weather-test-controller"
],
"operationId": "accuweatherTest",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/AccuweatherTestRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "OK"
}
}
}
}
},
"components": {
"schemas": {
"ForeCastTestRequest": {
"type": "object",
"properties": {
"nx": {
"type": "integer",
"format": "int32"
},
"ny": {
"type": "integer",
"format": "int32"
},
"baseDate": {
"type": "string"
},
"baseTime": {
"type": "string"
}
}
},
"MsgTestRequest": {
"type": "object",
"properties": {
"location": {
"type": "string"
}
}
},
"AirTestRequest": {
"type": "object",
"properties": {
"searchDate": {
"type": "string"
}
}
},
"AccuweatherTestRequest": {
"type": "object",
"properties": {
"latitude": {
"type": "number",
"format": "double"
},
"longitude": {
"type": "number",
"format": "double"
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package com.waither.weatherservice.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RequiredArgsConstructor
public class Consumer {

@KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "dailyWeatherConcurrentKafkaListenerContainerFactory")
public void dailyWeatherConsume(DailyWeatherKafkaMessage message) {

log.info("Consumer Test ========================== ");
log.info("[*] Consumer Message {} ", message);
}
}
// 테스트용 Consumer
// @Slf4j
// @Component
// @RequiredArgsConstructor
// public class Consumer {
//
// @KafkaListener(topics = "${spring.kafka.template.topic}", groupId = "${spring.kafka.consumer.group-id}")
// public void dailyWeatherConsume(String message) {
//
// log.info("Consumer Test ========================== ");
// log.info("[*] Consumer Message {} ", message);
// }
// }
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import lombok.Builder;

// 우선 문자열로 바람 세기만. 추후 변경 가능성
@Builder
public record DailyWeatherKafkaMessage(
String pop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -35,36 +32,37 @@ public class KafkaConfig {
private String groupId;

@Bean
public ProducerFactory<String, DailyWeatherKafkaMessage> dailyWeatherProducerFactory() {
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, DailyWeatherKafkaMessage> dailyWeatherKafkaTemplate() {
return new KafkaTemplate<>(dailyWeatherProducerFactory());
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ConsumerFactory<String, DailyWeatherKafkaMessage> dailyWeatherConsumerFactory() {
public ConsumerFactory<String, String> consumerStringFactory() {
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// Consumer Group
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(DailyWeatherKafkaMessage.class));
new JsonDeserializer<>(String.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> dailyWeatherConcurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(dailyWeatherConsumerFactory());
factory.setConcurrency(3);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
// @Bean
// public ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory<String, DailyWeatherKafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(dailyWeatherConsumerFactory());
// factory.setConcurrency(3);
// factory.setBatchListener(true);
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
// return factory;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
@RequiredArgsConstructor
public class Producer {

private final KafkaTemplate<String, DailyWeatherKafkaMessage> DailyWeatherKafkaTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.template.default-topic}")
@Value("${spring.kafka.template.topic}")
private String topic;

public void dailyWeatherProduceMessage(DailyWeatherKafkaMessage message) {
log.info("=================== Topic : {} ===================", topic);
public void produceMessage(String message) {
log.info("[*] Producer Message : {}", message);
DailyWeatherKafkaTemplate.send(topic, message);
kafkaTemplate.send(topic, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.waither.weatherservice.entity.DailyWeather;
import com.waither.weatherservice.entity.DisasterMessage;
import com.waither.weatherservice.entity.ExpectedWeather;
import com.waither.weatherservice.kafka.DailyWeatherKafkaMessage;
import com.waither.weatherservice.kafka.Producer;
import com.waither.weatherservice.openapi.ForeCastOpenApiResponse;
import com.waither.weatherservice.openapi.MsgOpenApiResponse;
Expand Down Expand Up @@ -62,7 +61,6 @@ public void createExpectedWeather(
.expectedSky(expectedSkyList)
.build();

// TODO 조회 테스트 후 삭제 예정
ExpectedWeather save = expectedWeatherRepository.save(expectedWeather);
log.info("[*] 예상 기후 : {}", save);
}
Expand Down Expand Up @@ -96,9 +94,10 @@ public void createDailyWeather(int nx,
.windDegree(wsd)
.build();

DailyWeatherKafkaMessage kafkaMessage = DailyWeatherKafkaMessage.from(dailyWeather);
// DailyWeatherKafkaMessage kafkaMessage = DailyWeatherKafkaMessage.from(dailyWeather);

producer.dailyWeatherProduceMessage(kafkaMessage);
// 바람 세기 Kafka 전송
producer.produceMessage(wsd);

// DailyWeather save = dailyWeatherRepository.save(dailyWeather);
log.info("[*] 하루 온도 : {}", dailyWeather);
Expand All @@ -118,7 +117,6 @@ public void createDisasterMsg(String location) throws URISyntaxException, IOExce
.message(msg)
.build();

// TODO 조회 테스트 후 삭제 예정
DisasterMessage save = disasterMessageRepository.save(disasterMessage);
log.info("[*] 재난 문자 : {}", save);
}
Expand Down

0 comments on commit ebb4399

Please sign in to comment.