From f2a3889a0c72c2bb8d118cdac9390cacf00e4774 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 17 May 2022 00:23:35 +0530 Subject: [PATCH] Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener (#3285) Co-authored-by: Sachin Kale Signed-off-by: Sachin Kale --- .../opensearch/index/shard/IndexShardIT.java | 3 +- .../org/opensearch/index/IndexModule.java | 4 ++ .../org/opensearch/index/IndexService.java | 28 +++++++- .../opensearch/index/shard/IndexShard.java | 16 +++-- .../shard/RemoteStoreRefreshListener.java | 38 +++++++++++ .../index/store/RemoteDirectoryFactory.java | 37 +++++++++++ .../opensearch/indices/IndicesService.java | 8 ++- .../opensearch/plugins/IndexStorePlugin.java | 17 +++++ .../store/RemoteDirectoryFactoryTests.java | 65 +++++++++++++++++++ ...dicesLifecycleListenerSingleNodeTests.java | 3 +- .../index/shard/IndexShardTestCase.java | 3 +- 11 files changed, 211 insertions(+), 11 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 888881d43eb11..2bf73b34247b3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -675,7 +675,8 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 49daf8293656c..2cea0e4e3e95c 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -70,6 +70,7 @@ import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -118,6 +119,8 @@ public final class IndexModule { private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory(); + private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory(); + private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new; public static final Setting INDEX_STORE_TYPE_SETTING = new Setting<>( @@ -516,6 +519,7 @@ public IndexService newIndexService( client, queryCache, directoryFactory, + REMOTE_DIRECTORY_FACTORY, eventListener, readerWrapperFactory, mapperRegistry, diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 0a6d1501f2bea..90f6499927aef 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -81,6 +81,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexingOperationListener; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; @@ -96,6 +97,9 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final NodeEnvironment nodeEnv; private final ShardStoreDeleter shardStoreDeleter; private final IndexStorePlugin.DirectoryFactory directoryFactory; + private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; private final CheckedFunction readerWrapper; private final IndexCache indexCache; @@ -190,6 +195,7 @@ public IndexService( Client client, QueryCache queryCache, IndexStorePlugin.DirectoryFactory directoryFactory, + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, IndexEventListener eventListener, Function> wrapperFactory, MapperRegistry mapperRegistry, @@ -260,6 +266,7 @@ public IndexService( this.eventListener = eventListener; this.nodeEnv = nodeEnv; this.directoryFactory = directoryFactory; + this.remoteDirectoryFactory = remoteDirectoryFactory; this.recoveryStateFactory = recoveryStateFactory; this.engineFactory = Objects.requireNonNull(engineFactory); this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory); @@ -430,7 +437,8 @@ public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RepositoriesService repositoriesService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -504,6 +512,21 @@ public synchronized IndexShard createShard( } }; Directory directory = directoryFactory.newDirectory(this.indexSettings, path); + Directory remoteDirectory = null; + RemoteStoreRefreshListener remoteStoreRefreshListener = null; + if (this.indexSettings.isRemoteStoreEnabled()) { + try { + Repository repository = repositoriesService.repository("dragon-stone"); + remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory); + } catch (RepositoryMissingException e) { + throw new IllegalArgumentException( + "Repository should be created before creating index with remote_store enabled setting", + e + ); + } + } + store = new Store( shardId, this.indexSettings, @@ -533,7 +556,8 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null + this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null, + remoteStoreRefreshListener ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 995a92e94aeb3..a74d5bb9aee99 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -304,6 +304,8 @@ Runnable getGlobalCheckpointSyncer() { private volatile boolean useRetentionLeasesInPeerRecovery; private final ReferenceManager.RefreshListener checkpointRefreshListener; + private final RemoteStoreRefreshListener remoteStoreRefreshListener; + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -325,7 +327,8 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher + @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteStoreRefreshListener remoteStoreRefreshListener ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -413,6 +416,7 @@ public boolean shouldCache(Query query) { } else { this.checkpointRefreshListener = null; } + this.remoteStoreRefreshListener = remoteStoreRefreshListener; } public ThreadPool getThreadPool() { @@ -3131,11 +3135,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } }; - final List internalRefreshListener; + final List internalRefreshListener = new ArrayList<>(); + internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); + if (remoteStoreRefreshListener != null && shardRouting.primary()) { + internalRefreshListener.add(remoteStoreRefreshListener); + } if (this.checkpointRefreshListener != null) { - internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); - } else { - internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + internalRefreshListener.add(checkpointRefreshListener); } return this.engineConfigFactory.newEngineConfig( diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java new file mode 100644 index 0000000000000..c41fe0ba27177 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.store.Directory; + +import java.io.IOException; + +/** + * RefreshListener implementation to upload newly created segment files to the remote store + */ +public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { + + private final Directory storeDirectory; + private final Directory remoteDirectory; + + public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) { + this.storeDirectory = storeDirectory; + this.remoteDirectory = remoteDirectory; + } + + @Override + public void beforeRefresh() throws IOException { + // ToDo Add implementation + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + // ToDo Add implementation + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java new file mode 100644 index 0000000000000..eb7912a1f4a2b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; + +/** + * Factory for a remote store directory + * + * @opensearch.internal + */ +public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { + + @Override + public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobPath blobPath = new BlobPath(); + blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); + BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); + return new RemoteDirectory(blobContainer); + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 79fd2893fb78c..b2f6f10c19638 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -859,7 +859,13 @@ public IndexShard createShard( IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); + IndexShard indexShard = indexService.createShard( + shardRouting, + globalCheckpointSyncer, + retentionLeaseSyncer, + checkpointPublisher, + repositoriesService + ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index 2f549fec54759..52ddf6dcf2753 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -39,6 +39,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.repositories.Repository; import java.io.IOException; import java.util.Collections; @@ -66,6 +67,22 @@ interface DirectoryFactory { Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException; } + /** + * An interface that describes how to create a new remote directory instance per shard. + */ + @FunctionalInterface + interface RemoteDirectoryFactory { + /** + * Creates a new remote directory per shard. This method is called once per shard on shard creation. + * @param indexSettings the shards index settings + * @param shardPath the path the shard is using + * @param repository to get the BlobContainer details + * @return a new RemoteDirectory instance + * @throws IOException if an IOException occurs while opening the directory + */ + Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException; + } + /** * The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting * {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java new file mode 100644 index 0000000000000..d781fad9ab99c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +public class RemoteDirectoryFactoryTests extends OpenSearchTestCase { + + private RemoteDirectoryFactory remoteDirectoryFactory; + + @Before + public void setup() { + remoteDirectoryFactory = new RemoteDirectoryFactory(); + } + + public void testNewDirectory() throws IOException { + Settings settings = Settings.builder().build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); + Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); + ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); + BlobStoreRepository repository = mock(BlobStoreRepository.class); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(repository.blobStore()).thenReturn(blobStore); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository); + assertTrue(directory instanceof RemoteDirectory); + ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); + verify(blobStore).blobContainer(blobPathCaptor.capture()); + BlobPath blobPath = blobPathCaptor.getValue(); + assertEquals("foo/0/", blobPath.buildAsString()); + + directory.listAll(); + verify(blobContainer).listBlobs(); + } +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0989bf869f18e..213a22539971f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 371fa6d102304..62c52ab636255 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -525,7 +525,8 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - checkpointPublisher + checkpointPublisher, + null ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true;