Skip to content

Commit

Permalink
[Backport 2.5] Manual backports of several PRs (#957)
Browse files Browse the repository at this point in the history
* [Backport 2.x] QueryIndex rollover when field mapping limit is reached (#729)

Signed-off-by: Petar Dzepina <[email protected]>
Signed-off-by: AWSHurneyt <[email protected]>

* log error messages and clean up monitor when indexing doc level queries or metadata creation fails (#900) (#912)

* log errors and clean up monitor when indexing doc level queries or metadata creation fails
* refactor delete monitor action to re-use delete methods
Signed-off-by: Surya Sashank Nistala <[email protected]>
Signed-off-by: AWSHurneyt <[email protected]>

* [Backport 2.x] Notification security fix (#861)

* Notification security fix (#852)

* added injecting whole user object in threadContext before calling notification APIs so that backend roles are available to notification plugin

Signed-off-by: Petar Dzepina <[email protected]>

* compile fix

Signed-off-by: Petar Dzepina <[email protected]>

* refactored user_info injection to use InjectSecurity

Signed-off-by: Petar Dzepina <[email protected]>

* ktlint fix

Signed-off-by: Petar Dzepina <[email protected]>

---------

Signed-off-by: Petar Dzepina <[email protected]>
(cherry picked from commit e0b7a5a)

* remove unneeded import

Signed-off-by: Ashish Agrawal <[email protected]>

---------

Signed-off-by: Ashish Agrawal <[email protected]>
Co-authored-by: Petar Dzepina <[email protected]>
Co-authored-by: Ashish Agrawal <[email protected]>
Signed-off-by: AWSHurneyt <[email protected]>

* Added missing imports.

Signed-off-by: AWSHurneyt <[email protected]>

* Multiple indices support in DocLevelMonitorInput (#784) (#808)

Signed-off-by: Petar Dzepina <[email protected]>

* Removed redundant calls to initDocLevelQueryIndex and indexDocLevelQueries.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed a bug that prevented alerts from being generated for doc level monitors that use wildcard characters in index names. (#894) (#902)

Signed-off-by: AWSHurneyt <[email protected]>
(cherry picked from commit 8c033b9)

Co-authored-by: AWSHurneyt <[email protected]>
Signed-off-by: AWSHurneyt <[email protected]>

* Resolved backport issue for PR 729.

Signed-off-by: AWSHurneyt <[email protected]>

* Resolved backport issue for PR 758.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: Petar Dzepina <[email protected]>
Signed-off-by: AWSHurneyt <[email protected]>
Signed-off-by: Ashish Agrawal <[email protected]>
Co-authored-by: Petar Dzepina <[email protected]>
Co-authored-by: Surya Sashank Nistala <[email protected]>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Co-authored-by: Ashish Agrawal <[email protected]>
  • Loading branch information
5 people authored Jun 8, 2023
1 parent 5cc4229 commit 8cb40c8
Show file tree
Hide file tree
Showing 14 changed files with 583 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerindexNameExpressionResolver(indexNameExpressionResolver)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
Expand All @@ -24,9 +22,11 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.bytes.BytesReference
Expand Down Expand Up @@ -54,6 +54,7 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.collections.HashMap
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand Down Expand Up @@ -88,13 +89,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
monitor,
createWithRunContext = false,
skipIndex = isTempMonitor
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]

val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
Expand All @@ -107,6 +108,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val docsToQueries = mutableMapOf<String, MutableList<String>>()

try {
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
Expand All @@ -115,12 +123,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexTimeout = monitorCtx.indexTimeout!!
)

val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it)
}
val indices = getIndexResponse.indices()

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!indices.contains(ind)) {
Expand All @@ -131,8 +133,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
}

// Prepare updatedLastRunContext for each index
Expand Down Expand Up @@ -385,9 +392,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
throw IOException("Invalid input with document-level-monitor.")
}

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
if (docLevelMonitorInput.indices.size > 1) {
throw IOException("Only one index is supported with document-level-monitor.")
if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) {
throw IllegalArgumentException("DocLevelMonitorInput has no indices")
}
}

Expand All @@ -414,13 +420,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// new index is monitored from the beginning of that index
private fun createdRecently(
monitor: Monitor,
index: String,
periodStart: Instant,
periodEnd: Instant,
getIndexResponse: GetIndexResponse
indexMetadata: IndexMetadata
): Boolean {
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli()
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
Expand Down
28 changes: 18 additions & 10 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,24 @@ abstract class MonitorRunner {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
withClosableContext(
InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles)
) {
actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification(
action,
monitorCtx,
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!
)
val client = monitorCtx.client
client!!.threadPool().threadContext.stashContext().use {
withClosableContext(
InjectorContextElement(
monitor.id,
monitorCtx.settings!!,
monitorCtx.threadPool!!.threadContext,
monitor.user?.roles,
monitor.user
)
) {
actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification(
action,
monitorCtx,
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!
)
}
}
}
ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
Expand All @@ -25,6 +26,7 @@ data class MonitorRunnerExecutionContext(
var clusterService: ClusterService? = null,
var client: Client? = null,
var xContentRegistry: NamedXContentRegistry? = null,
var indexNameExpressionResolver: IndexNameExpressionResolver? = null,
var scriptService: ScriptService? = null,
var settings: Settings? = null,
var threadPool: ThreadPool? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -75,6 +76,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerindexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): MonitorRunnerService {
this.monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver
return this
}

fun registerScriptService(scriptService: ScriptService): MonitorRunnerService {
this.monitorCtx.scriptService = scriptService
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.alerts

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
Expand Down Expand Up @@ -36,6 +37,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD
import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
Expand Down Expand Up @@ -357,8 +359,12 @@ class AlertIndices(
return try {
val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
createIndexResponse.isAcknowledged
} catch (e: ResourceAlreadyExistsException) {
true
} catch (t: Exception) {
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
true
} else {
throw AlertingException.wrap(t)
}
}
}

Expand Down
Loading

0 comments on commit 8cb40c8

Please sign in to comment.