Skip to content

Commit

Permalink
remove try lock interface
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Sep 3, 2024
1 parent a255e52 commit 7e7e0da
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public interface AsyncLock {

Expand Down Expand Up @@ -53,22 +52,6 @@ enum LockStatus {
*/
CompletableFuture<Void> tryLock();

/**
* Tries to acquire the lock asynchronously within a specified time.
*
* @param time the time to wait
* @param unit the time unit of the time parameter
* @return a CompletableFuture that completes when the lock is acquired or not. The future will
* complete exceptionally with a {@link
* io.streamnative.oxia.client.api.exceptions.LockException.LockBusyException} if the lock is
* acquired by others, or with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} if the
* current lock status is not {@link LockStatus#INIT} or {@link LockStatus#RELEASED}.
* Additionally, the future will complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException} in case of an unknown error.
*/
CompletableFuture<Void> tryLock(long time, TimeUnit unit);

/**
* Asynchronously releases the lock.
*
Expand Down Expand Up @@ -104,24 +87,6 @@ enum LockStatus {
*/
CompletableFuture<Void> tryLock(ExecutorService executorService);

/**
* Tries to acquire the lock asynchronously within a specified time using a specified
* ExecutorService.
*
* @param time the time to wait
* @param unit the time unit of the time parameter
* @param executorService the ExecutorService to use for acquiring the lock
* @return a CompletableFuture that completes when the lock is acquired or not. The future will
* complete exceptionally with a {@link
* io.streamnative.oxia.client.api.exceptions.LockException.LockBusyException} if the lock is
* acquired by others, or with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} if the
* current lock status is not {@link LockStatus#INIT} or {@link LockStatus#RELEASED}.
* Additionally, the future will complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException} in case of an unknown error.
*/
CompletableFuture<Void> tryLock(long time, TimeUnit unit, ExecutorService executorService);

/**
* Asynchronously releases the lock using a specified ExecutorService.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
*/
package io.streamnative.oxia.client.api;

import static java.util.concurrent.TimeUnit.*;

import java.time.Clock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.TimeUnit;

public record OptionBackoff(
long initDelay, TimeUnit initDelayUnit, long maxDelay, TimeUnit maxDelayUnit, Clock clock) {
long initDelay, TimeUnit initDelayUnit, long maxDelay, TimeUnit maxDelayUnit) {
public static OptionBackoff DEFAULT =
new OptionBackoff(10, MILLISECONDS, 500, MILLISECONDS, Clock.systemUTC());
new OptionBackoff(10, MILLISECONDS, 500, MILLISECONDS);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public void increment() {
@Test
public void testCounterWithSyncLock() throws InterruptedException {
final String lockKey = UUID.randomUUID().toString();
@Cleanup("shutdown")
final ExecutorService service =
@Cleanup("shutdown") final ExecutorService service =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final Map<String, AsyncOxiaClient> clients = new ConcurrentHashMap<>();
final Map<String, LockManager> lockManager = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -151,8 +150,7 @@ public void testCounterWithSyncLock() throws InterruptedException {
@Test
public void testCounterWithAsyncLock() throws InterruptedException {
final String lockKey = UUID.randomUUID().toString();
@Cleanup("shutdown")
final ExecutorService service =
@Cleanup("shutdown") final ExecutorService service =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final Map<String, AsyncOxiaClient> clients = new ConcurrentHashMap<>();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import static io.streamnative.oxia.client.api.AsyncLock.LockStatus.INIT;
import static io.streamnative.oxia.client.api.AsyncLock.LockStatus.RELEASED;
import static io.streamnative.oxia.client.api.AsyncLock.LockStatus.RELEASING;
import static io.streamnative.oxia.client.api.exceptions.LockException.AcquireTimeoutException;
import static io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException;
import static io.streamnative.oxia.client.api.exceptions.LockException.LockBusyException;
import static io.streamnative.oxia.client.util.CompletableFutures.unwrap;
import static io.streamnative.oxia.client.util.CompletableFutures.wrap;
import static io.streamnative.oxia.client.util.Runs.safeExecute;
import static io.streamnative.oxia.client.util.Runs.safeRun;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;

import com.google.common.base.Throwables;
Expand All @@ -50,7 +50,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -134,11 +133,6 @@ public CompletableFuture<Void> tryLock() {
return tryLock(ForkJoinPool.commonPool());
}

@Override
public CompletableFuture<Void> tryLock(long time, TimeUnit unit) {
return tryLock(time, unit, ForkJoinPool.commonPool());
}

@Override
public CompletableFuture<Void> unlock() {
return unlock(ForkJoinPool.commonPool());
Expand Down Expand Up @@ -266,21 +260,6 @@ private CompletableFuture<Void> tryLock1(long version) {
});
}

@Override
public CompletableFuture<Void> tryLock(
long time, TimeUnit unit, ExecutorService callbackService) {
return lock(callbackService)
.orTimeout(time, unit)
.exceptionally(
ex -> {
final Throwable rc = unwrap(ex);
if (rc instanceof CancellationException) {
throw new CompletionException(new AcquireTimeoutException());
}
throw new CompletionException(LockException.wrap(rc));
});
}

@Override
public CompletableFuture<Void> unlock(ExecutorService executorService) {
while (true) {
Expand All @@ -296,15 +275,11 @@ public CompletableFuture<Void> unlock(ExecutorService executorService) {
if (log.isDebugEnabled()) {
log.debug("busy wait for acquiring. it should be happened very rare.");
}
final CompletableFuture<Void> r;
try {
//noinspection BusyWait
Thread.sleep(1);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return runAsync(
() -> {
throw wrap(LockException.wrap(ex));
});
waitForAWhile();
} catch (Throwable ex) {
return failedFuture(ex);
}
}
case ACQUIRED -> {
Expand All @@ -313,14 +288,9 @@ public CompletableFuture<Void> unlock(ExecutorService executorService) {
log.debug("busy wait. expect: acquired, actual: {}", STATE_UPDATER.get(this));
}
try {
//noinspection BusyWait
Thread.sleep(1);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return runAsync(
() -> {
throw wrap(LockException.wrap(ex));
});
waitForAWhile();
} catch (Throwable ex) {
return failedFuture(ex);
}
continue;
}
Expand All @@ -347,6 +317,15 @@ public CompletableFuture<Void> unlock(ExecutorService executorService) {
}
}

private static void waitForAWhile() {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw wrap(LockException.wrap(ex));
}
}

private CompletableFuture<Void> unlock0(ExecutorService executorService) {
return client
.delete(key, Set.of(DeleteOption.IfVersionIdEquals(versionId)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.util.Backoff;
import java.io.Closeable;
import java.io.IOException;
import java.time.Clock;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -87,7 +87,7 @@ public AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) {
optionBackoff.initDelayUnit(),
optionBackoff.maxDelay(),
optionBackoff.maxDelayUnit(),
optionBackoff.clock()),
Clock.systemUTC()),
optionAutoRevalidate));
}

Expand All @@ -101,5 +101,5 @@ public void accept(Notification notification) {
}

@Override
public void close() throws IOException {}
public void close() {}
}

0 comments on commit 7e7e0da

Please sign in to comment.