Skip to content

Commit

Permalink
parse query string query to fetch only fields being queried by doc level queries
Browse files Browse the repository at this point in the history

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Jan 18, 2024
1 parent b08c53f commit 7d572c6
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 11 deletions.
8 changes: 4 additions & 4 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,9 @@ run {
useCluster testClusters.integTest
}

// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/opensearchplugin-coverage.gradle'
}
//// Only apply jacoco test coverage if we are running a local single node cluster
//if (!usingRemoteCluster && !usingMultiNode) {
// apply from: '../build-tools/opensearchplugin-coverage.gradle'
//}

apply from: '../build-tools/pkgbuild.gradle'
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.queryStringQuery.QueryShardContextFactory
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
Expand Down Expand Up @@ -285,8 +286,23 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
)

DeleteMonitorService.initialize(client)
QueryShardContextFactory.init(
client,
clusterService,
scriptService,
xContentRegistry,
namedWriteableRegistry,
environment
)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
return listOf(
sweeper,
scheduler,
runner,
scheduledJobIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator
)
}

override fun getSettings(): List<Setting<*>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.queryStringQuery.QueryStringQueryUtils
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
Expand Down Expand Up @@ -633,6 +634,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorInputIndices: List<String>,
concreteIndices: List<String>,
) {
val docLevelMonitorInput = monitor.inputs.get(0) as DocLevelMonitorInput

val extractFieldsFromQueryString = QueryStringQueryUtils.extractFieldsFromQueries(
docLevelMonitorInput.queries,
concreteIndexName
)
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()
Expand All @@ -647,7 +654,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo,
maxSeqNo,
null,
docIds
docIds,
extractFieldsFromQueryString
)
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
Expand Down Expand Up @@ -742,7 +750,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null
docIds: List<String>? = null,
fieldsToFetch: List<String>,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand All @@ -757,7 +766,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}

val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand All @@ -768,6 +776,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k
)
.preference(Preference.PRIMARY_FIRST.type())

if (fieldsToFetch.isNotEmpty()) {
// request.source().fetchSource(false)
// TODO enable fetchsource=false after checking what _source looks like while transforming hit for percolate
for (field in fieldsToFetch) {
request.source().fetchField(field)
}
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
Expand Down Expand Up @@ -855,7 +871,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
val sourceMap = hit.sourceAsMap
val sourceMap = if (hit.hasSource()) {
constructSourceMapFromFieldsInHit(hit)
} else hit.sourceAsMap
if (sourceMap.isEmpty()) {
logger.debug(
"Doc level Monitor $monitorId:" +
" Doc $hit doesn't have source nor selected fields with values. Skipping doc"
)
return null
}
transformDocumentFieldNames(
sourceMap,
conflictingFields,
Expand All @@ -875,6 +900,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
})
}

/**
 * Builds a pseudo-`_source` map out of the per-field values attached to a search hit.
 *
 * Used when the shard query fetched only selected fields (via `fetchField`) instead of
 * the full document source. Fields whose value list is null or empty are dropped.
 * NOTE(review): values are stored as the whole value list, not a scalar — presumably the
 * downstream percolate transformation accepts list-valued entries; confirm against caller.
 *
 * @param hit the search hit whose fetched fields are converted
 * @return a mutable map of field name -> non-empty list of field values; empty map when
 *         the hit carries no fields
 */
private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap<String, Any> {
    val docFields = hit.fields ?: return mutableMapOf()
    val reconstructedSource = mutableMapOf<String, Any>()
    for ((fieldName, docField) in docFields) {
        val fieldValues = docField.values
        if (fieldValues != null && fieldValues.isNotEmpty()) {
            reconstructedSource[fieldName] = fieldValues
        }
    }
    return reconstructedSource
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package org.opensearch.alerting.queryStringQuery

import org.opensearch.Version
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.regex.Regex
import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsModule
import org.opensearch.common.util.BigArrays
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.index.Index
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.env.Environment
import org.opensearch.index.IndexSettings
import org.opensearch.index.mapper.MapperService
import org.opensearch.index.query.QueryShardContext
import org.opensearch.index.similarity.SimilarityService
import org.opensearch.indices.IndicesModule
import org.opensearch.indices.analysis.AnalysisModule
import org.opensearch.plugins.MapperPlugin
import org.opensearch.plugins.PluginsService
import org.opensearch.script.ScriptService
import java.time.Instant

/**
* Creates QueryShardContext object which is used in QueryStringQuery rewrite.
* We need this because we have to use QueryStringQueryParser class which requires QueryShardContext as parameter
*/
object QueryShardContextFactory {
    // Plugin-level dependencies, injected once from AlertingPlugin#createComponents via
    // init(). lateinit because this singleton exists before the plugin wiring runs;
    // accessing any of these before init() throws UninitializedPropertyAccessException.
    lateinit var client: Client
    lateinit var clusterService: ClusterService
    lateinit var scriptService: ScriptService
    lateinit var xContentRegistry: NamedXContentRegistry
    lateinit var namedWriteableRegistry: NamedWriteableRegistry
    lateinit var environment: Environment

    /**
     * Stores the node-level services this factory needs. Must be called exactly once,
     * during plugin startup, before any call to [createShardContext].
     */
    @Suppress("LongParameterList")
    fun init(
        client: Client,
        clusterService: ClusterService,
        scriptService: ScriptService,
        xContentRegistry: NamedXContentRegistry,
        namedWriteableRegistry: NamedWriteableRegistry,
        environment: Environment
    ) {
        this.client = client
        this.clusterService = clusterService
        this.scriptService = scriptService
        this.xContentRegistry = xContentRegistry
        this.namedWriteableRegistry = namedWriteableRegistry
        this.environment = environment
    }

    /**
     * Looks up the index in the current cluster state and returns its [Index],
     * [Settings], and [IndexMetadata].
     *
     * @throws IllegalArgumentException when the index is not present in cluster state.
     */
    private fun getIndexSettingsAndMetadata(indexName: String?): Triple<Index?, Settings?, IndexMetadata?> {
        val index: Index?
        val indexSettings: Settings?
        val indexMetadata = clusterService.state().metadata.index(indexName)
            ?: throw IllegalArgumentException("Can't find IndexMetadata for index: [$indexName]")
        index = indexMetadata.index
        indexSettings = indexMetadata.settings
        return Triple(index, indexSettings, indexMetadata)
    }

    /**
     * Builds a standalone [QueryShardContext] for [indexName], suitable for rewriting a
     * query_string query (see QueryStringQueryParserExt) outside the normal search path.
     *
     * The heavy wiring below (PluginsService -> SettingsModule -> IndicesModule ->
     * AnalysisModule -> MapperService) recreates just enough of a node's index services
     * to let QueryBuilder#toQuery run against the index's real mappings.
     *
     * @param indexName concrete index to resolve mappings/settings from cluster state
     * @throws IllegalArgumentException when the index is unknown to the cluster state
     */
    fun createShardContext(indexName: String?): QueryShardContext {
        val (index, indexSettings, indexMetadata) = getIndexSettingsAndMetadata(indexName)
        // Minimal synthetic node settings: only what Environment/PluginsService require.
        val nodeSettings = Settings.builder()
            .put("node.name", "dummyNodeName")
            .put(Environment.PATH_HOME_SETTING.key, environment.tmpDir())
            .build()
        // No plugin jars are loaded (empty list) — we only need the default registries.
        val pluginsService =
            PluginsService(nodeSettings, null, null, null, listOf())
        val additionalSettings = pluginsService.pluginSettings
        val settingsModule = SettingsModule(
            nodeSettings,
            additionalSettings,
            pluginsService.pluginSettingsFilter, emptySet()
        )
        val indexScopedSettings: IndexScopedSettings = settingsModule.indexScopedSettings
        val idxSettings = newIndexSettings(index, indexSettings, indexScopedSettings)
        val indicesModule = IndicesModule(pluginsService.filterPlugins(MapperPlugin::class.java))
        val mapperRegistry = indicesModule.mapperRegistry
        val analysisModule = AnalysisModule(environment, emptyList())
        val indexAnalyzers = analysisModule.analysisRegistry.build(idxSettings)
        val similarityService = SimilarityService(idxSettings, null, emptyMap())
        val mapperService = MapperService(
            idxSettings,
            indexAnalyzers,
            xContentRegistry,
            similarityService,
            mapperRegistry,
            { createShardContext(null) },
            { false },
            scriptService
        )
        // In order to be able to call toQuery method on QueryBuilder, we need to setup mappings in MapperService
        mapperService.merge("_doc", indexMetadata?.mapping()?.source(), MapperService.MergeReason.MAPPING_UPDATE)

        // Positional constructor: nulls stand in for services (searcher, bitset cache,
        // similarity, client, reader, cluster alias) not needed for query rewriting.
        return QueryShardContext(
            0,
            idxSettings,
            BigArrays.NON_RECYCLING_INSTANCE,
            null,
            null,
            mapperService,
            null,
            scriptService,
            xContentRegistry,
            namedWriteableRegistry,
            null,
            null,
            { Instant.now().toEpochMilli() },
            null,
            { pattern -> Regex.simpleMatch(pattern, index?.name) },
            { true },
            null
        )
    }

    /**
     * Wraps the index's stored settings into an [IndexSettings] instance, forcing the
     * created-version/shard/replica keys required for IndexSettings validation.
     */
    private fun newIndexSettings(index: Index?, settings: Settings?, indexScopedSettings: IndexScopedSettings?): IndexSettings? {
        val build = Settings.builder()
            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(settings)
            .build()
        val metadata = IndexMetadata.builder(index?.name).settings(build).build()
        return IndexSettings(metadata, Settings.EMPTY, indexScopedSettings)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.opensearch.alerting.queryStringQuery

import org.apache.lucene.search.Query
import org.opensearch.common.regex.Regex
import org.opensearch.index.query.QueryShardContext
import org.opensearch.index.search.QueryStringQueryParser

const val EXISTS = "_exists_"

/**
 * A [QueryStringQueryParser] that records which concrete fields a query string query
 * references, by intercepting every leaf-query construction callback.
 *
 * After parsing, [discoveredFields] holds the explicitly named fields (for an
 * `_exists_:field` clause the referenced field itself is recorded), and
 * [hasLonelyTerms] is true when any clause had no concrete field — i.e. a bare term
 * hitting the default field, or a wildcard field pattern — meaning field-level
 * source filtering cannot be applied safely.
 */
class QueryStringQueryParserExt : QueryStringQueryParser {

    /** Concrete field names referenced by the parsed query. */
    val discoveredFields = mutableListOf<String>()

    /** Set when a clause targets no specific field (default field or wildcard pattern). */
    var hasLonelyTerms = false

    constructor(context: QueryShardContext?, lenient: Boolean) : super(context, lenient)
    constructor(context: QueryShardContext?, defaultField: String, lenient: Boolean) : super(context, defaultField, lenient)
    constructor(context: QueryShardContext, resolvedFields: Map<String, Float>, lenient: Boolean) : super(context, resolvedFields, lenient)

    override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? {
        recordField(field)
        return super.getFuzzyQuery(field, termStr, minSimilarity)
    }
    override fun getPrefixQuery(field: String?, termStr: String?): Query {
        recordField(field)
        return super.getPrefixQuery(field, termStr)
    }
    override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query {
        recordField(field, queryText)
        return super.getFieldQuery(field, queryText, quoted)
    }
    override fun getWildcardQuery(field: String?, termStr: String?): Query {
        recordField(field)
        return super.getWildcardQuery(field, termStr)
    }
    override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query {
        recordField(field, queryText)
        return super.getFieldQuery(field, queryText, slop)
    }
    override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query {
        recordField(field)
        return super.getRangeQuery(field, part1, part2, startInclusive, endInclusive)
    }
    override fun getRegexpQuery(field: String?, termStr: String?): Query {
        recordField(field)
        return super.getRegexpQuery(field, termStr)
    }

    /**
     * Classifies one leaf clause: no field / wildcard pattern -> lonely term;
     * `_exists_:x` -> record x; otherwise record the field itself.
     */
    private fun recordField(field: String?, queryText: String? = null) {
        when {
            field == null || Regex.isSimpleMatchPattern(field) -> hasLonelyTerms = true
            field == EXISTS && !queryText.isNullOrEmpty() -> discoveredFields.add(queryText)
            else -> discoveredFields.add(field)
        }
    }
}
Loading

0 comments on commit 7d572c6

Please sign in to comment.