-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add RemoteDirectory instance to Store as a secondary directory #3285
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,6 +81,7 @@ | |
import org.opensearch.index.shard.IndexShard; | ||
import org.opensearch.index.shard.IndexShardClosedException; | ||
import org.opensearch.index.shard.IndexingOperationListener; | ||
import org.opensearch.index.shard.RemoteStoreRefreshListener; | ||
import org.opensearch.index.shard.SearchOperationListener; | ||
import org.opensearch.index.shard.ShardId; | ||
import org.opensearch.index.shard.ShardNotFoundException; | ||
|
@@ -95,6 +96,9 @@ | |
import org.opensearch.indices.mapper.MapperRegistry; | ||
import org.opensearch.indices.recovery.RecoveryState; | ||
import org.opensearch.plugins.IndexStorePlugin; | ||
import org.opensearch.repositories.RepositoriesService; | ||
import org.opensearch.repositories.Repository; | ||
import org.opensearch.repositories.RepositoryMissingException; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.search.aggregations.support.ValuesSourceRegistry; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
@@ -135,6 +139,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust | |
private final NodeEnvironment nodeEnv; | ||
private final ShardStoreDeleter shardStoreDeleter; | ||
private final IndexStorePlugin.DirectoryFactory directoryFactory; | ||
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; | ||
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; | ||
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper; | ||
private final IndexCache indexCache; | ||
|
@@ -189,6 +194,7 @@ public IndexService( | |
Client client, | ||
QueryCache queryCache, | ||
IndexStorePlugin.DirectoryFactory directoryFactory, | ||
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, | ||
IndexEventListener eventListener, | ||
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory, | ||
MapperRegistry mapperRegistry, | ||
|
@@ -259,6 +265,7 @@ public IndexService( | |
this.eventListener = eventListener; | ||
this.nodeEnv = nodeEnv; | ||
this.directoryFactory = directoryFactory; | ||
this.remoteDirectoryFactory = remoteDirectoryFactory; | ||
this.recoveryStateFactory = recoveryStateFactory; | ||
this.engineFactory = Objects.requireNonNull(engineFactory); | ||
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory); | ||
|
@@ -423,7 +430,8 @@ private long getAvgShardSizeInBytes() throws IOException { | |
public synchronized IndexShard createShard( | ||
final ShardRouting routing, | ||
final Consumer<ShardId> globalCheckpointSyncer, | ||
final RetentionLeaseSyncer retentionLeaseSyncer | ||
final RetentionLeaseSyncer retentionLeaseSyncer, | ||
final RepositoriesService repositoriesService | ||
) throws IOException { | ||
Objects.requireNonNull(retentionLeaseSyncer); | ||
/* | ||
|
@@ -497,6 +505,21 @@ public synchronized IndexShard createShard( | |
} | ||
}; | ||
Directory directory = directoryFactory.newDirectory(this.indexSettings, path); | ||
Directory remoteDirectory = null; | ||
RemoteStoreRefreshListener remoteStoreRefreshListener = null; | ||
if (this.indexSettings.isRemoteStoreEnabled()) { | ||
try { | ||
Repository repository = repositoriesService.repository("dragon-stone"); | ||
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository); | ||
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory); | ||
} catch (RepositoryMissingException e) { | ||
throw new IllegalArgumentException( | ||
"Repository should be created before creating index with remote_store enabled setting", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is repository created by user or by system? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As of now, it is created by user. Once we make the name configurable as a part of #3286, we can check if the repository can be created during bootstrap. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, you're saying as of now it will have to be created by user but he can only name it as "dragon-stone"? That doesn't seem correct right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
e | ||
); | ||
} | ||
} | ||
|
||
store = new Store( | ||
shardId, | ||
this.indexSettings, | ||
|
@@ -525,7 +548,8 @@ public synchronized IndexShard createShard( | |
indexingOperationListeners, | ||
() -> globalCheckpointSyncer.accept(shardId), | ||
retentionLeaseSyncer, | ||
circuitBreakerService | ||
circuitBreakerService, | ||
remoteStoreRefreshListener | ||
); | ||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); | ||
eventListener.afterIndexShardCreated(indexShard); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -299,6 +299,8 @@ Runnable getGlobalCheckpointSyncer() { | |
private final RefreshPendingLocationListener refreshPendingLocationListener; | ||
private volatile boolean useRetentionLeasesInPeerRecovery; | ||
|
||
private final RemoteStoreRefreshListener remoteStoreRefreshListener; | ||
|
||
public IndexShard( | ||
final ShardRouting shardRouting, | ||
final IndexSettings indexSettings, | ||
|
@@ -319,7 +321,8 @@ public IndexShard( | |
final List<IndexingOperationListener> listeners, | ||
final Runnable globalCheckpointSyncer, | ||
final RetentionLeaseSyncer retentionLeaseSyncer, | ||
final CircuitBreakerService circuitBreakerService | ||
final CircuitBreakerService circuitBreakerService, | ||
final RemoteStoreRefreshListener remoteStoreRefreshListener | ||
) throws IOException { | ||
super(shardRouting.shardId(), indexSettings); | ||
assert shardRouting.initializing(); | ||
|
@@ -402,6 +405,7 @@ public boolean shouldCache(Query query) { | |
persistMetadata(path, indexSettings, shardRouting, null, logger); | ||
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); | ||
this.refreshPendingLocationListener = new RefreshPendingLocationListener(); | ||
this.remoteStoreRefreshListener = remoteStoreRefreshListener; | ||
} | ||
|
||
public ThreadPool getThreadPool() { | ||
|
@@ -3100,6 +3104,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { | |
} | ||
}; | ||
|
||
final List<ReferenceManager.RefreshListener> internalRefreshListener; | ||
if (remoteStoreRefreshListener != null && shardRouting.primary()) { | ||
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), remoteStoreRefreshListener); | ||
} else { | ||
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); | ||
} | ||
Comment on lines
+3107
to
+3112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets push this down to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not get this. You mean There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I meant IndexServices that create There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this to avoid null check? As There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am fine with what we have now, but if the checks grow at multiple place we might want to give this a revisit There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is identical to the listener for segrep - here. What about building a list of additional listeners in IndexService and pass directly to IndexShard? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Bukhtawar Agree, adding a task for the same. |
||
return this.engineConfigFactory.newEngineConfig( | ||
shardId, | ||
threadPool, | ||
|
@@ -3116,7 +3126,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { | |
translogConfig, | ||
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), | ||
Arrays.asList(refreshListeners, refreshPendingLocationListener), | ||
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), | ||
internalRefreshListener, | ||
indexSort, | ||
circuitBreakerService, | ||
globalCheckpointSupplier, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.shard; | ||
|
||
import org.apache.lucene.search.ReferenceManager; | ||
import org.apache.lucene.store.Directory; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* RefreshListener implementation to upload newly created segment files to the remote store | ||
*/ | ||
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { | ||
|
||
private final Directory storeDirectory; | ||
private final Directory remoteDirectory; | ||
|
||
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) { | ||
this.storeDirectory = storeDirectory; | ||
this.remoteDirectory = remoteDirectory; | ||
} | ||
|
||
@Override | ||
public void beforeRefresh() throws IOException { | ||
// ToDo Add implementation | ||
} | ||
|
||
@Override | ||
public void afterRefresh(boolean didRefresh) throws IOException { | ||
// ToDo Add implementation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actual implementation will be added in the next commit |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store; | ||
|
||
import org.apache.lucene.store.Directory; | ||
import org.opensearch.common.blobstore.BlobContainer; | ||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.shard.ShardPath; | ||
import org.opensearch.plugins.IndexStorePlugin; | ||
import org.opensearch.repositories.Repository; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
|
||
import java.io.IOException; | ||
|
||
public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { | ||
|
||
@Override | ||
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException { | ||
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; | ||
BlobPath blobPath = new BlobPath(); | ||
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); | ||
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); | ||
return new RemoteDirectory(blobContainer); | ||
} | ||
} | ||
Comment on lines
+22
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the factory supposed to vend out another variation based on the inputs? If not do we need a factory? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, there will be a directory instance per shard as path will be different per directory. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering if that qualifies to be a factory f.e look at There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. You mean, if it is only one variant each time, we do not need a factory, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we call it a |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store; | ||
|
||
import org.apache.lucene.store.Directory; | ||
import org.junit.Before; | ||
import org.mockito.ArgumentCaptor; | ||
import org.opensearch.common.blobstore.BlobContainer; | ||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.blobstore.BlobStore; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.shard.ShardId; | ||
import org.opensearch.index.shard.ShardPath; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
import org.opensearch.test.IndexSettingsModule; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.util.Collections; | ||
|
||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
import static org.mockito.Mockito.verify; | ||
|
||
public class RemoteDirectoryFactoryTests extends OpenSearchTestCase { | ||
|
||
private RemoteDirectoryFactory remoteDirectoryFactory; | ||
|
||
@Before | ||
public void setup() { | ||
remoteDirectoryFactory = new RemoteDirectoryFactory(); | ||
} | ||
|
||
public void testNewDirectory() throws IOException { | ||
Settings settings = Settings.builder().build(); | ||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); | ||
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); | ||
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); | ||
BlobStoreRepository repository = mock(BlobStoreRepository.class); | ||
BlobStore blobStore = mock(BlobStore.class); | ||
BlobContainer blobContainer = mock(BlobContainer.class); | ||
when(repository.blobStore()).thenReturn(blobStore); | ||
when(blobStore.blobContainer(any())).thenReturn(blobContainer); | ||
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); | ||
|
||
Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository); | ||
assertTrue(directory instanceof RemoteDirectory); | ||
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); | ||
verify(blobStore).blobContainer(blobPathCaptor.capture()); | ||
BlobPath blobPath = blobPathCaptor.getValue(); | ||
assertEquals("foo/0/", blobPath.buildAsString()); | ||
|
||
directory.listAll(); | ||
verify(blobContainer).listBlobs(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This repository name is hardcoded for now. It would be made configurable as a part of #3286