cleanup file cache on deleting index/shard directory
Signed-off-by: panguixin <[email protected]>
bugmakerrrrrr committed Dec 4, 2023
1 parent 69cc2a1 commit 50c3b60
Showing 8 changed files with 153 additions and 250 deletions.
@@ -7,6 +7,7 @@
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -28,12 +29,17 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
@@ -47,6 +53,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -750,4 +757,75 @@ private void assertCacheDirectoryReplicaAndIndexCount(int numCacheFolderCount, i
// Verifies if all the shards (primary and replica) have been deleted
assertEquals(numCacheFolderCount, searchNodeFileCachePaths.size());
}

public void testRelocateSearchableSnapshotIndex() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName = "test-idx-1";
final String restoredIndexName = indexName + "-copy";
final Client client = client();

internalCluster().ensureAtLeastNumDataNodes(1);
createIndexWithDocsAndEnsureGreen(0, 100, indexName);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

String searchNode1 = internalCluster().startSearchNode();
internalCluster().validateClusterFormed();
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

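// start a second search node so the shard has a relocation target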
String searchNode2 = internalCluster().startSearchNode();
internalCluster().validateClusterFormed();

final Index index = resolveIndex(restoredIndexName);
assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, true);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);

// relocate the shard from node1 to node2
client.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(restoredIndexName, 0, searchNode1, searchNode2))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client.admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertDocCount(restoredIndexName, 100L);

assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, false);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, true);
deleteIndicesAndEnsureGreen(client, restoredIndexName);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
final ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardId);

assertBusy(() -> {
assertTrue(
"shard state path should " + (exists ? "exist" : "not exist"),
Files.exists(shardPath.getShardStatePath()) == exists
);
assertTrue("shard cache path should " + (exists ? "exist" : "not exist"), Files.exists(shardPath.getDataPath()) == exists);
}, 30, TimeUnit.SECONDS);

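// The shard-level checks above cover the state and cache paths for shard 0; the
// index-level checks below verify that the directories keyed by index UUID are
// created and removed along with them.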
final Path indexDataPath = node.getNodeEnvironment().fileCacheNodePath().fileCachePath.resolve(index.getUUID());
final Path indexPath = node.getNodeEnvironment().fileCacheNodePath().indicesPath.resolve(index.getUUID());
assertBusy(() -> {
assertTrue("index cache path should " + (exists ? "exist" : "not exist"), Files.exists(indexDataPath) == exists);
assertTrue("index path should " + (exists ? "exist" : "not exist"), Files.exists(indexPath) == exists);
}, 30, TimeUnit.SECONDS);
}
}
67 changes: 67 additions & 0 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
@@ -44,6 +44,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.util.SetOnce;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -71,6 +72,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
@@ -105,6 +107,7 @@
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
* A component that holds all data paths for a single node.
@@ -199,6 +202,8 @@ public String toString() {

private final NodeMetadata nodeMetadata;

private final SetOnce<FileCache> fileCache = new SetOnce<>();

/**
* Maximum number of data nodes that should run in an environment.
*/
@@ -393,6 +398,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
}

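/**
* Sets the {@link FileCache} for this node. Expected to be called at most once,
* during node construction; the {@link SetOnce} wrapper rejects any later
* assignment.
*/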
public void setFileCache(final FileCache fileCache) {
this.fileCache.set(fileCache);
}

/**
* Resolve a specific nodes/{node.id} path for the specified path and node lock id.
*
@@ -577,6 +586,14 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh
public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException {
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";

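// Searchable snapshot (remote snapshot) shards keep their data under the file
// cache path rather than the regular shard data paths, so evict the cache
// entries and delete the cache directory before removing the shard paths below.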
if (indexSettings.isRemoteSnapshot()) {
final ShardPath shardPath = ShardPath.loadFileCachePath(this, shardId);
cleanupShardFileCache(shardPath);
deleteShardFileCacheDirectory(shardPath);
logger.trace("deleted shard {} file cache directory, path: [{}]", shardId, shardPath.getDataPath());
}

final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
acquireFSLockForPaths(indexSettings, paths);
@@ -592,6 +609,40 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
assert assertPathsDoNotExist(paths);
}

/**
* Removes the FileCache entries for the files under this shard's local store path
*
* @param shardPath the shard path
*/
private void cleanupShardFileCache(ShardPath shardPath) {
try {
final FileCache fc = fileCache.get();
assert fc != null;
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
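// Evict each file under the shard's local store directory from the cache, using
// toRealPath() so symbolic links are resolved to the actual cached file path.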
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fc.remove(subPath.toRealPath());
}
}
} catch (IOException ioe) {
logger.error(
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
ioe
);
}
}

private void deleteShardFileCacheDirectory(ShardPath shardPath) {
final Path path = shardPath.getDataPath();
try {
if (Files.exists(path)) {
IOUtils.rm(path);
}
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);
}
}

private static boolean assertPathsDoNotExist(final Path[] paths) {
Set<Path> existingPaths = Stream.of(paths).filter(FileSystemUtils::exists).filter(leftOver -> {
// Relaxed assertion for the special case where only the empty state directory exists after deleting
@@ -653,6 +704,10 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti
* @param indexSettings settings for the index being deleted
*/
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
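// Remote snapshot indexes keep cached data under the file cache path; remove
// that directory tree before deleting the regular index paths.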
if (indexSettings.isRemoteSnapshot()) {
deleteIndexFileCacheDirectory(index);
}

final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
@@ -663,6 +718,18 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin
}
}

private void deleteIndexFileCacheDirectory(Index index) {
final Path indexCachePath = fileCacheNodePath().fileCachePath.resolve(index.getUUID());
logger.trace("deleting index {} file cache directory, path: [{}]", index, indexCachePath);
if (Files.exists(indexCachePath)) {
try {
IOUtils.rm(indexCachePath);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
}
}
}

/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
* a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released.
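For context, this commit does not show where the cache instance comes from, but the new setFileCache hook is presumably wired once during node startup, roughly as in the sketch below. The call site, capacity, and breaker choice here are illustrative assumptions, not part of this diff:

import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;

// Hypothetical bootstrap wiring: build the node-wide FileCache once, then hand
// it to the NodeEnvironment so the delete paths above can evict cache entries.
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
    ByteSizeUnit.GB.toBytes(10), // assumed capacity, for illustration only
    circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
);
nodeEnvironment.setFileCache(fileCache);

Tying eviction to the NodeEnvironment delete paths replaces the listener-based approach: the changes below remove the FileCacheCleaner wiring from IndicesService, so cache cleanup no longer depends on a per-shard store-close event.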

This file was deleted.

@@ -135,7 +135,6 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.TranslogFactory;
@@ -349,7 +348,6 @@ public class IndicesService extends AbstractLifecycleComponent
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private volatile TimeValue clusterDefaultRefreshInterval;
private volatile TimeValue clusterRemoteTranslogBufferInterval;
private final FileCacheCleaner fileCacheCleaner;

private final SearchRequestStats searchRequestStats;

@@ -382,7 +380,6 @@ public IndicesService(
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier,
FileCacheCleaner fileCacheCleaner,
SearchRequestStats searchRequestStats,
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
RecoverySettings recoverySettings
@@ -431,7 +428,6 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCacheCleaner = fileCacheCleaner;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
@@ -747,7 +743,6 @@ public void onStoreClosed(ShardId shardId) {
};
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
finalListeners.add(fileCacheCleaner);
final IndexService indexService = createIndexService(
CREATE_INDEX,
indexMetadata,