Add segmented locks to TSC
Signed-off-by: Peter Alfonsi <[email protected]>
Peter Alfonsi committed May 30, 2024
1 parent c7e8421 commit 4dea454
Showing 2 changed files with 137 additions and 10 deletions.
@@ -77,9 +77,13 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private final TieredSpilloverCacheStatsHolder statsHolder;
private ToLongBiFunction<ICacheKey<K>, V> weigher;
private final List<String> dimensionNames;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

// The locks ensure keys can't end up in both the heap and disk tier simultaneously.
// For performance, we have several locks, which effectively segment the cache by key.hashCode().
private final int NUM_LOCKS = 256;
private final ReleasableLock[] readLocks;
private final ReleasableLock[] writeLocks;

/**
* Maintains caching tiers in ascending order of cache latency.
*/
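
As a side note, the striping pattern these fields implement can be sketched on its own with plain JDK locks (a hypothetical StripedLocks helper, not the TSC code): each key hashes to one of a fixed number of read/write lock pairs, so a lookup and a tier move for the same key serialize on the same lock, while unrelated keys usually proceed in parallel.

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal standalone illustration of hash-based lock striping; not the OpenSearch implementation.
final class StripedLocks {
    private static final int NUM_LOCKS = 256;
    private final ReadWriteLock[] locks = new ReadWriteLock[NUM_LOCKS];

    StripedLocks() {
        for (int i = 0; i < NUM_LOCKS; i++) {
            locks[i] = new ReentrantReadWriteLock();
        }
    }

    // All operations on the same key take the same stripe; different keys usually take different ones.
    ReadWriteLock lockFor(Object key) {
        // NUM_LOCKS is a power of two, so this mask picks the lowest byte of the hash.
        // The actual change uses the second-lowest byte instead (see getLockIndexForKey below).
        return locks[key.hashCode() & (NUM_LOCKS - 1)];
    }
}
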
@@ -139,6 +143,15 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
builder.cacheConfig.getClusterSettings()
.addSettingsUpdateConsumer(DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType), this::enableDisableDiskCache);

ReadWriteLock[] locks = new ReadWriteLock[NUM_LOCKS];
this.readLocks = new ReleasableLock[NUM_LOCKS];
this.writeLocks = new ReleasableLock[NUM_LOCKS];
for (int i = 0; i < NUM_LOCKS; i++) {
locks[i] = new ReentrantReadWriteLock();
readLocks[i] = new ReleasableLock(locks[i].readLock());
writeLocks[i] = new ReleasableLock(locks[i].writeLock());
}
}

// Package private for testing
@@ -170,7 +183,7 @@ public V get(ICacheKey<K> key) {

@Override
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
try (ReleasableLock ignore = getWriteLockForKey(key).acquire()) {
onHeapCache.put(key, value);
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}
@@ -191,7 +204,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
try (ReleasableLock ignore = getWriteLockForKey(key).acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
}
// Handle stats
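
As the comment above notes, the "load the value once" guarantee is provided by delegating to onHeapCache.computeIfAbsent while holding the segment's write lock. The same at-most-once semantics can be illustrated with plain JDK types; this is an analogy with hypothetical names, not the OpenSearch code path:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates that computeIfAbsent invokes the loader at most once per key, even under contention.
public class SingleLoadDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        AtomicInteger loaderCalls = new AtomicInteger();
        Runnable request = () -> cache.computeIfAbsent("key", k -> {
            loaderCalls.incrementAndGet(); // counts how many times the loader actually ran
            return "value-for-" + k;
        });
        Thread t1 = new Thread(request);
        Thread t2 = new Thread(request);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("loader invocations: " + loaderCalls.get()); // prints 1
    }
}
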
@@ -234,7 +247,7 @@ public void invalidate(ICacheKey<K> key) {
statsHolder.removeDimensions(dimensionValues);
}
if (key.key != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
try (ReleasableLock ignore = getWriteLockForKey(key).acquire()) {
cacheEntry.getKey().invalidate(key);
}
}
@@ -243,10 +256,20 @@

@Override
public void invalidateAll() {
try (ReleasableLock ignore = writeLock.acquire()) {
// For cache-wide operations like refresh() and invalidateAll(), all the segment locks must be acquired.
// To avoid possible deadlock if they run at the same time, they acquire the locks in index order and
// release them in reverse order.
try {
for (int i = 0; i < NUM_LOCKS; i++) {
writeLocks[i].acquire();
}
for (Map.Entry<ICache<K, V>, TierInfo> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().invalidateAll();
}
} finally {
for (int i = NUM_LOCKS - 1; i >= 0; i--) {
writeLocks[i].close();
}
}
statsHolder.reset();
}
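
The ordering rule matters because deadlock requires a circular wait: if one cache-wide operation took stripe 1 before stripe 0 while another took stripe 0 before stripe 1, each could hold a lock the other needs. Acquiring every stripe in ascending index order rules that cycle out. A minimal sketch of the pattern with plain JDK locks (hypothetical names, not the TSC code):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustration of fixed-order acquisition across all stripes; not the OpenSearch implementation.
final class OrderedAcquisition {
    private final Lock[] stripes;

    OrderedAcquisition(int numStripes) {
        stripes = new Lock[numStripes];
        for (int i = 0; i < numStripes; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    // Every caller locks stripe 0, then 1, then 2, ... so no two callers can each hold
    // a stripe the other is waiting for, which is what a deadlock would require.
    void runExclusively(Runnable wholeCacheOperation) {
        int acquired = 0;
        try {
            for (; acquired < stripes.length; acquired++) {
                stripes[acquired].lock();
            }
            wholeCacheOperation.run();
        } finally {
            // Release in reverse order, mirroring the diff above.
            for (int i = acquired - 1; i >= 0; i--) {
                stripes[i].unlock();
            }
        }
    }
}
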
@@ -275,10 +298,18 @@ public long count() {

@Override
public void refresh() {
try (ReleasableLock ignore = writeLock.acquire()) {
try {
// Acquire the locks in index order
for (int i = 0; i < NUM_LOCKS; i++) {
writeLocks[i].acquire();
}
for (Map.Entry<ICache<K, V>, TierInfo> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().refresh();
}
} finally {
for (int i = NUM_LOCKS - 1; i >= 0; i--) {
writeLocks[i].close();
}
}
}

@@ -302,7 +333,7 @@ public ImmutableCacheStatsHolder stats(String[] levels) {
*/
private Function<ICacheKey<K>, Tuple<V, String>> getValueFromTieredCache(boolean captureStats) {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
try (ReleasableLock ignore = getReadLockForKey(key).acquire()) {
for (Map.Entry<ICache<K, V>, TierInfo> cacheEntry : caches.entrySet()) {
if (cacheEntry.getValue().isEnabled()) {
V value = cacheEntry.getKey().get(key);
@@ -328,7 +359,7 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
ICacheKey<K> key = notification.getKey();
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) {
try (ReleasableLock ignore = writeLock.acquire()) {
try (ReleasableLock ignore = getWriteLockForKey(key).acquire()) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
}
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
@@ -371,6 +402,22 @@ boolean evaluatePolicies(V value) {
return true;
}

private int getLockIndexForKey(ICacheKey<K> key) {
// Since OpenSearchOnHeapCache also uses segments based on the least significant byte of the key's hash code
// (key.hashCode() & 0xff), we use the second-least significant byte. This way, if two keys face
// lock contention in the TSC's locks, they are unlikely to also face lock contention in OpenSearchOnHeapCache.
// This should help p100 latencies.
return (key.hashCode() & 0xff00) >> 8;
}

private ReleasableLock getReadLockForKey(ICacheKey<K> key) {
return readLocks[getLockIndexForKey(key)];
}

private ReleasableLock getWriteLockForKey(ICacheKey<K> key) {
return writeLocks[getLockIndexForKey(key)];
}

/**
* A class which receives removal events from the heap tier.
*/
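
To make the byte choice in getLockIndexForKey concrete, here is a small worked example with made-up hash values (illustrative only): the TSC lock index is taken from the second-lowest byte of the hash, while the on-heap segment, per the comment above, is taken from the lowest byte, so two keys that collide on a TSC lock still usually land in different OpenSearchOnHeapCache segments.

// Worked example of the index arithmetic; hash values are invented for illustration.
public class LockIndexExample {
    public static void main(String[] args) {
        int hash = 0x1234ABCD;                         // pretend key.hashCode()
        System.out.println(hash & 0xff);               // 205 (0xCD): on-heap segment, lowest byte
        System.out.println((hash & 0xff00) >> 8);      // 171 (0xAB): TSC lock index, second-lowest byte

        // A second key with the same second-lowest byte contends for the same TSC lock,
        // but its lowest byte differs, so it maps to a different on-heap segment.
        int otherHash = 0x5678AB01;
        System.out.println((otherHash & 0xff00) >> 8); // 171: same TSC lock index
        System.out.println(otherHash & 0xff);          // 1: different on-heap segment
    }
}
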
@@ -1323,6 +1323,86 @@ public void testTierStatsAddCorrectly() throws Exception {

}

public void testGlobalOperationsDoNotCauseDeadlock() throws Exception {
// Confirm refresh() and invalidateAll(), which both require all segment locks, don't cause deadlock if run concurrently
int numEntries = 250;
int onHeapCacheSize = randomIntBetween(10, 30);
int diskCacheSize = randomIntBetween(numEntries, numEntries + 100);
int keyValueSize = 50;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = initializeTieredSpilloverCache(
keyValueSize,
diskCacheSize,
removalListener,
Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build(),
0
);

// First try refresh() and then invalidateAll()
// Put some values in the cache
for (int i = 0; i < numEntries; i++) {
tieredSpilloverCache.computeIfAbsent(getICacheKey(UUID.randomUUID().toString()), getLoadAwareCacheLoader());
}
assertEquals(numEntries, tieredSpilloverCache.count());

Phaser phaser = new Phaser(3);
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread refreshThread = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
tieredSpilloverCache.refresh();
countDownLatch.countDown();
});
Thread invalidateThread = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
tieredSpilloverCache.invalidateAll();
countDownLatch.countDown();
});

refreshThread.start();
invalidateThread.start();
phaser.arriveAndAwaitAdvance();
countDownLatch.await();

// This should terminate and we should see an empty cache
assertEquals(0, tieredSpilloverCache.count());

// Do it again, running invalidateAll() first and then refresh()
for (int i = 0; i < numEntries; i++) {
tieredSpilloverCache.computeIfAbsent(getICacheKey(UUID.randomUUID().toString()), getLoadAwareCacheLoader());
}
// By successfully adding values back to the cache we show the locks were correctly released by the previous cache-wide operations
assertEquals(numEntries, tieredSpilloverCache.count());

Phaser secondPhaser = new Phaser(3);
CountDownLatch secondCountDownLatch = new CountDownLatch(2);
refreshThread = new Thread(() -> {
secondPhaser.arriveAndAwaitAdvance();
tieredSpilloverCache.refresh();
secondCountDownLatch.countDown();
});
invalidateThread = new Thread(() -> {
secondPhaser.arriveAndAwaitAdvance();
tieredSpilloverCache.invalidateAll();
secondCountDownLatch.countDown();
});

invalidateThread.start();
refreshThread.start();

secondPhaser.arriveAndAwaitAdvance();
secondCountDownLatch.await();

// This should terminate and we should see an empty cache
assertEquals(0, tieredSpilloverCache.count());
}

private List<String> getMockDimensions() {
List<String> dims = new ArrayList<>();
for (String dimensionName : dimensionNames) {
