diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java index 9d4d8aa24bd51..2053800504c89 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -160,7 +160,7 @@ private String getLocalSegmentFilename(String remoteFilename) { return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; } - private IndexResponse indexSingleDoc() { + protected IndexResponse indexSingleDoc() { return client().prepareIndex(INDEX_NAME) .setId(UUIDs.randomBase64UUID()) .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java similarity index 67% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java index 3462054c23630..2c6db6ae19a9a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java @@ -12,12 +12,18 @@ import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; +import org.opensearch.common.util.concurrent.UncategorizedExecutionException; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.IndexService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; @@ -33,7 +39,7 @@ import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { +public class RemoteStoreBackpressureAndResiliencyIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception { // Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions // fail leading to consecutive failure limit getting exceeded and leading to rejections. @@ -156,4 +162,70 @@ private String generateString(int sizeInBytes) { sb.append("}"); return sb.toString(); } + + /** + * Fixes Github#10398 + */ + public void testAsyncTrimTaskSucceeds() { + Path location = randomRepoPath().toAbsolutePath(); + String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE); + + logger.info("Increasing the frequency of async trim task to ensure it runs in background while indexing"); + IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next(); + ((AbstractAsyncTask) indexService.getTrimTranslogTask()).setInterval(TimeValue.timeValueMillis(100)); + + logger.info("--> Indexing data"); + indexData(randomIntBetween(2, 5), true); + logger.info("--> Indexing succeeded"); + + MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName) + .repository(TRANSLOG_REPOSITORY_NAME); + logger.info("--> Failing all remote store interaction"); + translogRepo.setRandomControlIOExceptionRate(1d); + + for (int i = 0; i < randomIntBetween(5, 10); i++) { + UncategorizedExecutionException exception = assertThrows(UncategorizedExecutionException.class, this::indexSingleDoc); + assertEquals("Failed execution", exception.getMessage()); + } + + translogRepo.setRandomControlIOExceptionRate(0d); + indexSingleDoc(); + logger.info("Indexed single doc successfully"); + } + + /** + * Fixes Github#10400 + */ + public void testSkipLoadGlobalCheckpointToReplicationTracker() { + Path location = randomRepoPath().toAbsolutePath(); + String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE); + + logger.info("--> Indexing data"); + indexData(randomIntBetween(1, 2), true); + logger.info("--> Indexing succeeded"); + + IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next(); + IndexShard indexShard = indexService.getShard(0); + indexShard.failShard("failing shard", null); + + ensureRed(INDEX_NAME); + + MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName) + .repository(TRANSLOG_REPOSITORY_NAME); + logger.info("--> Failing all remote store interaction"); + translogRepo.setRandomControlIOExceptionRate(1d); + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + // CLuster stays red still as the remote interactions are still failing + ensureRed(INDEX_NAME); + + logger.info("Retrying to allocate failed shards"); + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + // CLuster stays red still as the remote interactions are still failing + ensureRed(INDEX_NAME); + + logger.info("Stop failing all remote store interactions"); + translogRepo.setRandomControlIOExceptionRate(0d); + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + ensureGreen(INDEX_NAME); + } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index df8e8070b8e03..af23145be9f89 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -1286,7 +1286,7 @@ AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } - AsyncTrimTranslogTask getTrimTranslogTask() { // for tests + public AsyncTrimTranslogTask getTrimTranslogTask() { // for tests return trimTranslogTask; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 38b349dc1763b..e40dcfb96f705 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1470,6 +1470,9 @@ public void flush(FlushRequest request) { * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details */ public void trimTranslog() { + if (isRemoteTranslogEnabled()) { + return; + } verifyNotClosed(); final Engine engine = getEngine(); engine.trimUnreferencedTranslogFiles(); @@ -2320,7 +2323,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { }; // Do not load the global checkpoint if this is a remote snapshot index - if (indexSettings.isRemoteSnapshot() == false) { + if (indexSettings.isRemoteSnapshot() == false && indexSettings.isRemoteTranslogStoreEnabled() == false) { loadGlobalCheckpointToReplicationTracker(); }