Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes to support generic inputs and triggers in remote monitors #664

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package org.opensearch.commons.alerting.model
import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD
import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD
import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput.Companion.REMOTE_MONITOR_INPUT_FIELD
import org.opensearch.commons.notifications.model.BaseModel
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
Expand All @@ -14,7 +16,8 @@ interface Input : BaseModel {
enum class Type(val value: String) {
DOCUMENT_LEVEL_INPUT(DOC_LEVEL_INPUT_FIELD),
CLUSTER_METRICS_INPUT(URI_FIELD),
SEARCH_INPUT(SEARCH_FIELD);
SEARCH_INPUT(SEARCH_FIELD),
REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD);

override fun toString(): String {
return value
Expand All @@ -32,8 +35,10 @@ interface Input : BaseModel {
SearchInput.parseInner(xcp)
} else if (xcp.currentName() == Type.CLUSTER_METRICS_INPUT.value) {
ClusterMetricsInput.parseInner(xcp)
} else {
} else if (xcp.currentName() == Type.DOCUMENT_LEVEL_INPUT.value) {
DocLevelMonitorInput.parse(xcp)
} else {
RemoteMonitorInput.parse(xcp)
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)
return input
Expand All @@ -46,6 +51,7 @@ interface Input : BaseModel {
Type.DOCUMENT_LEVEL_INPUT -> DocLevelMonitorInput(sin)
Type.CLUSTER_METRICS_INPUT -> ClusterMetricsInput(sin)
Type.SEARCH_INPUT -> SearchInput(sin)
Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin)
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
// enum can be null in Java
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.commons.alerting.model

import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.commons.notifications.model.BaseModel
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
Expand All @@ -14,7 +15,8 @@ interface Trigger : BaseModel {
QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD),
BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD),
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD),
CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD);
CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD),
REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD);

override fun toString(): String {
return value
Expand Down Expand Up @@ -55,6 +57,7 @@ interface Trigger : BaseModel {
Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin)
Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin)
Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin)
Type.REMOTE_MONITOR_TRIGGER -> RemoteMonitorTrigger(sin)
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
// enum can be null in Java
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opensearch.commons.alerting.model.remote.monitors

import org.opensearch.commons.alerting.model.Input
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.nio.ByteBuffer

data class RemoteMonitorInput(val input: BytesReference) : Input {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readBytesReference()
)

fun asTemplateArg(): Map<String, Any?> {
val bytes = input.toBytesRef().bytes
return mapOf(
INPUT_SIZE to bytes.size,
INPUT_FIELD to bytes
)
}

override fun name(): String {
return REMOTE_MONITOR_INPUT_FIELD
}

override fun writeTo(out: StreamOutput) {
out.writeBytesReference(input)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val bytes = input.toBytesRef().bytes
return builder.startObject()
.startObject(REMOTE_MONITOR_INPUT_FIELD)
.field(INPUT_SIZE, bytes.size)
.field(INPUT_FIELD, bytes)
.endObject()
.endObject()
}

companion object {
const val INPUT_FIELD = "input"
const val INPUT_SIZE = "size"
const val REMOTE_MONITOR_INPUT_FIELD = "remote_monitor_input"

fun parse(xcp: XContentParser): RemoteMonitorInput {
var bytes: ByteArray? = null
var size: Int = 0

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
INPUT_FIELD -> bytes = xcp.binaryValue()
INPUT_SIZE -> size = xcp.intValue()
}
}
val input = BytesReference.fromByteBuffer(ByteBuffer.wrap(bytes, 0, size))
return RemoteMonitorInput(input)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.opensearch.commons.alerting.model.remote.monitors

import org.opensearch.common.CheckedFunction
import org.opensearch.common.UUIDs
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.core.ParseField
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.nio.ByteBuffer

data class RemoteMonitorTrigger(
override val id: String,
override val name: String,
override val severity: String,
override val actions: List<Action>,
val trigger: BytesReference
) : Trigger {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(),
sin.readString(),
sin.readString(),
sin.readList(::Action),
sin.readBytesReference()
)

fun asTemplateArg(): Map<String, Any?> {
val bytes = trigger.toBytesRef().bytes
return mapOf(
Trigger.ID_FIELD to id,
Trigger.NAME_FIELD to name,
Trigger.SEVERITY_FIELD to severity,
Trigger.ACTIONS_FIELD to actions.map { it.asTemplateArg() },
TRIGGER_SIZE to bytes.size,
TRIGGER_FIELD to bytes
)
}

override fun name(): String {
return REMOTE_MONITOR_TRIGGER_FIELD
}

override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
out.writeString(severity)
out.writeCollection(actions)
out.writeBytesReference(trigger)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val bytes = trigger.toBytesRef().bytes
return builder.startObject()
.startObject(REMOTE_MONITOR_TRIGGER_FIELD)
.field(Trigger.ID_FIELD, id)
.field(Trigger.NAME_FIELD, name)
.field(Trigger.SEVERITY_FIELD, severity)
.field(Trigger.ACTIONS_FIELD, actions.toTypedArray())
.field(TRIGGER_SIZE, bytes.size)
.field(TRIGGER_FIELD, bytes)
.endObject()
.endObject()
}

companion object {
const val TRIGGER_FIELD = "trigger"
const val TRIGGER_SIZE = "size"
const val REMOTE_MONITOR_TRIGGER_FIELD = "remote_monitor_trigger"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(
Trigger::class.java,
ParseField(REMOTE_MONITOR_TRIGGER_FIELD),
CheckedFunction { parseInner(it) }
)

fun parseInner(xcp: XContentParser): RemoteMonitorTrigger {
var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified
lateinit var name: String
lateinit var severity: String
val actions: MutableList<Action> = mutableListOf()
var bytes: ByteArray? = null
var size: Int = 0

if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) {
XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation)
}

// If the parser began on START_OBJECT, move to the next token so that the while loop enters on
// the fieldName (or END_OBJECT if it's empty).
if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken()
while (xcp.currentToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Trigger.ID_FIELD -> id = xcp.text()
Trigger.NAME_FIELD -> name = xcp.text()
Trigger.SEVERITY_FIELD -> severity = xcp.text()
Trigger.ACTIONS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actions.add(Action.parse(xcp))
}
}
TRIGGER_FIELD -> bytes = xcp.binaryValue()
TRIGGER_SIZE -> size = xcp.intValue()
}
xcp.nextToken()
}
val trigger = BytesReference.fromByteBuffer(ByteBuffer.wrap(bytes, 0, size))
return RemoteMonitorTrigger(id, name, severity, actions, trigger)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.commons.alerting.model.action.Throttle
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.commons.alerting.util.getBucketKeysHash
import org.opensearch.commons.alerting.util.string
import org.opensearch.commons.authuser.User
Expand Down Expand Up @@ -514,6 +515,12 @@ fun parser(xc: String): XContentParser {
return parser
}

fun parser(xc: ByteArray): XContentParser {
val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc)
parser.nextToken()
return parser
}

fun xContentRegistry(): NamedXContentRegistry {
return NamedXContentRegistry(
listOf(
Expand All @@ -523,7 +530,8 @@ fun xContentRegistry(): NamedXContentRegistry {
BucketLevelTrigger.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
NoOpTrigger.XCONTENT_REGISTRY
NoOpTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY
) + SearchModule(Settings.EMPTY, emptyList()).namedXContents
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.ActionExecutionTime
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand All @@ -21,13 +23,13 @@ import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.script.Script
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID

class DocLevelMonitorFanOutRequestTests : OpenSearchTestCase() {
class DocLevelMonitorFanOutRequestTests {

@Test
fun `test doc level monitor fan out request as stream`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.randomDocumentLevelTriggerRunResult
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.test.OpenSearchTestCase

class DocLevelMonitorFanOutResponseTests : OpenSearchTestCase() {
class DocLevelMonitorFanOutResponseTests {

@Test
fun `test doc level monitor fan out response with errors as stream`() {
val docLevelMonitorFanOutResponse = DocLevelMonitorFanOutResponse(
"nodeid",
Expand All @@ -33,6 +36,7 @@ class DocLevelMonitorFanOutResponseTests : OpenSearchTestCase() {
assertEquals(docLevelMonitorFanOutResponse.triggerResults, newDocLevelMonitorFanOutResponse.triggerResults)
}

@Test
fun `test doc level monitor fan out response as stream`() {
val workflow = DocLevelMonitorFanOutResponse(
"nodeid",
Expand Down
Loading
Loading