Skip to content

Commit

Permalink
Revert timestamp column to time
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 10, 2023
1 parent 507e72a commit 031eb9e
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
}
}

// Verifies that no ID column is injected into the index DataFrame when the
// index is auto-refreshed but no checkpoint location is configured.
// NOTE(review): the original test name said "if not auto refreshed", which
// contradicts the body (auto_refresh = true); renamed to match the actual
// scenario, mirroring the sibling test that covers auto refresh WITH a
// checkpoint location.
test("should build without ID column if auto refreshed but no checkpoint location") {
  withTable(testTable) {
    sql(s"CREATE TABLE $testTable (name STRING, age INTEGER) USING JSON")

    // Auto refresh enabled, but deliberately no checkpoint_location option.
    val index = FlintSparkCoveringIndex(
      "name_idx",
      testTable,
      Map("name" -> "string"),
      options = FlintSparkIndexOptions(Map("auto_refresh" -> "true")))

    // build() should select only the indexed column — no generated ID column.
    assertDataFrameEquals(
      index.build(spark, None),
      spark
        .table(testTable)
        .select(col("name")))
  }
}

test(
"should build failed if auto refresh and checkpoint location provided but no ID generated") {
withTable(testTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true", "id_expression" -> "name")))
.create()

val jobId = flint.refreshIndex(testFlintIndex, INCREMENTAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WHERE address = 'Portland'
| WITH (
| auto_refresh = true,
| id_expression = 'name'
| )
| WITH (auto_refresh = true)
|""".stripMargin)

// Wait for streaming job complete current micro batch
Expand Down Expand Up @@ -239,11 +236,8 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
test("create covering index on time series time with ID expression") {
sql(s"""
| CREATE INDEX $testIndex ON $testTimeSeriesTable
| (timestamp, age)
| WITH (
| auto_refresh = true,
| id_expression = 'address'
| )
| (time, age)
| WITH (auto_refresh = true)
|""".stripMargin)

val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
assertion
.run { checkpointDir =>
s""" CREATE INDEX $testIndex ON $testTable
| (timestamp, name)
| (time, name)
| WITH (
| auto_refresh = true,
| id_expression = 'timestamp',
| id_expression = 'time',
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
| GROUP BY TUMBLE(timestamp, '10 Minutes')
| GROUP BY TUMBLE(time, '10 Minutes')
|""".stripMargin

override def beforeAll(): Unit = {
Expand Down Expand Up @@ -134,7 +134,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
| GROUP BY TUMBLE(timestamp, '1 Hour')
| GROUP BY TUMBLE(time, '1 Hour')
|""".stripMargin

withIncrementalMaterializedView(largeWindowQuery) { indexData =>
Expand All @@ -159,7 +159,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| COUNT(*) AS count
| FROM $testTable
| WHERE address = 'Seattle'
| GROUP BY TUMBLE(timestamp, '5 Minutes')
| GROUP BY TUMBLE(time, '5 Minutes')
|""".stripMargin

withIncrementalMaterializedView(filterQuery) { indexData =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
| GROUP BY TUMBLE(timestamp, '10 Minutes')
| GROUP BY TUMBLE(time, '10 Minutes')
|""".stripMargin

override def beforeAll(): Unit = {
Expand Down Expand Up @@ -163,7 +163,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| window.start AS `start.time`,
| COUNT(*) AS `count`
| FROM `spark_catalog`.`default`.`mv_test`
| GROUP BY TUMBLE(`timestamp`, '10 Minutes')""".stripMargin.trim
| GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim

sql(s"""
| CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
sql(s"""
| CREATE TABLE $testTable
| (
| timestamp TIMESTAMP,
| time TIMESTAMP,
| name STRING,
| age INT,
| address STRING
Expand Down

0 comments on commit 031eb9e

Please sign in to comment.