Skip to content

Commit

Permalink
Add IRC specific cache cleaner and remove from IndicesService
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 committed Mar 12, 2024
1 parent 2d99197 commit 1185cab
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
Expand All @@ -61,6 +63,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -95,7 +98,7 @@
*
* @opensearch.internal
*/
public final class IndicesRequestCache implements RemovalListener<IndicesRequestCache.Key, BytesReference>, Closeable {
public final class IndicesRequestCache extends AbstractLifecycleComponent implements RemovalListener<IndicesRequestCache.Key, BytesReference>, Closeable {

private static final Logger logger = LogManager.getLogger(IndicesRequestCache.class);

Expand Down Expand Up @@ -138,13 +141,26 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
private final TimeValue expire;
private final ICache<Key, BytesReference> cache;
private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;

IndicesRequestCache(Settings settings, Function<ShardId, Optional<CacheEntity>> cacheEntityFunction, CacheService cacheService) {
// pkg-private for testing
final IndicesRequestCacheCleanupManager cacheCleanupManager;
private final IndicesRequestCacheCleaner cacheCleaner;
private final TimeValue cleanInterval;
private final ThreadPool threadpool;

IndicesRequestCache(
Settings settings,
Function<ShardId, Optional<CacheEntity>> cacheEntityFunction,
CacheService cacheService,
ThreadPool threadPool
) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
this.cleanInterval = INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
long sizeInBytes = size.getBytes();
ToLongBiFunction<Key, BytesReference> weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed();
this.cacheCleanupManager = new IndicesRequestCacheCleanupManager(getStalenessThreshold(settings));
this.threadpool = threadPool;
this.cacheCleaner = new IndicesRequestCacheCleaner(this, this.threadpool, this.cleanInterval);
this.cacheEntityLookup = cacheEntityFunction;
this.cache = cacheService.createCache(
new CacheConfig.Builder<Key, BytesReference>().setSettings(settings)
Expand All @@ -159,6 +175,19 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
);
}

@Override
protected void doStart() {
threadpool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);
}

@Override
protected void doStop() {
cacheCleaner.close();
}

@Override
protected void doClose() throws IOException {}

@Override
public void close() {
cache.invalidateAll();
Expand Down Expand Up @@ -230,6 +259,39 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade
cache.invalidate(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId));
}

private final class IndicesRequestCacheCleaner implements Runnable, Releasable {

private final IndicesRequestCache indicesRequestCache;
private final ThreadPool threadPool;
private final TimeValue interval;

IndicesRequestCacheCleaner(IndicesRequestCache indicesRequestCache, ThreadPool threadPool, TimeValue interval) {
this.indicesRequestCache = indicesRequestCache;
this.threadPool = threadPool;
this.interval = interval;
}

private final AtomicBoolean closed = new AtomicBoolean(false);

@Override
public void run() {
try {
this.indicesRequestCache.cacheCleanupManager.cleanCache();
} catch (Exception e) {
logger.warn("Exception during periodic indices request cache cleanup:", e);
}
// Reschedule itself to run again if not closed
if (closed.get() == false) {
threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this);
}
}

@Override
public void close() {
closed.compareAndSet(false, true);
}
}

/**
* Loader for the request cache
*
Expand Down
24 changes: 8 additions & 16 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,18 @@ public IndicesService(
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
this.analysisRegistry = analysisRegistry;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indicesRequestCache = new IndicesRequestCache(settings, (shardId -> {
this.indicesRequestCache = new IndicesRequestCache(
settings,
(shardId -> {
IndexService indexService = this.indices.get(shardId.getIndex().getUUID());
if (indexService == null) {
return Optional.empty();
}
return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));
}), cacheService);
}),
cacheService,
threadPool
);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -443,7 +448,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
}
});
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
this.engineFactoryProviders = engineFactoryProviders;

Expand Down Expand Up @@ -1587,17 +1592,14 @@ private static final class CacheCleaner implements Runnable, Releasable {
private final ThreadPool threadPool;
private final TimeValue interval;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final IndicesRequestCache requestCache;

CacheCleaner(
IndicesFieldDataCache cache,
IndicesRequestCache requestCache,
Logger logger,
ThreadPool threadPool,
TimeValue interval
) {
this.cache = cache;
this.requestCache = requestCache;
this.logger = logger;
this.threadPool = threadPool;
this.interval = interval;
Expand All @@ -1620,16 +1622,6 @@ public void run() {
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);
}

try {
this.requestCache.cleanCache();
} catch (Exception e) {
logger.warn("Exception during periodic request cache cleanup:", e);
}
// Reschedule itself to run again if not closed
if (closed.get() == false) {
threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this);
}
}

@Override
Expand Down

0 comments on commit 1185cab

Please sign in to comment.