Skip to content

Commit

Permalink
remove refresh action after indexing ioc findings
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed May 1, 2024
1 parent c914b23 commit 9d89a71
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import org.apache.logging.log4j.LogManager
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.admin.indices.refresh.RefreshResponse
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.GroupedActionListener
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
import org.opensearch.client.Client
Expand Down Expand Up @@ -87,13 +85,18 @@ class ThreatIntelDetectionService(
}
return iocsInData
} catch (e: Exception) {
log.error("TI_DEBUG: Failed to extract IoC's from the queryable data to scan against threat intel")
log.error("TI_DEBUG: Failed to extract IoC's from the queryable data to scan against threat intel", e)
return mutableSetOf()
}
}

private suspend fun searchTermsOnIndices(monitor: Monitor, iocs: List<String>, threatIntelIndices: List<String>) {
val iocSubLists = iocs.chunkSublists(BATCH_SIZE)
var i = iocSubLists.size
for (iocSubList in iocSubLists) {
if (iocSubList.isEmpty()) i--
}

// TODO get unique values from list first
val responses: Collection<SearchResponse> =
suspendCoroutine { cont -> // todo implement a listener that tolerates multiple exceptions
Expand All @@ -111,7 +114,7 @@ class ThreatIntelDetectionService(
cont.resumeWithException(e)
}
},
iocSubLists.size
i
)
// chunk all iocs from queryable data and perform terms query for matches
// if matched return only the ioc's that matched and not the entire document
Expand All @@ -125,6 +128,7 @@ class ThreatIntelDetectionService(
client.search(searchRequest, groupedListener)
}
}
log.error("num responses expected in groupedlistener: $i")
val iocMatches = mutableSetOf<String>()
for (response in responses) {
log.error("TI_DEBUG search response took: ${response.took} millis")
Expand All @@ -147,15 +151,12 @@ class ThreatIntelDetectionService(
}

suspend fun createFindings(monitor: Monitor, iocMatches: List<String>) {
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()
val findingsToTriggeredQueries = mutableMapOf<String, List<DocLevelQuery>>()

for (iocMatch in iocMatches) {
val finding = Finding(
id = "ioc" + UUID.randomUUID().toString(),
relatedDocIds = listOf(iocMatch),
relatedDocIds = listOf("ioc:$iocMatch"),
correlatedDocIds = listOf(),
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -167,34 +168,39 @@ class ThreatIntelDetectionService(
val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
.string()
log.debug("Findings: $findingStr")
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.opType(DocWriteRequest.OpType.CREATE)
}
bulkIndexFindings(monitor, indexRequests)
if(indexRequests.isNotEmpty())
bulkIndexFindings(monitor, indexRequests)
}

private suspend fun bulkIndexFindings(
monitor: Monitor,
indexRequests: List<IndexRequest>,
) {
indexRequests.chunked(1000).forEach { batch ->
val bulkResponse: BulkResponse = client.suspendUntil {
bulk(BulkRequest().add(batch), it)
}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
log.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
try {
for (batch in indexRequests) {
val bulkResponse: BulkResponse = client.suspendUntil {
val bulkRequest = BulkRequest().add(batch)
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
bulk(bulkRequest, it)
}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
log.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}] with error ${item.failureMessage}")
}
}
} else {
log.error("TI_DEBUG: Monitor ${monitor.id}: [${bulkResponse.items.size}] findings successfully indexed.")
}
} else {
log.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
log.error("TI_DEBUG: completed index requests iteration")
} catch (e: Exception) {
log.error("TI_DEBUG: bulk index findings failed", e)
}
val res: RefreshResponse =
client.suspendUntil { client.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex)) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,14 @@ class TransportDocLevelMonitorFanOutAction
}
}
totalDocsQueriedStat += transformedDocs.size.toLong()
if ((monitor.inputs[0] as DocLevelMonitorInput).iocFieldNames.isNotEmpty())
if ((monitor.inputs[0] as DocLevelMonitorInput).iocFieldNames.isNotEmpty()) {
threatIntelDetectionService.scanDataAgainstThreatIntel(
monitor,
listOf(".opensearch-sap-threat-intel*"),
searchHitsBeingProcessed
)
log.error("TI_DEBUG: completed ioc scan for monitor ${monitor.id}")
}
} finally {
transformedDocs.clear()
docsSizeOfBatchInBytes = 0
Expand Down

0 comments on commit 9d89a71

Please sign in to comment.