Skip to content

Commit

Permalink
fix workflow execution for first run
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Oct 3, 2023
1 parent 3647e9c commit 6a68bda
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,13 @@ class TransportIndexMonitorAction @Inject constructor(
}
try {
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
docLevelMonitorQueries.indexDocLevelMonitorQueries(
request.monitor,
indexResponse.id,
metadata,
request.refreshPolicy,
indexTimeout
)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(metadata, updating = true)
Expand Down Expand Up @@ -548,28 +554,6 @@ class TransportIndexMonitorAction @Inject constructor(
}
}

@Suppress("UNCHECKED_CAST")
private suspend fun indexDocLevelMonitorQueries(
monitor: Monitor,
monitorId: String,
monitorMetadata: MonitorMetadata,
refreshPolicy: RefreshPolicy
) {
val queryIndex = monitor.dataSources.queryIndex
if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) {
docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources)
log.info("Central Percolation index $queryIndex created")
}
docLevelMonitorQueries.indexDocLevelQueries(
monitor,
monitorId,
monitorMetadata,
refreshPolicy,
indexTimeout
)
log.debug("Queries inserted into Percolate index $queryIndex")
}

private suspend fun updateMonitor() {
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.monitorId)
try {
Expand Down Expand Up @@ -669,7 +653,13 @@ class TransportIndexMonitorAction @Inject constructor(
.filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id))
.execute(it)
}
indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy)
docLevelMonitorQueries.indexDocLevelMonitorQueries(
request.monitor,
currentMonitor.id,
updatedMetadata,
request.refreshPolicy,
indexTimeout
)
MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
}
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunnerService.monitorCtx
import org.opensearch.alerting.WorkflowMetadataService
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.addFilter
Expand All @@ -39,10 +42,12 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_TH
import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.util.isQueryLevelMonitor
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.CompositeWorkflowRunner
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
Expand Down Expand Up @@ -71,6 +76,9 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.BulkByScrollResponse
import org.opensearch.index.reindex.DeleteByQueryAction
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
Expand All @@ -90,6 +98,7 @@ class TransportIndexWorkflowAction @Inject constructor(
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val namedWriteableRegistry: NamedWriteableRegistry,
val docLevelMonitorQueries: DocLevelMonitorQueries
) : HandledTransportAction<ActionRequest, IndexWorkflowResponse>(
AlertingActions.INDEX_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::IndexWorkflowRequest
),
Expand Down Expand Up @@ -372,6 +381,42 @@ class TransportIndexWorkflowAction @Inject constructor(
)
return
}

val createdWorkflow = request.workflow.copy(id = indexResponse.id)
val executionId = CompositeWorkflowRunner.generateExecutionId(false, createdWorkflow)

val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
workflow = createdWorkflow,
skipIndex = false,
executionId = executionId
)

val delegates = (createdWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
)

if (created == false) {
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
docLevelMonitorQueries.indexDocLevelMonitorQueries(
monitor,
monitor.id,
monitorMetadata,
request.refreshPolicy,
indexTimeout
)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(monitorMetadata, updating = true)
}
actionListener.onResponse(
IndexWorkflowResponse(
indexResponse.id, indexResponse.version, indexResponse.seqNo,
Expand Down Expand Up @@ -499,6 +544,44 @@ class TransportIndexWorkflowAction @Inject constructor(
)
return
}

val updatedWorkflow = request.workflow.copy(id = indexResponse.id)
val executionId = CompositeWorkflowRunner.generateExecutionId(false, updatedWorkflow)

val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
workflow = updatedWorkflow,
skipIndex = false,
executionId = executionId
)

val delegates = (updatedWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
)

if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor)
client.suspendUntil<Client, BulkByScrollResponse> {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(monitor.dataSources.queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitor.id))
.execute(it)
}
docLevelMonitorQueries.indexDocLevelMonitorQueries(
monitor,
monitor.id,
updatedMetadata,
request.refreshPolicy,
indexTimeout
)
MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
}
}
actionListener.onResponse(
IndexWorkflowResponse(
indexResponse.id, indexResponse.version, indexResponse.seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,4 +590,27 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
private fun getWriteIndexNameForAlias(alias: String): String? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
}

@Suppress("UNCHECKED_CAST")
public suspend fun indexDocLevelMonitorQueries(
monitor: Monitor,
monitorId: String,
monitorMetadata: MonitorMetadata,
refreshPolicy: RefreshPolicy,
indexTimeout: TimeValue
) {
val queryIndex = monitor.dataSources.queryIndex
if (!docLevelQueryIndexExists(monitor.dataSources)) {
initDocLevelQueryIndex(monitor.dataSources)
log.info("Central Percolation index $queryIndex created")
}
indexDocLevelQueries(
monitor,
monitorId,
monitorMetadata,
refreshPolicy,
indexTimeout
)
log.debug("Queries inserted into Percolate index $queryIndex")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}
}

private fun generateExecutionId(
fun generateExecutionId(
isTempWorkflow: Boolean,
workflow: Workflow,
): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3151,7 +3151,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
} catch (ex: java.lang.Exception) {
exception = ex
}
assertTrue(exception is java.util.NoSuchElementException)
assertFalse(exception is java.util.NoSuchElementException)
}

fun `test execute workflow with custom alerts and finding index with bucket and doc monitor bucket monitor used as chained finding`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.HttpHeaders
import org.apache.hc.core5.http.io.entity.StringEntity
import org.apache.hc.core5.http.message.BasicHeader
import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Ignore
import org.opensearch.alerting.ALERTING_BASE_URI
import org.opensearch.alerting.ALERTING_DELETE_WORKFLOW_ACCESS
import org.opensearch.alerting.ALERTING_EXECUTE_WORKFLOW_ACCESS
Expand Down Expand Up @@ -63,7 +63,7 @@ import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant

// TODO investigate flaky nature of tests. not reproducible in local but fails in jenkins CI
@Ignore
@AwaitsFix(bugUrl = "")
@TestLogging("level:DEBUG", reason = "Debug for tests.")
@Suppress("UNCHECKED_CAST")
class SecureWorkflowRestApiIT : AlertingRestTestCase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,4 +1140,49 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
val acknowledged = acknowledgeChainedAlertsResponse["success"] as List<String>
Assert.assertEquals(acknowledged[0], alerts1[0]["id"])
}

fun `test run workflow as scheduled job success`() {
val index = createTestIndex()
val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery1)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)

val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = false
)
val monitorResponse = createMonitor(monitor)

val workflow = randomWorkflow(
monitorIds = listOf(monitorResponse.id),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)

val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity())

assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus())

val responseBody = createResponse.asMap()
val createdId = responseBody["_id"] as String
val createdVersion = responseBody["_version"] as Int

assertNotEquals("response is missing Id", Workflow.NO_ID, createdId)
assertTrue("incorrect version", createdVersion > 0)
assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location"))

val testDoc = """{
"message" : "This is an error from IAD region",
"test_field" : "us-west-2"
}"""

indexDoc(index, "1", testDoc)
Thread.sleep(80000)

val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
}
}

0 comments on commit 6a68bda

Please sign in to comment.