From 86ed7cb3947bd3e3834c035fd4756aeb92858220 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 26 Mar 2024 12:47:57 +0530 Subject: [PATCH] WIP Signed-off-by: Bhumika Saini --- .../RemotePrimaryLocalRecoveryIT.java | 106 ++++++++++++++++++ .../opensearch/common/blobstore/BlobPath.java | 2 + .../index/remote/RemoteStoreDataEnums.java | 4 + .../index/remote/RemoteStorePathType.java | 2 + .../opensearch/index/shard/IndexShard.java | 34 +++--- ...emoteBlobStoreInternalTranslogFactory.java | 9 +- .../index/translog/RemoteFsTranslog.java | 64 ++++++----- .../opensearch/index/translog/Translog.java | 2 + .../index/translog/TranslogWriter.java | 35 +++--- .../opensearch/indices/IndicesService.java | 6 +- .../opensearch/index/IndexModuleTests.java | 3 +- .../index/translog/RemoteFsTranslogTests.java | 12 +- .../translog/TranslogDeletionPolicyTests.java | 1 + .../index/shard/IndexShardTestCase.java | 3 +- 14 files changed, 217 insertions(+), 66 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java new file mode 100644 index 0000000000000..76a60df57cd9a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase { + public void testLocalRecoveryRollingRestart() throws Exception { + String docRepNode = internalCluster().startNode(); + Client client = internalCluster().client(docRepNode); + + // create shard with 0 replica and 1 shard + client().admin().indices().prepareCreate("idx1").setSettings(indexSettings()).setMapping("field", "type=text").get(); + ensureGreen("idx1"); + + AtomicInteger numAutoGenDocs = new AtomicInteger(); + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); + refresh("idx1"); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // add remote node in mixed mode cluster + addRemote = true; + String remoteNode = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // rolling restart + internalCluster().rollingRestart(new InternalTestCluster.RestartCallback()); + ensureStableCluster(2); + ensureGreen("idx1"); + assertEquals(internalCluster().size(), 2); + + // Index some more docs + int currentDoc = numAutoGenDocs.get(); + int finalCurrentDoc = currentDoc; + waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc + 5); + + logger.info("--> relocating from {} to {} ", docRepNode, remoteNode); + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("idx1", 0, docRepNode, remoteNode)).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(60)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .execute() + .actionGet(); + assertEquals(0, clusterHealthResponse.getRelocatingShards()); + assertEquals(remoteNode, primaryNodeName("idx1")); + + OpenSearchAssertions.assertHitCount(client().prepareSearch("idx1").setTrackTotalHits(true).get(), numAutoGenDocs.get()); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("idx1").setTrackTotalHits(true).setQuery(QueryBuilders.termQuery("auto", true)).get(), + numAutoGenDocs.get() + ); + } + + private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { + Thread indexingThread = new Thread(() -> { + while (finished.get() == false && numAutoGenDocs.get() < 10_000) { + IndexResponse indexResponse = client().prepareIndex("idx1").setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete("idx1", "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex("idx1").setSource("auto", true).get(); + numAutoGenDocs.incrementAndGet(); + } + }); + indexingThread.start(); + return indexingThread; + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java index c54536e9c46e2..e1baace5105a3 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java @@ -33,6 +33,7 @@ package org.opensearch.common.blobstore; import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.PublicApi; import java.util.ArrayList; import java.util.Collections; @@ -44,6 +45,7 @@ * * @opensearch.internal */ +@PublicApi(since = "2.14.0") public class BlobPath implements Iterable { private static final String SEPARATOR = "/"; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreDataEnums.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreDataEnums.java index 1afd73bf1f1b3..a47049775db40 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreDataEnums.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreDataEnums.java @@ -8,6 +8,8 @@ package org.opensearch.index.remote; +import org.opensearch.common.annotation.PublicApi; + import java.util.Set; import static org.opensearch.index.remote.RemoteStoreDataEnums.DataType.DATA; @@ -23,6 +25,7 @@ public class RemoteStoreDataEnums { /** * Categories of the data in Remote store. */ + @PublicApi(since = "2.14.0") public enum DataCategory { SEGMENTS("segments", Set.of(DataType.values())), TRANSLOG("translog", Set.of(DATA, METADATA)); @@ -47,6 +50,7 @@ public String getName() { /** * Types of data in remote store. */ + @PublicApi(since = "2.14.0") public enum DataType { DATA("data"), METADATA("metadata"), diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java index d7d7a8cdfd644..742d7b501f227 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java @@ -8,6 +8,7 @@ package org.opensearch.index.remote; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.index.remote.RemoteStoreDataEnums.DataCategory; import org.opensearch.index.remote.RemoteStoreDataEnums.DataType; @@ -20,6 +21,7 @@ * * @opensearch.internal */ +@PublicApi(since = "2.14.0") public enum RemoteStorePathType { FIXED { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1d7aa6ac4958b..9a753e9f53a89 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -153,11 +153,18 @@ import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; +import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; +import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_SEEDED; +import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_UNSEEDED; +import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_NON_MIGRATING; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.RemoteSegmentStoreDirectory; @@ -171,6 +178,8 @@ import org.opensearch.index.translog.RemoteFsTranslog; import org.opensearch.index.translog.RemoteTranslogStats; import org.opensearch.index.translog.Translog; +import static org.opensearch.index.translog.Translog.Durability; +import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.index.translog.TranslogRecoveryRunner; @@ -230,16 +239,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; -import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; -import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_SEEDED; -import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_UNSEEDED; -import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_NON_MIGRATING; -import static org.opensearch.index.translog.Translog.Durability; -import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY; - /** * An OpenSearch index shard * @@ -2522,7 +2521,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b syncSegmentsFromRemoteSegmentStore(false); } if (shardRouting.primary()) { - if (syncFromRemote) { + if (syncFromRemote || this.isRemoteSeeded()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); } else { // we will enter this block when we do not want to recover from remote translog. @@ -3670,7 +3669,7 @@ public void startRecovery( RepositoriesService repositoriesService, Consumer mappingUpdateConsumer, IndicesService indicesService - ) { + ) throws IOException { // TODO: Create a proper object to encapsulate the recovery context // all of the current methods here follow a pattern of: // resolve context which isn't really dependent on the local shards and then async @@ -3692,7 +3691,16 @@ public void startRecovery( switch (recoveryState.getRecoverySource().getType()) { case EMPTY_STORE: case EXISTING_STORE: - executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); + if (shouldSeedRemoteStore() && routingEntry().primary()) { + deleteRemoteStoreContents(); + // This cleans up remote translog's 0 generation, as we don't want to get that uploaded + sync(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { refresh("local recovery during remote store migration"); }); + waitForRemoteStoreSync(); + logger.info("Remote Store is now seeded for {} after local recovery", shardId()); + } else { + executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); + } break; case REMOTE_STORE: executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l)); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index e100ffaabf13d..f6d74eee76cd2 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -34,11 +34,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; + private final boolean shouldSeedRemote; + public RemoteBlobStoreInternalTranslogFactory( Supplier repositoriesServiceSupplier, ThreadPool threadPool, String repositoryName, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + boolean shouldSeedRemote ) { Repository repository; try { @@ -49,6 +52,7 @@ public RemoteBlobStoreInternalTranslogFactory( this.repository = repository; this.threadPool = threadPool; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + this.shouldSeedRemote = shouldSeedRemote; } @Override @@ -74,7 +78,8 @@ public Translog newTranslog( blobStoreRepository, threadPool, startedPrimarySupplier, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + shouldSeedRemote ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f0fb03cc905a4..ad1126b6451ff 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -90,6 +90,8 @@ public class RemoteFsTranslog extends Translog { private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); private final AtomicBoolean pauseSync = new AtomicBoolean(false); + private final boolean shouldSeedRemote; + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -100,7 +102,8 @@ public RemoteFsTranslog( BlobStoreRepository blobStoreRepository, ThreadPool threadPool, BooleanSupplier startedPrimarySupplier, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + boolean shouldSeedRemote ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); @@ -115,33 +118,38 @@ public RemoteFsTranslog( remoteTranslogTransferTracker, indexSettings().getRemoteStorePathType() ); + this.shouldSeedRemote = shouldSeedRemote; try { - download(translogTransferManager, location, logger); - Checkpoint checkpoint = readCheckpoint(location); - logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo); - this.readers.addAll(recoverFromFiles(checkpoint)); - if (readers.isEmpty()) { - String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId); - logger.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - boolean success = false; - current = null; - try { - current = createWriter( - checkpoint.generation + 1, - getMinFileGeneration(), - checkpoint.globalCheckpoint, - persistedSequenceNumberConsumer - ); - success = true; - } finally { - // we have to close all the recovered ones otherwise we leak file handles here - // for instance if we have a lot of tlog and we can't create the writer we keep - // on holding - // on to all the uncommitted tlog files if we don't close - if (success == false) { - IOUtils.closeWhileHandlingException(readers); + if (shouldSeedRemote) { + sync(); + } else { + download(translogTransferManager, location, logger); + Checkpoint checkpoint = readCheckpoint(location); + logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo); + this.readers.addAll(recoverFromFiles(checkpoint)); + if (readers.isEmpty()) { + String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId); + logger.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + boolean success = false; + current = null; + try { + current = createWriter( + checkpoint.generation + 1, + getMinFileGeneration(), + checkpoint.globalCheckpoint, + persistedSequenceNumberConsumer + ); + success = true; + } finally { + // we have to close all the recovered ones otherwise we leak file handles here + // for instance if we have a lot of tlog and we can't create the writer we keep + // on holding + // on to all the uncommitted tlog files if we don't close + if (success == false) { + IOUtils.closeWhileHandlingException(readers); + } } } } catch (Exception e) { @@ -386,7 +394,7 @@ private boolean syncToDisk() throws IOException { @Override public void sync() throws IOException { - if (syncToDisk() || syncNeeded()) { + if (syncToDisk() || syncNeeded() || shouldSeedRemote) { prepareAndUpload(primaryTermSupplier.getAsLong(), null); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index e78300e368099..0fca903d177e2 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -525,6 +525,7 @@ TranslogWriter createWriter( tragedy, persistedSequenceNumberConsumer, bigArrays, + indexSettings.isRemoteTranslogStoreEnabled(), indexSettings.isRemoteNode() ); } catch (final IOException e) { @@ -2043,6 +2044,7 @@ public static String createEmptyTranslog( throw new UnsupportedOperationException(); }, BigArrays.NON_RECYCLING_INSTANCE, + null, null ); writer.close(); diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index 86f7567f3333d..f9c6d1ac0cc88 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -119,6 +119,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final Boolean remoteTranslogEnabled; + private final Boolean remoteNode; + private TranslogWriter( final ShardId shardId, final Checkpoint initialCheckpoint, @@ -134,7 +136,8 @@ private TranslogWriter( final LongConsumer persistedSequenceNumberConsumer, final BigArrays bigArrays, TranslogCheckedContainer translogCheckedContainer, - Boolean remoteTranslogEnabled + Boolean remoteTranslogEnabled, + Boolean remoteNode ) throws IOException { super(initialCheckpoint.generation, channel, path, header); assert initialCheckpoint.offset == channel.position() : "initial checkpoint offset [" @@ -161,6 +164,7 @@ private TranslogWriter( this.tragedy = tragedy; this.translogCheckedContainer = translogCheckedContainer; this.remoteTranslogEnabled = remoteTranslogEnabled; + this.remoteNode = remoteNode; } public static TranslogWriter create( @@ -178,7 +182,8 @@ public static TranslogWriter create( TragicExceptionHolder tragedy, final LongConsumer persistedSequenceNumberConsumer, final BigArrays bigArrays, - Boolean remoteTranslogEnabled + Boolean remoteTranslogEnabled, + Boolean remoteNode ) throws IOException { final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME); @@ -187,9 +192,10 @@ public static TranslogWriter create( try { checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE); final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm); - header.write(channel, !Boolean.TRUE.equals(remoteTranslogEnabled)); + boolean fsync = !Boolean.TRUE.equals(remoteTranslogEnabled) && !Boolean.TRUE.equals(remoteNode); + header.write(channel, fsync); TranslogCheckedContainer translogCheckedContainer = null; - if (Boolean.TRUE.equals(remoteTranslogEnabled)) { + if (Boolean.TRUE.equals(remoteTranslogEnabled) || Boolean.TRUE.equals(remoteNode)) { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); header.write(byteArrayOutputStream); translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); @@ -200,16 +206,13 @@ public static TranslogWriter create( initialGlobalCheckpoint, initialMinTranslogGen ); - writeCheckpoint(checkpointChannel, checkpointFile, checkpoint, remoteTranslogEnabled); + writeCheckpoint(checkpointChannel, checkpointFile, checkpoint, fsync); final LongSupplier writerGlobalCheckpointSupplier; if (Assertions.ENABLED) { writerGlobalCheckpointSupplier = () -> { long gcp = globalCheckpointSupplier.getAsLong(); - assert gcp >= initialGlobalCheckpoint || (remoteTranslogEnabled == Boolean.TRUE) : "global checkpoint [" - + gcp - + "] lower than initial gcp [" - + initialGlobalCheckpoint - + "]"; + assert gcp >= initialGlobalCheckpoint || (remoteTranslogEnabled == Boolean.TRUE || remoteNode == Boolean.TRUE) + : "global checkpoint [" + gcp + "] lower than initial gcp [" + initialGlobalCheckpoint + "]"; return gcp; }; } else { @@ -230,7 +233,8 @@ public static TranslogWriter create( persistedSequenceNumberConsumer, bigArrays, translogCheckedContainer, - remoteTranslogEnabled + remoteTranslogEnabled, + remoteNode ); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that @@ -535,10 +539,11 @@ final boolean syncUpTo(long offset) throws IOException { // now do the actual fsync outside of the synchronized block such that // we can continue writing to the buffer etc. try { - if (!Boolean.TRUE.equals(remoteTranslogEnabled)) { + if (!Boolean.TRUE.equals(remoteTranslogEnabled) && !Boolean.TRUE.equals(remoteNode)) { channel.force(false); } - writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync, remoteTranslogEnabled); + boolean fsync = !Boolean.TRUE.equals(remoteTranslogEnabled) && !Boolean.TRUE.equals(remoteNode); + writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync, fsync); } catch (final Exception ex) { closeWithTragicEvent(ex); throw ex; @@ -639,9 +644,9 @@ private static void writeCheckpoint( final FileChannel fileChannel, final Path checkpointFile, final Checkpoint checkpoint, - final Boolean remoteTranslogEnabled + final boolean fsync ) throws IOException { - Checkpoint.write(fileChannel, checkpointFile, checkpoint, !Boolean.TRUE.equals(remoteTranslogEnabled)); + Checkpoint.write(fileChannel, checkpointFile, checkpoint, fsync); } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 9bc81c1826c2d..176c30dd65843 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -549,14 +549,16 @@ private static BiFunction getTrans repositoriesServiceSupplier, threadPool, indexSettings.getRemoteStoreTranslogRepository(), - remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()) + remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()), + false ); } else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) { return new RemoteBlobStoreInternalTranslogFactory( repositoriesServiceSupplier, threadPool, RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()), - remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()) + remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()), + true ); } return new InternalTranslogFactory(); diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 97bc822be7d51..eb19d0ce4781e 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -236,7 +236,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { repositoriesServiceReference::get, threadPool, indexSettings.getRemoteStoreTranslogRepository(), - new RemoteTranslogTransferTracker(shardRouting.shardId(), 10) + new RemoteTranslogTransferTracker(shardRouting.shardId(), 10), + false ); } return new InternalTranslogFactory(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 9f72d3c7bd825..bceb78cf610ea 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -188,7 +188,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin repository, threadPool, primaryMode::get, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + false ); } @@ -459,7 +460,8 @@ public void testExtraGenToKeep() throws Exception { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + false ) { @Override ChannelFactory getChannelFactory() { @@ -1508,7 +1510,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + false ) { @Override ChannelFactory getChannelFactory() { @@ -1616,7 +1619,8 @@ public void force(boolean metaData) throws IOException { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + false ) { @Override ChannelFactory getChannelFactory() { diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java index 91d51ffd105f0..0c0860d6fce9d 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java @@ -258,6 +258,7 @@ private Tuple, TranslogWriter> createReadersAndWriter(final new TragicExceptionHolder(), seqNo -> {}, BigArrays.NON_RECYCLING_INSTANCE, + null, null ); writer = Mockito.spy(writer); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index a2f9eb677c0ac..3840029dba1f9 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -678,7 +678,8 @@ protected IndexShard newShard( () -> mockRepoSvc, threadPool, settings.getRemoteStoreTranslogRepository(), - new RemoteTranslogTransferTracker(shardRouting.shardId(), 20) + new RemoteTranslogTransferTracker(shardRouting.shardId(), 20), + false ); } return new InternalTranslogFactory();