Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow additional settings for hybridfs from plugin #5735

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for discovered cluster manager and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))
- Allow additional settings for hybridfs from plugin ([#5735](https://github.com/opensearch-project/OpenSearch/pull/5735))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,18 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
protected Directory newFSDirectory(
Path location,
LockFactory lockFactory,
IndexSettings indexSettings,
Map<IndexModule.Type, Set<String>> additionalExtensions
) throws IOException {
return new SmbDirectoryWrapper(
setPreload(
new MMapDirectory(location, lockFactory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,28 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NIOFSDirectory;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.SmbDirectoryWrapper;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;

/**
* Factory to create a new NIO File System type directory accessible as a SMB share
*/
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
protected Directory newFSDirectory(
Path location,
LockFactory lockFactory,
IndexSettings indexSettings,
Map<IndexModule.Type, Set<String>> additionalExtensions
) throws IOException {
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -701,7 +702,8 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
null,
Map.of()
);
}

Expand Down
31 changes: 30 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public final class IndexModule {
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final List<Map<Type, Set<String>>> fsExtensions = new ArrayList<>();

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand Down Expand Up @@ -520,6 +521,14 @@ public IndexService newIndexService(
if (IndexService.needsMapperService(indexSettings, indexCreationContext)) {
indexAnalyzers = analysisRegistry.build(indexSettings);
}
final Map<Type, Set<String>> fsExtensionsByType = new HashMap<>();
for (Type type : Type.values()) {
fsExtensionsByType.put(type, new HashSet<>());
}
for (Map<Type, Set<String>> extensions : fsExtensions) {
extensions.forEach((key, value) -> fsExtensionsByType.get(key).addAll(value));
}

final IndexService indexService = new IndexService(
indexSettings,
indexCreationContext,
Expand Down Expand Up @@ -551,7 +560,8 @@ public IndexService newIndexService(
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory,
translogFactorySupplier
translogFactorySupplier,
fsExtensionsByType
);
success = true;
return indexService;
Expand Down Expand Up @@ -674,4 +684,23 @@ public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirect
}
return factories;
}

/**
* Adds a supplier of extension's list. All listeners added here are maintained for the entire index lifecycle on
* this node. Once an index is closed or deleted these listeners go out of scope.
* <p>
* Note: an index might be created on a node multiple times. For instance if the last shard from an index is
* relocated to another node the internal representation will be destroyed which includes the registered listeners.
* Once the node holds at least one shard of an index all modules are reloaded and listeners are registered again.
* Listeners can't be unregistered they will stay alive for the entire time the index is allocated on a node.
* </p>
*/
public void addFSTypeFileExtensions(final Map<Type, Set<String>> extensionsByType) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The propagation of the FS extensions is not clear to me, who will call this method and when? I see it only being used in tests ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the plugin who needs to register additional extensions, it has IndexModule instance which is available via Plugin class and this will be part of initialization. It's similar to what plugin do today to register event listeners, e.g. k-NN has custom listener for SettingsUpdate event.

So method call in plugin will be something like:

public void onIndexModule(IndexModule indexModule) {
        indexModule. addFSTypeFileExtensions(Map.of(IndexModule.Type.HYBRIDFS, Set.of("ext1", "ext2")));
 }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent, so what we could try here is to leverage IndexSettings, something along these lines (the functionality does not exist yet):

public void onIndexModule(IndexModule indexModule) {
        indexModule.updateIndexSetting (Module.INDEX_STORE_HYBRID_MMAP_EXTENSIONS,  Set.of("ext1", "ext2")),
          ... merge function);
 }

In this case you could get rid of all FS specific methods, additionlaExtensions arguments and purely rely on existing IndexSettings mechanism.

Copy link
Member Author

@martin-gaievski martin-gaievski Jan 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you idea, basically update the setting for hybridFs extensions and all the values will flow to FsFactory class without any additional changes. One concern that I have is that we can freely update any setting, I may need to test it (with this public method) but I'm not sure it's possible with non-dynamic settings. Currently that hybridFsExtensions is not dynamic.
Also such calls will trigger any registered onSettingUpdate listeners, and maybe will expose setting for API's update (this is in case we need to make it dynamic). Just want to make sure this is acceptable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern that I have is that we can freely update any setting, I may need to test it (with this public method) but I'm not sure it's possible with non-dynamic settings.

Correct, the hybridFsExtensions is not dynamic and probably should not be (since once FS is initialized, it could not be changed on the fly).

Also such calls will trigger any registered onSettingUpdate listeners

That could happen but I don't see update listener for hybridFsExtensions (since this is not dynamic). I have just this general idea which we could work through but the reasoning behind is - FS code should not leak anywhere (like it does now), IndexSettings is the only source for index configuration.

Copy link
Member Author

@martin-gaievski martin-gaievski Jan 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reta I see a problem with a Settings approach. If setting is not dynamic then I can't register customer update handler (like it's done for other existing settings) to perform "append"-like update and combine existing core and custom extensions into one list. With default updater new value overwrite existing ones, which normally should be enough but not in this case. The problem is in this validation that checks if setting is dynamic.

Copy link
Collaborator

@reta reta Jan 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-gaievski I think I found exactly what we need, each Plugin has the way to contribute additional IndexSettings, it may need some minor tweaks (I can help with that), but I believe it should address all the concerns raised:

    /**
     * Get the setting upgraders provided by this plugin.
     *
     * @return the settings upgraders
     */
    public List<SettingUpgrader<?>> getSettingUpgraders() {
        return Collections.emptyList();
    }

    /**
     * An {@link IndexSettingProvider} allows hooking in to parts of an index
     * lifecycle to provide explicit default settings for newly created indices. Rather than changing
     * the default values for an index-level setting, these act as though the setting has been set
     * explicitly, but still allow the setting to be overridden by a template or creation request body.
     */
    public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders() {
        return Collections.emptyList();
    }

Copy link
Member Author

@martin-gaievski martin-gaievski Jan 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inspired by your finding, I tested even simpler method

    /**
     * Additional node settings loaded by the plugin. Note that settings that are explicit in the nodes settings can't be
     * overwritten with the additional settings. These settings added if they don't exist.
     */
    public Settings additionalSettings() {
        return Settings.Builder.EMPTY_SETTINGS;
    }

It works for our setting, just only one thing is that it overrides value in core, so I had to do read-update.

public Settings additionalSettings() {
        final List<String> knnSpecificSettings = List.of("vem", "vec");
        final List<String> combinedSettings = Stream.concat(
            IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS.getDefault(Settings.EMPTY).stream(),
            knnSpecificSettings.stream()
        ).collect(Collectors.toList());
        return Settings.builder().putList(IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS.getKey(), combinedSettings).build();
    }

I think that's ok as core allowing it in the first place. With this all changes are on plugin side, I'll be closing this PR. Thank you @reta for suggesting this approach, it makes more sense than initial one.

ensureNotFrozen();
if (extensionsByType == null) {
throw new IllegalArgumentException("supplier must not be null");
}

this.fsExtensions.add(extensionsByType);
}
}
10 changes: 7 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final Map<IndexModule.Type, Set<String>> fsExtensions;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -207,7 +208,8 @@ public IndexService(
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Map<IndexModule.Type, Set<String>> fsExtensions
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -280,6 +282,7 @@ public IndexService(
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.translogFactorySupplier = translogFactorySupplier;
this.fsExtensions = Objects.requireNonNull(fsExtensions);
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -522,7 +525,7 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory directory = directoryFactory.newDirectory(this.indexSettings, path, this.fsExtensions);
store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -554,7 +557,8 @@ public synchronized IndexShard createShard(
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
remoteStore,
fsExtensions
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final Map<IndexModule.Type, Set<String>> fsExtensions;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -347,7 +348,8 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
@Nullable final Store remoteStore,
final Map<IndexModule.Type, Set<String>> fsExtensions
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -430,6 +432,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.fsExtensions = fsExtensions;
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactorySupplier = translogFactorySupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
Expand All @@ -76,14 +78,25 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
}, Property.IndexScope, Property.NodeScope);

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
return newDirectory(indexSettings, shardPath, Map.of());
}

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Map<IndexModule.Type, Set<String>> additionalExtensions)
throws IOException {
final Path location = path.resolveIndex();
final LockFactory lockFactory = indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING);
Files.createDirectories(location);
return newFSDirectory(location, lockFactory, indexSettings);
return newFSDirectory(location, lockFactory, indexSettings, additionalExtensions);
}

protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
protected Directory newFSDirectory(
Path location,
LockFactory lockFactory,
IndexSettings indexSettings,
Map<IndexModule.Type, Set<String>> additionalExtensions
) throws IOException {
final String storeType = indexSettings.getSettings()
.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey());
IndexModule.Type type;
Expand All @@ -98,6 +111,10 @@ protected Directory newFSDirectory(Path location, LockFactory lockFactory, Index
// Use Lucene defaults
final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory);
final Set<String> mmapExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS));
Objects.requireNonNull(additionalExtensions);
if (additionalExtensions.containsKey(IndexModule.Type.HYBRIDFS)) {
mmapExtensions.addAll(additionalExtensions.get(IndexModule.Type.HYBRIDFS));
}
if (primaryDirectory instanceof MMapDirectory) {
MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory;
return new HybridDirectory(lockFactory, setPreload(mMapDirectory, lockFactory, preLoadExtensions), mmapExtensions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* A plugin that provides alternative directory implementations.
Expand All @@ -64,6 +66,19 @@ interface DirectoryFactory {
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;

/**
* Creates a new directory per shard. This method is called once per shard on shard creation.
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @param extensions map of providers for additional settings where key is a setting name
* @return a new lucene directory instance
* @throws IOException if an IOException occurs while opening the directory
*/
default Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Map<IndexModule.Type, Set<String>> extensions)
throws IOException {
return newDirectory(indexSettings, shardPath);
}
}

/**
Expand Down
Loading