From 3de3bbabd971d15935e6d9c47d3ad283ef1d7106 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 27 Sep 2023 14:37:38 -0700 Subject: [PATCH] Add primary first calls for Document Level Alerting Signed-off-by: Ashish Agrawal --- .../org/opensearch/alerting/DocumentLevelMonitorRunner.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index b62912fa6..c132714b8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -30,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.XContentFactory @@ -641,6 +642,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .query(boolQueryBuilder) .size(10000) // fixme: make this configurable. ) + .preference(Preference.PRIMARY_FIRST.type()) val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: $shard") @@ -673,7 +675,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex) + val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder)