Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Mar 21, 2024
1 parent 5e66c19 commit 24e3975
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting
if (shardRouting.isSameAllocation(primaryRouting)) {
return ReplicationMode.NO_REPLICATION;
}
// Perform full replication during primary failover
// Perform full replication during primary relocation
if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) {
return ReplicationMode.FULL_REPLICATION;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ public class ShardRouting implements Writeable, ToXContentObject {
private final long expectedShardSize;
@Nullable
private final ShardRouting targetRelocatingShard;
private boolean assignedToRemoteStoreNode;

/*
Local flag denoting whether this shard copy is assigned to a remote store enabled node.
It is not serialized and is meant to be accessed from data nodes only;
it always reads as `false` when accessed from cluster manager nodes.
It is set in the `createShard` and `updateShard` flows of the IndicesClusterStateService state applier.
*/
private Boolean assignedToRemoteStoreNode = Boolean.FALSE;

/**
* A constructor to internally create shard routing instances, note, the internal flag should only be set to true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L

private volatile ReplicationCheckpoint latestReplicationCheckpoint;

private boolean ongoingEngineMigration;

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -676,10 +674,6 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
assert invariant();
}

/**
 * Sets the {@code ongoingEngineMigration} flag on this tracker.
 * <p>
 * NOTE(review): this setter is not {@code synchronized} and the backing field is declared as a
 * plain {@code boolean}; confirm that cross-thread visibility of the new value is not required.
 *
 * @param ongoingEngineMigration the new value to store in the flag
 */
public void setOngoingEngineMigration(boolean ongoingEngineMigration) {
this.ongoingEngineMigration = ongoingEngineMigration;
}

/**
* The state of the lucene checkpoint
*
Expand Down Expand Up @@ -1095,10 +1089,6 @@ private ReplicationGroup calculateReplicationGroup() {
newVersion = replicationGroup.getVersion() + 1;
}

assert indexSettings().isRemoteTranslogStoreEnabled() || ongoingEngineMigration == true
|| checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)
: "In absence of remote translog store and no-ongoing remote migration, all tracked shards must have replication mode as LOGICAL_REPLICATION";

return new ReplicationGroup(
routingTable,
checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
Expand Down Expand Up @@ -1255,7 +1245,12 @@ private void createReplicationLagTimers() {
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& (ongoingEngineMigration && routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) {
/*
Handle remote store migration cases. Replication lag timers are created only when either of the following holds:
- Segment replication is enabled without remote store (isSegRepLocalEnabled)
- The destination replica shard is hosted on a remote store enabled node (remote store enabled nodes have segrep enabled implicitly)
*/
&& (indexSettings.isSegRepLocalEnabled() == true || routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
logger.trace(
() -> new ParameterizedMessage(
Expand Down Expand Up @@ -1529,7 +1524,7 @@ private boolean isReplicated(String allocationId, String primaryAllocationId, St
- If remote translog is enabled, then returns true if the given allocation id matches either the primary allocation id or the allocation id of the primary's relocation target.
- During an ongoing remote migration, the above-mentioned checks are considered when the shard is assigned to a remote store backed node
*/
if (indexSettings().isRemoteTranslogStoreEnabled() || (ongoingEngineMigration == true && assignedToRemoteStoreNode == true)) {
if (indexSettings().isRemoteTranslogStoreEnabled() || assignedToRemoteStoreNode == true) {
return (allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId));
}
// For other case which is local translog, return true as the requests are replicated to all shards in the replication group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.indices.IndicesService;
Expand Down Expand Up @@ -85,6 +86,7 @@ protected Logger getLogger() {
return LOGGER;
}

private final ClusterService clusterService;
@Inject
public RetentionLeaseBackgroundSyncAction(
final Settings settings,
Expand All @@ -108,6 +110,7 @@ public RetentionLeaseBackgroundSyncAction(
Request::new,
ThreadPool.Names.MANAGEMENT
);
this.clusterService = clusterService;
}

@Override
Expand Down Expand Up @@ -203,7 +206,7 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
data consistency on remote to docrep shard copy failover during the
migration process.
*/
if (indexShard.ongoingEngineMigration()) {
if (RemoteStoreUtils.isMigrationDirectionSet(clusterService)) {
return ReplicationMode.FULL_REPLICATION;
}
return super.getReplicationMode(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.indices.IndicesService;
Expand Down Expand Up @@ -87,6 +88,8 @@ protected Logger getLogger() {
return LOGGER;
}

private final ClusterService clusterService;

@Inject
public RetentionLeaseSyncAction(
final Settings settings,
Expand Down Expand Up @@ -117,6 +120,7 @@ public RetentionLeaseSyncAction(
systemIndices,
tracer
);
this.clusterService = clusterService;
}

@Override
Expand Down Expand Up @@ -209,7 +213,7 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep
@Override
public ReplicationMode getReplicationMode(IndexShard indexShard) {
// Unblock peer recovery retention lease (PRRL) publication during remote store migration
if (indexShard.ongoingEngineMigration()) {
if (RemoteStoreUtils.isMigrationDirectionSet(clusterService)) {
return ReplicationMode.FULL_REPLICATION;
}
return super.getReplicationMode(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,7 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()
/*
During remote store migration, the isSegRepWithRemoteEnabled criteria would return false
since we do not alter the remote store based index settings at that stage. Explicitly
blocking checkpoint publication from this refresh listener since it ends up interfering
with the RemoteStoreRefreshListener invocation
*/
&& !shard.ongoingEngineMigration()) {
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,11 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
try {
final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
DiscoveryNode targetNode = nodes.getLocalNode();
// Mark the shard routing as assigned to a remote store node if the local node (the target node for this shard) is a remote store node
if (targetNode.isRemoteStoreNode()) {
shardRouting.setAssignedToRemoteStoreNode(true);
}
indicesService.createShard(
shardRouting,
checkpointPublisher,
Expand All @@ -679,7 +684,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
failedShardHandler,
globalCheckpointSyncer,
retentionLeaseSyncer,
nodes.getLocalNode(),
targetNode,
sourceNode,
remoteStoreStatsTrackerFactory
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class PublishCheckpointAction extends TransportReplicationAction<
protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class);

private final SegmentReplicationTargetService replicationService;
private final ClusterService clusterService;

@Inject
public PublishCheckpointAction(
Expand All @@ -84,6 +86,7 @@ public PublishCheckpointAction(
ThreadPool.Names.REFRESH
);
this.replicationService = targetService;
this.clusterService = clusterService;
}

@Override
Expand Down Expand Up @@ -200,7 +203,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
ActionListener.completeWith(listener, () -> {
logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId()));
// Ignore replica operation if there is an ongoing remote store migration and the replica copy is assigned to a docrep enabled node
if (replica.ongoingEngineMigration() == true && replica.routingEntry().isAssignedToRemoteStoreNode() == false) {
if (RemoteStoreUtils.isMigrationDirectionSet(clusterService) == true && replica.routingEntry().isAssignedToRemoteStoreNode() == false) {
logger.trace("Received segrep checkpoint on a docrep shard copy during an ongoing remote migration. NoOp.");
return new ReplicaResult();
}
Expand Down

0 comments on commit 24e3975

Please sign in to comment.