diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt index f26b94910..9d581560e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/threatintel/ThreatIntelDetectionService.kt @@ -38,6 +38,7 @@ import kotlin.math.min private val log = LogManager.getLogger(TransportDocLevelMonitorFanOutAction::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + // todo logging n try-catch class ThreatIntelDetectionService( val client: Client, @@ -49,8 +50,8 @@ class ThreatIntelDetectionService( suspend fun scanDataAgainstThreatIntel(monitor: Monitor, threatIntelIndices: List, hits: List) { try { - val stringList = buildTerms(monitor, hits) // todo remove stringlist - searchTermsOnIndices(stringList.toList(), threatIntelIndices) + val stringList = buildTerms(monitor, hits) + searchTermsOnIndices(monitor, stringList.toList(), threatIntelIndices) } catch (e: Exception) { log.error("TI_DEBUG: failed to scan data against threat intel") } @@ -84,7 +85,7 @@ class ThreatIntelDetectionService( } } - private suspend fun searchTermsOnIndices(iocs: List, threatIntelIndices: List) { + private suspend fun searchTermsOnIndices(monitor: Monitor, iocs: List, threatIntelIndices: List) { val iocSubLists = iocs.chunkSublists(BATCH_SIZE) // TODO get unique values from list first val responses: Collection = @@ -108,9 +109,10 @@ class ThreatIntelDetectionService( // chunk all iocs from queryable data and perform terms query for matches // if matched return only the ioc's that matched and not the entire document for (iocSubList in iocSubLists) { + if (iocSubList.isEmpty()) continue val searchRequest = SearchRequest(*threatIntelIndices.toTypedArray()) val queryBuilder = QueryBuilders.boolQuery() - queryBuilder.filter(QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery(IOC_FIELD_NAME, iocSubList))) + queryBuilder.filter(QueryBuilders.boolQuery().must(QueryBuilders.termsQuery(IOC_FIELD_NAME, iocSubList))) searchRequest.source().query(queryBuilder) searchRequest.source().fetchSource(false).fetchField(IOC_FIELD_NAME) client.search(searchRequest, groupedListener) @@ -127,6 +129,7 @@ class ThreatIntelDetectionService( } } } + createFindings(monitor, iocMatches.toList()) } // Function to chunk a list into sublists of specified size diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 720c20c90..52256d70a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -142,7 +142,7 @@ class TransportDocLevelMonitorFanOutAction val scriptService: ScriptService, val settings: Settings, val xContentRegistry: NamedXContentRegistry, - val threatIntelDetectionService: ThreatIntelDetectionService + val threatIntelDetectionService: ThreatIntelDetectionService, ) : HandledTransportAction( DocLevelMonitorFanOutAction.NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest ), @@ -898,7 +898,7 @@ class TransportDocLevelMonitorFanOutAction maxSeqNo: Long, docIds: List? = null, fieldsToFetch: List, - iocFields: List + iocFields: List, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -929,6 +929,7 @@ class TransportDocLevelMonitorFanOutAction } } + iocFields.forEach { request.source().fetchField(it) } val response: SearchResponse = client.suspendUntil { client.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 09594711d..598787559 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -442,7 +442,23 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { "type.subtype" : "some subtype", "supertype.type" : "some type" }""" + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v1" : 123456, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", + "test_strict_date_time" : "$testTime", + "test_field.some_other_field" : "us-west-2", + "type.subtype" : "some subtype", + "supertype.type" : "some type" + }""" + val doc = "{\"ioc\" : \"12345\"}" + val doc1 = "{\"ioc\" : \"123456\"}" indexDoc(index, "1", testDoc) + indexDoc(index, "2", testDoc1) + indexDoc(".opensearch-sap-threat-intel", "1", doc) + indexDoc(".opensearch-sap-threat-intel", "2", doc1) client().admin().indices().putMapping( PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") )