Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23303 Improve transaction performance. #4700

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ public void testInitialAssignmentIsRetrievedFromPlacementDriver() {
public void testUpdateByEvent() {
tracker.start();

assertEquals(1, tracker.maxStartTime());
driver.updateReplica("s3", TABLE_ID, 0, 2);

assertEquals(2, tracker.maxStartTime());
driver.updateReplica("s3", TABLE_ID, 0, 3);

assertEquals(3, tracker.maxStartTime());

PrimaryReplicasResult replicas = tracker.primaryReplicasAsync(TABLE_ID, null).join();
assertEquals(PARTITIONS, replicas.nodeNames().size());
Expand All @@ -105,10 +105,10 @@ public void testNullReplicas() {
driver.updateReplica(null, TABLE_ID, 0, 2);
tracker.start();

assertEquals(1, tracker.maxStartTime());
driver.updateReplica(null, TABLE_ID, 1, 2);

assertEquals(2, tracker.maxStartTime());
driver.updateReplica(null, TABLE_ID, 1, 3);

assertEquals(3, tracker.maxStartTime());

PrimaryReplicasResult replicas = tracker.primaryReplicasAsync(TABLE_ID, null).join();
assertEquals(PARTITIONS, replicas.nodeNames().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable runnable)
return CompletableFuture.runAsync(runnable);
}

@Override
public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, boolean commit) {
}

@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
Expand Down Expand Up @@ -239,4 +235,9 @@ public int finished() {
public int pending() {
return 0;
}

@Override
public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, HybridTimestamp ts, boolean commit) {
// No-op.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
public interface HybridClock {
/**
* Creates a timestamp for new event.
* Creates a timestamp for new event. A timestamp is guarantied to be unique and monotonically grown.
*
* @return The hybrid timestamp.
*/
Expand All @@ -37,7 +37,7 @@ public interface HybridClock {
long currentLong();

/**
* Creates a timestamp for new event.
* Creates a timestamp for new event. A timestamp is guarantied to be unique and monotonically grown.
*
* @return The hybrid timestamp.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import static org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tostring.S;
Expand All @@ -38,55 +37,42 @@ public class HybridClockImpl implements HybridClock {
/**
* Var handle for {@link #latestTime}.
*/
private static final VarHandle LATEST_TIME;

static {
try {
LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClockImpl.class, "latestTime", long.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
private static final AtomicLongFieldUpdater<HybridClockImpl> LATEST_TIME = AtomicLongFieldUpdater.newUpdater(HybridClockImpl.class,
"latestTime");

private volatile long latestTime;

private final List<ClockUpdateListener> updateListeners = new CopyOnWriteArrayList<>();

/**
* The constructor which initializes the latest time to current time by system clock.
*/
public HybridClockImpl() {
this.latestTime = currentTime();
}

/**
* System current time in milliseconds shifting left to free insignificant bytes.
* This method is marked with a public modifier to mock in tests because there is no way to mock currentTimeMillis.
* Returns current physical time in milliseconds.
*
* @return Current time in milliseconds shifted right on two bytes.
* @return Current time.
*/
public static long currentTime() {
return System.currentTimeMillis() << LOGICAL_TIME_BITS_SIZE;
protected long physicalTime() {
return System.currentTimeMillis();
}

@Override
public long nowLong() {
public final long nowLong() {
while (true) {
long now = currentTime();

// Read the latest time after accessing UTC time to reduce contention.
long oldLatestTime = latestTime;

long newLatestTime = max(oldLatestTime + 1, now);
if (oldLatestTime >= now) {
return LATEST_TIME.incrementAndGet(this);
}

if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
return newLatestTime;
if (LATEST_TIME.compareAndSet(this, oldLatestTime, now)) {
return now;
}
}
}

@Override
public long currentLong() {
public final long currentLong() {
long current = currentTime();
rpuch marked this conversation as resolved.
Show resolved Hide resolved

return max(latestTime, current);
Expand All @@ -107,33 +93,34 @@ private void notifyUpdateListeners(long newTs) {
}

@Override
public HybridTimestamp now() {
public final HybridTimestamp now() {
return hybridTimestamp(nowLong());
}

@Override
public HybridTimestamp current() {
public final HybridTimestamp current() {
return hybridTimestamp(currentLong());
}

/**
* Updates the clock in accordance with an external event timestamp. If the supplied timestamp is ahead of the
* current clock timestamp, the clock gets adjusted to make sure it never returns any timestamp before (or equal to)
* the supplied external timestamp.
* Updates the clock in accordance with an external event timestamp. If the supplied timestamp is ahead of the current clock timestamp,
* the clock gets adjusted to make sure it never returns any timestamp before (or equal to) the supplied external timestamp.
*
* @param requestTime Timestamp from request.
* @return The resulting timestamp (guaranteed to exceed both previous clock 'currentTs' and the supplied external ts).
*/
@Override
public HybridTimestamp update(HybridTimestamp requestTime) {
public final HybridTimestamp update(HybridTimestamp requestTime) {
while (true) {
long now = currentTime();

// Read the latest time after accessing UTC time to reduce contention.
long oldLatestTime = this.latestTime;
long oldLatestTime = latestTime;

long newLatestTime = max(requestTime.longValue() + 1, max(now, oldLatestTime + 1));

// TODO https://issues.apache.org/jira/browse/IGNITE-23707 avoid CAS on logical part update.

if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
notifyUpdateListeners(newLatestTime);

Expand All @@ -142,6 +129,10 @@ public HybridTimestamp update(HybridTimestamp requestTime) {
}
}

private long currentTime() {
return physicalTime() << LOGICAL_TIME_BITS_SIZE;
}

@Override
public void addUpdateListener(ClockUpdateListener listener) {
updateListeners.add(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
* supports reentrancy semantics like {@link ReentrantReadWriteLock}.
*/
public class IgniteStripedReadWriteLock implements ReadWriteLock {
/** Default concurrency. */
private static final int CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);

/** Index generator. */
private static final AtomicInteger IDX_GEN = new AtomicInteger();

Expand All @@ -42,6 +45,13 @@ public class IgniteStripedReadWriteLock implements ReadWriteLock {
/** Composite write lock. */
private final WriteLock writeLock;

/**
* Creates a new instance with default concurrency level.
*/
public IgniteStripedReadWriteLock() {
this(CONCURRENCY);
}

/**
* Creates a new instance with given concurrency level.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.lang.invoke.VarHandle;
import java.util.Comparator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class PendingComparableValuesTracker<T extends Comparable<T>, R> implemen
private volatile boolean closeGuard;

/** Busy lock to close synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final IgniteStripedReadWriteLock busyLock = new IgniteStripedReadWriteLock();
sanpwc marked this conversation as resolved.
Show resolved Hide resolved

private final Comparator<Map.Entry<T, @Nullable R>> comparator;

Expand All @@ -86,7 +87,7 @@ public PendingComparableValuesTracker(T initialValue) {
*/
public void update(T newValue, @Nullable R futureResult) {
while (true) {
if (!busyLock.enterBusy()) {
if (!busyLock.readLock().tryLock()) {
throw new TrackerClosedException();
}

Expand All @@ -104,7 +105,7 @@ public void update(T newValue, @Nullable R futureResult) {
break;
}
} finally {
busyLock.leaveBusy();
busyLock.readLock().unlock();
}
}
}
Expand All @@ -118,18 +119,20 @@ public void update(T newValue, @Nullable R futureResult) {
* @param valueToWait Value to wait.
*/
public CompletableFuture<R> waitFor(T valueToWait) {
if (!busyLock.enterBusy()) {
if (!busyLock.readLock().tryLock()) {
return failedFuture(new TrackerClosedException());
}

try {
if (current.getKey().compareTo(valueToWait) >= 0) {
return completedFuture(current.getValue());
Entry<T, @Nullable R> currentKeyValue = current;

if (currentKeyValue.getKey().compareTo(valueToWait) >= 0) {
return completedFuture(currentKeyValue.getValue());
}

return addNewWaiter(valueToWait);
} finally {
busyLock.leaveBusy();
busyLock.readLock().unlock();
}
}

Expand All @@ -139,14 +142,14 @@ public CompletableFuture<R> waitFor(T valueToWait) {
* @throws TrackerClosedException if the tracker is closed.
*/
public T current() {
if (!busyLock.enterBusy()) {
if (!busyLock.readLock().tryLock()) {
throw new TrackerClosedException();
}

try {
return current.getKey();
} finally {
busyLock.leaveBusy();
busyLock.readLock().unlock();
}
}

Expand All @@ -156,7 +159,7 @@ public void close() {
return;
}

busyLock.block();
busyLock.writeLock().lock();

TrackerClosedException trackerClosedException = new TrackerClosedException();

Expand Down
Loading