Fix bugs causing red indexes with remote indexes during translog upload & store recovery #10449

Merged 5 commits on Oct 7, 2023
@@ -160,7 +160,7 @@ private String getLocalSegmentFilename(String remoteFilename) {
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
}

private IndexResponse indexSingleDoc() {
protected IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
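The hunk above widens indexSingleDoc from private to protected, presumably so the renamed resiliency IT later in this diff can index documents directly from its new tests. A minimal sketch of that usage, mirroring the assertions that appear further down and assuming the surrounding integ-test scaffolding of this PR:

// Sketch only (not part of the diff): usage from a test in a subclass of the
// remote-store mock-repository base class.
// While every remote interaction is failing, indexing a single document fails:
UncategorizedExecutionException e = assertThrows(UncategorizedExecutionException.class, this::indexSingleDoc);
assertEquals("Failed execution", e.getMessage());
// Once the failure injection is cleared, the now-protected helper succeeds again:
indexSingleDoc();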
@@ -12,12 +12,18 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -33,7 +39,7 @@
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
public class RemoteStoreBackpressureAndResiliencyIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
// Here the doc size of the request remains the same throughout the test. After initial indexing, all remote store
// interactions fail, so the consecutive failure limit is breached and subsequent writes are rejected.
@@ -156,4 +162,70 @@ private String generateString(int sizeInBytes) {
sb.append("}");
return sb.toString();
}

/**
* Fixes <a href="https://github.com/opensearch-project/OpenSearch/issues/10398">GitHub#10398</a>
*/
public void testAsyncTrimTaskSucceeds() {
Path location = randomRepoPath().toAbsolutePath();
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

logger.info("Increasing the frequency of async trim task to ensure it runs in background while indexing");
IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();
((AbstractAsyncTask) indexService.getTrimTranslogTask()).setInterval(TimeValue.timeValueMillis(100));

logger.info("--> Indexing data");
indexData(randomIntBetween(2, 5), true);
logger.info("--> Indexing succeeded");

MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName)
.repository(TRANSLOG_REPOSITORY_NAME);
logger.info("--> Failing all remote store interaction");
translogRepo.setRandomControlIOExceptionRate(1d);

for (int i = 0; i < randomIntBetween(5, 10); i++) {
UncategorizedExecutionException exception = assertThrows(UncategorizedExecutionException.class, this::indexSingleDoc);
assertEquals("Failed execution", exception.getMessage());
}

translogRepo.setRandomControlIOExceptionRate(0d);
indexSingleDoc();
logger.info("Indexed single doc successfully");
}

/**
* Fixes <a href="https://github.com/opensearch-project/OpenSearch/issues/10400">GitHub#10400</a>
*/
public void testSkipLoadGlobalCheckpointToReplicationTracker() {
Path location = randomRepoPath().toAbsolutePath();
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

logger.info("--> Indexing data");
indexData(randomIntBetween(1, 2), true);
logger.info("--> Indexing succeeded");

IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();
IndexShard indexShard = indexService.getShard(0);
indexShard.failShard("failing shard", null);

ensureRed(INDEX_NAME);

MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName)
.repository(TRANSLOG_REPOSITORY_NAME);
logger.info("--> Failing all remote store interaction");
translogRepo.setRandomControlIOExceptionRate(1d);
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
// Cluster stays red as the remote interactions are still failing
ensureRed(INDEX_NAME);

logger.info("Retrying to allocate failed shards");
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
// Cluster stays red as the remote interactions are still failing
ensureRed(INDEX_NAME);

logger.info("Stop failing all remote store interactions");
translogRepo.setRandomControlIOExceptionRate(0d);
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
ensureGreen(INDEX_NAME);
}
}
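The body of the pre-existing testWritesRejectedDueToConsecutiveFailureBreach is collapsed in the hunk above; its leading comment describes the mechanism under test. As a standalone model of that mechanism, using illustrative names rather than the actual remote-store pressure classes:

// Illustrative model only: once remote uploads fail N times in a row, further writes are
// rejected until an upload succeeds again and resets the streak.
final class ConsecutiveFailureBackpressure {
    private final int consecutiveFailureLimit;
    private int consecutiveFailures;

    ConsecutiveFailureBackpressure(int consecutiveFailureLimit) {
        this.consecutiveFailureLimit = consecutiveFailureLimit;
    }

    void onUploadSuccess() {
        consecutiveFailures = 0; // a single success lifts the rejections
    }

    void onUploadFailure() {
        consecutiveFailures++;
    }

    void validateWrite() {
        if (consecutiveFailures >= consecutiveFailureLimit) {
            throw new IllegalStateException(
                "rejecting write after " + consecutiveFailures + " consecutive remote upload failures");
        }
    }
}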
@@ -1286,7 +1286,7 @@ AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}

AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
public AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
return trimTranslogTask;
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1465,6 +1465,9 @@
* {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
*/
public void trimTranslog() {
if (isRemoteTranslogEnabled()) {
return;
}
verifyNotClosed();
final Engine engine = getEngine();
engine.translogManager().trimUnreferencedTranslogFiles();
@@ -2314,7 +2317,7 @@
};

// Do not load the global checkpoint if this is a remote snapshot index or the translog is backed by the remote store
if (indexSettings.isRemoteSnapshot() == false) {
if (indexSettings.isRemoteSnapshot() == false && indexSettings.isRemoteTranslogStoreEnabled() == false) {
loadGlobalCheckpointToReplicationTracker();
}
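Taken together, the production side of this PR amounts to two small guards in IndexShard: trimTranslog becomes a no-op when the translog is backed by the remote store, and store recovery skips loading the locally persisted global checkpoint into the replication tracker for remote snapshot or remote-translog-backed indexes. A condensed standalone model of that logic, with illustrative names in place of the real IndexShard API:

// Illustrative model of the two guards added above (not the actual IndexShard code).
final class RemoteShardGuards {
    private final boolean remoteTranslogEnabled;
    private final boolean remoteSnapshot;

    RemoteShardGuards(boolean remoteTranslogEnabled, boolean remoteSnapshot) {
        this.remoteTranslogEnabled = remoteTranslogEnabled;
        this.remoteSnapshot = remoteSnapshot;
    }

    // Guard 1: the periodic trim task must not touch local translog state for a
    // remote-translog-backed shard, so the call simply returns.
    void trimTranslog(Runnable localTrim) {
        if (remoteTranslogEnabled) {
            return;
        }
        localTrim.run();
    }

    // Guard 2: during store recovery, load the locally persisted global checkpoint only
    // when the shard is neither a remote snapshot nor backed by a remote translog store.
    void maybeLoadGlobalCheckpoint(Runnable loadLocalCheckpoint) {
        if (remoteSnapshot == false && remoteTranslogEnabled == false) {
            loadLocalCheckpoint.run();
        }
    }
}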
