Skip to content

Commit

Permalink
Extract replicator logic from SegmentReplicationTargetService (opense…
Browse files Browse the repository at this point in the history
…arch-project#15511)

* Extract SegmentReplicator class from SegmentReplicationTargetService.

This change separates code that initiates replication from the target service component in prepartion
for implementing a task to initate replication events on an interval.

Signed-off-by: Marc Handalian <[email protected]>

* Pass timeout value to Replicator instead of fetching from shard

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored Aug 30, 2024
1 parent 839ba0b commit 2224d48
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
Expand All @@ -24,7 +22,6 @@
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -33,7 +30,6 @@
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.ForceSyncRequest;
Expand Down Expand Up @@ -61,7 +57,7 @@
import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT;

/**
* Service class that orchestrates replication events on replicas.
* Service class that handles incoming checkpoints to initiate replication events on replicas.
*
* @opensearch.internal
*/
Expand All @@ -72,17 +68,14 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent
private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;

private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;

private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();

private final SegmentReplicationSourceFactory sourceFactory;

protected final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap();

private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
private final SegmentReplicator replicator;

/**
* The internal actions
Expand All @@ -94,6 +87,7 @@ public static class Actions {
public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync";
}

@Deprecated
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
Expand All @@ -113,6 +107,7 @@ public SegmentReplicationTargetService(
);
}

@Deprecated
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
Expand All @@ -121,14 +116,34 @@ public SegmentReplicationTargetService(
final IndicesService indicesService,
final ClusterService clusterService,
final ReplicationCollection<SegmentReplicationTarget> ongoingSegmentReplications
) {
this(
threadPool,
recoverySettings,
transportService,
sourceFactory,
indicesService,
clusterService,
new SegmentReplicator(threadPool)
);
}

public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
final TransportService transportService,
final SegmentReplicationSourceFactory sourceFactory,
final IndicesService indicesService,
final ClusterService clusterService,
final SegmentReplicator replicator
) {
this.threadPool = threadPool;
this.recoverySettings = recoverySettings;
this.onGoingReplications = ongoingSegmentReplications;
this.sourceFactory = sourceFactory;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;
this.replicator = replicator;

transportService.registerRequestHandler(
Actions.FILE_CHUNK,
Expand All @@ -154,7 +169,7 @@ protected void doStart() {
@Override
protected void doStop() {
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
assert onGoingReplications.size() == 0 : "Replication collection should be empty on shutdown";
assert replicator.size() == 0 : "Replication collection should be empty on shutdown";
clusterService.removeListener(this);
}
}
Expand Down Expand Up @@ -199,7 +214,7 @@ public void clusterChanged(ClusterChangedEvent event) {
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) {
onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing");
replicator.cancel(indexShard.shardId(), "Shard closing");
latestReceivedCheckpoint.remove(shardId);
}
}
Expand All @@ -224,7 +239,7 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
&& indexShard.indexSettings().isSegRepEnabledOrRemoteNode()
&& oldRouting.primary() == false
&& newRouting.primary()) {
onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary");
replicator.cancel(indexShard.shardId(), "Shard has been promoted to primary");
latestReceivedCheckpoint.remove(indexShard.shardId());
}
}
Expand All @@ -234,17 +249,15 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
*/
@Nullable
public SegmentReplicationState getOngoingEventSegmentReplicationState(ShardId shardId) {
return Optional.ofNullable(onGoingReplications.getOngoingReplicationTarget(shardId))
.map(SegmentReplicationTarget::state)
.orElse(null);
return Optional.ofNullable(replicator.get(shardId)).map(SegmentReplicationTarget::state).orElse(null);
}

/**
* returns SegmentReplicationState of latest completed segment replication events.
*/
@Nullable
public SegmentReplicationState getlatestCompletedEventSegmentReplicationState(ShardId shardId) {
return completedReplications.get(shardId);
return replicator.getCompleted(shardId);
}

/**
Expand All @@ -257,11 +270,11 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) {
}

public ReplicationRef<SegmentReplicationTarget> get(long replicationId) {
return onGoingReplications.get(replicationId);
return replicator.get(replicationId);
}

public SegmentReplicationTarget get(ShardId shardId) {
return onGoingReplications.getOngoingReplicationTarget(shardId);
return replicator.get(shardId);
}

/**
Expand All @@ -285,7 +298,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
// checkpoint to be replayed once the shard is Active.
if (replicaShard.state().equals(IndexShardState.STARTED) == true) {
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
SegmentReplicationTarget ongoingReplicationTarget = replicator.get(replicaShard.shardId());
if (ongoingReplicationTarget != null) {
if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
logger.debug(
Expand Down Expand Up @@ -504,28 +517,12 @@ public SegmentReplicationTarget startReplication(
final ReplicationCheckpoint checkpoint,
final SegmentReplicationListener listener
) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
indexShard,
checkpoint,
sourceFactory.get(indexShard),
listener
);
startReplication(target);
return target;
return replicator.startReplication(indexShard, checkpoint, sourceFactory.get(indexShard), listener);
}

// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
final long replicationId;
try {
replicationId = onGoingReplications.startSafe(target, recoverySettings.activityTimeout());
} catch (ReplicationFailedException e) {
// replication already running for shard.
target.fail(e, false);
return;
}
logger.trace(() -> new ParameterizedMessage("Added new replication to collection {}", target.description()));
threadPool.generic().execute(new ReplicationRunner(replicationId));
replicator.startReplication(target, recoverySettings.activityTimeout());
}

/**
Expand All @@ -550,89 +547,14 @@ default void onFailure(ReplicationState state, ReplicationFailedException e, boo
void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}

/**
* Runnable implementation to trigger a replication event.
*/
private class ReplicationRunner extends AbstractRunnable {

final long replicationId;

public ReplicationRunner(long replicationId) {
this.replicationId = replicationId;
}

@Override
public void onFailure(Exception e) {
onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication", e), false);
}

@Override
public void doRun() {
start(replicationId);
}
}

private void start(final long replicationId) {
final SegmentReplicationTarget target;
try (ReplicationRef<SegmentReplicationTarget> replicationRef = onGoingReplications.get(replicationId)) {
// This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the
// threadpool.
if (replicationRef == null) {
return;
}
target = replicationRef.get();
}
target.startReplication(new ActionListener<>() {
@Override
public void onResponse(Void o) {
logger.debug(() -> new ParameterizedMessage("Finished replicating {} marking as done.", target.description()));
onGoingReplications.markAsDone(replicationId);
if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) {
completedReplications.put(target.shardId(), target.state());
}
}

@Override
public void onFailure(Exception e) {
logger.debug("Replication failed {}", target.description());
if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) {
onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
return;
}
onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false);
}
});
}

private boolean isStoreCorrupt(SegmentReplicationTarget target) {
// ensure target is not already closed. In that case
// we can assume the store is not corrupt and that the replication
// event completed successfully.
if (target.refCount() > 0) {
final Store store = target.store();
if (store.tryIncRef()) {
try {
return store.isMarkedCorrupted();
} catch (IOException ex) {
logger.warn("Unable to determine if store is corrupt", ex);
return false;
} finally {
store.decRef();
}
}
}
// store already closed.
return false;
}

private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {

// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();

@Override
public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<SegmentReplicationTarget> ref = replicator.get(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);
Expand Down
Loading

0 comments on commit 2224d48

Please sign in to comment.