diff --git a/src/main/java/org/opensearch/knn/index/CacheMaintainer.java b/src/main/java/org/opensearch/knn/index/CacheMaintainer.java deleted file mode 100644 index c4df39d35..000000000 --- a/src/main/java/org/opensearch/knn/index/CacheMaintainer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.knn.index; - -import com.google.common.cache.Cache; - -import java.io.Closeable; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Performs periodic maintenance for a Guava cache. The Guava cache is implemented in a way that maintenance operations (such as evicting expired - * entries) will only occur when the cache is accessed. See {@see Guava Cache Guide} - * for more details. Thus, to perform any pending maintenance, the cleanUp method will be called periodically from a CacheMaintainer instance. - */ -public class CacheMaintainer implements Closeable { - private final Cache cache; - private final ScheduledExecutorService executor; - private static final int DEFAULT_INTERVAL_SECONDS = 60; - - public CacheMaintainer(Cache cache) { - this.cache = cache; - this.executor = Executors.newSingleThreadScheduledExecutor(); - } - - public void startMaintenance() { - executor.scheduleAtFixedRate(this::cleanCache, DEFAULT_INTERVAL_SECONDS, DEFAULT_INTERVAL_SECONDS, TimeUnit.SECONDS); - } - - public void cleanCache() { - cache.cleanUp(); - } - - @Override - public void close() { - executor.shutdown(); - } -} diff --git a/src/main/java/org/opensearch/knn/index/ScheduledExecutor.java b/src/main/java/org/opensearch/knn/index/ScheduledExecutor.java new file mode 100644 index 000000000..74c1c4a8b --- /dev/null +++ b/src/main/java/org/opensearch/knn/index/ScheduledExecutor.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.index; + +import java.io.Closeable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Executes a task periodically + + */ +public class ScheduledExecutor implements Closeable { + private final ScheduledExecutorService executor; + final Runnable task; + + /** + * @param task task to be completed + * @param scheduleMillis time in milliseconds to wait before executing the task again + */ + public ScheduledExecutor(Runnable task, long scheduleMillis) { + this.task = task; + this.executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate( + task, + 0, + scheduleMillis, + TimeUnit.MILLISECONDS + ); + } + + @Override + public void close() { + executor.shutdown(); + } +} diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index e20c81442..f0d932cd5 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -22,8 +22,8 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.knn.common.exception.OutOfNativeMemoryException; import org.opensearch.knn.common.featureflags.KNNFeatureFlags; -import org.opensearch.knn.index.CacheMaintainer; import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.ScheduledExecutor; import org.opensearch.knn.plugin.stats.StatNames; import java.io.Closeable; @@ -52,7 +52,7 @@ public class NativeMemoryCacheManager implements Closeable { private Cache cache; private Deque accessRecencyQueue; private final ExecutorService executor; - private CacheMaintainer cacheMaintainer; + private ScheduledExecutor cacheMaintainer; private AtomicBoolean cacheCapacityReached; private long maxWeight; @@ -105,14 +105,12 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { if (nativeMemoryCacheDTO.isExpirationLimited()) { cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES); + this.cacheMaintainer = new ScheduledExecutor(() -> cache.cleanUp(), 60 * 1000); } cacheCapacityReached = new AtomicBoolean(false); accessRecencyQueue = new ConcurrentLinkedDeque<>(); cache = cacheBuilder.build(); - - this.cacheMaintainer = new CacheMaintainer<>(cache); - this.cacheMaintainer.startMaintenance(); } /** diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 77afabacc..06a4dbbbf 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -14,8 +14,8 @@ import lombok.extern.log4j.Log4j2; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.knn.index.CacheMaintainer; import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.ScheduledExecutor; import java.io.Closeable; import java.io.IOException; @@ -33,7 +33,7 @@ public class QuantizationStateCache implements Closeable { private static volatile QuantizationStateCache instance; private Cache cache; - private CacheMaintainer cacheMaintainer; + private ScheduledExecutor cacheMaintainer; @Getter private long maxCacheSizeInKB; @Getter @@ -79,8 +79,7 @@ private void buildCache() { .removalListener(this::onRemoval) .build(); - this.cacheMaintainer = new CacheMaintainer<>(cache); - this.cacheMaintainer.startMaintenance(); + this.cacheMaintainer = new ScheduledExecutor(() -> cache.cleanUp(), 60 * 1000); } synchronized void rebuildCache() { diff --git a/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java b/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java index 18acc54eb..0281593e6 100644 --- a/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java +++ b/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java @@ -18,16 +18,17 @@ public class CacheMaintainerTests { public void testCacheEviction() throws InterruptedException { Cache testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(); - CacheMaintainer cleaner = new CacheMaintainer<>(testCache); + ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000); testCache.put("key1", "value1"); assertEquals(testCache.size(), 1); Thread.sleep(1500); - cleaner.cleanCache(); + executor.task.run(); + assertEquals(testCache.size(), 0); - cleaner.close(); + executor.close(); } }