Skip to content

Commit

Permalink
Downgrade write lock to read lock before translog upload to remote st…
Browse files Browse the repository at this point in the history
…ore (#10135) (#10187)

* Revert "Optimize read write lock constructs during translog upload to remote store (#9636)"

This reverts commit b5cc002.



* Downgrade writelock to readlock during translog upload



---------

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Sep 25, 2023
1 parent 3487e45 commit 59b8701
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -60,7 +59,6 @@ public class RemoteFsTranslog extends Translog {
private final BooleanSupplier primaryModeSupplier;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private volatile long maxRemoteTranslogGenerationUploaded;
private final Object uploadMutex = new Object();

private volatile long minSeqNoToKeep;

Expand Down Expand Up @@ -239,19 +237,10 @@ public static TranslogTransferManager buildTranslogTransferManager(
@Override
public boolean ensureSynced(Location location) throws IOException {
try {
boolean shouldUpload = false;
try (ReleasableLock ignored = writeLock.acquire()) {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
if (prepareForUpload(location.generation) == false) {
return false;
}
shouldUpload = true;
}
}
if (shouldUpload) {
return performUpload(primaryTermSupplier.getAsLong(), location.generation);
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation);
}
} catch (final Exception ex) {
closeOnTragicEvent(ex);
Expand All @@ -266,12 +255,10 @@ public void rollGeneration() throws IOException {
if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
return;
}
if (prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
}
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
}

private boolean prepareForUpload(Long generation) throws IOException {
private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException {
try (Releasable ignored = writeLock.acquire()) {
if (generation == null || generation == current.getGeneration()) {
try {
Expand All @@ -282,46 +269,36 @@ private boolean prepareForUpload(Long generation) throws IOException {
logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1);
current = createWriter(current.getGeneration() + 1);
}
assert writeLock.isHeldByCurrentThread() : "Write lock must be held before we acquire the read lock";
// Here we are downgrading the write lock by acquiring the read lock and releasing the write lock
// This ensures that other threads can still acquire the read locks while also protecting the
// readers and writer to not be mutated any further.
readLock.acquire();
} catch (final Exception e) {
tragedy.setTragicException(e);
closeOnTragicEvent(e);
throw e;
}
return true;
} else return generation >= current.getGeneration();
} else if (generation < current.getGeneration()) {
return false;
}
}
}

/**
* This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized
* is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the
* underlying translog readers are not deleted and the current writer is not converted to a reader at the time of
* upload.
*
* @param primaryTerm current primary term
* @param generation current generation
* @return true if upload is successful
* @throws IOException if the upload fails due to any underlying exceptions.
*/
private boolean performUpload(Long primaryTerm, Long generation) throws IOException {
synchronized (uploadMutex) {
try (Releasable ignored = readLock.acquire()) {
// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
long generationToUpload;
if (generation == null) {
if (closed.get() == false) {
generationToUpload = current.getGeneration() - 1;
} else {
generationToUpload = current.getGeneration();
}
assert readLock.isHeldByCurrentThread() == true;
try (Releasable ignored = readLock; Releasable ignoredGenLock = deletionPolicy.acquireTranslogGen(getMinFileGeneration())) {
// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
if (generation == null) {
if (closed.get() == false) {
return upload(primaryTerm, current.getGeneration() - 1);
} else {
generationToUpload = generation;
return upload(primaryTerm, current.getGeneration());
}
return upload(primaryTerm, generationToUpload);
} else {
return upload(primaryTerm, generation);
}
}
}
Expand All @@ -347,10 +324,9 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
Translog::getCommitCheckpointFileName
).build()
) {
Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration()));
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(transferReleasable, generation, primaryTerm)
new RemoteFsTranslogTransferListener(generation, primaryTerm)
);
}

Expand All @@ -373,8 +349,8 @@ private boolean syncToDisk() throws IOException {
@Override
public void sync() throws IOException {
try {
if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
if (syncToDisk() || syncNeeded()) {
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
}
} catch (final Exception e) {
tragedy.setTragicException(e);
Expand Down Expand Up @@ -529,16 +505,17 @@ protected void onDelete() {
translogTransferManager.delete();
}

// Visible for testing
boolean isRemoteGenerationDeletionPermitsAvailable() {
return remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS;
}

/**
* TranslogTransferListener implementation for RemoteFsTranslog
*
* @opensearch.internal
*/
private class RemoteFsTranslogTransferListener implements TranslogTransferListener {
/**
* Releasable instance for the translog
*/
private final Releasable transferReleasable;

/**
* Generation for the translog
Expand All @@ -550,8 +527,7 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
*/
private final Long primaryTerm;

RemoteFsTranslogTransferListener(Releasable transferReleasable, Long generation, Long primaryTerm) {
this.transferReleasable = transferReleasable;
RemoteFsTranslogTransferListener(Long generation, Long primaryTerm) {
this.generation = generation;
this.primaryTerm = primaryTerm;
}
Expand All @@ -571,10 +547,5 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
throw (RuntimeException) ex;
}
}

@Override
public void close() {
transferReleasable.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration);

assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term";
assert this.generation == highestGeneration : " inconsistent generation ";
final long finalHighestGeneration = highestGeneration;
assert LongStream.iterate(lowestGeneration, i -> i + 1)
.limit(highestGeneration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded();
long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis();

try (translogTransferListener) {
try {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
* @opensearch.internal
*/
public interface TranslogTransferListener extends AutoCloseable {
public interface TranslogTransferListener {
/**
* Invoked when the transfer of {@link TransferSnapshot} succeeds
* @param transferSnapshot the transfer snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ public void testSimpleOperationsUpload() throws Exception {

translog.setMinSeqNoToKeep(2);

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.trimUnreferencedReaders();
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) {
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
translogTransferFailed.incrementAndGet();
}

@Override
public void close() {}
}));
assertEquals(4, fileTransferSucceeded.get());
assertEquals(0, fileTransferFailed.get());
Expand Down

0 comments on commit 59b8701

Please sign in to comment.