From 6eaaf5388890821b761d2bfdc2742fa61e57c133 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 30 May 2023 21:14:17 -0700 Subject: [PATCH] Support for system index interface (#789) (#799) * Support for system index interface Signed-off-by: bowenlan-amzn * Handle rejected execution exception as retryable in retry block Signed-off-by: bowenlan-amzn * Fix tests Signed-off-by: bowenlan-amzn * Fix tests Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn (cherry picked from commit fa7e46f1595cd0f46dba254806f662d9bfc831e4) Co-authored-by: bowenlan-amzn --- .../opensearch/indexmanagement/IndexManagementPlugin.kt | 8 +++++++- .../opensearchapi/OpenSearchExtensions.kt | 7 +++++++ .../indexmanagement/IndexManagementRestTestCase.kt | 9 +++++++-- .../kotlin/org/opensearch/indexmanagement/TestHelpers.kt | 6 ++++++ .../rollup/resthandler/RestStartRollupActionIT.kt | 2 ++ .../rollup/resthandler/RestStopRollupActionIT.kt | 2 ++ 6 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 600dc8938..5c6fd2441 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -175,6 +175,7 @@ import org.opensearch.indexmanagement.transform.resthandler.RestPreviewTransform import org.opensearch.indexmanagement.transform.resthandler.RestStartTransformAction import org.opensearch.indexmanagement.transform.resthandler.RestStopTransformAction import org.opensearch.indexmanagement.transform.settings.TransformSettings +import org.opensearch.indices.SystemIndexDescriptor import org.opensearch.jobscheduler.spi.JobSchedulerExtension import org.opensearch.jobscheduler.spi.ScheduledJobParser import org.opensearch.jobscheduler.spi.ScheduledJobRunner @@ -183,6 +184,7 @@ import org.opensearch.plugins.ActionPlugin import org.opensearch.plugins.ExtensiblePlugin import org.opensearch.plugins.NetworkPlugin import org.opensearch.plugins.Plugin +import org.opensearch.plugins.SystemIndexPlugin import org.opensearch.repositories.RepositoriesService import org.opensearch.rest.RestController import org.opensearch.rest.RestHandler @@ -195,7 +197,7 @@ import org.opensearch.watcher.ResourceWatcherService import java.util.function.Supplier @Suppress("TooManyFunctions") -class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() { +class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() { private val logger = LogManager.getLogger(javaClass) lateinit var indexManagementIndices: IndexManagementIndices @@ -608,6 +610,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin override fun getActionFilters(): List { return listOf(fieldCapsFilter, indexOperationActionFilter) } + + override fun getSystemIndexDescriptors(settings: Settings): Collection { + return listOf(SystemIndexDescriptor(INDEX_MANAGEMENT_INDEX, "Index for storing index management configuration and metadata.")) + } } class GuiceHolder @Inject constructor( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 7d21aabea..0feecbd89 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -38,6 +38,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity import org.opensearch.commons.authuser.User import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException import org.opensearch.core.xcontent.MediaType import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent @@ -183,6 +184,12 @@ suspend fun BackoffPolicy.retry( } else { throw e } + } catch (rje: OpenSearchRejectedExecutionException) { + if (iter.hasNext()) { + backoff = iter.next() + logger.warn("Rejected execution. Retrying in $backoff.", rje) + delay((backoff.millis)) + } } } while (true) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index b98fbb125..0924b3539 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -56,7 +56,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { // preemptively seems to give the job scheduler time to listen to operations. @Before fun initializeManagedIndex() { - if (!indexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) { + if (!isIndexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) { val request = Request("PUT", "/${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}") var entity = "{\"settings\": " + Strings.toString(XContentType.JSON, Settings.builder().put(INDEX_HIDDEN, true).build()) entity += ",\"mappings\" : ${IndexManagementIndices.indexManagementMappings}}" @@ -77,6 +77,11 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { protected fun Response.restStatus(): RestStatus = RestStatus.fromCode(this.statusLine.statusCode) + protected fun isIndexExists(index: String): Boolean { + val response = client().makeRequest("HEAD", index) + return RestStatus.OK == response.restStatus() + } + protected fun assertIndexExists(index: String) { val response = client().makeRequest("HEAD", index) assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus()) @@ -165,7 +170,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { override fun preserveIndicesUponCompletion(): Boolean = true companion object { @JvmStatic - protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 + val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 protected val defaultKeepIndexSet = setOf(".opendistro_security") /** * We override preserveIndicesUponCompletion to true and use this function to clean up indices diff --git a/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt index 41d1a4409..0ce249203 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt @@ -13,6 +13,7 @@ import org.opensearch.client.Response import org.opensearch.client.RestClient import org.opensearch.client.WarningsHandler import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.IndexManagementRestTestCase.Companion.isMultiNode import org.opensearch.jobscheduler.spi.schedule.CronSchedule import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.jobscheduler.spi.schedule.Schedule @@ -113,6 +114,11 @@ fun waitFor( timeout: Instant = Instant.ofEpochSecond(20), block: () -> T ): T { + if (isMultiNode) { + // job scheduling could be skipped in multi-node tests + // https://github.com/opensearch-project/job-scheduler/issues/173 + timeout.plusSeconds(60) + } val startTime = Instant.now().toEpochMilli() do { try { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt index ddb008dc7..1f9a32f3c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt @@ -247,6 +247,8 @@ class RestStartRollupActionIT : RollupRestTestCase() { assertEquals("Rollup is not RETRY", RollupMetadata.Status.RETRY, rollupMetadata.status) // clearing the config index to prevent other tests using this multi shard index + Thread.sleep(2000L) deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) + Thread.sleep(2000L) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt index 819853ee0..3a7c60256 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt @@ -310,6 +310,8 @@ class RestStopRollupActionIT : RollupRestTestCase() { assertEquals("Rollup is not STOPPED", RollupMetadata.Status.STOPPED, rollupMetadata.status) // clearing the config index to prevent other tests using this multi shard index + Thread.sleep(2000L) deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) + Thread.sleep(2000L) } }