Skip to content

Commit

Permalink
Added workflow metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz committed Mar 9, 2023
1 parent 8ded8b8 commit a593d38
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows"
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
@JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts"
@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

var monitorMetadata = getMonitorMetadata(monitorCtx.client!!, monitorCtx.xContentRegistry!!, "${monitor.id}-metadata")
if (monitorMetadata == null) {
monitorMetadata = createMonitorMetadata(monitor.id)
monitorMetadata = createMonitorMetadata(monitor.id, workflowRunContext?.workflowId)
}

val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ abstract class MonitorRunner {
return NotificationActionConfigs(destination, channel)
}

protected fun createMonitorMetadata(monitorId: String): MonitorMetadata {
return MonitorMetadata("$monitorId-metadata", monitorId, emptyList(), emptyMap())
protected fun createMonitorMetadata(monitorId: String, workflowId: String? = null): MonitorMetadata {
return if (workflowId.isNullOrEmpty()) {
MonitorMetadata("$monitorId-metadata", monitorId, emptyList(), emptyMap())
} else {
MonitorMetadata("$monitorId-$workflowId-metadata", monitorId, emptyList(), emptyMap())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,28 @@ class AlertingConfigAccessor {
}
}

suspend fun getWorkflowMetadata(client: Client, xContentRegistry: NamedXContentRegistry, metadataId: String): WorkflowMetadata? {
return try {
val jobSource = getAlertingConfigDocumentSource(client, "Workflow Metadata", metadataId)
withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
jobSource, XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
WorkflowMetadata.parse(xcp)
}
} catch (e: IllegalStateException) {
if (e.message?.equals("Workflow Metadata document with id $metadataId not found or source is empty") == true) {
return null
} else throw e
} catch (e: IndexNotFoundException) {
if (e.message?.equals("no such index [.opendistro-alerting-config]") == true) {
return null
} else throw e
}
}

suspend fun getEmailAccountInfo(client: Client, xContentRegistry: NamedXContentRegistry, emailAccountId: String): EmailAccount {
val source = getAlertingConfigDocumentSource(client, "Email account", emailAccountId)
return withContext(Dispatchers.IO) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.model

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.util.instant
import java.io.IOException
import java.time.Instant

data class WorkflowMetadata(
val id: String,
val workflowId: String,
val monitorIds: List<String>,
val latestRunTime: Instant,
val latestExecutionId: String
) : Writeable, ToXContent {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(),
workflowId = sin.readString(),
monitorIds = sin.readStringList(),
latestRunTime = sin.readInstant(),
latestExecutionId = sin.readString()
)

override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(workflowId)
out.writeStringCollection(monitorIds)
out.writeInstant(latestRunTime)
out.writeString(latestExecutionId)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA)
builder.field(WORKFLOW_ID_FIELD, workflowId)
.field(MONITOR_IDS_FIELD, monitorIds)
.field(LATEST_RUN_TIME, latestRunTime)
.field(LATEST_EXECUTION_ID, latestExecutionId)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}

companion object {
const val METADATA = "workflow_metadata"
const val WORKFLOW_ID_FIELD = "workflow_id"
const val MONITOR_IDS_FIELD = "monitor_ids"
const val LATEST_RUN_TIME = "latest_run_time"
const val LATEST_EXECUTION_ID = "latest_execution_id"

@JvmStatic @JvmOverloads
@Throws(IOException::class)
fun parse(xcp: XContentParser): WorkflowMetadata {
lateinit var workflowId: String
var monitorIds = mutableListOf<String>()
lateinit var latestRunTime: Instant
lateinit var latestExecutionId: String

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
WORKFLOW_ID_FIELD -> workflowId = xcp.text()
MONITOR_IDS_FIELD -> {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
monitorIds.add(xcp.text())
}
}
LATEST_RUN_TIME -> latestRunTime = xcp.instant()!!
LATEST_EXECUTION_ID -> latestExecutionId = xcp.text()
}
}
return WorkflowMetadata(
"$workflowId-metadata",
workflowId = workflowId,
monitorIds = monitorIds,
latestRunTime = latestRunTime,
latestExecutionId = latestExecutionId
)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): WorkflowMetadata {
return WorkflowMetadata(sin)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.time.Instant
data class WorkflowRunResult(
val workflowRunResult: List<MonitorRunResult<*>> = mutableListOf(),
val executionStartTime: Instant,
val executionEndTime: Instant,
val executionEndTime: Instant? = null,
val executionId: String,
val error: Exception? = null
) : Writeable, ToXContent {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.WorkflowMetadata
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
Expand Down Expand Up @@ -134,3 +135,13 @@ suspend fun updateMonitorMetadata(client: Client, settings: Settings, monitorMet

return client.suspendUntil { client.index(indexRequest, it) }
}

suspend fun updateWorkflowMetadata(client: Client, settings: Settings, workflowMetadata: WorkflowMetadata): IndexResponse {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(workflowMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(workflowMetadata.id)
.timeout(AlertingSettings.INDEX_TIMEOUT.get(settings))

return client.suspendUntil { client.index(indexRequest, it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import org.opensearch.alerting.BucketLevelMonitorRunner
import org.opensearch.alerting.DocumentLevelMonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.QueryLevelMonitorRunner
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowMetadata
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.alerting.util.isQueryLevelMonitor
import org.opensearch.alerting.util.updateWorkflowMetadata
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.Monitor
Expand All @@ -34,15 +37,28 @@ object CompositeWorkflowRunner : WorkflowRunner() {
periodEnd: Instant,
dryRun: Boolean
): WorkflowRunResult {
val workflowExecutionId = workflow.id.plus(LocalDateTime.now()).plus(UUID.randomUUID().toString())
var workflowResult = WorkflowRunResult(mutableListOf(), periodStart, periodEnd, workflowExecutionId)
logger.debug("Workflow ${workflow.id} in $workflowExecutionId execution is running")
val workflowExecutionStartTime = Instant.now()

val executionId = workflow.id.plus(LocalDateTime.now()).plus(UUID.randomUUID().toString())
var workflowResult = WorkflowRunResult(mutableListOf(), workflowExecutionStartTime, null, executionId)
val isTempMonitor = dryRun || workflow.id == Workflow.NO_ID

logger.debug("Workflow ${workflow.id} in $executionId execution is running")
try {
val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
var monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)
// Validate the monitors size
validateMonitorSize(delegates, monitors, workflow)

var workflowMetadata = AlertingConfigAccessor.getWorkflowMetadata(
monitorCtx.client!!,
monitorCtx.xContentRegistry!!,
"${workflow.id}-metadata"
)
if (workflowMetadata == null) {
workflowMetadata = createWorkflowMetadata(workflow.id, delegates.map { it.monitorId }, executionId)
}

val monitorsById = monitors.associateBy { it.id }
val resultList = mutableListOf<MonitorRunResult<*>>()

Expand All @@ -58,10 +74,10 @@ object CompositeWorkflowRunner : WorkflowRunner() {
?: throw AlertingException.wrap(
IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, workflowExecutionId)
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, executionId)
}

val workflowRunContext = WorkflowRunContext(delegate.chainedFindings?.monitorId, workflowExecutionId, indexToDocIds)
val workflowRunContext = WorkflowRunContext(workflow.id, delegate.chainedFindings?.monitorId, executionId, indexToDocIds)

val runResult = if (delegateMonitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(
Expand Down Expand Up @@ -97,8 +113,18 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}
resultList.add(runResult)
}
logger.debug("Workflow ${workflow.id} in $workflowExecutionId finished")
return workflowResult.copy(workflowRunResult = resultList)

logger.debug("Workflow ${workflow.id} in $executionId finished")
// Update metadata only if the workflow is not temp
if (!isTempMonitor) {
updateWorkflowMetadata(
monitorCtx.client!!,
monitorCtx.settings!!,
workflowMetadata.copy(latestRunTime = workflowExecutionStartTime, latestExecutionId = executionId)
)
}

return workflowResult.copy(workflowRunResult = resultList, executionEndTime = Instant.now())
} catch (e: Exception) {
logger.error("Failed to execute workflow. Error: ${e.message}")
return workflowResult.copy(error = AlertingException.wrap(e))
Expand All @@ -115,4 +141,8 @@ object CompositeWorkflowRunner : WorkflowRunner() {
throw IllegalStateException("Delegate monitors don't exist $diffMonitorIds for the workflow $workflow.id")
}
}

private fun createWorkflowMetadata(workflowId: String, monitors: List<String>, executionId: String): WorkflowMetadata {
return WorkflowMetadata("$workflowId-metadata", workflowId, monitors, Instant.now(), executionId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.workflow

data class WorkflowRunContext(
val workflowId: String,
val chainedMonitorId: String?,
val workflowExecutionId: String,
val matchingDocIdsPerIndex: Map<String, List<String>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
}
}

fun `test execute workflow inout error`() {
fun `test execute workflow input error`() {
val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3"))
)
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/resources/mappings/scheduled-jobs.json
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,45 @@
"enabled": false
}
}
},
"workflow_metadata" : {
"properties": {
"workflow_id": {
"type": "keyword"
},
"monitor_ids": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"latest_run_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
},
"latest_execution_id": {
"type": "keyword"
},
"last_action_execution_times": {
"type": "nested",
"properties": {
"action_id": {
"type": "keyword"
},
"execution_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
}
}
},
"last_run_context": {
"type": "object",
"enabled": false
}
}
}
}
}

0 comments on commit a593d38

Please sign in to comment.