[Remote Store] Add extra buffer before deleting older generations of translog #10817

Merged (3 commits) on Oct 23, 2023
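
This PR adds a dynamic index setting, index.remote_store.translog.keep_extra_gen (default 100, minimum 0), so that remote-store translog cleanup retains a configurable number of generations beyond the minimum referenced one instead of deleting them immediately. As a rough sketch of how an operator might raise that buffer at runtime — the index name, client wiring, and chosen value below are illustrative assumptions, not part of the PR:

```java
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

public final class KeepExtraGenExample {
    // Hypothetical helper: raise the extra-generation buffer for index "my-index"
    // on an already-wired node/transport client.
    static void raiseTranslogBuffer(Client client) {
        UpdateSettingsRequest request = new UpdateSettingsRequest("my-index").settings(
            Settings.builder()
                // dynamic index setting introduced by this PR (default 100, minimum 0)
                .put("index.remote_store.translog.keep_extra_gen", 200)
                .build()
        );
        client.admin().indices().updateSettings(request).actionGet();
    }
}
```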
server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -221,6 +221,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {

// Settings for remote translog
IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING,

// Settings for remote store enablement
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -668,6 +668,14 @@
Property.IndexScope
);

public static final Setting<Integer> INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING = Setting.intSetting(
"index.remote_store.translog.keep_extra_gen",
100,
0,
Property.Dynamic,
Property.IndexScope
);

private final Index index;
private final Version version;
private final Logger logger;
@@ -680,6 +688,7 @@
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private int remoteTranslogKeepExtraGen;
private Version extendedCompatibilitySnapshotVersion;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
@@ -850,6 +859,7 @@
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
@@ -1021,6 +1031,7 @@
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
this::setRemoteTranslogUploadBufferInterval
);
scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1300,6 +1311,10 @@
return remoteTranslogUploadBufferInterval;
}

public int getRemoteTranslogExtraKeep() {
return remoteTranslogKeepExtraGen;
}

/**
* Returns true iff the remote translog buffer interval setting exists, i.e. is explicitly set.
*/
@@ -1311,6 +1326,10 @@
this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval;
}

public void setRemoteTranslogKeepExtraGen(int extraGen) {
this.remoteTranslogKeepExtraGen = extraGen;
}

/**
* Returns the interval at which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
*/
server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -425,7 +425,7 @@ public void trimUnreferencedReaders() throws IOException {
// cleans up remote translog files not referenced in latest uploaded metadata.
// This enables us to restore translog from the metadata in case of failover or relocation.
Set<Long> generationsToDelete = new HashSet<>();
for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) {
for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) {
if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) {
break;
}
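
The behavioral change above is confined to where the deletion sweep starts: it now begins keep_extra_gen generations below the minimum referenced generation, leaving that many extra generations in the remote store as a safety margin. A minimal sketch of the arithmetic, using invented generation numbers:

```java
public final class TrimBoundarySketch {
    public static void main(String[] args) {
        // Illustration only: the generation numbers below are made up.
        long minRemoteGenReferenced = 10; // lowest generation referenced by the latest uploaded metadata
        int keepExtraGen = 3;             // index.remote_store.translog.keep_extra_gen

        // Previously the sweep started at 10 - 1 = 9; with the buffer it starts at
        // 10 - 1 - 3 = 6, so generations 7, 8 and 9 survive the trim as a safety margin.
        for (long generation = minRemoteGenReferenced - 1 - keepExtraGen; generation >= 0; generation--) {
            System.out.println("deletion candidate: translog-" + generation + ".tlog");
        }
    }
}
```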
server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java
@@ -97,6 +97,7 @@
import java.util.zip.CheckedInputStream;

import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG;
import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
@@ -124,6 +125,8 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase {
private ThreadPool threadPool;
private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";

AtomicInteger writeCalls = new AtomicInteger();
BlobStoreRepository repository;

BlobStoreTransferService blobStoreTransferService;
@@ -163,13 +166,13 @@ public void tearDown() throws Exception {

private RemoteFsTranslog create(Path path) throws IOException {
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
return create(path, createRepository(), translogUUID);
return create(path, createRepository(), translogUUID, 0);
}

private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException {
private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID, int extraGenToKeep) throws IOException {
this.repository = repository;
globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final TranslogConfig translogConfig = getTranslogConfig(path);
final TranslogConfig translogConfig = getTranslogConfig(path, extraGenToKeep);
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
threadPool = new TestThreadPool(getClass().getName());
blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool);
@@ -185,17 +188,25 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
primaryMode::get,
new RemoteTranslogTransferTracker(shardId, 10)
);
}

private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException {
return create(path, repository, translogUUID, 0);
}

private TranslogConfig getTranslogConfig(final Path path) {
return getTranslogConfig(path, 0);
}

private TranslogConfig getTranslogConfig(final Path path, int gensToKeep) {
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
// only randomize between no age retention and a long one, so failures will have a chance of reproducing
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomBoolean() ? "-1ms" : "1h")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), gensToKeep)
.build();
return getTranslogConfig(path, settings);
}
@@ -372,6 +383,111 @@ public void testSimpleOperations() throws IOException {

}

private TranslogConfig getConfig(int gensToKeep) {
Path tempDir = createTempDir();
final TranslogConfig temp = getTranslogConfig(tempDir, gensToKeep);
final TranslogConfig config = new TranslogConfig(
temp.getShardId(),
temp.getTranslogPath(),
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
);
return config;
}

private ChannelFactory getChannelFactory() {
writeCalls = new AtomicInteger();
final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel delegate = FileChannel.open(file, openOption);
boolean success = false;
try {
// don't do partial writes for checkpoints; we rely on the fact that the bytes are written as an atomic operation
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");

final FileChannel channel;
if (isCkpFile) {
channel = delegate;
} else {
channel = new FilterFileChannel(delegate) {

@Override
public int write(ByteBuffer src) throws IOException {
writeCalls.incrementAndGet();
return super.write(src);
}
};
}
success = true;
return channel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(delegate);
}
}
};
return channelFactory;
}
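
The two helpers above (getConfig and getChannelFactory) are extracted so that the new testExtraGenToKeep and the existing testTranslogWriterCanFlushInAddOrReadCall (refactored near the end of this diff) share the same small-buffer translog config and write-counting channel wrapper, rather than building them inline.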

public void testExtraGenToKeep() throws Exception {
TranslogConfig config = getConfig(1);
ChannelFactory channelFactory = getChannelFactory();
final Set<Long> persistedSeqNos = new HashSet<>();
String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
channelFactory,
primaryTerm.get()
);
TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings());
ArrayList<Translog.Operation> ops = new ArrayList<>();
try (
RemoteFsTranslog translog = new RemoteFsTranslog(
config,
translogUUID,
deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
persistedSeqNos::add,
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
) {
@Override
ChannelFactory getChannelFactory() {
return channelFactory;
}
}
) {
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 }));

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 }));

// expose the new checkpoint (simulating a commit), before we trim the translog
translog.setMinSeqNoToKeep(2);

// Trims from local
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 3, primaryTerm.get(), new byte[] { 1 }));

// Trims from remote now
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
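// each remote generation is stored as a .tlog + .ckp pair, so 6 blobs correspond to
// 3 retained generations; one of them survives only because extraGenToKeep = 1
// (explanatory note; the pairing is our reading of the remote data-directory layout)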
assertEquals(
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);

}
}

public void testReadLocation() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
ArrayList<Translog.Location> locs = new ArrayList<>();
@@ -619,14 +735,22 @@ public void testSimpleOperationsUpload() throws Exception {
// this should now trim away tlog-2 files from remote, but not tlog-3 and tlog-4
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 }));
assertEquals(2, translog.stats().estimatedNumberOfOperations());
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

translog.setMinSeqNoToKeep(2);

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
// this should now trim away tlog-2 files from remote, but not tlog-3 and tlog-4
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

}

public void testMetadataFileDeletion() throws Exception {
@@ -1273,49 +1397,10 @@ public void testTranslogWriter() throws IOException {
}

public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
Path tempDir = createTempDir();
final TranslogConfig temp = getTranslogConfig(tempDir);
final TranslogConfig config = new TranslogConfig(
temp.getShardId(),
temp.getTranslogPath(),
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
);

final TranslogConfig config = getConfig(1);
final Set<Long> persistedSeqNos = new HashSet<>();
final AtomicInteger writeCalls = new AtomicInteger();

final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel delegate = FileChannel.open(file, openOption);
boolean success = false;
try {
// don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");

final FileChannel channel;
if (isCkpFile) {
channel = delegate;
} else {
channel = new FilterFileChannel(delegate) {

@Override
public int write(ByteBuffer src) throws IOException {
writeCalls.incrementAndGet();
return super.write(src);
}
};
}
success = true;
return channel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(delegate);
}
}
};

writeCalls = new AtomicInteger();
final ChannelFactory channelFactory = getChannelFactory();
String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,