Skip to content

Commit

Permalink
add monitor metadata and monitor objects to fan out request to avoid …
Browse files Browse the repository at this point in the history
…disk seeks

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 9, 2024
1 parent 093ab6f commit 89ebb3f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package org.opensearch.alerting.action
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
Expand All @@ -14,20 +16,23 @@ import java.io.IOException
class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

val nodeId: String
val monitorId: String
val monitor: Monitor
val monitorMetadata: MonitorMetadata
val executionId: String
val indexExecutionContexts: List<IndexExecutionContext>
val shardIds: List<ShardId>

constructor(
nodeId: String,
monitorId: String,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
executionId: String,
indexExecutionContexts: List<IndexExecutionContext>,
shardIds: List<ShardId>,
) : super() {
this.nodeId = nodeId
this.monitorId = monitorId
this.monitor = monitor
this.monitorMetadata = monitorMetadata
this.executionId = executionId
this.indexExecutionContexts = indexExecutionContexts
this.shardIds = shardIds
Expand All @@ -38,7 +43,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
nodeId = sin.readString(),
monitorId = sin.readString(),
monitor = Monitor.readFrom(sin)!!,
monitorMetadata = MonitorMetadata.readFrom(sin),
executionId = sin.readString(),
indexExecutionContexts = sin.readList { IndexExecutionContext(sin) },
shardIds = sin.readList(::ShardId)
Expand All @@ -47,13 +53,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(nodeId)
out.writeString(monitorId)
monitor.writeTo(out)
monitorMetadata.writeTo(out)
out.writeString(executionId)
out.writeCollection(indexExecutionContexts)
out.writeCollection(shardIds)
}

override fun validate(): ActionRequestValidationException? {
override fun validate(): ActionRequestValidationException {
var actionValidationException: ActionRequestValidationException? = null
if (shardIds.isEmpty()) {
actionValidationException = ActionRequestValidationException()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.core.action.ActionListener
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

private val log = LogManager.getLogger(TransportDocLevelMonitorFanOutAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

class TransportDocLevelMonitorFanOutAction @Inject constructor(
transportService: TransportService,
val client: Client,
val actionFilters: ActionFilters,
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val monitorCtx: MonitorRunnerExecutionContext,

) : HandledTransportAction<DocLevelMonitorFanOutRequest, DocLevelMonitorFanOutResponse>(
AlertingActions.INDEX_MONITOR_ACTION_NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest
),
SecureTransportAction {

@Volatile
override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
override fun doExecute(
task: Task,
request: DocLevelMonitorFanOutRequest,
listener: ActionListener<DocLevelMonitorFanOutResponse>,
) {
executeMonitor(request, monitorCtx)
}

private fun executeMonitor(request: DocLevelMonitorFanOutRequest, monitorCtx: MonitorRunnerExecutionContext) {
}
}

0 comments on commit 89ebb3f

Please sign in to comment.