diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt index 8872b525..4ecf1e67 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt @@ -21,7 +21,8 @@ data class IndexExecutionContext( val updatedIndexNames: List, val concreteIndexNames: List, val conflictingFields: List, - val docIds: List? = emptyList() + val docIds: List? = emptyList(), + val findingIds: List? = emptyList() ) : Writeable, ToXContent { @Throws(IOException::class) @@ -34,7 +35,8 @@ data class IndexExecutionContext( updatedIndexNames = sin.readStringList(), concreteIndexNames = sin.readStringList(), conflictingFields = sin.readStringList(), - docIds = sin.readOptionalStringList() + docIds = sin.readOptionalStringList(), + findingIds = sin.readOptionalStringList() ) override fun writeTo(out: StreamOutput?) { @@ -47,6 +49,7 @@ data class IndexExecutionContext( out.writeStringCollection(concreteIndexNames) out.writeStringCollection(conflictingFields) out.writeOptionalStringCollection(docIds) + out.writeOptionalStringCollection(findingIds) } override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { @@ -60,6 +63,7 @@ data class IndexExecutionContext( .field("concrete_index_names", concreteIndexNames) .field("conflicting_fields", conflictingFields) .field("doc_ids", docIds) + .field("finding_ids", findingIds) .endObject() return builder } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index bccfccfe..a0a5ed5b 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -43,6 +43,7 @@ data class Monitor( val uiMetadata: Map, val dataSources: DataSources = DataSources(), val deleteQueryIndexInEveryRun: Boolean? = false, + val shouldCreateSingleAlertForFindings: Boolean? = false, val owner: String? = "alerting" ) : ScheduledJob { @@ -112,6 +113,7 @@ data class Monitor( DataSources() }, deleteQueryIndexInEveryRun = sin.readOptionalBoolean(), + shouldCreateSingleAlertForFindings = sin.readOptionalBoolean(), owner = sin.readOptionalString() ) @@ -172,6 +174,7 @@ data class Monitor( if (uiMetadata.isNotEmpty()) builder.field(UI_METADATA_FIELD, uiMetadata) builder.field(DATA_SOURCES_FIELD, dataSources) builder.field(DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD, deleteQueryIndexInEveryRun) + builder.field(SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD, shouldCreateSingleAlertForFindings) builder.field(OWNER_FIELD, owner) if (params.paramAsBoolean("with_type", false)) builder.endObject() return builder.endObject() @@ -224,6 +227,7 @@ data class Monitor( out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field dataSources.writeTo(out) out.writeOptionalBoolean(deleteQueryIndexInEveryRun) + out.writeOptionalBoolean(shouldCreateSingleAlertForFindings) out.writeOptionalString(owner) } @@ -245,6 +249,7 @@ data class Monitor( const val DATA_SOURCES_FIELD = "data_sources" const val ENABLED_TIME_FIELD = "enabled_time" const val DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD = "delete_query_index_in_every_run" + const val SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD = "should_create_single_alert_for_findings" const val OWNER_FIELD = "owner" val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}") @@ -274,6 +279,7 @@ data class Monitor( val inputs: MutableList = mutableListOf() var dataSources = DataSources() var deleteQueryIndexInEveryRun = false + var delegateMonitor = false var owner = "alerting" XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -332,6 +338,11 @@ data class Monitor( } else { xcp.booleanValue() } + SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD -> delegateMonitor = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + delegateMonitor + } else { + xcp.booleanValue() + } OWNER_FIELD -> owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() else -> { xcp.skipChildren() @@ -360,6 +371,7 @@ data class Monitor( uiMetadata, dataSources, deleteQueryIndexInEveryRun, + delegateMonitor, owner ) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt index d478315e..5d3cd7c1 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt @@ -18,7 +18,8 @@ data class WorkflowRunContext( val workflowMetadataId: String, val chainedMonitorId: String?, val matchingDocIdsPerIndex: Map>, - val auditDelegateMonitorAlerts: Boolean + val auditDelegateMonitorAlerts: Boolean, + val findingIds: List? = null ) : Writeable, ToXContentObject { companion object { fun readFrom(sin: StreamInput): WorkflowRunContext { @@ -31,7 +32,8 @@ data class WorkflowRunContext( sin.readString(), sin.readOptionalString(), sin.readMap() as Map>, - sin.readBoolean() + sin.readBoolean(), + sin.readOptionalStringList() ) override fun writeTo(out: StreamOutput) { @@ -40,6 +42,7 @@ data class WorkflowRunContext( out.writeOptionalString(chainedMonitorId) out.writeMap(matchingDocIdsPerIndex) out.writeBoolean(auditDelegateMonitorAlerts) + out.writeOptionalStringCollection(findingIds) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { @@ -49,6 +52,7 @@ data class WorkflowRunContext( .field("chained_monitor_id", chainedMonitorId) .field("matching_doc_ids_per_index", matchingDocIdsPerIndex) .field("audit_delegate_monitor_alerts", auditDelegateMonitorAlerts) + .field("finding_ids", findingIds) .endObject() return builder } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index dda45483..1ef82f18 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -89,4 +89,66 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) } + + @Test + fun `test doc level monitor fan out request as stream with matching docIds with findings per index`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("return true")) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + val monitorMetadata = MonitorMetadata( + "test", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + Monitor.NO_ID, + listOf(ActionExecutionTime("", Instant.now())), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") + ) + val indexExecutionContext = IndexExecutionContext( + listOf(docQuery), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("index" to mutableMapOf("1" to "1")), + "test-index", + "test-index", + listOf("test-index"), + listOf("test-index"), + listOf("test-field"), + listOf("1", "2") + ) + val workflowRunContext = WorkflowRunContext( + Workflow.NO_ID, + Workflow.NO_ID, + Monitor.NO_ID, + mutableMapOf("index" to listOf("1")), + true, + listOf("finding1") + ) + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + false, + monitorMetadata, + UUID.randomUUID().toString(), + indexExecutionContext, + listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), + listOf("test-index"), + workflowRunContext + ) + val out = BytesStreamOutput() + docLevelMonitorFanOutRequest.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) + assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor) + assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId) + assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata) + assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) + assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) + assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + } }