From 3f60cf18f70d8469d2b5d5d8097d1c8d499ce2c4 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 22:11:14 -0700 Subject: [PATCH] [Backport 2.x] Added input validation, and fixed bug for cross cluster monitors. (#1510) (#1515) * Added input validation, and fixed bug for cross cluster monitors. (#1510) * Fixed issue where InputService wouldn't wait for cluster metrics monitor to finish executing against all clusters. Signed-off-by: AWSHurneyt * Added input validation for GetRemoteIndexes API, and added related unit and integration tests. Signed-off-by: AWSHurneyt * Removed unused variable, and imports. Signed-off-by: AWSHurneyt * Made initial call to GetRemoteIndexes API log INFO level to capture timestamp of incoming request. Signed-off-by: AWSHurneyt * Fixed comment. Signed-off-by: AWSHurneyt * Moved some regex to common utils. Signed-off-by: AWSHurneyt * Renamed cross cluster monitor setting. Signed-off-by: AWSHurneyt * Fixed import. Signed-off-by: AWSHurneyt * Fixed import. Signed-off-by: AWSHurneyt * Fixed ktlint error. Signed-off-by: AWSHurneyt * Added null checks for health statuses. Signed-off-by: AWSHurneyt * Wrapped Monitor.parse calls in AlertingExceptions so IllegalArgumentExceptions are wrapped in 4xx-level exceptions. Signed-off-by: AWSHurneyt * Fixed merge error. Signed-off-by: AWSHurneyt * Fixed test. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt * Fixed test. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt --- .../org/opensearch/alerting/AlertingPlugin.kt | 2 +- .../org/opensearch/alerting/InputService.kt | 72 ++-- .../alerting/QueryLevelMonitorRunner.kt | 2 +- .../action/GetRemoteIndexesRequest.kt | 33 ++ .../action/GetRemoteIndexesResponse.kt | 10 +- .../resthandler/RestExecuteMonitorAction.kt | 10 +- .../resthandler/RestGetRemoteIndexesAction.kt | 6 +- .../resthandler/RestIndexMonitorAction.kt | 69 ++-- .../alerting/settings/AlertingSettings.kt | 4 +- .../TransportGetRemoteIndexesAction.kt | 20 +- .../opensearch/alerting/util/IndexUtils.kt | 2 - .../CatIndicesHelpers.kt | 2 +- .../CatShardsHelpers.kt | 2 +- .../alerting/AlertingRestTestCase.kt | 18 +- .../alerting/DocumentMonitorRunnerIT.kt | 4 +- .../action/GetRemoteIndexesActionTests.kt | 104 ++++++ .../transport/GetRemoteIndexesActionIT.kt | 339 ++++++++++++++++++ 17 files changed, 607 insertions(+), 92 deletions(-) create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index e2cb704e7..f908f4ebe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -373,7 +373,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE, - AlertingSettings.REMOTE_MONITORING_ENABLED + AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 6865085be..48cd37e81 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -5,9 +5,8 @@ package org.opensearch.alerting -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -48,8 +47,6 @@ import org.opensearch.script.TemplateScript import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant -private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) - /** Service that handles the collection of input results for Monitor executions */ class InputService( val client: Client, @@ -99,36 +96,7 @@ class InputService( results += searchResponse.convertToMap() } is ClusterMetricsInput -> { - logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) - - val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) - logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) - - val responseMap = mutableMapOf>() - if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) { - client.threadPool().threadContext.stashContext().use { - scope.launch { - input.clusters.forEach { cluster -> - val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) - val response = executeTransportAction(input, targetClient) - // Not all supported API reference the cluster name in their response. - // Mapping each response to the cluster name before adding to results. - // Not adding this same logic for local-only monitors to avoid breaking existing monitors. - responseMap[cluster] = response.toMap() - } - } - } - val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT) - val startTime = Instant.now().toEpochMilli() - while ( - (Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) || - (responseMap.size < input.clusters.size) - ) { /* Wait for responses */ } - results += responseMap - } else { - val response = executeTransportAction(input, client) - results += response.toMap() - } + results += handleClusterMetricsInput(input) } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") @@ -286,4 +254,38 @@ class InputService( return searchRequest } + + private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList> { + logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) + + val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + + val results = mutableListOf>() + val responseMap = mutableMapOf>() + if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) { + // If remote monitoring is enabled, and the monitor is configured to execute against remote clusters, + // execute the API against each cluster, and compile the results. + client.threadPool().threadContext.stashContext().use { + val singleThreadContext = newSingleThreadContext("ClusterMetricsMonitorThread") + withContext(singleThreadContext) { + it.restore() + input.clusters.forEach { cluster -> + val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) + val response = executeTransportAction(input, targetClient) + // Not all supported API reference the cluster name in their response. + // Mapping each response to the cluster name before adding to results. + // Not adding this same logic for local-only monitors to avoid breaking existing monitors. + responseMap[cluster] = response.toMap() + } + results += responseMap + } + } + } else { + // Else only execute the API against the local cluster. + val response = executeTransportAction(input, client) + results += response.toMap() + } + return results + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index a77121069..3d9ab5a78 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -71,7 +71,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { val remoteMonitoringEnabled = - monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) if (remoteMonitoringEnabled) monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt index 733bc3a04..8b371ba26 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -7,6 +7,8 @@ package org.opensearch.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX +import org.opensearch.commons.utils.CLUSTER_PATTERN_REGEX import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import java.io.IOException @@ -36,7 +38,38 @@ class GetRemoteIndexesRequest : ActionRequest { out.writeBoolean(includeMappings) } + /** + * Validates the request [indexes]. + * @return TRUE if all entries are valid; else FALSE. + */ + fun isValid(): Boolean { + return indexes.isNotEmpty() && indexes.all { validPattern(it) } + } + + /** + * Validates individual entries in the request [indexes]. + * + * @param pattern The entry to evaluate. The expected patterns are `` for a local index, and + * `:` for remote indexes. These patterns are consistent with the `GET _resolve/index` API. + * @return TRUE if the entry is valid; else FALSE. + */ + private fun validPattern(pattern: String): Boolean { + // In some situations, `` could contain a `:` character. + // Identifying the `` based on the last occurrence of `:` in the pattern. + val separatorIndex = pattern.lastIndexOf(":") + return if (separatorIndex == -1) { + // Indicates a local index pattern. + INDEX_PATTERN_REGEX.matches(pattern) + } else { + // Indicates a remote index pattern. + val clusterPattern = pattern.substring(0, separatorIndex) + val indexPattern = pattern.substring(separatorIndex + 1) + CLUSTER_PATTERN_REGEX.matches(clusterPattern) && INDEX_PATTERN_REGEX.matches(indexPattern) + } + } + companion object { + const val INVALID_PATTERN_MESSAGE = "Indexes includes an invalid pattern." const val INDEXES_FIELD = "indexes" const val INCLUDE_MAPPINGS_FIELD = "include_mappings" } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt index 1572b4228..0f694bbf5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt @@ -42,7 +42,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { data class ClusterIndexes( val clusterName: String, - val clusterHealth: ClusterHealthStatus, + val clusterHealth: ClusterHealthStatus?, val hubCluster: Boolean, val indexes: List = listOf(), val latency: Long @@ -51,7 +51,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { @Throws(IOException::class) constructor(sin: StreamInput) : this( clusterName = sin.readString(), - clusterHealth = sin.readEnum(ClusterHealthStatus::class.java), + clusterHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom), hubCluster = sin.readBoolean(), indexes = sin.readList((ClusterIndex.Companion)::readFrom), latency = sin.readLong() @@ -72,7 +72,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { override fun writeTo(out: StreamOutput) { out.writeString(clusterName) - out.writeEnum(clusterHealth) + if (clusterHealth != null) out.writeEnum(clusterHealth) indexes.forEach { it.writeTo(out) } out.writeLong(latency) } @@ -100,7 +100,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { @Throws(IOException::class) constructor(sin: StreamInput) : this( indexName = sin.readString(), - indexHealth = sin.readEnum(ClusterHealthStatus::class.java), + indexHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom), mappings = sin.readOptionalWriteable(::MappingMetadata) ) @@ -115,7 +115,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { override fun writeTo(out: StreamOutput) { out.writeString(indexName) - out.writeEnum(indexHealth) + if (indexHealth != null) out.writeEnum(indexHealth) if (mappings != null) out.writeMap(mappings.sourceAsMap) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt index 740dcb2d6..f9d88bc5b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteMonitorRequest +import org.opensearch.alerting.util.AlertingException import org.opensearch.client.node.NodeClient import org.opensearch.common.unit.TimeValue import org.opensearch.commons.alerting.model.Monitor @@ -64,7 +65,14 @@ class RestExecuteMonitorAction : BaseRestHandler() { } else { val xcp = request.contentParser() ensureExpectedToken(START_OBJECT, xcp.nextToken(), xcp) - val monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION) + + val monitor: Monitor + try { + monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION) + } catch (e: Exception) { + throw AlertingException.wrap(e) + } + val execMonitorRequest = ExecuteMonitorRequest(dryrun, requestEnd, null, monitor) client.execute(ExecuteMonitorAction.INSTANCE, execMonitorRequest, RestToXContentListener(channel)) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt index 591ab2c3e..5d91767bd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt @@ -19,7 +19,9 @@ import org.opensearch.rest.action.RestToXContentListener private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java) class RestGetRemoteIndexesAction : BaseRestHandler() { - val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes" + companion object { + val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes" + } override fun getName(): String { return "get_remote_indexes_action" @@ -32,7 +34,7 @@ class RestGetRemoteIndexesAction : BaseRestHandler() { } override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - log.debug("${request.method()} $ROUTE") + log.info("${request.method()} $ROUTE") val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, "")) val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false) return RestChannelConsumer { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt index 93e61efb0..7dfde7cc5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IF_PRIMARY_TERM import org.opensearch.alerting.util.IF_SEQ_NO import org.opensearch.alerting.util.REFRESH @@ -80,49 +81,59 @@ class RestIndexMonitorAction : BaseRestHandler() { val id = request.param("monitorID", Monitor.NO_ID) if (request.method() == PUT && Monitor.NO_ID == id) { - throw IllegalArgumentException("Missing monitor ID") + throw AlertingException.wrap(IllegalArgumentException("Missing monitor ID")) } // Validate request by parsing JSON to Monitor val xcp = request.contentParser() ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) - val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) - val rbacRoles = request.contentParser().map()["rbac_roles"] as List? - - validateDataSources(monitor) - validateOwner(monitor.owner) - val monitorType = monitor.monitorType - val triggers = monitor.triggers - when (monitorType) { - Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is QueryLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor") + + val monitor: Monitor + val rbacRoles: List? + try { + monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) + + rbacRoles = request.contentParser().map()["rbac_roles"] as List? + + validateDataSources(monitor) + validateOwner(monitor.owner) + + val monitorType = monitor.monitorType + val triggers = monitor.triggers + + when (monitorType) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { + triggers.forEach { + if (it !is QueryLevelTrigger) { + throw (IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor")) + } } } - } - Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is BucketLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor") + Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> { + triggers.forEach { + if (it !is BucketLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor") + } } } - } - Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { - triggers.forEach { - if (it !is QueryLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor") + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { + triggers.forEach { + if (it !is QueryLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor") + } } } - } - Monitor.MonitorType.DOC_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is DocumentLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor") + Monitor.MonitorType.DOC_LEVEL_MONITOR -> { + validateDocLevelQueryName(monitor) + triggers.forEach { + if (it !is DocumentLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor") + } } } - validateDocLevelQueryName(monitor) } + } catch (e: Exception) { + throw AlertingException.wrap(e) } val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index a0eda830a..8f6eb31b7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -208,8 +208,8 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) - val REMOTE_MONITORING_ENABLED = Setting.boolSetting( - "plugins.alerting.remote_monitoring_enabled", + val CROSS_CLUSTER_MONITORING_ENABLED = Setting.boolSetting( + "plugins.alerting.cross_cluster_monitoring_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt index 5b35d493a..446c96649 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt @@ -27,7 +27,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings -import org.opensearch.alerting.settings.AlertingSettings.Companion.REMOTE_MONITORING_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.CROSS_CLUSTER_MONITORING_ENABLED import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.CrossClusterMonitorUtils import org.opensearch.client.Client @@ -62,10 +62,10 @@ class TransportGetRemoteIndexesAction @Inject constructor( @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) - @Volatile private var remoteMonitoringEnabled = REMOTE_MONITORING_ENABLED.get(settings) + @Volatile private var remoteMonitoringEnabled = CROSS_CLUSTER_MONITORING_ENABLED.get(settings) init { - clusterService.clusterSettings.addSettingsUpdateConsumer(REMOTE_MONITORING_ENABLED) { remoteMonitoringEnabled = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(CROSS_CLUSTER_MONITORING_ENABLED) { remoteMonitoringEnabled = it } listenFilterBySettingChange(clusterService) } @@ -87,6 +87,15 @@ class TransportGetRemoteIndexesAction @Inject constructor( val user = readUserFromThreadContext(client) if (!validateUserBackendRoles(user, actionListener)) return + if (!request.isValid()) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException(GetRemoteIndexesRequest.INVALID_PATTERN_MESSAGE, RestStatus.BAD_REQUEST) + ) + ) + return + } + client.threadPool().threadContext.stashContext().use { scope.launch { val singleThreadContext = newSingleThreadContext("GetRemoteIndexesActionThread") @@ -96,8 +105,7 @@ class TransportGetRemoteIndexesAction @Inject constructor( var resolveIndexResponse: ResolveIndexAction.Response? = null try { - resolveIndexResponse = - getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService)) + resolveIndexResponse = getRemoteClusters(request.indexes) } catch (e: Exception) { log.error("Failed to retrieve indexes for request $request", e) actionListener.onFailure(AlertingException.wrap(e)) @@ -151,7 +159,7 @@ class TransportGetRemoteIndexesAction @Inject constructor( clusterIndexesList.add( ClusterIndexes( clusterName = clusterName, - clusterHealth = clusterHealthResponse!!.status, + clusterHealth = clusterHealthResponse?.status, hubCluster = clusterName == clusterService.clusterName.value(), indexes = clusterIndexList, latency = latency diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 387f5cb22..41e4a7bbd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -26,8 +26,6 @@ import org.opensearch.core.xcontent.XContentParser class IndexUtils { companion object { - val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""") - const val _META = "_meta" const val SCHEMA_VERSION = "schema_version" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt index 8e92b597f..36c09f244 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt @@ -19,9 +19,9 @@ import org.opensearch.action.admin.indices.stats.CommonStats import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.action.support.IndicesOptions -import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.time.DateFormatter +import org.opensearch.commons.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.core.action.ActionResponse import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt index 12152e69d..f092b7f12 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt @@ -14,9 +14,9 @@ import org.opensearch.action.admin.indices.stats.CommonStats import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.action.admin.indices.stats.ShardStats -import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.cluster.routing.UnassignedInfo import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.core.action.ActionResponse import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 50cae9d8c..251ef98f7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -986,16 +986,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { num = randomIntBetween(1, 10), includeWriteIndex = true ), + createIndices: Boolean = true ): MutableMap> { val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - val indexName = createTestIndex(index = it, mapping = "") - val isWriteIndex = indices.getOrDefault(indexName, false) - indicesMap[indexName] = isWriteIndex + if (createIndices) createTestIndex(index = it, mapping = "") + val isWriteIndex = indices.getOrDefault(it, false) + indicesMap[it] = isWriteIndex val indexMap = mapOf( "add" to mapOf( - "index" to indexName, + "index" to it, "alias" to alias, "is_write_index" to isWriteIndex ) @@ -1317,6 +1318,15 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response.asMap() } + fun RestClient.getSettings(): Map { + val response = this.makeRequest( + "GET", + "_cluster/settings?flat_settings=true" + ) + assertEquals(RestStatus.OK, response.restStatus()) + return response.asMap() + } + fun RestClient.updateSettings(setting: String, value: Any): Map { val settings = jsonBuilder() .startObject() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 2934de83e..b9ab6289b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2033,7 +2033,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) fail("Expected create monitor to fail") } catch (e: ResponseException) { - assertTrue(e.message!!.contains("illegal_argument_exception")) + assertTrue(e.message!!.contains("alerting_exception")) } } @@ -2687,7 +2687,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) fail("Expected create monitor to fail") } catch (e: ResponseException) { - assertTrue(e.message!!.contains("illegal_argument_exception")) + assertTrue(e.message!!.contains("alerting_exception")) } } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt new file mode 100644 index 000000000..f4639dd6e --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.test.OpenSearchTestCase + +class GetRemoteIndexesActionTests : OpenSearchTestCase() { + private val validPatterns = listOf( + "local-index-name", + "localindexname", + "local-index-*-pattern-*", + "*local-index-*-pattern-*", + "cluster-name:remote-index-name", + "cluster-name:remoteindexname", + "cluster-name:remote-index-*-pattern-*", + "cluster-name:*remote-index-*-pattern-*", + "cluster-*pattern-*:remote-index-name", + "cluster-*pattern-*:remoteindexname", + "cluster-*pattern-*:remote-index-*-pattern-*", + "cluster-*pattern-*:*remote-index-*-pattern-*", + "*cluster-*pattern-*:remote-index-*-pattern-*", + "cluster-*:pattern-*:remote-index-name", + "cluster-*:pattern-*:remoteindexname", + "cluster-*:pattern-*:remote-index-*-pattern-*", + "*cluster-*:pattern-*:remote-index-*-pattern-*", + ) + + private val invalidPatterns = listOf( + // `` character length less than 1 should return FALSE + ":remote-index-name", + + // `` character length greater than 255 should return FALSE + "${randomAlphaOfLength(256)}:remote-index-name", + + // Invalid characters should return FALSE + "local-index#-name", + "cluster-name:remote-#index-name", + "cluster-#name:remote-index-name", + "cluster-#name:remote-#index-name", + + // More than 1 `:` character in `` should return FALSE + "bad:cluster:name:remote-index-name", + ) + + fun `test get remote indexes action name`() { + assertNotNull(GetRemoteIndexesAction.INSTANCE.name()) + assertEquals(GetRemoteIndexesAction.INSTANCE.name(), GetRemoteIndexesAction.NAME) + } + + fun `test GetRemoteIndexesRequest isValid with empty array`() { + val request = GetRemoteIndexesRequest( + indexes = emptyList(), + includeMappings = false + ) + assertFalse(request.isValid()) + } + + fun `test GetRemoteIndexesRequest isValid with one valid entry`() { + validPatterns.forEach { + val request = GetRemoteIndexesRequest( + indexes = listOf(it), + includeMappings = false + ) + assertTrue("Expected pattern '$it' to be valid.", request.isValid()) + } + } + + fun `test GetRemoteIndexesRequest isValid with multiple valid entries`() { + val request = GetRemoteIndexesRequest( + indexes = validPatterns, + includeMappings = false + ) + assertTrue(request.isValid()) + } + + fun `test GetRemoteIndexesRequest isValid with one invalid entry`() { + invalidPatterns.forEach { + val request = GetRemoteIndexesRequest( + indexes = listOf(it), + includeMappings = false + ) + assertFalse("Expected pattern '$it' to be invalid.", request.isValid()) + } + } + + fun `test GetRemoteIndexesRequest isValid with multiple invalid entries`() { + val request = GetRemoteIndexesRequest( + indexes = invalidPatterns, + includeMappings = false + ) + assertFalse(request.isValid()) + } + + fun `test GetRemoteIndexesRequest isValid with valid and invalid entries`() { + val request = GetRemoteIndexesRequest( + indexes = validPatterns + invalidPatterns, + includeMappings = false + ) + assertFalse(request.isValid()) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt new file mode 100644 index 000000000..99556fe3c --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt @@ -0,0 +1,339 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.action.GetRemoteIndexesRequest.Companion.INCLUDE_MAPPINGS_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesRequest.Companion.INDEXES_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesRequest.Companion.INVALID_PATTERN_MESSAGE +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex.Companion.INDEX_HEALTH_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex.Companion.INDEX_NAME_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex.Companion.MAPPINGS_FIELD +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction +import org.opensearch.alerting.settings.AlertingSettings.Companion.CROSS_CLUSTER_MONITORING_ENABLED +import org.opensearch.client.Response +import org.opensearch.client.ResponseException +import org.opensearch.cluster.health.ClusterHealthStatus +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.alerting.util.string +import org.opensearch.core.rest.RestStatus +import java.util.* + +@Suppress("UNCHECKED_CAST") +class GetRemoteIndexesActionIT : AlertingRestTestCase() { + private var remoteMonitoringEnabled = false + private var remoteClusters = listOf() + + private val mappingFieldToTypePairs1 = listOf( + "timestamp" to "date", + "color" to "keyword", + "message" to "text", + ) + + private val mappingFieldToTypePairs2 = listOf( + "timestamp" to "date", + "message" to "text", + ) + + fun `test with remote monitoring disabled`() { + // Disable remote monitoring if not already disabled + toggleRemoteMonitoring(false) + try { + getRemoteIndexes("$INDEXES_FIELD=*,*:*&$INCLUDE_MAPPINGS_FIELD=false") + fail("Expected 403 Method FORBIDDEN response.") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN, e.response.restStatus()) + assertEquals( + "Remote monitoring is not enabled.", + (e.response.asMap()["error"] as Map)["reason"] + ) + } + } + + fun `test with blank indexes param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + try { + getRemoteIndexes("$INCLUDE_MAPPINGS_FIELD=false") + fail("Expected 400 Method BAD_REQUEST response.") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + assertEquals( + INVALID_PATTERN_MESSAGE, + (e.response.asMap()["error"] as Map)["reason"] + ) + } + } + + fun `test with blank include_mappings param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1, index2) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=*,*:*") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + assertTrue((indexDetails[MAPPINGS_FIELD] as Map).isEmpty()) + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + fun `test with FALSE include_mappings param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1, index2) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=*,*:*&$INCLUDE_MAPPINGS_FIELD=false") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + assertTrue((indexDetails[MAPPINGS_FIELD] as Map).isEmpty()) + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + fun `test with TRUE include_mappings param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1, index2) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=*,*:*&$INCLUDE_MAPPINGS_FIELD=true") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + + // Validate index mappings + val mappings = (indexDetails[MAPPINGS_FIELD] as Map)["properties"] as Map> + if (indexName == index1) { + mappingFieldToTypePairs1.forEach { + assertNotNull(mappings[it.first]) + assertEquals(it.second, mappings[it.first]!!["type"]) + } + } else { + mappingFieldToTypePairs2.forEach { + assertNotNull(mappings[it.first]) + assertEquals(it.second, mappings[it.first]!!["type"]) + } + } + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + fun `test with specific index name`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=$index1:*&$INCLUDE_MAPPINGS_FIELD=true") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + val mappings = (indexDetails[MAPPINGS_FIELD] as Map)["properties"] as Map> + + // Validate index mappings + mappingFieldToTypePairs1.forEach { + assertNotNull(mappings[it.first]) + assertEquals(it.second, mappings[it.first]!!["type"]) + } + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + private fun getRemoteIndexes(params: String): Response { + return client().makeRequest("GET", "${RestGetRemoteIndexesAction.ROUTE}?$params") + } + + private fun toggleRemoteMonitoring(setting: Boolean) { + if (remoteMonitoringEnabled != setting) { + client().updateSettings(CROSS_CLUSTER_MONITORING_ENABLED.key, setting) + + val settings = client().getSettings() + val updatedSetting = getEnabledSetting(settings) + + if (setting) assertTrue(updatedSetting) + else assertFalse(updatedSetting) + + remoteMonitoringEnabled = updatedSetting + + compileRemoteClustersList(settings) + } + } + + private fun compileRemoteClustersList(settings: Map) { + if (remoteClusters.isEmpty()) { + val remotes = settings["persistent.cluster.remote"] as Map? + remoteClusters = remotes?.keys?.toList() ?: emptyList() + } + } + + private fun getEnabledSetting(settings: Map): Boolean { + val persistentSettings = settings["persistent"] as Map + val updatedSetting = persistentSettings[CROSS_CLUSTER_MONITORING_ENABLED.key] + assertNotNull(updatedSetting) + return (updatedSetting as String).toBoolean() + } + + private fun formatMappingsJson(fieldToTypePairs: List>): String { + val builder = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + fieldToTypePairs.forEach { + builder.startObject(it.first) + .field("type", it.second) + .endObject() + } + builder.endObject().endObject() + val mappingsJson = builder.string() + return mappingsJson.substring(1, mappingsJson.lastIndex) + } + + private fun assertClusterHealth(health: String) { + try { + ClusterHealthStatus.fromString(health) + } catch (e: IllegalArgumentException) { + fail("Should not throw IllegalArgumentException.") + } + } +}