Skip to content

Commit

Permalink
Change log level of SegRep cancellations to debug (opensearch-project…
Browse files Browse the repository at this point in the history
…#9445)

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored and austintlee committed Aug 25, 2023
1 parent e92d7c1 commit d017e21
Showing 1 changed file with 28 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -269,15 +271,7 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.error(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
replicaShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
),
e
);
logReplicationFailure(state, e, replicaShard);
if (sendShardFailure == true) {
failShard(e, replicaShard);
} else {
Expand All @@ -293,6 +287,30 @@ public void onReplicationFailure(
}
}

private void logReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, IndexShard replicaShard) {
// only log as error if error is not a cancellation.
if (ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class) == null) {
logger.error(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
replicaShard.shardId(),
state.getReplicationId(),
state.getTimingData()
),
e
);
} else {
logger.debug(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication cancelled",
replicaShard.shardId(),
state.getReplicationId()
),
e
);
}
}

protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) {
// Update replication checkpoint on source via transport call only supported for remote store integration. For node-
// node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call
Expand Down Expand Up @@ -503,7 +521,6 @@ public void onResponse(Void o) {

@Override
public void onFailure(Exception e) {
logger.error(() -> new ParameterizedMessage("Exception replicating {} marking as failed.", target.description()), e);
if (e instanceof OpenSearchCorruptionException) {
onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
return;
Expand Down Expand Up @@ -584,15 +601,7 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.error(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Force replication Sync failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
),
e
);
logReplicationFailure(state, e, indexShard);
if (sendShardFailure) {
failShard(e, indexShard);
}
Expand Down

0 comments on commit d017e21

Please sign in to comment.