From 328a07b331b04e8b447a43c1e72cf316255eaa09 Mon Sep 17 00:00:00 2001
From: Martin Gaievski
Date: Mon, 9 Jan 2023 21:06:07 -0800
Subject: [PATCH] Focus on extensions by type, refactor code
Signed-off-by: Martin Gaievski
---
.../smbmmapfs/SmbMmapFsDirectoryFactory.java | 4 +--
.../smbniofs/SmbNIOFsDirectoryFactory.java | 5 ++--
.../org/opensearch/index/IndexModule.java | 27 +++++++++---------
.../org/opensearch/index/IndexService.java | 10 +++----
.../opensearch/index/shard/IndexShard.java | 6 ++--
.../index/store/FsDirectoryFactory.java | 18 +++++++-----
.../RemoteSnapshotDirectoryFactory.java | 8 +-----
.../opensearch/plugins/IndexStorePlugin.java | 22 +++++++++++----
.../opensearch/index/IndexModuleTests.java | 28 ++++++++-----------
.../index/store/FsDirectoryFactoryTests.java | 20 ++++++-------
.../test/store/MockFSDirectoryFactory.java | 6 ++--
11 files changed, 76 insertions(+), 78 deletions(-)
diff --git a/plugins/store-smb/src/main/java/org/opensearch/index/store/smbmmapfs/SmbMmapFsDirectoryFactory.java b/plugins/store-smb/src/main/java/org/opensearch/index/store/smbmmapfs/SmbMmapFsDirectoryFactory.java
index 516481870bd00..6adaa6ae22eba 100644
--- a/plugins/store-smb/src/main/java/org/opensearch/index/store/smbmmapfs/SmbMmapFsDirectoryFactory.java
+++ b/plugins/store-smb/src/main/java/org/opensearch/index/store/smbmmapfs/SmbMmapFsDirectoryFactory.java
@@ -43,8 +43,8 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {
@@ -53,7 +53,7 @@ protected Directory newFSDirectory(
Path location,
LockFactory lockFactory,
IndexSettings indexSettings,
- Map> additionalSettingProviders
+ Map> additionalExtensions
) throws IOException {
return new SmbDirectoryWrapper(
setPreload(
diff --git a/plugins/store-smb/src/main/java/org/opensearch/index/store/smbniofs/SmbNIOFsDirectoryFactory.java b/plugins/store-smb/src/main/java/org/opensearch/index/store/smbniofs/SmbNIOFsDirectoryFactory.java
index 038b106efd55b..c9764f69d22bd 100644
--- a/plugins/store-smb/src/main/java/org/opensearch/index/store/smbniofs/SmbNIOFsDirectoryFactory.java
+++ b/plugins/store-smb/src/main/java/org/opensearch/index/store/smbniofs/SmbNIOFsDirectoryFactory.java
@@ -11,14 +11,15 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NIOFSDirectory;
+import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.SmbDirectoryWrapper;
import java.io.IOException;
import java.nio.file.Path;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Factory to create a new NIO File System type directory accessible as a SMB share
@@ -30,7 +31,7 @@ protected Directory newFSDirectory(
Path location,
LockFactory lockFactory,
IndexSettings indexSettings,
- Map> additionalSettingProviders
+ Map> additionalExtensions
) throws IOException {
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
}
diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java
index d8b5931b22208..6d1d2c5ce2881 100644
--- a/server/src/main/java/org/opensearch/index/IndexModule.java
+++ b/server/src/main/java/org/opensearch/index/IndexModule.java
@@ -196,7 +196,7 @@ public final class IndexModule {
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map recoveryStateFactories;
- private final List>> hybridFSMMapExtensionsListSuppliers = new ArrayList<>();
+ private final List
*/
- public void addHybridFSExtensions(final Supplier> extensionsListSupplier) {
+ public void addFSTypeFileExtensions(final Map> extensionsByType) {
ensureNotFrozen();
- if (extensionsListSupplier == null) {
+ if (extensionsByType == null) {
throw new IllegalArgumentException("supplier must not be null");
}
- this.hybridFSMMapExtensionsListSuppliers.add(extensionsListSupplier);
+ this.fsExtensions.add(extensionsByType);
}
}
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index 709c1e8cb1f5c..0f482198cdadd 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -177,7 +177,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Supplier indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier repositoriesServiceSupplier;
- private final Map> additionalSettingProviders;
+ private final Map> fsExtensions;
public IndexService(
IndexSettings indexSettings,
@@ -211,7 +211,7 @@ public IndexService(
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier repositoriesServiceSupplier,
- Map> additionalSettingProviders
+ Map> fsExtensions
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -284,7 +284,7 @@ public IndexService(
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
- this.additionalSettingProviders = Collections.unmodifiableMap(additionalSettingProviders);
+ this.fsExtensions = Objects.requireNonNull(fsExtensions);
updateFsyncTaskIfNecessary();
}
@@ -534,7 +534,7 @@ public synchronized IndexShard createShard(
this.indexSettings.getRemoteStoreTranslogRepository()
)
: new InternalTranslogFactory();
- Directory directory = directoryFactory.newDirectory(this.indexSettings, path, this.additionalSettingProviders);
+ Directory directory = directoryFactory.newDirectory(this.indexSettings, path, this.fsExtensions);
store = new Store(
shardId,
this.indexSettings,
@@ -567,7 +567,7 @@ public synchronized IndexShard createShard(
translogFactory,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
- additionalSettingProviders
+ fsExtensions
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index f8e7f64d160d9..e7cb0244528ed 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -320,7 +320,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final TranslogFactory translogFactory;
- private final Map> additionalSettingProviders;
+ private final Map> fsExtensions;
public IndexShard(
final ShardRouting shardRouting,
@@ -346,7 +346,7 @@ public IndexShard(
final TranslogFactory translogFactory,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
- final Map> additionalSettingProviders
+ final Map> fsExtensions
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
@@ -429,7 +429,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
- this.additionalSettingProviders = additionalSettingProviders;
+ this.fsExtensions = fsExtensions;
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
diff --git a/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java
index d926e63cf9a64..3c5dcdf59e347 100644
--- a/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java
+++ b/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java
@@ -55,7 +55,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -79,19 +78,24 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
}, Property.IndexScope, Property.NodeScope);
@Override
- public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Map> additionalSettingProviders)
+ public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
+ return newDirectory(indexSettings, shardPath, Map.of());
+ }
+
+ @Override
+ public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Map> additionalExtensions)
throws IOException {
final Path location = path.resolveIndex();
final LockFactory lockFactory = indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING);
Files.createDirectories(location);
- return newFSDirectory(location, lockFactory, indexSettings, additionalSettingProviders);
+ return newFSDirectory(location, lockFactory, indexSettings, additionalExtensions);
}
protected Directory newFSDirectory(
Path location,
LockFactory lockFactory,
IndexSettings indexSettings,
- Map> additionalSettingProviders
+ Map> additionalExtensions
) throws IOException {
final String storeType = indexSettings.getSettings()
.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey());
@@ -107,9 +111,9 @@ protected Directory newFSDirectory(
// Use Lucene defaults
final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory);
final Set mmapExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS));
- Objects.requireNonNull(additionalSettingProviders);
- if (additionalSettingProviders.containsKey(IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS.getKey())) {
- mmapExtensions.addAll(additionalSettingProviders.get(IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS.getKey()));
+ Objects.requireNonNull(additionalExtensions);
+ if (additionalExtensions.containsKey(IndexModule.Type.HYBRIDFS)) {
+ mmapExtensions.addAll(additionalExtensions.get(IndexModule.Type.HYBRIDFS));
}
if (primaryDirectory instanceof MMapDirectory) {
MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory;
diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java
index 8fed5a876d7de..fed1f127d113f 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java
@@ -11,8 +11,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;
@@ -50,11 +48,7 @@ public RemoteSnapshotDirectoryFactory(Supplier repositories
}
@Override
- public Directory newDirectory(
- IndexSettings indexSettings,
- ShardPath localShardPath,
- Map> additionalSettingProviders
- ) throws IOException {
+ public Directory newDirectory(IndexSettings indexSettings, ShardPath localShardPath) throws IOException {
final String repositoryName = IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY.get(indexSettings.getSettings());
final Repository repository = repositoriesService.get().repository(repositoryName);
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java
index 9f36597f23ba4..24677b2d09c81 100644
--- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java
+++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java
@@ -36,14 +36,15 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
+import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A plugin that provides alternative directory implementations.
@@ -59,14 +60,25 @@ public interface IndexStorePlugin {
interface DirectoryFactory {
/**
* Creates a new directory per shard. This method is called once per shard on shard creation.
- *
* @param indexSettings the shards index settings
- * @param shardPath the path the shard is using
+ * @param shardPath the path the shard is using
+ * @return a new lucene directory instance
+ * @throws IOException if an IOException occurs while opening the directory
+ */
+ Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
+
+ /**
+ * Creates a new directory per shard. This method is called once per shard on shard creation.
+ * @param indexSettings the shards index settings
+ * @param shardPath the path the shard is using
+ * @param extensions map of providers for additional settings where key is a setting name
* @return a new lucene directory instance
* @throws IOException if an IOException occurs while opening the directory
*/
- Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Map> additionalSettingProviders)
- throws IOException;
+ default Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Map> extensions)
+ throws IOException {
+ return newDirectory(indexSettings, shardPath);
+ }
}
/**
diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java
index b0cf17cb3f539..ea725ae78fe66 100644
--- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java
+++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java
@@ -46,6 +46,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.SetOnce.AlreadySetException;
+import org.mockito.ArgumentCaptor;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
@@ -121,7 +122,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -130,7 +130,6 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -653,8 +652,8 @@ public void testHybridFSExtensionsList() throws IOException {
);
try {
- module.addHybridFSExtensions(() -> List.of("vec", "vem"));
- module.addHybridFSExtensions(() -> List.of("txt", "vem"));
+ module.addFSTypeFileExtensions(Map.of(IndexModule.Type.HYBRIDFS, Set.of("vec", "vem")));
+ module.addFSTypeFileExtensions(Map.of(IndexModule.Type.HYBRIDFS, Set.of("txt", "vem")));
} catch (Exception ex) {
fail("not registered");
}
@@ -671,13 +670,12 @@ public void testHybridFSExtensionsList() throws IOException {
);
indexService.createShard(shardRouting, s -> {}, RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY);
-
- List expectedListOfExtensions = List.of("vec", "vem", "txt");
- verify(directoryFactory, times(1)).newDirectory(
- any(),
- any(),
- eq(Map.of("index.store.hybrid.mmap.extensions", expectedListOfExtensions))
- );
+ ArgumentCaptor