Skip to content

Commit

Permalink
Apply new potential fix to random bug #545
Browse files Browse the repository at this point in the history
Notice: Had to fix frontend dependencies going crazy when doing a clean install

Also fixed a mysql container error due to changes in modern mysql version
  • Loading branch information
KevinGuancheDarias committed Sep 7, 2024
1 parent fa5090b commit 5409ec5
Show file tree
Hide file tree
Showing 9 changed files with 7,022 additions and 7,798 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ public static Set<String> get() {
}
return instance;
}

/**
* Notice: Should only be use by thread executors to pass the context to the thread, example:
* {@link com.kevinguanchedarias.owgejava.configurations.TaskExecutorConfiguration.ContextAwarePoolExecutor}
*/
public static void set(Set<String> keys) {
LOCKED_IDS_FOR_CURRENT_THREAD.set(keys);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

@Service
@AllArgsConstructor
Expand All @@ -28,34 +29,48 @@ public class MysqlLockUtilService {
private final MysqlInformationRepository mysqlInformationRepository;

public void doInsideLock(Set<String> wantedKeys, Runnable runnable) {
var alreadyLockedSet = MysqlLockState.get();
var keys = wantedKeys.stream().filter(wantedKey -> !alreadyLockedSet.contains(wantedKey)).collect(Collectors.toSet());
if (keys.isEmpty()) {
log.debug("Not locking as already locked, wanted to lock = {}, already thread-locked = {}", wantedKeys, alreadyLockedSet);
runnable.run();
} else {
log.trace("Applying the following locks {} of wanted = {}", keys, wantedKeys);
var keysAsList = keys.stream().sorted().toList();
var commandLambda = (PreparedStatementCallback<String>) ps -> {
generateBindParams(keysAsList, ps);
var rs = ps.executeQuery();
rs.next();
return rs.getString(1);
};
maybeTriggerThreadWarning();
var previouslyLocked = unlockAlreadyLockedAndReturn();
Set<String> keys = new HashSet<>();
keys.addAll(wantedKeys);
keys.addAll(previouslyLocked);
log.trace("Applying the following locks {} of wanted = {}, previously had locked {}", keys, wantedKeys, previouslyLocked);
var keysAsList = keys.stream().sorted().toList();
var commandLambda = (PreparedStatementCallback<String>) ps -> {
generateBindParams(keysAsList, ps);
var rs = ps.executeQuery();
rs.next();
return rs.getString(1);
};

try {
tryGainLock(keysAsList, commandLambda, runnable, 1);
} finally {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
log.debug("Mysql lock was invoked with an active transaction");
transactionUtilService.doAfterCompletion(() -> doReleaseLock(keysAsList));
} else {
doReleaseLock(keysAsList);
}
try {
tryGainLock(keysAsList, commandLambda, runnable, 1);
} finally {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
log.debug("Mysql lock was invoked with an active transaction");
transactionUtilService.doAfterCompletion(() -> doReleaseLock(keysAsList));
} else {
doReleaseLock(keysAsList);
}
}
}

private Set<String> unlockAlreadyLockedAndReturn() {
var alreadyLockedSet = MysqlLockState.get();
if (!CollectionUtils.isEmpty(alreadyLockedSet)) {
doReleaseLock(alreadyLockedSet.stream().sorted().toList());
}

return alreadyLockedSet;
}

private void maybeTriggerThreadWarning() {
var threadName = Thread.currentThread().getName();
if(!threadName.startsWith("virtual") && !threadName.startsWith("OWGE_BACKGROUND")) {
log.warn("This has been invoked from outside of a trusted thread... random bug may appear, name: {}", threadName);
}
}

private void tryGainLock(
List<String> keysAsList, PreparedStatementCallback<String> preparedStatementCallback, Runnable action, int times
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.kevinguanchedarias.owgejava.configurations;

import com.kevinguanchedarias.owgejava.business.mysql.MysqlLockState;
import com.kevinguanchedarias.owgejava.context.OwgeContextHolder;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
Expand All @@ -8,11 +11,23 @@
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.io.Serial;

@Configuration
@EnableAsync
public class TaskExecutorConfiguration {

public class ContextAwareCallable implements Runnable {
@Bean(name = "contextAwareTaskExecutor")
public TaskExecutor getContextAwareTaskExecutor() {
var taskExecutor = new ContextAwarePoolExecutor();
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(20);
taskExecutor.setQueueCapacity(1000);
taskExecutor.setThreadNamePrefix("ContextAwareExecutor-");
return taskExecutor;
}

public static class ContextAwareCallable implements Runnable {

private final Runnable task;
private final RequestAttributes context;
Expand All @@ -31,22 +46,22 @@ public void run() {
}
}

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
public static class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
@Serial
private static final long serialVersionUID = 430481863356216895L;

@Override
public void execute(Runnable task) {
super.execute(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
}
}
public void execute(@NotNull Runnable task) {
var mysqlLockState = MysqlLockState.get();
var owgeContext = OwgeContextHolder.get();

@Bean(name = "contextAwareTaskExecutor")
public TaskExecutor getContextAwareTaskExecutor() {
var taskExecutor = new ContextAwarePoolExecutor();
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(20);
taskExecutor.setQueueCapacity(1000);
taskExecutor.setThreadNamePrefix("ContextAwareExecutor-");
return taskExecutor;
Runnable taskWithFullContext = () -> {
// This is run inside a thread in the pool
MysqlLockState.set(mysqlLockState);
owgeContext.ifPresent(OwgeContextHolder::set);
task.run();
};
super.execute(new ContextAwareCallable(taskWithFullContext, RequestContextHolder.currentRequestAttributes()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
Expand All @@ -24,7 +23,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import static com.kevinguanchedarias.owgejava.business.mysql.MysqlLockUtilService.TIMEOUT_SECONDS;
Expand All @@ -49,15 +47,14 @@ class MysqlLockUtilServiceTest {
private static final String KEY_1 = "foo_key";
private static final String KEY_2 = "bar_key";
private static final Set<String> KEY_LIST = new LinkedHashSet<>();
private static final String EXPECTED_SQL_FOR_LOCK = "SELECT CONCAT(GET_LOCK(?,?),',',GET_LOCK(?,?));";
private static final String EXPECTED_SQL_FOR_RELEASE_LOCK = "SELECT CONCAT(RELEASE_LOCK(?),',',RELEASE_LOCK(?));";

static {
KEY_LIST.add(KEY_1);
KEY_LIST.add(KEY_2);
}

private static final String EXPECTED_SQL_FOR_LOCK = "SELECT CONCAT(GET_LOCK(?,?),',',GET_LOCK(?,?));";
private static final String EXPECTED_SQL_FOR_RELEASE_LOCK = "SELECT CONCAT(RELEASE_LOCK(?),',',RELEASE_LOCK(?));";

private final MysqlLockUtilService mysqlLockUtilService;
private final JdbcTemplate jdbcTemplate;
private final TransactionUtilService transactionUtilService;
Expand Down Expand Up @@ -89,27 +86,6 @@ public void clear() {
MysqlLockState.clear();
}

@Test
void doInsideLock_should_lock_nothing_if_empty_keys() {
mysqlLockUtilService.doInsideLock(Set.of(), runnableMock);

verify(runnableMock, times(1)).run();
verifyNoInteractions(jdbcTemplate);
}

@Test
void doInsideLock_should_lock_nothing_if_keys_already_locked() {
try (var mockedStatic = mockStatic(MysqlLockState.class)) {
mockedStatic.when(MysqlLockState::get).thenReturn(KEY_LIST);
mysqlLockUtilService.doInsideLock(KEY_LIST, runnableMock);

mockedStatic.verify(() -> MysqlLockState.addAll(any()), never());
}

verify(runnableMock, times(1)).run();
verifyNoInteractions(jdbcTemplate);
}

@Test
void doInsideLock_should_work() throws SQLException {
var preparedStatementMockForLock = handlePreparedStatementForLock();
Expand All @@ -130,35 +106,6 @@ void doInsideLock_should_work() throws SQLException {
verify(preparedStatementMockForReleaseLock, times(1)).setString(2, KEY_1);
}

@Test
void doInsideLock_should_log_thread_has_other_locks(CapturedOutput capturedOutput) throws SQLException {
var preparedStatementMockForLock = handlePreparedStatementForLock();
var resultSetMock = mock(ResultSet.class);
var preparedStatementMockForReleaseLock = handlePreparedStatementForReleaseLock();
given(preparedStatementMockForLock.executeQuery()).willReturn(resultSetMock);
given(preparedStatementMockForReleaseLock.executeQuery()).willReturn(resultSetMock);
var alreadyLockedKey = "SOME_ADDITIONAL_KEY";
try (var mockedStatic = mockStatic(MysqlLockState.class)) {
mockedStatic.when(MysqlLockState::get).thenReturn(Set.of(alreadyLockedKey));
mysqlLockUtilService.doInsideLock(Set.of(KEY_1, KEY_2, alreadyLockedKey), runnableMock);

assertThat(capturedOutput.getOut()).contains("While keys").contains("has been deleted, the thread");
var captor = ArgumentCaptor.forClass(List.class);
mockedStatic.verify(() -> MysqlLockState.addAll(captor.capture()), times(1));
var results = captor.getValue();
assertThat(results).containsExactlyInAnyOrder(KEY_1, KEY_2);
}

verify(jdbcTemplate, times(1)).execute(eq(EXPECTED_SQL_FOR_LOCK), any(PreparedStatementCallback.class));
verify(jdbcTemplate, times(1)).execute(eq(EXPECTED_SQL_FOR_RELEASE_LOCK), any(PreparedStatementCallback.class));
verify(preparedStatementMockForLock, times(1)).setString(1, KEY_2);
verify(preparedStatementMockForLock, times(1)).setInt(2, TIMEOUT_SECONDS);
verify(preparedStatementMockForLock, times(1)).setString(3, KEY_1);
verify(preparedStatementMockForLock, times(1)).setInt(4, TIMEOUT_SECONDS);
verify(preparedStatementMockForReleaseLock, times(1)).setString(1, KEY_2);
verify(preparedStatementMockForReleaseLock, times(1)).setString(2, KEY_1);
}

@CsvSource({
"true,1",
"false,0"
Expand Down Expand Up @@ -199,7 +146,7 @@ void doInsideLock_should_properly_handle_db_error_on_lock_arg_binding() throws S
}

@Test
void doInsideLock_should_retry_if_deadlock(CapturedOutput capturedOutput) throws SQLException {
void doInsideLock_should_retry_if_deadlock(CapturedOutput capturedOutput) {
given(jdbcTemplate.execute(eq(EXPECTED_SQL_FOR_LOCK), any(PreparedStatementCallback.class)))
.willReturn("1,0")
.willReturn("1,1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ COPY *.sql /tmp/
RUN echo "[mysqld]" > /etc/mysql/conf.d/mysqld.cnf
RUN echo "lower_case_table_names=1" >> /etc/mysql/conf.d/mysqld.cnf
RUN echo "innodb_use_native_aio=0" >> /etc/mysql/conf.d/mysqld.cnf
RUN echo "default-authentication-plugin=mysql_native_password" >> /etc/mysql/conf.d/mysqld.cnf
RUN echo "mysql_native_password=ON" >> /etc/mysql/conf.d/mysqld.cnf
RUN cat /tmp/*.sql > /docker-entrypoint-initdb.d/init.sql
Loading

0 comments on commit 5409ec5

Please sign in to comment.