Skip to content

Commit

Permalink
[Remote Store] Add extra buffer before deleting older generations of …
Browse files Browse the repository at this point in the history
…translog (opensearch-project#10817)

---------

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored Oct 23, 2023
1 parent 7453daa commit 218a2ef
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {

// Settings for remote translog
IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING,

// Settings for remote store enablement
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,14 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

public static final Setting<Integer> INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING = Setting.intSetting(
"index.remote_store.translog.keep_extra_gen",
100,
0,
Property.Dynamic,
Property.IndexScope
);

private final Index index;
private final Version version;
private final Logger logger;
Expand All @@ -680,6 +688,7 @@ public static IndexMergePolicy fromString(String text) {
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private int remoteTranslogKeepExtraGen;
private Version extendedCompatibilitySnapshotVersion;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
Expand Down Expand Up @@ -850,6 +859,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
Expand Down Expand Up @@ -1021,6 +1031,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
this::setRemoteTranslogUploadBufferInterval
);
scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -1300,6 +1311,10 @@ public TimeValue getRemoteTranslogUploadBufferInterval() {
return remoteTranslogUploadBufferInterval;
}

public int getRemoteTranslogExtraKeep() {
return remoteTranslogKeepExtraGen;
}

/**
* Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set.
*/
Expand All @@ -1311,6 +1326,10 @@ public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUpload
this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval;
}

public void setRemoteTranslogKeepExtraGen(int extraGen) {
this.remoteTranslogKeepExtraGen = extraGen;
}

/**
* Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ public void trimUnreferencedReaders() throws IOException {
// cleans up remote translog files not referenced in latest uploaded metadata.
// This enables us to restore translog from the metadata in case of failover or relocation.
Set<Long> generationsToDelete = new HashSet<>();
for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) {
for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) {
if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.zip.CheckedInputStream;

import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG;
import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
Expand Down Expand Up @@ -124,6 +125,8 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase {
private ThreadPool threadPool;
private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";

AtomicInteger writeCalls = new AtomicInteger();
BlobStoreRepository repository;

BlobStoreTransferService blobStoreTransferService;
Expand Down Expand Up @@ -163,13 +166,13 @@ public void tearDown() throws Exception {

private RemoteFsTranslog create(Path path) throws IOException {
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
return create(path, createRepository(), translogUUID);
return create(path, createRepository(), translogUUID, 0);
}

private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException {
private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID, int extraGenToKeep) throws IOException {
this.repository = repository;
globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final TranslogConfig translogConfig = getTranslogConfig(path);
final TranslogConfig translogConfig = getTranslogConfig(path, extraGenToKeep);
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
threadPool = new TestThreadPool(getClass().getName());
blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool);
Expand All @@ -185,17 +188,25 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
primaryMode::get,
new RemoteTranslogTransferTracker(shardId, 10)
);
}

private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException {
return create(path, repository, translogUUID, 0);
}

private TranslogConfig getTranslogConfig(final Path path) {
return getTranslogConfig(path, 0);
}

private TranslogConfig getTranslogConfig(final Path path, int gensToKeep) {
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
// only randomize between nog age retention and a long one, so failures will have a chance of reproducing
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomBoolean() ? "-1ms" : "1h")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), gensToKeep)
.build();
return getTranslogConfig(path, settings);
}
Expand Down Expand Up @@ -372,6 +383,111 @@ public void testSimpleOperations() throws IOException {

}

private TranslogConfig getConfig(int gensToKeep) {
Path tempDir = createTempDir();
final TranslogConfig temp = getTranslogConfig(tempDir, gensToKeep);
final TranslogConfig config = new TranslogConfig(
temp.getShardId(),
temp.getTranslogPath(),
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
);
return config;
}

private ChannelFactory getChannelFactory() {
writeCalls = new AtomicInteger();
final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel delegate = FileChannel.open(file, openOption);
boolean success = false;
try {
// don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");

final FileChannel channel;
if (isCkpFile) {
channel = delegate;
} else {
channel = new FilterFileChannel(delegate) {

@Override
public int write(ByteBuffer src) throws IOException {
writeCalls.incrementAndGet();
return super.write(src);
}
};
}
success = true;
return channel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(delegate);
}
}
};
return channelFactory;
}

public void testExtraGenToKeep() throws Exception {
TranslogConfig config = getConfig(1);
ChannelFactory channelFactory = getChannelFactory();
final Set<Long> persistedSeqNos = new HashSet<>();
String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
channelFactory,
primaryTerm.get()
);
TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings());
ArrayList<Translog.Operation> ops = new ArrayList<>();
try (
RemoteFsTranslog translog = new RemoteFsTranslog(
config,
translogUUID,
deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
persistedSeqNos::add,
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
) {
@Override
ChannelFactory getChannelFactory() {
return channelFactory;
}
}
) {
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 }));

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 }));

// expose the new checkpoint (simulating a commit), before we trim the translog
translog.setMinSeqNoToKeep(2);

// Trims from local
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 3, primaryTerm.get(), new byte[] { 1 }));

// Trims from remote now
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);

}
}

public void testReadLocation() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
ArrayList<Translog.Location> locs = new ArrayList<>();
Expand Down Expand Up @@ -619,14 +735,22 @@ public void testSimpleOperationsUpload() throws Exception {
// this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 }));
assertEquals(2, translog.stats().estimatedNumberOfOperations());
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

translog.setMinSeqNoToKeep(2);

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
// this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

}

public void testMetadataFileDeletion() throws Exception {
Expand Down Expand Up @@ -1273,49 +1397,10 @@ public void testTranslogWriter() throws IOException {
}

public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
Path tempDir = createTempDir();
final TranslogConfig temp = getTranslogConfig(tempDir);
final TranslogConfig config = new TranslogConfig(
temp.getShardId(),
temp.getTranslogPath(),
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
);

final TranslogConfig config = getConfig(1);
final Set<Long> persistedSeqNos = new HashSet<>();
final AtomicInteger writeCalls = new AtomicInteger();

final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel delegate = FileChannel.open(file, openOption);
boolean success = false;
try {
// don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");

final FileChannel channel;
if (isCkpFile) {
channel = delegate;
} else {
channel = new FilterFileChannel(delegate) {

@Override
public int write(ByteBuffer src) throws IOException {
writeCalls.incrementAndGet();
return super.write(src);
}
};
}
success = true;
return channel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(delegate);
}
}
};

writeCalls = new AtomicInteger();
final ChannelFactory channelFactory = getChannelFactory();
String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down

0 comments on commit 218a2ef

Please sign in to comment.