Skip to content

Commit

Permalink
shard failures map from fan out action
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 9, 2024
1 parent e573ea6 commit cbe3f78
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.Strings
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
Expand Down Expand Up @@ -137,6 +138,7 @@ class TransportDocLevelMonitorFanOutAction
var percolateQueriesTimeTaken = AtomicLong(0)
var totalDocsQueried = AtomicLong(0)
var docTransformTimeTaken = AtomicLong(0)
val shardIdFailureMap = mutableMapOf<String, Exception>()
val updatedIndexNames = request.indexExecutionContexts[0].updatedIndexNames
val concreteIndexNames = request.indexExecutionContexts[0].concreteIndexNames
val monitorMetadata = request.monitorMetadata
Expand Down Expand Up @@ -193,7 +195,8 @@ class TransportDocLevelMonitorFanOutAction
nonPercolateSearchesTimeTaken,
percolateQueriesTimeTaken,
totalDocsQueried,
docTransformTimeTaken
docTransformTimeTaken,
shardIdFailureMap,
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
Expand Down Expand Up @@ -276,7 +279,7 @@ class TransportDocLevelMonitorFanOutAction
nodeId = monitorCtx.clusterService!!.localNode().id,
executionId = request.executionId,
monitorId = monitor.id,
shardIdFailureMap = emptyMap(),
shardIdFailureMap = shardIdFailureMap,
findingIds = emptyList(),
lastRunContext as MutableMap<String, Any>,
InputRunResults(listOf(inputRunResults)),
Expand Down Expand Up @@ -403,6 +406,7 @@ class TransportDocLevelMonitorFanOutAction
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong,
docTransformTimeTake: AtomicLong,
shardIdFailureMap: MutableMap<String, Exception>,
updateLastRunContext: (String, String) -> Unit,
) {
val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
Expand Down Expand Up @@ -476,6 +480,12 @@ class TransportDocLevelMonitorFanOutAction
"Error: ${e.message}",
e
)
val s = ShardId(
indexExecutionCtx.concreteIndexName,
monitorCtx.clusterService!!.state().metadata.index(indexExecutionCtx.concreteIndexName).indexUUID,
i // shard id
)
shardIdFailureMap[s.toString()] = e
if (e is IndexClosedException) {
throw e
}
Expand Down

0 comments on commit cbe3f78

Please sign in to comment.