Skip to content

Commit

Permalink
bucket level monitor: opitmize to read only from write index when ali…
Browse files Browse the repository at this point in the history
…as is configured

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 17, 2024
1 parent d7580b3 commit ad57b56
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID

//TODO raise PR for bucket level monitor optimization also. dont miss
// TODO raise PR for bucket level monitor optimization also. dont miss
object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)

Expand Down
31 changes: 27 additions & 4 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -102,8 +102,7 @@ class InputService(
.execute()

val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
.indices(*resolveIndices(monitor, input.indices).toTypedArray())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand Down Expand Up @@ -197,7 +196,6 @@ class InputService(

val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand All @@ -224,4 +222,29 @@ class InputService(
InputRunResults(emptyList(), e)
}
}

/** accepts list of index names and returns back a list where :
* i. if it's a concrete index or an index pattern - no changes is made
* ii. if it's a data stream or an alias - we return the active write index
*/
private fun resolveIndices(
monitor: Monitor,
indices: List<String>,
): List<String> {
val resolvedIndices = mutableListOf<String>()
for (it in indices) {
if (IndexUtils.isAlias(it, clusterService.state()) || IndexUtils.isDataStream(it, clusterService.state())) {
val writeIndex = IndexUtils.getWriteIndex(it, clusterService.state())
if (writeIndex == null) {
logger.error("Monitor $monitor.id: Write Index not found for $it")
continue
}
resolvedIndices.add(writeIndex)
// TODO add edge case where periodStart < writeIndex's creationTime
} else {
resolvedIndices.add(it)
}
}
return resolvedIndices
}
}

0 comments on commit ad57b56

Please sign in to comment.