Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Avoiding Translog Deletion in case of relocating shards. #9603

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ protected Settings featureFlagSettings() {
.build();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191")
public void testPrimaryRelocationWhileIndexing() throws Exception {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
super.testPrimaryRelocationWhileIndexing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.index.codec.CodecSettings;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogConfig;
Expand All @@ -65,7 +66,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -105,7 +105,7 @@ public final class EngineConfig {
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final IndexShard.IndexShardConfigSupplier indexShardConfigSupplier;
private final Comparator<LeafReader> leafSorter;

/**
Expand Down Expand Up @@ -266,7 +266,7 @@ private EngineConfig(Builder builder) {
this.primaryTermSupplier = builder.primaryTermSupplier;
this.tombstoneDocSupplier = builder.tombstoneDocSupplier;
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.indexShardConfigSupplier = builder.indexShardConfigSupplier;
this.translogFactory = builder.translogFactory;
this.leafSorter = builder.leafSorter;
}
Expand Down Expand Up @@ -477,8 +477,8 @@ public boolean isReadOnlyReplica() {
* Returns the underlying primaryModeSupplier.
* @return the primary mode supplier.
*/
public BooleanSupplier getPrimaryModeSupplier() {
return primaryModeSupplier;
public IndexShard.IndexShardConfigSupplier getIndexShardConfigSupplier() {
return indexShardConfigSupplier;
}

/**
Expand Down Expand Up @@ -555,7 +555,7 @@ public static class Builder {
private TombstoneDocSupplier tombstoneDocSupplier;
private TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private IndexShard.IndexShardConfigSupplier indexShardConfigSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();
Comparator<LeafReader> leafSorter;

Expand Down Expand Up @@ -679,8 +679,8 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) {
return this;
}

public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) {
this.primaryModeSupplier = primaryModeSupplier;
public Builder indexShardConfig(IndexShard.IndexShardConfigSupplier indexShardConfigSupplier) {
this.indexShardConfigSupplier = indexShardConfigSupplier;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
Expand All @@ -40,7 +41,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -152,7 +152,7 @@ public EngineConfig newEngineConfig(
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
IndexShard.IndexShardConfigSupplier indexShardConfigSupplier,
TranslogFactory translogFactory,
Comparator<LeafReader> leafSorter
) {
Expand Down Expand Up @@ -185,7 +185,7 @@ public EngineConfig newEngineConfig(
.primaryTermSupplier(primaryTermSupplier)
.tombstoneDocSupplier(tombstoneDocSupplier)
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.indexShardConfig(indexShardConfigSupplier)
.translogFactory(translogFactory)
.leafSorter(leafSorter)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void onFailure(String reason, Exception ex) {
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen,
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
engineConfig.getIndexShardConfigSupplier()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void onAfterTranslogSync() {
},
this,
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
engineConfig.getIndexShardConfigSupplier()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {},
engineConfig.getPrimaryModeSupplier()
engineConfig.getIndexShardConfigSupplier()
)
) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {},
config.getPrimaryModeSupplier()
config.getIndexShardConfigSupplier()
)
) {
return translog.stats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -337,6 +338,29 @@ Runnable getGlobalCheckpointSyncer() {

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();

/**
* An OpenSearch index shard config supplier
*
* @opensearch.internal
*/
static public class IndexShardConfigSupplier {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The structure seems to be force-fitting this very specific use case. We should model it more generically

BooleanSupplier primaryModeSupplier;
BooleanSupplier relocatingSupplier;

public IndexShardConfigSupplier(BooleanSupplier primaryModeSupplier, BooleanSupplier relocatingSupplier) {
this.primaryModeSupplier = primaryModeSupplier;
this.relocatingSupplier = relocatingSupplier;
}

public BooleanSupplier getPrimaryModeSupplier() {
return primaryModeSupplier;
}

public BooleanSupplier getRelocatingSupplier() {
return relocatingSupplier;
}
}

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -3733,7 +3757,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
new IndexShardConfigSupplier(replicationTracker::isPrimaryMode, () -> shardRouting.relocating()),
translogFactorySupplier.apply(indexSettings, shardRouting),
isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for
// timeseries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

package org.opensearch.index.translog;

import org.opensearch.index.shard.IndexShard;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -28,7 +29,7 @@ public Translog newTranslog(
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
IndexShard.IndexShardConfigSupplier indexShardConfigSupplier
) throws IOException {

return new LocalTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.LifecycleAware;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.listener.TranslogEventListener;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -57,7 +57,7 @@ public InternalTranslogManager(
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
IndexShard.IndexShardConfigSupplier indexShardConfigSupplier
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -70,7 +70,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID, translogFactory, primaryModeSupplier);
}, translogUUID, translogFactory, indexShardConfigSupplier);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -357,7 +357,7 @@ protected Translog openTranslog(
LongConsumer persistedSequenceNumberConsumer,
String translogUUID,
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
IndexShard.IndexShardConfigSupplier indexShardConfigSupplier
) throws IOException {
return translogFactory.newTranslog(
translogConfig,
Expand All @@ -366,7 +366,7 @@ protected Translog openTranslog(
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
primaryModeSupplier
indexShardConfigSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

package org.opensearch.index.translog;

import org.opensearch.index.shard.IndexShard;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -54,7 +54,7 @@ public Translog newTranslog(
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
IndexShard.IndexShardConfigSupplier indexShardConfigSupplier
) throws IOException {

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
Expand All @@ -68,7 +68,7 @@ public Translog newTranslog(
persistedSequenceNumberConsumer,
blobStoreRepository,
threadPool,
primaryModeSupplier
indexShardConfigSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TransferSnapshot;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class RemoteFsTranslog extends Translog {
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
private final BooleanSupplier primaryModeSupplier;
private final BooleanSupplier relocatingSupplier;
private volatile long maxRemoteTranslogGenerationUploaded;

private volatile long minSeqNoToKeep;
Expand All @@ -80,12 +82,13 @@ public RemoteFsTranslog(
LongConsumer persistedSequenceNumberConsumer,
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier primaryModeSupplier
IndexShard.IndexShardConfigSupplier indexShardConfigSupplier
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
this.blobStoreRepository = blobStoreRepository;
this.primaryModeSupplier = primaryModeSupplier;
this.primaryModeSupplier = indexShardConfigSupplier.getPrimaryModeSupplier();
this.relocatingSupplier = indexShardConfigSupplier.getRelocatingSupplier();
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker);
try {
Expand Down Expand Up @@ -385,6 +388,10 @@ public void trimUnreferencedReaders() throws IOException {
return;
}

if (relocatingSupplier.getAsBoolean() == true) {
return;
}

gbbafna marked this conversation as resolved.
Show resolved Hide resolved
// cleans up remote translog files not referenced in latest uploaded metadata.
// This enables us to restore translog from the metadata in case of failover or relocation.
Set<Long> generationsToDelete = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

package org.opensearch.index.translog;

import org.opensearch.index.shard.IndexShard;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -29,6 +30,6 @@ Translog newTranslog(
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer,
final BooleanSupplier primaryModeSupplier
final IndexShard.IndexShardConfigSupplier indexShardConfigSupplier
) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.LifecycleAware;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.listener.TranslogEventListener;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand All @@ -38,7 +38,7 @@ public WriteOnlyTranslogManager(
TranslogEventListener translogEventListener,
LifecycleAware engineLifecycleAware,
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
IndexShard.IndexShardConfigSupplier indexShardConfigSupplier
) throws IOException {
super(
translogConfig,
Expand All @@ -52,7 +52,7 @@ public WriteOnlyTranslogManager(
translogEventListener,
engineLifecycleAware,
translogFactory,
primaryModeSupplier
indexShardConfigSupplier
);
}

Expand Down
Loading