diff --git a/alerting/build.gradle b/alerting/build.gradle index efaff1b6f..5f6bd9098 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -424,9 +424,4 @@ run { useCluster testClusters.integTest } -// Only apply jacoco test coverage if we are running a local single node cluster -if (!usingRemoteCluster && !usingMultiNode) { - apply from: '../build-tools/opensearchplugin-coverage.gradle' -} - apply from: '../build-tools/pkgbuild.gradle' diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 4f8e473d9..e53187c7b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -338,6 +338,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE, LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, + AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED, DestinationSettings.EMAIL_USERNAME, DestinationSettings.EMAIL_PASSWORD, DestinationSettings.ALLOW_LIST, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index e5147a3e6..1d0473543 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException @@ -230,6 +231,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) } } + val fieldsToBeQueried = mutableSetOf() + + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + logger.debug( + "Monitor ${monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." + ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) + } // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) @@ -248,7 +264,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { transformedDocs, docsSizeInBytes, updatedIndexNames, - concreteIndicesSeenSoFar + concreteIndicesSeenSoFar, + ArrayList(fieldsToBeQueried) ) } } @@ -632,6 +649,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsSizeInBytes: AtomicLong, monitorInputIndices: List, concreteIndices: List, + fieldsToBeQueried: List ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -647,7 +665,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo, maxSeqNo, null, - docIds + docIds, + fieldsToBeQueried ) transformedDocs.addAll( transformSearchHitsAndReconstructDocs( @@ -742,7 +761,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo: Long?, maxSeqNo: Long, query: String?, - docIds: List? = null + docIds: List? = null, + fieldsToFetch: List, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -768,6 +788,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) .preference(Preference.PRIMARY_FIRST.type()) + + if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { + logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}") + request.source().fetchSource(false) + for (field in fieldsToFetch) { + request.source().fetchField(field) + } + } val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") @@ -834,6 +862,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { "Response status is ${response.status()}" ) } + logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}") return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ @@ -855,7 +884,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ): List> { return hits.mapNotNull(fun(hit: SearchHit): Pair? { try { - val sourceMap = hit.sourceAsMap + val sourceMap = if (hit.hasSource()) { + hit.sourceAsMap + } else { + logger.error("PERF_DEBUG:Building percolate query source docs from relevant fields only") + constructSourceMapFromFieldsInHit(hit) + } transformDocumentFieldNames( sourceMap, conflictingFields, @@ -875,6 +909,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() { }) } + private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap { + if (hit.fields == null) + return mutableMapOf() + val sourceMap: MutableMap = mutableMapOf() + for (field in hit.fields) { + if (field.value.values != null && field.value.values.isNotEmpty()) + if (field.value.values.size == 1) { + sourceMap[field.key] = field.value.values[0] + } else sourceMap[field.key] = field.value.values + } + return sourceMap + } + /** * Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names * but different mappings & [fieldNameSuffixPattern] to field names which have unique names. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index c7d571824..1bf2dc663 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -48,6 +48,16 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** + * Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries. + * Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards. + */ + val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting( + "plugins.alerting.monitor.doc_level_monitor_fetch_only_query_fields_enabled", + true, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val INPUT_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.input_timeout", LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index d2ecc0907..1bfea4ebc 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -49,6 +49,9 @@ }, "fields": { "type": "text" + }, + "query_field_names": { + "type": "keyword" } } }, diff --git a/core/build.gradle b/core/build.gradle index b1ecf7eac..7aeb8c284 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -19,7 +19,7 @@ dependencies { exclude group: 'com.google.guava' } implementation 'com.google.guava:guava:32.0.1-jre' - api "org.opensearch:common-utils:${common_utils_version}@jar" + api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar") implementation 'commons-validator:commons-validator:1.7' testImplementation "org.opensearch.test:framework:${opensearch_version}"