Skip to content

Commit

Permalink
Add RemoteDirectoryFactory and use RemoteDirectory instance in Refres…
Browse files Browse the repository at this point in the history
…hListener (opensearch-project#3285)

Co-authored-by: Sachin Kale <[email protected]>
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale committed May 27, 2022
1 parent 07f6f6c commit f2a3889
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
SegmentReplicationCheckpointPublisher.EMPTY
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
}

Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -118,6 +119,8 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -516,6 +519,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
28 changes: 26 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -96,6 +97,9 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
Expand Down Expand Up @@ -190,6 +195,7 @@ public IndexService(
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -260,6 +266,7 @@ public IndexService(
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
Expand Down Expand Up @@ -430,7 +437,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RepositoriesService repositoriesService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -504,6 +512,21 @@ public synchronized IndexShard createShard(
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository("dragon-stone");
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
}

store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -533,7 +556,8 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStoreRefreshListener
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
16 changes: 11 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final RemoteStoreRefreshListener remoteStoreRefreshListener;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand All @@ -325,7 +327,8 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteStoreRefreshListener remoteStoreRefreshListener
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -413,6 +416,7 @@ public boolean shouldCache(Query query) {
} else {
this.checkpointRefreshListener = null;
}
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3131,11 +3135,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener;
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (remoteStoreRefreshListener != null && shardRouting.primary()) {
internalRefreshListener.add(remoteStoreRefreshListener);
}
if (this.checkpointRefreshListener != null) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
internalRefreshListener.add(checkpointRefreshListener);
}

return this.engineConfigFactory.newEngineConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;

import java.io.IOException;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*/
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {

private final Directory storeDirectory;
private final Directory remoteDirectory;

public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
}

@Override
public void beforeRefresh() throws IOException {
// ToDo Add implementation
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
// ToDo Add implementation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;

/**
* Factory for a remote store directory
*
* @opensearch.internal
*/
public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,13 @@ public IndexShard createShard(
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
IndexShard indexShard = indexService.createShard(
shardRouting,
globalCheckpointSyncer,
retentionLeaseSyncer,
checkpointPublisher,
repositoriesService
);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.Repository;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -66,6 +67,22 @@ interface DirectoryFactory {
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
* An interface that describes how to create a new remote directory instance per shard.
*/
@FunctionalInterface
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @param repository to get the BlobContainer details
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
}

/**
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

public class RemoteDirectoryFactoryTests extends OpenSearchTestCase {

private RemoteDirectoryFactory remoteDirectoryFactory;

@Before
public void setup() {
remoteDirectoryFactory = new RemoteDirectoryFactory();
}

public void testNewDirectory() throws IOException {
Settings settings = Settings.builder().build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
BlobContainer blobContainer = mock(BlobContainer.class);
when(repository.blobStore()).thenReturn(blobStore);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());

Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository);
assertTrue(directory instanceof RemoteDirectory);
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
verify(blobStore).blobContainer(blobPathCaptor.capture());
BlobPath blobPath = blobPathCaptor.getValue();
assertEquals("foo/0/", blobPath.buildAsString());

directory.listAll();
verify(blobContainer).listBlobs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
newRouting,
s -> {},
RetentionLeaseSyncer.EMPTY,
SegmentReplicationCheckpointPublisher.EMPTY
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,8 @@ protected IndexShard newShard(
globalCheckpointSyncer,
retentionLeaseSyncer,
breakerService,
checkpointPublisher
checkpointPublisher,
null
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;
Expand Down

0 comments on commit f2a3889

Please sign in to comment.