Skip to content

Commit

Permalink
remove runtime notification settings (opensearch-project#839)
Browse files Browse the repository at this point in the history
Signed-off-by: Hailong Cui <[email protected]>
  • Loading branch information
Hailong-am authored Jul 3, 2023
1 parent f07fa2b commit 2a9bcd6
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,12 @@ class NotificationActionListener<Request : ActionRequest, Response : ActionRespo
}
}
}
}

// remove one time configuration no matter it is enabled or not
removeOneTimePolicy(config)
// remove one time configuration
val runtimeConfig = lronConfigResponse.lronConfigResponses.firstOrNull() { it.lronConfig.taskId != null }
runtimeConfig?.let {
removeOneTimePolicy(it.lronConfig)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.opensearch.action.admin.indices.open.OpenIndexAction
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.client.RestClient
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -198,25 +199,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {

@Suppress("UNCHECKED_CAST")
fun `test notify for reindex`() {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)

Expand Down Expand Up @@ -323,25 +306,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {

@Suppress("UNCHECKED_CAST")
fun `test notify for reindex with duplicate channel`() {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)
val taskId = response.asMap()["task"] as String
Expand Down Expand Up @@ -448,4 +413,78 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {
)
}
}

@Suppress("UNCHECKED_CAST")
fun `test runtime policy been removed when index operation finished`() {
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)
val taskId = response.asMap()["task"]
Assert.assertNotNull(taskId)

// put runtime policy only for failure
client.makeRequest(
"POST", "_plugins/_im/lron",
StringEntity(
"""
{
"lron_config": {
"task_id": "$taskId",
"lron_condition": {
"failure": true,
"success": false
},
"channels": [
{
"id": "$notificationConfId"
}
]
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)

waitFor(Instant.ofEpochSecond(60)) {
assertEquals(
"Notification index does not have a doc",
1,
(
client.makeRequest("GET", "$notificationIndex/_search?q=msg:reindex")
.asMap() as Map<String, Map<String, Map<String, Any>>>
)["hits"]!!["total"]!!["value"]
)

try {
client.makeRequest("GET", "_plugins/_im/lron/LRON:$taskId")
} catch (e: ResponseException) {
// runtime policy been removed
Assert.assertTrue(e.response.restStatus() == RestStatus.NOT_FOUND)
}
}
}

private fun performReindex(): Response {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
return response
}
}

0 comments on commit 2a9bcd6

Please sign in to comment.