From e8949fa48784ddbb5d1db5fda4169a2eacac59c8 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Mon, 23 Oct 2023 14:46:26 +0530 Subject: [PATCH] [Remote Store] Add extra buffer before deleting older generations of translog (#10817) --------- Signed-off-by: Gaurav Bafna Signed-off-by: Shivansh Arora --- .../common/settings/IndexScopedSettings.java | 1 + .../org/opensearch/index/IndexSettings.java | 19 ++ .../index/translog/RemoteFsTranslog.java | 2 +- .../index/translog/RemoteFsTranslogTests.java | 181 +++++++++++++----- 4 files changed, 154 insertions(+), 49 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 83bf8c82ee3dd..62e8faf33e1fa 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -221,6 +221,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for remote translog IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, + IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, // Settings for remote store enablement IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 99d2b5a74c406..00e765d73f77f 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -668,6 +668,14 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + public static final Setting INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING = Setting.intSetting( + "index.remote_store.translog.keep_extra_gen", + 100, + 0, + Property.Dynamic, + Property.IndexScope + ); + private final Index index; private final Version version; private final Logger logger; @@ -680,6 +688,7 @@ public static IndexMergePolicy fromString(String text) { private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; private final boolean isRemoteSnapshot; + private int remoteTranslogKeepExtraGen; private Version extendedCompatibilitySnapshotVersion; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock @@ -850,6 +859,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY); + this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings); isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { @@ -1021,6 +1031,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setRemoteTranslogUploadBufferInterval ); + scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1300,6 +1311,10 @@ public TimeValue getRemoteTranslogUploadBufferInterval() { return remoteTranslogUploadBufferInterval; } + public int getRemoteTranslogExtraKeep() { + return remoteTranslogKeepExtraGen; + } + /** * Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set. */ @@ -1311,6 +1326,10 @@ public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUpload this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval; } + public void setRemoteTranslogKeepExtraGen(int extraGen) { + this.remoteTranslogKeepExtraGen = extraGen; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index db85a37b556fc..a305a774f5854 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -431,7 +431,7 @@ public void trimUnreferencedReaders() throws IOException { // cleans up remote translog files not referenced in latest uploaded metadata. // This enables us to restore translog from the metadata in case of failover or relocation. Set generationsToDelete = new HashSet<>(); - for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) { + for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) { if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) { break; } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 42e0df2dc90c1..3cb65610fab58 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -97,6 +97,7 @@ import java.util.zip.CheckedInputStream; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @@ -124,6 +125,8 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; + + AtomicInteger writeCalls = new AtomicInteger(); BlobStoreRepository repository; BlobStoreTransferService blobStoreTransferService; @@ -163,13 +166,13 @@ public void tearDown() throws Exception { private RemoteFsTranslog create(Path path) throws IOException { final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - return create(path, createRepository(), translogUUID); + return create(path, createRepository(), translogUUID, 0); } - private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID, int extraGenToKeep) throws IOException { this.repository = repository; globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogConfig translogConfig = getTranslogConfig(path, extraGenToKeep); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); @@ -185,10 +188,17 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin primaryMode::get, new RemoteTranslogTransferTracker(shardId, 10) ); + } + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { + return create(path, repository, translogUUID, 0); } private TranslogConfig getTranslogConfig(final Path path) { + return getTranslogConfig(path, 0); + } + + private TranslogConfig getTranslogConfig(final Path path, int gensToKeep) { final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) // only randomize between nog age retention and a long one, so failures will have a chance of reproducing @@ -196,6 +206,7 @@ private TranslogConfig getTranslogConfig(final Path path) { .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b") .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), gensToKeep) .build(); return getTranslogConfig(path, settings); } @@ -372,6 +383,111 @@ public void testSimpleOperations() throws IOException { } + private TranslogConfig getConfig(int gensToKeep) { + Path tempDir = createTempDir(); + final TranslogConfig temp = getTranslogConfig(tempDir, gensToKeep); + final TranslogConfig config = new TranslogConfig( + temp.getShardId(), + temp.getTranslogPath(), + temp.getIndexSettings(), + temp.getBigArrays(), + new ByteSizeValue(1, ByteSizeUnit.KB), + "" + ); + return config; + } + + private ChannelFactory getChannelFactory() { + writeCalls = new AtomicInteger(); + final ChannelFactory channelFactory = (file, openOption) -> { + FileChannel delegate = FileChannel.open(file, openOption); + boolean success = false; + try { + // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation + final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); + + final FileChannel channel; + if (isCkpFile) { + channel = delegate; + } else { + channel = new FilterFileChannel(delegate) { + + @Override + public int write(ByteBuffer src) throws IOException { + writeCalls.incrementAndGet(); + return super.write(src); + } + }; + } + success = true; + return channel; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(delegate); + } + } + }; + return channelFactory; + } + + public void testExtraGenToKeep() throws Exception { + TranslogConfig config = getConfig(1); + ChannelFactory channelFactory = getChannelFactory(); + final Set persistedSeqNos = new HashSet<>(); + String translogUUID = Translog.createEmptyTranslog( + config.getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + channelFactory, + primaryTerm.get() + ); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); + ArrayList ops = new ArrayList<>(); + try ( + RemoteFsTranslog translog = new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + () -> SequenceNumbers.NO_OPS_PERFORMED, + primaryTerm::get, + persistedSeqNos::add, + repository, + threadPool, + () -> Boolean.TRUE, + new RemoteTranslogTransferTracker(shardId, 10) + ) { + @Override + ChannelFactory getChannelFactory() { + return channelFactory; + } + } + ) { + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 })); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 })); + + // expose the new checkpoint (simulating a commit), before we trim the translog + translog.setMinSeqNoToKeep(2); + + // Trims from local + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 3, primaryTerm.get(), new byte[] { 1 })); + + // Trims from remote now + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + assertEquals( + 6, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + + } + } + public void testReadLocation() throws IOException { ArrayList ops = new ArrayList<>(); ArrayList locs = new ArrayList<>(); @@ -619,14 +735,22 @@ public void testSimpleOperationsUpload() throws Exception { // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); assertEquals(2, translog.stats().estimatedNumberOfOperations()); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); translog.setMinSeqNoToKeep(2); - - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertEquals(1, translog.stats().estimatedNumberOfOperations()); - assertBusy(() -> assertEquals(4, translog.allUploaded().size())); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + } public void testMetadataFileDeletion() throws Exception { @@ -1273,49 +1397,10 @@ public void testTranslogWriter() throws IOException { } public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { - Path tempDir = createTempDir(); - final TranslogConfig temp = getTranslogConfig(tempDir); - final TranslogConfig config = new TranslogConfig( - temp.getShardId(), - temp.getTranslogPath(), - temp.getIndexSettings(), - temp.getBigArrays(), - new ByteSizeValue(1, ByteSizeUnit.KB), - "" - ); - + final TranslogConfig config = getConfig(1); final Set persistedSeqNos = new HashSet<>(); - final AtomicInteger writeCalls = new AtomicInteger(); - - final ChannelFactory channelFactory = (file, openOption) -> { - FileChannel delegate = FileChannel.open(file, openOption); - boolean success = false; - try { - // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation - final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); - - final FileChannel channel; - if (isCkpFile) { - channel = delegate; - } else { - channel = new FilterFileChannel(delegate) { - - @Override - public int write(ByteBuffer src) throws IOException { - writeCalls.incrementAndGet(); - return super.write(src); - } - }; - } - success = true; - return channel; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(delegate); - } - } - }; - + writeCalls = new AtomicInteger(); + final ChannelFactory channelFactory = getChannelFactory(); String translogUUID = Translog.createEmptyTranslog( config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED,