Skip to content

Commit

Permalink
feat(lock): add distributed lock (#30)
Browse files Browse the repository at this point in the history
* add MySQLReentrantLock

* feat(lock):support MySQLReentrantLock to concurrent

* add concurrent test functions

* add distributed_reentrant_locks table

* update migration and RedisReentrantLock

* add some comments

* add MySQLReentrantReadWriteLock

* update read and write lock

* add RedisReentrantReadWriteLock

* add RedisReentrantReadWriteLockTest

* update MySQLReentrantReadWriteLock

* update RedisReentrantReadWriteLock

* update comments

* fix RedisReentrantReadWriteLock comments

* fix():remove entityManager

* fix(lock): test lock degradation and fix some comments
  • Loading branch information
GODVvVZzz authored Oct 30, 2023
1 parent 8e7a4d0 commit 69d99ee
Show file tree
Hide file tree
Showing 20 changed files with 2,598 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.quarkus.redis.client.RedisClient;
import xyz.eulix.platform.services.config.ApplicationProperties;
import xyz.eulix.platform.services.lock.service.ReentrantLockService;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
Expand All @@ -28,12 +29,23 @@ public class DistributedLockFactory {
@Inject
RedisClient redisClient;

@Inject
ReentrantLockService mysqlLockService;

@Inject
ApplicationProperties applicationProperties;

public DistributedLock newLock(String keyName) {
public DistributedLock newLock(String keyName, LockType lockType) {
String lockValue = UUID.randomUUID().toString();
Integer timeout = applicationProperties.getLockExpireTime(); // 单位s
return new RedisReentrantLock(redisClient, keyName, lockValue, timeout);

if (lockType.equals(LockType.RedisReentrantLock)) {
return new RedisReentrantLock(redisClient, keyName, lockValue, timeout);
} else if(lockType.equals(LockType.MySQLReentrantLock)) {
return new MySQLReentrantLock(mysqlLockService, keyName, lockValue, timeout);
} else {
throw new IllegalArgumentException("Invalid lock type: " + lockType);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package xyz.eulix.platform.services.lock;


/**
* @author VvV
* @date 2023/8/4
*/
public interface DistributedReadWriteLock {
/**
* Returns the lock used for reading.
*/
DistributedLock readLock();

/**
* Returns the lock used for writing.
*/
DistributedLock writeLock();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package xyz.eulix.platform.services.lock;

import io.quarkus.redis.client.RedisClient;
import xyz.eulix.platform.services.config.ApplicationProperties;
import xyz.eulix.platform.services.lock.service.ReentrantReadWriteLockService;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

/**
* @author VvV
* @date 2023/8/4
*/
@ApplicationScoped
public class DistributedReadWriteLockFactory {

@Inject
RedisClient redisClient;

@Inject
ReentrantReadWriteLockService mysqlLockService;

@Inject
ApplicationProperties applicationProperties;

public DistributedReadWriteLock newLock(String keyName, LockType lockType) {
Integer timeout = applicationProperties.getLockExpireTime(); // 单位s

if(lockType.equals(LockType.MySQLReentrantReadWriteLock)) {
return new MySQLReentrantReadWriteLock(mysqlLockService, keyName, timeout);
} else if (lockType.equals(LockType.RedisReentrantReadWriteLock)) {
return new RedisReentrantReadWriteLock(redisClient, keyName, timeout);
} else {
throw new IllegalArgumentException("Invalid lock type: " + lockType);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package xyz.eulix.platform.services.lock;

/**
* @author VvV
* @date 2023/7/27
*/
public enum LockType {

RedisReentrantLock,

MySQLReentrantLock,

MySQLReentrantReadWriteLock,

RedisReentrantReadWriteLock
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package xyz.eulix.platform.services.lock;

import org.jboss.logging.Logger;
import xyz.eulix.platform.services.lock.service.ReentrantLockService;

import java.util.concurrent.TimeUnit;

public class MySQLReentrantLock implements DistributedLock {

private static final Logger LOG = Logger.getLogger("app.log");

private ReentrantLockService lockService;

// 锁的key(资源的唯一标识)
private String keyName;

// 拥有锁的实例uuid
private String lockValue;

// 锁超时时间
private Integer timeout;

public MySQLReentrantLock(ReentrantLockService lockService, String keyName, String lockValue, Integer timeout) {
this.lockService = lockService;
this.keyName = keyName;
this.lockValue = lockValue;
this.timeout = timeout * 1000; // 单位ms
}

@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
long start = System.currentTimeMillis();
long end;
long sleepTime = 1L; // 重试间隔时间,单位ms。指数增长,最大值为1024ms
do {
//尝试获取锁
boolean success = tryLock();
if (success) {
//成功获取锁,返回
LOG.debugv("acquire lock success, keyName:{0}", keyName);
return true;
}
// 等待后继续尝试获取
if (sleepTime < 1000L) {
sleepTime = sleepTime << 1;
}
LOG.debugv("acquire lock fail, retry after: {0}ms", sleepTime);
Thread.sleep(sleepTime);
end = System.currentTimeMillis();
} while (end-start < unit.toMillis(waitTime));
LOG.debugv("acquire lock timeout, elapsed: {0}ms", System.currentTimeMillis() - start);
return false;
}

@Override
public boolean tryLock() {
lockService.deleteExpiredLock(keyName);
return lockService.tryLock(keyName, lockValue, timeout);
}

@Override
public void unlock() {
lockService.releaseLock(keyName, lockValue, timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package xyz.eulix.platform.services.lock;

import org.jboss.logging.Logger;
import xyz.eulix.platform.services.lock.service.ReentrantReadWriteLockService;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* @author VvV
* @date 2023/8/4
*/
public class MySQLReentrantReadWriteLock implements DistributedReadWriteLock{

private static final Logger LOG = Logger.getLogger("app.log");

private ReentrantReadWriteLockService lockService;
private String keyName;

private int timeout;

public MySQLReentrantReadWriteLock(ReentrantReadWriteLockService lockService, String keyName, int timeout) {
this.lockService = lockService;
this.keyName = keyName;
this.timeout = timeout * 1000;
}

/**
* Returns the lock used for reading.
*/
@Override
public MySQLReentrantReadWriteLock.ReadLock readLock() {
String lockValue = UUID.randomUUID().toString();
return new ReadLock(lockValue);
}

/**
* Returns the lock used for writing.
*/
@Override
public MySQLReentrantReadWriteLock.WriteLock writeLock() {
String lockValue = UUID.randomUUID() + ":write";
return new WriteLock(lockValue);
}

public class ReadLock implements DistributedLock {

private String lockValue;

public ReadLock(String lockValue) {
this.lockValue = lockValue;
}

public String getLockValue() {
return lockValue;
}

/**
* 在有效时间内阻塞加锁,可被中断
*
* @param waitTime
* @param unit
*/
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
long start = System.currentTimeMillis();
long end;
long sleepTime = 1L; // 重试间隔时间,单位ms。指数增长,最大值为1024ms
do {
//尝试获取锁
boolean success = tryLock();
if (success) {
//成功获取锁,返回
LOG.debugv("acquire read lock success, keyName:{0}", keyName);
return true;
}
// 等待后继续尝试获取
if (sleepTime < 1000L) {
sleepTime = sleepTime << 1;
}
LOG.debugv("acquire read lock fail, retry after: {0}ms", sleepTime);
Thread.sleep(sleepTime);
end = System.currentTimeMillis();
} while (end-start < unit.toMillis(waitTime));
LOG.debugv("acquire read lock timeout, elapsed: {0}ms", System.currentTimeMillis() - start);
return false;
}

/**
* 尝试加锁
*/
@Override
public boolean tryLock() {
return lockService.tryReadLock(keyName, lockValue, timeout);
}

/**
* 解锁操作
*/
@Override
public void unlock() {
lockService.releaseReadLock(keyName, lockValue, timeout);
}
}

public class WriteLock implements DistributedLock {

private String readLockValue;

private String lockValue;

public WriteLock(String lockValue) {
this.lockValue = lockValue + ":write";
this.readLockValue = lockValue;
}

public String getLockValue() {
return lockValue;
}

public ReadLock getCorrespondingReadLock() {
return new ReadLock(readLockValue);
}

/**
* 在有效时间内阻塞加锁,可被中断
*
* @param waitTime
* @param unit
*/
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
long start = System.currentTimeMillis();
long end;
long sleepTime = 1L; // 重试间隔时间,单位ms。指数增长,最大值为1024ms
do {
//尝试获取锁
boolean success = tryLock();
if (success) {
//成功获取锁,返回
LOG.debugv("acquire write lock success, keyName:{0}", keyName);
return true;
}
// 等待后继续尝试获取
if (sleepTime < 1000L) {
sleepTime = sleepTime << 1;
}
LOG.debugv("acquire write lock fail, retry after: {0}ms", sleepTime);
Thread.sleep(sleepTime);
end = System.currentTimeMillis();
} while (end-start < unit.toMillis(waitTime));
LOG.debugv("acquire write lock timeout, elapsed: {0}ms", System.currentTimeMillis() - start);
return false;
}

/**
* 尝试加锁
*/
@Override
public boolean tryLock() {
return lockService.tryWriteLock(keyName, lockValue, timeout);
}

/**
* 解锁操作
*/
@Override
public void unlock() {
lockService.releaseWriteLock(keyName, lockValue, timeout);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ private void releaseLock(String key, String value, Integer timeout) {
}
Integer resultNum = result.toInteger();
if (resultNum == 1) {
LOG.debugv("release lock sucess, keyName:{0}, lockValue:{1}", key, value);
LOG.debugv("release lock success, keyName:{0}, lockValue:{1}", key, value);
} else {
LOG.debugv("Decrease lock times sucess, keyName:{0}, lockValue:{1}", key, value);
LOG.debugv("Decrease lock times success, keyName:{0}, lockValue:{1}", key, value);
}
}

Expand Down
Loading

0 comments on commit 69d99ee

Please sign in to comment.