Skip to content

Commit

Permalink
Refactor comments and test code
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jul 22, 2024
1 parent b4cfb52 commit dc7fdb8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,7 @@ trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {

/**
* Represents the basic output schema for the FlintSparkSqlCommand. This schema includes
* essential information about each index, such as:
* - Index Name (`flint_index_name`)
* - Index Kind (`kind`)
* - Associated Database (`database`)
* - Table the index is associated with (`table`), which can be nullable
* - Specific Index Name (`index_name`), which can be nullable
* - Auto-refresh status (`auto_refresh`)
* - Current status of the index (`status`)
* essential information about each index.
*/
private val baseOutputSchema = Seq(
AttributeReference("flint_index_name", StringType, nullable = false)(),
Expand All @@ -48,9 +41,7 @@ trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {

/**
* Extends the base output schema with additional information. This schema is used when the
* EXTENDED keyword is present which includes:
* - Error information (`error`), detailing any errors associated with the index, which can be
* nullable.
* EXTENDED keyword is present.
*/
// Extra nullable "error" column, appended to the base schema only for
// SHOW FLINT INDEX EXTENDED output.
private val extendedOutputSchema: Seq[AttributeReference] =
  Seq(
    AttributeReference("error", StringType, nullable = true)())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
awaitStreamingComplete(activeJob.get.id.toString)

// Assert output contains empty error message
// Helper: re-runs SHOW FLINT INDEX EXTENDED and returns the "error" column
// value of the first row; also asserts the column is present in the output.
def outputError: String = {
  val extendedOutput = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
  extendedOutput.columns should contain("error")
  val firstRow = extendedOutput.collect().head
  firstRow.getAs[String]("error")
}
outputError shouldBe empty

// Trigger next micro batch after 5 seconds with index readonly
Future {
Thread.sleep(5000)
Expand All @@ -139,13 +147,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
}

// Await and store exception as expected
// Await to store exception and verify if it's as expected
flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))

// Assert output contains error message
val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
df.columns should contain("error")
df.select(col("error")).collect().head.getString(0) should include("OpenSearchException")
outputError should include("OpenSearchException")

deleteTestIndex(testSkippingFlintIndex)
}
Expand Down

0 comments on commit dc7fdb8

Please sign in to comment.