From be750ea00eeb081f7272c06e7b93601b8b5744be Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 27 Nov 2023 12:38:09 +0530 Subject: [PATCH] Address comment - Move upload check in prepareAndUpload Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShard.java | 1 - .../index/translog/RemoteFsTranslog.java | 22 +++++++++---------- .../index/translog/RemoteFsTranslogTests.java | 8 +++---- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5a9087c685fdb..1979005d9729a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -904,7 +904,6 @@ public void relocated( failShard("timed out waiting for relocation hand-off to complete", null); throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete"); } catch (Exception ex) { - logger.warn("exception occurred during relocation hand-off to complete errorMsg={}", ex.getMessage()); assert replicationTracker.isPrimaryMode(); // If the primary mode is still true after the end of handoff attempt, it basically means that the relocation // failed. 
The existing primary will continue to be the primary, so we need to allow the segments and translog diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index effbc4b9d7fa4..7b969a37e4aa6 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -275,6 +275,16 @@ public void rollGeneration() throws IOException { } private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { + // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and have + // ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync` + // action, which re-downloads the segments and translog on the new primary, we are ensuring 2 things here - + // 1. Using startedPrimarySupplier, we prevent the new primary from doing pre-emptive syncs + // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. + if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) { + logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits()); + // NO-OP + return false; + } long maxSeqNo = -1; try (Releasable ignored = writeLock.acquire()) { if (generation == null || generation == current.getGeneration()) { @@ -324,16 +334,6 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException { - // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having - // ReplicationTracker.primaryMode() as true. 
However, before we perform the `internal:index/shard/replication/segments_sync` - // action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here - - // 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs - // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. - if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) { - logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits()); - // NO-OP - return true; - } logger.trace("uploading translog for {} {}", primaryTerm, generation); try ( TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder( @@ -462,7 +462,7 @@ public void trimUnreferencedReaders() throws IOException { // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. 
- if (pauseSync.get()) { + if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { return; } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 026749e7903dc..6bfab278993ed 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -879,21 +879,21 @@ public void testDrainSync() throws Exception { ops, new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 }) ); - assertEquals(2, translog.readers.size()); + assertEquals(1, translog.readers.size()); assertEquals(6, translog.allUploaded().size()); assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); // Refill the permits back Releasables.close(releasable); addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 })); - assertEquals(3, translog.readers.size()); - assertEquals(10, translog.allUploaded().size()); + assertEquals(2, translog.readers.size()); + assertEquals(8, translog.allUploaded().size()); assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); translog.setMinSeqNoToKeep(3); translog.trimUnreferencedReaders(); assertEquals(1, translog.readers.size()); - assertBusy(() -> assertEquals(6, translog.allUploaded().size())); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); }