Skip to content

Commit

Permalink
optimize to fetch only relevant fields from source data in doc level …
Browse files Browse the repository at this point in the history
…monitor to submit to percolate query instead of docs_source

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Jan 25, 2024
1 parent be29a78 commit 009cd61
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 10 deletions.
5 changes: 0 additions & 5 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,4 @@ run {
useCluster testClusters.integTest
}

// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/opensearchplugin-coverage.gradle'
}

apply from: '../build-tools/pkgbuild.gradle'
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT,
LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE,
LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED,
DestinationSettings.EMAIL_USERNAME,
DestinationSettings.EMAIL_PASSWORD,
DestinationSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.util.AlertingException
Expand Down Expand Up @@ -230,6 +231,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}
val fieldsToBeQueried = mutableSetOf<String>()

for (it in queries) {
if (it.queryFieldNames.isEmpty()) {
fieldsToBeQueried.clear()
logger.debug(
"Monitor ${monitor.id} : " +
"Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
"Cannot optimize monitor to fetch only query-relevant fields. " +
"Querying entire doc source."
)
break
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
Expand All @@ -248,7 +264,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
transformedDocs,
docsSizeInBytes,
updatedIndexNames,
concreteIndicesSeenSoFar
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
}
}
Expand Down Expand Up @@ -632,6 +649,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsSizeInBytes: AtomicLong,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand All @@ -647,7 +665,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo,
maxSeqNo,
null,
docIds
docIds,
fieldsToBeQueried
)
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
Expand Down Expand Up @@ -742,7 +761,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null
docIds: List<String>? = null,
fieldsToFetch: List<String>,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand All @@ -768,6 +788,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k
)
.preference(Preference.PRIMARY_FIRST.type())

if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) {
logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}")
request.source().fetchSource(false)
for (field in fieldsToFetch) {
request.source().fetchField(field)
}
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
Expand Down Expand Up @@ -834,6 +862,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
"Response status is ${response.status()}"
)
}
logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
return response.hits
}
/** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/
Expand All @@ -855,7 +884,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
val sourceMap = hit.sourceAsMap
val sourceMap = if (hit.hasSource()) {
hit.sourceAsMap
} else {
logger.error("PERF_DEBUG:Building percolate query source docs from relevant fields only")
constructSourceMapFromFieldsInHit(hit)
}
transformDocumentFieldNames(
sourceMap,
conflictingFields,
Expand All @@ -875,6 +909,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
})
}

private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap<String, Any> {
if (hit.fields == null)
return mutableMapOf()
val sourceMap: MutableMap<String, Any> = mutableMapOf()
for (field in hit.fields) {
if (field.value.values != null && field.value.values.isNotEmpty())
if (field.value.values.size == 1) {
sourceMap[field.key] = field.value.values[0]
} else sourceMap[field.key] = field.value.values
}
return sourceMap
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
 * Boolean setting to enable/disable optimizing doc level monitors by fetching only the fields mentioned in queries.
 * Enabled by default. If disabled, the entire document source will be fetched while reading data from shards.
*/
val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting(
"plugins.alerting.monitor.doc_level_monitor_fetch_only_query_fields_enabled",
true,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val INPUT_TIMEOUT = Setting.positiveTimeSetting(
"plugins.alerting.input_timeout",
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
},
"fields": {
"type": "text"
},
"query_field_names": {
"type": "keyword"
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies {
exclude group: 'com.google.guava'
}
implementation 'com.google.guava:guava:32.0.1-jre'
api "org.opensearch:common-utils:${common_utils_version}@jar"
api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar")
implementation 'commons-validator:commons-validator:1.7'

testImplementation "org.opensearch.test:framework:${opensearch_version}"
Expand Down

0 comments on commit 009cd61

Please sign in to comment.