Skip to content

Commit

Permalink
Update error message
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Apr 22, 2024
1 parent 7f6c7a7 commit a662c4e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ case class FlintSparkMaterializedView(
private def watermark(timeCol: Attribute, child: LogicalPlan) = {
require(
options.watermarkDelay().isDefined,
"watermark delay is required for auto refresh and incremental refresh with aggregation")
"watermark delay is required for auto or incremental refresh with aggregation")

val delay = options.watermarkDelay().get
EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
Expand Down Expand Up @@ -126,7 +126,7 @@ case class FlintSparkMaterializedView(

require(
winFuncs.size == 1,
"A windowing function is required for incremental refresh with aggregation")
"A windowing function is required for auto or incremental refresh with aggregation")

// Assume first aggregate item must be time column
val winFunc = winFuncs.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
Expand Down Expand Up @@ -202,7 +204,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS
| SELECT time FROM $testTable WITH
| SELECT time FROM $testTable WITH (
| """.stripMargin)
}
}
Expand Down Expand Up @@ -232,7 +234,12 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| )
| """.stripMargin)
} should have message
"requirement failed: A windowing function is required for incremental refresh with aggregation"
"requirement failed: A windowing function is required for auto or incremental refresh with aggregation"

// OS index should not be created because of pre-validation failed above
openSearchClient
.indices()
.exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
}
}

Expand All @@ -249,7 +256,12 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| )
| """.stripMargin)
} should have message
"requirement failed: watermark delay is required for auto refresh and incremental refresh with aggregation"
"requirement failed: watermark delay is required for auto or incremental refresh with aggregation"

// OS index should not be created because of pre-validation failed above
openSearchClient
.indices()
.exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
}
}

Expand Down

0 comments on commit a662c4e

Please sign in to comment.