From e6e791b073c015c93ae7313efdb2b62aebb13b45 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 6 Feb 2024 09:28:45 -0800 Subject: [PATCH] Added clusters field to support cross cluster cluster metrics monitors. Signed-off-by: AWSHurneyt --- .../commons/alerting/model/Alert.kt | 46 +++++++++--- .../alerting/model/ClusterMetricsInput.kt | 73 +++++++++++-------- 2 files changed, 81 insertions(+), 38 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index 04df1b28..3a3a21e4 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -43,6 +43,7 @@ data class Alert( val aggregationResultBucket: AggregationResultBucket? = null, val executionId: String? = null, val associatedAlertIds: List, + val clusters: List? = null, ) : Writeable, ToXContent { init { @@ -61,6 +62,7 @@ data class Alert( chainedAlertTrigger: ChainedAlertTrigger, workflow: Workflow, associatedAlertIds: List, + clusters: List? = null ) : this( monitorId = NO_ID, monitorName = "", @@ -82,7 +84,8 @@ data class Alert( executionId = executionId, workflowId = workflow.id, workflowName = workflow.name, - associatedAlertIds = associatedAlertIds + associatedAlertIds = associatedAlertIds, + clusters = clusters ) constructor( @@ -97,6 +100,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -118,7 +122,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -134,6 +139,7 @@ data class Alert( findingIds: List = emptyList(), executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -155,7 +161,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -172,6 +179,7 @@ data class Alert( findingIds: List = emptyList(), executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -193,7 +201,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -211,6 +220,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( id = id, monitorId = monitor.id, @@ -233,7 +243,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -248,6 +259,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, workflowId: String? = null, executionId: String?, + clusters: List? = null ) : this( id = id, monitorId = monitor.id, @@ -270,7 +282,8 @@ data class Alert( relatedDocIds = listOf(), workflowId = workflowId ?: "", executionId = executionId, - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) enum class State { @@ -311,7 +324,8 @@ data class Alert( actionExecutionResults = sin.readList(::ActionExecutionResult), aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null, executionId = sin.readOptionalString(), - associatedAlertIds = sin.readStringList() + associatedAlertIds = sin.readStringList(), + clusters = sin.readOptionalStringList() ) fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED) @@ -349,6 +363,7 @@ data class Alert( } out.writeOptionalString(executionId) out.writeStringCollection(associatedAlertIds) + if (!clusters.isNullOrEmpty()) out.writeStringArray(clusters.toTypedArray()) } companion object { @@ -379,6 +394,7 @@ data class Alert( const val ASSOCIATED_ALERT_IDS_FIELD = "associated_alert_ids" const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH + const val CLUSTERS_FIELD = "clusters" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -409,6 +425,7 @@ data class Alert( val actionExecutionResults: MutableList = mutableListOf() var aggAlertBucket: AggregationResultBucket? = null val associatedAlertIds = mutableListOf() + val clusters = mutableListOf() ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -475,6 +492,12 @@ data class Alert( AggregationResultBucket.parse(xcp) } } + CLUSTERS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + clusters.add(xcp.text()) + } + } } } @@ -503,7 +526,8 @@ data class Alert( executionId = executionId, workflowId = workflowId, workflowName = workflowName, - associatedAlertIds = associatedAlertIds + associatedAlertIds = associatedAlertIds, + clusters = if (clusters.size > 0) clusters else null ) } @@ -553,6 +577,9 @@ data class Alert( .optionalTimeField(END_TIME_FIELD, endTime) .optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime) aggregationResultBucket?.innerXContent(builder) + + if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray()) + builder.endObject() return builder } @@ -576,7 +603,8 @@ data class Alert( BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","), PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath, FINDING_IDS to findingIds.joinToString(","), - RELATED_DOC_IDS to relatedDocIds.joinToString(",") + RELATED_DOC_IDS to relatedDocIds.joinToString(","), + CLUSTERS_FIELD to clusters?.joinToString(",") ) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt index 81432546..0c38f8a5 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt @@ -13,6 +13,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException import java.net.URI +import java.net.URISyntaxException val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '#', '>', '<', ' ') @@ -22,7 +23,8 @@ val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', ' data class ClusterMetricsInput( var path: String, var pathParams: String = "", - var url: String + var url: String, + var clusters: List = listOf() ) : Input { val clusterMetricType: ClusterMetricType val constructedUri: URI @@ -43,11 +45,10 @@ data class ClusterMetricsInput( "Invalid URI constructed from the path and path_params inputs, or the url input." } - if (url.isNotEmpty() && validateFieldsNotEmpty()) { + if (url.isNotEmpty() && validateFieldsNotEmpty()) require(constructedUri == constructUrlFromInputs()) { "The provided URL and URI fields form different URLs." } - } require(constructedUri.host.lowercase() == SUPPORTED_HOST) { "Only host '$SUPPORTED_HOST' is supported." @@ -74,6 +75,7 @@ data class ClusterMetricsInput( .field(PATH_FIELD, path) .field(PATH_PARAMS_FIELD, pathParams) .field(URL_FIELD, url) + .field(CLUSTERS_FIELD, clusters) .endObject() .endObject() } @@ -87,6 +89,7 @@ data class ClusterMetricsInput( out.writeString(path) out.writeString(pathParams) out.writeString(url) + out.writeStringArray(clusters.toTypedArray()) } companion object { @@ -99,18 +102,19 @@ data class ClusterMetricsInput( const val PATH_PARAMS_FIELD = "path_params" const val URL_FIELD = "url" const val URI_FIELD = "uri" + const val CLUSTERS_FIELD = "clusters" val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(URI_FIELD), CheckedFunction { parseInner(it) }) /** * This parse function uses [XContentParser] to parse JSON input and store corresponding fields to create a [ClusterMetricsInput] object */ - @JvmStatic - @Throws(IOException::class) + @JvmStatic @Throws(IOException::class) fun parseInner(xcp: XContentParser): ClusterMetricsInput { var path = "" var pathParams = "" var url = "" + val clusters = mutableListOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -121,9 +125,17 @@ data class ClusterMetricsInput( PATH_FIELD -> path = xcp.text() PATH_PARAMS_FIELD -> pathParams = xcp.text() URL_FIELD -> url = xcp.text() + CLUSTERS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) clusters.add(xcp.text()) + } } } - return ClusterMetricsInput(path, pathParams, url) + return ClusterMetricsInput(path, pathParams, url, clusters) } } @@ -163,20 +175,17 @@ data class ClusterMetricsInput( if (pathParams.isNotEmpty()) { pathParams = pathParams.trim('/') ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character -> - if (pathParams.contains(character)) { + if (pathParams.contains(character)) throw IllegalArgumentException( - "The provided path parameters contain invalid characters or spaces. Please omit: " + "${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}" + "The provided path parameters contain invalid characters or spaces. Please omit: " + ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ") ) - } } } - if (apiType.requiresPathParams && pathParams.isEmpty()) { + if (apiType.requiresPathParams && pathParams.isEmpty()) throw IllegalArgumentException("The API requires path parameters.") - } - if (!apiType.supportsPathParams && pathParams.isNotEmpty()) { + if (!apiType.supportsPathParams && pathParams.isNotEmpty()) throw IllegalArgumentException("The API does not use path parameters.") - } return pathParams } @@ -192,13 +201,11 @@ data class ClusterMetricsInput( ClusterMetricType.values() .filter { option -> option != ClusterMetricType.BLANK } .forEach { option -> - if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath)) { + if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath)) apiType = option - } } - if (apiType.isBlank()) { + if (apiType.isBlank()) throw IllegalArgumentException("The API could not be determined from the provided URI.") - } return apiType } @@ -207,12 +214,23 @@ data class ClusterMetricsInput( * @return The constructed [URI]. */ private fun constructUrlFromInputs(): URI { - val uriBuilder = URIBuilder() - .setScheme(SUPPORTED_SCHEME) - .setHost(SUPPORTED_HOST) - .setPort(SUPPORTED_PORT) - .setPath(path + pathParams) - return uriBuilder.build() + /** + * this try-catch block is required due to a httpcomponents 5.1.x library issue + * it auto encodes path params in the url. + */ + return try { + val formattedPath = if (path.startsWith("/") || path.isBlank()) path else "/$path" + val formattedPathParams = if (pathParams.startsWith("/") || pathParams.isBlank()) pathParams else "/$pathParams" + val uriBuilder = URIBuilder("$SUPPORTED_SCHEME://$SUPPORTED_HOST:$SUPPORTED_PORT$formattedPath$formattedPathParams") + uriBuilder.build() + } catch (ex: URISyntaxException) { + val uriBuilder = URIBuilder() + .setScheme(SUPPORTED_SCHEME) + .setHost(SUPPORTED_HOST) + .setPort(SUPPORTED_PORT) + .setPath(path + pathParams) + uriBuilder.build() + } } /** @@ -220,15 +238,12 @@ data class ClusterMetricsInput( * If [path] and [pathParams] are empty, populates them with values from [url]. */ private fun parseEmptyFields() { - if (pathParams.isEmpty()) { + if (pathParams.isEmpty()) pathParams = this.parsePathParams() - } - if (path.isEmpty()) { + if (path.isEmpty()) path = if (pathParams.isEmpty()) clusterMetricType.defaultPath else clusterMetricType.prependPath - } - if (url.isEmpty()) { + if (url.isEmpty()) url = constructedUri.toString() - } } /**