Skip to content

Commit

Permalink
fan_out logic
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 28, 2024
1 parent da6afbc commit 43d03c3
Show file tree
Hide file tree
Showing 26 changed files with 1,975 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.opensearch.action.ActionRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
Expand Down Expand Up @@ -51,6 +52,7 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
Expand Down Expand Up @@ -211,7 +213,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
)
}

Expand Down Expand Up @@ -322,6 +325,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
AlertingSettings.DOC_LEVEL_MONITOR_FAN_OUT_NODES,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.UUID

// TODO: raise a PR for the bucket-level monitor optimization as well; don't miss this.
object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)

Expand All @@ -62,7 +64,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
periodEnd: Instant,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
executionId: String
executionId: String,
transportService: TransportService?
): MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opensearch.commons.alerting.model.Table
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.core.common.Strings
import org.opensearch.transport.TransportService
import java.time.Instant

abstract class MonitorRunner {
Expand All @@ -43,7 +44,8 @@ abstract class MonitorRunner {
periodEnd: Instant,
dryRun: Boolean,
workflowRunContext: WorkflowRunContext? = null,
executionId: String
executionId: String,
transportService: TransportService?
): MonitorRunResult<*>

suspend fun runAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,9 @@ data class MonitorRunnerExecutionContext(
@Volatile var destinationContextFactory: DestinationContextFactory? = null,

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteMonitorRequest
import org.opensearch.alerting.action.ExecuteMonitorResponse
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.ExecuteWorkflowRequest
import org.opensearch.alerting.action.ExecuteWorkflowResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.core.JobRunner
Expand All @@ -21,9 +27,12 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FAN_OUT_NODES
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
Expand All @@ -40,6 +49,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.lifecycle.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
Expand All @@ -53,10 +63,12 @@ import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
Expand Down Expand Up @@ -173,6 +185,24 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.maxActionableAlertCount = it
}

monitorCtx.totalNodesFanOut = DOC_LEVEL_MONITOR_FAN_OUT_NODES.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FAN_OUT_NODES) {
monitorCtx.totalNodesFanOut = it
}

monitorCtx.percQueryMaxNumDocsInMemory =
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) {
monitorCtx.percQueryMaxNumDocsInMemory = it
}

monitorCtx.percQueryDocsSizeMemoryPercentageLimit =
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings
.addSettingsUpdateConsumer(AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) {
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
}

monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)

return this
Expand Down Expand Up @@ -261,29 +291,70 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
"PERF_DEBUG: executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)

monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!.execute(
ExecuteWorkflowAction.INSTANCE,
ExecuteWorkflowRequest(false, TimeValue(1, TimeUnit.DAYS), job.id, job),
it
)
}
}
}

is Monitor -> {
launch {
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
// runJob(job, periodStart, periodEnd, false)
val executeMonitorRequest = ExecuteMonitorRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job
)
monitorCtx.client!!.suspendUntil<Client, ExecuteMonitorResponse> {
monitorCtx.client!!.execute(
ExecuteMonitorAction.INSTANCE,
executeMonitorRequest,
it
)
}
}
}

else -> {
throw IllegalArgumentException("Invalid job type")
}
}
}

suspend fun runJob(workflow: Workflow, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): WorkflowRunResult {
return CompositeWorkflowRunner.runWorkflow(workflow, monitorCtx, periodStart, periodEnd, dryrun)
suspend fun runJob(
workflow: Workflow,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
transportService: TransportService,
): WorkflowRunResult {
return CompositeWorkflowRunner.runWorkflow(
workflow,
monitorCtx,
periodStart,
periodEnd,
dryrun,
transportService
)
}

suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): MonitorRunResult<*> {
suspend fun runJob(
job: ScheduledJob,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
transportService: TransportService,
): MonitorRunResult<*> {
// Update the scheduled job index at the start of monitor execution, in case there has been an upgrade and the schema mapping
// has not been updated yet.
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
Expand All @@ -303,7 +374,13 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon

if (job is Workflow) {
logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun")
CompositeWorkflowRunner.runWorkflow(workflow = job, monitorCtx, periodStart, periodEnd, dryrun)
monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!
.execute(
ExecuteWorkflowAction.INSTANCE,
ExecuteWorkflowRequest(false, TimeValue(1, TimeUnit.DAYS), job.id, job)
)
}
}
val monitor = job as Monitor
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
Expand All @@ -312,11 +389,35 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
BucketLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
DocumentLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else {
QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
QueryLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
}
return runResult
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.transport.TransportService
import java.time.Instant

object QueryLevelMonitorRunner : MonitorRunner() {
Expand All @@ -28,7 +29,8 @@ object QueryLevelMonitorRunner : MonitorRunner() {
periodEnd: Instant,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
executionId: String
executionId: String,
transportService: TransportService?
): MonitorRunResult<QueryLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionType

/**
 * Action type for the document-level monitor fan-out transport action.
 *
 * The constructor is private, so the only instance is [INSTANCE]. The action is identified by
 * [NAME], and the `DocLevelMonitorFanOutResponse` constructor reference is handed to `ActionType`
 * (presumably as the response reader — confirm against the `ActionType` API).
 */
class DocLevelMonitorFanOutAction private constructor() :
    ActionType<DocLevelMonitorFanOutResponse>(NAME, ::DocLevelMonitorFanOutResponse) {

    companion object {
        /** Action name passed to the `ActionType` super-constructor. */
        const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout"

        /** The single shared instance of this action type. */
        val INSTANCE: DocLevelMonitorFanOutAction = DocLevelMonitorFanOutAction()
    }
}
Loading

0 comments on commit 43d03c3

Please sign in to comment.