From 5e9c1c5fa6339bcf3dcd52819574795e8a7dc64e Mon Sep 17 00:00:00 2001 From: Kshitij Tandon Date: Fri, 27 Sep 2024 14:47:35 +0530 Subject: [PATCH] Allowing non-rollup and rollup indices to be searched together Signed-off-by: Kshitij Tandon --- .../indexmanagement/IndexManagementPlugin.kt | 1 + .../rollup/interceptor/RollupInterceptor.kt | 25 +++++++++++------ .../rollup/settings/RollupSettings.kt | 9 ++++++ .../IndexManagementSettingsTests.kt | 2 ++ .../rollup/RollupRestTestCase.kt | 18 ++++++++++++ .../rollup/interceptor/RollupInterceptorIT.kt | 28 +++++++++++++++++++ 6 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index e59602205..981a59b67 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -534,6 +534,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RollupSettings.ROLLUP_SEARCH_ENABLED, RollupSettings.ROLLUP_DASHBOARDS, RollupSettings.ROLLUP_SEARCH_ALL_JOBS, + RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS, TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 87919c173..322b4c34e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -63,6 +63,8 @@ class RollupInterceptor( @Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings) + @Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings) + init { clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) { searchEnabled = it @@ -70,6 +72,9 @@ class RollupInterceptor( clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) { searchAllJobs = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) { + searchRawRollupIndices = it + } } @Suppress("SpreadOperator") @@ -144,15 +149,16 @@ class RollupInterceptor( private fun validateIndicies(concreteIndices: Array, fieldMappings: Set): Map> { var allMatchingRollupJobs: Map> = mapOf() for (concreteIndex in concreteIndices) { - val rollupJobs = - clusterService.state().metadata.index(concreteIndex).getRollupJobs() - ?: throw IllegalArgumentException("Not all indices have rollup job") - - val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) - if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { - throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") + val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs() + if (rollupJobs != null) { + val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) + if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { + throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") + } + allMatchingRollupJobs += matchingRollupJobs + } else if (!searchRawRollupIndices) { + throw IllegalArgumentException("Not all indices have rollup job") } - allMatchingRollupJobs += matchingRollupJobs } return allMatchingRollupJobs } @@ -347,6 +353,9 @@ class RollupInterceptor( if (searchAllJobs) { request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex)) } else { + if (matchingRollupJobs.keys.size > 1) { + logger.warn("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window") + } request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex)) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt index 0554a7061..476fcd32c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt @@ -13,6 +13,7 @@ class RollupSettings { companion object { const val DEFAULT_ROLLUP_ENABLED = true const val DEFAULT_SEARCH_ALL_JOBS = false + const val DEFAULT_SEARCH_SOURCE_INDICES = false const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3 const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3 @@ -85,6 +86,14 @@ class RollupSettings { Setting.Property.Dynamic, ) + val ROLLUP_SEARCH_SOURCE_INDICES: Setting = + Setting.boolSetting( + "plugins.rollup.search.search_source_indices", + DEFAULT_SEARCH_SOURCE_INDICES, + Setting.Property.NodeScope, + Setting.Property.Dynamic, + ) + val ROLLUP_DASHBOARDS: Setting = Setting.boolSetting( "plugins.rollup.dashboards.enabled", diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt index 55fce2adc..2546e6271 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt @@ -91,6 +91,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { RollupSettings.ROLLUP_ENABLED, RollupSettings.ROLLUP_SEARCH_ENABLED, RollupSettings.ROLLUP_SEARCH_ALL_JOBS, + RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES, RollupSettings.ROLLUP_DASHBOARDS, SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES, ), @@ -176,6 +177,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false) assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false) assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false) + assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1) assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index a1fec9755..6e527abfb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -269,6 +269,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { assertEquals("Request failed", RestStatus.OK, res.restStatus()) } + protected fun updateSearchRawRollupClusterSetting(value: Boolean) { + val formattedValue = "\"${value}\"" + val request = + """ + { + "persistent": { + "${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue + } + } + """.trimIndent() + val res = + client().makeRequest( + "PUT", "_cluster/settings", emptyMap(), + StringEntity(request, ContentType.APPLICATION_JSON), + ) + assertEquals("Request failed", RestStatus.OK, res.restStatus()) + } + protected fun createSampleIndexForQSQTest(index: String) { val mapping = """ diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index b87fe55ae..a53028b5f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1112,6 +1112,34 @@ class RollupInterceptorIT : RollupRestTestCase() { "Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response", ) + // Updating to allow searching on non-rollup and rolled-up index together + updateSearchRawRollupClusterSetting(true) + val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes1.restStatus() == RestStatus.OK) + val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes2.restStatus() == RestStatus.OK) + val searchResult2 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(searchResult2.restStatus() == RestStatus.OK) + val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map> + val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map> + val rollupAggResMulti = searchResult2.asMap()["aggregations"] as Map> + + val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int + val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double + + assertEquals( + "Searching single raw source index and rollup target index did not return the same sum results", + rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"], + ) + assertEquals( + "Searching rollup target index did not return the sum for all of the rollup jobs on the index", + trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"], + ) + assertEquals( + "Searching rollup target index did not return the value count for all of the rollup jobs on the index", + trueAggCount, rollupAggResMulti.getValue("value_count_passenger_count")["value"], + ) + // Search 2 rollups with different mappings try { client().makeRequest(