Commit b461baf

Skip translog creation and Lucene commits when recovering searchable snapshot shards (elastic#118606)
In order to leverage Lucene N-2 version support for searchable snapshots,
we'd like to avoid executing Lucene commits during searchable snapshot
shard recovery. This is because a Lucene commit requires opening an
IndexWriter, which Lucene does not support for N-2 version indices.

Today, when searchable snapshot shards recover, they create a translog
on disk as well as a Lucene commit:
- the translog is created as an empty translog with a new UUID and an
initial global checkpoint equal to the LOCAL_CHECKPOINT_KEY stored
in the last Lucene commit data from the snapshot;
- a Lucene commit is executed to associate the translog with the Lucene
index by storing the new translog UUID in the Lucene commit data
(sketched below);
- later during recovery, the replication tracker is initialized with a global
checkpoint equal to the LOCAL_CHECKPOINT_KEY stored in the
Lucene commit.
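
Condensed, that sequence looks roughly like this (a sketch distilled from the pre-change `bootstrap` method in `StoreRecovery`, not verbatim source; `indexShard` and `store` are assumed in scope):

```java
// Pre-change bootstrap, condensed: seed an empty translog from the last
// Lucene commit, then run another Lucene commit to bind the two together.
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
// 1) empty translog whose initial global checkpoint is the local checkpoint above
final String translogUUID = Translog.createEmptyTranslog(
    indexShard.shardPath().resolveTranslog(),
    localCheckpoint,
    indexShard.shardId(),
    indexShard.getPendingPrimaryTerm()
);
// 2) a Lucene commit that stores the new translog UUID in the commit user data;
//    this is the step that needs an IndexWriter and thus breaks on N-2 indices
store.associateIndexWithNewTranslog(translogUUID);
```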

We can skip the creation of the translog because searchable snapshot
shards do not need one: it is only used to store the local checkpoint
locally so that it can be read back later during recovery. And if we don't
have a translog, then we don't need to associate it with the Lucene index,
so we can skip the commit too.

This change introduces a hasTranslog method that is used to know
when it is safe to NOT create a translog, in which case the global
checkpoint is read from the last Lucene commit during primary shard
recovery from snapshot, peer recovery, and recovery from an existing store.
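
All three recovery paths now read the checkpoint through the same helper; an illustrative call site (not a literal line from the diff) looks like:

```java
// Reads the global checkpoint from the translog checkpoint file when the
// shard has a translog, and falls back to the LOCAL_CHECKPOINT_KEY of the
// last Lucene commit for searchable snapshot shards, which have none.
final long globalCheckpoint = indexShard.readGlobalCheckpointForRecovery(
    store.readLastCommittedSegmentsInfo().getUserData()
);
```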

If an existing translog is found on disk, it is cleaned up.

There are also a few adjustments around assertions introduced with
snapshot-based recoveries, as well as around the cached size estimation
of directories, which used to be refreshed by Lucene commits and now
needs to be explicitly "marked as stale".
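
For the directory size estimation, the idea is that code which previously relied on a Lucene commit refreshing the cached size must now invalidate it explicitly. A hypothetical call site, using the two helpers this change adds to ByteSizeCachingDirectory, might look like:

```java
// Unwrap the ByteSizeCachingDirectory (if any) from the store's directory
// chain and mark its cached size estimation as stale, so that the next size
// query recomputes it instead of serving a value cached before recovery.
final ByteSizeCachingDirectory sizeCachingDir = ByteSizeCachingDirectory.unwrapDirectory(indexShard.store().directory());
if (sizeCachingDir != null) {
    sizeCachingDir.markEstimatedSizeAsStale();
}
```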
tlrx authored Dec 16, 2024
1 parent cf7edbb commit b461baf
Showing 15 changed files with 175 additions and 70 deletions.
@@ -48,7 +48,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
     public NoOpEngine(EngineConfig config) {
         this(
             config,
-            config.isPromotableToPrimary() ? null : new TranslogStats(0, 0, 0, 0, 0),
+            config.isPromotableToPrimary() && config.getTranslogConfig().hasTranslog() ? null : new TranslogStats(0, 0, 0, 0, 0),
             config.isPromotableToPrimary()
                 ? null
                 : new SeqNoStats(
@@ -98,7 +98,7 @@ public class ReadOnlyEngine extends Engine {
     public ReadOnlyEngine(
         EngineConfig config,
         SeqNoStats seqNoStats,
-        TranslogStats translogStats,
+        @Nullable TranslogStats translogStats,
         boolean obtainLock,
         Function<DirectoryReader, DirectoryReader> readerWrapperFunction,
         boolean requireCompleteHistory,
@@ -251,6 +251,7 @@ private static SeqNoStats buildSeqNoStats(EngineConfig config, SegmentInfos info
     }
 
     private static TranslogStats translogStats(final EngineConfig config, final SegmentInfos infos) throws IOException {
+        assert config.getTranslogConfig().hasTranslog();
         final String translogUuid = infos.getUserData().get(Translog.TRANSLOG_UUID_KEY);
         if (translogUuid == null) {
             throw new IllegalStateException("commit doesn't contain translog unique id");
43 changes: 36 additions & 7 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1487,6 +1487,27 @@ public void flush(FlushRequest request, ActionListener<Boolean> listener) {
         });
     }
 
+    /**
+     * @return true the shard has a translog.
+     */
+    public boolean hasTranslog() {
+        return translogConfig.hasTranslog();
+    }
+
+    /**
+     * Reads the global checkpoint from the translog checkpoint file if the shard has a translog. Otherwise, reads the local checkpoint from
+     * the provided commit user data.
+     *
+     * @return the global checkpoint to use for recovery
+     * @throws IOException
+     */
+    public long readGlobalCheckpointForRecovery(Map<String, String> commitUserData) throws IOException {
+        if (hasTranslog()) {
+            return Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), commitUserData.get(Translog.TRANSLOG_UUID_KEY));
+        }
+        return Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+    }
+
     /**
      * checks and removes translog files that no longer need to be retained. See
      * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
@@ -1859,8 +1880,7 @@ public void recoverLocallyUpToGlobalCheckpoint(ActionListener<Long> recoveryStar
         }
         assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
         try {
-            final var translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-            final var globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+            final var globalCheckpoint = readGlobalCheckpointForRecovery(store.readLastCommittedSegmentsInfo().getUserData());
             final var safeCommit = store.findSafeIndexCommit(globalCheckpoint);
             ActionListener.run(recoveryStartingSeqNoListener.delegateResponse((l, e) -> {
                 logger.debug(() -> format("failed to recover shard locally up to global checkpoint %s", globalCheckpoint), e);
@@ -2084,8 +2104,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
             // we have to set it before we open an engine and recover from the translog because
             // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
             // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
-            final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-            final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+            final long globalCheckpoint = readGlobalCheckpointForRecovery(store.readLastCommittedSegmentsInfo().getUserData());
             replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
         } else {
             replicationTracker.updateGlobalCheckpointOnReplica(globalCheckPointIfUnpromotable, "from CleanFilesRequest");
@@ -2162,7 +2181,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
         // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
         // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
         onSettingsChanged();
-        assert assertSequenceNumbersInCommit();
+        assert assertLastestCommitUserData();
         recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
         checkAndCallWaitForEngineOrClosedShardListeners();
     }
@@ -2183,9 +2202,13 @@ private Engine createEngine(EngineConfig config) {
         }
     }
 
-    private boolean assertSequenceNumbersInCommit() throws IOException {
+    /**
+     * Asserts that the latest Lucene commit contains expected information about sequence numbers or ES version.
+     */
+    private boolean assertLastestCommitUserData() throws IOException {
         final SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(store.directory());
         final Map<String, String> userData = segmentCommitInfos.getUserData();
+        // Ensure sequence numbers are present in commit data
         assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
         assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
         assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
@@ -2195,10 +2218,16 @@ private boolean assertSequenceNumbersInCommit() {
             + "] is different than engine ["
             + getHistoryUUID()
             + "]";
+
         assert userData.containsKey(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)
             : "opening index which was created post 5.5.0 but " + Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit";
+
+        // From 7.16.0, the ES version is included in the Lucene commit user data as well as in the snapshot metadata in the repository.
+        // This is used during primary failover to detect if the latest snapshot can be used to recover the new primary, because the failed
+        // primary may have created new segments in a more recent Lucene version, that may have been later snapshotted, meaning that the
+        // snapshotted files cannot be recovered on a node with a less recent Lucene version. Note that for versions <= 7.15 this assertion
+        // relies in the previous minor having a different lucene version.
         final org.apache.lucene.util.Version commitLuceneVersion = segmentCommitInfos.getCommitLuceneVersion();
-        // This relies in the previous minor having another lucene version
         assert commitLuceneVersion.onOrAfter(RecoverySettings.SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION.luceneVersion()) == false
             || userData.containsKey(Engine.ES_VERSION)
                 && Engine.readIndexVersion(userData.get(Engine.ES_VERSION)).onOrBefore(IndexVersion.current())
@@ -315,7 +315,7 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A
             RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
             assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType;
             SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource();
-            restore(indexShard, repository, recoverySource, recoveryListener(indexShard, listener).map(ignored -> true));
+            recoverFromRepository(indexShard, repository, recoverySource, recoveryListener(indexShard, listener).map(ignored -> true));
         } else {
             listener.onResponse(false);
         }
@@ -459,7 +459,7 @@ private void internalRecoverFromStore(IndexShard indexShard, ActionListener<Void
             }
             if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
                 assert indexShouldExists;
-                bootstrap(indexShard, store);
+                bootstrap(indexShard);
                 writeEmptyRetentionLeasesFile(indexShard);
             } else if (indexShouldExists) {
                 if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
@@ -523,12 +523,13 @@ private static void addRecoveredFileDetails(SegmentInfos si, Store store, Recove
     /**
      * Restores shard from {@link SnapshotRecoverySource} associated with this shard in routing table
      */
-    private void restore(
+    private void recoverFromRepository(
         IndexShard indexShard,
         Repository repository,
         SnapshotRecoverySource restoreSource,
         ActionListener<Void> outerListener
     ) {
+        assert indexShard.shardRouting.primary() : "only primary shards can recover from snapshot";
         logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource());
 
         record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}
@@ -538,13 +539,13 @@ record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}
             .newForked(indexShard::preRecovery)
 
             .<ShardAndIndexIds>andThen(shardAndIndexIdsListener -> {
-                final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
                 if (restoreSource == null) {
                     throw new IndexShardRestoreFailedException(shardId, "empty restore source");
                 }
                 if (logger.isTraceEnabled()) {
                     logger.trace("[{}] restoring shard [{}]", restoreSource.snapshot(), shardId);
                 }
+                final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
                 translogState.totalOperations(0);
                 translogState.totalOperationsOnStart(0);
                 indexShard.prepareForIndexRecovery();
@@ -588,9 +589,7 @@ record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}
 
             .<Void>andThen(l -> {
                 indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
-                final Store store = indexShard.store();
-                bootstrap(indexShard, store);
-                assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
+                bootstrap(indexShard);
                 writeEmptyRetentionLeasesFile(indexShard);
                 indexShard.openEngineAndRecoverFromTranslog(l);
             })
@@ -610,19 +609,37 @@ record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}
             }));
     }
 
+    /**
+     * @deprecated use {@link #bootstrap(IndexShard)} instead
+     */
+    @Deprecated(forRemoval = true)
     public static void bootstrap(final IndexShard indexShard, final Store store) throws IOException {
-        if (indexShard.indexSettings.getIndexMetadata().isSearchableSnapshot() == false) {
-            // not bootstrapping new history for searchable snapshots (which are read-only) allows sequence-number based peer recoveries
-            store.bootstrapNewHistory();
-        }
-        final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
-        final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
-        final String translogUUID = Translog.createEmptyTranslog(
-            indexShard.shardPath().resolveTranslog(),
-            localCheckpoint,
-            indexShard.shardId(),
-            indexShard.getPendingPrimaryTerm()
-        );
-        store.associateIndexWithNewTranslog(translogUUID);
+        assert indexShard.store() == store;
+        bootstrap(indexShard);
+    }
+
+    private static void bootstrap(final IndexShard indexShard) throws IOException {
+        assert indexShard.routingEntry().primary();
+        final var store = indexShard.store();
+        store.incRef();
+        try {
+            final var translogLocation = indexShard.shardPath().resolveTranslog();
+            if (indexShard.hasTranslog() == false) {
+                Translog.deleteAll(translogLocation);
+                return;
+            }
+            store.bootstrapNewHistory();
+            final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
+            final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+            final String translogUUID = Translog.createEmptyTranslog(
+                translogLocation,
+                localCheckpoint,
+                indexShard.shardId(),
+                indexShard.getPendingPrimaryTerm()
+            );
+            store.associateIndexWithNewTranslog(translogUUID);
+        } finally {
+            store.decRef();
+        }
     }
 }
@@ -10,6 +10,7 @@
 package org.elasticsearch.index.store;
 
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.elasticsearch.common.lucene.store.FilterIndexOutput;
@@ -19,7 +20,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 
-final class ByteSizeCachingDirectory extends ByteSizeDirectory {
+public final class ByteSizeCachingDirectory extends ByteSizeDirectory {
 
     private static class SizeAndModCount {
         final long size;
@@ -174,9 +175,29 @@ public void deleteFile(String name) throws IOException {
         try {
             super.deleteFile(name);
         } finally {
-            synchronized (this) {
-                modCount++;
-            }
+            markEstimatedSizeAsStale();
         }
     }
 
+    /**
+     * Mark the cached size as stale so that it is guaranteed to be refreshed the next time.
+     */
+    public void markEstimatedSizeAsStale() {
+        synchronized (this) {
+            modCount++;
+        }
+    }
+
+    public static ByteSizeCachingDirectory unwrapDirectory(Directory dir) {
+        while (dir != null) {
+            if (dir instanceof ByteSizeCachingDirectory) {
+                return (ByteSizeCachingDirectory) dir;
+            } else if (dir instanceof FilterDirectory) {
+                dir = ((FilterDirectory) dir).getDelegate();
+            } else {
+                dir = null;
+            }
+        }
+        return null;
+    }
 }
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -1449,6 +1449,7 @@ public void bootstrapNewHistory() throws IOException {
      * @see SequenceNumbers#MAX_SEQ_NO
      */
     public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
+        assert indexSettings.getIndexMetadata().isSearchableSnapshot() == false;
         metadataLock.writeLock().lock();
         try (IndexWriter writer = newTemporaryAppendingIndexWriter(directory, null)) {
             final Map<String, String> map = new HashMap<>();
@@ -1572,6 +1573,7 @@ private IndexWriter newTemporaryEmptyIndexWriter(final Directory dir, final Vers
     }
 
     private IndexWriterConfig newTemporaryIndexWriterConfig() {
+        assert indexSettings.getIndexMetadata().isSearchableSnapshot() == false;
         // this config is only used for temporary IndexWriter instances, used to initialize the index or update the commit data,
         // so we don't want any merges to happen
         var iwc = indexWriterConfigWithNoMerging(null).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD).setCommitOnClose(false);
@@ -220,6 +220,10 @@ public Translog(
         }
     }
 
+    public static void deleteAll(Path translogLocation) throws IOException {
+        IOUtils.rm(translogLocation);
+    }
+
     /** recover all translog files found on disk */
     private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
         boolean success = false;
@@ -143,4 +143,13 @@ public OperationListener getOperationListener() {
     public boolean fsync() {
         return fsync;
     }
+
+    /**
+     * @return {@code true} if the configuration allows the Translog files to exist, {@code false} otherwise. In the case there is no
+     * translog, the shard is not writeable.
+     */
+    public boolean hasTranslog() {
+        // Expect no translog files to exist for searchable snapshots
+        return false == indexSettings.getIndexMetadata().isSearchableSnapshot();
+    }
 }
@@ -49,9 +49,7 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardLongFieldRange;
 import org.elasticsearch.index.shard.ShardNotFoundException;
-import org.elasticsearch.index.shard.StoreRecovery;
 import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogCorruptedException;
 import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
 import org.elasticsearch.tasks.Task;
@@ -385,15 +383,8 @@ record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, Str
             logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
             indexShard.prepareForIndexRecovery();
             if (indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
-                // for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
                 // for archives indices mounted as searchable snapshots, we need to call this
                 indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
-                final Store store = indexShard.store();
-                store.incRef();
-                try {
-                    StoreRecovery.bootstrap(indexShard, store);
-                } finally {
-                    store.decRef();
-                }
             }
             indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l));
         })
@@ -488,8 +479,8 @@ public static StartRecoveryRequest getStartRecoveryRequest(
             // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene
             // index.
             try {
-                final String expectedTranslogUUID = metadataSnapshot.commitUserData().get(Translog.TRANSLOG_UUID_KEY);
-                final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
+                final long globalCheckpoint = recoveryTarget.indexShard()
+                    .readGlobalCheckpointForRecovery(metadataSnapshot.commitUserData());
                 assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
             } catch (IOException | TranslogCorruptedException e) {
                 logGlobalCheckpointWarning(logger, startingSeqNo, e);