Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication][Remote Store] Remove commits when remote store is enabled #8050

Merged
merged 5 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;
private final boolean shouldCommit;
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

Expand Down Expand Up @@ -113,6 +114,7 @@ public void onAfterTranslogSync() {
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand Down Expand Up @@ -163,7 +165,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
if (shouldCommit) {
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
}
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}
Expand Down Expand Up @@ -383,15 +387,21 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
// if remote store is enabled, all segments durably persisted
if (shouldCommit) {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
} else {
store.directory().sync(List.of(store.directory().listAll()));
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
store.directory().syncMetaData();
}
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
Expand Down
47 changes: 25 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4643,31 +4643,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
remoteSegmentMetadata.getGeneration()
);
// Replicas never need a local commit
if (shouldCommit) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
if (this.shardRouting.primary()) {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
// latest commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else {
finalizeReplication(infosSnapshot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public class NRTReplicationEngineTests extends EngineTestCase {
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
);

private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true")
.build()
);

public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
Expand Down Expand Up @@ -132,6 +140,29 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
}
}

public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS)
) {
// assume we start at the same gen.
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration());

// flush the primary engine - we don't need any segments, just force a new commit point.
engine.flush(true, true);
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());

// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
}
}

public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException {
// if the replica is already at segments_N that is received, it will commit segments_N+1.
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down Expand Up @@ -315,18 +346,11 @@ public void testCommitSegmentInfos() throws Exception {
}
}

private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings)
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
throws IOException {
Lucene.cleanLuceneIndex(store.directory());
final Path translogDir = createTempDir();
final EngineConfig replicaConfig = config(
defaultSettings,
store,
translogDir,
NoMergePolicy.INSTANCE,
null,
null,
globalCheckpoint::get
);
final EngineConfig replicaConfig = config(settings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
if (Lucene.indexExists(store.directory()) == false) {
store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
final String translogUuid = Translog.createEmptyTranslog(
Expand All @@ -339,4 +363,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint,
}
return new NRTReplicationEngine(replicaConfig);
}

private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings);
}
}