Skip to content

Commit

Permalink
Added mappings for the workflow-metadata. Added integration tests for…
Browse files Browse the repository at this point in the history
… checking workflow metadata. Changed flow of workflow execution

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz committed Mar 9, 2023
1 parent a593d38 commit 8e0d28d
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class InputService(
val aggTriggerAfterKey: MutableMap<String, TriggerAfterKey> = mutableMapOf()

// If monitor execution is triggered from a workflow
val indexToDocIds = workflowRunContext?.matchingDocIdsPerIndex
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

// TODO: If/when multiple input queries are supported for Bucket-Level Monitor execution, aggTriggerAfterKeys will
// need to be updated to account for it
Expand All @@ -79,8 +79,8 @@ class InputService(
val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers)

// Rewrite query to consider the doc ids per given index
if (chainedFindingExist(indexToDocIds)) {
val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), indexToDocIds!!)
if (chainedFindingExist(matchingDocIdsPerIndex)) {
val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), matchingDocIdsPerIndex!!)
rewrittenQuery.query(updatedSourceQuery)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchException
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand All @@ -17,19 +15,14 @@ import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.query.TermsQueryBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.util.stream.Collectors

private val log = LogManager.getLogger(WorkflowService::class.java)

Expand Down Expand Up @@ -134,70 +127,4 @@ class WorkflowService(
}
return monitors
}

suspend fun getDocIdsPerFindingIndex(monitorId: String, workflowExecutionId: String): Map<String, List<String>> {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)

val getResponse: GetResponse = client.suspendUntil {
client.get(getRequest, it)
}

val monitor = if (!getResponse.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
}
} else throw IllegalStateException("Delegate monitors don't exist $monitorId")
// Search findings index per monitor and workflow execution id
val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Finding.MONITOR_ID_FIELD, monitor.id))
.filter(QueryBuilders.termQuery(Finding.EXECUTION_ID_FIELD, workflowExecutionId))
val searchRequest = SearchRequest()
.source(
SearchSourceBuilder()
.query(bqb)
.version(true)
.seqNoAndPrimaryTerm(true)
)
.indices(monitor.dataSources.findingsIndex)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }

// Get the findings docs
val findings = mutableListOf<Finding>()
for (hit in searchResponse.hits) {
val xcp = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val finding = Finding.parse(xcp)
findings.add(finding)
}

val indexToRelatedDocIdsMap = mutableMapOf<String, MutableList<String>>()

for (finding in findings) {
indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds)
}

val toTypedArray = indexToRelatedDocIdsMap.keys.stream().collect(Collectors.toList()).toTypedArray()
val searchFindings = SearchRequest().indices(*toTypedArray)
val queryBuilder = QueryBuilders.boolQuery()
indexToRelatedDocIdsMap.forEach { entry ->
queryBuilder
.should()
.add(
BoolQueryBuilder()
.must(MatchQueryBuilder("_index", entry.key))
.must(TermsQueryBuilder("_id", entry.value))
)
}
searchFindings.source(SearchSourceBuilder().query(queryBuilder))
val finalQueryResponse: SearchResponse = client.suspendUntil { client.search(searchFindings, it) }

val indexDocIds = mutableMapOf<String, MutableList<String>>()
for (hit in finalQueryResponse.hits) {
indexDocIds.getOrPut(hit.index) { mutableListOf() }.add(hit.id)
}
return indexDocIds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.util.instant
import org.opensearch.commons.alerting.util.optionalTimeField
import java.io.IOException
import java.time.Instant

Expand Down Expand Up @@ -46,7 +47,7 @@ data class WorkflowMetadata(
if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA)
builder.field(WORKFLOW_ID_FIELD, workflowId)
.field(MONITOR_IDS_FIELD, monitorIds)
.field(LATEST_RUN_TIME, latestRunTime)
.optionalTimeField(LATEST_RUN_TIME, latestRunTime)
.field(LATEST_EXECUTION_ID, latestExecutionId)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.AlertingActions
Expand All @@ -44,12 +43,8 @@ import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.Schedule
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.WorkflowInput
import org.opensearch.commons.alerting.util.IndexUtils
import org.opensearch.commons.alerting.util.instant
import org.opensearch.commons.authuser.User
import org.opensearch.commons.utils.recreateObject
import org.opensearch.index.IndexNotFoundException
Expand All @@ -58,11 +53,13 @@ import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.Locale

private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java)

/**
* Transport class that deletes the workflow.
* If the deleteDelegateMonitor flag is set to true, deletes the workflow delegates that are not part of another workflow
*/
class TransportDeleteWorkflowAction @Inject constructor(
transportService: TransportService,
val client: Client,
Expand Down Expand Up @@ -94,15 +91,22 @@ class TransportDeleteWorkflowAction @Inject constructor(
}

GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteWorkflowAction")) {
DeleteWorkflowHandler(client, actionListener, deleteRequest, transformedRequest.deleteUnderlyingMonitors, user, transformedRequest.workflowId).resolveUserAndStart()
DeleteWorkflowHandler(
client,
actionListener,
deleteRequest,
transformedRequest.deleteDelegateMonitors,
user,
transformedRequest.workflowId
).resolveUserAndStart()
}
}

inner class DeleteWorkflowHandler(
private val client: Client,
private val actionListener: ActionListener<DeleteWorkflowResponse>,
private val deleteRequest: DeleteRequest,
private val deleteUnderlyingMonitors: Boolean?,
private val deleteDelegateMonitors: Boolean?,
private val user: User?,
private val workflowId: String
) {
Expand All @@ -122,11 +126,10 @@ class TransportDeleteWorkflowAction @Inject constructor(

if (canDelete) {
val deleteResponse = deleteWorkflow(workflow)
// TODO - uncomment once the workflow metadata is added
// deleteMetadata(workflow)
if (deleteUnderlyingMonitors == true) {
val underlyingMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds()
val monitorIdsToBeDeleted = monitorsAreNotInDifferentWorkflows(workflowId, underlyingMonitorIds)
deleteMetadata(workflow)
if (deleteDelegateMonitors == true) {
val delegateMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds()
val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds)

// Delete the monitor ids
if (!monitorIdsToBeDeleted.isNullOrEmpty()) {
Expand Down Expand Up @@ -169,7 +172,13 @@ class TransportDeleteWorkflowAction @Inject constructor(
}
}

private suspend fun monitorsAreNotInDifferentWorkflows(workflowIdToBeDeleted: String, monitorIds: List<String>): List<String> {
/**
* Returns lit of monitor ids belonging only to a given workflow
* @param workflowIdToBeDeleted Id of the workflow that should be deleted
* @param monitorIds List of delegate monitor ids (underlying monitor ids)
*/
private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List<String>): List<String> {
// Retrieve monitors belonging to another workflows
val queryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_id", workflowIdToBeDeleted)).filter(
QueryBuilders.nestedQuery(
Workflow.WORKFLOW_DELEGATE_PATH,
Expand Down Expand Up @@ -204,85 +213,10 @@ class TransportDeleteWorkflowAction @Inject constructor(
workflow.copy(id = hit.id, version = hit.version)
}
val workflowMonitors = workflows.filter { it.id != workflowIdToBeDeleted }.flatMap { (it.inputs[0] as CompositeInput).getMonitorIds() }.distinct()

// Monitors that can be deleted -> all monitors - monitors belonging to another workflows
return monitorIds.minus(workflowMonitors.toSet())
}

fun parse(xcp: XContentParser, id: String = Workflow.NO_ID, version: Long = Workflow.NO_VERSION): Workflow {
var name: String? = null
var workflowType: String = Workflow.WorkflowType.COMPOSITE.toString()
var user: User? = null
var schedule: Schedule? = null
var lastUpdateTime: Instant? = null
var enabledTime: Instant? = null
var enabled = true
var schemaVersion = IndexUtils.NO_SCHEMA_VERSION
val inputs: MutableList<WorkflowInput> = mutableListOf()
var owner = "alerting"

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Workflow.SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue()
Workflow.NAME_FIELD -> name = xcp.text()
Workflow.WORKFLOW_TYPE_FIELD -> {
workflowType = xcp.text()
val allowedTypes = Workflow.WorkflowType.values().map { it.value }
if (!allowedTypes.contains(workflowType)) {
throw IllegalStateException("Workflow type should be one of $allowedTypes")
}
}
Workflow.USER_FIELD -> {
user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp)
}
Workflow.ENABLED_FIELD -> enabled = xcp.booleanValue()
Workflow.SCHEDULE_FIELD -> schedule = Schedule.parse(xcp)
Workflow.INPUTS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
val input = WorkflowInput.parse(xcp)
inputs.add(input)
}
}
Workflow.ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
Workflow.LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant()
Workflow.OWNER_FIELD -> {
owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text()
}
else -> {
xcp.skipChildren()
}
}
}

if (enabled && enabledTime == null) {
enabledTime = Instant.now()
} else if (!enabled) {
enabledTime = null
}
return Workflow(
id,
version,
requireNotNull(name) { "Workflow name is null" },
enabled,
requireNotNull(schedule) { "Workflow schedule is null" },
lastUpdateTime ?: Instant.now(),
enabledTime,
Workflow.WorkflowType.valueOf(workflowType.uppercase(Locale.ROOT)),
user,
schemaVersion,
inputs.toList(),
owner
)
}

private suspend fun getWorkflow(): Workflow {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, workflowId)

Expand Down
Loading

0 comments on commit 8e0d28d

Please sign in to comment.