Skip to content

Commit

Permalink
[Backport 2.11] optimize doc-level monitor execution workflow for dat…
Browse files Browse the repository at this point in the history
…astreams (#1323)

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] authored Dec 9, 2023
1 parent 30f7360 commit 96fc035
Show file tree
Hide file tree
Showing 10 changed files with 567 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val concreteIndices = IndexUtils.resolveAllIndices(
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
if (allConcreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}
Expand All @@ -141,7 +141,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// cleanup old indices that are not monitored anymore from the same monitor
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!concreteIndices.contains(ind)) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}
Expand All @@ -150,11 +150,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

docLevelMonitorInput.indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
var lastWriteIndex: String? = null
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
Expand All @@ -179,7 +194,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -216,11 +217,19 @@ object MonitorMetadataService :
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)

val indices = mutableListOf<String>()
if (IndexUtils.isAlias(index, clusterService.state()) ||
IndexUtils.isDataStream(index, clusterService.state())
) {
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
} else {
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
indices.addAll(getIndexResponse.indices())
}
val indices = getIndexResponse.indices()

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,25 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

// Run through each backing index and apply appropriate mappings to query index
indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val updatedProperties = mutableMapOf<String, Any>()
val allFlattenPaths = mutableSetOf<Pair<String, String>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -153,5 +154,47 @@ class IndexUtils {

return result
}

@JvmStatic
fun isDataStream(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().dataStreams().containsKey(name)
}

@JvmStatic
fun isAlias(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().hasAlias(name)
}

@JvmStatic
fun getWriteIndex(index: String, clusterState: ClusterState): String? {
if (isAlias(index, clusterState) || isDataStream(index, clusterState)) {
val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex
if (metadata != null) {
return metadata.index.name
}
}
return null
}

@JvmStatic
fun getNewestIndicesByCreationDate(concreteIndices: List<String>, clusterState: ClusterState, thresholdDate: Long): List<String> {
val filteredIndices = mutableListOf<String>()
val lookup = clusterState.metadata().indicesLookup
concreteIndices.forEach { indexName ->
val index = lookup[indexName]
val indexMetadata = clusterState.metadata.index(indexName)
if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) {
if (indexMetadata.creationDate >= thresholdDate) {
filteredIndices.add(indexName)
}
}
}
return filteredIndices
}

@JvmStatic
fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long {
return clusterState.metadata.index(index).creationDate
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashMap

/**
* Superclass for tests that interact with an external test cluster using OpenSearch's RestClient
Expand Down Expand Up @@ -909,7 +911,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody)
assertTrue(
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
Expand Down Expand Up @@ -945,6 +947,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return index
}

protected fun createTestIndex(index: String, mapping: String?, alias: String): String {
createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias)
return index
}

protected fun createTestConfigIndex(index: String = "." + randomAlphaOfLength(10).lowercase(Locale.ROOT)): String {
try {
createIndex(
Expand Down Expand Up @@ -981,7 +988,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "")
val indexName = createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(indexName, false)
indicesMap[indexName] = isWriteIndex
val indexMap = mapOf(
Expand All @@ -998,17 +1005,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return mutableMapOf(alias to indicesMap)
}

protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) {
val indexPattern = "$datastream*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
if (useComponentTemplate) {
// Setup index_template
createComponentTemplateWithMappings(
"my_ds_component_template-$datastream",
componentTemplateMappings
)
}
createComposableIndexTemplate(
"my_index_template_ds-$datastream",
listOf(indexPattern),
(if (useComponentTemplate) "my_ds_component_template-$datastream" else null),
mappings,
true,
0
)
createDataStream(datastream)
}

protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) {
client().makeRequest("PUT", "_data_stream/$datastream")
}

protected fun deleteDataStream(datastream: String) {
client().makeRequest("DELETE", "_data_stream/$datastream")
}

protected fun createIndexAlias(alias: String, mappings: String?) {
val indexPattern = "$alias*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
createComponentTemplateWithMappings(
"my_alias_component_template-$alias",
componentTemplateMappings
)
createComposableIndexTemplate(
"my_index_template_alias-$alias",
listOf(indexPattern),
"my_alias_component_template-$alias",
mappings,
false,
0
)
createTestIndex(
"$alias-000001",
null,
"""
"$alias": {
"is_write_index": true
}
""".trimIndent()
)
}

protected fun deleteIndexAlias(alias: String) {
client().makeRequest("DELETE", "$alias*/_alias/$alias")
}

protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) {
val body = """{"template" : { "mappings": {$mappings} }}"""
client().makeRequest(
"PUT",
"_component_template/$componentTemplateName",
emptyMap(),
StringEntity(body, ContentType.APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun createComposableIndexTemplate(
templateName: String,
indexPatterns: List<String>,
componentTemplateName: String?,
mappings: String?,
isDataStream: Boolean,
priority: Int
) {
var body = "{\n"
if (isDataStream) {
body += "\"data_stream\": { },"
}
body += "\"index_patterns\": [" +
indexPatterns.stream().collect(
Collectors.joining(",", "\"", "\"")
) + "],"
if (componentTemplateName == null) {
body += "\"template\": {\"mappings\": {$mappings}},"
}
if (componentTemplateName != null) {
body += "\"composed_of\": [\"$componentTemplateName\"],"
}
body += "\"priority\":$priority}"
client().makeRequest(
"PUT",
"_index_template/$templateName",
emptyMap(),
StringEntity(body, APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun getDatastreamWriteIndex(datastream: String): String {
val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null)
var respAsMap = responseAsMap(response)
if (respAsMap.containsKey("data_streams")) {
respAsMap = (respAsMap["data_streams"] as ArrayList<HashMap<String, *>>)[0]
val indices = respAsMap["indices"] as List<Map<String, Any>>
val index = indices.last()
return index["index_name"] as String
} else {
respAsMap = respAsMap[datastream] as Map<String, Object>
}
val indices = respAsMap["indices"] as Array<String>
return indices.last()
}

protected fun rolloverDatastream(datastream: String) {
client().makeRequest(
"POST",
datastream + "/_rollover",
emptyMap(),
null
)
}

protected fun randomAliasIndices(
alias: String,
num: Int = randomIntBetween(1, 10),
includeWriteIndex: Boolean = true,
): Map<String, Boolean> {
val indices = mutableMapOf<String, Boolean>()
val writeIndex = randomIntBetween(0, num)
val writeIndex = randomIntBetween(0, num - 1)
for (i: Int in 0 until num) {
var indexName = randomAlphaOfLength(10)
var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
while (indexName.equals(alias) || indices.containsKey(indexName))
indexName = randomAlphaOfLength(10)
indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
indices[indexName] = includeWriteIndex && i == writeIndex
}
return indices
Expand Down
Loading

0 comments on commit 96fc035

Please sign in to comment.