diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index b62912fa6..c132714b8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -30,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.XContentFactory @@ -641,6 +642,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .query(boolQueryBuilder) .size(10000) // fixme: make this configurable. ) + .preference(Preference.PRIMARY_FIRST.type()) val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: $shard") @@ -673,7 +675,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex) + val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 39d6b0e48..aa385d989 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -20,6 +20,7 @@ import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.alerting.util.use import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.settings.Settings @@ -100,7 +101,9 @@ class InputService( .newInstance(searchParams) .execute() - val searchRequest = SearchRequest().indices(*input.indices.toTypedArray()) + val searchRequest = SearchRequest() + .indices(*input.indices.toTypedArray()) + .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { searchRequest.source(SearchSourceBuilder.fromXContent(it)) } @@ -192,7 +195,9 @@ class InputService( .newInstance(searchParams) .execute() - val searchRequest = SearchRequest().indices(*input.indices.toTypedArray()) + val searchRequest = SearchRequest() + .indices(*input.indices.toTypedArray()) + .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { searchRequest.source(SearchSourceBuilder.fromXContent(it)) }