Skip to content

Commit

Permalink
addressing review comments on settings, log messages, stats
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 20, 2024
1 parent 5bf9f72 commit e05f344
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_MAX_DOCS,
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.DOC_LEVEL_MONITOR_DOCS_SIZE_HEAP_PERCENTAGE_LIMIT,
AlertingSettings.DOC_LEVEL_MONITOR_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_DOCS_SIZE_HEAP_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -69,7 +69,7 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.LongAdder
import java.util.stream.Collectors
import kotlin.math.max

Expand All @@ -88,10 +88,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
var nonPercolateSearchesTimeTaken = AtomicLong(0)
var percolateQueriesTimeTaken = AtomicLong(0)
var totalDocsQueried = AtomicLong(0)
var docTransformTimeTaken = AtomicLong(0)
var nonPercolateSearchesTimeTaken = LongAdder()
var percolateQueriesTimeTaken = LongAdder()
var totalDocsQueried = LongAdder()
var docTransformTimeTaken = LongAdder()
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
Expand Down Expand Up @@ -163,7 +163,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
/* Contains the list of doc sources that are held in memory to be submitted to the percolate query against the query index.
* Docs are fetched from the source index per shard and transformed. */
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
val docsSizeInBytes = AtomicLong(0)
val docsSizeInBytes = LongAdder()
val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
docLevelMonitorInput.indices.forEach { indexName ->
Expand Down Expand Up @@ -350,20 +350,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
} finally {
logger.debug(
"PERF_DEBUG_STATS: Monitor ${monitor.id} " +
"Monitor ${monitor.id} " +
"Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken"
)
logger.debug(
"PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}",
"Monitor {} Time spent on percolate queries in millis: {}",
monitor.id,
percolateQueriesTimeTaken
)
logger.debug(
"PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}",
"Monitor {} Time spent on transforming doc fields in millis: {}",
monitor.id,
docTransformTimeTaken
)
logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueried)
logger.debug("Monitor {} Num docs queried: {}", monitor.id, totalDocsQueried)
}
}

Expand Down Expand Up @@ -605,7 +605,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
nonPercolateSearchesTimeTaken: AtomicLong
nonPercolateSearchesTimeTaken: LongAdder
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
Expand Down Expand Up @@ -648,7 +648,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long {
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: LongAdder): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand All @@ -664,7 +664,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
nonPercolateSearchesTimeTaken.add(response.took.millis)
if (response.hits.hits.isEmpty())
return -1L

Expand Down Expand Up @@ -693,13 +693,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
docsSizeInBytes: AtomicLong,
docsSizeInBytes: LongAdder,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
nonPercolateSearchesTimeTaken: AtomicLong,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong,
docTransformTimeTake: AtomicLong,
nonPercolateSearchesTimeTaken: LongAdder,
percolateQueriesTimeTaken: LongAdder,
totalDocsQueried: LongAdder,
docTransformTimeTake: LongAdder,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand Down Expand Up @@ -729,7 +729,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsSizeInBytes
)
)
docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime)
docTransformTimeTake.add(System.currentTimeMillis() - startTime)
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
Expand Down Expand Up @@ -759,26 +759,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

/**
 * Combines the two in-memory limit checks used while accumulating docs.
 *
 * Returns true when either [isInMemoryDocsSizeExceedingMemoryLimit] holds for the current value of
 * [docsSizeInBytes], or [isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit] holds for [numDocs].
 * Both checks read their limits from [monitorCtx].
 */
private fun shouldPerformPercolateQueryAndFlushInMemoryDocs(
docsSizeInBytes: AtomicLong,
docsSizeInBytes: LongAdder,
numDocs: Int,
monitorCtx: MonitorRunnerExecutionContext,
): Boolean {
return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) ||
return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.toLong(), monitorCtx) ||
isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx)
}

private suspend fun performPercolateQueryAndResetCounters(
monitorCtx: MonitorRunnerExecutionContext,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
docsSizeInBytes: AtomicLong,
docsSizeInBytes: LongAdder,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong
percolateQueriesTimeTaken: LongAdder,
totalDocsQueried: LongAdder
) {
try {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
Expand All @@ -802,10 +802,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
totalDocsQueried.getAndAdd(transformedDocs.size.toLong())
totalDocsQueried.add(transformedDocs.size.toLong())
} finally {
transformedDocs.clear()
docsSizeInBytes.set(0)
docsSizeInBytes.reset()
}
}

Expand All @@ -820,7 +820,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null,
nonPercolateSearchesTimeTaken: AtomicLong,
nonPercolateSearchesTimeTaken: LongAdder,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand Down Expand Up @@ -851,7 +851,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.error("Failed search shard. Response: $response")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
nonPercolateSearchesTimeTaken.add(response.took.millis)
return response.hits
}

Expand All @@ -863,7 +863,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata: MonitorMetadata,
concreteIndices: List<String>,
monitorInputIndices: List<String>,
percolateQueriesTimeTaken: AtomicLong,
percolateQueriesTimeTaken: LongAdder,
): SearchHits {
val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList())
val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices))
Expand Down Expand Up @@ -915,8 +915,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
"Response status is ${response.status()}"
)
}
logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
percolateQueriesTimeTaken.getAndAdd(response.took.millis)
logger.debug("Monitor ${monitor.id} Percolate query time taken millis = ${response.took}")
percolateQueriesTimeTaken.add(response.took.millis)
return response.hits
}
/** We cannot use a terms query because the `index` field's mapping is of type TEXT and not keyword. Refer to doc-level-queries.json. */
Expand All @@ -934,7 +934,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>,
docsSizeInBytes: AtomicLong,
docsSizeInBytes: LongAdder,
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
Expand All @@ -948,7 +948,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)
val sourceRef = BytesReference.bytes(xContentBuilder)
docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed())
docsSizeInBytes.add(sourceRef.ramBytesUsed())
return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef))
} catch (e: Exception) {
logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e)
Expand Down Expand Up @@ -1015,15 +1015,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
*
*/
private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean {
// The limit is configured as a percentage (it is divided by 100 below) and read from the monitor context's settings.
var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
var thresholdPercentage = DOC_LEVEL_MONITOR_DOCS_SIZE_HEAP_PERCENTAGE_LIMIT.get(monitorCtx.settings)
// `!!` throws a NullPointerException if jvmStats has not been set on the context.
val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes
// Convert the percentage into an absolute byte threshold, computed as a Double.
val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

// Strict comparison: a size exactly equal to the threshold is not treated as exceeding it.
return docsBytesSize > thresholdBytes
}

/**
 * Returns true when [numDocs] is greater than or equal to the max-num-docs-in-memory limit
 * read from the settings held by [monitorCtx].
 */
private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean {
var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
var maxNumDocsThreshold = DOC_LEVEL_MONITOR_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
// Inclusive comparison: reaching the limit exactly already counts as exceeding it.
return numDocs >= maxNumDocsThreshold
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
is Workflow -> {
launch {
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
"Executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
Expand All @@ -274,7 +274,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
is Monitor -> {
launch {
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
"Executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class AlertingSettings {
/** Defines the threshold, as a percentage of the heap size (in bytes), up to which we accumulate docs in memory before we query them
* against the percolate query index in document level monitor execution.
*/
val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit",
10,
val DOC_LEVEL_MONITOR_DOCS_SIZE_HEAP_PERCENTAGE_LIMIT = Setting.intSetting(
"plugins.alerting.monitor.doc_level_monitor_docs_size_heap_percentage_limit",
5,
0,
100,
Setting.Property.NodeScope, Setting.Property.Dynamic
Expand All @@ -42,8 +42,8 @@ class AlertingSettings {
* query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current
* execution
*/
val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_max_num_docs_in_memory",
val DOC_LEVEL_MONITOR_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
"plugins.alerting.monitor.doc_level_monitor_max_num_docs_in_memory",
300000, 1000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
Expand Down

0 comments on commit e05f344

Please sign in to comment.