Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Dec 5, 2024
1 parent 4531e75 commit bdd8396
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ data class Monitor(
val uiMetadata: Map<String, Any>,
val dataSources: DataSources = DataSources(),
val deleteQueryIndexInEveryRun: Boolean? = false,
val shouldPersistFindingsAndAlerts: Boolean? = false,
val shouldCreateSingleAlertForFindings: Boolean? = false,
val owner: String? = "alerting"
) : ScheduledJob {

Expand Down Expand Up @@ -113,7 +113,7 @@ data class Monitor(
DataSources()
},
deleteQueryIndexInEveryRun = sin.readOptionalBoolean(),
shouldPersistFindingsAndAlerts = sin.readOptionalBoolean(),
shouldCreateSingleAlertForFindings = sin.readOptionalBoolean(),
owner = sin.readOptionalString()
)

Expand Down Expand Up @@ -174,7 +174,7 @@ data class Monitor(
if (uiMetadata.isNotEmpty()) builder.field(UI_METADATA_FIELD, uiMetadata)
builder.field(DATA_SOURCES_FIELD, dataSources)
builder.field(DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD, deleteQueryIndexInEveryRun)
builder.field(SHOULD_PERSIST_FINDINGS_AND_ALERTS_FIELD, shouldPersistFindingsAndAlerts)
builder.field(SHOULD_PERSIST_FINDINGS_AND_ALERTS_FIELD, shouldCreateSingleAlertForFindings)
builder.field(OWNER_FIELD, owner)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
Expand Down Expand Up @@ -227,7 +227,7 @@ data class Monitor(
out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field
dataSources.writeTo(out)
out.writeOptionalBoolean(deleteQueryIndexInEveryRun)
out.writeOptionalBoolean(shouldPersistFindingsAndAlerts)
out.writeOptionalBoolean(shouldCreateSingleAlertForFindings)
out.writeOptionalString(owner)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ data class WorkflowRunContext(
val workflowId: String,
val workflowMetadataId: String,
val chainedMonitorId: String?,
val matchingDocIdsPerIndex: Pair<Map<String, List<String>>, List<String>>,
val auditDelegateMonitorAlerts: Boolean
val matchingDocIdsPerIndex: Map<String, List<String>>,
val auditDelegateMonitorAlerts: Boolean,
val findingIds: List<String>? = null
) : Writeable, ToXContentObject {
companion object {
fun readFrom(sin: StreamInput): WorkflowRunContext {
Expand All @@ -30,17 +31,18 @@ data class WorkflowRunContext(
sin.readString(),
sin.readString(),
sin.readOptionalString(),
Pair(sin.readMap() as Map<String, List<String>>, sin.readStringList()),
sin.readBoolean()
sin.readMap() as Map<String, List<String>>,
sin.readBoolean(),
sin.readOptionalStringList()
)

override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeString(workflowMetadataId)
out.writeOptionalString(chainedMonitorId)
out.writeMap(matchingDocIdsPerIndex.first)
out.writeStringCollection(matchingDocIdsPerIndex.second)
out.writeMap(matchingDocIdsPerIndex)
out.writeBoolean(auditDelegateMonitorAlerts)
out.writeOptionalStringCollection(findingIds)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
Expand All @@ -50,6 +52,7 @@ data class WorkflowRunContext(
.field("chained_monitor_id", chainedMonitorId)
.field("matching_doc_ids_per_index", matchingDocIdsPerIndex)
.field("audit_delegate_monitor_alerts", auditDelegateMonitorAlerts)
.field("finding_ids", findingIds)
.endObject()
return builder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class DocLevelMonitorFanOutRequestTests {
Workflow.NO_ID,
Workflow.NO_ID,
Monitor.NO_ID,
Pair(mutableMapOf("index" to listOf("1")), listOf("finding1")),
mutableMapOf("index" to listOf("1")),
true
)
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
Expand All @@ -89,4 +89,66 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
}

@Test
fun `test doc level monitor fan out request as stream with matching docIds with findings per index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
val monitorMetadata = MonitorMetadata(
"test",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Monitor.NO_ID,
listOf(ActionExecutionTime("", Instant.now())),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
)
val indexExecutionContext = IndexExecutionContext(
listOf(docQuery),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("index" to mutableMapOf("1" to "1")),
"test-index",
"test-index",
listOf("test-index"),
listOf("test-index"),
listOf("test-field"),
listOf("1", "2")
)
val workflowRunContext = WorkflowRunContext(
Workflow.NO_ID,
Workflow.NO_ID,
Monitor.NO_ID,
mutableMapOf("index" to listOf("1")),
true,
listOf("finding1")
)
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
false,
monitorMetadata,
UUID.randomUUID().toString(),
indexExecutionContext,
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
listOf("test-index"),
workflowRunContext
)
val out = BytesStreamOutput()
docLevelMonitorFanOutRequest.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)
assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId)
assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata)
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
}
}

0 comments on commit bdd8396

Please sign in to comment.