Skip to content

Commit

Permalink
fix: 예외 처리 수정
Browse files Browse the repository at this point in the history
- Message Listener와 Scheduler가 서로 메세지를 읽으면 입금이 중복으로 발생할 수 있는 문제를 발견했습니다.
- 마지막으로 읽은 시간이 일정 시간이 넘어야 하고 ack를 완료했을 때만 입금 로직을 처리할 수 있도록 변경했습니다.
  • Loading branch information
seungh1024 committed May 20, 2024
1 parent 7a46b29 commit 7240ae2
Show file tree
Hide file tree
Showing 17 changed files with 421 additions and 42 deletions.
16 changes: 13 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,21 @@ jacocoTestReport {
}
}

//sonar {
// properties {
// property "sonar.projectKey", "C4-ComeTrue_c4-cometrue-assignment"
// property "sonar.organization", "c4-cometrue"
// property "sonar.host.url", "https://sonarcloud.io"
// property "sonar.coverage.jacoco.xmlReportPaths", "build/reports/jacoco/test/jacocoTestReport.xml"
// property "sonar.java.checkstyle.reportPaths", "build/reports/checkstyle/main.xml"
// }
//}

sonar {
properties {
property "sonar.projectKey", "C4-ComeTrue_c4-cometrue-assignment"
property "sonar.organization", "c4-cometrue"
property "sonar.host.url", "https://sonarcloud.io"
property "sonar.projectKey", "minipay"
property "sonar.host.url", "http://localhost:9000"
property "sonar.login", "squ_539c8dfaf3c63cef6480b7260f958529af24e66a"
property "sonar.coverage.jacoco.xmlReportPaths", "build/reports/jacoco/test/jacocoTestReport.xml"
property "sonar.java.checkstyle.reportPaths", "build/reports/checkstyle/main.xml"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

import java.util.concurrent.ThreadPoolExecutor;

import org.c4marathon.assignment.bankaccount.exception.async.AccountAsyncExceptionHandler;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class DepositConfig {
public class DepositConfig implements AsyncConfigurer {

@Value("${bank-account.deposit.core-pool-size}")
private int corePoolSize;
Expand Down Expand Up @@ -40,4 +43,27 @@ public ThreadPoolTaskExecutor depositExecutor() {

return executor;
}

@Bean
public ThreadPoolTaskExecutor rollbackExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(completeOnShutdown);
executor.setThreadNamePrefix("RB_THREAD_");
executor.initialize();

return executor;
}

/**
*
* doDeposit() 메소드 예외 처리 핸들러
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AccountAsyncExceptionHandler();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.c4marathon.assignment.bankaccount.exception.async;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public enum AccountAsyncErrorCode {
SEND_ROLLBACK_FAILED("이체 롤백에 실패했습니다"),
;
private final String message;

public AccountAsyncException accountAsyncException() {
return new AccountAsyncException(name(), getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.c4marathon.assignment.bankaccount.exception.async;

import lombok.Getter;

@Getter
public class AccountAsyncException extends RuntimeException {
private final String errorName;
private final String message;
private final Exception exception;

public AccountAsyncException(String errorName, String message) {
this.errorName = errorName;
this.message = message;
this.exception = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.c4marathon.assignment.bankaccount.exception.async;

import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AccountAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
StringBuilder sb = new StringBuilder();

Parameter[] parameters = method.getParameters();
int parameterIndex = 0;
for (Object param : params) {
sb.append(parameters[parameterIndex++]).append(" = ").append(param).append(" , ");
}
AccountAsyncException exception = (AccountAsyncException)ex;

// 실제 환경에서는 메일 같은 것으로 에러 발생을 알려줘야 할 것 같다.
log.error(
"error name : {} | error message: {} | error method: {} | error parameters: ",
exception.getErrorName(), exception.getMessage(), method.getName(),
sb);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ public class RedisStreamConsumer implements StreamListener<String, MapRecord<Str
private String consumerGroupName;
@Value("${redis-stream.consumer-name}")
private String consumerName;
@Value("${redis-stream.is-test}")
private boolean isTest;

private final RedisOperator redisOperator;

private final DepositHandlerService depositHandlerService;

@Override
public void destroy() throws Exception {
public void destroy() {
if (subscription != null) {
subscription.cancel();
}
Expand All @@ -63,7 +65,6 @@ public void afterPropertiesSet() throws Exception {
this
);

// 2초마다 message를 얻는다.
subscription.await(Duration.ofSeconds(2));

listenerContainer.start();
Expand All @@ -72,20 +73,28 @@ public void afterPropertiesSet() throws Exception {
/**
*
* Redis Stream 메세지를 처리하는 메소드
* XADD된 메세지를 먼저 처리한다. 처리와 동시에 consumer-group에 pending 한다.
*/
@Override
public void onMessage(MapRecord<String, Object, Object> message) {
// 테스트에서 ack 처리 되지 않도록 처리
if (isTest) {
return;
}
Set<Map.Entry<Object, Object>> entries = message.getValue().entrySet();
Long[] depositData = new Long[3];
Long[] accountData = new Long[3];
int index = 0;

// depositData에 send-pk, deposit-pk, money를 차례대로 담는다.
for (Map.Entry<Object, Object> entry : entries) {
String value = (String)entry.getValue();
depositData[index++] = Long.valueOf(value);
accountData[index++] = Long.valueOf(value);
}

// deposit-pk에 money를 추가하는 task 추가
depositHandlerService.doDeposit(depositData[1], depositData[2], message.getId().toString());
Long ackResult = redisOperator.ackStream(streamKey, consumerGroupName, message.getId().toString());
if (ackResult != 0) {
depositHandlerService.doDeposit(accountData[0], accountData[1], accountData[2]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,37 @@ public class PendingMessageScheduler {
private String consumerName;
@Value("${redis-stream.claim-consumer-name}")
private String claimConsumerName; // xclaim에 사용할 새로운 소비자 이름
@Value("${redis-stream.pending-time}")
private int pendingTime;

private final RedisOperator redisOperator;
private final DepositHandlerService depositHandlerService;

/**
*
* 어떠한 문제로 처리되지 않은 메세지를 처리하는 메소드
* 기존 consumer가 중복하여 처리하지 않도록 consumer-name을 변경
* 이후 doDeposit 메소드로
* 어떠한 문제로 처리되지 않은 pending 메세지를 처리하는 메소드
* 기존 consumer가 중복하여 처리하지 않도록 consumer-name을 변경한다.
*/
@Scheduled(fixedRate = 5000)
public void consumePendingMessage() {
PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroupName, consumerName);
for (PendingMessage pendingMessage : pendingMessages) {
// 기존 consumer가 처리하지 못하도록 consumer name을 claim-consumer로 변경한다.
// StreamListener가 처리중인 메세지일 수 있으므로 pendingTime보다 작은 시간이면 처리하지 않는다.
if (pendingMessage.getElapsedTimeSinceLastDelivery().toMillis() < pendingTime) {
break;
}
// 기존 consumer가 중복하여 처리하지 못하도록 consumer name을 claim-consumer로 변경한다.
redisOperator.claimMessage(pendingMessage, streamKey, claimConsumerName);

// streamKey와 id에 해당하는 sned-pk, deposit-pk, money를 차례대로 담은 Long 배열을 조회한다.
Long[] data = redisOperator.findMessageById(streamKey,
pendingMessage.getIdAsString());

Long ackResult = redisOperator.ackStream(streamKey, consumerGroupName, pendingMessage.getIdAsString());
// 처리되지 않은 이체 로그는 롤백을 시켜준다.
if (data != null && data[2] != 0) {
if (data != null && data[2] != 0 && ackResult == 1) {
// 롤백을 하므로 send-pk에 money를 추가해준다.
depositHandlerService.doDeposit(data[0], data[2], pendingMessage.getIdAsString());
depositHandlerService.doDeposit(data[1], data[0], data[2]);
}
}
}
Expand All @@ -62,8 +69,9 @@ public void consumeClaimMessage() {
Long[] data = redisOperator.findMessageById(streamKey,
pendingMessage.getIdAsString());

if (data != null && data[2] != 0) {
depositHandlerService.doDeposit(data[0], data[2], pendingMessage.getIdAsString());
Long ackResult = redisOperator.ackStream(streamKey, consumerGroupName, pendingMessage.getIdAsString());
if (data != null && data[2] != 0 && ackResult == 1) {
depositHandlerService.doDeposit(data[1], data[0], data[2]);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Map;

import org.c4marathon.assignment.bankaccount.service.SendRollbackHandlerService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
Expand All @@ -21,6 +23,7 @@
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.StatusOutput;
Expand All @@ -36,9 +39,13 @@
public class RedisOperator {

private final RedisTemplate<String, Object> redisTemplate;
private final SendRollbackHandlerService sendRollbackHandlerService;

public void ackStream(String streamKey, String consumerGroup, String id) {
redisTemplate.opsForStream().acknowledge(streamKey, consumerGroup, id);
@Value("${redis-stream.pending-time}")
private int pendingTime;

public Long ackStream(String streamKey, String consumerGroup, String id) {
return redisTemplate.opsForStream().acknowledge(streamKey, consumerGroup, id);
}

/**
Expand All @@ -63,7 +70,14 @@ public void addStream(String streamKey, long sendPk, long depositPk, long money)
.add("deposit-pk").add(depositPk)
.add("money").add(money);

commands.dispatch(CommandType.XADD, new StatusOutput<>(StringCodec.UTF8), commandArgs);
RedisFuture dispatch = commands.dispatch(CommandType.XADD, new StatusOutput<>(StringCodec.UTF8), commandArgs);
dispatch.handle((result, exception) -> {
// 예외가 발생하면 이체 롤백 요청을 보낸다.
if (exception != null) {
sendRollbackHandlerService.rollBackDeposit(sendPk, depositPk, money);
}
return result;
});
}

public PendingMessages findPendingMessages(String streamKey, String consumerGroupName, String consumerName) {
Expand Down Expand Up @@ -118,7 +132,7 @@ public void claimMessage(PendingMessage pendingMessage, String streamKey, String
CommandArgs<String, String> commandArgs = new CommandArgs<>(StringCodec.UTF8).addKey(streamKey)
.add(pendingMessage.getGroupName())
.add(consumerName)
.add("3000")
.add(pendingTime)
.add(pendingMessage.getIdAsString());

commands.dispatch(CommandType.XCLAIM, new StatusOutput<>(StringCodec.UTF8), commandArgs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ public interface MainAccountRepository extends JpaRepository<MainAccount, Long>
@Query("select ma from MainAccount ma where ma.accountPk = :accountPk")
Optional<MainAccount> findByPkForUpdate(@Param("accountPk") long accountPk);

@Modifying
@Modifying(clearAutomatically = true) // 영속성 컨텍스트와 동기화가 필요하다.
@Query("""
update MainAccount ma
set ma.money = ma.money + :chargeMoney
where ma.accountPk = :accountPk
""")
void deposit(@Param("accountPk") long accountPk, @Param("chargeMoney") long chargeMoney);
int deposit(@Param("accountPk") long accountPk, @Param("chargeMoney") long chargeMoney);

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package org.c4marathon.assignment.bankaccount.service;

import org.c4marathon.assignment.bankaccount.message.util.RedisOperator;
import org.c4marathon.assignment.bankaccount.repository.MainAccountRepository;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -14,17 +11,24 @@
@Service
@RequiredArgsConstructor
public class DepositHandlerService {
@Value("${redis-stream.stream-key}")
private String streamKey;
@Value("${redis-stream.consumer-group-name}")
private String consumerGroup;
private final MainAccountRepository mainAccountRepository;
private final RedisOperator redisOperator;

@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
/**
*
* 입금 로직을 처리하는 메소드.
*
* @param sendPk 보내는 사람의 계좌
* @param depositPk 받는 사람의 계좌
* @param money 이체할 금액
*/
@Async("depositExecutor")
public void doDeposit(long accountPk, long money, String streamId) {
mainAccountRepository.deposit(accountPk, money);
redisOperator.ackStream(streamKey, consumerGroup, streamId);
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void doDeposit(long sendPk, long depositPk, long money) {
int updateResult = mainAccountRepository.deposit(depositPk, money);
// 상대 계좌에 업데이트가 되지 않은 경우 롤백해야 한다.
if (updateResult == 0) {
mainAccountRepository.deposit(sendPk, money);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.c4marathon.assignment.bankaccount.service;

import org.c4marathon.assignment.bankaccount.exception.async.AccountAsyncErrorCode;
import org.c4marathon.assignment.bankaccount.repository.MainAccountRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class SendRollbackHandlerService {
private final MainAccountRepository mainAccountRepository;

@Async("rollbackExecutor")
@Transactional
public void rollBackDeposit(long sendPk, long depositPk, long money) {
int updateResult = mainAccountRepository.deposit(sendPk, money);

// 상대 계좌에 업데이트가 되지 않은 경우 롤백 실패 예외가 발생한다.
if (updateResult == 0) {
throw AccountAsyncErrorCode.SEND_ROLLBACK_FAILED.accountAsyncException();
}

}
}
Loading

0 comments on commit 7240ae2

Please sign in to comment.