From 32d70e975a7b0b7b0d1ee9dd35ebc3f2620fab68 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 5 Feb 2024 10:16:58 -0800 Subject: [PATCH] Added clusters field to support cross cluster cluster metrics monitors. Signed-off-by: AWSHurneyt --- .../commons/alerting/model/Alert.kt | 46 ++++++++++++---- .../alerting/model/ClusterMetricsInput.kt | 53 +++++++++++-------- 2 files changed, 69 insertions(+), 30 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index af9b0141..4c4c211b 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -43,6 +43,7 @@ data class Alert( val aggregationResultBucket: AggregationResultBucket? = null, val executionId: String? = null, val associatedAlertIds: List, + val clusters: List? = null, ) : Writeable, ToXContent { init { @@ -61,6 +62,7 @@ data class Alert( chainedAlertTrigger: ChainedAlertTrigger, workflow: Workflow, associatedAlertIds: List, + clusters: List? = null ) : this( monitorId = NO_ID, monitorName = "", @@ -82,7 +84,8 @@ data class Alert( executionId = executionId, workflowId = workflow.id, workflowName = workflow.name, - associatedAlertIds = associatedAlertIds + associatedAlertIds = associatedAlertIds, + clusters = clusters ) constructor( @@ -97,6 +100,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -118,7 +122,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -134,6 +139,7 @@ data class Alert( findingIds: List = emptyList(), executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -155,7 +161,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -172,6 +179,7 @@ data class Alert( findingIds: List = emptyList(), executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -193,7 +201,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -211,6 +220,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( id = id, monitorId = monitor.id, @@ -233,7 +243,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -248,6 +259,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, workflowId: String? = null, executionId: String?, + clusters: List? = null ) : this( id = id, monitorId = monitor.id, @@ -270,7 +282,8 @@ data class Alert( relatedDocIds = listOf(), workflowId = workflowId ?: "", executionId = executionId, - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) enum class State { @@ -311,7 +324,8 @@ data class Alert( actionExecutionResults = sin.readList(::ActionExecutionResult), aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null, executionId = sin.readOptionalString(), - associatedAlertIds = sin.readStringList() + associatedAlertIds = sin.readStringList(), + clusters = sin.readOptionalStringList() ) fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED) @@ -349,6 +363,7 @@ data class Alert( } out.writeOptionalString(executionId) out.writeStringCollection(associatedAlertIds) + if (!clusters.isNullOrEmpty()) out.writeStringArray(clusters.toTypedArray()) } companion object { @@ -379,6 +394,7 @@ data class Alert( const val ASSOCIATED_ALERT_IDS_FIELD = "associated_alert_ids" const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH + const val CLUSTERS_FIELD = "clusters" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -410,6 +426,7 @@ data class Alert( val actionExecutionResults: MutableList = mutableListOf() var aggAlertBucket: AggregationResultBucket? = null val associatedAlertIds = mutableListOf() + val clusters = mutableListOf() ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -476,6 +493,12 @@ data class Alert( AggregationResultBucket.parse(xcp) } } + CLUSTERS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + clusters.add(xcp.text()) + } + } } } @@ -504,7 +527,8 @@ data class Alert( executionId = executionId, workflowId = workflowId, workflowName = workflowName, - associatedAlertIds = associatedAlertIds + associatedAlertIds = associatedAlertIds, + clusters = if (clusters.size > 0) clusters else null ) } @@ -554,6 +578,9 @@ data class Alert( .optionalTimeField(END_TIME_FIELD, endTime) .optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime) aggregationResultBucket?.innerXContent(builder) + + if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray()) + builder.endObject() return builder } @@ -577,7 +604,8 @@ data class Alert( BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","), PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath, FINDING_IDS to findingIds.joinToString(","), - RELATED_DOC_IDS to relatedDocIds.joinToString(",") + RELATED_DOC_IDS to relatedDocIds.joinToString(","), + CLUSTERS_FIELD to clusters?.joinToString(",") ) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt index 89be9f07..e876dbc8 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt @@ -12,8 +12,8 @@ import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException +import java.lang.StringBuilder import java.net.URI -import java.net.URISyntaxException val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '#', '>', '<', ' ') @@ -23,7 +23,8 @@ val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', ' data class ClusterMetricsInput( var path: String, var pathParams: String = "", - var url: String + var url: String, + var clusters: List = listOf() ) : Input { val clusterMetricType: ClusterMetricType val constructedUri: URI @@ -74,6 +75,7 @@ data class ClusterMetricsInput( .field(PATH_FIELD, path) .field(PATH_PARAMS_FIELD, pathParams) .field(URL_FIELD, url) + .field(CLUSTERS_FIELD, clusters) .endObject() .endObject() } @@ -87,6 +89,7 @@ data class ClusterMetricsInput( out.writeString(path) out.writeString(pathParams) out.writeString(url) + out.writeStringArray(clusters.toTypedArray()) } companion object { @@ -99,6 +102,7 @@ data class ClusterMetricsInput( const val PATH_PARAMS_FIELD = "path_params" const val URL_FIELD = "url" const val URI_FIELD = "uri" + const val CLUSTERS_FIELD = "clusters" val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(URI_FIELD), CheckedFunction { parseInner(it) }) @@ -110,6 +114,7 @@ data class ClusterMetricsInput( var path = "" var pathParams = "" var url = "" + val clusters = mutableListOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -120,9 +125,17 @@ data class ClusterMetricsInput( PATH_FIELD -> path = xcp.text() PATH_PARAMS_FIELD -> pathParams = xcp.text() URL_FIELD -> url = xcp.text() + CLUSTERS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) clusters.add(xcp.text()) + } } } - return ClusterMetricsInput(path, pathParams, url) + return ClusterMetricsInput(path, pathParams, url, clusters) } } @@ -164,7 +177,7 @@ data class ClusterMetricsInput( ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character -> if (pathParams.contains(character)) throw IllegalArgumentException( - "The provided path parameters contain invalid characters or spaces. Please omit: " + "${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}" + "The provided path parameters contain invalid characters or spaces. Please omit: " + ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ") ) } } @@ -201,23 +214,21 @@ data class ClusterMetricsInput( * @return The constructed [URI]. */ private fun constructUrlFromInputs(): URI { - /** - * this try-catch block is required due to a httpcomponents 5.1.x library issue - * it auto encodes path params in the url. - */ - return try { - val formattedPath = if (path.startsWith("/") || path.isBlank()) path else "/$path" - val formattedPathParams = if (pathParams.startsWith("/") || pathParams.isBlank()) pathParams else "/$pathParams" - val uriBuilder = URIBuilder("$SUPPORTED_SCHEME://$SUPPORTED_HOST:$SUPPORTED_PORT$formattedPath$formattedPathParams") - uriBuilder.build() - } catch (ex: URISyntaxException) { - val uriBuilder = URIBuilder() - .setScheme(SUPPORTED_SCHEME) - .setHost(SUPPORTED_HOST) - .setPort(SUPPORTED_PORT) - .setPath(path + pathParams) - uriBuilder.build() - } + val fullPath = StringBuilder() + .append(path.trim('/')) + + if (pathParams.isNotEmpty()) + fullPath + .append('/') + .append(pathParams.trim('/')) + + val uriBuilder = URIBuilder() + .setScheme(SUPPORTED_SCHEME) + .setHost(SUPPORTED_HOST) + .setPort(SUPPORTED_PORT) + .setPath(fullPath.toString()) + + return uriBuilder.build() } /**