Skip to content

Commit

Permalink
change Thread.sleep to waitUntil function under test files
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Choi <[email protected]>
  • Loading branch information
JacobCho-i committed Oct 10, 2023
1 parent 6f29716 commit 2128864
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.net.URLEncoder
import java.time.Instant
import java.time.ZonedDateTime
Expand All @@ -55,6 +56,7 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.DAYS
import java.time.temporal.ChronoUnit.MILLIS
import java.time.temporal.ChronoUnit.MINUTES
import java.util.concurrent.TimeUnit

class MonitorRunnerServiceIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -138,7 +140,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(firstRunAlert, monitor)
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// see lastNotificationTime change.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
val secondRunAlert = searchAlerts(monitor).single()
verifyAlert(secondRunAlert, monitor)
Expand Down Expand Up @@ -265,7 +269,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
Expand Down Expand Up @@ -765,7 +771,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(activeAlert1.single(), monitor, ACTIVE)
val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0)))

Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id))
executeMonitor(monitor.id)
val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single()
Expand Down Expand Up @@ -1398,7 +1406,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)

// Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was
Expand All @@ -1418,7 +1428,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
)

// Execute Monitor and check that both Alerts were updated
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil (searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).size == 2)
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
Expand Down Expand Up @@ -1940,7 +1952,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let Action executionTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResultsForBucketLevelMonitor(
monitorRunResult = monitorRunResultThrottled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getAlertIndices().size >= 3)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3)
}

Expand All @@ -157,7 +159,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor.id)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getAlertIndices().size >= 2)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 2 alert indices", getFindingIndices().size >= 2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.util.concurrent.TimeUnit

class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -69,7 +71,9 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
// the test execution by a lot (might have to wait for Job Scheduler plugin integration first)
// Waiting a minute to ensure the Monitor ran again at least once before checking if the job is running
// on time
Thread.sleep(60000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 1, TimeUnit.MINUTES)
verifyMonitorStats("/_plugins/_alerting")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,10 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Delete request not successful", RestStatus.OK, deleteResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -842,7 +845,10 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -870,7 +876,10 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -956,10 +965,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() {
// Enable Monitor jobs
enableScheduledJob()

var response = enableScheduledJob()
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)
var alertingStats = getAlertingStats()
assertAlertingStatsSweeperEnabled(alertingStats, true)
assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"])
Expand Down Expand Up @@ -989,10 +1001,12 @@ class MonitorRestApiIT : AlertingRestTestCase() {
)

// Re-enable Monitor jobs
enableScheduledJob()
response = enableScheduledJob()

// Sleep briefly so sweep can reschedule the Monitor
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)

alertingStats = getAlertingStats()
assertAlertingStatsSweeperEnabled(alertingStats, true)
Expand All @@ -1015,10 +1029,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor stats jobs`() {
// Enable the Monitor plugin.
enableScheduledJob()

var response = enableScheduledJob()
createRandomMonitor(refresh = true)

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)
val responseMap = getAlertingStats()
assertAlertingStatsSweeperEnabled(responseMap, true)
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])
Expand Down Expand Up @@ -1048,10 +1065,12 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor specific metric`() {
// Enable the Monitor plugin.
enableScheduledJob()
var response = enableScheduledJob()
createRandomMonitor(refresh = true)

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)
val responseMap = getAlertingStats("/jobs_info")
assertAlertingStatsSweeperEnabled(responseMap, true)
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.script.Script
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
import java.util.UUID
import java.util.concurrent.TimeUnit

@TestLogging("level:DEBUG", reason = "Debug for tests.")
@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -1180,7 +1182,10 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
}"""

indexDoc(index, "1", testDoc)
Thread.sleep(80000)
OpenSearchTestCase.waitUntil({
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
return@waitUntil (findings.size == 1)
}, 80, TimeUnit.SECONDS)

val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import org.opensearch.alerting.util.DestinationType
import org.opensearch.client.ResponseException
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -79,8 +81,11 @@ class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {
}

// Create cluster change event and wait for migration service to complete migrating data over
client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb")
Thread.sleep(120000)
var request: Map<String, Any>? = null
request = client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb")
OpenSearchTestCase.waitUntil({
return@waitUntil request == null
}, 2, TimeUnit.MINUTES)

for (id in ids) {
val response = client().makeRequest(
Expand Down

0 comments on commit 2128864

Please sign in to comment.