Skip to content

Commit

Permalink
Update CacheMaintainer to be a generic ScheduledExecutor class that c…
Browse files Browse the repository at this point in the history
…an accept a Runnable and an interval

Signed-off-by: owenhalpert <[email protected]>
  • Loading branch information
owenhalpert committed Dec 4, 2024
1 parent 8392b1d commit 601a24a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 54 deletions.
42 changes: 0 additions & 42 deletions src/main/java/org/opensearch/knn/index/CacheMaintainer.java

This file was deleted.

40 changes: 40 additions & 0 deletions src/main/java/org/opensearch/knn/index/ScheduledExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Executes a task periodically
*/
public class ScheduledExecutor implements Closeable {
private final ScheduledExecutorService executor;
final Runnable task;

/**
* @param task task to be completed
* @param scheduleMillis time in milliseconds to wait before executing the task again
*/
public ScheduledExecutor(Runnable task, long scheduleMillis) {
this.task = task;
this.executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(
task,
0,
scheduleMillis,
TimeUnit.MILLISECONDS
);
}

@Override
public void close() {
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.CacheMaintainer;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.ScheduledExecutor;
import org.opensearch.knn.plugin.stats.StatNames;

import java.io.Closeable;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class NativeMemoryCacheManager implements Closeable {
private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ExecutorService executor;
private CacheMaintainer<String, NativeMemoryAllocation> cacheMaintainer;
private ScheduledExecutor cacheMaintainer;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;

Expand Down Expand Up @@ -105,14 +105,12 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {

if (nativeMemoryCacheDTO.isExpirationLimited()) {
cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES);
this.cacheMaintainer = new ScheduledExecutor(() -> cache.cleanUp(), 60 * 1000);
}

cacheCapacityReached = new AtomicBoolean(false);
accessRecencyQueue = new ConcurrentLinkedDeque<>();
cache = cacheBuilder.build();

this.cacheMaintainer = new CacheMaintainer<>(cache);
this.cacheMaintainer.startMaintenance();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import lombok.extern.log4j.Log4j2;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.knn.index.CacheMaintainer;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.ScheduledExecutor;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -33,7 +33,7 @@ public class QuantizationStateCache implements Closeable {

private static volatile QuantizationStateCache instance;
private Cache<String, QuantizationState> cache;
private CacheMaintainer<String, QuantizationState> cacheMaintainer;
private ScheduledExecutor cacheMaintainer;
@Getter
private long maxCacheSizeInKB;
@Getter
Expand Down Expand Up @@ -79,8 +79,7 @@ private void buildCache() {
.removalListener(this::onRemoval)
.build();

this.cacheMaintainer = new CacheMaintainer<>(cache);
this.cacheMaintainer.startMaintenance();
this.cacheMaintainer = new ScheduledExecutor(() -> cache.cleanUp(), 60 * 1000);
}

synchronized void rebuildCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ public class CacheMaintainerTests {
public void testCacheEviction() throws InterruptedException {
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();

CacheMaintainer<String, String> cleaner = new CacheMaintainer<>(testCache);
ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000);

testCache.put("key1", "value1");
assertEquals(testCache.size(), 1);

Thread.sleep(1500);

cleaner.cleanCache();
executor.task.run();

assertEquals(testCache.size(), 0);

cleaner.close();
executor.close();
}
}

0 comments on commit 601a24a

Please sign in to comment.