Skip to content

Commit

Permalink
fan out monitor request, response, action
Browse files Browse the repository at this point in the history
  • Loading branch information
eirsep committed Feb 9, 2024
1 parent d7580b3 commit 27322c6
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID

//TODO raise PR for bucket level monitor optimization also. dont miss
// TODO raise PR for bucket level monitor optimization also. dont miss
object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
)
// map<nodeid, List<shardId>
// build DocLevelMonitorFanOutRequest
// groupedlistener
// monitorCtx.client.send request parallel calls

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
Expand Down Expand Up @@ -359,6 +363,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
"Calling upsertMetadata function from ${monitorCtx.clusterService!!.localNode().id} in " +
"execution $executionId"
)
// construct metadata from all nodes' fanout
// response
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.alerting.action

import org.opensearch.action.ActionType

class DocLevelMonitorFanOutAction private constructor() : ActionType<DocLevelMonitorFanOutResponse>(NAME, ::DocLevelMonitorFanOutResponse) {
companion object {
val INSTANCE = DocLevelMonitorFanOutAction()
const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

val nodeId: String
val monitorId: String
val executionId: String
val indexExecutionContexts: List<IndexExecutionContext>
val shardIds: List<ShardId>

constructor(
nodeId: String,
monitorId: String,
executionId: String,
indexExecutionContexts: List<IndexExecutionContext>,
shardIds: List<ShardId>,
) : super() {
this.nodeId = nodeId
this.monitorId = monitorId
this.executionId = executionId
this.indexExecutionContexts = indexExecutionContexts
this.shardIds = shardIds
require(shardIds.isEmpty()) { }
require(indexExecutionContexts.isEmpty()) { }
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
nodeId = sin.readString(),
monitorId = sin.readString(),
executionId = sin.readString(),
indexExecutionContexts = sin.readList { IndexExecutionContext(sin) },
shardIds = sin.readList(::ShardId)
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(nodeId)
out.writeString(monitorId)
out.writeString(executionId)
out.writeCollection(indexExecutionContexts)
out.writeCollection(shardIds)
}

override fun validate(): ActionRequestValidationException? {
var actionValidationException: ActionRequestValidationException? = null
if (shardIds.isEmpty()) {
actionValidationException = ActionRequestValidationException()
actionValidationException.addValidationError("shard_ids is null or empty")
}
if (indexExecutionContexts.isEmpty())
if (actionValidationException == null)
actionValidationException = ActionRequestValidationException()
actionValidationException!!.addValidationError("index_execution_contexts is null or empty")
return actionValidationException
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("node_id", nodeId)
.field("monitor_id", nodeId)
.field("execution_id", nodeId)
.field("index_execution_contexts", indexExecutionContexts)
.field("shard_ids", shardIds)
return builder.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.opensearch.alerting.action

import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {

val nodeId: String
val executionId: String
val monitorId: String
val shardIdFailureMap: Map<ShardId, Exception>
val findingIds: List<String>
// for shards not delegated to nodes sequence number would be -3 (new number shard was not queried),
val lastRunContexts: Map<String, Any>

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(),
sin.readString(),
sin.readString(),
sin.readMap() as Map<ShardId, Exception>,
sin.readStringList(),
sin.readMap()!! as Map<String, Any>,
)

constructor(
nodeId: String,
executionId: String,
monitorId: String,
shardIdFailureMap: Map<ShardId, Exception>,
findingIds: List<String>,
lastRunContexts: Map<String, Any>,
) : super() {
this.nodeId = nodeId
this.executionId = executionId
this.monitorId = monitorId
this.shardIdFailureMap = shardIdFailureMap
this.findingIds = findingIds
this.lastRunContexts = lastRunContexts
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeMap(lastRunContexts)
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("last_run_contexts", lastRunContexts)
.endObject()
return builder
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,60 @@
package org.opensearch.alerting.model

import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

data class IndexExecutionContext(
val queries: List<DocLevelQuery>,
val lastRunContext: MutableMap<String, Any>,
val updatedLastRunContext: MutableMap<String, Any>,
val lastRunContext: MutableMap<String, Any>, // previous execution
val updatedLastRunContext: MutableMap<String, Any>, // without sequence numbers
val indexName: String,
val concreteIndexName: String,
val conflictingFields: List<String>,
val docIds: List<String>? = null
)
val docIds: List<String>? = emptyList(),
) : Writeable, ToXContent {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
queries = sin.readList { DocLevelQuery(sin) },
lastRunContext = sin.readMap(),
updatedLastRunContext = sin.readMap(),
indexName = sin.readString(),
concreteIndexName = sin.readString(),
conflictingFields = sin.readStringList(),
docIds = sin.readStringList()
)

override fun writeTo(out: StreamOutput?) {
out!!.writeCollection(queries)
out.writeMap(lastRunContext)
out.writeMap(updatedLastRunContext)
out.writeString(indexName)
out.writeString(concreteIndexName)
out.writeStringCollection(conflictingFields)
out.writeOptionalStringCollection(docIds)
}

override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
builder!!.startObject()
.field("queries", queries)
.field("last_run_context", lastRunContext)
.field("updated_last_run_context", updatedLastRunContext)
.field("index_name", indexName)
.field("concrete_index_name", concreteIndexName)
.field("conflicting_fields", conflictingFields)
.field("doc_ids", docIds)
.endObject()
return builder
}

companion object {
fun readFrom(sin: StreamInput): List<IndexExecutionContext> {
TODO("Not yet implemented")
}
}
}

0 comments on commit 27322c6

Please sign in to comment.