Add more IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 15, 2023
1 parent b4fd306 commit 02d3827
Showing 1 changed file with 45 additions and 4 deletions.
@@ -8,8 +8,12 @@ package org.opensearch.flint.spark
import java.util.Base64
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, spy}
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions
import org.opensearch.flint.OpenSearchTransactionSuite
import org.opensearch.flint.spark.FlintSpark.RefreshMode._
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
@@ -27,6 +31,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
override def beforeAll(): Unit = {
super.beforeAll()
createPartitionedTable(testTable)
}

override def beforeEach(): Unit = {
super.beforeEach()

// Replace mock executor with real one and change its delay
val realExecutor = newDaemonThreadPoolScheduledExecutor("flint-index-heartbeat", 1)
@@ -36,9 +44,8 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
realExecutor.scheduleWithFixedDelay(invocation.getArgument(0), 5, 1, TimeUnit.SECONDS)
}).when(FlintSparkIndexMonitor.executor)
.scheduleWithFixedDelay(any[Runnable], any[Long], any[Long], any[TimeUnit])
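// The stub above hands the heartbeat task to the real executor with a 5-second initial
// delay and a 1-second interval, so metadata log updates are frequent enough to assert on.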
}

test("test") {
// Create an auto-refresh index for the test
flint
.skippingIndex()
.onTable(testTable)
@@ -51,17 +58,45 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
val jobId = spark.streams.active.find(_.name == testFlintIndex).get.id.toString
awaitStreamingComplete(jobId)
Thread.sleep(5000L)
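// Extra wait, presumably so the heartbeat (5s initial delay above) fires at least once before the test body runs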
}

var (prevJobStartTime, prevLastUpdateTime) = getLatestTimestamp
override def afterEach(): Unit = {
flint.deleteIndex(testFlintIndex)
FlintSparkIndexMonitor.executor.shutdownNow()
super.afterEach()
}

// jobStartTime should stay the same while lastUpdateTime keeps updating
test("job start time should not change and last update time keep updated") {
var (prevJobStartTime, prevLastUpdateTime) = getLatestTimestamp
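// `N times { ... }` is assumed to be a polling helper defined elsewhere in this suite (in the
// collapsed hunk below): each iteration waits for the next heartbeat and passes in the latest
// (jobStartTime, lastUpdateTime) pair read from the metadata log.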
3 times { (jobStartTime, lastUpdateTime) =>
jobStartTime shouldBe prevJobStartTime
lastUpdateTime should be > prevLastUpdateTime
prevLastUpdateTime = lastUpdateTime
}
}

test("monitor task should not terminate if any exception") {
// Block write on metadata log index
setWriteBlockOnMetadataLogIndex(true)
Thread.sleep(2000)

// Heartbeat updates should fail while writes are blocked, so lastUpdateTime stops advancing
var (_, prevLastUpdateTime) = getLatestTimestamp
1 times { (_, lastUpdateTime) =>
lastUpdateTime shouldBe prevLastUpdateTime
}

// Unblock writes and wait for the monitor task to attempt an update again
setWriteBlockOnMetadataLogIndex(false)
Thread.sleep(2000)

// Monitor task continues working after writes are unblocked
3 times { (_, lastUpdateTime) =>
lastUpdateTime should be > prevLastUpdateTime
prevLastUpdateTime = lastUpdateTime
}
}

private def getLatestTimestamp: (Long, Long) = {
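// Read jobStartTime and lastUpdateTime from the latest Flint metadata log entry for this index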
val latest = latestLogEntry(testLatestId)
(latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
@@ -80,4 +115,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
}
}
}

private def setWriteBlockOnMetadataLogIndex(isBlock: Boolean): Unit = {
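// While index.blocks.write is true, OpenSearch rejects document writes to the metadata log index,
// so the monitor's heartbeat updates fail until the block is removed.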
val request = new UpdateSettingsRequest(testMetaLogIndex)
.settings(Map("blocks.write" -> isBlock).asJava) // Blocking write operations
openSearchClient.indices().putSettings(request, RequestOptions.DEFAULT)
}
}
