Skip to content

Commit

Permalink
test: 처리하지 못한 pending 메세지 처리 테스트 수정
Browse files Browse the repository at this point in the history
  • Loading branch information
seungh1024 committed May 17, 2024
1 parent c78a99c commit 3d558d5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ void concurrency_send_to_other_account_with_same_condition() throws InterruptedE
countDownLatch.await();
executorService.shutdown();

executor.getThreadPoolExecutor().awaitTermination(5, TimeUnit.SECONDS);
executor.getThreadPoolExecutor().awaitTermination(5, TimeUnit.SECONDS);

MainAccount resultMainAccount1 = mainAccountRepository.findById(mainAccountPk[0]).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.ActiveProfiles;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -73,9 +74,8 @@ public class PendingMessageTest {
class SendToSavingAccount {

@BeforeEach
void accountInit() {
void accountInit() throws InterruptedException {
createAccount();
createPendingMessage();
}

@AfterEach
Expand All @@ -91,17 +91,12 @@ void pending_message_will_rollback() throws InterruptedException {
// Given
MainAccount sendAccount = mainAccountRepository.findById(mainAccountPk[0]).get();
long originMoney = sendAccount.getMoney();

PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroup,
claimConsumerName);
for (PendingMessage pendingMessage : pendingMessages) {
redisOperator.claimMessage(pendingMessage, streamKey, consumerName); // 원래 consumer name으로 변경
}
makePendingMessage();

// When
executor.initialize();
pendingMessageScheduler.consumePendingMessage();
executor.getThreadPoolExecutor().awaitTermination(5, TimeUnit.SECONDS);
executor.getThreadPoolExecutor().awaitTermination(2, TimeUnit.SECONDS);

// Then
MainAccount rollBackMember = mainAccountRepository.findById(mainAccountPk[0]).get();
Expand All @@ -114,19 +109,27 @@ void not_consumed_pending_message_will_rollback() throws InterruptedException {
// Given
MainAccount sendAccount = mainAccountRepository.findById(mainAccountPk[0]).get();
long originMoney = sendAccount.getMoney();
// 3초 이상 된 메세지만 claim 처리를 하므로 3초 기다린다
executor.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS);
makePendingMessage();

// claim처리 안된 메세지 claim 처리
PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroup, consumerName);
for (PendingMessage pendingMessage : pendingMessages) {
redisOperator.claimMessage(pendingMessage, streamKey, claimConsumerName);
}
Awaitility.await().atLeast(3, TimeUnit.SECONDS).until(() -> {
PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroup,
consumerName);
if (pendingMessages.isEmpty()) {
return true;
}
// System.out.println(pendingMessages);
for (PendingMessage pendingMessage : pendingMessages) {
redisOperator.claimMessage(pendingMessage, streamKey, claimConsumerName);
// System.out.println("pending message = " + pendingMessage);
}
return false;
});

// When
executor.initialize();
pendingMessageScheduler.consumeClaimMessage();
executor.getThreadPoolExecutor().awaitTermination(10, TimeUnit.SECONDS);
executor.getThreadPoolExecutor().awaitTermination(2, TimeUnit.SECONDS);

// Then
MainAccount rollBackMember = mainAccountRepository.findById(mainAccountPk[0]).get();
Expand All @@ -135,14 +138,13 @@ void not_consumed_pending_message_will_rollback() throws InterruptedException {

}

void createPendingMessage() {
// 레디스에 값 넣자마자 스레드 풀이 동작하기 때문에 잠시 꺼두고 ack하기 전에 redis의 메세지 pending 처리
void makePendingMessage() throws InterruptedException {
executor.shutdown();
redisOperator.addStream(streamKey, mainAccountPk[0], mainAccountPk[1], 1000);
PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroup, consumerName);
for (PendingMessage pendingMessage : pendingMessages) {
redisOperator.claimMessage(pendingMessage, streamKey, claimConsumerName);
}
// 이벤트 발생으로 스레드 풀에 들어간 작업 제거
executor.getThreadPoolExecutor().remove(() -> {

});
}

void createAccount() {
Expand Down

0 comments on commit 3d558d5

Please sign in to comment.