Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.13] [Backport 2.x] Added input validation, and fixed bug for cross cluster monitors. (#1510) #1516

Merged
merged 1 commit into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.REMOTE_MONITORING_ENABLED
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED
)
}

Expand Down
72 changes: 37 additions & 35 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -48,8 +47,6 @@ import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
Expand Down Expand Up @@ -99,36 +96,7 @@ class InputService(
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
scope.launch {
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
}
}
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
val startTime = Instant.now().toEpochMilli()
while (
(Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) ||
(responseMap.size < input.clusters.size)
) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
}
results += handleClusterMetricsInput(input)
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
Expand Down Expand Up @@ -286,4 +254,38 @@ class InputService(

return searchRequest
}

private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val results = mutableListOf<Map<String, Any>>()
val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
// If remote monitoring is enabled, and the monitor is configured to execute against remote clusters,
// execute the API against each cluster, and compile the results.
client.threadPool().threadContext.stashContext().use {
val singleThreadContext = newSingleThreadContext("ClusterMetricsMonitorThread")
withContext(singleThreadContext) {
it.restore()
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
results += responseMap
}
}
} else {
// Else only execute the API against the local cluster.
val response = executeTransportAction(input, client)
results += response.toMap()
}
return results
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled =
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled)
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX
import org.opensearch.commons.utils.CLUSTER_PATTERN_REGEX
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException
Expand Down Expand Up @@ -36,7 +38,38 @@ class GetRemoteIndexesRequest : ActionRequest {
out.writeBoolean(includeMappings)
}

/**
* Validates the request [indexes].
* @return TRUE if all entries are valid; else FALSE.
*/
fun isValid(): Boolean {
return indexes.isNotEmpty() && indexes.all { validPattern(it) }
}

/**
* Validates individual entries in the request [indexes].
*
* @param pattern The entry to evaluate. The expected patterns are `<index-pattern>` for a local index, and
* `<cluster-pattern>:<index-pattern>` for remote indexes. These patterns are consistent with the `GET _resolve/index` API.
* @return TRUE if the entry is valid; else FALSE.
*/
private fun validPattern(pattern: String): Boolean {
// In some situations, `<cluster-pattern>` could contain a `:` character.
// Identifying the `<index-pattern>` based on the last occurrence of `:` in the pattern.
val separatorIndex = pattern.lastIndexOf(":")
return if (separatorIndex == -1) {
// Indicates a local index pattern.
INDEX_PATTERN_REGEX.matches(pattern)
} else {
// Indicates a remote index pattern.
val clusterPattern = pattern.substring(0, separatorIndex)
val indexPattern = pattern.substring(separatorIndex + 1)
CLUSTER_PATTERN_REGEX.matches(clusterPattern) && INDEX_PATTERN_REGEX.matches(indexPattern)
}
}

companion object {
const val INVALID_PATTERN_MESSAGE = "Indexes includes an invalid pattern."
const val INDEXES_FIELD = "indexes"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

data class ClusterIndexes(
val clusterName: String,
val clusterHealth: ClusterHealthStatus,
val clusterHealth: ClusterHealthStatus?,
val hubCluster: Boolean,
val indexes: List<ClusterIndex> = listOf(),
val latency: Long
Expand All @@ -51,7 +51,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
clusterName = sin.readString(),
clusterHealth = sin.readEnum(ClusterHealthStatus::class.java),
clusterHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom),
hubCluster = sin.readBoolean(),
indexes = sin.readList((ClusterIndex.Companion)::readFrom),
latency = sin.readLong()
Expand All @@ -72,7 +72,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

override fun writeTo(out: StreamOutput) {
out.writeString(clusterName)
out.writeEnum(clusterHealth)
if (clusterHealth != null) out.writeEnum(clusterHealth)
indexes.forEach { it.writeTo(out) }
out.writeLong(latency)
}
Expand Down Expand Up @@ -100,7 +100,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
indexName = sin.readString(),
indexHealth = sin.readEnum(ClusterHealthStatus::class.java),
indexHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom),
mappings = sin.readOptionalWriteable(::MappingMetadata)
)

Expand All @@ -115,7 +115,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

override fun writeTo(out: StreamOutput) {
out.writeString(indexName)
out.writeEnum(indexHealth)
if (indexHealth != null) out.writeEnum(indexHealth)
if (mappings != null) out.writeMap(mappings.sourceAsMap)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteMonitorRequest
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.node.NodeClient
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.Monitor
Expand Down Expand Up @@ -64,7 +65,14 @@ class RestExecuteMonitorAction : BaseRestHandler() {
} else {
val xcp = request.contentParser()
ensureExpectedToken(START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION)

val monitor: Monitor
try {
monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION)
} catch (e: Exception) {
throw AlertingException.wrap(e)
}

val execMonitorRequest = ExecuteMonitorRequest(dryrun, requestEnd, null, monitor)
client.execute(ExecuteMonitorAction.INSTANCE, execMonitorRequest, RestToXContentListener(channel))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import org.opensearch.rest.action.RestToXContentListener
private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java)

class RestGetRemoteIndexesAction : BaseRestHandler() {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
companion object {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
}

override fun getName(): String {
return "get_remote_indexes_action"
Expand All @@ -32,7 +34,7 @@ class RestGetRemoteIndexesAction : BaseRestHandler() {
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} $ROUTE")
log.info("${request.method()} $ROUTE")
val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, ""))
val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false)
return RestChannelConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IF_PRIMARY_TERM
import org.opensearch.alerting.util.IF_SEQ_NO
import org.opensearch.alerting.util.REFRESH
Expand Down Expand Up @@ -80,49 +81,59 @@ class RestIndexMonitorAction : BaseRestHandler() {

val id = request.param("monitorID", Monitor.NO_ID)
if (request.method() == PUT && Monitor.NO_ID == id) {
throw IllegalArgumentException("Missing monitor ID")
throw AlertingException.wrap(IllegalArgumentException("Missing monitor ID"))
}

// Validate request by parsing JSON to Monitor
val xcp = request.contentParser()
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
val rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

validateDataSources(monitor)
validateOwner(monitor.owner)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor")

val monitor: Monitor
val rbacRoles: List<String>?
try {
monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())

rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

validateDataSources(monitor)
validateOwner(monitor.owner)

val monitorType = monitor.monitorType
val triggers = monitor.triggers

when (monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw (IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor"))
}
}
}
}
Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is BucketLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor")
Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is BucketLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor")
}
}
}
}
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor")
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor")
}
}
}
}
Monitor.MonitorType.DOC_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is DocumentLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor")
Monitor.MonitorType.DOC_LEVEL_MONITOR -> {
validateDocLevelQueryName(monitor)
triggers.forEach {
if (it !is DocumentLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor")
}
}
}
validateDocLevelQueryName(monitor)
}
} catch (e: Exception) {
throw AlertingException.wrap(e)
}

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val REMOTE_MONITORING_ENABLED = Setting.boolSetting(
"plugins.alerting.remote_monitoring_enabled",
val CROSS_CLUSTER_MONITORING_ENABLED = Setting.boolSetting(
"plugins.alerting.cross_cluster_monitoring_enabled",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
Expand Down
Loading
Loading