Skip to content

Commit

Permalink
Fix shard failure on flush during upload failures for remote indexes (o…
Browse files Browse the repository at this point in the history
…pensearch-project#10513)

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Oct 12, 2023
1 parent f518e9d commit 90c4297
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.flush.FlushResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
Expand Down Expand Up @@ -228,4 +229,32 @@ public void testSkipLoadGlobalCheckpointToReplicationTracker() {
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
ensureGreen(INDEX_NAME);
}

public void testFlushDuringRemoteUploadFailures() {
Path location = randomRepoPath().toAbsolutePath();
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

logger.info("--> Indexing data");
indexData(randomIntBetween(1, 2), true);
logger.info("--> Indexing succeeded");
ensureGreen(INDEX_NAME);

MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName)
.repository(TRANSLOG_REPOSITORY_NAME);
logger.info("--> Failing all remote store interaction");
translogRepo.setRandomControlIOExceptionRate(1d);

Exception ex = assertThrows(UncategorizedExecutionException.class, () -> indexSingleDoc());
assertEquals("Failed execution", ex.getMessage());

FlushResponse flushResponse = client().admin().indices().prepareFlush(INDEX_NAME).setForce(true).execute().actionGet();
assertEquals(1, flushResponse.getFailedShards());
ensureGreen(INDEX_NAME);

logger.info("--> Stop failing all remote store interactions");
translogRepo.setRandomControlIOExceptionRate(0d);
flushResponse = client().admin().indices().prepareFlush(INDEX_NAME).setForce(true).execute().actionGet();
assertEquals(1, flushResponse.getSuccessfulShards());
assertEquals(0, flushResponse.getFailedShards());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ public void trimTranslog() {
/**
* Rolls the tranlog generation and cleans unneeded.
*/
public void rollTranslogGeneration() {
public void rollTranslogGeneration() throws IOException {
final Engine engine = getEngine();
engine.translogManager().rollTranslogGeneration();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.index.engine.LifecycleAware;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -83,11 +84,14 @@ public InternalTranslogManager(
* Rolls the translog generation and cleans unneeded.
*/
@Override
public void rollTranslogGeneration() throws TranslogException {
public void rollTranslogGeneration() throws TranslogException, IOException {
try (ReleasableLock ignored = readLock.acquire()) {
engineLifeCycleAware.ensureOpen();
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (TranslogUploadFailedException e) {
// Do not trigger the translogEventListener as it fails the Engine while this is only an issue with remote upload
throw e;
} catch (AlreadyClosedException e) {
translogEventListener.onFailure("translog roll generation failed", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public interface TranslogManager {
/**
* Rolls the translog generation and cleans unneeded.
*/
void rollTranslogGeneration() throws TranslogException;
void rollTranslogGeneration() throws TranslogException, IOException;

/**
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L);
remoteTranslogTransferTracker.addUploadBytesFailed(metadataBytesToUpload);
// outer catch handles capturing stats on upload failure
throw exception;
throw new TranslogUploadFailedException("Failed to upload " + tlogMetadata.getName(), exception);
}

remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L);
Expand All @@ -185,7 +185,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
} else {
Exception ex = new IOException("Failed to upload " + exceptionList.size() + " files during transfer");
Exception ex = new TranslogUploadFailedException("Failed to upload " + exceptionList.size() + " files during transfer");
exceptionList.forEach(ex::addSuppressed);
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import java.io.IOException;

/**
* Exception is thrown if there are any exceptions while uploading translog to remote store.
* @opensearch.internal
*/
public class TranslogUploadFailedException extends IOException {

public TranslogUploadFailedException(String message) {
super(message);
}

public TranslogUploadFailedException(String message, Throwable cause) {
super(message, cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7256,7 +7256,11 @@ public void testMaxSeqNoInCommitUserData() throws Exception {
engine.ensureOpen();
while (running.get()
&& assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration() < 500) {
engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower
try {
engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower
} catch (IOException e) {
fail("io exception not expected");
}
}
});
rollTranslog.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -1113,7 +1114,7 @@ public void testSyncUpAlwaysFailure() throws IOException {
try {
translog.sync();
fail("io exception expected");
} catch (IOException e) {
} catch (TranslogUploadFailedException e) {
assertTrue("at least one operation pending", translog.syncNeeded());
}
}
Expand Down

0 comments on commit 90c4297

Please sign in to comment.