Skip to content

Commit

Permalink
clean up doc level queries on dry run (#1430)
Browse files Browse the repository at this point in the history
Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon authored and engechas committed Mar 14, 2024
1 parent 9824ef0 commit 763d97a
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
} else {
// Clean up any queries created by the dry run monitor
monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
Expand All @@ -39,7 +42,14 @@ import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.BulkByScrollResponse
import org.opensearch.index.reindex.DeleteByQueryAction
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)

Expand Down Expand Up @@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return true
}

suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) {
try {
monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) ->
val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
}
if (indicesExistsResponse.isExists == false) {
return
}

val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.existsQuery("monitor_id"))
.mustNot(QueryBuilders.wildcardQuery("monitor_id", "*"))

val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(queryBuilder)
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
)
}
response.bulkFailures.forEach {
log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ")
}
}
} catch (e: Exception) {
log.error("Failed to delete doc level queries on dry run", e)
}
}

fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
val clusterState = clusterService.state()
return clusterState.metadata.hasAlias(dataSources.queryIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
}

fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {
Expand Down

0 comments on commit 763d97a

Please sign in to comment.