Skip to content

Commit

Permalink
Add UTs in RemoteFsTranslogTests & incorporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Nov 25, 2023
1 parent bdb92d4 commit 954a43d
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public final class EngineConfig {
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final BooleanSupplier startedPrimarySupplier;
private final Comparator<LeafReader> leafSorter;

/**
Expand Down Expand Up @@ -287,7 +287,7 @@ private EngineConfig(Builder builder) {
this.primaryTermSupplier = builder.primaryTermSupplier;
this.tombstoneDocSupplier = builder.tombstoneDocSupplier;
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.startedPrimarySupplier = builder.startedPrimarySupplier;
this.translogFactory = builder.translogFactory;
this.leafSorter = builder.leafSorter;
}
Expand Down Expand Up @@ -495,11 +495,11 @@ public boolean isReadOnlyReplica() {
}

/**
* Returns the underlying primaryModeSupplier.
* Returns the underlying startedPrimarySupplier.
* @return the started primary supplier.
*/
public BooleanSupplier getPrimaryModeSupplier() {
return primaryModeSupplier;
public BooleanSupplier getStartedPrimarySupplier() {
return startedPrimarySupplier;
}

/**
Expand Down Expand Up @@ -577,7 +577,7 @@ public static class Builder {
private TombstoneDocSupplier tombstoneDocSupplier;
private TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private BooleanSupplier startedPrimarySupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();
Comparator<LeafReader> leafSorter;

Expand Down Expand Up @@ -701,8 +701,8 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) {
return this;
}

public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) {
this.primaryModeSupplier = primaryModeSupplier;
public Builder startedPrimarySupplier(BooleanSupplier startedPrimarySupplier) {
this.startedPrimarySupplier = startedPrimarySupplier;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public EngineConfig newEngineConfig(
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
BooleanSupplier startedPrimarySupplier,
TranslogFactory translogFactory,
Comparator<LeafReader> leafSorter
) {
Expand Down Expand Up @@ -185,7 +185,7 @@ public EngineConfig newEngineConfig(
.primaryTermSupplier(primaryTermSupplier)
.tombstoneDocSupplier(tombstoneDocSupplier)
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.startedPrimarySupplier(startedPrimarySupplier)
.translogFactory(translogFactory)
.leafSorter(leafSorter)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void onFailure(String reason, Exception ex) {
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen,
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
engineConfig.getStartedPrimarySupplier()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void onAfterTranslogSync() {
},
this,
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
engineConfig.getStartedPrimarySupplier()
);
this.translogManager = translogManagerRef;
success = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {},
engineConfig.getPrimaryModeSupplier()
engineConfig.getStartedPrimarySupplier()
)
) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {},
config.getPrimaryModeSupplier()
config.getStartedPrimarySupplier()
)
) {
return translog.stats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* @opensearch.internal
*/
public class CheckpointRefreshListener extends CloseableRetryableRefreshListener {
public class CheckpointRefreshListener extends ReleasableRetryableRefreshListener {

protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);

Expand Down
16 changes: 9 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ public void relocated(
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
// The list of releasables below ensures that, if the relocation does not happen, we undo the close and the
// acquisition of all permits. This ensures that remote store uploads can still be done by the existing primary shard.
List<Releasable> releasablesOnNoHandoff = new ArrayList<>();
List<Releasable> releasablesOnHandoffFailures = new ArrayList<>(2);
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();
Expand All @@ -862,13 +862,13 @@ public void relocated(

// Ensure all in-flight remote store refreshes drain before we perform performSegRep.
for (ReferenceManager.RefreshListener refreshListener : internalRefreshListener) {
if (refreshListener instanceof CloseableRetryableRefreshListener) {
releasablesOnNoHandoff.add(((CloseableRetryableRefreshListener) refreshListener).drainRefreshes());
if (refreshListener instanceof ReleasableRetryableRefreshListener) {
releasablesOnHandoffFailures.add(((ReleasableRetryableRefreshListener) refreshListener).drainRefreshes());
}
}

// Ensure all in-flight remote store translog uploads drain before we perform performSegRep.
releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSync());
releasablesOnHandoffFailures.add(getEngine().translogManager().drainSync());

// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
Expand Down Expand Up @@ -903,14 +903,16 @@ public void relocated(
// Fail primary relocation source and target shards.
failShard("timed out waiting for relocation hand-off to complete", null);
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
} catch (Exception ex) {
logger.warn("exception occurred during relocation hand-off to complete errorMsg={}", ex.getMessage());
assert replicationTracker.isPrimaryMode();
throw ex;
} finally {
// If the primary mode is still true after the end of handoff attempt, it basically means that the relocation
// failed. The existing primary will continue to be the primary, so we need to allow the segments and translog
// upload to resume.
if (replicationTracker.isPrimaryMode()) {
for (Releasable releasable : releasablesOnNoHandoff) {
releasable.close();
}
Releasables.close(releasablesOnHandoffFailures);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the listener
* is closed, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides
* RefreshListener that runs afterRefresh method if and only if there is a permit available. Once {@code drainRefreshes()}
* is called, all the permits are acquired and there are no available permits for afterRefresh. This abstract class provides
* necessary abstract methods to schedule retry.
*/
public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener {
public abstract class ReleasableRetryableRefreshListener implements ReferenceManager.RefreshListener {

/**
* Total permits = 1 ensures that there is only a single instance of runAfterRefreshWithPermit running at a time.
Expand All @@ -48,11 +48,11 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana
*/
private final AtomicBoolean retryScheduled = new AtomicBoolean(false);

public CloseableRetryableRefreshListener() {
public ReleasableRetryableRefreshListener() {
this.threadPool = null;
}

public CloseableRetryableRefreshListener(ThreadPool threadPool) {
public ReleasableRetryableRefreshListener(ThreadPool threadPool) {
assert Objects.nonNull(threadPool);
this.threadPool = threadPool;
}
Expand Down Expand Up @@ -215,7 +215,7 @@ public final Releasable drainRefreshes() {
/**
* Returns the timeout which is used while draining refreshes.
*/
protected TimeValue getDrainTimeout() {
TimeValue getDrainTimeout() {
return DRAIN_TIMEOUT;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
*
* @opensearch.internal
*/
public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshListener {
public final class RemoteStoreRefreshListener extends ReleasableRetryableRefreshListener {

private final Logger logger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Translog newTranslog(
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
BooleanSupplier startedPrimarySupplier
) throws IOException {

return new LocalTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public InternalTranslogManager(
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
BooleanSupplier startedPrimarySupplier
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -72,7 +72,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID, translogFactory, primaryModeSupplier);
}, translogUUID, translogFactory, startedPrimarySupplier);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -369,7 +369,7 @@ protected Translog openTranslog(
LongConsumer persistedSequenceNumberConsumer,
String translogUUID,
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
BooleanSupplier startedPrimarySupplier
) throws IOException {
return translogFactory.newTranslog(
translogConfig,
Expand All @@ -378,7 +378,7 @@ protected Translog openTranslog(
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
primaryModeSupplier
startedPrimarySupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Translog newTranslog(
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
BooleanSupplier startedPrimarySupplier
) throws IOException {

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
Expand All @@ -73,7 +73,7 @@ public Translog newTranslog(
persistedSequenceNumberConsumer,
blobStoreRepository,
threadPool,
primaryModeSupplier,
startedPrimarySupplier,
remoteTranslogTransferTracker
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class RemoteFsTranslog extends Translog {
private final Logger logger;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
private final BooleanSupplier primaryModeSupplier;
private final BooleanSupplier startedPrimarySupplier;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private volatile long maxRemoteTranslogGenerationUploaded;

Expand All @@ -79,8 +79,8 @@ public class RemoteFsTranslog extends Translog {
private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);

// These permits exist to allow any inflight background triggered upload.
private static final int SYNC_PERMITS = 1;
private final Semaphore syncPermits = new Semaphore(SYNC_PERMITS);
private static final int SYNC_PERMIT = 1;
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);

public RemoteFsTranslog(
TranslogConfig config,
Expand All @@ -91,12 +91,12 @@ public RemoteFsTranslog(
LongConsumer persistedSequenceNumberConsumer,
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier primaryModeSupplier,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
this.primaryModeSupplier = primaryModeSupplier;
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
this.translogTransferManager = buildTranslogTransferManager(
Expand Down Expand Up @@ -325,14 +325,15 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
// During primary relocation, both the old and the new primary have an engine created with RemoteFsTranslog and have
// ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync`
// action, which re-downloads the segments and translog on the new primary, we ensure 2 things here -
// 1. Using primaryModeSupplier, we prevent the new primary to do pre-emptive syncs
// 1. Using startedPrimarySupplier, we prevent the new primary from doing pre-emptive syncs
// 2. Using syncPermit, we prevent syncs at the desired time during primary relocation.
if (primaryModeSupplier.getAsBoolean() == false || syncPermits.tryAcquire(1) == false) {
logger.debug("skipped uploading translog for {} {} uploadPermits={}", primaryTerm, generation, syncPermits.availablePermits());
if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) {
logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits());
// NO-OP
return true;
}
logger.trace("uploading translog for {} {}", primaryTerm, generation);
induceDelayForTest();
try (
TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder(
primaryTerm,
Expand All @@ -348,11 +349,16 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
);
} finally {
syncPermits.release(1);
syncPermit.release(SYNC_PERMIT);
}

}

// Made available for testing only
// No-op hook: upload(...) calls it after the sync permit has been acquired and before the transfer snapshot is built.
// NOTE(review): presumably overridden by tests to keep an upload in flight - confirm against the test sources.
// NOTE(review): the call appears to sit outside the try/finally that releases the sync permit, so an override that
// throws would leave the permit held - confirm before relying on it.
void induceDelayForTest() {
// Intentionally empty.
}

// Visible for testing
public Set<String> allUploaded() {
return fileTransferTracker.allUploaded();
Expand Down Expand Up @@ -434,12 +440,12 @@ protected void setMinSeqNoToKeep(long seqNo) {
@Override
protected Releasable drainSync() {
try {
if (syncPermits.tryAcquire(SYNC_PERMITS, 1, TimeUnit.MINUTES)) {
logger.info("All permits acquired");
if (syncPermit.tryAcquire(SYNC_PERMIT, 1, TimeUnit.MINUTES)) {
logger.info("All inflight remote translog syncs finished and further syncs paused");
return Releasables.releaseOnce(() -> {
syncPermits.release(SYNC_PERMITS);
assert syncPermits.availablePermits() == SYNC_PERMITS : "Available permits is " + syncPermits.availablePermits();
logger.info("All permits released");
syncPermit.release(SYNC_PERMIT);
assert syncPermit.availablePermits() == SYNC_PERMIT : "Available permits is " + syncPermit.availablePermits();
logger.info("Resumed remote translog sync back on relocation failure");
});
} else {
throw new TimeoutException("Timeout while acquiring all permits");
Expand All @@ -456,7 +462,7 @@ public void trimUnreferencedReaders() throws IOException {

// This is to ensure that after the permits are acquired during primary relocation, there are no further modifications to the
// remote store.
if (syncPermits.availablePermits() == 0) {
if (syncPermit.availablePermits() != SYNC_PERMIT) {
return;
}

Expand Down Expand Up @@ -598,4 +604,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
    // The minUnrefCheckpointInLastCommit argument is not consulted; the current value of minSeqNoToKeep is returned as-is.
    final long retainedSeqNo = minSeqNoToKeep;
    return retainedSeqNo;
}

/**
 * Visible for testing.
 *
 * @return the number of permits currently available on {@code syncPermit}.
 */
int availablePermits() {
    final int remainingPermits = syncPermit.availablePermits();
    return remainingPermits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ Translog newTranslog(
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer,
final BooleanSupplier primaryModeSupplier
final BooleanSupplier startedPrimarySupplier
) throws IOException;
}
Loading

0 comments on commit 954a43d

Please sign in to comment.