Skip to content

Commit

Permalink
Validate tumble function argument and add IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 29, 2024
1 parent cd730cb commit cac4738
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,14 @@ case class FlintSparkMaterializedView(

// Assume first aggregate item must be time column
val winFunc = winFuncs.head
val timeCol = winFunc.arguments.head.asInstanceOf[Attribute]
Some(agg, timeCol)
val timeCol = winFunc.arguments.head
timeCol match {
case attr: Attribute =>
Some(agg, attr)
case _ =>
throw new IllegalArgumentException(
s"Tumble function only supports time column, but found: $timeCol")
}
}

private def isWindowingFunction(func: UnresolvedFunction): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,5 +448,34 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("tumble function only supports simple column") {
val testTable2 = s"$catalogName.default.mv_test_tumble"
withTable(testTable2) {
createTableHttpLog(testTable2)

withTempDir { checkpointDir =>
val ex = the[IllegalStateException] thrownBy {
sql(s"""
| CREATE MATERIALIZED VIEW `$catalogName`.`default`.`mv_test_metrics`
| AS
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable2
| GROUP BY
| TUMBLE(CAST(timestamp AS TIMESTAMP), '10 Minute')
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| watermark_delay = '1 Second'
| )
|""".stripMargin)
}
ex.getCause should have message
"Tumble function only supports time column, but found: cast('timestamp as timestamp)"
}
}
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}

0 comments on commit cac4738

Please sign in to comment.