Skip to content

Commit

Permalink
Fix #516 ... again
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinGuancheDarias committed Feb 15, 2024
1 parent bc88245 commit 216ce4c
Show file tree
Hide file tree
Showing 36 changed files with 757 additions and 314 deletions.
1 change: 1 addition & 0 deletions business/lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lombok.copyableAnnotations += org.springframework.context.annotation.Lazy
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void runAsyncWithoutContext(Runnable supplier) {
* @since 0.9.10
*/
public void runAsyncWithoutContextDelayed(Runnable task, long delay, int priority) {
Thread thread = new Thread(() -> {
var thread = ThreadUtil.ofVirtualUnStarted(() -> {
ThreadUtil.sleep(delay);
task.run();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public List<ObjectRelation> findByRequirementTypeAndSecondValue(RequirementTypeE
secondValue);
}

public List<ObjectRelation> findByRequirementTypeAndSecondValueIn(RequirementTypeEnum type, List<Long> secondValues) {
return objectRelationsRepository.findByRequirementsRequirementCodeAndRequirementsSecondValueIn(type.name(),
secondValues);
}

/**
* Finds by type, secondValue, and where thirdValue is greater or equal to x<br>
* Example resultant SQL: WHERE type = '$type' AND secondValue = '$secondValue'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.kevinguanchedarias.owgejava.business.speedimpactgroup.UnlockedSpeedImpactGroupService;
import com.kevinguanchedarias.owgejava.business.timespecial.UnlockableTimeSpecialService;
import com.kevinguanchedarias.owgejava.business.unit.UnlockableUnitService;
import com.kevinguanchedarias.owgejava.business.user.UserPlanetLockService;
import com.kevinguanchedarias.owgejava.business.util.TransactionUtilService;
import com.kevinguanchedarias.owgejava.dto.DtoFromEntity;
import com.kevinguanchedarias.owgejava.dto.RequirementInformationDto;
Expand All @@ -28,7 +29,6 @@
import jakarta.persistence.EntityManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -73,10 +73,9 @@ public class RequirementBo implements Serializable {
private final ObtainedUpgradeRepository obtainedUpgradeRepository;
private final UnlockedRelationRepository unlockedRelationRepository;
private final UserStorageRepository userStorageRepository;

@Autowired
private final transient UserPlanetLockService userPlanetLockService;
@Lazy
private transient RequirementInternalEventEmitterService requirementInternalEventEmitterService;
private final transient RequirementInternalEventEmitterService requirementInternalEventEmitterService;

/**
* Checks that the {@link RequirementTypeEnum} enum matches the database values
Expand Down Expand Up @@ -186,18 +185,34 @@ public void triggerLevelUpCompleted(UserStorage user, Integer upgradeId) {
*
* @author Kevin Guanche Darias
*/
@Transactional
public void triggerUnitBuildCompletedOrKilled(UserStorage user, Unit unit) {
processRelationList(objectRelationBo.findByRequirementTypeAndSecondValue(RequirementTypeEnum.HAVE_UNIT,
unit.getId().longValue()), user);
triggerUnitAmountChanged(user, unit);
triggerUnitBuildCompletedOrKilled(user, List.of(unit));
}

public void triggerUnitBuildCompletedOrKilled(UserStorage user, List<Unit> units) {
userPlanetLockService.runLockedForUser(user, () -> {
processRelationList(objectRelationBo.findByRequirementTypeAndSecondValueIn(
RequirementTypeEnum.HAVE_UNIT,
units.stream().map(unit -> unit.getId().longValue()).toList()
), user);
triggerUnitAmountChanged(user, units);
});
}

@Transactional
public void triggerUnitAmountChanged(UserStorage user, Unit unit) {
long count = obtainedUnitRepository.countByUserAndUnit(user, unit);
processRelationList(objectRelationBo.findByRequirementTypeAndSecondValueAndThirdValueGreaterThanEqual(
RequirementTypeEnum.UNIT_AMOUNT, unit.getId().longValue(), count), user);
userPlanetLockService.runLockedForUser(user, () -> {
long count = obtainedUnitRepository.countByUserAndUnit(user, unit);
processRelationList(objectRelationBo.findByRequirementTypeAndSecondValueAndThirdValueGreaterThanEqual(
RequirementTypeEnum.UNIT_AMOUNT, unit.getId().longValue(), count), user);
});
}

public void triggerUnitAmountChanged(UserStorage user, List<Unit> units) {
userPlanetLockService.runLockedForUser(user, () -> units.forEach(unit -> {
long count = obtainedUnitRepository.countByUserAndUnit(user, unit);
processRelationList(objectRelationBo.findByRequirementTypeAndSecondValueAndThirdValueGreaterThanEqual(
RequirementTypeEnum.UNIT_AMOUNT, unit.getId().longValue(), count), user);
}));
}

/**
Expand Down Expand Up @@ -489,9 +504,6 @@ private void emitUnlockedChange(UnlockedRelation unlockedRelation, ObjectEnum ob
var userId = unlockedRelation.getUser().getId();
var eventPrefix = object.name().toLowerCase();
transactionUtilService.doAfterCommit(() -> {
if (entityManager.contains(unlockedRelation)) {
entityManager.refresh(unlockedRelation);
}
socketIoService.sendMessage(userId, eventPrefix + "_unlocked_change",
() -> dtoUtilService.convertEntireArray(bo.getDtoClass(), bo.findUnlocked(userId)));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
import com.kevinguanchedarias.owgejava.business.util.TransactionUtilService;
import com.kevinguanchedarias.owgejava.entity.Mission;
import com.kevinguanchedarias.owgejava.entity.UserStorage;
import com.kevinguanchedarias.owgejava.entity.util.EntityRefreshUtilService;
import com.kevinguanchedarias.owgejava.pojo.websocket.MissionWebsocketMessage;
import com.kevinguanchedarias.owgejava.repository.MissionRepository;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;

import jakarta.persistence.EntityManager;

import java.util.List;

import static com.kevinguanchedarias.owgejava.business.MissionBo.UNIT_BUILD_MISSION_CHANGE;
Expand All @@ -23,7 +22,7 @@ public class MissionEventEmitterBo {
public static final String MISSIONS_COUNT_CHANGE = "missions_count_change";

private final TransactionUtilService transactionUtilService;
private final EntityManager entityManager;
private final EntityRefreshUtilService entityRefreshUtilService;
private final SocketIoService socketIoService;
private final RunningMissionFinderBo runningMissionFinderBo;
private final MissionRepository missionRepository;
Expand Down Expand Up @@ -71,7 +70,7 @@ public void emitUnitMissionsAfterCommit(Integer userId) {
}

public void emitLocalMissionChange(Mission mission, Integer userId) {
entityManager.refresh(mission);
mission = entityRefreshUtilService.refresh(mission);
if (Boolean.FALSE.equals(mission.getInvisible())) {
emitEnemyMissionsChange(mission);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.kevinguanchedarias.owgejava.business.mission.processor;

import com.kevinguanchedarias.owgejava.builder.UnitMissionReportBuilder;
import com.kevinguanchedarias.owgejava.business.RequirementBo;
import com.kevinguanchedarias.owgejava.business.mission.MissionEventEmitterBo;
import com.kevinguanchedarias.owgejava.business.mission.MissionUnitsFinderBo;
import com.kevinguanchedarias.owgejava.business.unit.HiddenUnitBo;
Expand All @@ -10,14 +11,13 @@
import com.kevinguanchedarias.owgejava.entity.Mission;
import com.kevinguanchedarias.owgejava.entity.ObtainedUnit;
import com.kevinguanchedarias.owgejava.entity.UserStorage;
import com.kevinguanchedarias.owgejava.entity.util.EntityRefreshUtilService;
import com.kevinguanchedarias.owgejava.enumerations.MissionType;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import jakarta.persistence.EntityManager;

import java.util.List;

@Service
Expand All @@ -28,8 +28,9 @@ public class DeployMissionProcessor implements MissionProcessor {
private final TransactionUtilService transactionUtilService;
private final ObtainedUnitEventEmitter obtainedUnitEventEmitter;
private final MissionEventEmitterBo missionEventEmitterBo;
private final EntityManager entityManager;
private final EntityRefreshUtilService entityRefreshUtilService;
private final HiddenUnitBo hiddenUnitBo;
private final RequirementBo requirementBo;

@Override
public boolean supports(MissionType missionType) {
Expand All @@ -46,7 +47,7 @@ public UnitMissionReportBuilder process(Mission mission, List<ObtainedUnit> invo
.map(current -> obtainedUnitBo.moveUnit(current, userId, mission.getTargetPlanet().getId()))
.toList();

var deployedMission = alteredUnits.get(0).getMission();
var deployedMission = alteredUnits.getFirst().getMission();
if (deployedMission != null) {
deployedMission.setInvisible(deployedMission.getInvolvedUnits().stream().allMatch(
involvedUnit -> hiddenUnitBo.isHiddenUnit(user, involvedUnit.getUnit())
Expand All @@ -55,9 +56,12 @@ public UnitMissionReportBuilder process(Mission mission, List<ObtainedUnit> invo

mission.setResolved(true);
transactionUtilService.doAfterCommit(() -> {
alteredUnits.forEach(entityManager::refresh);
alteredUnits.forEach(entityRefreshUtilService::refresh);
if (user.equals(mission.getTargetPlanet().getOwner())) {
obtainedUnitEventEmitter.emitObtainedUnits(user);
transactionUtilService.runWithRequiresNew(() ->
requirementBo.triggerUnitBuildCompletedOrKilled(user, alteredUnits.stream().map(ObtainedUnit::getUnit).toList())
);
}
missionEventEmitterBo.emitLocalMissionChange(mission, user.getId());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ public UnitMissionReportBuilder process(Mission mission, List<ObtainedUnit> invo
.runAsyncWithoutContextDelayed(
() -> {
if (planetOwnerOpt.isPresent() && planetOwnerOpt.get().getId().equals(userId)) {
obtainedUnits.stream().map(ObtainedUnit::getUnit)
.forEach(current -> {
requirementBo.triggerUnitBuildCompletedOrKilled(user, current);
requirementBo.triggerUnitAmountChanged(user, current);
});
obtainedUnits.stream().map(ObtainedUnit::getUnit).forEach(current -> {
requirementBo.triggerUnitBuildCompletedOrKilled(user, current);
requirementBo.triggerUnitAmountChanged(user, current);
});
}
obtainedUnitEventEmitter.emitObtainedUnits(mission.getUser());
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.kevinguanchedarias.owgejava.business.mysql;

import lombok.experimental.UtilityClass;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

@UtilityClass
public class MysqlLockState {
private static final InheritableThreadLocal<Set<String>> LOCKED_IDS_FOR_CURRENT_THREAD = new InheritableThreadLocal<>();

public static void addAll(List<String> ids) {
get().addAll(ids);
}

public static void removeAll(List<String> id) {
id.forEach(get()::remove);
}

public static void clear() {
LOCKED_IDS_FOR_CURRENT_THREAD.remove();
}

public static Set<String> get() {
Set<String> instance;
if ((instance = LOCKED_IDS_FOR_CURRENT_THREAD.get()) == null) {
instance = new HashSet<>();
LOCKED_IDS_FOR_CURRENT_THREAD.set(instance);
}
return instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

@Service
@AllArgsConstructor
Expand All @@ -26,10 +27,14 @@ public class MysqlLockUtilService {
private final TransactionUtilService transactionUtilService;
private final MysqlInformationRepository mysqlInformationRepository;

public void doInsideLock(Set<String> keys, Runnable runnable) {
public void doInsideLock(Set<String> wantedKeys, Runnable runnable) {
var alreadyLockedSet = MysqlLockState.get();
var keys = wantedKeys.stream().filter(wantedKey -> !alreadyLockedSet.contains(wantedKey)).collect(Collectors.toSet());
if (keys.isEmpty()) {
log.debug("Not locking as already locked, wanted to lock = {}, already thread-locked = {}", wantedKeys, alreadyLockedSet);
runnable.run();
} else {
log.trace("Applying the following locks {} of wanted = {}", keys, wantedKeys);
var keysAsList = keys.stream().sorted().toList();
var commandLambda = (PreparedStatementCallback<String>) ps -> {
generateBindParams(keysAsList, ps);
Expand Down Expand Up @@ -60,6 +65,7 @@ private void tryGainLock(
} else {
int acquiredLocks = Arrays.stream(result.split(",")).mapToInt(Integer::valueOf).reduce(0, Integer::sum);
if (acquiredLocks == keysAsList.size()) {
MysqlLockState.addAll(keysAsList);
action.run();
} else if (times < 5) {
ThreadUtil.sleep(200);
Expand Down Expand Up @@ -89,7 +95,7 @@ private PreparedStatementCallback<String> releaseLockLambda(List<String> keysAsL
}

private String generateSql(String part, List<String> keys) {
var lastKey = keys.get(keys.size() - 1);
var lastKey = keys.getLast();
return keys.stream()
.reduce("SELECT CONCAT(", (buffer, currentKey) -> buffer + part + (currentKey.equals(lastKey) ? ");" : ",',',"));
}
Expand All @@ -99,6 +105,12 @@ private void doReleaseLock(List<String> keysAsList) {
generateSql("RELEASE_LOCK(?)", keysAsList),
releaseLockLambda(keysAsList)
);
MysqlLockState.removeAll(keysAsList);
var stillLockedKeys = MysqlLockState.get();
log.trace("Released locks {}", keysAsList);
if (!stillLockedKeys.isEmpty()) {
log.debug("While keys {} has been deleted, the thread still contains {}", keysAsList, stillLockedKeys);
}
}

private void generateBindParams(List<String> keys, PreparedStatement preparedStatement) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.kevinguanchedarias.owgejava.business.user;

import com.kevinguanchedarias.owgejava.business.AsyncRunnerBo;
import com.kevinguanchedarias.owgejava.business.planet.PlanetLockUtilService;
import com.kevinguanchedarias.owgejava.entity.UserStorage;
import com.kevinguanchedarias.owgejava.repository.PlanetRepository;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
* Locks user planets
*/
@Service
@RequiredArgsConstructor
public class UserPlanetLockService {
private final PlanetRepository planetRepository;
private final PlanetLockUtilService planetLockUtilService;
private final AsyncRunnerBo asyncRunnerBo;

@Resource
@Lazy
private UserPlanetLockService selfProxied;

@Transactional
public void runLockedForUser(UserStorage user, Runnable task) {
planetLockUtilService.doInsideLock(planetRepository.findByOwnerId(user.getId()), task);
}

@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void runLockedForUserDelayed(UserStorage user, Runnable task, long delay) {
asyncRunnerBo.runAsyncWithoutContextDelayed(() ->
selfProxied.runLockedForUser(user, task)
, delay);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.kevinguanchedarias.owgejava.business.util;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
Expand All @@ -11,6 +12,7 @@
*/
@Service
public class TransactionUtilService {
private static final String ALREADY_COMMITTING_KEY = "owge_already_committing";

/**
* Runs lambda using a transaction with propagation REQUIRED
Expand All @@ -23,15 +25,31 @@ public void runWithRequired(Runnable action) {
action.run();
}

public void doAfterCommit(Runnable action) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
action.run();
}
});
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void runWithRequiresNew(Runnable action) {
action.run();
}

/**
* <b>Important: If the logic inside the after commit has modifications to the DB, they wouldn't work even if @Transactional is used,
* but won't fail, if logic to modify data is going to happen, ensure it has a brand new transaction (with Requires new)</b>
*/
public void doAfterCommit(Runnable action) {
if (TransactionSynchronizationManager.hasResource(ALREADY_COMMITTING_KEY)) {
action.run();
} else {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
if (!TransactionSynchronizationManager.hasResource(ALREADY_COMMITTING_KEY)) {
TransactionSynchronizationManager.bindResource(ALREADY_COMMITTING_KEY, true);
}
action.run();
}
});
}
}

public void doAfterCompletion(Runnable action) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
Expand Down
Loading

0 comments on commit 216ce4c

Please sign in to comment.